diff --git a/src/server/cluster/cluster_family_test.cc b/src/server/cluster/cluster_family_test.cc index 758010719..62c20825e 100644 --- a/src/server/cluster/cluster_family_test.cc +++ b/src/server/cluster/cluster_family_test.cc @@ -36,6 +36,28 @@ class ClusterFamilyTest : public BaseFamilyTest { string GetMyId() { return RunPrivileged({"dflycluster", "myid"}).GetString(); } + + void ConfigSingleNodeCluster(string id) { + string config_template = R"json( + [ + { + "slot_ranges": [ + { + "start": 0, + "end": 16383 + } + ], + "master": { + "id": "$0", + "ip": "10.0.0.1", + "port": 7000 + }, + "replicas": [] + } + ])json"; + string config = absl::Substitute(config_template, id); + EXPECT_EQ(RunPrivileged({"dflycluster", "config", config}), "OK"); + } }; TEST_F(ClusterFamilyTest, ClusterConfigInvalidJSON) { @@ -136,25 +158,7 @@ TEST_F(ClusterFamilyTest, ClusterConfigInvalidOverlappingSlots) { } TEST_F(ClusterFamilyTest, ClusterConfigNoReplicas) { - EXPECT_EQ(RunPrivileged({"dflycluster", "config", R"json( - [ - { - "slot_ranges": [ - { - "start": 0, - "end": 16383 - } - ], - "master": { - "id": "abcd1234", - "ip": "10.0.0.1", - "port": 7000 - }, - "replicas": [] - } - ])json"}), - "OK"); - + ConfigSingleNodeCluster("abcd1234"); string cluster_info = Run({"cluster", "info"}).GetString(); EXPECT_THAT(cluster_info, HasSubstr("cluster_state:ok")); EXPECT_THAT(cluster_info, HasSubstr("cluster_slots_assigned:16384")); @@ -422,26 +426,7 @@ TEST_F(ClusterFamilyTest, ClusterGetSlotInfoInvalid) { } TEST_F(ClusterFamilyTest, ClusterGetSlotInfo) { - string config_template = R"json( - [ - { - "slot_ranges": [ - { - "start": 0, - "end": 16383 - } - ], - "master": { - "id": "$0", - "ip": "10.0.0.1", - "port": 7000 - }, - "replicas": [] - } - ])json"; - string config = absl::Substitute(config_template, GetMyId()); - - EXPECT_EQ(RunPrivileged({"dflycluster", "config", config}), "OK"); + ConfigSingleNodeCluster(GetMyId()); constexpr string_view kKey = "some-key"; const SlotId slot = ClusterConfig::KeySlot(kKey); @@ -480,26 +465,7 @@ TEST_F(ClusterFamilyTest, ClusterGetSlotInfo) { } TEST_F(ClusterFamilyTest, ClusterSlotsPopulate) { - string config_template = R"json( - [ - { - "slot_ranges": [ - { - "start": 0, - "end": 16383 - } - ], - "master": { - "id": "$0", - "ip": "10.0.0.1", - "port": 7000 - }, - "replicas": [] - } - ])json"; - string config = absl::Substitute(config_template, GetMyId()); - - EXPECT_EQ(RunPrivileged({"dflycluster", "config", config}), "OK"); + ConfigSingleNodeCluster(GetMyId()); Run({"debug", "populate", "10000", "key", "4", "SLOTS", "0", "1000"}); @@ -515,26 +481,7 @@ TEST_F(ClusterFamilyTest, ClusterSlotsPopulate) { } TEST_F(ClusterFamilyTest, ClusterConfigDeleteSlots) { - string config_template = R"json( - [ - { - "slot_ranges": [ - { - "start": 0, - "end": 16383 - } - ], - "master": { - "id": "$0", - "ip": "10.0.0.1", - "port": 7000 - }, - "replicas": [] - } - ])json"; - string config = absl::Substitute(config_template, GetMyId()); - - EXPECT_EQ(RunPrivileged({"dflycluster", "config", config}), "OK"); + ConfigSingleNodeCluster(GetMyId()); Run({"debug", "populate", "100000"}); @@ -546,8 +493,7 @@ TEST_F(ClusterFamilyTest, ClusterConfigDeleteSlots) { RespArray(ElementsAre(IntArg(2), "key_count", Not(IntArg(0)), "total_reads", IntArg(0), "total_writes", Not(IntArg(0)), "memory_bytes", IntArg(0)))))); - config = absl::Substitute(config_template, "abc"); - EXPECT_EQ(RunPrivileged({"dflycluster", "config", config}), "OK"); + ConfigSingleNodeCluster("abc"); ExpectConditionWithinTimeout([&]() { return CheckedInt({"dbsize"}) == 0; }); @@ -562,26 +508,7 @@ TEST_F(ClusterFamilyTest, ClusterConfigDeleteSlots) { // Test issue #1302 TEST_F(ClusterFamilyTest, ClusterConfigDeleteSlotsNoCrashOnShutdown) { - string config_template = R"json( - [ - { - "slot_ranges": [ - { - "start": 0, - "end": 16383 - } - ], - "master": { - "id": "$0", - "ip": "10.0.0.1", - "port": 7000 - }, - "replicas": [] - } - ])json"; - string config = absl::Substitute(config_template, GetMyId()); - - EXPECT_EQ(RunPrivileged({"dflycluster", "config", config}), "OK"); + ConfigSingleNodeCluster(GetMyId()); Run({"debug", "populate", "100000"}); @@ -593,10 +520,9 @@ TEST_F(ClusterFamilyTest, ClusterConfigDeleteSlotsNoCrashOnShutdown) { RespArray(ElementsAre(IntArg(2), "key_count", Not(IntArg(0)), "total_reads", IntArg(0), "total_writes", Not(IntArg(0)), "memory_bytes", IntArg(0)))))); - config = absl::Substitute(config_template, "abc"); // After running the new config we start a fiber that removes all slots from current instance // we immediately shut down to test that we do not crash. - EXPECT_EQ(RunPrivileged({"dflycluster", "config", config}), "OK"); + ConfigSingleNodeCluster("abc"); } TEST_F(ClusterFamilyTest, ClusterConfigDeleteSomeSlots) { @@ -674,24 +600,7 @@ TEST_F(ClusterFamilyTest, ClusterFirstConfigCallDropsEntriesNotOwnedByNode) { EXPECT_EQ(Run({"debug", "load", save_info.file_name}), "OK"); EXPECT_EQ(CheckedInt({"dbsize"}), 50000); - EXPECT_EQ(RunPrivileged({"dflycluster", "config", R"json( - [ - { - "slot_ranges": [ - { - "start": 0, - "end": 16383 - } - ], - "master": { - "id": "abcd1234", - "ip": "10.0.0.1", - "port": 7000 - }, - "replicas": [] - } - ])json"}), - "OK"); + ConfigSingleNodeCluster("abcd1234"); // Make sure `dbsize` all slots were removed ExpectConditionWithinTimeout([&]() { return CheckedInt({"dbsize"}) == 0; }); @@ -739,27 +648,36 @@ TEST_F(ClusterFamilyTest, FlushSlots) { _, "total_writes", _, "memory_bytes", _))))); } -TEST_F(ClusterFamilyTest, ClusterCrossSlot) { - string config_template = R"json( - [ - { - "slot_ranges": [ - { - "start": 0, - "end": 16383 - } - ], - "master": { - "id": "$0", - "ip": "10.0.0.1", - "port": 7000 - }, - "replicas": [] - } - ])json"; - string config = absl::Substitute(config_template, GetMyId()); +TEST_F(ClusterFamilyTest, FlushSlotsAndImmediatelySetValue) { + for (int count : {1, 10, 100, 1000, 10000, 100000}) { + ConfigSingleNodeCluster(GetMyId()); + + EXPECT_EQ(Run({"debug", "populate", absl::StrCat(count), "key", "4"}), "OK"); + EXPECT_EQ(Run({"get", "key:0"}), "xxxx"); + + EXPECT_THAT(Run({"cluster", "keyslot", "key:0"}), IntArg(2592)); + EXPECT_THAT(Run({"dbsize"}), IntArg(count)); + auto slot_size_response = Run({"dflycluster", "getslotinfo", "slots", "2592"}); + EXPECT_THAT(slot_size_response, RespArray(ElementsAre(_, "key_count", _, "total_reads", _, + "total_writes", _, "memory_bytes", _))); + auto slot_size = slot_size_response.GetVec()[2].GetInt(); + EXPECT_TRUE(slot_size.has_value()); + + EXPECT_EQ(Run({"dflycluster", "flushslots", "2592", "2592"}), "OK"); + // key:0 should have been removed, so APPEND will end up with key:0 == ZZZZ + EXPECT_THAT(Run({"append", "key:0", "ZZZZ"}), IntArg(4)); + EXPECT_EQ(Run({"get", "key:0"}), "ZZZZ"); + // db size should be count - (size of slot 2592) + 1, where 1 is for 'key:0' + ExpectConditionWithinTimeout( + [&]() { return CheckedInt({"dbsize"}) == (count - *slot_size + 1); }); + + ResetService(); + } +} + +TEST_F(ClusterFamilyTest, ClusterCrossSlot) { + ConfigSingleNodeCluster(GetMyId()); - EXPECT_EQ(RunPrivileged({"dflycluster", "config", config}), "OK"); EXPECT_EQ(Run({"SET", "key", "value"}), "OK"); EXPECT_EQ(Run({"GET", "key"}), "value"); diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 206dab698..388dc771d 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -409,12 +409,17 @@ OpResult DbSlice::FindMutableInternal(const Context& cntx } PreUpdate(cntx.db_index, res->it); - return {{res->it, res->exp_it, - AutoUpdater({.action = AutoUpdater::DestructorAction::kRun, - .db_slice = this, - .db_ind = cntx.db_index, - .it = res->it, - .key = key})}}; + // PreUpdate() might have caused a deletion of `it` + if (res->it.IsOccupied()) { + return {{res->it, res->exp_it, + AutoUpdater({.action = AutoUpdater::DestructorAction::kRun, + .db_slice = this, + .db_ind = cntx.db_index, + .it = res->it, + .key = key})}}; + } else { + return OpStatus::KEY_NOTFOUND; + } } DbSlice::ItAndExpConst DbSlice::FindReadOnly(const Context& cntx, std::string_view key) { @@ -577,15 +582,20 @@ OpResult DbSlice::AddOrFindInternal(const Context& cnt if (res.ok()) { PreUpdate(cntx.db_index, res->it); - return DbSlice::AddOrFindResult{ - .it = res->it, - .exp_it = res->exp_it, - .is_new = false, - .post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun, - .db_slice = this, - .db_ind = cntx.db_index, - .it = res->it, - .key = key})}; + // PreUpdate() might have caused a deletion of `it` + if (res->it.IsOccupied()) { + return DbSlice::AddOrFindResult{ + .it = res->it, + .exp_it = res->exp_it, + .is_new = false, + .post_updater = AutoUpdater({.action = AutoUpdater::DestructorAction::kRun, + .db_slice = this, + .db_ind = cntx.db_index, + .it = res->it, + .key = key})}; + } else { + res = OpStatus::KEY_NOTFOUND; + } } auto status = res.status(); CHECK(status == OpStatus::KEY_NOTFOUND || status == OpStatus::OUT_OF_MEMORY) << status; @@ -706,7 +716,8 @@ void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) { // Slot deletion can take time as it traverses all the database, hence it runs in fiber. // We want to flush all the data of a slot that was added till the time the call to FlushSlotsFb // was made. Therefore we delete slots entries with version < next_version - uint64_t next_version = NextVersion(); + uint64_t next_version = 0; + std::string tmp; auto del_entry_cb = [&](PrimeTable::iterator it) { std::string_view key = it->first.GetSlice(&tmp); @@ -717,6 +728,33 @@ void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) { return true; }; + auto on_change = [&](DbIndex db_index, const ChangeReq& req) { + FiberAtomicGuard fg; + PrimeTable* table = GetTables(db_index).first; + + auto iterate_bucket = [&](DbIndex db_index, PrimeTable::bucket_iterator it) { + while (!it.is_done()) { + del_entry_cb(it); + ++it; + } + }; + + if (const PrimeTable::bucket_iterator* bit = req.update()) { + if (bit->GetVersion() < next_version) { + iterate_bucket(db_index, *bit); + } + } else { + string_view key = get(req.change); + table->CVCUponInsert( + next_version, key, + [this, db_index, next_version, iterate_bucket](PrimeTable::bucket_iterator it) { + DCHECK_LT(it.GetVersion(), next_version); + iterate_bucket(db_index, it); + }); + } + }; + next_version = RegisterOnChange(std::move(on_change)); + ServerState& etl = *ServerState::tlocal(); PrimeTable* pt = &db_arr_[0]->prime; PrimeTable::Cursor cursor; @@ -730,6 +768,9 @@ void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) { } } while (cursor && etl.gstate() != GlobalState::SHUTTING_DOWN); + + UnregisterOnChange(next_version); + etl.DecommitMemory(ServerState::kDataHeap); }