mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
chore(server): Fix watch (#3557)
This commit is contained in:
parent
3fd3c40b74
commit
0e4e971ad9
3 changed files with 19 additions and 8 deletions
|
@ -2070,8 +2070,8 @@ void Service::Discard(CmdArgList args, ConnectionContext* cntx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Return true if non of the connections watched keys expired.
|
// Return true if non of the connections watched keys expired.
|
||||||
bool CheckWatchedKeyExpiry(ConnectionContext* cntx, const CommandRegistry& registry) {
|
bool CheckWatchedKeyExpiry(ConnectionContext* cntx, const CommandId* exists_cid,
|
||||||
static char EXISTS[] = "EXISTS";
|
const CommandId* exec_cid) {
|
||||||
auto& exec_info = cntx->conn_state.exec_info;
|
auto& exec_info = cntx->conn_state.exec_info;
|
||||||
|
|
||||||
CmdArgVec str_list(exec_info.watched_keys.size());
|
CmdArgVec str_list(exec_info.watched_keys.size());
|
||||||
|
@ -2089,11 +2089,14 @@ bool CheckWatchedKeyExpiry(ConnectionContext* cntx, const CommandRegistry& regis
|
||||||
return OpStatus::OK;
|
return OpStatus::OK;
|
||||||
};
|
};
|
||||||
|
|
||||||
cntx->transaction->MultiSwitchCmd(registry.Find(EXISTS));
|
cntx->transaction->MultiSwitchCmd(exists_cid);
|
||||||
cntx->transaction->InitByArgs(cntx->ns, cntx->conn_state.db_index, CmdArgList{str_list});
|
cntx->transaction->InitByArgs(cntx->ns, cntx->conn_state.db_index, CmdArgList{str_list});
|
||||||
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||||
CHECK_EQ(OpStatus::OK, status);
|
CHECK_EQ(OpStatus::OK, status);
|
||||||
|
|
||||||
|
// Reset cid to EXEC as it was before
|
||||||
|
cntx->transaction->MultiSwitchCmd(exec_cid);
|
||||||
|
|
||||||
// The comparison can still be true even if a key expired due to another one being created.
|
// The comparison can still be true even if a key expired due to another one being created.
|
||||||
// So we have to check the watched_dirty flag, which is set if a key expired.
|
// So we have to check the watched_dirty flag, which is set if a key expired.
|
||||||
return watch_exist_count.load() == exec_info.watched_existed &&
|
return watch_exist_count.load() == exec_info.watched_existed &&
|
||||||
|
@ -2206,7 +2209,8 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// EXEC should not run if any of the watched keys expired.
|
// EXEC should not run if any of the watched keys expired.
|
||||||
if (!exec_info.watched_keys.empty() && !CheckWatchedKeyExpiry(cntx, registry_)) {
|
if (!exec_info.watched_keys.empty() &&
|
||||||
|
!CheckWatchedKeyExpiry(cntx, registry_.Find("EXISTS"), exec_cid_)) {
|
||||||
cntx->transaction->UnlockMulti();
|
cntx->transaction->UnlockMulti();
|
||||||
return rb->SendNull();
|
return rb->SendNull();
|
||||||
}
|
}
|
||||||
|
|
|
@ -484,6 +484,14 @@ TEST_F(MultiTest, Watch) {
|
||||||
Run({"multi"});
|
Run({"multi"});
|
||||||
ASSERT_THAT(Run({"exec"}), kExecFail);
|
ASSERT_THAT(Run({"exec"}), kExecFail);
|
||||||
|
|
||||||
|
// Check watch with nonempty exec body
|
||||||
|
EXPECT_EQ(Run({"watch", "a"}), "OK");
|
||||||
|
Run({"multi"});
|
||||||
|
Run({"get", "a"});
|
||||||
|
Run({"get", "b"});
|
||||||
|
Run({"get", "c"});
|
||||||
|
ASSERT_THAT(Run({"exec"}), kExecSuccess);
|
||||||
|
|
||||||
// Check watch data cleared after EXEC.
|
// Check watch data cleared after EXEC.
|
||||||
Run({"set", "a", "1"});
|
Run({"set", "a", "1"});
|
||||||
Run({"multi"});
|
Run({"multi"});
|
||||||
|
@ -499,10 +507,9 @@ TEST_F(MultiTest, Watch) {
|
||||||
// Check EXEC doesn't miss watched key expiration.
|
// Check EXEC doesn't miss watched key expiration.
|
||||||
Run({"watch", "a"});
|
Run({"watch", "a"});
|
||||||
Run({"expire", "a", "1"});
|
Run({"expire", "a", "1"});
|
||||||
|
|
||||||
AdvanceTime(1000);
|
AdvanceTime(1000);
|
||||||
|
|
||||||
Run({"multi"});
|
Run({"multi"});
|
||||||
|
Run({"get", "a"});
|
||||||
ASSERT_THAT(Run({"exec"}), kExecFail);
|
ASSERT_THAT(Run({"exec"}), kExecFail);
|
||||||
|
|
||||||
// Check unwatch.
|
// Check unwatch.
|
||||||
|
|
|
@ -507,7 +507,7 @@ void Transaction::MultiUpdateWithParent(const Transaction* parent) {
|
||||||
void Transaction::MultiBecomeSquasher() {
|
void Transaction::MultiBecomeSquasher() {
|
||||||
DCHECK(multi_->mode == GLOBAL || multi_->mode == LOCK_AHEAD);
|
DCHECK(multi_->mode == GLOBAL || multi_->mode == LOCK_AHEAD);
|
||||||
DCHECK_GT(GetUniqueShardCnt(), 0u); // initialized and determined active shards
|
DCHECK_GT(GetUniqueShardCnt(), 0u); // initialized and determined active shards
|
||||||
DCHECK(cid_->IsMultiTransactional()); // proper base command set
|
DCHECK(cid_->IsMultiTransactional()) << cid_->name(); // proper base command set
|
||||||
multi_->role = SQUASHER;
|
multi_->role = SQUASHER;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue