chore(transaction): More blocking tests (#3546)

This commit is contained in:
Vladislav 2024-08-26 10:02:08 +03:00 committed by GitHub
parent 789603d1a7
commit b7eccad5bd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -780,6 +780,7 @@ TEST_F(ListFamilyTest, BLMove) {
ASSERT_THAT(resp.GetVec(), ElementsAre("val1", "val2"));
}
// Wake two BLMOVEs on the same shard simultaneously
TEST_F(ListFamilyTest, BLMoveSimultaneously) {
EXPECT_EQ(Shard("src1", shard_set->size()),
Shard("src10", shard_set->size())); // wake on same shard
@ -806,20 +807,22 @@ TEST_F(ListFamilyTest, BLMoveSimultaneously) {
EXPECT_THAT(res.GetVec(), UnorderedElementsAre("v1", "v2"));
}
// Move key five times in rings 0 -> 1 -> 2 ... -> 0
TEST_F(ListFamilyTest, BLMoveRings) {
vector<fb2::Fiber> fibers;
fibers.reserve(10);
#pragma GCC diagnostic push
// We compile this code both with C++17 and C++20 and if you capture
// by [=, this] it becomes an error on C++17 and if you capture
// by [=] it becomes and error in C++20
#pragma GCC diagnostic ignored "-Wdeprecated"
for (int i = 0; i < 10; i++) {
fibers.emplace_back(pp_->at(i % 3)->LaunchFiber([=]() {
auto key1 = to_string(i);
auto key2 = to_string(i + 1);
Run(key1, {"blmove", key1, key2, "LEFT", "RIGHT", "0"});
}));
for (int j = 0; j < 5; j++) {
for (int i = 0; i < 10; i++) {
fibers.emplace_back(pp_->at(i % pp_->size())->LaunchFiber([=]() {
auto key1 = to_string(i);
auto key2 = to_string((i + 1) % 10);
Run(key1 + to_string(j), {"blmove", key1, key2, "LEFT", "RIGHT", "0"});
}));
}
}
#pragma GCC diagnostic pop
@ -829,9 +832,89 @@ TEST_F(ListFamilyTest, BLMoveRings) {
for (auto& fiber : fibers)
fiber.Join();
for (int i = 0; i < 10; i++)
for (int i = 1; i < 10; i++)
EXPECT_THAT(Run({"llen", to_string(i)}), IntArg(0));
EXPECT_EQ(Run({"lrange", "10", "0", "-1"}), "v1");
EXPECT_EQ(Run({"lrange", "0", "0", "-1"}), "v1");
}
// Move in waves where each wave layer has a fixed set of "vertices" through which all values travel
TEST_F(ListFamilyTest, BLMoveWaves) {
const int kFlow = 64;
vector<int> wave_sizes = {1 /* 0:0 */, kFlow, kFlow / 2, kFlow / 4, kFlow / 8, kFlow / 3,
kFlow / 5, 1, kFlow / 6, kFlow, kFlow / 4, 1};
vector<fb2::Fiber> fibers;
for (size_t i = 1; i < wave_sizes.size(); i++) {
for (size_t j = 0; j < kFlow; j++) {
auto src = to_string(i - 1) + ":" + to_string(j / (kFlow / wave_sizes[i - 1]));
auto dest = to_string(i) + ":" + to_string(j / (kFlow / wave_sizes[i]));
fibers.emplace_back(pp_->at(i % 3)->LaunchFiber([=]() {
Run("c" + to_string(i * kFlow + j), {"blmove", src, dest, "LEFT", "RIGHT", "0"});
}));
}
}
vector<string> values(kFlow);
for (size_t i = 0; i < kFlow; i++)
values[i] = "v" + to_string(i);
Run({"multi"});
for (size_t i = 0; i < kFlow; i++)
Run({"lpush", "0:0", values[i]});
Run({"exec"});
for (auto& fiber : fibers)
fiber.Join();
auto res = Run({"lrange", to_string(wave_sizes.size() - 1) + ":0", "0", "-1"});
EXPECT_THAT(res.GetVec(), UnorderedElementsAreArray(values));
}
// Move value back and forth between two lists, verfiy that atomic lookup of states catches it only
// in one of two possible states
TEST_F(ListFamilyTest, BLMovePendulum) {
GTEST_SKIP() << "Blocking commands don't respect transactional ordering after waking up";
// Suppose BLMOVE A -> B is running, then MULTI LLEN A LLEN B EXEC will
// 1. Run on shard B because it doesn't have "blocking" keys freely, so LLEN B = 0
// 2. Will run on shard A after BLMOVE A removed itself from the "awakened" set, so LLEN A = 0
// => we observe a theoretically impossible state and the execution order is not linearizable
vector<fb2::Fiber> fibers;
atomic_bool stopped = false;
auto swing = [this, &stopped](int i, string src, string dest) {
while (!stopped.load(std::memory_order_relaxed))
Run(src + dest + to_string(i), {"blmove", src, dest, "LEFT", "RIGHT", "0"});
};
for (int i = 0; i < 3; i++)
fibers.emplace_back(pp_->at(i % pp_->size())->LaunchFiber([=]() { swing(i, "A", "B"); }));
for (int i = 0; i < 3; i++)
fibers.emplace_back(pp_->at(i % pp_->size())->LaunchFiber([=]() { swing(i, "B", "A"); }));
Run({"lpush", "A", "v"});
ThisFiber::SleepFor(1ms);
for (int i = 0; i < 100; i++) {
Run({"multi"});
Run({"llen", "A"});
Run({"llen", "B"});
auto res = Run({"EXEC"});
int i1 = *res.GetVec()[0].GetInt();
int i2 = *res.GetVec()[1].GetInt();
ASSERT_EQ(i1 + i2, 1);
}
stopped = true;
Run({"lpush", "A", "stop"});
Run({"lpush", "B", "stop"});
for (auto& fiber : fibers)
fiber.Join();
int i1 = *Run({"llen", "A"}).GetInt();
int i2 = *Run({"llen", "B"}).GetInt();
ASSERT_EQ(i1 + i2, 3); // v, stop, stop
}
TEST_F(ListFamilyTest, LPushX) {