From c96f637f737c087262bb253ec2b7ecad04fb5c90 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Fri, 17 Mar 2023 08:30:17 +0200 Subject: [PATCH] chore: some pytests and logging improvements 1. pytest extensions and fixes - allows running them with the existing local server by providing its port (--existing-port). 2. Extend "DEBUG WATCHED" command to provide more information about watched state. 3. Improve debug/vlog printings around the code. This noisy PR is a preparation before BRPOP fix that will follow later. Signed-off-by: Roman Gershman --- src/server/blocking_controller.cc | 9 +-- src/server/blocking_controller.h | 4 ++ src/server/debugcmd.cc | 12 +++- src/server/dragonfly_test.cc | 2 +- src/server/engine_shard_set.cc | 5 +- src/server/list_family.cc | 54 ++++++++++------- src/server/list_family_test.cc | 25 ++++++-- src/server/test_utils.cc | 2 +- src/server/transaction.cc | 70 +++++++++-------------- src/server/transaction.h | 4 +- tests/dragonfly/__init__.py | 17 ++++-- tests/dragonfly/conftest.py | 7 ++- tests/dragonfly/list_family_test.py | 2 +- tests/dragonfly/redis_replicaiton_test.py | 6 +- 14 files changed, 124 insertions(+), 95 deletions(-) diff --git a/src/server/blocking_controller.cc b/src/server/blocking_controller.cc index 213c587a3..68d943ac4 100644 --- a/src/server/blocking_controller.cc +++ b/src/server/blocking_controller.cc @@ -21,7 +21,7 @@ using namespace std; struct WatchItem { Transaction* trans; - Transaction* get() { + Transaction* get() const { return trans; } @@ -64,7 +64,7 @@ BlockingController::~BlockingController() { } void BlockingController::DbWatchTable::RemoveEntry(WatchQueueMap::iterator it) { - DVLOG(1) << "Erasing watchqueue key " << it->first; + DVLOG(2) << "Erasing watchqueue key " << it->first; awakened_keys.erase(it->first); queue_map.erase(it); @@ -137,8 +137,6 @@ void BlockingController::RunStep(Transaction* completed_t) { } void BlockingController::AddWatched(ArgSlice keys, Transaction* trans) { - VLOG(1) << 
"AddWatched [" << owner_->shard_id() << "] " << trans->DebugId(); - auto [dbit, added] = watched_dbs_.emplace(trans->GetDbIndex(), nullptr); if (added) { dbit->second.reset(new DbWatchTable); @@ -160,7 +158,7 @@ void BlockingController::AddWatched(ArgSlice keys, Transaction* trans) { if (last == trans) continue; } - DVLOG(2) << "Emplace " << trans << " " << trans->DebugId() << " to watch " << key; + DVLOG(2) << "Emplace " << trans->DebugId() << " to watch " << key; res->second->items.emplace_back(trans); } } @@ -219,7 +217,6 @@ void BlockingController::AwakeWatched(DbIndex db_index, string_view db_key) { } } -// Internal function called from RunStep(). // Marks the queue as active and notifies the first transaction in the queue. void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueueMap* wqm) { auto w_it = wqm->find(key); diff --git a/src/server/blocking_controller.h b/src/server/blocking_controller.h index 5964177a6..a5f2ae361 100644 --- a/src/server/blocking_controller.h +++ b/src/server/blocking_controller.h @@ -24,6 +24,10 @@ class BlockingController { return !awakened_transactions_.empty(); } + const auto& awakened_transactions() const { + return awakened_transactions_; + } + // Iterates over awakened key candidates in each db and moves verified ones into // global verified_awakened_ array. // Returns true if there are active awakened keys, false otherwise. 
diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index dad58b5cb..acbbac250 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -38,6 +38,7 @@ using namespace facade; namespace fs = std::filesystem; using absl::GetFlag; using absl::StrAppend; +using absl::StrCat; struct PopulateBatch { DbIndex dbid; @@ -418,9 +419,11 @@ void DebugCmd::Inspect(string_view key) { } void DebugCmd::Watched() { - vector watched_keys; boost::fibers::mutex mu; + vector watched_keys; + vector awaked_trans; + auto cb = [&](EngineShard* shard) { auto* bc = shard->blocking_controller(); if (bc) { @@ -428,10 +431,17 @@ void DebugCmd::Watched() { lock_guard lk(mu); watched_keys.insert(watched_keys.end(), keys.begin(), keys.end()); + for (auto* tx : bc->awakened_transactions()) { + awaked_trans.push_back(StrCat("[", shard->shard_id(), "] ", tx->DebugId())); + } } }; shard_set->RunBlockingInParallel(cb); + (*cntx_)->StartArray(4); + (*cntx_)->SendBulkString("awaked"); + (*cntx_)->SendStringArr(awaked_trans); + (*cntx_)->SendBulkString("watched"); (*cntx_)->SendStringArr(watched_keys); } diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 54fe2e5d3..519ffc070 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -185,7 +185,7 @@ TEST_F(DflyEngineTest, EvalBug713b) { fibers_ext::Fiber fibers[kNumFibers]; for (unsigned j = 0; j < kNumFibers; ++j) { - fibers[j] = pp_->at(1)->LaunchFiber([=, this] { + fibers[j] = pp_->at(1)->LaunchFiber([j, script, this] { for (unsigned i = 0; i < 50; ++i) { Run(StrCat("fb", j), {"eval", script, "3", kKeySid0, kKeySid1, kKeySid2}); } diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index bd768ecfb..1aadf9d0f 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -294,7 +294,6 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { DCHECK(continuation_trans_ == nullptr) << continuation_trans_->DebugId() << " 
when polling " << trans->DebugId(); - CHECK_EQ(committed_txid_, trans->GetNotifyTxid()); bool keep = trans->RunInShard(this); if (keep) { return; @@ -374,15 +373,13 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { DVLOG(1) << "Skipped TxQueue " << continuation_trans_; } - // we need to run trans if it's OOO or when trans is blocked in this shard and should - // be treated here as noop. + // we need to run trans if it's OOO or when trans is blocked in this shard. bool should_run = trans_mask & (Transaction::OUT_OF_ORDER | Transaction::SUSPENDED_Q); // It may be that there are other transactions that touch those keys but they necessary ordered // after trans in the queue, hence it's safe to run trans out of order. if (trans && should_run) { DCHECK(trans != head); - DCHECK(trans_mask & Transaction::ARMED); dbg_id.clear(); diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 4a08b9cab..3600ab951 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -241,21 +241,21 @@ class BPopPusher { BPopper::BPopper(ListDir dir) : dir_(dir) { } -OpStatus BPopper::Run(Transaction* t, unsigned msec) { +OpStatus BPopper::Run(Transaction* trans, unsigned msec) { time_point tp = msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max(); - bool is_multi = t->IsMulti(); - t->Schedule(); + bool is_multi = trans->IsMulti(); + trans->Schedule(); auto* stats = ServerState::tl_connection_stats(); - OpResult result = FindFirst(t); + OpResult result = FindFirst(trans); if (result.status() == OpStatus::KEY_NOTFOUND) { if (is_multi) { // close transaction and return. 
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; - t->Execute(std::move(cb), true); + trans->Execute(std::move(cb), true); return OpStatus::TIMED_OUT; } @@ -265,34 +265,37 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { return t->GetShardArgs(shard->shard_id()); }; + VLOG(1) << "Blocking BLPOP " << trans->DebugId(); ++stats->num_blocked_clients; - bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb)); + bool wait_succeeded = trans->WaitOnWatch(tp, std::move(wcb)); --stats->num_blocked_clients; if (!wait_succeeded) return OpStatus::TIMED_OUT; // Now we have something for sure. - result = FindFirst(t); // retry - must find something. + result = FindFirst(trans); // retry - must find something. } - // We got here if (!result) { + // Could be the wrong-type error. // cleanups, locks removal etc. auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; - t->Execute(std::move(cb), true); + trans->Execute(std::move(cb), true); + + DCHECK_NE(result.status(), OpStatus::KEY_NOTFOUND); return result.status(); } - VLOG(1) << "Popping an element " << t->DebugId(); + VLOG(1) << "Popping an element " << trans->DebugId(); ff_result_ = move(result.value()); auto cb = [this](Transaction* t, EngineShard* shard) { Pop(t, shard); return OpStatus::OK; }; - t->Execute(std::move(cb), true); + trans->Execute(std::move(cb), true); return OpStatus::OK; } @@ -302,14 +305,16 @@ void BPopper::Pop(Transaction* t, EngineShard* shard) { ff_result_.key.GetString(&key_); auto& db_slice = shard->db_slice(); auto it_res = db_slice.Find(t->GetDbContext(), key_, OBJ_LIST); - CHECK(it_res); // must exist and must be ok. + CHECK(it_res) << t->DebugId() << " " << key_; // must exist and must be ok. 
PrimeIterator it = *it_res; quicklist* ql = GetQL(it->second); + DVLOG(2) << "popping from " << key_ << " " << t->DebugId(); db_slice.PreUpdate(t->GetDbIndex(), it); value_ = ListPop(dir_, ql); db_slice.PostUpdate(t->GetDbIndex(), it, key_); if (quicklistCount(ql) == 0) { + DVLOG(1) << "deleting key " << key_ << " " << t->DebugId(); CHECK(shard->db_slice().Del(t->GetDbIndex(), it)); } OpArgs op_args = t->GetOpArgs(shard); @@ -408,10 +413,7 @@ OpResult Peek(const OpArgs& op_args, string_view key, ListDir dir, bool } OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir dir, - bool skip_notexist, absl::Span vals, - bool journal_rewrite) { - DVLOG(1) << "OpPush " << key; - + bool skip_notexist, ArgSlice vals, bool journal_rewrite) { EngineShard* es = op_args.shard; PrimeIterator it; bool new_key = false; @@ -430,6 +432,7 @@ OpResult OpPush(const OpArgs& op_args, std::string_view key, ListDir d } quicklist* ql = nullptr; + DVLOG(1) << "OpPush " << key << " new_key " << new_key; if (new_key) { robj* o = createQuicklistObject(); @@ -544,16 +547,21 @@ OpResult MoveTwoShards(Transaction* trans, string_view src, string_view // Everything is ok, lets proceed with the mutations. 
auto cb = [&](Transaction* t, EngineShard* shard) { auto args = t->GetShardArgs(shard->shard_id()); - bool is_dest = args.front() == dest; + auto key = args.front(); + bool is_dest = (key == dest); OpArgs op_args = t->GetOpArgs(shard); if (is_dest) { string_view val{find_res[0].value()}; - absl::Span span{&val, 1}; - OpPush(op_args, args.front(), dest_dir, false, span, true); + DVLOG(1) << "Pushing value: " << val << " to list: " << dest; + + ArgSlice span{&val, 1}; + OpPush(op_args, key, dest_dir, false, span, true); } else { - OpPop(op_args, args.front(), src_dir, 1, false, true); + DVLOG(1) << "Popping value from list: " << key; + OpPop(op_args, key, src_dir, 1, false, true); } + return OpStatus::OK; }; trans->Execute(move(cb), true); @@ -1288,7 +1296,7 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn if (timeout < 0) { return (*cntx)->SendError("timeout is negative"); } - VLOG(1) << "BLPop start " << timeout; + VLOG(1) << "BPop timeout(" << timeout << ")"; Transaction* transaction = cntx->transaction; BPopper popper(dir); @@ -1297,13 +1305,15 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn if (result == OpStatus::OK) { auto res = popper.result(); - VLOG(1) << "BLPop returned from " << res.first; // key. + DVLOG(1) << "BPop " << transaction->DebugId() << " popped from key " << res.first; // key. 
std::string_view str_arr[2] = {res.first, res.second}; return (*cntx)->SendStringArr(str_arr); } + DVLOG(1) << "result for " << transaction->DebugId() << " is " << result; + switch (result) { case OpStatus::WRONG_TYPE: return (*cntx)->SendError(kWrongTypeErr); diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index c7827dd60..3d8d3e534 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -21,6 +21,7 @@ using namespace testing; using namespace std; using namespace util; namespace fibers = ::boost::fibers; +using absl::StrCat; namespace dfly { @@ -30,7 +31,7 @@ class ListFamilyTest : public BaseFamilyTest { num_threads_ = 4; } - unsigned NumWatched() { + static unsigned NumWatched() { atomic_uint32_t sum{0}; shard_set->RunBriefInParallel([&](EngineShard* es) { auto* bc = es->blocking_controller(); @@ -40,6 +41,17 @@ class ListFamilyTest : public BaseFamilyTest { return sum.load(); } + + static bool HasAwakened() { + atomic_uint32_t sum{0}; + shard_set->RunBriefInParallel([&](EngineShard* es) { + auto* bc = es->blocking_controller(); + if (bc) + sum.fetch_add(bc->HasAwakedTransaction(), memory_order_relaxed); + }); + + return sum.load() > 0; + } }; const char kKey1[] = "x"; @@ -330,8 +342,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) { auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"}); - auto watched = Run({"debug", "watched"}); - ASSERT_THAT(watched, ArrLen(0)); + EXPECT_EQ(0, NumWatched()); }); WaitUntilLocked(0, kKey1); @@ -362,10 +373,8 @@ TEST_F(ListFamilyTest, BPopTwoKeysSameShard) { auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { blpop_resp = Run({"blpop", "x", "y", "0"}); - auto watched = Run({"debug", "watched"}); - EXPECT_FALSE(IsLocked(0, "y")); - ASSERT_THAT(watched, ArrLen(0)); + ASSERT_EQ(0, NumWatched()); }); WaitUntilLocked(0, "x"); @@ -674,6 +683,7 @@ TEST_F(ListFamilyTest, TwoQueueBug451) { 
for (auto& f : fbs) f.Join(); + ASSERT_EQ(0, NumWatched()); } TEST_F(ListFamilyTest, BRPopLPushSingleShard) { @@ -731,6 +741,7 @@ TEST_F(ListFamilyTest, BRPopLPushTwoShards) { resp = Run({"lrange", "z", "0", "-1"}); ASSERT_EQ(resp, "val"); Run({"del", "z"}); + ASSERT_EQ(0, NumWatched()); // Run the fiber at creation. auto fb0 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { @@ -754,6 +765,8 @@ TEST_F(ListFamilyTest, BRPopLPushTwoShards) { ASSERT_FALSE(IsLocked(0, "x")); ASSERT_FALSE(IsLocked(0, "z")); ASSERT_EQ(0, NumWatched()); + ASSERT_FALSE(HasAwakened()); + // TODO: there is a bug here. // we do not wake the dest shard, when source is awaked which prevents // the atomicity and causes the first bug as well. diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index da688c0a9..a43ed98ba 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -199,7 +199,7 @@ RespExpr BaseFamilyTest::Run(std::string_view id, ArgSlice slice) { auto* context = conn_wrapper->cmd_cntx(); - DCHECK(context->transaction == nullptr); + DCHECK(context->transaction == nullptr) << id; service_->DispatchCommand(CmdArgList{args}, context); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 7c8fb55dc..766e92860 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -53,7 +53,7 @@ Transaction::Transaction(const CommandId* cid, uint32_t thread_index) } Transaction::~Transaction() { - DVLOG(2) << "Transaction " << StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, ")") + DVLOG(3) << "Transaction " << StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, ")") << " destroyed"; } @@ -402,16 +402,16 @@ bool Transaction::RunInShard(EngineShard* shard) { // because Scheduling is done before multi-exec batch is executed. Therefore we // lock keys right before the execution of each statement. 
- VLOG(2) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id(); - unsigned idx = SidToId(shard->shard_id()); auto& sd = shard_data_[idx]; DCHECK(sd.local_mask & ARMED); sd.local_mask &= ~ARMED; + VLOG(2) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id() << " " << sd.local_mask; + bool was_suspended = sd.local_mask & SUSPENDED_Q; - bool awaked_prerun = (sd.local_mask & AWAKED_Q) != 0; + bool awaked_prerun = sd.local_mask & AWAKED_Q; bool incremental_lock = multi_ && multi_->IsIncrLocks(); // For multi we unlock transaction (i.e. its keys) in UnlockMulti() call. @@ -426,7 +426,7 @@ bool Transaction::RunInShard(EngineShard* shard) { // We make sure that we lock exactly once for each (multi-hop) transaction inside // transactions that lock incrementally. if (!IsGlobal() && incremental_lock && ((sd.local_mask & KEYLOCK_ACQUIRED) == 0)) { - DCHECK(!awaked_prerun); // we should not have blocking transaction inside multi block. + DCHECK(!awaked_prerun); // we should not have a blocking transaction inside multi block. sd.local_mask |= KEYLOCK_ACQUIRED; shard->db_slice().Acquire(mode, GetLockArgs(idx)); @@ -438,9 +438,10 @@ bool Transaction::RunInShard(EngineShard* shard) { // Actually running the callback. // If you change the logic here, also please change the logic try { - // if transaction is suspended (blocked in watched queue), then it's a noop. OpStatus status = OpStatus::OK; + // if a transaction is suspended, we still run it because of brpoplpush/blmove case + // that needs to run lpush on its suspended shard. status = cb_(this, shard); if (unique_shard_cnt_ == 1) { @@ -478,24 +479,26 @@ bool Transaction::RunInShard(EngineShard* shard) { // If it's a final hop we should release the locks. 
if (should_release) { - bool become_suspended = sd.local_mask & SUSPENDED_Q; + bool became_suspended = sd.local_mask & SUSPENDED_Q; + KeyLockArgs largs; if (IsGlobal()) { - DCHECK(!awaked_prerun && !become_suspended); // Global transactions can not be blocking. + DCHECK(!awaked_prerun && !became_suspended); // Global transactions can not be blocking. shard->shard_lock()->Release(Mode()); } else { // not global. - KeyLockArgs largs = GetLockArgs(idx); + largs = GetLockArgs(idx); DCHECK(sd.local_mask & KEYLOCK_ACQUIRED); // If a transaction has been suspended, we keep the lock so that future transaction // touching those keys will be ordered via TxQueue. It's necessary because we preserve // the atomicity of awaked transactions by halting the TxQueue. - if (was_suspended || !become_suspended) { + if (was_suspended || !became_suspended) { shard->db_slice().Release(mode, largs); sd.local_mask &= ~KEYLOCK_ACQUIRED; } sd.local_mask &= ~OUT_OF_ORDER; } + // It has 2 responsibilities. // 1: to go over potential wakened keys, verify them and activate watch queues. // 2: if this transaction was notified and finished running - to remove it from the head @@ -644,7 +647,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { // IsArmedInShard() first checks run_count_ before shard_data, so use release ordering. 
shard_data_[0].local_mask |= ARMED; - run_count_.fetch_add(1, memory_order_release); + run_count_.store(1, memory_order_release); time_now_ms_ = GetCurrentTimeMs(); @@ -679,9 +682,9 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { ExecuteAsync(); } - DVLOG(1) << "ScheduleSingleHop before Wait " << DebugId() << " " << run_count_.load(); + DVLOG(2) << "ScheduleSingleHop before Wait " << DebugId() << " " << run_count_.load(); WaitForShardCallbacks(); - DVLOG(1) << "ScheduleSingleHop after Wait " << DebugId(); + DVLOG(2) << "ScheduleSingleHop after Wait " << DebugId(); if (was_ooo) coordinator_state_ |= COORD_OOO; @@ -795,7 +798,7 @@ void Transaction::ExecuteAsync() { uint32_t seq_after = seqlock_.load(memory_order_acquire); bool should_poll = (seq_after == seq) && (GetLocalMask(shard->shard_id()) & ARMED); - DVLOG(2) << "PollExecCb " << DebugId() << " sid(" << shard->shard_id() << ") " + DVLOG(3) << "PollExecCb " << DebugId() << " sid(" << shard->shard_id() << ") " << run_count_.load(memory_order_relaxed) << ", should_poll: " << should_poll; // We verify that this callback is still relevant. @@ -808,9 +811,11 @@ void Transaction::ExecuteAsync() { // Therefore, everything that should be handled during the callback execution // should go into RunInShard. shard->PollExecution("exec_cb", this); + } else { + VLOG(1) << "Skipping PollExecution " << DebugId() << " sid(" << shard->shard_id() << ")"; } - DVLOG(2) << "ptr_release " << DebugId() << " " << seq; + DVLOG(3) << "ptr_release " << DebugId() << " " << seq; intrusive_ptr_release(this); // against use_count_.fetch_add above. }; @@ -850,7 +855,7 @@ void Transaction::RunQuickie(EngineShard* shard) { // runs in coordinator thread. // Marks the transaction as expired and removes it from the waiting queue. 
void Transaction::UnwatchBlocking(bool should_expire, WaitKeysProvider wcb) { - DVLOG(1) << "UnwatchBlocking " << DebugId(); + DVLOG(1) << "UnwatchBlocking " << DebugId() << " expire: " << should_expire; DCHECK(!IsGlobal()); run_count_.store(unique_shard_cnt_, memory_order_release); @@ -951,7 +956,7 @@ pair Transaction::ScheduleInShard(EngineShard* shard) { // All transactions in the queue must acquire the intent lock. lock_granted = shard->db_slice().Acquire(mode, lock_args) && shard_unlocked; sd.local_mask |= KEYLOCK_ACQUIRED; - DVLOG(2) << "Lock granted " << lock_granted << " for trans " << DebugId(); + DVLOG(3) << "Lock granted " << lock_granted << " for trans " << DebugId(); } if (!txq->Empty()) { @@ -1044,8 +1049,7 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const { } bool Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider) { - // Assumes that transaction is pending and scheduled. TODO: To verify it with state machine. - VLOG(2) << "WaitOnWatch Start use_count(" << GetUseCount() << ")"; + DVLOG(2) << "WaitOnWatch " << DebugId(); using namespace chrono; auto cb = [&](Transaction* t, EngineShard* shard) { @@ -1090,13 +1094,13 @@ OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard) { auto& sd = shard_data_[idx]; CHECK_EQ(0, sd.local_mask & SUSPENDED_Q); - DCHECK_EQ(0, sd.local_mask & ARMED); auto* bc = shard->EnsureBlockingController(); bc->AddWatched(keys, this); sd.local_mask |= SUSPENDED_Q; - DVLOG(1) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask; + DVLOG(2) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask + << ", first_key:" << keys.front(); return OpStatus::OK; } @@ -1172,28 +1176,6 @@ void Transaction::UnlockMultiShardCb(const std::vector& sharded_keys, E this->DecreaseRunCnt(); } -#if 0 -// HasResultConverged has detailed documentation on how convergence is determined. 
-void Transaction::CheckForConvergence(EngineShard* shard) { - unsigned idx = SidToId(shard->shard_id()); - auto& sd = shard_data_[idx]; - - TxId notify = notify_txid(); - - if ((sd.local_mask & AWAKED_Q) || shard->HasResultConverged(notify)) { - CHECK_GE(DecreaseRunCnt(), 1u); - return; - } - - LOG(DFATAL) << "TBD"; - - BlockingController* bc = shard->blocking_controller(); - CHECK(bc); // must be present because we have watched this shard before. - - bc->RegisterAwaitForConverge(this); -} -#endif - inline uint32_t Transaction::DecreaseRunCnt() { // to protect against cases where Transaction is destroyed before run_ec_.notify // finishes running. We can not put it inside the (res == 1) block because then it's too late. @@ -1223,7 +1205,7 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) { return false; } - DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask << " by " + DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask << " by commited_id " << committed_txid; // local_mask could be awaked (i.e. not suspended) if the transaction has been diff --git a/src/server/transaction.h b/src/server/transaction.h index e63c70e4b..836c0a7fd 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -278,7 +278,7 @@ class Transaction { uint32_t arg_start = 0; // Indices into args_ array. uint16_t arg_count = 0; - // Accessed only within the engine-shard thread. + // Accessed within shard thread. // Bitmask of LocalState enums. uint16_t local_mask{0}; @@ -321,7 +321,7 @@ class Transaction { }; struct PerShardCache { - bool requested_active = false; // Activate on shard geradless of presence of keys. + bool requested_active = false; // Activate on shard regardless of presence of keys. 
std::vector args; std::vector original_index; diff --git a/tests/dragonfly/__init__.py b/tests/dragonfly/__init__.py index e50a987c2..f0ad7a4ee 100644 --- a/tests/dragonfly/__init__.py +++ b/tests/dragonfly/__init__.py @@ -14,6 +14,7 @@ class DflyParams: cwd: str gdb: bool args: list + existing_port: int env: any @@ -55,9 +56,12 @@ class DflyInstance: proc.communicate() def _start(self): + if self.params.existing_port: + return base_args = [f"--{v}" for v in self.params.args] all_args = self.format_args(self.args) + base_args - print(f"Starting instance on {self.port} with arguments {all_args}") + print( + f"Starting instance on {self.port} with arguments {all_args} from {self.params.path}") run_cmd = [self.params.path, *all_args] if self.params.gdb: @@ -65,16 +69,19 @@ class DflyInstance: self.proc = subprocess.Popen(run_cmd, cwd=self.params.cwd) def _check_status(self): - return_code = self.proc.poll() - if return_code is not None: - raise Exception( - f"Failed to start instance, return code {return_code}") + if not self.params.existing_port: + return_code = self.proc.poll() + if return_code is not None: + raise Exception( + f"Failed to start instance, return code {return_code}") def __getitem__(self, k): return self.args.get(k) @property def port(self) -> int: + if self.params.existing_port: + return self.params.existing_port return int(self.args.get("port", "6379")) @staticmethod diff --git a/tests/dragonfly/conftest.py b/tests/dragonfly/conftest.py index e1777ab00..ab6603c1f 100644 --- a/tests/dragonfly/conftest.py +++ b/tests/dragonfly/conftest.py @@ -40,10 +40,12 @@ def test_env(tmp_dir: Path): env["DRAGONFLY_TMP"] = str(tmp_dir) return env + @pytest.fixture(scope="session", params=[{}]) def df_seeder_factory(request) -> DflySeederFactory: return DflySeederFactory(request.config.getoption("--log-seeder")) + @pytest.fixture(scope="session", params=[{}]) def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory: """ @@ -54,12 +56,13 @@ def 
df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory: scripts_dir, '../../build-dbg/dragonfly')) args = request.param if request.param else {} - + existing = request.config.getoption("--existing-port") params = DflyParams( path=path, cwd=tmp_dir, gdb=request.config.getoption("--gdb"), args=request.config.getoption("--df"), + existing_port=int(existing) if existing else None, env=test_env ) @@ -166,3 +169,5 @@ def pytest_addoption(parser): parser.addoption( '--log-seeder', action='store', default=None, help='Store last generator commands in file' ) + parser.addoption( + '--existing-port', action='store', default=None, help='Provide a port to the existing process for the test') diff --git a/tests/dragonfly/list_family_test.py b/tests/dragonfly/list_family_test.py index b4bdfdae1..fe624737c 100644 --- a/tests/dragonfly/list_family_test.py +++ b/tests/dragonfly/list_family_test.py @@ -34,7 +34,7 @@ class TestBlPop: assert wt_blpop.wait(2) assert wt_blpop.result[1] == 'a' watched = client.execute_command('DEBUG WATCHED') - assert watched == [] + assert watched == ['awaked', [], 'watched', []] wt_blpop.async_blpop(client) client.lpush('list2{t}', 'b') diff --git a/tests/dragonfly/redis_replicaiton_test.py b/tests/dragonfly/redis_replicaiton_test.py index c23798940..b769fac40 100644 --- a/tests/dragonfly/redis_replicaiton_test.py +++ b/tests/dragonfly/redis_replicaiton_test.py @@ -68,7 +68,11 @@ async def check_data(seeder, replicas, c_replicas): @pytest.fixture(scope="function") def redis_server() -> RedisServer: s = RedisServer() - s.start() + try: + s.start() + except FileNotFoundError as e: + pytest.skip("Redis server not found") + return None time.sleep(1) yield s s.stop()