mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
fix: Update cntx->cid on multi-tx'es (#1081)
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
f013699613
commit
282c168d34
4 changed files with 9 additions and 2 deletions
|
@ -1395,6 +1395,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
for (auto& scmd : exec_info.body) {
|
||||
cntx->transaction->MultiSwitchCmd(scmd.Cid());
|
||||
cntx->cid = scmd.Cid();
|
||||
|
||||
arg_vec.resize(scmd.NumArgs());
|
||||
CmdArgList args = absl::MakeSpan(arg_vec);
|
||||
|
|
|
@ -94,6 +94,7 @@ void MultiCommandSquasher::ExecuteStandalone(StoredCmd* cmd) {
|
|||
|
||||
auto* tx = cntx_->transaction;
|
||||
tx->MultiSwitchCmd(cmd->Cid());
|
||||
cntx_->cid = cmd->Cid();
|
||||
|
||||
tmp_keylist_.resize(cmd->NumArgs());
|
||||
auto args = absl::MakeSpan(tmp_keylist_);
|
||||
|
@ -116,6 +117,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard
|
|||
|
||||
for (auto* cmd : sinfo.cmds) {
|
||||
local_tx->MultiSwitchCmd(cmd->Cid());
|
||||
local_cntx.cid = cmd->Cid();
|
||||
|
||||
arg_vec.resize(cmd->NumArgs());
|
||||
auto args = absl::MakeSpan(arg_vec);
|
||||
|
@ -149,6 +151,7 @@ void MultiCommandSquasher::ExecuteSquashed() {
|
|||
tx->PrepareSquashedMultiHop(base_cid_, cb);
|
||||
}
|
||||
|
||||
cntx_->cid = base_cid_;
|
||||
tx->ScheduleSingleHop([this](auto* tx, auto* es) { return SquashedHopCb(tx, es); });
|
||||
|
||||
facade::CapturingReplyBuilder::Payload payload;
|
||||
|
|
|
@ -57,6 +57,8 @@ Transaction::Transaction(const Transaction* parent)
|
|||
: multi_{make_unique<MultiData>()}, txid_{parent->txid()} {
|
||||
multi_->mode = parent->multi_->mode;
|
||||
multi_->role = SQUASHED_STUB;
|
||||
|
||||
time_now_ms_ = parent->time_now_ms_;
|
||||
}
|
||||
|
||||
Transaction::~Transaction() {
|
||||
|
|
|
@ -640,7 +640,7 @@ async def test_expiry(df_local_factory, n_keys=1000):
|
|||
res = await c_replica.mget(k for k, _ in gen_test_data(n_keys))
|
||||
assert all(v is not None for v in res)
|
||||
|
||||
# Set key differnt expries times in ms
|
||||
# Set key different expries times in ms
|
||||
pipe = c_master.pipeline(transaction=True)
|
||||
for k, _ in gen_test_data(n_keys):
|
||||
ms = random.randint(20, 500)
|
||||
|
@ -663,9 +663,10 @@ async def test_expiry(df_local_factory, n_keys=1000):
|
|||
# Wait for master to expire keys
|
||||
await asyncio.sleep(3.0)
|
||||
|
||||
# Check all keys with expiry has be deleted
|
||||
# Check all keys with expiry have been deleted
|
||||
res = await c_master.mget(k for k, _ in gen_test_data(n_keys))
|
||||
assert all(v is None for v in res)
|
||||
|
||||
# Check replica finished executing the replicated commands
|
||||
await check_all_replicas_finished([c_replica], c_master)
|
||||
res = await c_replica.mget(k for k, _ in gen_test_data(n_keys))
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue