mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
fix(server): Init tx time for all multi/lua transactions (#2562)
* fix(server): Return correct `TIME` under unscheduled tx Fixes #2555 * Init tx time in all multi / lua cases * init ctor
This commit is contained in:
parent
72f651d527
commit
9912df09ae
3 changed files with 37 additions and 3 deletions
|
@ -467,7 +467,7 @@ TEST_F(GenericFamilyTest, Sort) {
|
||||||
ASSERT_THAT(Run({"sort", "list-2"}), ErrArg("One or more scores can't be converted into double"));
|
ASSERT_THAT(Run({"sort", "list-2"}), ErrArg("One or more scores can't be converted into double"));
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(GenericFamilyTest, Time) {
|
TEST_F(GenericFamilyTest, TimeNoKeys) {
|
||||||
auto resp = Run({"time"});
|
auto resp = Run({"time"});
|
||||||
EXPECT_THAT(resp, ArrLen(2));
|
EXPECT_THAT(resp, ArrLen(2));
|
||||||
EXPECT_THAT(resp.GetVec()[0], ArgType(RespExpr::INT64));
|
EXPECT_THAT(resp.GetVec()[0], ArgType(RespExpr::INT64));
|
||||||
|
@ -488,6 +488,33 @@ TEST_F(GenericFamilyTest, Time) {
|
||||||
int64_t val0 = get<int64_t>(resp.GetVec()[0].GetVec()[i].u);
|
int64_t val0 = get<int64_t>(resp.GetVec()[0].GetVec()[i].u);
|
||||||
int64_t val1 = get<int64_t>(resp.GetVec()[1].GetVec()[i].u);
|
int64_t val1 = get<int64_t>(resp.GetVec()[1].GetVec()[i].u);
|
||||||
EXPECT_EQ(val0, val1);
|
EXPECT_EQ(val0, val1);
|
||||||
|
EXPECT_NE(val0, 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(GenericFamilyTest, TimeWithKeys) {
|
||||||
|
auto resp = Run({"time"});
|
||||||
|
EXPECT_THAT(resp, ArrLen(2));
|
||||||
|
EXPECT_THAT(resp.GetVec()[0], ArgType(RespExpr::INT64));
|
||||||
|
EXPECT_THAT(resp.GetVec()[1], ArgType(RespExpr::INT64));
|
||||||
|
|
||||||
|
// Check that time is the same inside a transaction.
|
||||||
|
Run({"multi"});
|
||||||
|
Run({"time"});
|
||||||
|
usleep(2000);
|
||||||
|
Run({"time"});
|
||||||
|
Run({"get", "x"});
|
||||||
|
resp = Run({"exec"});
|
||||||
|
EXPECT_THAT(resp, ArrLen(3));
|
||||||
|
|
||||||
|
ASSERT_THAT(resp.GetVec()[0], ArrLen(2));
|
||||||
|
ASSERT_THAT(resp.GetVec()[1], ArrLen(2));
|
||||||
|
|
||||||
|
for (int i = 0; i < 2; ++i) {
|
||||||
|
int64_t val0 = get<int64_t>(resp.GetVec()[0].GetVec()[i].u);
|
||||||
|
int64_t val1 = get<int64_t>(resp.GetVec()[1].GetVec()[i].u);
|
||||||
|
EXPECT_EQ(val0, val1);
|
||||||
|
EXPECT_NE(val0, 0);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -167,6 +167,7 @@ cv_status Transaction::BatonBarrierrier::Wait(time_point tp) {
|
||||||
* @param cs
|
* @param cs
|
||||||
*/
|
*/
|
||||||
Transaction::Transaction(const CommandId* cid) : cid_{cid} {
|
Transaction::Transaction(const CommandId* cid) : cid_{cid} {
|
||||||
|
InitTxTime();
|
||||||
string_view cmd_name(cid_->name());
|
string_view cmd_name(cid_->name());
|
||||||
if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") {
|
if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") {
|
||||||
multi_.reset(new MultiData);
|
multi_.reset(new MultiData);
|
||||||
|
@ -502,6 +503,10 @@ void Transaction::StartMultiNonAtomic() {
|
||||||
multi_->mode = NON_ATOMIC;
|
multi_->mode = NON_ATOMIC;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Transaction::InitTxTime() {
|
||||||
|
time_now_ms_ = GetCurrentTimeMs();
|
||||||
|
}
|
||||||
|
|
||||||
void Transaction::MultiSwitchCmd(const CommandId* cid) {
|
void Transaction::MultiSwitchCmd(const CommandId* cid) {
|
||||||
DCHECK(multi_);
|
DCHECK(multi_);
|
||||||
DCHECK(!cb_ptr_);
|
DCHECK(!cb_ptr_);
|
||||||
|
@ -713,7 +718,7 @@ void Transaction::ScheduleInternal() {
|
||||||
// Loop until successfully scheduled in all shards.
|
// Loop until successfully scheduled in all shards.
|
||||||
while (true) {
|
while (true) {
|
||||||
txid_ = op_seq.fetch_add(1, memory_order_relaxed);
|
txid_ = op_seq.fetch_add(1, memory_order_relaxed);
|
||||||
time_now_ms_ = GetCurrentTimeMs();
|
InitTxTime();
|
||||||
|
|
||||||
atomic_uint32_t schedule_fails = 0;
|
atomic_uint32_t schedule_fails = 0;
|
||||||
auto cb = [this, &schedule_fails](EngineShard* shard) {
|
auto cb = [this, &schedule_fails](EngineShard* shard) {
|
||||||
|
@ -793,7 +798,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
|
||||||
DCHECK(IsActive(unique_shard_id_));
|
DCHECK(IsActive(unique_shard_id_));
|
||||||
DCHECK(shard_data_.size() == 1 || multi_->mode == NON_ATOMIC);
|
DCHECK(shard_data_.size() == 1 || multi_->mode == NON_ATOMIC);
|
||||||
|
|
||||||
time_now_ms_ = GetCurrentTimeMs();
|
InitTxTime();
|
||||||
shard_data_[SidToId(unique_shard_id_)].is_armed.store(true, memory_order_relaxed);
|
shard_data_[SidToId(unique_shard_id_)].is_armed.store(true, memory_order_relaxed);
|
||||||
|
|
||||||
// Start new phase, be careful with writes until phase end!
|
// Start new phase, be careful with writes until phase end!
|
||||||
|
|
|
@ -237,6 +237,8 @@ class Transaction {
|
||||||
// Start multi in NON_ATOMIC mode.
|
// Start multi in NON_ATOMIC mode.
|
||||||
void StartMultiNonAtomic();
|
void StartMultiNonAtomic();
|
||||||
|
|
||||||
|
void InitTxTime();
|
||||||
|
|
||||||
// Report which shards had write commands that executed on stub transactions
|
// Report which shards had write commands that executed on stub transactions
|
||||||
// and thus did not mark itself in MultiData::shard_journal_write.
|
// and thus did not mark itself in MultiData::shard_journal_write.
|
||||||
void ReportWritesSquashedMulti(absl::FunctionRef<bool(ShardId)> had_write);
|
void ReportWritesSquashedMulti(absl::FunctionRef<bool(ShardId)> had_write);
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue