From f66ee5f47deec94e53c8f3302005186c4c2711b6 Mon Sep 17 00:00:00 2001 From: Shahar Mike Date: Thu, 20 Jun 2024 11:40:23 +0300 Subject: [PATCH] fix(cluster): Support `FLUSHALL` while slot migration is in progress (#3173) * fix(cluster): Support `FLUSHALL` while slot migration is in progress Fixes #3132 Also do a small refactor to move cancellation logic into `RestoreStreamer`. --- src/server/cluster/cluster_family.cc | 12 +-- src/server/cluster/outgoing_slot_migration.cc | 14 +-- src/server/journal/journal_slice.cc | 1 + src/server/journal/streamer.cc | 22 ++++- src/server/journal/streamer.h | 10 +- src/server/journal/types.h | 1 + tests/dragonfly/cluster_test.py | 97 ++++++++++++++----- 7 files changed, 111 insertions(+), 46 deletions(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index a69525c81..b50765160 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -791,15 +791,11 @@ bool RemoveIncomingMigrationImpl(std::vector(removed.ToSlotRanges()); + auto removed_ranges = removed.ToSlotRanges(); LOG_IF(WARNING, migration->GetState() == MigrationState::C_FINISHED) << "Flushing slots of removed FINISHED migration " << migration->GetSourceID() - << ", slots: " << SlotRange::ToString(*removed_ranges); - shard_set->pool()->DispatchOnAll([removed_ranges](unsigned, ProactorBase*) { - if (EngineShard* shard = EngineShard::tlocal(); shard) { - shard->db_slice().FlushSlots(*removed_ranges); - } - }); + << ", slots: " << SlotRange::ToString(removed_ranges); + DeleteSlots(removed_ranges); } return true; @@ -844,7 +840,7 @@ void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) { lock_guard lk(migration_mu_); auto was_removed = RemoveIncomingMigrationImpl(incoming_migrations_jobs_, source_id); - LOG_IF(WARNING, was_removed) << "Reinit was happen for migration from:" << source_id; + LOG_IF(WARNING, was_removed) << "Reinit issued for migration from:" << source_id; 
incoming_migrations_jobs_.emplace_back(make_shared( std::move(source_id), &server_family_->service(), std::move(slots), flows_num)); diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 7261138a6..3920f6977 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -58,17 +58,11 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient { return; } - // Check if migration was cancelled while we yielded so far. - if (cancelled_) { - return; - } - streamer_.Start(Sock()); } void Cancel() { streamer_.Cancel(); - cancelled_ = true; } void Finalize() { @@ -81,7 +75,6 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient { private: RestoreStreamer streamer_; - bool cancelled_ = false; }; OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, ServerFamily* sf) @@ -94,6 +87,13 @@ OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, Serv OutgoingMigration::~OutgoingMigration() { main_sync_fb_.JoinIfNeeded(); + + // Destroy each flow in its dedicated thread, because we could be the last owner of the db tables + shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) { + if (const auto* shard = EngineShard::tlocal(); shard) { + slot_migrations_[shard->shard_id()].reset(); + } + }); } bool OutgoingMigration::ChangeState(MigrationState new_state) { diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index f7c068f46..e916a09a6 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -165,6 +165,7 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) { item = &dummy; item->opcode = entry.opcode; item->lsn = lsn_++; + item->cmd = entry.payload.cmd; item->slot = entry.slot; io::BufSink buf_sink{&ring_serialize_buf_}; diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 
c00c2ae87..a3f42b9ab 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -34,7 +34,7 @@ uint32_t replication_stream_output_limit_cached = 64_KB; } // namespace JournalStreamer::JournalStreamer(journal::Journal* journal, Context* cntx) - : journal_(journal), cntx_(cntx) { + : cntx_(cntx), journal_(journal) { // cache the flag to avoid accessing it later. replication_stream_output_limit_cached = absl::GetFlag(FLAGS_replication_stream_output_limit); } @@ -44,7 +44,7 @@ JournalStreamer::~JournalStreamer() { VLOG(1) << "~JournalStreamer"; } -void JournalStreamer::Start(io::AsyncSink* dest, bool send_lsn) { +void JournalStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) { CHECK(dest_ == nullptr && dest != nullptr); dest_ = dest; journal_cb_id_ = @@ -188,9 +188,13 @@ RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal Context* cntx) : JournalStreamer(journal, cntx), db_slice_(slice), my_slots_(std::move(slots)) { DCHECK(slice != nullptr); + db_array_ = slice->databases(); // Inc ref to make sure DB isn't deleted while we use it } -void RestoreStreamer::Start(io::AsyncSink* dest, bool send_lsn) { +void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) { + if (fiber_cancelled_) + return; + VLOG(1) << "RestoreStreamer start"; auto db_cb = absl::bind_front(&RestoreStreamer::OnDbChange, this); snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb)); @@ -199,7 +203,7 @@ void RestoreStreamer::Start(io::AsyncSink* dest, bool send_lsn) { PrimeTable::Cursor cursor; uint64_t last_yield = 0; - PrimeTable* pt = &db_slice_->databases()[0]->prime; + PrimeTable* pt = &db_array_[0]->prime; do { if (fiber_cancelled_) @@ -244,14 +248,22 @@ RestoreStreamer::~RestoreStreamer() { void RestoreStreamer::Cancel() { auto sver = snapshot_version_; snapshot_version_ = 0; // to prevent double cancel in another fiber + fiber_cancelled_ = true; if (sver != 0) { - fiber_cancelled_ = true; 
db_slice_->UnregisterOnChange(sver); JournalStreamer::Cancel(); } } bool RestoreStreamer::ShouldWrite(const journal::JournalItem& item) const { + if (item.cmd == "FLUSHALL" || item.cmd == "FLUSHDB") { + // On FLUSH* we restart the migration + CHECK(dest_ != nullptr); + cntx_->ReportError("FLUSH command during migration"); + dest_->Shutdown(SHUT_RDWR); + return false; + } + if (!item.slot.has_value()) { return false; } diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index 7cb8b34bf..aa61f24fe 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -23,7 +23,7 @@ class JournalStreamer { JournalStreamer(JournalStreamer&& other) = delete; // Register journal listener and start writer in fiber. - virtual void Start(io::AsyncSink* dest, bool send_lsn); + virtual void Start(util::FiberSocketBase* dest, bool send_lsn); // Must be called on context cancellation for unblocking // and manual cleanup. @@ -48,6 +48,9 @@ class JournalStreamer { void WaitForInflightToComplete(); + util::FiberSocketBase* dest_ = nullptr; + Context* cntx_; + private: void OnCompletion(std::error_code ec, size_t len); @@ -58,8 +61,6 @@ class JournalStreamer { bool IsStalled() const; journal::Journal* journal_; - Context* cntx_; - io::AsyncSink* dest_ = nullptr; std::vector pending_buf_; size_t in_flight_bytes_ = 0; time_t last_lsn_time_ = 0; @@ -74,7 +75,7 @@ class RestoreStreamer : public JournalStreamer { RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal, Context* cntx); ~RestoreStreamer() override; - void Start(io::AsyncSink* dest, bool send_lsn = false) override; + void Start(util::FiberSocketBase* dest, bool send_lsn = false) override; // Cancel() must be called if Start() is called void Cancel() override; @@ -96,6 +97,7 @@ class RestoreStreamer : public JournalStreamer { void WriteCommand(journal::Entry::Payload cmd_payload); DbSlice* db_slice_; + DbTableArray db_array_; uint64_t snapshot_version_ = 0; 
cluster::SlotSet my_slots_; bool fiber_cancelled_ = false; diff --git a/src/server/journal/types.h b/src/server/journal/types.h index aeb0286ca..63c35b9be 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -95,6 +95,7 @@ struct JournalItem { LSN lsn; Op opcode; std::string data; + std::string_view cmd; std::optional slot; }; diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 85f8d058c..9903aef28 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -19,6 +19,16 @@ from . import dfly_args BASE_PORT = 30001 +async def assert_eventually(e): + iterations = 0 + while True: + if await e(): + return + iterations += 1 + assert iterations < 500 + await asyncio.sleep(0.1) + + class RedisClusterNode: def __init__(self, port): self.port = port @@ -1026,6 +1036,59 @@ async def test_config_consistency(df_local_factory: DflyInstanceFactory): await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes]) +@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) +async def test_cluster_flushall_during_migration( + df_local_factory: DflyInstanceFactory, df_seeder_factory +): + # Check data migration from one node to another + instances = [ + df_local_factory.create( + port=BASE_PORT + i, + admin_port=BASE_PORT + i + 1000, + vmodule="cluster_family=9,cluster_slot_migration=9,outgoing_slot_migration=9", + logtostdout=True, + ) + for i in range(2) + ] + + df_local_factory.start_all(instances) + + nodes = [(await create_node_info(instance)) for instance in instances] + nodes[0].slots = [(0, 16383)] + nodes[1].slots = [] + + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + seeder = df_seeder_factory.create(keys=10_000, port=nodes[0].instance.port, cluster_mode=True) + await seeder.run(target_deviation=0.1) + + nodes[0].migrations.append( + MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16383)], 
nodes[1].id) + ) + + logging.debug("Start migration") + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + + await nodes[0].client.execute_command("flushall") + + assert "FINISHED" not in await nodes[1].admin_client.execute_command( + "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id + ), "Weak test case - finished migration too early" + + await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED") + + logging.debug("Finalizing migration") + nodes[0].migrations = [] + nodes[0].slots = [] + nodes[1].slots = [(0, 16383)] + await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) + logging.debug("Migration finalized") + + assert await nodes[0].client.dbsize() == 0 + + await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes]) + + @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory): # Check data migration from one node to another @@ -1065,12 +1128,12 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory): await nodes[0].admin_client.execute_command( "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[1].id ) - ).startswith(f"""out {nodes[1].id} FINISHED keys:7""") + ).startswith(f"out {nodes[1].id} FINISHED keys:7") assert ( await nodes[1].admin_client.execute_command( "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", nodes[0].id ) - ).startswith(f"""in {nodes[0].id} FINISHED keys:7""") + ).startswith(f"in {nodes[0].id} FINISHED keys:7") nodes[0].migrations = [] nodes[0].slots = [(0, 2999)] @@ -1232,23 +1295,15 @@ async def test_cluster_fuzzymigration( logging.debug("start migrations") await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) - iterations = 0 - while True: - is_all_finished = True + async def all_finished(): for node in nodes: states = await node.admin_client.execute_command("DFLYCLUSTER", 
"SLOT-MIGRATION-STATUS") logging.debug(states) - is_all_finished = is_all_finished and ( - all("FINISHED" in s for s in states) or states == "NO_STATE" - ) + if not (all("FINISHED" in s for s in states) or states == "NO_STATE"): + return False + return True - if is_all_finished: - break - - iterations += 1 - assert iterations < 500 - - await asyncio.sleep(0.1) + await assert_eventually(all_finished) for counter in counters: counter.cancel() @@ -1360,13 +1415,11 @@ async def test_cluster_migration_cancel(df_local_factory: DflyInstanceFactory): nodes[0].migrations = [] await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes]) assert SIZE == await nodes[0].client.dbsize() - while True: - db_size = await nodes[1].client.dbsize() - if 0 == db_size: - break - logging.debug(f"target dbsize is {db_size}") - logging.debug(await nodes[1].client.execute_command("KEYS", "*")) - await asyncio.sleep(0.1) + + async def node1size0(): + return await nodes[1].client.dbsize() == 0 + + await assert_eventually(node1size0) logging.debug("Reissuing migration") nodes[0].migrations.append(