mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
fix(server): Fix multi tx cleanup (#723)
This commit is contained in:
parent
d660787c6b
commit
2df1c6636c
2 changed files with 45 additions and 5 deletions
|
@ -41,6 +41,11 @@ const char kKey2[] = "b";
|
||||||
const char kKey3[] = "c";
|
const char kKey3[] = "c";
|
||||||
const char kKey4[] = "y";
|
const char kKey4[] = "y";
|
||||||
|
|
||||||
|
const char kKeySid0[] = "x";
|
||||||
|
const char kKeySid1[] = "c";
|
||||||
|
const char kKeySid2[] = "b";
|
||||||
|
const char kKey2Sid0[] = "y";
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
// This test is responsible for server and main service
|
// This test is responsible for server and main service
|
||||||
|
@ -88,6 +93,15 @@ TEST_F(DflyEngineTest, Sds) {
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DflyEngineTest, MultiAndEval) {
|
TEST_F(DflyEngineTest, MultiAndEval) {
|
||||||
|
ShardId sid1 = Shard(kKey1, num_threads_ - 1);
|
||||||
|
ShardId sid2 = Shard(kKey2, num_threads_ - 1);
|
||||||
|
ShardId sid3 = Shard(kKey3, num_threads_ - 1);
|
||||||
|
ShardId sid4 = Shard(kKey4, num_threads_ - 1);
|
||||||
|
EXPECT_EQ(0, sid1);
|
||||||
|
EXPECT_EQ(2, sid2);
|
||||||
|
EXPECT_EQ(1, sid3);
|
||||||
|
EXPECT_EQ(0, sid4);
|
||||||
|
|
||||||
RespExpr resp = Run({"multi"});
|
RespExpr resp = Run({"multi"});
|
||||||
ASSERT_EQ(resp, "OK");
|
ASSERT_EQ(resp, "OK");
|
||||||
|
|
||||||
|
@ -412,6 +426,32 @@ return {offset, epoch}
|
||||||
EXPECT_THAT(resp.GetVec(), ElementsAre(IntArg(1), "6"));
|
EXPECT_THAT(resp.GetVec(), ElementsAre(IntArg(1), "6"));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Scenario: 1. a lua call A schedules itself on shards 0, 1, 2.
|
||||||
|
// 2. another lua call B schedules itself on shards 1,2 but on shard 1 (or 2) it
|
||||||
|
// schedules itself before A.
|
||||||
|
// the order of scheduling: shard 0: A, shard 1: B, A. shard 2: B, A.
|
||||||
|
// 3. A is executes its first command first, which coincendently runs only on shard 0,
|
||||||
|
// hence A finishes before B and then it tries to cleanup.
|
||||||
|
// 4. There was an incorrect cleanup of multi-transactions that breaks for shard 1 (or 2)
|
||||||
|
// because it assume the A is at front of the queue.
|
||||||
|
TEST_F(DflyEngineTest, EvalBug713) {
|
||||||
|
const char* script = "return redis.call('get', KEYS[1])";
|
||||||
|
|
||||||
|
// A
|
||||||
|
auto fb0 = pp_->at(1)->LaunchFiber([&] {
|
||||||
|
fibers_ext::Yield();
|
||||||
|
for (unsigned i = 0; i < 50; ++i) {
|
||||||
|
Run({"eval", script, "3", kKeySid0, kKeySid1, kKeySid2});
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
// B
|
||||||
|
for (unsigned j = 0; j < 50; ++j) {
|
||||||
|
Run({"eval", script, "2", kKeySid1, kKeySid2});
|
||||||
|
}
|
||||||
|
fb0.Join();
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(DflyEngineTest, EvalSha) {
|
TEST_F(DflyEngineTest, EvalSha) {
|
||||||
auto resp = Run({"script", "load", "return 5"});
|
auto resp = Run({"script", "load", "return 5"});
|
||||||
EXPECT_THAT(resp, ArgType(RespExpr::STRING));
|
EXPECT_THAT(resp, ArgType(RespExpr::STRING));
|
||||||
|
|
|
@ -1103,15 +1103,15 @@ void Transaction::UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, E
|
||||||
|
|
||||||
// It does not have to be that all shards in multi transaction execute this tx.
|
// It does not have to be that all shards in multi transaction execute this tx.
|
||||||
// Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from
|
// Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from
|
||||||
// there.
|
// there. The transaction is not guaranteed to be at front.
|
||||||
if (sd.pq_pos != TxQueue::kEnd) {
|
if (sd.pq_pos != TxQueue::kEnd) {
|
||||||
DVLOG(1) << "unlockmulti: TxPopFront " << DebugId();
|
DVLOG(1) << "unlockmulti: TxRemove " << DebugId();
|
||||||
|
|
||||||
TxQueue* txq = shard->txq();
|
TxQueue* txq = shard->txq();
|
||||||
DCHECK(!txq->Empty());
|
DCHECK(!txq->Empty());
|
||||||
Transaction* trans = absl::get<Transaction*>(txq->Front());
|
DCHECK_EQ(absl::get<Transaction*>(txq->At(sd.pq_pos)), this);
|
||||||
DCHECK(trans == this);
|
|
||||||
txq->PopFront();
|
txq->Remove(sd.pq_pos);
|
||||||
sd.pq_pos = TxQueue::kEnd;
|
sd.pq_pos = TxQueue::kEnd;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue