Mirror of https://github.com/dragonflydb/dragonfly.git, synced 2025-05-10 18:05:44 +02:00
chore: some pytests and logging improvements
1. pytest extensions and fixes - allows running them with the existing local server by providing its port (--existing <port>).
2. Extend "DEBUG WATCHED" command to provide more information about watched state.
3. Improve debug/vlog printings around the code.

This noisy PR is a preparation before BRPOP fix that will follow later.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent f9ec60ee5f
commit c96f637f73

14 changed files with 124 additions and 95 deletions
@@ -21,7 +21,7 @@ using namespace std;
 struct WatchItem {
   Transaction* trans;

-  Transaction* get() {
+  Transaction* get() const {
     return trans;
   }

@@ -64,7 +64,7 @@ BlockingController::~BlockingController() {
 }

 void BlockingController::DbWatchTable::RemoveEntry(WatchQueueMap::iterator it) {
-  DVLOG(1) << "Erasing watchqueue key " << it->first;
+  DVLOG(2) << "Erasing watchqueue key " << it->first;

   awakened_keys.erase(it->first);
   queue_map.erase(it);

@@ -137,8 +137,6 @@ void BlockingController::RunStep(Transaction* completed_t) {
 }

 void BlockingController::AddWatched(ArgSlice keys, Transaction* trans) {
-  VLOG(1) << "AddWatched [" << owner_->shard_id() << "] " << trans->DebugId();
-
   auto [dbit, added] = watched_dbs_.emplace(trans->GetDbIndex(), nullptr);
   if (added) {
     dbit->second.reset(new DbWatchTable);

@@ -160,7 +158,7 @@ void BlockingController::AddWatched(ArgSlice keys, Transaction* trans) {
       if (last == trans)
         continue;
     }
-    DVLOG(2) << "Emplace " << trans << " " << trans->DebugId() << " to watch " << key;
+    DVLOG(2) << "Emplace " << trans->DebugId() << " to watch " << key;
     res->second->items.emplace_back(trans);
   }
 }

@@ -219,7 +217,6 @@ void BlockingController::AwakeWatched(DbIndex db_index, string_view db_key) {
   }
 }

 // Internal function called from RunStep().
-// Marks the queue as active and notifies the first transaction in the queue.
 void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueueMap* wqm) {
   auto w_it = wqm->find(key);

@@ -24,6 +24,10 @@ class BlockingController {
     return !awakened_transactions_.empty();
   }

+  const auto& awakened_transactions() const {
+    return awakened_transactions_;
+  }
+
   // Iterates over awakened key candidates in each db and moves verified ones into
   // global verified_awakened_ array.
   // Returns true if there are active awakened keys, false otherwise.

@@ -38,6 +38,7 @@ using namespace facade;
 namespace fs = std::filesystem;
+using absl::GetFlag;
 using absl::StrAppend;
 using absl::StrCat;

 struct PopulateBatch {
   DbIndex dbid;

@@ -418,9 +419,11 @@ void DebugCmd::Inspect(string_view key) {
 }

 void DebugCmd::Watched() {
-  vector<string> watched_keys;
   boost::fibers::mutex mu;

+  vector<string> watched_keys;
+  vector<string> awaked_trans;
+
   auto cb = [&](EngineShard* shard) {
     auto* bc = shard->blocking_controller();
     if (bc) {

@@ -428,10 +431,17 @@ void DebugCmd::Watched() {

       lock_guard lk(mu);
       watched_keys.insert(watched_keys.end(), keys.begin(), keys.end());
+      for (auto* tx : bc->awakened_transactions()) {
+        awaked_trans.push_back(StrCat("[", shard->shard_id(), "] ", tx->DebugId()));
+      }
     }
   };

   shard_set->RunBlockingInParallel(cb);
+  (*cntx_)->StartArray(4);
+  (*cntx_)->SendBulkString("awaked");
+  (*cntx_)->SendStringArr(awaked_trans);
+  (*cntx_)->SendBulkString("watched");
   (*cntx_)->SendStringArr(watched_keys);
 }

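As a quick sanity check of the extended reply, something along these lines can be run against a local server. This is an illustrative sketch using redis-py, not code from this commit; the port is a placeholder:

    # Hypothetical check of the extended "DEBUG WATCHED" reply; assumes redis-py
    # and a Dragonfly server already listening on the default port.
    import redis

    client = redis.Redis(port=6379, decode_responses=True)
    reply = client.execute_command('DEBUG', 'WATCHED')

    # With no blocked clients the command now returns a flat 4-element array:
    # ['awaked', [], 'watched', []]
    print(reply)

The expected shape matches the assertion updated in the pytest suite further down.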
@@ -185,7 +185,7 @@ TEST_F(DflyEngineTest, EvalBug713b) {
   fibers_ext::Fiber fibers[kNumFibers];

   for (unsigned j = 0; j < kNumFibers; ++j) {
-    fibers[j] = pp_->at(1)->LaunchFiber([=, this] {
+    fibers[j] = pp_->at(1)->LaunchFiber([j, script, this] {
       for (unsigned i = 0; i < 50; ++i) {
         Run(StrCat("fb", j), {"eval", script, "3", kKeySid0, kKeySid1, kKeySid2});
       }

@@ -294,7 +294,6 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
     DCHECK(continuation_trans_ == nullptr)
         << continuation_trans_->DebugId() << " when polling " << trans->DebugId();

-    CHECK_EQ(committed_txid_, trans->GetNotifyTxid());
     bool keep = trans->RunInShard(this);
     if (keep) {
       return;

@@ -374,15 +373,13 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
      DVLOG(1) << "Skipped TxQueue " << continuation_trans_;
  }

-  // we need to run trans if it's OOO or when trans is blocked in this shard and should
-  // be treated here as noop.
+  // we need to run trans if it's OOO or when trans is blocked in this shard.
   bool should_run = trans_mask & (Transaction::OUT_OF_ORDER | Transaction::SUSPENDED_Q);

   // It may be that there are other transactions that touch those keys but they necessary ordered
   // after trans in the queue, hence it's safe to run trans out of order.
   if (trans && should_run) {
     DCHECK(trans != head);
     DCHECK(trans_mask & Transaction::ARMED);

     dbg_id.clear();

@@ -241,21 +241,21 @@ class BPopPusher {
 BPopper::BPopper(ListDir dir) : dir_(dir) {
 }

-OpStatus BPopper::Run(Transaction* t, unsigned msec) {
+OpStatus BPopper::Run(Transaction* trans, unsigned msec) {
   time_point tp =
       msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max();
-  bool is_multi = t->IsMulti();
-  t->Schedule();
+  bool is_multi = trans->IsMulti();
+  trans->Schedule();

   auto* stats = ServerState::tl_connection_stats();

-  OpResult<ShardFFResult> result = FindFirst(t);
+  OpResult<ShardFFResult> result = FindFirst(trans);

   if (result.status() == OpStatus::KEY_NOTFOUND) {
     if (is_multi) {
       // close transaction and return.
       auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
-      t->Execute(std::move(cb), true);
+      trans->Execute(std::move(cb), true);

       return OpStatus::TIMED_OUT;
     }

@@ -265,34 +265,37 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
       return t->GetShardArgs(shard->shard_id());
     };

+    VLOG(1) << "Blocking BLPOP " << trans->DebugId();
     ++stats->num_blocked_clients;
-    bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb));
+    bool wait_succeeded = trans->WaitOnWatch(tp, std::move(wcb));
     --stats->num_blocked_clients;

     if (!wait_succeeded)
       return OpStatus::TIMED_OUT;

     // Now we have something for sure.
-    result = FindFirst(t);  // retry - must find something.
+    result = FindFirst(trans);  // retry - must find something.
   }

   // We got here
   if (!result) {
     // Could be the wrong-type error.
     // cleanups, locks removal etc.
     auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
-    t->Execute(std::move(cb), true);
+    trans->Execute(std::move(cb), true);

+    DCHECK_NE(result.status(), OpStatus::KEY_NOTFOUND);
+
     return result.status();
   }

-  VLOG(1) << "Popping an element " << t->DebugId();
+  VLOG(1) << "Popping an element " << trans->DebugId();
   ff_result_ = move(result.value());

   auto cb = [this](Transaction* t, EngineShard* shard) {
     Pop(t, shard);
     return OpStatus::OK;
   };
-  t->Execute(std::move(cb), true);
+  trans->Execute(std::move(cb), true);

   return OpStatus::OK;
 }

@@ -302,14 +305,16 @@ void BPopper::Pop(Transaction* t, EngineShard* shard) {
   ff_result_.key.GetString(&key_);
   auto& db_slice = shard->db_slice();
   auto it_res = db_slice.Find(t->GetDbContext(), key_, OBJ_LIST);
-  CHECK(it_res);  // must exist and must be ok.
+  CHECK(it_res) << t->DebugId() << " " << key_;  // must exist and must be ok.
   PrimeIterator it = *it_res;
   quicklist* ql = GetQL(it->second);

+  DVLOG(2) << "popping from " << key_ << " " << t->DebugId();
   db_slice.PreUpdate(t->GetDbIndex(), it);
   value_ = ListPop(dir_, ql);
   db_slice.PostUpdate(t->GetDbIndex(), it, key_);
   if (quicklistCount(ql) == 0) {
+    DVLOG(1) << "deleting key " << key_ << " " << t->DebugId();
     CHECK(shard->db_slice().Del(t->GetDbIndex(), it));
   }
   OpArgs op_args = t->GetOpArgs(shard);

@@ -408,10 +413,7 @@ OpResult<string> Peek(const OpArgs& op_args, string_view key, ListDir dir, bool
 }

 OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
-                          bool skip_notexist, absl::Span<std::string_view> vals,
-                          bool journal_rewrite) {
-  DVLOG(1) << "OpPush " << key;
-
+                          bool skip_notexist, ArgSlice vals, bool journal_rewrite) {
   EngineShard* es = op_args.shard;
   PrimeIterator it;
   bool new_key = false;

@@ -430,6 +432,7 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
   }

   quicklist* ql = nullptr;
+  DVLOG(1) << "OpPush " << key << " new_key " << new_key;

   if (new_key) {
     robj* o = createQuicklistObject();

@@ -544,16 +547,21 @@ OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view
   // Everything is ok, lets proceed with the mutations.
   auto cb = [&](Transaction* t, EngineShard* shard) {
     auto args = t->GetShardArgs(shard->shard_id());
-    bool is_dest = args.front() == dest;
+    auto key = args.front();
+    bool is_dest = (key == dest);
     OpArgs op_args = t->GetOpArgs(shard);

     if (is_dest) {
       string_view val{find_res[0].value()};
-      absl::Span<string_view> span{&val, 1};
-      OpPush(op_args, args.front(), dest_dir, false, span, true);
+      DVLOG(1) << "Pushing value: " << val << " to list: " << dest;
+
+      ArgSlice span{&val, 1};
+      OpPush(op_args, key, dest_dir, false, span, true);
     } else {
-      OpPop(op_args, args.front(), src_dir, 1, false, true);
+      DVLOG(1) << "Popping value from list: " << key;
+      OpPop(op_args, key, src_dir, 1, false, true);
     }

     return OpStatus::OK;
   };
   trans->Execute(move(cb), true);

@@ -1288,7 +1296,7 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn
   if (timeout < 0) {
     return (*cntx)->SendError("timeout is negative");
   }
-  VLOG(1) << "BLPop start " << timeout;
+  VLOG(1) << "BPop timeout(" << timeout << ")";

   Transaction* transaction = cntx->transaction;
   BPopper popper(dir);

@@ -1297,13 +1305,15 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn
   if (result == OpStatus::OK) {
     auto res = popper.result();

-    VLOG(1) << "BLPop returned from " << res.first;  // key.
+    DVLOG(1) << "BPop " << transaction->DebugId() << " popped from key " << res.first;  // key.

     std::string_view str_arr[2] = {res.first, res.second};

     return (*cntx)->SendStringArr(str_arr);
   }

+  DVLOG(1) << "result for " << transaction->DebugId() << " is " << result;
+
   switch (result) {
     case OpStatus::WRONG_TYPE:
       return (*cntx)->SendError(kWrongTypeErr);

@@ -21,6 +21,7 @@ using namespace testing;
 using namespace std;
 using namespace util;
+namespace fibers = ::boost::fibers;
 using absl::StrCat;

 namespace dfly {

@@ -30,7 +31,7 @@ class ListFamilyTest : public BaseFamilyTest {
     num_threads_ = 4;
   }

-  unsigned NumWatched() {
+  static unsigned NumWatched() {
     atomic_uint32_t sum{0};
     shard_set->RunBriefInParallel([&](EngineShard* es) {
       auto* bc = es->blocking_controller();

@@ -40,6 +41,17 @@ class ListFamilyTest : public BaseFamilyTest {

     return sum.load();
   }
+
+  static bool HasAwakened() {
+    atomic_uint32_t sum{0};
+    shard_set->RunBriefInParallel([&](EngineShard* es) {
+      auto* bc = es->blocking_controller();
+      if (bc)
+        sum.fetch_add(bc->HasAwakedTransaction(), memory_order_relaxed);
+    });
+
+    return sum.load() > 0;
+  }
 };

 const char kKey1[] = "x";

@@ -330,8 +342,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {

   auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
     blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"});
-    auto watched = Run({"debug", "watched"});
-    ASSERT_THAT(watched, ArrLen(0));
+    EXPECT_EQ(0, NumWatched());
   });

   WaitUntilLocked(0, kKey1);

@@ -362,10 +373,8 @@ TEST_F(ListFamilyTest, BPopTwoKeysSameShard) {

   auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
     blpop_resp = Run({"blpop", "x", "y", "0"});
-    auto watched = Run({"debug", "watched"});
-
     EXPECT_FALSE(IsLocked(0, "y"));
-    ASSERT_THAT(watched, ArrLen(0));
+    ASSERT_EQ(0, NumWatched());
   });

   WaitUntilLocked(0, "x");

@@ -674,6 +683,7 @@ TEST_F(ListFamilyTest, TwoQueueBug451) {

   for (auto& f : fbs)
     f.Join();
+  ASSERT_EQ(0, NumWatched());
 }

 TEST_F(ListFamilyTest, BRPopLPushSingleShard) {

@@ -731,6 +741,7 @@ TEST_F(ListFamilyTest, BRPopLPushTwoShards) {
   resp = Run({"lrange", "z", "0", "-1"});
   ASSERT_EQ(resp, "val");
   Run({"del", "z"});
+  ASSERT_EQ(0, NumWatched());

   // Run the fiber at creation.
   auto fb0 = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {

@@ -754,6 +765,8 @@ TEST_F(ListFamilyTest, BRPopLPushTwoShards) {
   ASSERT_FALSE(IsLocked(0, "x"));
   ASSERT_FALSE(IsLocked(0, "z"));
+  ASSERT_EQ(0, NumWatched());
+  ASSERT_FALSE(HasAwakened());

   // TODO: there is a bug here.
   // we do not wake the dest shard, when source is awaked which prevents
   // the atomicity and causes the first bug as well.

@@ -199,7 +199,7 @@ RespExpr BaseFamilyTest::Run(std::string_view id, ArgSlice slice) {

   auto* context = conn_wrapper->cmd_cntx();

-  DCHECK(context->transaction == nullptr);
+  DCHECK(context->transaction == nullptr) << id;

   service_->DispatchCommand(CmdArgList{args}, context);

@@ -53,7 +53,7 @@ Transaction::Transaction(const CommandId* cid, uint32_t thread_index)
 }

 Transaction::~Transaction() {
-  DVLOG(2) << "Transaction " << StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, ")")
+  DVLOG(3) << "Transaction " << StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, ")")
            << " destroyed";
 }

@@ -402,16 +402,16 @@ bool Transaction::RunInShard(EngineShard* shard) {
   // because Scheduling is done before multi-exec batch is executed. Therefore we
   // lock keys right before the execution of each statement.

-  VLOG(2) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id();
-
   unsigned idx = SidToId(shard->shard_id());
   auto& sd = shard_data_[idx];

   DCHECK(sd.local_mask & ARMED);
   sd.local_mask &= ~ARMED;

+  VLOG(2) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id() << " " << sd.local_mask;
+
   bool was_suspended = sd.local_mask & SUSPENDED_Q;
-  bool awaked_prerun = (sd.local_mask & AWAKED_Q) != 0;
+  bool awaked_prerun = sd.local_mask & AWAKED_Q;
   bool incremental_lock = multi_ && multi_->IsIncrLocks();

   // For multi we unlock transaction (i.e. its keys) in UnlockMulti() call.

@@ -426,7 +426,7 @@ bool Transaction::RunInShard(EngineShard* shard) {
   // We make sure that we lock exactly once for each (multi-hop) transaction inside
   // transactions that lock incrementally.
   if (!IsGlobal() && incremental_lock && ((sd.local_mask & KEYLOCK_ACQUIRED) == 0)) {
-    DCHECK(!awaked_prerun);  // we should not have blocking transaction inside multi block.
+    DCHECK(!awaked_prerun);  // we should not have a blocking transaction inside multi block.

     sd.local_mask |= KEYLOCK_ACQUIRED;
     shard->db_slice().Acquire(mode, GetLockArgs(idx));

@@ -438,9 +438,10 @@ bool Transaction::RunInShard(EngineShard* shard) {
   // Actually running the callback.
   // If you change the logic here, also please change the logic
   try {
-    // if transaction is suspended (blocked in watched queue), then it's a noop.
     OpStatus status = OpStatus::OK;

+    // if a transaction is suspended, we still run it because of brpoplpush/blmove case
+    // that needs to run lpush on its suspended shard.
     status = cb_(this, shard);

     if (unique_shard_cnt_ == 1) {

@@ -478,24 +479,26 @@ bool Transaction::RunInShard(EngineShard* shard) {

   // If it's a final hop we should release the locks.
   if (should_release) {
-    bool become_suspended = sd.local_mask & SUSPENDED_Q;
+    bool became_suspended = sd.local_mask & SUSPENDED_Q;
+    KeyLockArgs largs;

     if (IsGlobal()) {
-      DCHECK(!awaked_prerun && !become_suspended);  // Global transactions can not be blocking.
+      DCHECK(!awaked_prerun && !became_suspended);  // Global transactions can not be blocking.
       shard->shard_lock()->Release(Mode());
     } else {  // not global.
-      KeyLockArgs largs = GetLockArgs(idx);
+      largs = GetLockArgs(idx);
       DCHECK(sd.local_mask & KEYLOCK_ACQUIRED);

       // If a transaction has been suspended, we keep the lock so that future transaction
       // touching those keys will be ordered via TxQueue. It's necessary because we preserve
       // the atomicity of awaked transactions by halting the TxQueue.
-      if (was_suspended || !become_suspended) {
+      if (was_suspended || !became_suspended) {
         shard->db_slice().Release(mode, largs);
         sd.local_mask &= ~KEYLOCK_ACQUIRED;
       }
       sd.local_mask &= ~OUT_OF_ORDER;
     }

   // It has 2 responsibilities.
   // 1: to go over potential wakened keys, verify them and activate watch queues.
   // 2: if this transaction was notified and finished running - to remove it from the head

@@ -644,7 +647,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {

     // IsArmedInShard() first checks run_count_ before shard_data, so use release ordering.
     shard_data_[0].local_mask |= ARMED;
-    run_count_.fetch_add(1, memory_order_release);
+    run_count_.store(1, memory_order_release);

     time_now_ms_ = GetCurrentTimeMs();

@@ -679,9 +682,9 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
     ExecuteAsync();
   }

-  DVLOG(1) << "ScheduleSingleHop before Wait " << DebugId() << " " << run_count_.load();
+  DVLOG(2) << "ScheduleSingleHop before Wait " << DebugId() << " " << run_count_.load();
   WaitForShardCallbacks();
-  DVLOG(1) << "ScheduleSingleHop after Wait " << DebugId();
+  DVLOG(2) << "ScheduleSingleHop after Wait " << DebugId();

   if (was_ooo)
     coordinator_state_ |= COORD_OOO;

@@ -795,7 +798,7 @@ void Transaction::ExecuteAsync() {
     uint32_t seq_after = seqlock_.load(memory_order_acquire);
     bool should_poll = (seq_after == seq) && (GetLocalMask(shard->shard_id()) & ARMED);

-    DVLOG(2) << "PollExecCb " << DebugId() << " sid(" << shard->shard_id() << ") "
+    DVLOG(3) << "PollExecCb " << DebugId() << " sid(" << shard->shard_id() << ") "
              << run_count_.load(memory_order_relaxed) << ", should_poll: " << should_poll;

     // We verify that this callback is still relevant.

@@ -808,9 +811,11 @@ void Transaction::ExecuteAsync() {
       // Therefore, everything that should be handled during the callback execution
       // should go into RunInShard.
       shard->PollExecution("exec_cb", this);
+    } else {
+      VLOG(1) << "Skipping PollExecution " << DebugId() << " sid(" << shard->shard_id() << ")";
     }

-    DVLOG(2) << "ptr_release " << DebugId() << " " << seq;
+    DVLOG(3) << "ptr_release " << DebugId() << " " << seq;
     intrusive_ptr_release(this);  // against use_count_.fetch_add above.
   };

@@ -850,7 +855,7 @@ void Transaction::RunQuickie(EngineShard* shard) {

 // runs in coordinator thread.
 // Marks the transaction as expired and removes it from the waiting queue.
 void Transaction::UnwatchBlocking(bool should_expire, WaitKeysProvider wcb) {
-  DVLOG(1) << "UnwatchBlocking " << DebugId();
+  DVLOG(1) << "UnwatchBlocking " << DebugId() << " expire: " << should_expire;
   DCHECK(!IsGlobal());

   run_count_.store(unique_shard_cnt_, memory_order_release);

@@ -951,7 +956,7 @@ pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
     // All transactions in the queue must acquire the intent lock.
     lock_granted = shard->db_slice().Acquire(mode, lock_args) && shard_unlocked;
     sd.local_mask |= KEYLOCK_ACQUIRED;
-    DVLOG(2) << "Lock granted " << lock_granted << " for trans " << DebugId();
+    DVLOG(3) << "Lock granted " << lock_granted << " for trans " << DebugId();
   }

   if (!txq->Empty()) {

@@ -1044,8 +1049,7 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
 }

 bool Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider) {
   // Assumes that transaction is pending and scheduled. TODO: To verify it with state machine.
-  VLOG(2) << "WaitOnWatch Start use_count(" << GetUseCount() << ")";
-
+  DVLOG(2) << "WaitOnWatch " << DebugId();
   using namespace chrono;

   auto cb = [&](Transaction* t, EngineShard* shard) {

@@ -1090,13 +1094,13 @@ OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard) {

   auto& sd = shard_data_[idx];
-  CHECK_EQ(0, sd.local_mask & SUSPENDED_Q);
+  DCHECK_EQ(0, sd.local_mask & ARMED);

   auto* bc = shard->EnsureBlockingController();
   bc->AddWatched(keys, this);

   sd.local_mask |= SUSPENDED_Q;
-  DVLOG(1) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask;
+  DVLOG(2) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask
+           << ", first_key:" << keys.front();

   return OpStatus::OK;
 }

@@ -1172,28 +1176,6 @@ void Transaction::UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, E
   this->DecreaseRunCnt();
 }

-#if 0
-// HasResultConverged has detailed documentation on how convergence is determined.
-void Transaction::CheckForConvergence(EngineShard* shard) {
-  unsigned idx = SidToId(shard->shard_id());
-  auto& sd = shard_data_[idx];
-
-  TxId notify = notify_txid();
-
-  if ((sd.local_mask & AWAKED_Q) || shard->HasResultConverged(notify)) {
-    CHECK_GE(DecreaseRunCnt(), 1u);
-    return;
-  }
-
-  LOG(DFATAL) << "TBD";
-
-  BlockingController* bc = shard->blocking_controller();
-  CHECK(bc);  // must be present because we have watched this shard before.
-
-  bc->RegisterAwaitForConverge(this);
-}
-#endif
-
 inline uint32_t Transaction::DecreaseRunCnt() {
   // to protect against cases where Transaction is destroyed before run_ec_.notify
   // finishes running. We can not put it inside the (res == 1) block because then it's too late.

@@ -1223,7 +1205,7 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
     return false;
   }

-  DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask << " by "
+  DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask << " by commited_id "
            << committed_txid;

   // local_mask could be awaked (i.e. not suspended) if the transaction has been

@@ -278,7 +278,7 @@ class Transaction {
     uint32_t arg_start = 0;  // Indices into args_ array.
     uint16_t arg_count = 0;

-    // Accessed only within the engine-shard thread.
+    // Accessed within shard thread.
     // Bitmask of LocalState enums.
     uint16_t local_mask{0};

@@ -321,7 +321,7 @@ class Transaction {
   };

   struct PerShardCache {
-    bool requested_active = false;  // Activate on shard geradless of presence of keys.
+    bool requested_active = false;  // Activate on shard regardless of presence of keys.
     std::vector<std::string_view> args;
     std::vector<uint32_t> original_index;

@@ -14,6 +14,7 @@ class DflyParams:
     cwd: str
     gdb: bool
     args: list
+    existing_port: int
     env: any

@@ -55,9 +56,12 @@ class DflyInstance:
         proc.communicate()

     def _start(self):
+        if self.params.existing_port:
+            return
         base_args = [f"--{v}" for v in self.params.args]
         all_args = self.format_args(self.args) + base_args
-        print(f"Starting instance on {self.port} with arguments {all_args}")
+        print(
+            f"Starting instance on {self.port} with arguments {all_args} from {self.params.path}")

         run_cmd = [self.params.path, *all_args]
         if self.params.gdb:

@@ -65,16 +69,19 @@ class DflyInstance:
         self.proc = subprocess.Popen(run_cmd, cwd=self.params.cwd)

     def _check_status(self):
-        return_code = self.proc.poll()
-        if return_code is not None:
-            raise Exception(
-                f"Failed to start instance, return code {return_code}")
+        if not self.params.existing_port:
+            return_code = self.proc.poll()
+            if return_code is not None:
+                raise Exception(
+                    f"Failed to start instance, return code {return_code}")

     def __getitem__(self, k):
         return self.args.get(k)

     @property
     def port(self) -> int:
+        if self.params.existing_port:
+            return self.params.existing_port
         return int(self.args.get("port", "6379"))

     @staticmethod

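Taken together, these changes make an instance with existing_port set behave as a thin handle on an already-running server: _start() returns immediately, _check_status() skips the subprocess poll, and port resolves to the provided value. A minimal sketch of the expected behavior; the construction below is hypothetical and abbreviated to the fields that matter here, so the exact signatures may differ:

    # Hypothetical usage; exact DflyInstance/DflyParams construction may differ.
    params = DflyParams(path="", cwd="", gdb=False, args=[],
                        existing_port=7777, env={})
    instance = DflyInstance(params, {})

    instance._start()             # no-op: no subprocess is spawned
    assert instance.port == 7777  # comes from existing_port, not --port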
@@ -40,10 +40,12 @@ def test_env(tmp_dir: Path):
     env["DRAGONFLY_TMP"] = str(tmp_dir)
     return env

+
 @pytest.fixture(scope="session", params=[{}])
 def df_seeder_factory(request) -> DflySeederFactory:
     return DflySeederFactory(request.config.getoption("--log-seeder"))

+
 @pytest.fixture(scope="session", params=[{}])
 def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory:
     """

@@ -54,12 +56,13 @@ def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory:
         scripts_dir, '../../build-dbg/dragonfly'))

     args = request.param if request.param else {}
-
+    existing = request.config.getoption("--existing-port")
     params = DflyParams(
         path=path,
         cwd=tmp_dir,
         gdb=request.config.getoption("--gdb"),
         args=request.config.getoption("--df"),
+        existing_port=int(existing) if existing else None,
         env=test_env
     )

@@ -166,3 +169,5 @@ def pytest_addoption(parser):
     parser.addoption(
         '--log-seeder', action='store', default=None, help='Store last generator commands in file'
    )
+    parser.addoption(
+        '--existing-port', action='store', default=None, help='Provide a port to the existing process for the test')

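With the option registered here and threaded through df_factory above, pointing the suite at an already-running server is presumably a matter of an invocation along the lines of "pytest dragonfly --existing-port 6379" (the exact command depends on the local test layout); df_factory then builds DflyParams with existing_port set, and DflyInstance skips process management entirely.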
@@ -34,7 +34,7 @@ class TestBlPop:
         assert wt_blpop.wait(2)
         assert wt_blpop.result[1] == 'a'
         watched = client.execute_command('DEBUG WATCHED')
-        assert watched == []
+        assert watched == ['awaked', [], 'watched', []]

         wt_blpop.async_blpop(client)
         client.lpush('list2{t}', 'b')

@@ -68,7 +68,11 @@ async def check_data(seeder, replicas, c_replicas):
 @pytest.fixture(scope="function")
 def redis_server() -> RedisServer:
     s = RedisServer()
-    s.start()
+    try:
+        s.start()
+    except FileNotFoundError as e:
+        pytest.skip("Redis server not found")
+        return None
     time.sleep(1)
     yield s
     s.stop()