diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index 5acec527f..636de415a 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -113,6 +113,10 @@ bool RestoreStreamer::ShouldWrite(const journal::JournalItem& item) const {
   return ShouldWrite(*item.slot);
 }
 
+bool RestoreStreamer::ShouldWrite(std::string_view key) const {
+  return ShouldWrite(ClusterConfig::KeySlot(key));
+}
+
 bool RestoreStreamer::ShouldWrite(SlotId slot_id) const {
   return my_slots_.contains(slot_id);
 }
@@ -121,28 +125,29 @@ void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
   DCHECK_LT(it.GetVersion(), snapshot_version_);
   it.SetVersion(snapshot_version_);
 
+  bool is_data_present = false;
   {
     FiberAtomicGuard fg;  // Can't switch fibers because that could invalidate iterator
-
-    while (!it.is_done()) {
+    string key_buffer;  // we can reuse it
+    for (; !it.is_done(); ++it) {
       const auto& pv = it->second;
-
-      string key_buffer;
       string_view key = it->first.GetSlice(&key_buffer);
+      if (ShouldWrite(key)) {
+        is_data_present = true;
 
-      uint64_t expire = 0;
-      if (pv.HasExpire()) {
-        auto eit = db_slice_->databases()[0]->expire.Find(it->first);
-        expire = db_slice_->ExpireTime(eit);
+        uint64_t expire = 0;
+        if (pv.HasExpire()) {
+          auto eit = db_slice_->databases()[0]->expire.Find(it->first);
+          expire = db_slice_->ExpireTime(eit);
+        }
+
+        WriteEntry(key, pv, expire);
       }
-
-      WriteEntry(key, pv, expire);
-
-      ++it;
     }
   }
 
-  NotifyWritten(true);
+  if (is_data_present)
+    NotifyWritten(true);
 }
 
 void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
@@ -166,7 +171,6 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req
 
 void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pv, uint64_t expire_ms) {
   absl::InlinedVector<string_view, 4> args;
-
   args.push_back(key);
 
   string expire_str = absl::StrCat(expire_ms);
diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h
index 91a95f937..7b184573b 100644
--- a/src/server/journal/streamer.h
+++ b/src/server/journal/streamer.h
@@ -68,6 +68,7 @@ class RestoreStreamer : public JournalStreamer {
  private:
   void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
   bool ShouldWrite(const journal::JournalItem& item) const override;
+  bool ShouldWrite(std::string_view key) const;
   bool ShouldWrite(SlotId slot_id) const;
 
   void WriteBucket(PrimeTable::bucket_iterator it);
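Note on the new `ShouldWrite(std::string_view)` overload above: `ClusterConfig::KeySlot` is assumed to implement the standard Redis cluster mapping, `slot = CRC16(key) % 16384`, hashing only the first non-empty `{hashtag}` section when one is present. A minimal Python sketch of that mapping (a hypothetical helper, not part of this patch) makes it easy to predict offline which keys fall inside a migrated slot range, e.g. the [3000, 9000] range used by the new test below:

```python
# Hypothetical helper, not part of this patch: mirrors the Redis cluster
# slot mapping that ClusterConfig::KeySlot is assumed to implement.


def crc16_xmodem(data: bytes) -> int:
    """CRC16-CCITT (XMODEM): polynomial 0x1021, initial value 0."""
    crc = 0
    for byte in data:
        crc ^= byte << 8
        for _ in range(8):
            if crc & 0x8000:
                crc = ((crc << 1) ^ 0x1021) & 0xFFFF
            else:
                crc = (crc << 1) & 0xFFFF
    return crc


def key_slot(key: str) -> int:
    """Map a key to one of 16384 slots, honoring {hashtag} extraction."""
    start = key.find("{")
    if start != -1:
        end = key.find("}", start + 1)
        if end > start + 1:  # hash only a non-empty {...} section
            key = key[start + 1 : end]
    return crc16_xmodem(key.encode()) % 16384


if __name__ == "__main__":
    # For each test key: its slot, and whether a migration of slots
    # [3000, 9000] would carry it off the source node.
    for i in range(20):
        k = f"KEY{i}"
        print(k, key_slot(k), "migrates" if 3000 <= key_slot(k) <= 9000 else "stays")
```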
diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py
index 3fff9f03d..53fc37bc9 100644
--- a/tests/dragonfly/cluster_test.py
+++ b/tests/dragonfly/cluster_test.py
@@ -762,31 +762,13 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
     config = f"""
       [
         {{
-          "slot_ranges": [
-            {{
-              "start": 0,
-              "end": LAST_SLOT_CUTOFF
-            }}
-          ],
-          "master": {{
-            "id": "{node_ids[0]}",
-            "ip": "localhost",
-            "port": {nodes[0].port}
-          }},
+          "slot_ranges": [ {{ "start": 0, "end": LAST_SLOT_CUTOFF }} ],
+          "master": {{ "id": "{node_ids[0]}", "ip": "localhost", "port": {nodes[0].port} }},
           "replicas": []
         }},
         {{
-          "slot_ranges": [
-            {{
-              "start": NEXT_SLOT_CUTOFF,
-              "end": 16383
-            }}
-          ],
-          "master": {{
-            "id": "{node_ids[1]}",
-            "ip": "localhost",
-            "port": {nodes[1].port}
-          }},
+          "slot_ranges": [ {{ "start": NEXT_SLOT_CUTOFF, "end": 16383 }} ],
+          "master": {{ "id": "{node_ids[1]}", "ip": "localhost", "port": {nodes[1].port} }},
           "replicas": []
         }}
       ]
@@ -807,12 +789,13 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
     )
     assert "OK" == res
-    await asyncio.sleep(0.5)
-
-    status = await c_nodes_admin[1].execute_command(
-        "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[0].admin_port)
-    )
-    assert "STABLE_SYNC" == status
+    while (
+        await c_nodes_admin[1].execute_command(
+            "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[0].admin_port)
+        )
+        != "STABLE_SYNC"
+    ):
+        await asyncio.sleep(0.05)
 
     status = await c_nodes_admin[0].execute_command(
         "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[1].port)
     )
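The hunk above replaces the fixed 0.5 s sleep with polling for STABLE_SYNC, so the test tracks actual migration state instead of a timing guess. As written, though, the loop has no upper bound: a migration that never stabilizes hangs the test forever. A bounded variant is sketched below; `wait_for_status` and its 30 s default are illustrative assumptions, not part of this patch — only the DFLYCLUSTER SLOT-MIGRATION-STATUS command it issues comes from the diff:

```python
import asyncio

# Illustrative sketch, not part of this patch: the same polling idea as the
# while-loop above, but bounded so a stuck migration fails fast instead of
# hanging the test run.


async def wait_for_status(client, host: str, port: int, expected: str, timeout: float = 30.0) -> str:
    async def poll() -> str:
        while True:
            status = await client.execute_command(
                "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", host, str(port)
            )
            if status == expected:
                return status
            await asyncio.sleep(0.05)

    # Raises asyncio.TimeoutError if `expected` is never observed in time.
    return await asyncio.wait_for(poll(), timeout=timeout)
```

With such a helper, the wait in the test would read `await wait_for_status(c_nodes_admin[1], "127.0.0.1", nodes[0].admin_port, "STABLE_SYNC")`.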
assert await c_nodes[1].get("KEY2") == "value" + assert await c_nodes[1].get("KEY3") == "value" + assert await c_nodes[0].get("KEY4") == "value" + assert await c_nodes[1].get("KEY5") == "value" + assert await c_nodes[1].get("KEY6") == "value" + assert await c_nodes[1].get("KEY7") == "value" + assert await c_nodes[0].get("KEY8") == "value" + assert await c_nodes[1].get("KEY9") == "value" + assert await c_nodes[1].get("KEY10") == "value" + assert await c_nodes[1].get("KEY11") == "value" + assert await c_nodes[1].get("KEY12") == "value" + assert await c_nodes[1].get("KEY13") == "value" + assert await c_nodes[1].get("KEY14") == "value" + assert await c_nodes[1].get("KEY15") == "value" + assert await c_nodes[1].get("KEY16") == "value" + assert await c_nodes[1].get("KEY17") == "value" + assert await c_nodes[1].get("KEY18") == "value" + assert await c_nodes[1].get("KEY19") == "value" + assert await c_nodes[1].execute_command("DBSIZE") == 17 + await c_nodes_admin[0].close() await c_nodes_admin[1].close()
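On the final DBSIZE checks: before the migration each node holds 10 of the 20 keys. Per the get assertions, only KEY0, KEY4, and KEY8 are still served by node 0 after slots [3000, 9000] move, so 7 of node 0's 10 keys migrate and node 1 ends with 10 + 7 = 17 keys — which is exactly what `DBSIZE == 17` asserts.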