diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index 6a6007089..2d79445ad 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -106,9 +106,9 @@ jobs: echo "Running tests with both --cluster_mode=emulated & --lock_on_hashtags" FLAGS_cluster_mode=emulated FLAGS_lock_on_hashtags=true ctest -V -L DFLY - ./dragonfly_test --gtest_repeat=10 - ./multi_test --multi_exec_mode=1 --gtest_repeat=10 - ./multi_test --multi_exec_mode=3 --gtest_repeat=10 + ./dragonfly_test + ./multi_test --multi_exec_mode=1 + ./multi_test --multi_exec_mode=3 # GLOG_logtostderr=1 GLOG_vmodule=transaction=1,engine_shard_set=1 CTEST_OUTPUT_ON_FAILURE=1 ninja server/test lint-test-chart: runs-on: ubuntu-latest diff --git a/src/server/cluster/cluster_family_test.cc b/src/server/cluster/cluster_family_test.cc index c4bf0884b..acf44df0b 100644 --- a/src/server/cluster/cluster_family_test.cc +++ b/src/server/cluster/cluster_family_test.cc @@ -33,21 +33,6 @@ class ClusterFamilyTest : public BaseFamilyTest { protected: static constexpr string_view kInvalidConfiguration = "Invalid cluster configuration"; - void ExpectConditionWithinTimeout(const std::function& condition, - absl::Duration timeout = absl::Seconds(10)) { - absl::Time deadline = absl::Now() + timeout; - - while (deadline > absl::Now()) { - if (condition()) { - break; - } - absl::SleepFor(absl::Milliseconds(10)); - } - - EXPECT_LE(absl::Now(), deadline) - << "Timeout of " << timeout << " reached when expecting condition"; - } - string GetMyId() { return RunAdmin({"dflycluster", "myid"}).GetString(); } diff --git a/src/server/multi_test.cc b/src/server/multi_test.cc index 15805f4e3..cf02d7404 100644 --- a/src/server/multi_test.cc +++ b/src/server/multi_test.cc @@ -807,6 +807,31 @@ TEST_F(MultiTest, MultiLeavesTxQueue) { ASSERT_TRUE(success); } +TEST_F(MultiTest, TestLockedKeys) { + if (auto mode = absl::GetFlag(FLAGS_multi_exec_mode); mode != Transaction::LOCK_AHEAD) { + GTEST_SKIP() << "Skipped TestLockedKeys test because multi_exec_mode is not lock ahead"; + return; + } + + TransactionSuspension tx; + tx.Start(); + + auto fb0 = pp_->at(0)->LaunchFiber([&] { + EXPECT_EQ(Run({"multi"}), "OK"); + EXPECT_EQ(Run({"set", "key1", "val1"}), "QUEUED"); + EXPECT_EQ(Run({"set", "key2", "val2"}), "QUEUED"); + EXPECT_THAT(Run({"exec"}), RespArray(ElementsAre("OK", "OK"))); + }); + + ExpectConditionWithinTimeout( + [&]() { return service_->IsLocked(0, "key1") && service_->IsLocked(0, "key2"); }); + + tx.Terminate(); + fb0.Join(); + EXPECT_FALSE(service_->IsLocked(0, "key1")); + EXPECT_FALSE(service_->IsLocked(0, "key1")); +} + class MultiEvalTest : public BaseFamilyTest { protected: MultiEvalTest() : BaseFamilyTest() { diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index 6694cff26..9ebdafc0b 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -66,6 +66,23 @@ void TestConnection::SendPubMessageAsync(PubMessage pmsg) { messages.push_back(move(pmsg)); } +void TransactionSuspension::Start() { + CommandId cid{"TEST", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, 0}; + + transaction_ = new dfly::Transaction{&cid}; + + auto st = transaction_->InitByArgs(0, {}); + CHECK_EQ(st, OpStatus::OK); + + transaction_->Schedule(); + transaction_->Execute([](Transaction* t, EngineShard* shard) { return OpStatus::OK; }, false); +} + +void TransactionSuspension::Terminate() { + transaction_->Conclude(); + transaction_ = nullptr; +} + class BaseFamilyTest::TestConnWrapper { public: TestConnWrapper(Protocol proto); @@ -509,6 +526,21 @@ absl::flat_hash_set BaseFamilyTest::GetLastUsedKeys() { return result; } +void BaseFamilyTest::ExpectConditionWithinTimeout(const std::function& condition, + absl::Duration timeout) { + absl::Time deadline = absl::Now() + timeout; + + while (deadline > absl::Now()) { + if (condition()) { + break; + } + absl::SleepFor(absl::Milliseconds(10)); + } + + EXPECT_LE(absl::Now(), deadline) + << "Timeout of " << timeout << " reached when expecting condition"; +} + void BaseFamilyTest::SetTestFlag(string_view flag_name, string_view new_value) { auto* flag = absl::FindCommandLineFlag(flag_name); CHECK_NE(flag, nullptr); diff --git a/src/server/test_utils.h b/src/server/test_utils.h index 737bae150..621d7938f 100644 --- a/src/server/test_utils.h +++ b/src/server/test_utils.h @@ -12,6 +12,7 @@ #include "io/io.h" #include "server/conn_context.h" #include "server/main_service.h" +#include "server/transaction.h" #include "util/proactor_pool.h" namespace dfly { @@ -37,6 +38,19 @@ class TestConnection : public facade::Connection { bool is_admin_ = false; }; +// The TransactionSuspension class is designed to facilitate the temporary suspension of commands +// executions. When the 'start' method is invoked, it enforces the suspension of other +// transactions by acquiring a global shard lock. Conversely, invoking the 'terminate' method +// releases the global shard lock, enabling all transactions in the queue to resume execution. +class TransactionSuspension { + public: + void Start(); + void Terminate(); + + private: + boost::intrusive_ptr transaction_; +}; + class BaseFamilyTest : public ::testing::Test { protected: BaseFamilyTest(); @@ -105,6 +119,8 @@ class BaseFamilyTest : public ::testing::Test { size_t index) const; static absl::flat_hash_set GetLastUsedKeys(); + static void ExpectConditionWithinTimeout(const std::function& condition, + absl::Duration timeout = absl::Seconds(10)); static unsigned NumLocked();