From 84d09800c3c90ce50d276985bf27f37097e0eafb Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Tue, 4 Jul 2023 16:51:53 +0300 Subject: [PATCH] chore: refresh helio (#1506) In addition, add more states to tx local_mask to allow easier debugging. Finally, add check-fail to verify tx invariants in order to prevent reaching erroneous states that are nearly impossible to analyze. Signed-off-by: Roman Gershman --- helio | 2 +- src/server/engine_shard_set.cc | 5 +++-- src/server/list_family.cc | 11 ++++------- src/server/transaction.cc | 6 +++++- src/server/transaction.h | 5 +++-- 5 files changed, 16 insertions(+), 13 deletions(-) diff --git a/helio b/helio index 30b233f58..003cb1cc7 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 30b233f5882f34e53cb410d4bdf6d315260c05cf +Subproject commit 003cb1cc799db443e5189d5b10ee95758d547d66 diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 66a3aea74..ca4c583d5 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -295,8 +295,9 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { uint16_t trans_mask = trans ? trans->GetLocalMask(sid) : 0; if (trans_mask & Transaction::AWAKED_Q) { - DCHECK(continuation_trans_ == nullptr) - << continuation_trans_->DebugId() << " when polling " << trans->DebugId(); + CHECK(continuation_trans_ == nullptr) + << continuation_trans_->DebugId() << " when polling " << trans->DebugId() + << "cont_mask: " << continuation_trans_->GetLocalMask(sid) << " vs " << trans_mask; bool keep = trans->RunInShard(this, false); if (keep) { diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 738e7875e..594e35338 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -155,7 +155,7 @@ struct CircularMessages { // Temporary debug measures. Trace what happens with list keys on given shard. // Used to recover logs for BLPOP failures. See OpBPop. 
-thread_local CircularMessages debugMessages{100}; +thread_local CircularMessages debugMessages{50}; class BPopPusher { public: @@ -200,7 +200,7 @@ std::string OpBPop(Transaction* t, EngineShard* shard, std::string_view key, Lis PrimeIterator it = *it_res; quicklist* ql = GetQL(it->second); - absl::StrAppend(debugMessages.Next(), "OpBPop", key, " by ", t->DebugId()); + absl::StrAppend(debugMessages.Next(), "OpBPop: ", key, " by ", t->DebugId()); db_slice.PreUpdate(t->GetDbIndex(), it); std::string value = ListPop(dir, ql); @@ -356,9 +356,9 @@ OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir d string tmp; string_view key = it->first.GetSlice(&tmp); - absl::StrAppend(debugMessages.Next(), "OpPush AwakeWatched: ", string{key}, " by ", - op_args.tx->DebugId(), " expire: ", it->second.HasExpire()); es->blocking_controller()->AwakeWatched(op_args.db_cntx.db_index, key); + absl::StrAppend(debugMessages.Next(), "OpPush AwakeWatched: ", key, " by ", + op_args.tx->DebugId()); } } else { es->db_slice().PostUpdate(op_args.db_cntx.db_index, it, key, true); @@ -692,7 +692,6 @@ OpStatus OpTrim(const OpArgs& op_args, string_view key, long start, long end) { db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); if (quicklistCount(ql) == 0) { - absl::StrAppend(debugMessages.Next(), "OpTrim Del: ", key, " by ", op_args.tx->DebugId()); CHECK(db_slice.Del(op_args.db_cntx.db_index, it)); } return OpStatus::OK; @@ -1198,8 +1197,6 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn Transaction* transaction = cntx->transaction; - absl::StrAppend(debugMessages.Next(), "BPopGeneric by ", transaction->DebugId()); - std::string popped_value; auto cb = [dir, &popped_value](Transaction* t, EngineShard* shard, std::string_view key) { popped_value = OpBPop(t, shard, key, dir); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 4d7bdd854..3d6bd242c 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ 
-455,6 +455,10 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED) || (multi_ && multi_->mode == GLOBAL)); + if (txq_ooo) { + DCHECK(sd.local_mask & OUT_OF_ORDER); + } + /*************************************************************************/ // Actually running the callback. // If you change the logic here, also please change the logic @@ -530,7 +534,6 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { // 1: to go over potential wakened keys, verify them and activate watch queues. // 2: if this transaction was notified and finished running - to remove it from the head // of the queue and notify the next one. - // RunStep is also called for global transactions because of commands like MOVE. if (auto* bcontroller = shard->blocking_controller(); bcontroller) { if (awaked_prerun || was_suspended) { bcontroller->FinalizeWatched(largs, this); @@ -1227,6 +1230,7 @@ void Transaction::UnlockMultiShardCb(const std::vector& sharded_keys, E } auto& sd = shard_data_[SidToId(shard->shard_id())]; + sd.local_mask |= UNLOCK_MULTI; // It does not have to be that all shards in multi transaction execute this tx. // Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from diff --git a/src/server/transaction.h b/src/server/transaction.h index 3d171d95c..c6c7efbba 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -129,11 +129,12 @@ class Transaction { enum LocalMask : uint16_t { ACTIVE = 1, // Set on all active shards. 
// UNUSED = 1 << 1, - OUT_OF_ORDER = 1 << 2, // Whether its running out of order + OUT_OF_ORDER = 1 << 2, // Whether it can run as out of order KEYLOCK_ACQUIRED = 1 << 3, // Whether its key locks are acquired SUSPENDED_Q = 1 << 4, // Whether is suspened (by WatchInShard()) AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended()) EXPIRED_Q = 1 << 6, // Whether it timed out and should be dropped + UNLOCK_MULTI = 1 << 7, // Whether this shard executed UnlockMultiShardCb }; public: @@ -342,7 +343,7 @@ class Transaction { uint32_t pq_pos = TxQueue::kEnd; // Accessed within shard thread. - // Bitmask of LocalState enums. + // Bitmask of LocalMask enums. uint16_t local_mask = 0; // Index of key relative to args in shard that the shard was woken up after blocking wait.