From 03e99a5d96e7e2914eb34c6c245a5e6cc4795035 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Mon, 20 Feb 2023 09:43:31 +0300 Subject: [PATCH] EVAL multi modes + non atomic modes (#818) - Implement multi modes for eval - Implement non atomic mode - Enhance tests --- src/server/conn_context.cc | 4 - src/server/conn_context.h | 2 - src/server/main_service.cc | 160 +++++++++++++++++++++++++++---------- src/server/main_service.h | 4 + src/server/multi_test.cc | 103 ++++++++++++++++++++++-- src/server/transaction.cc | 73 ++++++++++++----- src/server/transaction.h | 21 ++++- 7 files changed, 287 insertions(+), 80 deletions(-) diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index 6d343f203..6d54f80e1 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -34,10 +34,6 @@ StoredCmd::StoredCmd(const CommandId* d, CmdArgList args) : descr(d) { arg_list_ = {arg_vec_.data(), arg_vec_.size()}; } -void StoredCmd::Invoke(ConnectionContext* ctx) { - descr->Invoke(arg_list_, ctx); -} - void ConnectionContext::ChangeMonitor(bool start) { // This will either remove or register a new connection // at the "top level" thread --> ServerState context diff --git a/src/server/conn_context.h b/src/server/conn_context.h index 646dc3519..6858b0a72 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -29,8 +29,6 @@ struct StoredCmd { CmdArgList ArgList() const { return arg_list_; } - - void Invoke(ConnectionContext* ctx); }; struct ConnectionState { diff --git a/src/server/main_service.cc b/src/server/main_service.cc index d495f0993..9d50c8436 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -45,7 +45,10 @@ ABSL_FLAG(uint32_t, memcache_port, 0, "Memcached port"); ABSL_FLAG(int, multi_exec_mode, 1, "Set multi exec atomicity mode: 1 for global, 2 for locking ahead, 3 for locking " - "incrementally"); + "incrementally, 4 for non atomic"); +ABSL_FLAG(int, multi_eval_mode, 2, + "Set EVAL atomicity mode: 1 for global, 2 for locking ahead, 3 for locking " + "incrementally, 4 for non atomic"); namespace dfly { @@ -534,6 +537,34 @@ static void MultiSetError(ConnectionContext* cntx) { } } +// Return OK if all keys are allowed to be accessed: either declared in EVAL or +// transaction is running in global or non-atomic mode. +OpStatus CheckKeysDeclared(const ConnectionState::ScriptInfo& eval_info, const CommandId* cid, + CmdArgList args, Transaction* trans) { + Transaction::MultiMode multi_mode = trans->GetMultiMode(); + + // We either scheduled on all shards or re-schedule for each operation, + // so we are not restricted to any keys. + if (multi_mode == Transaction::GLOBAL || multi_mode == Transaction::NON_ATOMIC) + return OpStatus::OK; + + OpResult key_index_res = DetermineKeys(cid, args); + if (!key_index_res) + return key_index_res.status(); + + const auto& key_index = *key_index_res; + for (unsigned i = key_index.start; i < key_index.end; ++i) { + if (!eval_info.keys.contains(ArgS(args, i))) { + return OpStatus::KEY_NOTFOUND; + } + } + + if (unsigned i = key_index.bonus; i && !eval_info.keys.contains(ArgS(args, i))) + return OpStatus::KEY_NOTFOUND; + + return OpStatus::OK; +} + void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) { CHECK(!args.empty()); DCHECK_NE(0u, shard_set->size()) << "Init was not called"; @@ -656,24 +687,20 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) if (under_script) { DCHECK(dfly_cntx->transaction); if (IsTransactional(cid)) { - OpResult key_index_res = DetermineKeys(cid, args); - if (!key_index_res) - return (*cntx)->SendError(key_index_res.status()); + OpStatus status = + CheckKeysDeclared(*dfly_cntx->conn_state.script_info, cid, args, dfly_cntx->transaction); - const auto& key_index = *key_index_res; - for (unsigned i = key_index.start; i < key_index.end; ++i) { - string_view key = ArgS(args, i); - if (!dfly_cntx->conn_state.script_info->keys.contains(key)) { - return (*cntx)->SendError("script tried accessing undeclared key"); - } - } + if (status == OpStatus::KEY_NOTFOUND) + return (*cntx)->SendError("script tried accessing undeclared key"); + + if (status != OpStatus::OK) + return (*cntx)->SendError(status); dfly_cntx->transaction->MultiSwitchCmd(cid); - OpStatus st = dfly_cntx->transaction->InitByArgs(dfly_cntx->conn_state.db_index, args); + status = dfly_cntx->transaction->InitByArgs(dfly_cntx->conn_state.db_index, args); - if (st != OpStatus::OK) { - return (*cntx)->SendError(st); - } + if (status != OpStatus::OK) + return (*cntx)->SendError(status); } } else { DCHECK(dfly_cntx->transaction == nullptr); @@ -696,29 +723,45 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) dfly_cntx->cid = cid; - try { - cid->Invoke(args, dfly_cntx); - } catch (std::exception& e) { - LOG(ERROR) << "Internal error, system probably unstable " << e.what(); + // Collect stats for all regular transactions and all multi transactions from scripts, except EVAL + // itself. EXEC does not use DispatchCommand for dispatching. + bool collect_stats = + dfly_cntx->transaction && (!dfly_cntx->transaction->IsMulti() || under_script); + if (!InvokeCmd(args, cid, dfly_cntx, collect_stats)) { dfly_cntx->reply_builder()->SendError("Internal Error"); dfly_cntx->reply_builder()->CloseConnection(); } end_usec = ProactorBase::GetMonotonicTimeNs(); - request_latency_usec.IncBy(cmd_str, (end_usec - start_usec) / 1000); - if (dist_trans) { - bool is_ooo = dist_trans->IsOOO(); - dfly_cntx->last_command_debug.clock = dist_trans->txid(); - dfly_cntx->last_command_debug.is_ooo = is_ooo; - etl.stats.ooo_tx_cnt += is_ooo; - } if (!under_script) { dfly_cntx->transaction = nullptr; } } +bool Service::InvokeCmd(CmdArgList args, const CommandId* cid, ConnectionContext* cntx, + bool record_stats) { + try { + cid->Invoke(args, cntx); + } catch (std::exception& e) { + LOG(ERROR) << "Internal error, system probably unstable " << e.what(); + return false; + } + + if (record_stats) { + DCHECK(cntx->transaction); + ServerState& etl = *ServerState::tlocal(); + bool is_ooo = cntx->transaction->IsOOO(); + + cntx->last_command_debug.clock = cntx->transaction->txid(); + cntx->last_command_debug.is_ooo = is_ooo; + etl.stats.ooo_tx_cnt += is_ooo; + } + + return true; +} + void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view value, facade::ConnectionContext* cntx) { absl::InlinedVector args; @@ -1007,6 +1050,13 @@ void Service::EvalSha(CmdArgList args, ConnectionContext* cntx) { ss->RecordCallLatency(sha, (end - start) / 1000); } +vector DetermineKeyShards(CmdArgList keys) { + vector out(shard_set->size()); + for (auto k : keys) + out[Shard(facade::ToSV(k), out.size())] = true; + return out; +} + void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter, ConnectionContext* cntx) { DCHECK(!eval_args.sha.empty()); @@ -1042,8 +1092,24 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter, } DCHECK(cntx->transaction); - if (!eval_args.keys.empty()) + bool scheduled = false; + int multi_mode = absl::GetFlag(FLAGS_multi_eval_mode); + DCHECK(multi_mode >= Transaction::GLOBAL && multi_mode <= Transaction::NON_ATOMIC); + + if (multi_mode == Transaction::GLOBAL) { + scheduled = true; + cntx->transaction->StartMultiGlobal(cntx->db_index()); + } else if (multi_mode == Transaction::LOCK_INCREMENTAL && !eval_args.keys.empty()) { + scheduled = true; + vector shards = DetermineKeyShards(eval_args.keys); + cntx->transaction->StartMultiLockedIncr(cntx->db_index(), shards); + } else if (multi_mode == Transaction::LOCK_AHEAD && !eval_args.keys.empty()) { + scheduled = true; cntx->transaction->StartMultiLockedAhead(cntx->db_index(), eval_args.keys); + } else if (multi_mode == Transaction::NON_ATOMIC) { + scheduled = true; + cntx->transaction->StartMultiNonAtomic(); + }; interpreter->SetGlobalArray("KEYS", eval_args.keys); interpreter->SetGlobalArray("ARGV", eval_args.args); @@ -1056,7 +1122,7 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter, cntx->conn_state.script_info.reset(); // reset script_info // Conclude the transaction. - if (!eval_args.keys.empty()) + if (scheduled) cntx->transaction->UnlockMulti(); if (result == Interpreter::RUN_ERR) { @@ -1187,18 +1253,30 @@ bool StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo* return false; int multi_mode = absl::GetFlag(FLAGS_multi_exec_mode); - CHECK(multi_mode >= 1 && multi_mode <= 3); + DCHECK(multi_mode >= Transaction::GLOBAL && multi_mode <= Transaction::NON_ATOMIC); - if (global || multi_mode == Transaction::GLOBAL) { - trans->StartMultiGlobal(dbid); - } else if (multi_mode == Transaction::LOCK_AHEAD) { - *tmp_keys = CollectAllKeys(exec_info); - trans->StartMultiLockedAhead(dbid, CmdArgList{*tmp_keys}); - } else { - vector shards = DetermineKeyShards(exec_info); - DCHECK(std::any_of(shards.begin(), shards.end(), [](bool s) { return s; })); - trans->StartMultiLockedIncr(dbid, shards); - } + // Atomic modes fall back to GLOBAL if they contain global commands. + if (global && + (multi_mode == Transaction::LOCK_AHEAD || multi_mode == Transaction::LOCK_INCREMENTAL)) + multi_mode = Transaction::GLOBAL; + + switch ((Transaction::MultiMode)multi_mode) { + case Transaction::GLOBAL: + trans->StartMultiGlobal(dbid); + break; + case Transaction::LOCK_AHEAD: + *tmp_keys = CollectAllKeys(exec_info); + trans->StartMultiLockedAhead(dbid, CmdArgList{*tmp_keys}); + break; + case Transaction::LOCK_INCREMENTAL: + trans->StartMultiLockedIncr(dbid, DetermineKeyShards(exec_info)); + break; + case Transaction::NON_ATOMIC: + trans->StartMultiNonAtomic(); + break; + case Transaction::NOT_DETERMINED: + DCHECK(false); + }; return true; } @@ -1248,8 +1326,8 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { break; } } - scmd.Invoke(cntx); - if (rb->GetError()) // checks for i/o error, not logical error. + bool ok = InvokeCmd(scmd.ArgList(), scmd.descr, cntx, true); + if (!ok || rb->GetError()) // checks for i/o error, not logical error. break; } } diff --git a/src/server/main_service.h b/src/server/main_service.h index 6fd29bc4c..a9a59c419 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -39,6 +39,10 @@ class Service : public facade::ServiceInterface { void Shutdown(); void DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) final; + + // Returns true if command was executed successfully. + bool InvokeCmd(CmdArgList args, const CommandId* cid, ConnectionContext* cntx, bool record_stats); + void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value, facade::ConnectionContext* cntx) final; diff --git a/src/server/multi_test.cc b/src/server/multi_test.cc index 593fc8b7a..700913805 100644 --- a/src/server/multi_test.cc +++ b/src/server/multi_test.cc @@ -15,6 +15,7 @@ #include "server/transaction.h" ABSL_DECLARE_FLAG(int, multi_exec_mode); +ABSL_DECLARE_FLAG(int, multi_eval_mode); namespace dfly { @@ -191,6 +192,10 @@ TEST_F(MultiTest, MultiSeq) { } TEST_F(MultiTest, MultiConsistent) { + int multi_mode = absl::GetFlag(FLAGS_multi_exec_mode); + if (multi_mode == Transaction::NON_ATOMIC) + return; + auto mset_fb = pp_->at(0)->LaunchFiber([&] { for (size_t i = 1; i < 10; ++i) { string base = StrCat(i * 900); @@ -327,6 +332,10 @@ TEST_F(MultiTest, FlushDb) { } TEST_F(MultiTest, Eval) { + int multi_mode = absl::GetFlag(FLAGS_multi_eval_mode); + if (multi_mode == Transaction::GLOBAL || multi_mode == Transaction::NON_ATOMIC) + absl::SetFlag(&FLAGS_multi_eval_mode, Transaction::LOCK_AHEAD); + RespExpr resp; resp = Run({"incrby", "foo", "42"}); @@ -363,6 +372,8 @@ TEST_F(MultiTest, Eval) { resp = Run({"hvals", "hmap"}); EXPECT_EQ(resp, "2222"); + + absl::SetFlag(&FLAGS_multi_eval_mode, multi_mode); } TEST_F(MultiTest, Watch) { @@ -476,7 +487,8 @@ TEST_F(MultiTest, MultiOOO) { auto metrics = GetMetrics(); // OOO works in LOCK_AHEAD mode. - if (absl::GetFlag(FLAGS_multi_exec_mode) == Transaction::LOCK_AHEAD) + int mode = absl::GetFlag(FLAGS_multi_exec_mode); + if (mode == Transaction::LOCK_AHEAD || mode == Transaction::NON_ATOMIC) EXPECT_EQ(200, metrics.ooo_tx_transaction_cnt); else EXPECT_EQ(0, metrics.ooo_tx_transaction_cnt); @@ -490,9 +502,6 @@ TEST_F(MultiTest, EvalOOO) { { auto resp = Run({"eval", kScript, "3", kKey1, kKey2, kKey3}); ASSERT_EQ(resp, "OK"); - - auto metrics = GetMetrics(); - EXPECT_EQ(1, metrics.ooo_tx_transaction_cnt); } const int kTimes = 10; @@ -508,10 +517,14 @@ TEST_F(MultiTest, EvalOOO) { f1.Join(); f2.Join(); - - auto metrics = GetMetrics(); - EXPECT_EQ(1 + 2 * kTimes, metrics.ooo_tx_transaction_cnt); } + + auto metrics = GetMetrics(); + int mode = absl::GetFlag(FLAGS_multi_eval_mode); + if (mode == Transaction::LOCK_AHEAD || mode == Transaction::NON_ATOMIC) + EXPECT_EQ(1 + 2 * kTimes, metrics.ooo_tx_transaction_cnt); + else + EXPECT_EQ(0, metrics.ooo_tx_transaction_cnt); } // Run MULTI/EXEC commands in parallel, where each command is: @@ -545,7 +558,7 @@ TEST_F(MultiTest, MultiContendedPermutatedKeys) { } TEST_F(MultiTest, MultiCauseUnblocking) { - const int kRounds = 5; + const int kRounds = 10; vector keys = {kKeySid0, kKeySid1, kKeySid2}; auto push = [this, keys, kRounds]() mutable { @@ -573,4 +586,78 @@ TEST_F(MultiTest, MultiCauseUnblocking) { f2.Join(); } +TEST_F(MultiTest, EvalUndeclared) { + int start_mode = absl::GetFlag(FLAGS_multi_eval_mode); + int allowed_modes[] = {Transaction::GLOBAL, Transaction::NON_ATOMIC}; + + Run({"set", "undeclared-k", "works"}); + const char* kScript = "return redis.call('GET', 'undeclared-k')"; + + for (int multi_mode : allowed_modes) { + absl::SetFlag(&FLAGS_multi_eval_mode, multi_mode); + auto res = Run({"eval", kScript, "0"}); + EXPECT_EQ(res, "works"); + } + + absl::SetFlag(&FLAGS_multi_eval_mode, start_mode); +} + +TEST_F(MultiTest, GlobalFallback) { + // Check global command MOVE falls back to global mode from lock ahead. + absl::SetFlag(&FLAGS_multi_exec_mode, Transaction::LOCK_AHEAD); + Run({"multi"}); + Run({"set", "a", "1"}); // won't run ooo, because it became part of global + Run({"move", "a", "1"}); + Run({"exec"}); + EXPECT_EQ(0, GetMetrics().ooo_tx_transaction_cnt); + + // Check non atomic mode does not fall back to global. + absl::SetFlag(&FLAGS_multi_exec_mode, Transaction::NON_ATOMIC); + Run({"multi"}); + Run({"set", "a", "1"}); // will run ooo + Run({"move", "a", "1"}); + Run({"exec"}); + EXPECT_EQ(1, GetMetrics().ooo_tx_transaction_cnt); +} + +// Run multi-exec transactions that move values from a source list +// to destination list through two contended channels. +TEST_F(MultiTest, ContendedList) { + if (absl::GetFlag(FLAGS_multi_exec_mode) == Transaction::NON_ATOMIC) + return; + + const int listSize = 50; + const int stepSize = 5; + + auto run = [this, listSize, stepSize](string_view src, string_view dest) { + for (int i = 0; i < listSize / stepSize; i++) { + Run({"multi"}); + Run({"sort", src}); + for (int j = 0; j < stepSize; j++) + Run({"lmove", src, j % 2 ? "chan-1" : "chan-2", "RIGHT", "RIGHT"}); + for (int j = 0; j < stepSize; j++) + Run({"lmove", j % 2 ? "chan-1" : "chan-2", dest, "LEFT", "RIGHT"}); + Run({"exec"}); + } + }; + + for (int i = 0; i < listSize; i++) { + Run({"lpush", "l1", "a"}); + Run({"lpush", "l2", "b"}); + } + + auto f1 = pp_->at(1)->LaunchFiber([run]() mutable { run("l1", "l1-out"); }); + auto f2 = pp_->at(2)->LaunchFiber([run]() mutable { run("l2", "l2-out"); }); + + f1.Join(); + f2.Join(); + + for (int i = 0; i < listSize; i++) { + EXPECT_EQ(Run({"lpop", "l1"}), "a"); + EXPECT_EQ(Run({"lpop", "l2"}), "b"); + } + EXPECT_EQ(Run({"llen", "chan-1"}), "0"); + EXPECT_EQ(Run({"llen", "chan-2"}), "0"); +} + } // namespace dfly diff --git a/src/server/transaction.cc b/src/server/transaction.cc index cac4da667..7702477c2 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -64,6 +64,8 @@ void Transaction::InitBase(DbIndex dbid, CmdArgList args) { } void Transaction::InitGlobal() { + DCHECK(!multi_ || (multi_->mode == GLOBAL || multi_->mode == NON_ATOMIC)); + global_ = true; unique_shard_cnt_ = shard_set->size(); shard_data_.resize(unique_shard_cnt_); @@ -149,6 +151,9 @@ void Transaction::InitMultiData(KeyIndex key_index) { DCHECK(multi_); auto args = cmd_with_full_args_; + if (multi_->mode == NON_ATOMIC) + return; + // TODO: determine correct locking mode for transactions, scripts and regular commands. IntentLock::Mode mode = Mode(); multi_->keys.clear(); @@ -156,11 +161,12 @@ void Transaction::InitMultiData(KeyIndex key_index) { auto& tmp_uniques = tmp_space.uniq_keys; tmp_uniques.clear(); - auto lock_key = [this, mode, &tmp_uniques](auto key) { + auto lock_key = [this, mode, &tmp_uniques](string_view key) { if (auto [_, inserted] = tmp_uniques.insert(key); !inserted) return; + if (multi_->IsIncrLocks()) { - multi_->keys.push_back(key); + multi_->keys.emplace_back(key); } else { multi_->lock_counts[key][mode]++; } @@ -176,6 +182,8 @@ void Transaction::InitMultiData(KeyIndex key_index) { } multi_->locks_recorded = true; + DCHECK(IsAtomicMulti()); + DCHECK(multi_->mode == GLOBAL || !multi_->keys.empty() || !multi_->lock_counts.empty()); } void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) { @@ -227,7 +235,7 @@ void Transaction::InitByKeys(KeyIndex key_index) { DCHECK_LT(key_index.start, args.size()); bool needs_reverse_mapping = cid_->opt_mask() & CO::REVERSE_MAPPING; - bool single_key = !multi_ && key_index.HasSingleKey(); + bool single_key = key_index.HasSingleKey() && !IsAtomicMulti(); if (single_key) { DCHECK_GT(key_index.step, 0u); @@ -265,12 +273,13 @@ void Transaction::InitByKeys(KeyIndex key_index) { // Compress shard data, if we occupy only one shard. if (unique_shard_cnt_ == 1) { PerShardData* sd; - if (multi_) { + if (IsAtomicMulti()) { sd = &shard_data_[unique_shard_id_]; } else { shard_data_.resize(1); sd = &shard_data_.front(); } + sd->local_mask |= ACTIVE; sd->arg_count = -1; sd->arg_start = -1; } @@ -320,6 +329,7 @@ void Transaction::StartMultiGlobal(DbIndex dbid) { multi_->mode = GLOBAL; InitBase(dbid, {}); InitGlobal(); + multi_->locks_recorded = true; ScheduleInternal(); } @@ -338,6 +348,7 @@ void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgList keys) { void Transaction::StartMultiLockedIncr(DbIndex dbid, const vector& shards) { DCHECK(multi_); DCHECK(shard_data_.empty()); // Make sure default InitByArgs didn't run. + DCHECK(std::any_of(shards.begin(), shards.end(), [](bool s) { return s; })); multi_->mode = LOCK_INCREMENTAL; InitBase(dbid, {}); @@ -352,14 +363,26 @@ void Transaction::StartMultiLockedIncr(DbIndex dbid, const vector& shards) ScheduleInternal(); } +void Transaction::StartMultiNonAtomic() { + DCHECK(multi_); + multi_->mode = NON_ATOMIC; +} + void Transaction::MultiSwitchCmd(const CommandId* cid) { DCHECK(multi_); DCHECK(!cb_); + unique_shard_id_ = 0; unique_shard_cnt_ = 0; args_.clear(); cid_ = cid; cb_ = nullptr; + + if (multi_->mode == NON_ATOMIC) { + shard_data_.resize(0); + txid_ = 0; + coordinator_state_ = 0; + } } string Transaction::DebugId() const { @@ -396,7 +419,7 @@ bool Transaction::RunInShard(EngineShard* shard) { // whether we should unlock the keys. should_release is false for multi and // equal to concluding otherwise. bool is_concluding = (coordinator_state_ & COORD_EXEC_CONCLUDING); - bool should_release = is_concluding && !multi_; + bool should_release = is_concluding && !IsAtomicMulti(); IntentLock::Mode mode = Mode(); // We make sure that we lock exactly once for each (multi-hop) transaction inside @@ -535,14 +558,13 @@ void Transaction::ScheduleInternal() { }; shard_set->RunBriefInParallel(std::move(cb), is_active); - bool ooo_disabled = IsGlobal() || (multi_ && multi_->IsIncrLocks()); + bool ooo_disabled = IsGlobal() || (IsAtomicMulti() && multi_->mode != LOCK_AHEAD); if (success.load(memory_order_acquire) == num_shards) { coordinator_state_ |= COORD_SCHED; // If we granted all locks, we can run out of order. if (!ooo_disabled && lock_granted_cnt.load(memory_order_relaxed) == num_shards) { // Currently we don't support OOO for incremental locking. Sp far they are global. - // DCHECK(!(multi_ && multi_->IsIncrLocks())); coordinator_state_ |= COORD_OOO; } VLOG(2) << "Scheduled " << DebugId() @@ -591,9 +613,8 @@ void Transaction::ScheduleInternal() { void Transaction::MultiData::AddLocks(IntentLock::Mode mode) { DCHECK(IsIncrLocks()); - - for (auto key : keys) { - lock_counts[key][mode]++; + for (auto& key : keys) { + lock_counts[std::move(key)][mode]++; } keys.clear(); } @@ -609,14 +630,14 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { DCHECK(!cb_); cb_ = std::move(cb); - DCHECK(multi_ || (coordinator_state_ & COORD_SCHED) == 0); // Only multi schedule in advance. + DCHECK(IsAtomicMulti() || (coordinator_state_ & COORD_SCHED) == 0); // Multi schedule in advance. coordinator_state_ |= (COORD_EXEC | COORD_EXEC_CONCLUDING); // Single hop means we conclude. bool was_ooo = false; // If we run only on one shard and conclude, we can avoid scheduling at all // and directly dispatch the task to its destination shard. - bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !multi_; + bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !IsAtomicMulti(); if (schedule_fast) { DCHECK_EQ(1u, shard_data_.size()); @@ -648,7 +669,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { } else { // This transaction either spans multiple shards and/or is multi. - if (!multi_) // Multi schedule in advance. + if (!IsAtomicMulti()) // Multi schedule in advance. ScheduleInternal(); if (multi_ && multi_->IsIncrLocks()) @@ -674,10 +695,14 @@ void Transaction::UnlockMulti() { DCHECK(multi_); DCHECK_GE(GetUseCount(), 1u); // Greater-equal because there may be callbacks in progress. + if (multi_->mode == NON_ATOMIC) + return; + auto sharded_keys = make_shared>(shard_set->size()); - for (const auto& [key, cnt] : multi_->lock_counts) { - ShardId sid = Shard(key, sharded_keys->size()); - (*sharded_keys)[sid].emplace_back(key, cnt); + while (!multi_->lock_counts.empty()) { + auto entry = multi_->lock_counts.extract(multi_->lock_counts.begin()); + ShardId sid = Shard(entry.key(), sharded_keys->size()); + (*sharded_keys)[sid].emplace_back(std::move(entry.key()), entry.mapped()); } unsigned shard_journals_cnt = @@ -711,7 +736,7 @@ void Transaction::Schedule() { if (multi_ && multi_->IsIncrLocks()) multi_->AddLocks(Mode()); - if (!multi_) + if (!IsAtomicMulti()) ScheduleInternal(); } @@ -743,6 +768,7 @@ void Transaction::ExecuteAsync() { DCHECK_GT(unique_shard_cnt_, 0u); DCHECK_GT(use_count_.load(memory_order_relaxed), 0u); + DCHECK(!IsAtomicMulti() || multi_->locks_recorded); // We do not necessarily Execute this transaction in 'cb' below. It well may be that it will be // executed by the engine shard once it has been armed and coordinator thread will finish the @@ -796,7 +822,7 @@ void Transaction::ExecuteAsync() { } void Transaction::RunQuickie(EngineShard* shard) { - DCHECK(!multi_); + DCHECK(!IsAtomicMulti()); DCHECK_EQ(1u, shard_data_.size()); DCHECK_EQ(0u, txid_); @@ -866,7 +892,7 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const { // Optimized path that schedules and runs transactions out of order if possible. // Returns true if was eagerly executed, false if it was scheduled into queue. bool Transaction::ScheduleUniqueShard(EngineShard* shard) { - DCHECK(!multi_); + DCHECK(!IsAtomicMulti()); DCHECK_EQ(0u, txid_); DCHECK_EQ(1u, shard_data_.size()); @@ -901,6 +927,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { // This function should not block since it's run via RunBriefInParallel. pair Transaction::ScheduleInShard(EngineShard* shard) { DCHECK(!shard_data_.empty()); + DCHECK(shard_data_[SidToId(shard->shard_id())].local_mask & ACTIVE); // schedule_success, lock_granted. pair result{false, false}; @@ -1257,10 +1284,12 @@ void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload& bool allow_await) const { auto journal = shard->journal(); CHECK(journal); - if (multi_) { + if (multi_) multi_->shard_journal_write[shard->shard_id()] = true; - } - auto opcode = (multi_ || multi_commands) ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND; + + bool is_multi = multi_commands || IsAtomicMulti(); + + auto opcode = is_multi ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND; journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, std::move(payload), allow_await); } diff --git a/src/server/transaction.h b/src/server/transaction.h index 7166dc079..feafdf585 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -75,7 +75,7 @@ class Transaction { // The shards to schedule on are detemined ahead and remain fixed. LOCK_INCREMENTAL = 3, // Each command is executed separately. Equivalent to a pipeline. - NOT_ATOMIC = 4, + NON_ATOMIC = 4, }; // State on specific shard. @@ -150,6 +150,9 @@ class Transaction { // Start multi in LOCK_INCREMENTAL mode on given shards. void StartMultiLockedIncr(DbIndex dbid, const std::vector& shards); + // Start multi in NON_ATOMIC mode. + void StartMultiNonAtomic(); + // Unlock key locks of a multi transaction. void UnlockMulti(); @@ -205,6 +208,10 @@ class Transaction { return bool(multi_); } + MultiMode GetMultiMode() const { + return multi_->mode; + } + bool IsGlobal() const; bool IsOOO() const { @@ -279,8 +286,8 @@ class Transaction { MultiMode mode; - absl::flat_hash_map lock_counts; - std::vector keys; + absl::flat_hash_map lock_counts; + std::vector keys; // The shard_journal_write vector variable is used to determine the number of shards // involved in a multi-command transaction. This information is utilized by replicas when @@ -395,6 +402,14 @@ class Transaction { return use_count_.load(std::memory_order_relaxed); } + // Whether the transaction is multi and runs in an atomic mode. + // This, instead of just IsMulti(), should be used to check for the possibility of + // different optimizations, because they can safely be applied to non-atomic multi + // transactions as well. + bool IsAtomicMulti() const { + return multi_ && multi_->mode != NON_ATOMIC; + } + unsigned SidToId(ShardId sid) const { return sid < shard_data_.size() ? sid : 0; }