From a4d8ded6ceaff8aa89f2fa06e8ca9fd4645728fa Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Mon, 3 Jan 2022 11:20:08 +0200 Subject: [PATCH] Add string_family_test.cc unit test Add unit test helper code that we will leverage for writing more unit tests. Numerous fixes in the transactional code. Allow controlling of expiration clock in unit-test environment with configurable task synchronization. The latter is enabled for prod and disable in the tests. --- helio | 2 +- server/CMakeLists.txt | 1 + server/conn_context.h | 7 ++ server/engine_shard_set.cc | 82 +++++++------- server/engine_shard_set.h | 27 +++-- server/main_service.cc | 23 +++- server/main_service.h | 9 +- server/reply_builder.cc | 3 - server/string_family_test.cc | 130 +++++++++++++++++++++ server/test_utils.cc | 153 ++++++++++++++++++++++++- server/test_utils.h | 54 +++++++++ server/transaction.cc | 214 ++++++++++++++++++++--------------- server/transaction.h | 106 +++++++++-------- 13 files changed, 605 insertions(+), 206 deletions(-) create mode 100644 server/string_family_test.cc diff --git a/helio b/helio index dab0fc768..b4bfdd0b8 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit dab0fc768494d8e7cc42d77b08fbee1a81415072 +Subproject commit b4bfdd0b8e67b293e98d5d3ce92c60ed05646b61 diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index b502ca915..99dcb95fe 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -14,4 +14,5 @@ add_library(dfly_test_lib test_utils.cc) cxx_link(dfly_test_lib dragonfly_lib gtest_main_ext) cxx_test(redis_parser_test dfly_test_lib LABELS DFLY) +cxx_test(string_family_test dfly_test_lib LABELS DFLY) cxx_test(memcache_parser_test dfly_test_lib LABELS DFLY) diff --git a/server/conn_context.h b/server/conn_context.h index fe53b75fd..7220e8ea1 100644 --- a/server/conn_context.h +++ b/server/conn_context.h @@ -36,6 +36,13 @@ class ConnectionContext : public ReplyBuilder { public: ConnectionContext(::io::Sink* stream, Connection* owner); + struct DebugInfo { + uint32_t shards_count = 0; + TxClock clock = 0; + }; + + DebugInfo last_command_debug; + // TODO: to introduce proper accessors. Transaction* transaction = nullptr; const CommandId* cid = nullptr; diff --git a/server/engine_shard_set.cc b/server/engine_shard_set.cc index aecbad860..98d05a2b1 100644 --- a/server/engine_shard_set.cc +++ b/server/engine_shard_set.cc @@ -19,19 +19,22 @@ namespace fibers = ::boost::fibers; thread_local EngineShard* EngineShard::shard_ = nullptr; constexpr size_t kQueueLen = 64; -EngineShard::EngineShard(util::ProactorBase* pb) - : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), db_slice_(pb->GetIndex(), this) { +EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time) + : queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), + db_slice_(pb->GetIndex(), this) { fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] { this_fiber::properties().set_name(absl::StrCat("shard_queue", index)); queue_.Run(); }); - periodic_task_ = pb->AddPeriodic(1, [] { - auto* shard = EngineShard::tlocal(); - DCHECK(shard); - // absl::GetCurrentTimeNanos() returns current time since the Unix Epoch. - shard->db_slice().UpdateExpireClock(absl::GetCurrentTimeNanos() / 1000000); - }); + if (update_db_time) { + periodic_task_ = pb->AddPeriodic(1, [] { + auto* shard = EngineShard::tlocal(); + DCHECK(shard); + // absl::GetCurrentTimeNanos() returns current time since the Unix Epoch. + shard->db_slice().UpdateExpireClock(absl::GetCurrentTimeNanos() / 1000000); + }); + } tmp_str = sdsempty(); } @@ -45,9 +48,9 @@ EngineShard::~EngineShard() { } } -void EngineShard::InitThreadLocal(ProactorBase* pb) { +void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) { CHECK(shard_ == nullptr) << pb->GetIndex(); - shard_ = new EngineShard(pb); + shard_ = new EngineShard(pb, update_db_time); } void EngineShard::DestroyThreadLocal() { @@ -61,33 +64,29 @@ void EngineShard::DestroyThreadLocal() { VLOG(1) << "Shard reset " << index; } - -void EngineShard::RunContinuationTransaction() { - auto sid = shard_id(); - - if (continuation_trans_->IsArmedInShard(sid)) { - bool to_keep = continuation_trans_->RunInShard(sid); - DVLOG(1) << "RunContTransaction " << continuation_trans_->DebugId() << " keep: " << to_keep; - if (!to_keep) { - continuation_trans_ = nullptr; - } - } -} - // Is called by Transaction::ExecuteAsync in order to run transaction tasks. // Only runs in its own thread. -void EngineShard::Execute(Transaction* trans) { +void EngineShard::PollExecution(Transaction* trans) { + DVLOG(1) << "PollExecution " << (trans ? trans->DebugId() : ""); ShardId sid = shard_id(); if (continuation_trans_) { if (trans == continuation_trans_) trans = nullptr; - RunContinuationTransaction(); - // Once we start executing transaction we do not continue until it's finished. - // This preserves atomicity property of multi-hop transactions. - if (continuation_trans_) + if (continuation_trans_->IsArmedInShard(sid)) { + bool to_keep = continuation_trans_->RunInShard(this); + DVLOG(1) << "RunContTrans: " << continuation_trans_->DebugId() << " keep: " << to_keep; + if (!to_keep) { + continuation_trans_ = nullptr; + } + } + + if (continuation_trans_) { + // Once we start executing transaction we do not continue until it's finished. + // This preserves atomicity property of multi-hop transactions. return; + } } DCHECK(!continuation_trans_); @@ -98,6 +97,8 @@ void EngineShard::Execute(Transaction* trans) { auto val = txq_.Front(); head = absl::get(val); + // The fact that Tx is in the queue, already means that coordinator fiber will not progress, + // hence here it's enough to test for run_count and check local_mask. bool is_armed = head->IsArmedInShard(sid); if (!is_armed) break; @@ -112,16 +113,18 @@ void EngineShard::Execute(Transaction* trans) { trans = nullptr; TxId txid = head->txid(); - // Could be equal to ts in case the same transaction had few hops. - DCHECK_LE(committed_txid_, txid); + DCHECK_LT(committed_txid_, txid); - // We update committed_ts_ before calling Run() to avoid cases where a late transaction might - // try to push back this one. + // We update committed_txid_ before calling RunInShard() to avoid cases where + // a transaction stalls the execution with IO while another fiber queries this shard for + // committed_txid_ (for example during the scheduling). committed_txid_ = txid; if (VLOG_IS_ON(2)) { dbg_id = head->DebugId(); } - bool keep = head->RunInShard(sid); + bool keep = head->RunInShard(this); + DCHECK(head == absl::get(txq_.Front())); + // We should not access head from this point since RunInShard callback decrements refcount. DLOG_IF(INFO, !dbg_id.empty()) << "RunHead " << dbg_id << ", keep " << keep; txq_.PopFront(); @@ -135,13 +138,12 @@ void EngineShard::Execute(Transaction* trans) { if (!trans) return; - if (txq_.Empty()) - return; + uint16_t local_mask = trans->GetLocalMask(sid); // If trans is out of order, i.e. locks keys that previous transactions have not locked. // It may be that there are other transactions that touch those keys but they necessary ordered // after trans in the queue, hence it's safe to run trans out of order. - if (trans->IsOutOfOrder() && trans->IsArmedInShard(sid)) { + if (local_mask & Transaction::OUT_OF_ORDER) { DCHECK(trans != head); dbg_id.clear(); @@ -151,7 +153,7 @@ void EngineShard::Execute(Transaction* trans) { dbg_id = trans->DebugId(); } - bool keep = trans->RunInShard(sid); // resets TxQueuePos, this is why we get it before. + bool keep = trans->RunInShard(this); // resets TxQueuePos, this is why we get it before. DLOG_IF(INFO, !dbg_id.empty()) << "Eager run " << sid << ", " << dbg_id << ", keep " << keep; // Should be enforced via Schedule(). TODO: to remove the check once the code is mature. @@ -166,10 +168,10 @@ void EngineShardSet::Init(uint32_t sz) { shard_queue_.resize(sz); } -void EngineShardSet::InitThreadLocal(ProactorBase* pb) { - EngineShard::InitThreadLocal(pb); +void EngineShardSet::InitThreadLocal(ProactorBase* pb, bool update_db_time) { + EngineShard::InitThreadLocal(pb, update_db_time); EngineShard* es = EngineShard::tlocal(); - shard_queue_[es->shard_id()] = es->GetQueue(); + shard_queue_[es->shard_id()] = es->GetFiberQueue(); } } // namespace dfly diff --git a/server/engine_shard_set.h b/server/engine_shard_set.h index b9a4c7c79..0619b17e7 100644 --- a/server/engine_shard_set.h +++ b/server/engine_shard_set.h @@ -23,7 +23,10 @@ class EngineShard { // EngineShard() is private down below. ~EngineShard(); - static void InitThreadLocal(util::ProactorBase* pb); + // Sets up a new EngineShard in the thread. + // If update_db_time is true, initializes periodic time update for its db_slice. + static void InitThreadLocal(util::ProactorBase* pb, bool update_db_time); + static void DestroyThreadLocal(); static EngineShard* tlocal() { @@ -42,12 +45,13 @@ class EngineShard { return db_slice_; } - ::util::fibers_ext::FiberQueue* GetQueue() { + ::util::fibers_ext::FiberQueue* GetFiberQueue() { return &queue_; } - // Executes a transaction. This transaction is pending in the queue. - void Execute(Transaction* trans); + // Processes TxQueue, blocked transactions or any other execution state related to that + // shard. Tries executing the passed transaction if possible (does not guarantee though). + void PollExecution(Transaction* trans); // Returns transaction queue. TxQueue* txq() { @@ -65,9 +69,7 @@ class EngineShard { sds tmp_str; private: - EngineShard(util::ProactorBase* pb); - - void RunContinuationTransaction(); + EngineShard(util::ProactorBase* pb, bool update_db_time); ::util::fibers_ext::FiberQueue queue_; ::boost::fibers::fiber fiber_q_; @@ -98,22 +100,25 @@ class EngineShardSet { } void Init(uint32_t size); - void InitThreadLocal(util::ProactorBase* pb); + void InitThreadLocal(util::ProactorBase* pb, bool update_db_time); + // Uses a shard queue to dispatch. Callback runs in a dedicated fiber. template auto Await(ShardId sid, F&& f) { return shard_queue_[sid]->Await(std::forward(f)); } + // Uses a shard queue to dispatch. Callback runs in a dedicated fiber. template auto Add(ShardId sid, F&& f) { assert(sid < shard_queue_.size()); return shard_queue_[sid]->Add(std::forward(f)); } - // Runs a brief function on all shards. + // Runs a brief function on all shards. Waits for it to complete. template void RunBriefInParallel(U&& func) { RunBriefInParallel(std::forward(func), [](auto i) { return true; }); } + // Runs a brief function on selected shards. Waits for it to complete. template void RunBriefInParallel(U&& func, P&& pred); template void RunBlockingInParallel(U&& func); @@ -145,8 +150,8 @@ template void EngineShardSet::RunBlockingInParallel(U&& func) { for (uint32_t i = 0; i < size(); ++i) { util::ProactorBase* dest = pp_->at(i); - dest->AsyncFiber([f = std::forward(func), bc]() mutable { - f(EngineShard::tlocal()); + dest->AsyncFiber([func, bc]() mutable { + func(EngineShard::tlocal()); bc.Dec(); }); } diff --git a/server/main_service.cc b/server/main_service.cc index 5994038dc..40cfe5695 100644 --- a/server/main_service.cc +++ b/server/main_service.cc @@ -4,6 +4,10 @@ #include "server/main_service.h" +extern "C" { + #include "redis/redis_aux.h" +} + #include #include @@ -42,24 +46,32 @@ DEFINE_VARZ(VarzQps, ping_qps); std::optional engine_varz; metrics::CounterFamily cmd_req("requests_total", "Number of served redis requests"); +constexpr size_t kMaxThreadSize = 1024; + } // namespace Service::Service(ProactorPool* pp) : shard_set_(pp), pp_(*pp) { CHECK(pp); + + // We support less than 1024 threads. + CHECK_LT(pp->size(), kMaxThreadSize); RegisterCommands(); + engine_varz.emplace("engine", [this] { return GetVarzStats(); }); } Service::~Service() { } -void Service::Init(util::AcceptServer* acceptor) { +void Service::Init(util::AcceptServer* acceptor, const InitOpts& opts) { + InitRedisTables(); + uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size(); shard_set_.Init(shard_num); pp_.AwaitOnAll([&](uint32_t index, ProactorBase* pb) { if (index < shard_count()) { - shard_set_.InitThreadLocal(pb); + shard_set_.InitThreadLocal(pb, !opts.disable_time_update); } }); @@ -78,6 +90,7 @@ void Service::Shutdown() { ping_qps.Shutdown(); StringFamily::Shutdown(); GenericFamily::Shutdown(); + cmd_req.Shutdown(); shard_set_.RunBlockingInParallel([&](EngineShard*) { EngineShard::DestroyThreadLocal(); }); } @@ -115,7 +128,8 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) { cntx->transaction = dist_trans.get(); if (cid->first_key_pos() > 0) { - dist_trans->InitByArgs(args); + dist_trans->InitByArgs(cntx->conn_state.db_index, args); + cntx->last_command_debug.shards_count = cntx->transaction->unique_shard_cnt(); } } else { cntx->transaction = nullptr; @@ -127,6 +141,9 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) { end_usec = ProactorBase::GetMonotonicTimeNs(); request_latency_usec.IncBy(cmd_str, (end_usec - start_usec) / 1000); + if (dist_trans) { + cntx->last_command_debug.clock = dist_trans->txid(); + } } void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view value, diff --git a/server/main_service.h b/server/main_service.h index d182c45f9..a26806620 100644 --- a/server/main_service.h +++ b/server/main_service.h @@ -20,12 +20,19 @@ class Service { public: using error_code = std::error_code; + struct InitOpts { + bool disable_time_update; + + InitOpts() : disable_time_update{false} { + } + }; + explicit Service(util::ProactorPool* pp); ~Service(); void RegisterHttp(util::HttpListenerBase* listener); - void Init(util::AcceptServer* acceptor); + void Init(util::AcceptServer* acceptor, const InitOpts& opts = InitOpts{}); void Shutdown(); diff --git a/server/reply_builder.cc b/server/reply_builder.cc index c4e9e18b9..e80154576 100644 --- a/server/reply_builder.cc +++ b/server/reply_builder.cc @@ -151,9 +151,6 @@ void ReplyBuilder::SendError(OpStatus status) { case OpStatus::OK: SendOk(); break; - case OpStatus::KEY_NOTFOUND: - SendError("no such key"); - break; default: LOG(ERROR) << "Unsupported status " << status; SendError("Internal error"); diff --git a/server/string_family_test.cc b/server/string_family_test.cc new file mode 100644 index 000000000..13d8e4de2 --- /dev/null +++ b/server/string_family_test.cc @@ -0,0 +1,130 @@ +// Copyright 2021, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/string_family.h" + +#include "base/gtest.h" +#include "base/logging.h" +#include "server/command_registry.h" +#include "server/conn_context.h" +#include "server/engine_shard_set.h" +#include "server/error.h" +#include "server/test_utils.h" +#include "server/transaction.h" +#include "util/uring/uring_pool.h" + +using namespace testing; +using namespace std; +using namespace util; +using absl::StrCat; + +namespace dfly { + +class StringFamilyTest : public BaseFamilyTest { + protected: +}; + +TEST_F(StringFamilyTest, SetGet) { + auto resp = Run({"set", "key", "val"}); + + EXPECT_THAT(resp, RespEq("OK")); + EXPECT_THAT(Run({"get", "key"}), RespEq("val")); + EXPECT_THAT(Run({"set", "key1", "1"}), RespEq("OK")); + EXPECT_THAT(Run({"get", "key1"}), RespEq("1")); + EXPECT_THAT(Run({"set", "key", "2"}), RespEq("OK")); + EXPECT_THAT(Run({"get", "key"}), RespEq("2")); +} + +TEST_F(StringFamilyTest, Expire) { + constexpr uint64_t kNow = 232279092000; + + UpdateTime(kNow); + ASSERT_THAT(Run({"set", "key", "val", "PX", "20"}), RespEq("OK")); + + UpdateTime(kNow + 10); + EXPECT_THAT(Run({"get", "key"}), RespEq("val")); + + UpdateTime(kNow + 20); + + EXPECT_THAT(Run({"get", "key"}), ElementsAre(ArgType(RespExpr::NIL))); + + ASSERT_THAT(Run({"set", "i", "1", "PX", "10"}), RespEq("OK")); +} + +TEST_F(StringFamilyTest, Set) { + auto resp = Run({"set", "foo", "bar", "XX"}); + EXPECT_THAT(resp, ElementsAre(ArgType(RespExpr::NIL))); + + resp = Run({"set", "foo", "bar", "NX"}); + ASSERT_THAT(resp, RespEq("OK")); + resp = Run({"set", "foo", "bar", "NX"}); + EXPECT_THAT(resp, ElementsAre(ArgType(RespExpr::NIL))); + + resp = Run({"set", "foo", "bar", "xx"}); + ASSERT_THAT(resp, RespEq("OK")); + + resp = Run({"set", "foo", "bar", "ex", "1"}); + ASSERT_THAT(resp, RespEq("OK")); +} + +TEST_F(StringFamilyTest, MGetSet) { + Run({"mset", "x", "0", "b", "0"}); + + ASSERT_EQ(2, GetDebugInfo("IO0").shards_count); + + auto mget_fb = pp_->at(0)->LaunchFiber([&] { + for (size_t i = 0; i < 1000; ++i) { + auto resp = Run({"mget", "b", "x"}); + ASSERT_EQ(2, resp.size()); + auto ivec = ToIntArr(resp); + + ASSERT_GE(ivec[1], ivec[0]); + } + }); + + auto set_fb = pp_->at(1)->LaunchFiber([&] { + for (size_t i = 1; i < 2000; ++i) { + Run({"set", "x", StrCat(i)}); + Run({"set", "b", StrCat(i)}); + } + }); + + mget_fb.join(); + set_fb.join(); +} + +TEST_F(StringFamilyTest, IntKey) { + Run({"mset", "1", "1", "-1000", "-1000"}); + auto resp = Run({"get", "1"}); + ASSERT_THAT(resp, RespEq("1")); +} + +TEST_F(StringFamilyTest, SingleShard) { + Run({"mset", "x", "1", "y", "1"}); + ASSERT_EQ(1, GetDebugInfo("IO0").shards_count); + + Run({"mget", "x", "y", "b"}); + ASSERT_EQ(2, GetDebugInfo("IO0").shards_count); + + auto resp = Run({"mget", "x", "y"}); + ASSERT_EQ(1, GetDebugInfo("IO0").shards_count); + ASSERT_THAT(ToIntArr(resp), ElementsAre(1, 1)); + + auto mset_fb = pp_->at(0)->LaunchFiber([&] { + for (size_t i = 0; i < 100; ++i) { + Run({"mset", "x", "0", "y", "0"}); + } + }); + + // Specially multiple shards to avoid fast-path. + auto mget_fb = pp_->at(1)->LaunchFiber([&] { + for (size_t i = 0; i < 100; ++i) { + Run({"mget", "x", "b", "y"}); + } + }); + mset_fb.join(); + mget_fb.join(); +} + +} // namespace dfly diff --git a/server/test_utils.cc b/server/test_utils.cc index b06d728ec..5be31ef45 100644 --- a/server/test_utils.cc +++ b/server/test_utils.cc @@ -7,6 +7,7 @@ #include #include "base/logging.h" +#include "server/dragonfly_connection.h" #include "util/uring/uring_pool.h" namespace dfly { @@ -29,7 +30,7 @@ bool RespMatcher::MatchAndExplain(const RespExpr& e, MatchResultListener* listen *listener << "Actual does not contain '" << exp_str_ << "'"; return false; } - if (type_ == RespExpr::STRING && exp_str_ != actual) { + if (type_ == RespExpr::STRING && exp_str_ != actual) { *listener << "\nActual string: " << actual; return false; } @@ -111,4 +112,154 @@ vector ToIntArr(const RespVec& vec) { return res; } +BaseFamilyTest::TestConn::TestConn() + : dummy_conn(new Connection(Protocol::REDIS, nullptr, nullptr)), + cmd_cntx(&sink, dummy_conn.get()) { +} + +BaseFamilyTest::TestConn::~TestConn() { +} + +BaseFamilyTest::BaseFamilyTest() { +} + +BaseFamilyTest::~BaseFamilyTest() { +} + +void BaseFamilyTest::SetUp() { + pp_.reset(new uring::UringPool(16, num_threads_)); + pp_->Run(); + service_.reset(new Service{pp_.get()}); + + Service::InitOpts opts; + opts.disable_time_update = true; + service_->Init(nullptr, opts); + ess_ = &service_->shard_set(); + + const TestInfo* const test_info = UnitTest::GetInstance()->current_test_info(); + LOG(INFO) << "Starting " << test_info->name(); +} + +void BaseFamilyTest::TearDown() { + service_->Shutdown(); + service_.reset(); + pp_->Stop(); + + const TestInfo* const test_info = UnitTest::GetInstance()->current_test_info(); + LOG(INFO) << "Finishing " << test_info->name(); +} + +// ts is ms +void BaseFamilyTest::UpdateTime(uint64_t ms) { + auto cb = [ms](EngineShard* s) { s->db_slice().UpdateExpireClock(ms); }; + ess_->RunBriefInParallel(cb); +} + +RespVec BaseFamilyTest::Run(initializer_list list) { + if (!ProactorBase::IsProactorThread()) { + return pp_->at(0)->AwaitBlocking([&] { return this->Run(list); }); + } + + mu_.lock(); + string id = GetId(); + auto [it, inserted] = connections_.emplace(id, nullptr); + + if (inserted) { + it->second.reset(new TestConn); + } else { + it->second->sink.Clear(); + } + TestConn* conn = it->second.get(); + mu_.unlock(); + + CmdArgVec args = conn->Args(list); + CmdArgList cmd_arg_list{args.data(), args.size()}; + auto& context = conn->cmd_cntx; + context.shard_set = ess_; + + service_->DispatchCommand(cmd_arg_list, &context); + + unique_lock lk(mu_); + last_cmd_dbg_info_ = context.last_command_debug; + + RespVec vec = conn->ParseResp(); + if (vec.size() == 1) { + auto buf = vec.front().GetBuf(); + if (!buf.empty() && buf[0] == '+') { + buf.remove_prefix(1); + std::get(vec.front().u) = buf; + } + } + + return vec; +} + +int64_t BaseFamilyTest::CheckedInt(std::initializer_list list) { + RespVec resp = Run(list); + CHECK_EQ(1u, resp.size()); + if (resp.front().type == RespExpr::INT64) { + return get(resp.front().u); + } + if (resp.front().type == RespExpr::NIL) { + return INT64_MIN; + } + CHECK_EQ(RespExpr::STRING, int(resp.front().type)); + string_view sv = ToSV(resp.front().GetBuf()); + int64_t res; + CHECK(absl::SimpleAtoi(sv, &res)) << "|" << sv << "|"; + return res; +} + +CmdArgVec BaseFamilyTest::TestConn::Args(std::initializer_list list) { + CHECK_NE(0u, list.size()); + + CmdArgVec res; + for (auto v : list) { + tmp_str_vec.emplace_back(new string{v}); + auto& s = *tmp_str_vec.back(); + + res.emplace_back(s.data(), s.size()); + } + + return res; +} + +RespVec BaseFamilyTest::TestConn::ParseResp() { + tmp_str_vec.emplace_back(new string{sink.str()}); + auto& s = *tmp_str_vec.back(); + auto buf = RespExpr::buffer(&s); + uint32_t consumed = 0; + + parser.reset(new RedisParser); + RespVec res; + RedisParser::Result st = parser->Parse(buf, &consumed, &res); + CHECK_EQ(RedisParser::OK, st); + + return res; +} + +bool BaseFamilyTest::IsLocked(DbIndex db_index, std::string_view key) const { + ShardId sid = Shard(key, ess_->size()); + KeyLockArgs args; + args.db_index = db_index; + args.args = ArgSlice{&key, 1}; + args.key_step = 1; + bool is_open = pp_->at(sid)->AwaitBrief( + [args] { return EngineShard::tlocal()->db_slice().CheckLock(IntentLock::EXCLUSIVE, args); }); + return !is_open; +} + +string BaseFamilyTest::GetId() const { + int32 id = ProactorBase::GetIndex(); + CHECK_GE(id, 0); + return absl::StrCat("IO", id); +} + +ConnectionContext::DebugInfo BaseFamilyTest::GetDebugInfo(const std::string& id) const { + auto it = connections_.find(id); + CHECK(it != connections_.end()); + + return it->second->cmd_cntx.last_command_debug; +} + } // namespace dfly diff --git a/server/test_utils.h b/server/test_utils.h index 28ee1221d..c6a3a3b54 100644 --- a/server/test_utils.h +++ b/server/test_utils.h @@ -7,6 +7,8 @@ #include #include "io/io.h" +#include "server/conn_context.h" +#include "server/main_service.h" #include "server/redis_parser.h" #include "util/proactor_pool.h" @@ -83,4 +85,56 @@ MATCHER_P(RespEq, val, "") { return ::testing::ExplainMatchResult(::testing::ElementsAre(StrArg(val)), arg, result_listener); } +std::vector ToIntArr(const RespVec& vec); + +class BaseFamilyTest : public ::testing::Test { + protected: + BaseFamilyTest(); + ~BaseFamilyTest(); + + void SetUp() override; + void TearDown() override; + + protected: + RespVec Run(std::initializer_list list); + int64_t CheckedInt(std::initializer_list list); + + bool IsLocked(DbIndex db_index, std::string_view key) const; + ConnectionContext::DebugInfo GetDebugInfo(const std::string& id) const; + + ConnectionContext::DebugInfo GetDebugInfo() const { + return GetDebugInfo("IO0"); + } + + // ts is ms + void UpdateTime(uint64_t ms); + std::string GetId() const; + + std::unique_ptr pp_; + std::unique_ptr service_; + EngineShardSet* ess_ = nullptr; + unsigned num_threads_ = 3; + + struct TestConn { + io::StringSink sink; + std::unique_ptr dummy_conn; + + ConnectionContext cmd_cntx; + std::vector> tmp_str_vec; + + std::unique_ptr parser; + + TestConn(); + ~TestConn(); + + CmdArgVec Args(std::initializer_list list); + + RespVec ParseResp(); + }; + + absl::flat_hash_map> connections_; + ::boost::fibers::mutex mu_; + ConnectionContext::DebugInfo last_cmd_dbg_info_; +}; + } // namespace dfly diff --git a/server/transaction.cc b/server/transaction.cc index 1feb3a466..620a1abcb 100644 --- a/server/transaction.cc +++ b/server/transaction.cc @@ -22,10 +22,8 @@ std::atomic_uint64_t op_seq{1}; constexpr size_t kTransSize = sizeof(Transaction); - } // namespace - IntentLock::Mode Transaction::Mode() const { return (trans_options_ & CO::READONLY) ? IntentLock::SHARED : IntentLock::EXCLUSIVE; } @@ -46,10 +44,10 @@ Transaction::Transaction(const CommandId* cid, EngineShardSet* ess) : cid_(cid), bool single_key = cid_->first_key_pos() > 0 && !cid_->is_multi_key(); if (single_key) { - dist_.shard_data.resize(1); // Single key optimization + shard_data_.resize(1); // Single key optimization } else { // Our shard_data is not sparse, so we must allocate for all threads :( - dist_.shard_data.resize(ess_->size()); + shard_data_.resize(ess_->size()); } } @@ -73,18 +71,18 @@ Transaction::Transaction(const CommandId* cid, EngineShardSet* ess) : cid_(cid), * **/ -void Transaction::InitByArgs(CmdArgList args) { +void Transaction::InitByArgs(DbIndex index, CmdArgList args) { CHECK_GT(args.size(), 1U); CHECK_LT(size_t(cid_->first_key_pos()), args.size()); DCHECK_EQ(unique_shard_cnt_, 0u); + db_index_ = index; if (!cid_->is_multi_key()) { // Single key optimization. auto key = ArgS(args, cid_->first_key_pos()); args_.push_back(key); unique_shard_cnt_ = 1; unique_shard_id_ = Shard(key, ess_->size()); - num_keys_ = 1; return; } @@ -93,7 +91,7 @@ void Transaction::InitByArgs(CmdArgList args) { // Reuse thread-local temporary storage. Since this code is non-preemptive we can use it here. auto& shard_index = tmp_space.shard_cache; - shard_index.resize(dist_.shard_data.size()); + shard_index.resize(shard_data_.size()); for (auto& v : shard_index) { v.Clear(); } @@ -102,10 +100,9 @@ void Transaction::InitByArgs(CmdArgList args) { : (args.size() + 1 + cid_->last_key_pos()); for (size_t i = 1; i < key_end; ++i) { std::string_view key = ArgS(args, i); - uint32_t sid = Shard(key, dist_.shard_data.size()); + uint32_t sid = Shard(key, shard_data_.size()); shard_index[sid].args.push_back(key); shard_index[sid].original_index.push_back(i - 1); - ++num_keys_; if (cid_->key_arg_step() == 2) { // value ++i; @@ -116,15 +113,15 @@ void Transaction::InitByArgs(CmdArgList args) { } args_.resize(key_end - 1); - dist_.reverse_index.resize(args_.size()); + reverse_index_.resize(args_.size()); auto next_arg = args_.begin(); - auto rev_indx_it = dist_.reverse_index.begin(); + auto rev_indx_it = reverse_index_.begin(); // slice.arg_start/arg_count point to args_ array which is sorted according to shard of each key. // reverse_index_[i] says what's the original position of args_[i] in args. - for (size_t i = 0; i < dist_.shard_data.size(); ++i) { - auto& sd = dist_.shard_data[i]; + for (size_t i = 0; i < shard_data_.size(); ++i) { + auto& sd = shard_data_[i]; auto& si = shard_index[i]; CHECK_LT(si.args.size(), 1u << 15); sd.arg_count = si.args.size(); @@ -152,14 +149,14 @@ void Transaction::InitByArgs(CmdArgList args) { if (unique_shard_cnt_ == 1) { PerShardData* sd; - dist_.shard_data.resize(1); - sd = &dist_.shard_data.front(); + shard_data_.resize(1); + sd = &shard_data_.front(); sd->arg_count = -1; sd->arg_start = -1; } // Validation. - for (const auto& sd : dist_.shard_data) { + for (const auto& sd : shard_data_) { DCHECK_EQ(sd.local_mask, 0u); DCHECK_EQ(0, sd.local_mask & ARMED); DCHECK_EQ(TxQueue::kEnd, sd.pq_pos); @@ -171,37 +168,29 @@ string Transaction::DebugId() const { } // Runs in the dbslice thread. Returns true if transaction needs to be kept in the queue. -bool Transaction::RunInShard(ShardId sid) { - CHECK(cb_); +bool Transaction::RunInShard(EngineShard* shard) { + DCHECK_GT(run_count_.load(memory_order_relaxed), 0u); + CHECK(cb_) << DebugId(); DCHECK_GT(txid_, 0u); - EngineShard* shard = EngineShard::tlocal(); - // Unlike with regular transactions we do not acquire locks upon scheduling // because Scheduling is done before multi-exec batch is executed. Therefore we // lock keys right before the execution of each statement. - DVLOG(1) << "RunInShard: " << DebugId() << " sid:" << sid; + DVLOG(1) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id(); + + unsigned idx = SidToId(shard->shard_id()); + auto& sd = shard_data_[idx]; - sid = SidToId(sid); - auto& sd = dist_.shard_data[sid]; DCHECK(sd.local_mask & ARMED); sd.local_mask &= ~ARMED; - bool concluding = dist_.is_concluding_cb; - DCHECK(sd.local_mask & KEYS_ACQUIRED); + /*************************************************************************/ // Actually running the callback. OpStatus status = cb_(this, shard); - - // If it's a final hop we should release the locks. - if (concluding) { - - auto largs = GetLockArgs(sid); - shard->db_slice().Release(Mode(), largs); - sd.local_mask &= ~KEYS_ACQUIRED; - } + /*************************************************************************/ if (unique_shard_cnt_ == 1) { cb_ = nullptr; // We can do it because only a single thread runs the callback. @@ -210,28 +199,27 @@ bool Transaction::RunInShard(ShardId sid) { CHECK_EQ(OpStatus::OK, status); } - // This shard should own a reference for transaction as well as coordinator thread. - DCHECK_GT(use_count(), 1u); - CHECK_GE(DecreaseRunCnt(), 1u); + // at least the coordinator thread owns the reference. + DCHECK_GE(use_count(), 1u); - // must be computed before intrusive_ptr_release call. - if (concluding) { - sd.pq_pos = TxQueue::kEnd; - // For multi-transaction we need to clear this flag to allow locking of the next set of keys - // during the next child transaction. + // If it's a final hop we should release the locks. + if (is_concluding_cb_) { + KeyLockArgs largs = GetLockArgs(idx); + + shard->db_slice().Release(Mode(), largs); sd.local_mask &= ~KEYS_ACQUIRED; - DVLOG(2) << "ptr_release " << DebugId() << " " << this->use_count(); - - intrusive_ptr_release(this); // Against ScheduleInternal. } - return !concluding; // keep + CHECK_GE(DecreaseRunCnt(), 1u); + + return !is_concluding_cb_; // keep } void Transaction::ScheduleInternal(bool single_hop) { DCHECK_EQ(0, state_mask_.load(memory_order_acquire) & SCHEDULED); DCHECK_EQ(0u, txid_); + bool out_of_order = false; uint32_t num_shards; std::function is_active; @@ -239,22 +227,19 @@ void Transaction::ScheduleInternal(bool single_hop) { DCHECK_GT(num_shards, 0u); is_active = [&](uint32_t i) { - return num_shards == 1 ? (i == unique_shard_id_) : dist_.shard_data[i].arg_count > 0; + return num_shards == 1 ? (i == unique_shard_id_) : shard_data_[i].arg_count > 0; }; - // intrusive_ptr_add num_shards times. - use_count_.fetch_add(num_shards, memory_order_relaxed); - while (true) { txid_ = op_seq.fetch_add(1, std::memory_order_relaxed); - std::atomic_uint32_t lock_acquire_cnt{0}; + std::atomic_uint32_t lock_granted_cnt{0}; std::atomic_uint32_t success{0}; auto cb = [&](EngineShard* shard) { pair res = ScheduleInShard(shard); success.fetch_add(res.first, memory_order_relaxed); - lock_acquire_cnt.fetch_add(res.second, memory_order_relaxed); + lock_granted_cnt.fetch_add(res.second, memory_order_relaxed); }; ess_->RunBriefInParallel(std::move(cb), is_active); @@ -263,10 +248,10 @@ void Transaction::ScheduleInternal(bool single_hop) { // We allow out of order execution only for single hop transactions. // It might be possible to do it for multi-hop transactions as well but currently is // too complicated to reason about. - if (single_hop && lock_acquire_cnt.load(memory_order_relaxed) == num_shards) { - dist_.out_of_order.store(true, memory_order_relaxed); + if (single_hop && lock_granted_cnt.load(memory_order_relaxed) == num_shards) { + out_of_order = true; } - DVLOG(1) << "Scheduled " << DebugId() << " OutOfOrder: " << dist_.out_of_order; + DVLOG(1) << "Scheduled " << DebugId() << " OutOfOrder: " << out_of_order; state_mask_.fetch_or(SCHEDULED, memory_order_release); break; @@ -281,6 +266,12 @@ void Transaction::ScheduleInternal(bool single_hop) { ess_->RunBriefInParallel(std::move(cancel), is_active); CHECK_EQ(0u, success.load(memory_order_relaxed)); } + + if (out_of_order) { + for (auto& sd : shard_data_) { + sd.local_mask |= OUT_OF_ORDER; + } + } } // Optimized "Schedule and execute" function for the most common use-case of a single hop @@ -291,25 +282,43 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { cb_ = std::move(cb); - bool run_eager = false; bool schedule_fast = (unique_shard_cnt_ == 1); if (schedule_fast) { // Single shard (local) optimization. // We never resize shard_data because that would affect MULTI transaction correctness. - DCHECK_EQ(1u, dist_.shard_data.size()); + DCHECK_EQ(1u, shard_data_.size()); - dist_.shard_data[0].local_mask |= ARMED; - run_count_.fetch_add(1, memory_order_release); // Decreases in RunLocal. - auto schedule_cb = [&] { return ScheduleUniqueShard(EngineShard::tlocal()); }; - run_eager = ess_->Await(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier. - (void)run_eager; - } else { // Transaction spans multiple shards or it's global (like flushdb) + shard_data_[0].local_mask |= ARMED; + + // memory_order_release because we do not want it to be reordered with shard_data writes + // above. + // IsArmedInShard() first checks run_count_ before accessing shard_data. + run_count_.fetch_add(1, memory_order_release); + + // Please note that schedule_cb can not update any data on ScheduleSingleHop stack + // since the latter can exit before ScheduleUniqueShard returns. + // The problematic flow is as follows: ScheduleUniqueShard schedules into TxQueue and then + // call PollExecute that runs the callback which calls DecreaseRunCnt. + // As a result WaitForShardCallbacks below is unblocked. + auto schedule_cb = [&] { + bool run_eager = ScheduleUniqueShard(EngineShard::tlocal()); + if (run_eager) { + // it's important to DecreaseRunCnt only for run_eager and after run_eager was assigned. + // If DecreaseRunCnt were called before ScheduleUniqueShard finishes + // then WaitForShardCallbacks below could exit before schedule_cb assigns return value + // to run_eager and cause stack corruption. + CHECK_GE(DecreaseRunCnt(), 1u); + } + }; + + ess_->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier. + } else { ScheduleInternal(true); ExecuteAsync(true); } - DVLOG(1) << "Before DoneWait " << DebugId() << " " << args_.front(); + DVLOG(1) << "ScheduleSingleHop before Wait " << DebugId() << " " << run_count_.load(); WaitForShardCallbacks(); - DVLOG(1) << "After DoneWait"; + DVLOG(1) << "ScheduleSingleHop after Wait " << DebugId(); cb_ = nullptr; state_mask_.fetch_or(AFTERRUN, memory_order_release); @@ -323,23 +332,24 @@ void Transaction::Execute(RunnableType cb, bool conclude) { ExecuteAsync(conclude); - DVLOG(1) << "Wait on " << DebugId(); + DVLOG(1) << "Wait on Exec " << DebugId(); WaitForShardCallbacks(); - DVLOG(1) << "Wait on " << DebugId() << " completed"; + DVLOG(1) << "Wait on Exec " << DebugId() << " completed"; + cb_ = nullptr; - dist_.out_of_order.store(false, memory_order_relaxed); uint32_t mask = conclude ? AFTERRUN : RUNNING; - state_mask_.fetch_or(mask, memory_order_release); + state_mask_.fetch_or(mask, memory_order_relaxed); } // Runs in coordinator thread. void Transaction::ExecuteAsync(bool concluding_cb) { DVLOG(1) << "ExecuteAsync " << DebugId() << " concluding " << concluding_cb; - dist_.is_concluding_cb = concluding_cb; + is_concluding_cb_ = concluding_cb; DCHECK_GT(unique_shard_cnt_, 0u); + DCHECK_GT(use_count_.load(memory_order_relaxed), 0u); // We do not necessarily Execute this transaction in 'cb' below. It well may be that it will be // executed by the engine shard once it has been armed and coordinator thread will finish the @@ -349,10 +359,10 @@ void Transaction::ExecuteAsync(bool concluding_cb) { use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed); if (unique_shard_cnt_ == 1) { - dist_.shard_data[SidToId(unique_shard_id_)].local_mask |= ARMED; + shard_data_[SidToId(unique_shard_id_)].local_mask |= ARMED; } else { - for (ShardId i = 0; i < dist_.shard_data.size(); ++i) { - auto& sd = dist_.shard_data[i]; + for (ShardId i = 0; i < shard_data_.size(); ++i) { + auto& sd = shard_data_[i]; if (sd.arg_count == 0) continue; DCHECK_LT(sd.arg_count, 1u << 15); @@ -360,18 +370,37 @@ void Transaction::ExecuteAsync(bool concluding_cb) { } } + uint32_t seq = seqlock_.load(memory_order_relaxed); + // this fence prevents that a read or write operation before a release fence will be reordered // with a write operation after a release fence. Specifically no writes below will be reordered // upwards. Important, because it protects non-threadsafe local_mask from being accessed by // IsArmedInShard in other threads. - run_count_.fetch_add(unique_shard_cnt_, memory_order_acq_rel); + run_count_.store(unique_shard_cnt_, memory_order_release); - auto cb = [this] { + // We verify seq lock has the same generation number. See below for more info. + auto cb = [seq, this] { EngineShard* shard = EngineShard::tlocal(); - DVLOG(2) << "TriggerExec " << DebugId() << " sid:" << shard->shard_id(); + DVLOG(2) << "EngineShard::Exec " << DebugId() << " sid:" << shard->shard_id() << " " + << run_count_.load(memory_order_relaxed); - // Everything that should be handled during the callback execution should go into RunInShard. - shard->Execute(this); + uint16_t local_mask = GetLocalMask(shard->shard_id()); + + // we use fetch_add with release trick to make sure that local_mask is loaded before + // we load seq_after. We could gain similar result with "atomic_thread_fence(acquire)" + uint32_t seq_after = seqlock_.fetch_add(0, memory_order_release); + + // We verify that this callback is still relevant. + // If we still have the same sequence number and local_mask is ARMED it means + // the coordinator thread has not crossed WaitForShardCallbacks barrier. + // Otherwise, this callback is redundant. We may still call PollExecution but + // we should not pass this to it since it can be in undefined state for this callback. + if (seq_after == seq && (local_mask & ARMED)) { + // shard->PollExecution(this) does not necessarily execute this transaction. + // Therefore, everything that should be handled during the callback execution + // should go into RunInShard. + shard->PollExecution(this); + } DVLOG(2) << "ptr_release " << DebugId() << " " << use_count(); intrusive_ptr_release(this); // against use_count_.fetch_add above. @@ -381,8 +410,8 @@ void Transaction::ExecuteAsync(bool concluding_cb) { if (unique_shard_cnt_ == 1) { ess_->Add(unique_shard_id_, std::move(cb)); // serves as a barrier. } else { - for (ShardId i = 0; i < dist_.shard_data.size(); ++i) { - auto& sd = dist_.shard_data[i]; + for (ShardId i = 0; i < shard_data_.size(); ++i) { + auto& sd = shard_data_[i]; if (sd.arg_count == 0) continue; ess_->Add(i, cb); // serves as a barrier. @@ -391,11 +420,11 @@ void Transaction::ExecuteAsync(bool concluding_cb) { } void Transaction::RunQuickie() { - DCHECK_EQ(1u, dist_.shard_data.size()); + DCHECK_EQ(1u, shard_data_.size()); DCHECK_EQ(0u, txid_); EngineShard* shard = EngineShard::tlocal(); - auto& sd = dist_.shard_data[0]; + auto& sd = shard_data_[0]; DCHECK_EQ(0, sd.local_mask & KEYS_ACQUIRED); DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id() << " " << args_[0]; @@ -405,7 +434,6 @@ void Transaction::RunQuickie() { sd.local_mask &= ~ARMED; cb_ = nullptr; // We can do it because only a single shard runs the callback. - CHECK_GE(DecreaseRunCnt(), 1u); } const char* Transaction::Name() const { @@ -414,7 +442,7 @@ const char* Transaction::Name() const { KeyLockArgs Transaction::GetLockArgs(ShardId sid) const { KeyLockArgs res; - res.db_index = 0; // TODO + res.db_index = db_index_; res.key_step = cid_->key_arg_step(); res.args = ShardArgsInShard(sid); @@ -426,19 +454,19 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const { // Returns true if was eagerly executed, false if it was scheduled into queue. bool Transaction::ScheduleUniqueShard(EngineShard* shard) { DCHECK_EQ(0u, txid_); - DCHECK_EQ(1u, dist_.shard_data.size()); + DCHECK_EQ(1u, shard_data_.size()); auto mode = Mode(); auto lock_args = GetLockArgs(shard->shard_id()); - auto& sd = dist_.shard_data.front(); + auto& sd = shard_data_.front(); DCHECK_EQ(TxQueue::kEnd, sd.pq_pos); // Fast path - for uncontended keys, just run the callback. // That applies for single key operations like set, get, lpush etc. if (shard->db_slice().CheckLock(mode, lock_args)) { RunQuickie(); // TODO: for journal - this can become multi-shard - // transaction on replica. + // transaction on replica. return true; } @@ -455,6 +483,9 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { DCHECK(!lock_acquired); // Because CheckLock above failed. state_mask_.fetch_or(SCHEDULED, memory_order_release); + DVLOG(1) << "Rescheduling into TxQueue " << DebugId(); + + shard->PollExecution(nullptr); return false; } @@ -475,7 +506,7 @@ pair Transaction::ScheduleInShard(EngineShard* shard) { bool lock_granted = false; ShardId sid = SidToId(shard->shard_id()); - auto& sd = dist_.shard_data[sid]; + auto& sd = shard_data_[sid]; bool shard_unlocked = true; lock_args = GetLockArgs(shard->shard_id()); @@ -495,7 +526,7 @@ pair Transaction::ScheduleInShard(EngineShard* shard) { // before we start scheduling them we lock the shards and disable OOO. // We may record when they disable OOO via barrier_ts so if the queue contains transactions // that were only scheduled afterwards we know they are not free so we can still - // reorder the queue. Currently, this optimization is disabled: barrier_ts < pq->HeadRank(). + // reorder the queue. Currently, this optimization is disabled: barrier_ts < pq->HeadScore(). bool to_proceed = lock_granted || pq->TailScore() < txid_; if (!to_proceed) { if (sd.local_mask & KEYS_ACQUIRED) { // rollback the lock. @@ -521,7 +552,7 @@ pair Transaction::ScheduleInShard(EngineShard* shard) { bool Transaction::CancelInShard(EngineShard* shard) { ShardId sid = SidToId(shard->shard_id()); - auto& sd = dist_.shard_data[sid]; + auto& sd = shard_data_[sid]; auto pos = sd.pq_pos; if (pos == TxQueue::kEnd) @@ -555,7 +586,7 @@ ArgSlice Transaction::ShardArgsInShard(ShardId sid) const { return args_; } - const auto& sd = dist_.shard_data[sid]; + const auto& sd = shard_data_[sid]; return ArgSlice{args_.data() + sd.arg_start, sd.arg_count}; } @@ -563,12 +594,13 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const { if (unique_shard_cnt_ == 1) return arg_index; - return dist_.reverse_index[dist_.shard_data[shard_id].arg_start + arg_index]; + return reverse_index_[shard_data_[shard_id].arg_start + arg_index]; } inline uint32_t Transaction::DecreaseRunCnt() { // We use release so that no stores will be reordered after. uint32_t res = run_count_.fetch_sub(1, std::memory_order_release); + if (res == 1) run_ec_.notify(); return res; diff --git a/server/transaction.h b/server/transaction.h index eb6eebda5..a4a46ab9c 100644 --- a/server/transaction.h +++ b/server/transaction.h @@ -7,15 +7,15 @@ #include #include #include -#include -#include #include +#include +#include #include #include "core/intent_lock.h" -#include "core/tx_queue.h" #include "core/op_status.h" +#include "core/tx_queue.h" #include "server/common_types.h" #include "server/table.h" #include "util/fibers/fibers_ext.h" @@ -32,7 +32,7 @@ class Transaction { ~Transaction(); - // Transactions are reference counted. + // Transactions are reference counted. friend void intrusive_ptr_add_ref(Transaction* trans) noexcept { trans->use_count_.fetch_add(1, std::memory_order_relaxed); } @@ -49,8 +49,9 @@ class Transaction { using time_point = ::std::chrono::steady_clock::time_point; enum LocalState : uint8_t { - ARMED = 1, // Transaction was armed with the callback - KEYS_ACQUIRED = 0x20, + ARMED = 1, // Transaction was armed with the callback + OUT_OF_ORDER = 2, + KEYS_ACQUIRED = 4, }; enum State : uint8_t { @@ -61,7 +62,7 @@ class Transaction { Transaction(const CommandId* cid, EngineShardSet* ess); - void InitByArgs(CmdArgList args); + void InitByArgs(DbIndex index, CmdArgList args); std::string DebugId() const; @@ -77,35 +78,30 @@ class Transaction { //! Runs from the coordinator thread. bool IsActive(ShardId shard_id) const { return unique_shard_cnt_ == 1 ? unique_shard_id_ == shard_id - : dist_.shard_data[shard_id].arg_count > 0; + : shard_data_[shard_id].arg_count > 0; } //! Returns true if the transaction is armed for execution on this sid (used to avoid //! duplicate runs). Supports local transactions under multi as well. bool IsArmedInShard(ShardId sid) const { - if (sid >= dist_.shard_data.size()) + if (sid >= shard_data_.size()) sid = 0; // We use acquire so that no reordering will move before this load. - return run_count_.load(std::memory_order_acquire) > 0 && - dist_.shard_data[sid].local_mask & ARMED; + return run_count_.load(std::memory_order_acquire) > 0 && shard_data_[sid].local_mask & ARMED; } // Called from engine set shard threads. uint16_t GetLocalMask(ShardId sid) const { - return dist_.shard_data[SidToId(sid)].local_mask; + return shard_data_[SidToId(sid)].local_mask; } uint32_t GetStateMask() const { return state_mask_.load(std::memory_order_relaxed); } - bool IsOutOfOrder() const { - return dist_.out_of_order.load(std::memory_order_relaxed); - } - // Relevant only when unique_shards_ > 1. uint32_t TxQueuePos(ShardId sid) const { - return dist_.shard_data[sid].pq_pos; + return shard_data_[sid].pq_pos; } // if conclude is true, removes the transaction from the pending queue. @@ -145,16 +141,17 @@ class Transaction { return unique_shard_cnt_; } - EngineShardSet* shard_set() { return ess_; } - + EngineShardSet* shard_set() { + return ess_; + } // Called by EngineShard when performing Execute over the tx queue. // Returns true if transaction should be kept in the queue. - bool RunInShard(ShardId sid); + bool RunInShard(EngineShard* shard); private: unsigned SidToId(ShardId sid) const { - return sid < dist_.shard_data.size() ? sid : 0; + return sid < shard_data_.size() ? sid : 0; } void ScheduleInternal(bool single_hop); @@ -182,9 +179,13 @@ class Transaction { void WaitForShardCallbacks() { run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); }); + + // store operations below can not be ordered above the fence + std::atomic_thread_fence(std::memory_order_release); + seqlock_.fetch_add(1, std::memory_order_relaxed); } - // Returns the previous value of arm count. + // Returns the previous value of run count. uint32_t DecreaseRunCnt(); uint32_t use_count() const { @@ -208,59 +209,54 @@ class Transaction { }; enum { kPerShardSize = sizeof(PerShardData) }; - struct Dist { - // shard_data spans all the shards in ess_. - // I wish we could use a dense array of size [0..uniq_shards] but since - // multiple threads access this array to synchronize between themselves using - // PerShardData.state, it can be tricky. The complication comes from multi_ transactions where - // scheduled transaction is accessed between operations as well. - absl::InlinedVector shard_data; // length = shard_count - - // Reverse argument mapping. Allows to reconstruct responses according to the original order of - // keys. - std::vector reverse_index; - - // NOTE: to move to bitmask if it grows. - // Written by coordinator thread, read by shard threads but not concurrently. - // Says whether the current callback function is concluding for this operation. - bool is_concluding_cb{true}; - - // out_of_order true - transaction can execute before other scheduled transactions, - // not necessary according to its queue order. - std::atomic_bool out_of_order{false}; + struct LockCnt { + unsigned cnt[2] = {0, 0}; }; - enum { kDistSize = sizeof(Dist) }; + util::fibers_ext::EventCount blocking_ec_; // used to wake blocking transactions. + util::fibers_ext::EventCount run_ec_; + + // shard_data spans all the shards in ess_. + // I wish we could use a dense array of size [0..uniq_shards] but since + // multiple threads access this array to synchronize between themselves using + // PerShardData.state, it can be tricky. The complication comes from multi_ transactions where + // scheduled transaction is accessed between operations as well. + absl::InlinedVector shard_data_; // length = shard_count + + //! Stores arguments of the transaction (i.e. keys + values) partitioned by shards. + absl::InlinedVector args_; + + // Reverse argument mapping. Allows to reconstruct responses according to the original order of + // keys. + std::vector reverse_index_; + + RunnableType cb_; const CommandId* cid_; EngineShardSet* ess_; TxId txid_{0}; - std::atomic_uint32_t use_count_{0}, run_count_{0}; + std::atomic_uint32_t use_count_{0}, run_count_{0}, seqlock_{0}; // unique_shard_cnt_ and unique_shard_id_ is accessed only by coordinator thread. uint32_t unique_shard_cnt_{0}; // number of unique shards span by args_ - ShardId unique_shard_id_{kInvalidSid}; + + uint32_t trans_options_ = 0; // Written by coordination thread but may be read by Shard threads. // A mask of State values. Mostly used for debugging and for invariant checks. std::atomic state_mask_{0}; + ShardId unique_shard_id_{kInvalidSid}; DbIndex db_index_ = 0; // For single-hop transactions with unique_shards_ == 1, hence no data-race. OpStatus local_result_ = OpStatus::OK; - uint32_t trans_options_ = 0; - uint32_t num_keys_ = 0; - Dist dist_; - - util::fibers_ext::EventCount run_ec_; - - //! Stores arguments of the transaction (i.e. keys + values) ordered by shards. - absl::InlinedVector args_; - - RunnableType cb_; + // NOTE: to move to bitmask if it grows. + // Written by coordinator thread, read by shard threads but not concurrently. + // Says whether the current callback function is concluding for this operation. + bool is_concluding_cb_{true}; struct PerShardCache { std::vector args;