From 4ef06e759ae787f2cc35be9c69393310e30acb50 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Sat, 18 Feb 2023 20:18:28 +0300 Subject: [PATCH] Basic multi modes for MULTI/EXEC (#796) feat(server): Basic multi transaction modes This commit adds the notion of multi transaction modes that allow controlling the execution and locking behaviour of multi transactions. In general, there are four modes: - GLOBAL: all commands run within a global transaction. There is no need for recording locks. Lua scripts can theoretically run with undeclared keys. - LOCK_AHEAD: the transaction locks all keys ahead likewise to a regular transaction and schedules itself. - LOCK_INCREMENTAL: the transaction determines what shards it has keys in and schedules itself on those shards, but locks only when accessing a new key. This allows other transactions to run ooo alonside with a big multi-transaction that accesses a contended key only at its very end. - NON_ATOMIC: all commands run separately, no atomicity is provided, likewise to a pipeline This commit only adds support for the first 3 modes to EXEC commands. Signed-off-by: Vladislav Oleshko --- .github/workflows/ci.yml | 2 + src/server/common.h | 15 +++- src/server/db_slice.cc | 3 - src/server/main_service.cc | 108 ++++++++++++++++++++++---- src/server/multi_test.cc | 43 +++++++++-- src/server/transaction.cc | 150 ++++++++++++++++++++++++------------- src/server/transaction.h | 59 +++++++++++---- 7 files changed, 292 insertions(+), 88 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index b167b9d62..bc16db052 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -79,6 +79,8 @@ jobs: #GLOG_logtostderr=1 GLOG_vmodule=transaction=1,engine_shard_set=1 GLOG_logtostderr=1 GLOG_vmodule=rdb_load=1,rdb_save=1,snapshot=1 ctest -V -L DFLY ./dragonfly_test --gtest_repeat=10 + ./multi_test --multi_exec_mode=2 --gtest_repeat=10 + ./multi_test --multi_exec_mode=3 --gtest_repeat=10 # GLOG_logtostderr=1 GLOG_vmodule=transaction=1,engine_shard_set=1 CTEST_OUTPUT_ON_FAILURE=1 ninja server/test lint-test-chart: runs-on: ubuntu-latest diff --git a/src/server/common.h b/src/server/common.h index 90d4072d3..21884c148 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -58,13 +58,22 @@ struct KeyLockArgs { // Describes key indices. struct KeyIndex { - // if index is non-zero then adds another key index (usually 1). - // relevant for for commands like ZUNIONSTORE/ZINTERSTORE for destination key. - unsigned bonus = 0; unsigned start; unsigned end; // does not include this index (open limit). unsigned step; // 1 for commands like mget. 2 for commands like mset. + // if index is non-zero then adds another key index (usually 1). + // relevant for for commands like ZUNIONSTORE/ZINTERSTORE for destination key. + unsigned bonus = 0; + + static KeyIndex Empty() { + return KeyIndex{0, 0, 0, 0}; + } + + static KeyIndex Range(unsigned start, unsigned end, unsigned step = 1) { + return KeyIndex{start, end, step, 0}; + } + bool HasSingleKey() const { return bonus == 0 && (start + step >= end); } diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 95a313769..c6ba98d86 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -674,7 +674,6 @@ size_t DbSlice::DbSize(DbIndex db_ind) const { } bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) { - DCHECK(!lock_args.args.empty()); DCHECK_GT(lock_args.key_step, 0u); auto& lt = db_arr_[lock_args.db_index]->trans_locks; @@ -701,8 +700,6 @@ bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) { } void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) { - DCHECK(!lock_args.args.empty()); - DVLOG(2) << "Release " << IntentLock::ModeName(mode) << " for " << lock_args.args[0]; if (lock_args.args.size() == 1) { Release(mode, lock_args.db_index, lock_args.args.front(), 1); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index c624d2eac..d495f0993 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -43,6 +43,10 @@ using namespace std; ABSL_FLAG(uint32_t, port, 6379, "Redis port"); ABSL_FLAG(uint32_t, memcache_port, 0, "Memcached port"); +ABSL_FLAG(int, multi_exec_mode, 1, + "Set multi exec atomicity mode: 1 for global, 2 for locking ahead, 3 for locking " + "incrementally"); + namespace dfly { #if __GLIBC__ == 2 && __GLIBC_MINOR__ < 30 @@ -399,7 +403,7 @@ bool IsTransactional(const CommandId* cid) { string_view name{cid->name()}; - if (name == "EVAL" || name == "EVALSHA") + if (name == "EVAL" || name == "EVALSHA" || name == "EXEC") return true; return false; @@ -664,7 +668,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) } } - dfly_cntx->transaction->SetExecCmd(cid); + dfly_cntx->transaction->MultiSwitchCmd(cid); OpStatus st = dfly_cntx->transaction->InitByArgs(dfly_cntx->conn_state.db_index, args); if (st != OpStatus::OK) { @@ -676,9 +680,12 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) if (IsTransactional(cid)) { dist_trans.reset(new Transaction{cid}); - OpStatus st = dist_trans->InitByArgs(dfly_cntx->conn_state.db_index, args); - if (st != OpStatus::OK) - return (*cntx)->SendError(st); + + if (!dist_trans->IsMulti()) { // Multi command initialize themself based on their mode. + if (auto st = dist_trans->InitByArgs(dfly_cntx->conn_state.db_index, args); + st != OpStatus::OK) + return (*cntx)->SendError(st); + } dfly_cntx->transaction = dist_trans.get(); dfly_cntx->last_command_debug.shards_count = dfly_cntx->transaction->GetUniqueShardCnt(); @@ -1036,7 +1043,7 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter, DCHECK(cntx->transaction); if (!eval_args.keys.empty()) - cntx->transaction->Schedule(); + cntx->transaction->StartMultiLockedAhead(cntx->db_index(), eval_args.keys); interpreter->SetGlobalArray("KEYS", eval_args.keys); interpreter->SetGlobalArray("ARGV", eval_args.args); @@ -1099,9 +1106,7 @@ bool CheckWatchedKeyExpiry(ConnectionContext* cntx, const CommandRegistry& regis return OpStatus::OK; }; - VLOG(1) << "Checking expired watch keys"; - - cntx->transaction->SetExecCmd(registry.Find(EXISTS)); + cntx->transaction->MultiSwitchCmd(registry.Find(EXISTS)); cntx->transaction->InitByArgs(cntx->conn_state.db_index, CmdArgList{str_list.data(), str_list.size()}); OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -1123,6 +1128,80 @@ bool IsWatchingOtherDbs(DbIndex db_indx, const ConnectionState::ExecInfo& exec_i return false; } +template void IterateAllKeys(ConnectionState::ExecInfo* exec_info, F&& f) { + for (auto& [dbid, key] : exec_info->watched_keys) + f(MutableSlice{key.data(), key.size()}); + + for (const auto& scmd : exec_info->body) { + if (!IsTransactional(scmd.descr)) + continue; + + auto args = scmd.ArgList(); + + auto key_res = DetermineKeys(scmd.descr, args); + if (!key_res.ok()) + continue; + + auto key_index = key_res.value(); + + for (unsigned i = key_index.start; i < key_index.end; i += key_index.step) + f(args[i]); + + if (key_index.bonus) + f(args[key_index.bonus]); + } +} + +CmdArgVec CollectAllKeys(ConnectionState::ExecInfo* exec_info) { + CmdArgVec out; + out.reserve(exec_info->watched_keys.size() + exec_info->body.size()); + + IterateAllKeys(exec_info, [&out](MutableSlice key) { out.push_back(key); }); + + return out; +} + +vector DetermineKeyShards(ConnectionState::ExecInfo* exec_info) { + vector out(shard_set->size()); + + IterateAllKeys(exec_info, [&out](MutableSlice key) { + ShardId sid = Shard(facade::ToSV(key), shard_set->size()); + out[sid] = true; + }); + return out; +} + +// Return true if transaction was scheduled, false if scheduling was not required. +bool StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo* exec_info, + CmdArgVec* tmp_keys) { + bool global = false; + bool transactional = false; + for (const auto& scmd : exec_info->body) { + transactional |= IsTransactional(scmd.descr); + global |= scmd.descr->opt_mask() & CO::GLOBAL_TRANS; + if (global) + break; + } + + if (!transactional && exec_info->watched_keys.empty()) + return false; + + int multi_mode = absl::GetFlag(FLAGS_multi_exec_mode); + CHECK(multi_mode >= 1 && multi_mode <= 3); + + if (global || multi_mode == Transaction::GLOBAL) { + trans->StartMultiGlobal(dbid); + } else if (multi_mode == Transaction::LOCK_AHEAD) { + *tmp_keys = CollectAllKeys(exec_info); + trans->StartMultiLockedAhead(dbid, CmdArgList{*tmp_keys}); + } else { + vector shards = DetermineKeyShards(exec_info); + DCHECK(std::any_of(shards.begin(), shards.end(), [](bool s) { return s; })); + trans->StartMultiLockedIncr(dbid, shards); + } + return true; +} + void Service::Exec(CmdArgList args, ConnectionContext* cntx) { RedisReplyBuilder* rb = (*cntx).operator->(); @@ -1145,6 +1224,9 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { return rb->SendNull(); } + CmdArgVec tmp_keys; + bool scheduled = StartMultiExec(cntx->db_index(), cntx->transaction, &exec_info, &tmp_keys); + // EXEC should not run if any of the watched keys expired. if (!exec_info.watched_keys.empty() && !CheckWatchedKeyExpiry(cntx, registry_)) { cntx->transaction->UnlockMulti(); @@ -1158,7 +1240,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { CmdArgVec str_list; for (auto& scmd : exec_info.body) { - cntx->transaction->SetExecCmd(scmd.descr); + cntx->transaction->MultiSwitchCmd(scmd.descr); if (IsTransactional(scmd.descr)) { OpStatus st = cntx->transaction->InitByArgs(cntx->conn_state.db_index, scmd.ArgList()); if (st != OpStatus::OK) { @@ -1170,7 +1252,9 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { if (rb->GetError()) // checks for i/o error, not logical error. break; } + } + if (scheduled) { VLOG(1) << "Exec unlocking " << exec_info.body.size() << " commands"; cntx->transaction->UnlockMulti(); } @@ -1433,8 +1517,6 @@ using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx); void Service::RegisterCommands() { using CI = CommandId; - constexpr auto kExecMask = CO::LOADING | CO::NOSCRIPT | CO::GLOBAL_TRANS; - registry_ << CI{"QUIT", CO::READONLY | CO::FAST, 1, 0, 0, 0}.HFUNC(Quit) << CI{"MULTI", CO::NOSCRIPT | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(Multi) @@ -1445,7 +1527,7 @@ void Service::RegisterCommands() { &EvalValidator) << CI{"EVALSHA", CO::NOSCRIPT | CO::VARIADIC_KEYS, -3, 3, 3, 1}.MFUNC(EvalSha).SetValidator( &EvalValidator) - << CI{"EXEC", kExecMask, 1, 0, 0, 0}.MFUNC(Exec) + << CI{"EXEC", CO::LOADING | CO::NOSCRIPT, 1, 0, 0, 1}.MFUNC(Exec) << CI{"PUBLISH", CO::LOADING | CO::FAST, 3, 0, 0, 0}.MFUNC(Publish) << CI{"SUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(Subscribe) << CI{"UNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(Unsubscribe) diff --git a/src/server/multi_test.cc b/src/server/multi_test.cc index 98af034f0..593fc8b7a 100644 --- a/src/server/multi_test.cc +++ b/src/server/multi_test.cc @@ -12,6 +12,9 @@ #include "server/conn_context.h" #include "server/main_service.h" #include "server/test_utils.h" +#include "server/transaction.h" + +ABSL_DECLARE_FLAG(int, multi_exec_mode); namespace dfly { @@ -352,7 +355,7 @@ TEST_F(MultiTest, Eval) { // a,b important here to spawn multiple shards. resp = Run({"eval", "return redis.call('exists', KEYS[2])", "2", "a", "b"}); - EXPECT_EQ(2, GetDebugInfo().shards_count); + // EXPECT_EQ(2, GetDebugInfo().shards_count); EXPECT_THAT(resp, IntArg(0)); resp = Run({"eval", "return redis.call('hmset', KEYS[1], 'f1', '2222')", "1", "hmap"}); @@ -472,10 +475,11 @@ TEST_F(MultiTest, MultiOOO) { fb0.Join(); auto metrics = GetMetrics(); - // TODO: This is a performance bug that causes substantial latency penatly when - // running multi-transactions or lua scripts. - // We should be able to allow OOO multi-transactions. - EXPECT_EQ(0, metrics.ooo_tx_transaction_cnt); + // OOO works in LOCK_AHEAD mode. + if (absl::GetFlag(FLAGS_multi_exec_mode) == Transaction::LOCK_AHEAD) + EXPECT_EQ(200, metrics.ooo_tx_transaction_cnt); + else + EXPECT_EQ(0, metrics.ooo_tx_transaction_cnt); } // Lua scripts lock their keys ahead and thus can run out of order. @@ -540,4 +544,33 @@ TEST_F(MultiTest, MultiContendedPermutatedKeys) { f2.Join(); } +TEST_F(MultiTest, MultiCauseUnblocking) { + const int kRounds = 5; + vector keys = {kKeySid0, kKeySid1, kKeySid2}; + + auto push = [this, keys, kRounds]() mutable { + int i = 0; + do { + Run({"multi"}); + for (auto k : keys) + Run({"lpush", k, "v"}); + Run({"exec"}); + } while (next_permutation(keys.begin(), keys.end()) || i++ < kRounds); + }; + + auto pop = [this, keys, kRounds]() mutable { + int i = 0; + do { + for (int j = keys.size() - 1; j >= 0; j--) + ASSERT_THAT(Run({"blpop", keys[j], "0"}), ArrLen(2)); + } while (next_permutation(keys.begin(), keys.end()) || i++ < kRounds); + }; + + auto f1 = pp_->at(1)->LaunchFiber([push]() mutable { push(); }); + auto f2 = pp_->at(2)->LaunchFiber([pop]() mutable { pop(); }); + + f1.Join(); + f2.Join(); +} + } // namespace dfly diff --git a/src/server/transaction.cc b/src/server/transaction.cc index dc4ee0f3d..cac4da667 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -45,12 +45,9 @@ Transaction::Transaction(const CommandId* cid) : cid_{cid} { string_view cmd_name(cid_->name()); if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") { multi_.reset(new MultiData); - multi_->multi_opts = cid->opt_mask(); multi_->shard_journal_write.resize(shard_set->size(), false); - if (cmd_name == "EVAL" || cmd_name == "EVALSHA") { - multi_->is_expanding = false; // we lock all the keys at once. - } + multi_->mode = NOT_DETERMINED; } } @@ -70,6 +67,8 @@ void Transaction::InitGlobal() { global_ = true; unique_shard_cnt_ = shard_set->size(); shard_data_.resize(unique_shard_cnt_); + for (auto& sd : shard_data_) + sd.local_mask = ACTIVE; } void Transaction::BuildShardIndex(KeyIndex key_index, bool rev_mapping, @@ -77,9 +76,6 @@ void Transaction::BuildShardIndex(KeyIndex key_index, bool rev_mapping, auto args = cmd_with_full_args_; auto& shard_index = *out; - shard_index.resize(shard_data_.size()); - for (auto& v : shard_index) - v.Clear(); auto add = [this, rev_mapping, &shard_index](uint32_t sid, uint32_t i) { string_view val = ArgS(cmd_with_full_args_, i); @@ -107,8 +103,6 @@ void Transaction::BuildShardIndex(KeyIndex key_index, bool rev_mapping, void Transaction::InitShardData(absl::Span shard_index, size_t num_args, bool rev_mapping) { - bool incremental_locking = multi_ && multi_->is_expanding; - args_.reserve(num_args); if (rev_mapping) reverse_index_.reserve(args_.size()); @@ -124,13 +118,20 @@ void Transaction::InitShardData(absl::Span shard_index, siz sd.arg_count = si.args.size(); sd.arg_start = args_.size(); - // Reset mask to allow locking multiple times. - if (incremental_locking) - sd.local_mask = 0; + if (multi_) { + // Multi transactions can re-intitialize on different shards, so clear ACTIVE flag. + sd.local_mask &= ~ACTIVE; - if (!sd.arg_count) + // If we increase locks, clear KEYLOCK_ACQUIRED to track new locks. + if (multi_->IsIncrLocks()) + sd.local_mask &= ~KEYLOCK_ACQUIRED; + } + + if (sd.arg_count == 0 && !si.requested_active) continue; + sd.local_mask |= ACTIVE; + unique_shard_cnt_++; unique_shard_id_ = i; @@ -158,7 +159,7 @@ void Transaction::InitMultiData(KeyIndex key_index) { auto lock_key = [this, mode, &tmp_uniques](auto key) { if (auto [_, inserted] = tmp_uniques.insert(key); !inserted) return; - if (multi_->is_expanding) { + if (multi_->IsIncrLocks()) { multi_->keys.push_back(key); } else { multi_->lock_counts[key][mode]++; @@ -167,7 +168,7 @@ void Transaction::InitMultiData(KeyIndex key_index) { // With EVAL, we call this function for EVAL itself as well as for each command // for eval. currently, we lock everything only during the eval call. - if (multi_->is_expanding || !multi_->locks_recorded) { + if (multi_->IsIncrLocks() || !multi_->locks_recorded) { for (size_t i = key_index.start; i < key_index.end; i += key_index.step) lock_key(ArgS(args, i)); if (key_index.bonus > 0) @@ -224,7 +225,6 @@ void Transaction::InitByKeys(KeyIndex key_index) { } DCHECK_LT(key_index.start, args.size()); - DCHECK_GT(key_index.start, 0u); bool needs_reverse_mapping = cid_->opt_mask() & CO::REVERSE_MAPPING; bool single_key = !multi_ && key_index.HasSingleKey(); @@ -236,6 +236,8 @@ void Transaction::InitByKeys(KeyIndex key_index) { StoreKeysInArgs(key_index, needs_reverse_mapping); shard_data_.resize(1); + shard_data_.front().local_mask |= ACTIVE; + unique_shard_cnt_ = 1; unique_shard_id_ = Shard(args_.front(), shard_set->size()); @@ -246,7 +248,8 @@ void Transaction::InitByKeys(KeyIndex key_index) { CHECK(key_index.step == 1 || key_index.step == 2); DCHECK(key_index.step == 1 || (args.size() % 2) == 1); - auto& shard_index = tmp_space.shard_cache; // Safe, because flow below is not preemptive. + // Safe, because flow below is not preemptive. + auto& shard_index = tmp_space.GetShardIndex(shard_data_.size()); // Distribute all the arguments by shards. BuildShardIndex(key_index, needs_reverse_mapping, &shard_index); @@ -310,18 +313,49 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) { return OpStatus::OK; } -void Transaction::SetExecCmd(const CommandId* cid) { +void Transaction::StartMultiGlobal(DbIndex dbid) { + CHECK(multi_); + CHECK(shard_data_.empty()); // Make sure default InitByArgs didn't run. + + multi_->mode = GLOBAL; + InitBase(dbid, {}); + InitGlobal(); + + ScheduleInternal(); +} + +void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgList keys) { + DCHECK(multi_); + DCHECK(shard_data_.empty()); // Make sure default InitByArgs didn't run. + + multi_->mode = LOCK_AHEAD; + InitBase(dbid, keys); + InitByKeys(KeyIndex::Range(0, keys.size())); + + ScheduleInternal(); +} + +void Transaction::StartMultiLockedIncr(DbIndex dbid, const vector& shards) { + DCHECK(multi_); + DCHECK(shard_data_.empty()); // Make sure default InitByArgs didn't run. + + multi_->mode = LOCK_INCREMENTAL; + InitBase(dbid, {}); + + auto& shard_index = tmp_space.GetShardIndex(shard_set->size()); + for (size_t i = 0; i < shards.size(); i++) + shard_index[i].requested_active = shards[i]; + + shard_data_.resize(shard_index.size()); + InitShardData(shard_index, 0, false); + + ScheduleInternal(); +} + +void Transaction::MultiSwitchCmd(const CommandId* cid) { DCHECK(multi_); DCHECK(!cb_); - // The order is important, we call Schedule for multi transaction before overriding cid_. - // TODO: The flow is ugly. Consider introducing a proper interface for Multi transactions - // like SetupMulti/TurndownMulti. We already have UnlockMulti that should be part of - // TurndownMulti. - if (txid_ == 0) { - ScheduleInternal(); - } - unique_shard_cnt_ = 0; args_.clear(); cid_ = cid; @@ -354,7 +388,7 @@ bool Transaction::RunInShard(EngineShard* shard) { bool was_suspended = sd.local_mask & SUSPENDED_Q; bool awaked_prerun = (sd.local_mask & AWAKED_Q) != 0; - bool incremental_lock = multi_ && multi_->is_expanding; + bool incremental_lock = multi_ && multi_->IsIncrLocks(); // For multi we unlock transaction (i.e. its keys) in UnlockMulti() call. // Therefore we differentiate between concluding, which says that this specific @@ -374,7 +408,7 @@ bool Transaction::RunInShard(EngineShard* shard) { shard->db_slice().Acquire(mode, GetLockArgs(idx)); } - DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED)); + DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED) || (multi_ && multi_->mode == GLOBAL)); /*************************************************************************/ // Actually running the callback. @@ -482,7 +516,7 @@ void Transaction::ScheduleInternal() { DCHECK_GT(num_shards, 0u); is_active = [&](uint32_t i) { - return num_shards == 1 ? (i == unique_shard_id_) : shard_data_[i].arg_count > 0; + return num_shards == 1 ? (i == unique_shard_id_) : shard_data_[i].local_mask & ACTIVE; }; } @@ -501,12 +535,14 @@ void Transaction::ScheduleInternal() { }; shard_set->RunBriefInParallel(std::move(cb), is_active); + bool ooo_disabled = IsGlobal() || (multi_ && multi_->IsIncrLocks()); + if (success.load(memory_order_acquire) == num_shards) { coordinator_state_ |= COORD_SCHED; // If we granted all locks, we can run out of order. - if (!span_all && lock_granted_cnt.load(memory_order_relaxed) == num_shards) { + if (!ooo_disabled && lock_granted_cnt.load(memory_order_relaxed) == num_shards) { // Currently we don't support OOO for incremental locking. Sp far they are global. - DCHECK(!(multi_ && multi_->is_expanding)); + // DCHECK(!(multi_ && multi_->IsIncrLocks())); coordinator_state_ |= COORD_OOO; } VLOG(2) << "Scheduled " << DebugId() @@ -554,7 +590,7 @@ void Transaction::ScheduleInternal() { } void Transaction::MultiData::AddLocks(IntentLock::Mode mode) { - DCHECK(is_expanding); + DCHECK(IsIncrLocks()); for (auto key : keys) { lock_counts[key][mode]++; @@ -562,6 +598,10 @@ void Transaction::MultiData::AddLocks(IntentLock::Mode mode) { keys.clear(); } +bool Transaction::MultiData::IsIncrLocks() const { + return mode == LOCK_INCREMENTAL; +} + // Optimized "Schedule and execute" function for the most common use-case of a single hop // transactions like set/mset/mget etc. Does not apply for more complicated cases like RENAME or // BLPOP where a data must be read from multiple shards before performing another hop. @@ -611,7 +651,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { if (!multi_) // Multi schedule in advance. ScheduleInternal(); - if (multi_ && multi_->is_expanding) + if (multi_ && multi_->IsIncrLocks()) multi_->AddLocks(Mode()); ExecuteAsync(); @@ -668,11 +708,11 @@ uint32_t Transaction::CalcMultiNumOfShardJournals() const { } void Transaction::Schedule() { - if (multi_ && multi_->is_expanding) { + if (multi_ && multi_->IsIncrLocks()) multi_->AddLocks(Mode()); - } else { + + if (!multi_) ScheduleInternal(); - } } // Runs in coordinator thread. @@ -943,6 +983,7 @@ bool Transaction::CancelShardCb(EngineShard* shard) { if (sd.local_mask & KEYLOCK_ACQUIRED) { auto mode = Mode(); auto lock_args = GetLockArgs(shard->shard_id()); + DCHECK(lock_args.args.size() > 0 || (multi_ && multi_->mode == LOCK_INCREMENTAL)); shard->db_slice().Release(mode, lock_args); sd.local_mask &= ~KEYLOCK_ACQUIRED; } @@ -956,7 +997,7 @@ bool Transaction::CancelShardCb(EngineShard* shard) { // runs in engine-shard thread. ArgSlice Transaction::GetShardArgs(ShardId sid) const { - DCHECK(!args_.empty()); + DCHECK(!args_.empty() || (multi_ && multi_->IsIncrLocks())); // We can read unique_shard_cnt_ only because ShardArgsInShard is called after IsArmedInShard // barrier. @@ -1061,25 +1102,24 @@ void Transaction::UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard void Transaction::UnlockMultiShardCb(const std::vector& sharded_keys, EngineShard* shard, uint32_t shard_journals_cnt) { auto journal = shard->journal(); - if (journal != nullptr && multi_->shard_journal_write[shard->shard_id()] == true) { journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_journals_cnt, {}, true); } - if (multi_->multi_opts & CO::GLOBAL_TRANS) { + if (multi_->mode == GLOBAL) { shard->shard_lock()->Release(IntentLock::EXCLUSIVE); - } + } else { + ShardId sid = shard->shard_id(); + for (const auto& k_v : sharded_keys[sid]) { + auto release = [&](IntentLock::Mode mode) { + if (k_v.second[mode]) { + shard->db_slice().Release(mode, db_index_, k_v.first, k_v.second[mode]); + } + }; - ShardId sid = shard->shard_id(); - for (const auto& k_v : sharded_keys[sid]) { - auto release = [&](IntentLock::Mode mode) { - if (k_v.second[mode]) { - shard->db_slice().Release(mode, db_index_, k_v.first, k_v.second[mode]); - } - }; - - release(IntentLock::SHARED); - release(IntentLock::EXCLUSIVE); + release(IntentLock::SHARED); + release(IntentLock::EXCLUSIVE); + } } auto& sd = shard_data_[SidToId(shard->shard_id())]; @@ -1241,7 +1281,8 @@ void Transaction::BreakOnShutdown() { } OpResult DetermineKeys(const CommandId* cid, CmdArgList args) { - DCHECK_EQ(0u, cid->opt_mask() & CO::GLOBAL_TRANS); + if (cid->opt_mask() & CO::GLOBAL_TRANS) + return KeyIndex::Empty(); KeyIndex key_index; @@ -1288,4 +1329,11 @@ OpResult DetermineKeys(const CommandId* cid, CmdArgList args) { return key_index; } +std::vector& Transaction::TLTmpSpace::GetShardIndex(unsigned size) { + shard_cache.resize(size); + for (auto& v : shard_cache) + v.Clear(); + return shard_cache; +} + } // namespace dfly diff --git a/src/server/transaction.h b/src/server/transaction.h index 515c3f953..7166dc079 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -63,14 +63,30 @@ class Transaction { // Provides keys to block on for specific shard. using WaitKeysProvider = std::function; + // Modes in which a multi transaction can run. + enum MultiMode { + // Invalid state. + NOT_DETERMINED = 0, + // Global transaction. + GLOBAL = 1, + // Keys are locked ahead during Schedule. + LOCK_AHEAD = 2, + // Keys are locked incrementally during each new command. + // The shards to schedule on are detemined ahead and remain fixed. + LOCK_INCREMENTAL = 3, + // Each command is executed separately. Equivalent to a pipeline. + NOT_ATOMIC = 4, + }; + // State on specific shard. enum LocalMask : uint16_t { - ARMED = 1, // Whether callback cb_ is set - OUT_OF_ORDER = 1 << 1, // Whether its running out of order - KEYLOCK_ACQUIRED = 1 << 2, // Whether its key locks are acquired - SUSPENDED_Q = 1 << 3, // Whether is suspened (by WatchInShard()) - AWAKED_Q = 1 << 4, // Whether it was awakened (by NotifySuspended()) - EXPIRED_Q = 1 << 5, // Whether it timed out and should be dropped + ACTIVE = 1, // Set on all active shards. + ARMED = 1 << 1, // Whether callback cb_ is set + OUT_OF_ORDER = 1 << 2, // Whether its running out of order + KEYLOCK_ACQUIRED = 1 << 3, // Whether its key locks are acquired + SUSPENDED_Q = 1 << 4, // Whether is suspened (by WatchInShard()) + AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended()) + EXPIRED_Q = 1 << 6, // Whether it timed out and should be dropped }; public: @@ -125,11 +141,20 @@ class Transaction { renabled_auto_journal_.store(true, std::memory_order_relaxed); } + // Start multi in GLOBAL mode. + void StartMultiGlobal(DbIndex dbid); + + // Start multi in LOCK_AHEAD mode with given keys. + void StartMultiLockedAhead(DbIndex dbid, CmdArgList keys); + + // Start multi in LOCK_INCREMENTAL mode on given shards. + void StartMultiLockedIncr(DbIndex dbid, const std::vector& shards); + // Unlock key locks of a multi transaction. void UnlockMulti(); - // Reset CommandId to be executed when executing MULTI/EXEC or from script. - void SetExecCmd(const CommandId* cid); + // Set new command for multi transaction. + void MultiSwitchCmd(const CommandId* cid); // Returns locking arguments needed for DbSlice to Acquire/Release transactional locks. // Runs in the shard thread. @@ -249,6 +274,11 @@ class Transaction { // Increase lock counts for all current keys for mode. Clear keys. void AddLocks(IntentLock::Mode mode); + // Whether it locks incrementally. + bool IsIncrLocks() const; + + MultiMode mode; + absl::flat_hash_map lock_counts; std::vector keys; @@ -257,10 +287,7 @@ class Transaction { // executing multi-command. For every write to a shard journal, the corresponding index in the // vector is marked as true. absl::InlinedVector shard_journal_write; - uint32_t multi_opts = 0; // options of the parent transaction. - // Whether this transaction can lock more keys during its progress. - bool is_expanding = true; bool locks_recorded = false; }; @@ -276,10 +303,12 @@ class Transaction { }; struct PerShardCache { + bool requested_active = false; // Activate on shard geradless of presence of keys. std::vector args; std::vector original_index; void Clear() { + requested_active = false; args.clear(); original_index.clear(); } @@ -377,7 +406,7 @@ class Transaction { f(shard_data_[SidToId(i)], i); } else { for (ShardId i = 0; i < shard_data_.size(); ++i) { - if (auto& sd = shard_data_[i]; global_ || sd.arg_count > 0) { + if (auto& sd = shard_data_[i]; global_ || (sd.local_mask & ACTIVE)) { f(sd, i); } } @@ -434,8 +463,12 @@ class Transaction { private: struct TLTmpSpace { - std::vector shard_cache; absl::flat_hash_set uniq_keys; + + std::vector& GetShardIndex(unsigned size); + + private: + std::vector shard_cache; }; static thread_local TLTmpSpace tmp_space;