mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
test(server): test transaction locked keys (#1669)
* test(server): test transaction locked keys 1. add test utility class that will add suspension to transaction execution. 2. add test for locked keys in transaction Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
16c2353faf
commit
f9a3e2811c
5 changed files with 76 additions and 18 deletions
6
.github/workflows/ci.yml
vendored
6
.github/workflows/ci.yml
vendored
|
@ -106,9 +106,9 @@ jobs:
|
|||
echo "Running tests with both --cluster_mode=emulated & --lock_on_hashtags"
|
||||
FLAGS_cluster_mode=emulated FLAGS_lock_on_hashtags=true ctest -V -L DFLY
|
||||
|
||||
./dragonfly_test --gtest_repeat=10
|
||||
./multi_test --multi_exec_mode=1 --gtest_repeat=10
|
||||
./multi_test --multi_exec_mode=3 --gtest_repeat=10
|
||||
./dragonfly_test
|
||||
./multi_test --multi_exec_mode=1
|
||||
./multi_test --multi_exec_mode=3
|
||||
# GLOG_logtostderr=1 GLOG_vmodule=transaction=1,engine_shard_set=1 CTEST_OUTPUT_ON_FAILURE=1 ninja server/test
|
||||
lint-test-chart:
|
||||
runs-on: ubuntu-latest
|
||||
|
|
|
@ -33,21 +33,6 @@ class ClusterFamilyTest : public BaseFamilyTest {
|
|||
protected:
|
||||
static constexpr string_view kInvalidConfiguration = "Invalid cluster configuration";
|
||||
|
||||
void ExpectConditionWithinTimeout(const std::function<bool()>& condition,
|
||||
absl::Duration timeout = absl::Seconds(10)) {
|
||||
absl::Time deadline = absl::Now() + timeout;
|
||||
|
||||
while (deadline > absl::Now()) {
|
||||
if (condition()) {
|
||||
break;
|
||||
}
|
||||
absl::SleepFor(absl::Milliseconds(10));
|
||||
}
|
||||
|
||||
EXPECT_LE(absl::Now(), deadline)
|
||||
<< "Timeout of " << timeout << " reached when expecting condition";
|
||||
}
|
||||
|
||||
string GetMyId() {
|
||||
return RunAdmin({"dflycluster", "myid"}).GetString();
|
||||
}
|
||||
|
|
|
@ -807,6 +807,31 @@ TEST_F(MultiTest, MultiLeavesTxQueue) {
|
|||
ASSERT_TRUE(success);
|
||||
}
|
||||
|
||||
TEST_F(MultiTest, TestLockedKeys) {
|
||||
if (auto mode = absl::GetFlag(FLAGS_multi_exec_mode); mode != Transaction::LOCK_AHEAD) {
|
||||
GTEST_SKIP() << "Skipped TestLockedKeys test because multi_exec_mode is not lock ahead";
|
||||
return;
|
||||
}
|
||||
|
||||
TransactionSuspension tx;
|
||||
tx.Start();
|
||||
|
||||
auto fb0 = pp_->at(0)->LaunchFiber([&] {
|
||||
EXPECT_EQ(Run({"multi"}), "OK");
|
||||
EXPECT_EQ(Run({"set", "key1", "val1"}), "QUEUED");
|
||||
EXPECT_EQ(Run({"set", "key2", "val2"}), "QUEUED");
|
||||
EXPECT_THAT(Run({"exec"}), RespArray(ElementsAre("OK", "OK")));
|
||||
});
|
||||
|
||||
ExpectConditionWithinTimeout(
|
||||
[&]() { return service_->IsLocked(0, "key1") && service_->IsLocked(0, "key2"); });
|
||||
|
||||
tx.Terminate();
|
||||
fb0.Join();
|
||||
EXPECT_FALSE(service_->IsLocked(0, "key1"));
|
||||
EXPECT_FALSE(service_->IsLocked(0, "key1"));
|
||||
}
|
||||
|
||||
class MultiEvalTest : public BaseFamilyTest {
|
||||
protected:
|
||||
MultiEvalTest() : BaseFamilyTest() {
|
||||
|
|
|
@ -66,6 +66,23 @@ void TestConnection::SendPubMessageAsync(PubMessage pmsg) {
|
|||
messages.push_back(move(pmsg));
|
||||
}
|
||||
|
||||
void TransactionSuspension::Start() {
|
||||
CommandId cid{"TEST", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, 0};
|
||||
|
||||
transaction_ = new dfly::Transaction{&cid};
|
||||
|
||||
auto st = transaction_->InitByArgs(0, {});
|
||||
CHECK_EQ(st, OpStatus::OK);
|
||||
|
||||
transaction_->Schedule();
|
||||
transaction_->Execute([](Transaction* t, EngineShard* shard) { return OpStatus::OK; }, false);
|
||||
}
|
||||
|
||||
void TransactionSuspension::Terminate() {
|
||||
transaction_->Conclude();
|
||||
transaction_ = nullptr;
|
||||
}
|
||||
|
||||
class BaseFamilyTest::TestConnWrapper {
|
||||
public:
|
||||
TestConnWrapper(Protocol proto);
|
||||
|
@ -509,6 +526,21 @@ absl::flat_hash_set<string> BaseFamilyTest::GetLastUsedKeys() {
|
|||
return result;
|
||||
}
|
||||
|
||||
void BaseFamilyTest::ExpectConditionWithinTimeout(const std::function<bool()>& condition,
|
||||
absl::Duration timeout) {
|
||||
absl::Time deadline = absl::Now() + timeout;
|
||||
|
||||
while (deadline > absl::Now()) {
|
||||
if (condition()) {
|
||||
break;
|
||||
}
|
||||
absl::SleepFor(absl::Milliseconds(10));
|
||||
}
|
||||
|
||||
EXPECT_LE(absl::Now(), deadline)
|
||||
<< "Timeout of " << timeout << " reached when expecting condition";
|
||||
}
|
||||
|
||||
void BaseFamilyTest::SetTestFlag(string_view flag_name, string_view new_value) {
|
||||
auto* flag = absl::FindCommandLineFlag(flag_name);
|
||||
CHECK_NE(flag, nullptr);
|
||||
|
|
|
@ -12,6 +12,7 @@
|
|||
#include "io/io.h"
|
||||
#include "server/conn_context.h"
|
||||
#include "server/main_service.h"
|
||||
#include "server/transaction.h"
|
||||
#include "util/proactor_pool.h"
|
||||
|
||||
namespace dfly {
|
||||
|
@ -37,6 +38,19 @@ class TestConnection : public facade::Connection {
|
|||
bool is_admin_ = false;
|
||||
};
|
||||
|
||||
// The TransactionSuspension class is designed to facilitate the temporary suspension of commands
|
||||
// executions. When the 'start' method is invoked, it enforces the suspension of other
|
||||
// transactions by acquiring a global shard lock. Conversely, invoking the 'terminate' method
|
||||
// releases the global shard lock, enabling all transactions in the queue to resume execution.
|
||||
class TransactionSuspension {
|
||||
public:
|
||||
void Start();
|
||||
void Terminate();
|
||||
|
||||
private:
|
||||
boost::intrusive_ptr<dfly::Transaction> transaction_;
|
||||
};
|
||||
|
||||
class BaseFamilyTest : public ::testing::Test {
|
||||
protected:
|
||||
BaseFamilyTest();
|
||||
|
@ -105,6 +119,8 @@ class BaseFamilyTest : public ::testing::Test {
|
|||
size_t index) const;
|
||||
|
||||
static absl::flat_hash_set<std::string> GetLastUsedKeys();
|
||||
static void ExpectConditionWithinTimeout(const std::function<bool()>& condition,
|
||||
absl::Duration timeout = absl::Seconds(10));
|
||||
|
||||
static unsigned NumLocked();
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue