mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
chore: refresh helio (#1506)
In addition, add more states to tx local_mask to allow easier debugging. Finally, add check-fail to verify tx invariants in order to prevent reaching erroneous states that are nearly impossible to analyze. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
ea0364329e
commit
84d09800c3
5 changed files with 16 additions and 13 deletions
2
helio
2
helio
|
@ -1 +1 @@
|
|||
Subproject commit 30b233f5882f34e53cb410d4bdf6d315260c05cf
|
||||
Subproject commit 003cb1cc799db443e5189d5b10ee95758d547d66
|
|
@ -295,8 +295,9 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
|
|||
|
||||
uint16_t trans_mask = trans ? trans->GetLocalMask(sid) : 0;
|
||||
if (trans_mask & Transaction::AWAKED_Q) {
|
||||
DCHECK(continuation_trans_ == nullptr)
|
||||
<< continuation_trans_->DebugId() << " when polling " << trans->DebugId();
|
||||
CHECK(continuation_trans_ == nullptr)
|
||||
<< continuation_trans_->DebugId() << " when polling " << trans->DebugId()
|
||||
<< "cont_mask: " << continuation_trans_->GetLocalMask(sid) << " vs " << trans_mask;
|
||||
|
||||
bool keep = trans->RunInShard(this, false);
|
||||
if (keep) {
|
||||
|
|
|
@ -155,7 +155,7 @@ struct CircularMessages {
|
|||
|
||||
// Temporary debug measures. Trace what happens with list keys on given shard.
|
||||
// Used to recover logs for BLPOP failures. See OpBPop.
|
||||
thread_local CircularMessages debugMessages{100};
|
||||
thread_local CircularMessages debugMessages{50};
|
||||
|
||||
class BPopPusher {
|
||||
public:
|
||||
|
@ -200,7 +200,7 @@ std::string OpBPop(Transaction* t, EngineShard* shard, std::string_view key, Lis
|
|||
PrimeIterator it = *it_res;
|
||||
quicklist* ql = GetQL(it->second);
|
||||
|
||||
absl::StrAppend(debugMessages.Next(), "OpBPop", key, " by ", t->DebugId());
|
||||
absl::StrAppend(debugMessages.Next(), "OpBPop: ", key, " by ", t->DebugId());
|
||||
|
||||
db_slice.PreUpdate(t->GetDbIndex(), it);
|
||||
std::string value = ListPop(dir, ql);
|
||||
|
@ -356,9 +356,9 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
|
|||
string tmp;
|
||||
string_view key = it->first.GetSlice(&tmp);
|
||||
|
||||
absl::StrAppend(debugMessages.Next(), "OpPush AwakeWatched: ", string{key}, " by ",
|
||||
op_args.tx->DebugId(), " expire: ", it->second.HasExpire());
|
||||
es->blocking_controller()->AwakeWatched(op_args.db_cntx.db_index, key);
|
||||
absl::StrAppend(debugMessages.Next(), "OpPush AwakeWatched: ", key, " by ",
|
||||
op_args.tx->DebugId());
|
||||
}
|
||||
} else {
|
||||
es->db_slice().PostUpdate(op_args.db_cntx.db_index, it, key, true);
|
||||
|
@ -692,7 +692,6 @@ OpStatus OpTrim(const OpArgs& op_args, string_view key, long start, long end) {
|
|||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
|
||||
|
||||
if (quicklistCount(ql) == 0) {
|
||||
absl::StrAppend(debugMessages.Next(), "OpTrim Del: ", key, " by ", op_args.tx->DebugId());
|
||||
CHECK(db_slice.Del(op_args.db_cntx.db_index, it));
|
||||
}
|
||||
return OpStatus::OK;
|
||||
|
@ -1198,8 +1197,6 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn
|
|||
|
||||
Transaction* transaction = cntx->transaction;
|
||||
|
||||
absl::StrAppend(debugMessages.Next(), "BPopGeneric by ", transaction->DebugId());
|
||||
|
||||
std::string popped_value;
|
||||
auto cb = [dir, &popped_value](Transaction* t, EngineShard* shard, std::string_view key) {
|
||||
popped_value = OpBPop(t, shard, key, dir);
|
||||
|
|
|
@ -455,6 +455,10 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
|
|||
|
||||
DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED) || (multi_ && multi_->mode == GLOBAL));
|
||||
|
||||
if (txq_ooo) {
|
||||
DCHECK(sd.local_mask & OUT_OF_ORDER);
|
||||
}
|
||||
|
||||
/*************************************************************************/
|
||||
// Actually running the callback.
|
||||
// If you change the logic here, also please change the logic
|
||||
|
@ -530,7 +534,6 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
|
|||
// 1: to go over potential wakened keys, verify them and activate watch queues.
|
||||
// 2: if this transaction was notified and finished running - to remove it from the head
|
||||
// of the queue and notify the next one.
|
||||
// RunStep is also called for global transactions because of commands like MOVE.
|
||||
if (auto* bcontroller = shard->blocking_controller(); bcontroller) {
|
||||
if (awaked_prerun || was_suspended) {
|
||||
bcontroller->FinalizeWatched(largs, this);
|
||||
|
@ -1227,6 +1230,7 @@ void Transaction::UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, E
|
|||
}
|
||||
|
||||
auto& sd = shard_data_[SidToId(shard->shard_id())];
|
||||
sd.local_mask |= UNLOCK_MULTI;
|
||||
|
||||
// It does not have to be that all shards in multi transaction execute this tx.
|
||||
// Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from
|
||||
|
|
|
@ -129,11 +129,12 @@ class Transaction {
|
|||
enum LocalMask : uint16_t {
|
||||
ACTIVE = 1, // Set on all active shards.
|
||||
// UNUSED = 1 << 1,
|
||||
OUT_OF_ORDER = 1 << 2, // Whether its running out of order
|
||||
OUT_OF_ORDER = 1 << 2, // Whether it can run as out of order
|
||||
KEYLOCK_ACQUIRED = 1 << 3, // Whether its key locks are acquired
|
||||
SUSPENDED_Q = 1 << 4, // Whether it is suspended (by WatchInShard())
|
||||
AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended())
|
||||
EXPIRED_Q = 1 << 6, // Whether it timed out and should be dropped
|
||||
UNLOCK_MULTI = 1 << 7, // Whether this shard executed UnlockMultiShardCb
|
||||
};
|
||||
|
||||
public:
|
||||
|
@ -342,7 +343,7 @@ class Transaction {
|
|||
uint32_t pq_pos = TxQueue::kEnd;
|
||||
|
||||
// Accessed within shard thread.
|
||||
// Bitmask of LocalState enums.
|
||||
// Bitmask of LocalMask enums.
|
||||
uint16_t local_mask = 0;
|
||||
|
||||
// Index of key relative to args in shard that the shard was woken up after blocking wait.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue