From 9912df09ae900fee7057416451dea252e009ebe0 Mon Sep 17 00:00:00 2001 From: Shahar Mike Date: Thu, 8 Feb 2024 14:47:07 +0200 Subject: [PATCH] fix(server): Init tx time for all multi/lua transactions (#2562) * fix(server): Return correct `TIME` under unscheduled tx Fixes #2555 * Init tx time in all multi / lua cases * init ctor --- src/server/generic_family_test.cc | 29 ++++++++++++++++++++++++++++- src/server/transaction.cc | 9 +++++++-- src/server/transaction.h | 2 ++ 3 files changed, 37 insertions(+), 3 deletions(-) diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index 006df7ecc..eb241b1e2 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -467,7 +467,7 @@ TEST_F(GenericFamilyTest, Sort) { ASSERT_THAT(Run({"sort", "list-2"}), ErrArg("One or more scores can't be converted into double")); } -TEST_F(GenericFamilyTest, Time) { +TEST_F(GenericFamilyTest, TimeNoKeys) { auto resp = Run({"time"}); EXPECT_THAT(resp, ArrLen(2)); EXPECT_THAT(resp.GetVec()[0], ArgType(RespExpr::INT64)); @@ -488,6 +488,33 @@ TEST_F(GenericFamilyTest, Time) { int64_t val0 = get(resp.GetVec()[0].GetVec()[i].u); int64_t val1 = get(resp.GetVec()[1].GetVec()[i].u); EXPECT_EQ(val0, val1); + EXPECT_NE(val0, 0); + } +} + +TEST_F(GenericFamilyTest, TimeWithKeys) { + auto resp = Run({"time"}); + EXPECT_THAT(resp, ArrLen(2)); + EXPECT_THAT(resp.GetVec()[0], ArgType(RespExpr::INT64)); + EXPECT_THAT(resp.GetVec()[1], ArgType(RespExpr::INT64)); + + // Check that time is the same inside a transaction. + Run({"multi"}); + Run({"time"}); + usleep(2000); + Run({"time"}); + Run({"get", "x"}); + resp = Run({"exec"}); + EXPECT_THAT(resp, ArrLen(3)); + + ASSERT_THAT(resp.GetVec()[0], ArrLen(2)); + ASSERT_THAT(resp.GetVec()[1], ArrLen(2)); + + for (int i = 0; i < 2; ++i) { + int64_t val0 = get(resp.GetVec()[0].GetVec()[i].u); + int64_t val1 = get(resp.GetVec()[1].GetVec()[i].u); + EXPECT_EQ(val0, val1); + EXPECT_NE(val0, 0); } } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 2ff3c4b5b..0bdb82474 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -167,6 +167,7 @@ cv_status Transaction::BatonBarrierrier::Wait(time_point tp) { * @param cs */ Transaction::Transaction(const CommandId* cid) : cid_{cid} { + InitTxTime(); string_view cmd_name(cid_->name()); if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") { multi_.reset(new MultiData); @@ -502,6 +503,10 @@ void Transaction::StartMultiNonAtomic() { multi_->mode = NON_ATOMIC; } +void Transaction::InitTxTime() { + time_now_ms_ = GetCurrentTimeMs(); +} + void Transaction::MultiSwitchCmd(const CommandId* cid) { DCHECK(multi_); DCHECK(!cb_ptr_); @@ -713,7 +718,7 @@ void Transaction::ScheduleInternal() { // Loop until successfully scheduled in all shards. while (true) { txid_ = op_seq.fetch_add(1, memory_order_relaxed); - time_now_ms_ = GetCurrentTimeMs(); + InitTxTime(); atomic_uint32_t schedule_fails = 0; auto cb = [this, &schedule_fails](EngineShard* shard) { @@ -793,7 +798,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { DCHECK(IsActive(unique_shard_id_)); DCHECK(shard_data_.size() == 1 || multi_->mode == NON_ATOMIC); - time_now_ms_ = GetCurrentTimeMs(); + InitTxTime(); shard_data_[SidToId(unique_shard_id_)].is_armed.store(true, memory_order_relaxed); // Start new phase, be careful with writes until phase end! diff --git a/src/server/transaction.h b/src/server/transaction.h index 0fd0fd3e4..04f521749 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -237,6 +237,8 @@ class Transaction { // Start multi in NON_ATOMIC mode. void StartMultiNonAtomic(); + void InitTxTime(); + // Report which shards had write commands that executed on stub transactions // and thus did not mark itself in MultiData::shard_journal_write. void ReportWritesSquashedMulti(absl::FunctionRef had_write);