From 43c83d29fafb09a5530e6f72408efec74418d8c4 Mon Sep 17 00:00:00 2001
From: Borys
Date: Mon, 25 Nov 2024 16:02:22 +0200
Subject: [PATCH] feat: cluster migration restarts immediately if a timeout happens (#4081)

* feat: cluster migration restarts immediately if a timeout happens

* feat: add DEBUG MIGRATION PAUSE command
---
 src/server/cluster/cluster_family.cc          |   9 +-
 src/server/cluster/cluster_family.h           |   5 +-
 src/server/cluster/incoming_slot_migration.cc |  23 +++-
 src/server/cluster/incoming_slot_migration.h  |   2 +
 src/server/cluster/outgoing_slot_migration.cc |  79 +++++++-------
 src/server/cluster/outgoing_slot_migration.h  |   8 +-
 src/server/debugcmd.cc                        |  19 +++-
 src/server/debugcmd.h                         |   8 +-
 src/server/journal/streamer.cc                |   2 +-
 src/server/main_service.cc                    |   2 +-
 src/server/main_service.h                     |   4 +
 src/server/server_family.cc                   |   2 +-
 src/server/server_family.h                    |   4 -
 tests/dragonfly/cluster_test.py               | 100 ++++++++++++++++--
 tests/dragonfly/instance.py                   |   2 +-
 15 files changed, 199 insertions(+), 70 deletions(-)

diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc
index c5c0de343..1672bf0f7 100644
--- a/src/server/cluster/cluster_family.cc
+++ b/src/server/cluster/cluster_family.cc
@@ -35,7 +35,6 @@ ABSL_FLAG(std::string, cluster_node_id, "",
 ABSL_FLAG(bool, managed_service_info, false,
           "Hides some implementation details from users when true (i.e. in managed service env)");
 
-
 ABSL_DECLARE_FLAG(int32_t, port);
 ABSL_DECLARE_FLAG(uint16_t, announce_port);
 
@@ -1007,6 +1006,14 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
   return builder->SendLong(attempt);
 }
 
+void ClusterFamily::PauseAllIncomingMigrations(bool pause) {
+  util::fb2::LockGuard lk(migration_mu_);
+  LOG_IF(ERROR, incoming_migrations_jobs_.empty()) << "No incoming migrations!";
+  for (auto& im : incoming_migrations_jobs_) {
+    im->Pause(pause);
+  }
+}
+
 using EngineFunc = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* builder,
                                            ConnectionContext* cntx);
 
diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h
index 429ea0e85..fbd50f3c2 100644
--- a/src/server/cluster/cluster_family.h
+++ b/src/server/cluster/cluster_family.h
@@ -44,6 +44,9 @@ class ClusterFamily {
     return id_;
   }
 
+  // Only for debug purposes. 
Pause/Resume all incoming migrations
+  void PauseAllIncomingMigrations(bool pause);
+
  private:
   using SinkReplyBuilder = facade::SinkReplyBuilder;
 
@@ -64,7 +67,7 @@ class ClusterFamily {
   // Custom Dragonfly commands for cluster management
   void DflyCluster(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
   void DflyClusterConfig(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
-  void DflyClusterGetSlotInfo(CmdArgList args, SinkReplyBuilder* builder)
-      ABSL_LOCKS_EXCLUDED(set_config_mu, migration_mu_);
+  void DflyClusterGetSlotInfo(CmdArgList args, SinkReplyBuilder* builder)
+      ABSL_LOCKS_EXCLUDED(migration_mu_);
   void DflyClusterFlushSlots(CmdArgList args, SinkReplyBuilder* builder);
 
diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc
index 6cfb01f2c..944d6f0bf 100644
--- a/src/server/cluster/incoming_slot_migration.cc
+++ b/src/server/cluster/incoming_slot_migration.cc
@@ -38,6 +38,10 @@ class ClusterShardMigration {
         bc_(bc) {
   }
 
+  void Pause(bool pause) {
+    pause_ = pause;
+  }
+
   void Start(Context* cntx, util::FiberSocketBase* source) ABSL_LOCKS_EXCLUDED(mu_) {
     {
       util::fb2::LockGuard lk(mu_);
@@ -56,6 +60,11 @@ class ClusterShardMigration {
     TransactionReader tx_reader;
 
     while (!cntx->IsCancelled()) {
+      if (pause_) {
+        ThisFiber::SleepFor(100ms);
+        continue;
+      }
+
       auto tx_data = tx_reader.NextTxData(&reader, cntx);
       if (!tx_data) {
         in_migration_->ReportError(GenericError("No tx data"));
@@ -135,6 +144,7 @@ class ClusterShardMigration {
   IncomingSlotMigration* in_migration_;
   util::fb2::BlockingCounter bc_;
   atomic_long last_attempt_{-1};
+  atomic_bool pause_ = false;
 };
 
 IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, SlotRanges slots,
@@ -153,6 +163,13 @@ IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, Slot
 IncomingSlotMigration::~IncomingSlotMigration() {
 }
 
+void IncomingSlotMigration::Pause(bool pause) {
+  VLOG(1) << "Pausing migration " << pause;
+  for (auto& flow : shard_flows_) {
+    flow->Pause(pause);
+  }
+}
+
 bool IncomingSlotMigration::Join(long attempt) {
   const absl::Time start = absl::Now();
   const absl::Duration timeout =
@@ -161,8 +178,7 @@ bool IncomingSlotMigration::Join(long attempt) {
   while (true) {
     const absl::Time now = absl::Now();
     const absl::Duration passed = now - start;
-    VLOG_EVERY_N(1, 1000) << "Checking whether to continue with join " << passed << " vs "
-                          << timeout;
+    VLOG(1) << "Checking whether to continue with join " << passed << " vs " << timeout;
     if (passed >= timeout) {
       LOG(WARNING) << "Can't join migration in time";
       ReportError(GenericError("Can't join migration in time"));
@@ -198,8 +214,7 @@ void IncomingSlotMigration::Stop() {
   while (true) {
     const absl::Time now = absl::Now();
     const absl::Duration passed = now - start;
-    VLOG_EVERY_N(1, 1000) << "Checking whether to continue with stop " << passed << " vs "
-                          << timeout;
+    VLOG(1) << "Checking whether to continue with stop " << passed << " vs " << timeout;
 
     if (bc_->WaitFor(absl::ToInt64Milliseconds(timeout - passed) * 1ms)) {
       return;
 
diff --git a/src/server/cluster/incoming_slot_migration.h b/src/server/cluster/incoming_slot_migration.h
index 02ad3c202..d8af4fce8 100644
--- a/src/server/cluster/incoming_slot_migration.h
+++ b/src/server/cluster/incoming_slot_migration.h
@@ -59,6 +59,8 @@ class IncomingSlotMigration {
 
   size_t GetKeyCount() const;
 
+  void Pause(bool pause);
+
  private:
   std::string source_id_;
   Service& service_;
 
diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc
index 
9367b8424..b77c44c0b 100644
--- a/src/server/cluster/outgoing_slot_migration.cc
+++ b/src/server/cluster/outgoing_slot_migration.cc
@@ -31,12 +31,14 @@ namespace dfly::cluster {
 class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
  public:
   SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots,
-                     journal::Journal* journal)
+                     journal::Journal* journal, OutgoingMigration* om)
       : ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &cntx_) {
+    cntx_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); });
   }
 
   ~SliceSlotMigration() {
     streamer_.Cancel();
+    cntx_.JoinErrorHandler();
   }
 
   // Send DFLYMIGRATE FLOW
@@ -107,6 +109,7 @@ OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, Serv
 
 OutgoingMigration::~OutgoingMigration() {
   main_sync_fb_.JoinIfNeeded();
+  cntx_.JoinErrorHandler();
 
   // Destroy each flow in its dedicated thread, because we could be the last
   // owner of the db tables
   OnAllShards([](auto& migration) { migration.reset(); });
@@ -131,9 +134,18 @@ void OutgoingMigration::OnAllShards(
   });
 }
 
-void OutgoingMigration::Finish(bool is_error) {
-  VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << " : "
-          << migration_info_.node_info.id;
+void OutgoingMigration::Finish(GenericError error) {
+  auto next_state = MigrationState::C_FINISHED;
+  if (error) {
+    next_state = MigrationState::C_ERROR;
+    VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << ": "
+            << migration_info_.node_info.id << " with error: " << error.Format();
+    cntx_.ReportError(std::move(error));
+  } else {
+    VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << ": "
+            << migration_info_.node_info.id;
+  }
+
   bool should_cancel_flows = false;
 
   {
@@ -151,8 +163,7 @@ void OutgoingMigration::Finish(bool is_error) {
       should_cancel_flows = true;
       break;
   }
-
-    state_ = is_error ? 
MigrationState::C_ERROR : MigrationState::C_FINISHED;
+    state_ = next_state;
   }
 
   if (should_cancel_flows) {
     OnAllShards([](auto& migration) {
       CHECK(migration != nullptr);
       migration->Cancel();
     });
+    cntx_.JoinErrorHandler();
   }
 }
 
@@ -185,15 +197,15 @@ void OutgoingMigration::SyncFb() {
       ThisFiber::SleepFor(1000ms);  // wait some time before next retry
     }
 
-    VLOG(2) << "Connecting to source";
+    VLOG(1) << "Connecting to target node";
     auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
     if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
-      VLOG(1) << "Can't connect to source";
+      LOG(WARNING) << "Can't connect to target node";
       cntx_.ReportError(GenericError(ec, "Couldn't connect to source."));
       continue;
     }
 
-    VLOG(2) << "Migration initiating";
+    VLOG(1) << "Migration initiating";
     ResetParser(false);
     auto cmd = absl::StrCat("DFLYMIGRATE INIT ", cf_->MyID(), " ", slot_migrations_.size());
     for (const auto& s : migration_info_.slot_ranges) {
@@ -201,7 +213,7 @@ void OutgoingMigration::SyncFb() {
     }
 
     if (auto ec = SendCommandAndReadResponse(cmd); ec) {
-      VLOG(1) << "Unable to initialize migration";
+      LOG(WARNING) << "Unable to initialize migration";
       cntx_.ReportError(GenericError(ec, "Could not send INIT command."));
       continue;
     }
@@ -211,7 +223,7 @@ void OutgoingMigration::SyncFb() {
         VLOG(2) << "Target node does not recognize migration; retrying";
         ThisFiber::SleepFor(1000ms);
       } else {
-        VLOG(1) << "Unable to initialize migration";
+        LOG(WARNING) << "Unable to initialize migration";
         cntx_.ReportError(GenericError(std::string(ToSV(LastResponseArgs().front().GetBuf()))));
       }
       continue;
     }
@@ -221,7 +233,7 @@ void OutgoingMigration::SyncFb() {
       DbSlice& db_slice = namespaces->GetDefaultNamespace().GetCurrentDbSlice();
       server_family_->journal()->StartInThread();
       migration = std::make_unique<SliceSlotMigration>(
-          &db_slice, server(), migration_info_.slot_ranges, server_family_->journal());
+          &db_slice, server(), migration_info_.slot_ranges, server_family_->journal(), this);
     });
 
     if (!ChangeState(MigrationState::C_SYNC)) {
       break;
     }
 
     OnAllShards([this](auto& migration) { migration->PrepareFlow(cf_->MyID()); });
-    if (CheckFlowsForErrors()) {
-      LOG(WARNING) << "Preparation error detected, retrying outgoing migration";
+    if (cntx_.GetError()) {
       continue;
     }
 
@@ -241,14 +252,13 @@ void OutgoingMigration::SyncFb() {
       OnAllShards([](auto& migration) { migration->PrepareSync(); });
     }
 
-    OnAllShards([this](auto& migration) {
-      migration->RunSync();
-      if (migration->GetError())
-        Finish(true);
-    });
+    if (cntx_.GetError()) {
+      continue;
+    }
 
-    if (CheckFlowsForErrors()) {
-      LOG(WARNING) << "Errors detected, retrying outgoing migration";
+    OnAllShards([this](auto& migration) { migration->RunSync(); });
+
+    if (cntx_.GetError()) {
       continue;
     }
 
@@ -258,8 +268,7 @@ void OutgoingMigration::SyncFb() {
      VLOG(1) << "Waiting for migration to finalize...";
      ThisFiber::SleepFor(500ms);
     }
-    if (CheckFlowsForErrors()) {
-      LOG(WARNING) << "Errors detected, retrying outgoing migration";
+    if (cntx_.GetError()) {
      continue;
     }
     break;
   }
 }
 
@@ -273,14 +282,13 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
   // reconnect and ACK one more time
   VLOG(1) << "FinalizeMigration for " << cf_->MyID() << " : " << migration_info_.node_info.id;
   if (attempt > 1) {
-    if (CheckFlowsForErrors()) {
-      Finish(true);
+    if (cntx_.GetError()) {
       return true;
     }
 
     VLOG(1) << "Reconnecting to source";
     auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
     if (auto ec = 
ConnectAndAuth(timeout, &cntx_); ec) {
-      cntx_.ReportError(GenericError(ec, "Couldn't connect to source."));
+      LOG(WARNING) << "Couldn't connect to source.";
       return false;
     }
   }
 
@@ -291,8 +299,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
   bool is_block_active = true;
   auto is_pause_in_progress = [&is_block_active] { return is_block_active; };
   auto pause_fb_opt =
-      Pause(server_family_->GetNonPriviligedListeners(), &namespaces->GetDefaultNamespace(),
-            nullptr, ClientPause::WRITE, is_pause_in_progress);
+      dfly::Pause(server_family_->GetNonPriviligedListeners(), &namespaces->GetDefaultNamespace(),
+                  nullptr, ClientPause::WRITE, is_pause_in_progress);
 
   if (!pause_fb_opt) {
     LOG(WARNING) << "Cluster migration finalization time out";
 
@@ -346,9 +354,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
     }
   }
 
-  auto is_error = CheckFlowsForErrors();
-  Finish(is_error);
-  if (!is_error) {
+  if (!cntx_.GetError()) {
+    Finish();
     keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges);
     cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id,
                                          migration_info_.slot_ranges, false);
 
@@ -366,16 +373,6 @@ void OutgoingMigration::Start() {
   main_sync_fb_ = fb2::Fiber("outgoing_migration", &OutgoingMigration::SyncFb, this);
 }
 
-bool OutgoingMigration::CheckFlowsForErrors() {
-  for (const auto& flow : slot_migrations_) {
-    if (flow->GetError()) {
-      cntx_.ReportError(flow->GetError());
-      return true;
-    }
-  }
-  return false;
-}
-
 size_t OutgoingMigration::GetKeyCount() const {
   util::fb2::LockGuard lk(state_mu_);
   if (state_ == MigrationState::C_FINISHED) {
 
diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h
index d0bf6929d..d1ce69f15 100644
--- a/src/server/cluster/outgoing_slot_migration.h
+++ b/src/server/cluster/outgoing_slot_migration.h
@@ -30,9 +30,10 @@ class OutgoingMigration : private ProtocolClient {
   // start migration process, sends INIT command to the target node
   void Start();
 
-  // mark migration as FINISHED and cancel migration if it's not finished yet
+  // if error is empty, marks the migration as FINISHED and cancels it if it's not finished yet
   // can be called from any thread, but only after Start()
-  void Finish(bool is_error = false) ABSL_LOCKS_EXCLUDED(state_mu_);
+  // if error is set and the migration is in progress, it is restarted; otherwise nothing happens
+  void Finish(GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_);
 
   MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_);
 
@@ -65,9 +66,6 @@ class OutgoingMigration : private ProtocolClient {
   // should be run for all shards
   void StartFlow(journal::Journal* journal, io::Sink* dest);
 
-  // if we have an error reports it into cntx_ and return true
-  bool CheckFlowsForErrors();
-
   MigrationState GetStateImpl() const;
 
   // SliceSlotMigration manages state and data transferring for the corresponding shard
   class SliceSlotMigration;
 
diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc
index 809f28100..4d7d158bf 100644
--- a/src/server/debugcmd.cc
+++ b/src/server/debugcmd.cc
@@ -367,7 +367,8 @@ OpResult<ValueCompressInfo> EstimateCompression(ConnectionContext* cntx, string_
 
 }  // namespace
 
-DebugCmd::DebugCmd(ServerFamily* owner, ConnectionContext* cntx) : sf_(*owner), cntx_(cntx) {
+DebugCmd::DebugCmd(ServerFamily* owner, cluster::ClusterFamily* cf, ConnectionContext* cntx)
+    : sf_(*owner), cf_(*cf), cntx_(cntx) {
 }
 
 void DebugCmd::Run(CmdArgList args, facade::SinkReplyBuilder* builder) {
@@ -437,6 +438,10 @@ void DebugCmd::Run(CmdArgList args, 
facade::SinkReplyBuilder* builder) {
     return Replica(args, builder);
   }
 
+  if (subcmd == "MIGRATION" && args.size() == 2) {
+    return Migration(args, builder);
+  }
+
   if (subcmd == "WATCHED") {
     return Watched(builder);
   }
@@ -550,6 +555,18 @@ void DebugCmd::Replica(CmdArgList args, facade::SinkReplyBuilder* builder) {
   return builder->SendError(UnknownSubCmd("replica", "DEBUG"));
 }
 
+void DebugCmd::Migration(CmdArgList args, facade::SinkReplyBuilder* builder) {
+  args.remove_prefix(1);
+
+  string opt = absl::AsciiStrToUpper(ArgS(args, 0));
+
+  if (opt == "PAUSE" || opt == "RESUME") {
+    cf_.PauseAllIncomingMigrations(opt == "PAUSE");
+    return builder->SendOk();
+  }
+
+  return builder->SendError(UnknownSubCmd("MIGRATION", "DEBUG"));
+}
+
 optional<DebugCmd::PopulateOptions> DebugCmd::ParsePopulateArgs(CmdArgList args,
                                                                 facade::SinkReplyBuilder* builder) {
   if (args.size() < 2) {
 
diff --git a/src/server/debugcmd.h b/src/server/debugcmd.h
index f20629f79..72be36ff6 100644
--- a/src/server/debugcmd.h
+++ b/src/server/debugcmd.h
@@ -9,6 +9,10 @@
 
 namespace dfly {
 
+namespace cluster {
+class ClusterFamily;
+}
+
 class EngineShardSet;
 class ServerFamily;
 
@@ -26,7 +30,7 @@ class DebugCmd {
   };
 
  public:
-  DebugCmd(ServerFamily* owner, ConnectionContext* cntx);
+  DebugCmd(ServerFamily* owner, cluster::ClusterFamily* cf, ConnectionContext* cntx);
 
   void Run(CmdArgList args, facade::SinkReplyBuilder* builder);
 
@@ -40,6 +44,7 @@ class DebugCmd {
   void Reload(CmdArgList args, facade::SinkReplyBuilder* builder);
   void Replica(CmdArgList args, facade::SinkReplyBuilder* builder);
+  void Migration(CmdArgList args, facade::SinkReplyBuilder* builder);
   void Exec(facade::SinkReplyBuilder* builder);
   void Inspect(std::string_view key, CmdArgList args, facade::SinkReplyBuilder* builder);
@@ -52,6 +57,7 @@ class DebugCmd {
   void RecvSize(std::string_view param, facade::SinkReplyBuilder* builder);
 
   ServerFamily& sf_;
+  cluster::ClusterFamily& cf_;
   ConnectionContext* cntx_;
 };
 
diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index e8e142d49..b3f82758e 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -167,7 +167,7 @@ void JournalStreamer::ThrottleIfNeeded() {
   if (status == std::cv_status::timeout) {
     LOG(WARNING) << "Stream timed out, inflight bytes/sent start: " << inflight_start << "/"
                  << sent_start << ", end: " << in_flight_bytes_ << "/" << total_sent_;
-    cntx_->ReportError(make_error_code(errc::stream_timeout));
+    cntx_->ReportError("JournalStreamer write operation timeout");
   }
 }
 
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index 860ebce1d..e3d85177b 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -865,7 +865,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
     acl_family_.Init(listeners.front(), &user_registry_);
   }
 
-  // Initialize shard_set with a global callback running once in a while in the shard threads.
+  // Initialize shard_set with a callback running once in a while in the shard threads.
 shard_set->Init(shard_num, [this] {
     server_family_.GetDflyCmd()->BreakStalledFlowsInShard();
     server_family_.UpdateMemoryGlobalStats();
 
diff --git a/src/server/main_service.h b/src/server/main_service.h
index fe1dc1a29..f458b7f29 100644
--- a/src/server/main_service.h
+++ b/src/server/main_service.h
@@ -114,6 +114,10 @@ class Service : public facade::ServiceInterface {
     return server_family_;
   }
 
+  cluster::ClusterFamily& cluster_family() {
+    return cluster_family_;
+  }
+
   // Utility function used in unit tests
   // Do not use in production, only meant to be used by unit tests
   const acl::AclFamily* TestInit();
 
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index dd01f41e6..8186ef8b7 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -1989,7 +1989,7 @@ void ServerFamily::Config(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
 
 void ServerFamily::Debug(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
                          ConnectionContext* cntx) {
-  DebugCmd dbg_cmd{this, cntx};
+  DebugCmd dbg_cmd{this, &service_.cluster_family(), cntx};
   return dbg_cmd.Run(args, builder);
 }
 
diff --git a/src/server/server_family.h b/src/server/server_family.h
index 92f47d286..5d7302a2b 100644
--- a/src/server/server_family.h
+++ b/src/server/server_family.h
@@ -39,10 +39,6 @@ namespace journal {
 class Journal;
 }  // namespace journal
 
-namespace cluster {
-class ClusterFamily;
-}
-
 class ConnectionContext;
 class CommandRegistry;
 class Service;
 
diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py
index f12b4f848..d3454d8ab 100644
--- a/tests/dragonfly/cluster_test.py
+++ b/tests/dragonfly/cluster_test.py
@@ -13,6 +13,7 @@ from .replication_test import check_all_replicas_finished
 from redis.cluster import RedisCluster
 from redis.cluster import ClusterNode
 from .proxy import Proxy
+from .seeder import SeederBase
 from .seeder import StaticSeeder
 from . 
import dfly_args
 
@@ -152,9 +153,30 @@ async def wait_for_status(admin_client, node_id, status, timeout=10):
         "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_id
     )
 
+    if not isinstance(status, list):
+        status = [status]
+
     async for states, breaker in tick_timer(get_status, timeout=timeout):
         with breaker:
-            assert len(states) != 0 and all(status == state[2] for state in states), states
+            assert len(states) != 0 and all(state[2] in status for state in states), states
+
+
+async def wait_for_error(admin_client, node_id, error, timeout=10):
+    get_status = lambda: admin_client.execute_command(
+        "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_id
+    )
+
+    async for states, breaker in tick_timer(get_status, timeout=timeout):
+        with breaker:
+            assert len(states) != 0 and all(error == state[4] for state in states), states
+
+
+async def wait_for_migration_start(admin_client, node_id):
+    while (
+        len(await admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_id))
+        == 0
+    ):
+        await asyncio.sleep(0.1)
 
 
 async def check_for_no_state_status(admin_clients):
@@ -1281,9 +1303,8 @@ async def test_network_disconnect_during_migration(df_factory, df_seeder_factory
 
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
-    seeder = df_seeder_factory.create(keys=30000, port=nodes[0].instance.port, cluster_mode=True)
-
-    await seeder.run(target_deviation=0.1)
+    await StaticSeeder(key_target=200000).run(nodes[0].client)
+    start_capture = await StaticSeeder.capture(nodes[0].client)
 
     proxy = Proxy("127.0.0.1", 1111, "127.0.0.1", nodes[1].instance.admin_port)
     await proxy.start()
@@ -1295,23 +1316,26 @@ async def test_network_disconnect_during_migration(df_factory, df_seeder_factory
         await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
         for _ in range(10):
-            await asyncio.sleep(random.randint(0, 10) / 20)
+            await asyncio.sleep(random.randint(0, 10) / 100)
             logging.debug("drop connections")
             proxy.drop_connection()
             logging.debug(
                 await nodes[0].admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
             )
     finally:
+        await wait_for_status(nodes[0].admin_client, nodes[1].id, "SYNC")
         await proxy.close(task)
 
+    await proxy.start()
+
+    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", 20)
+
     nodes[0].migrations = []
     nodes[0].slots = []
     nodes[1].slots = [(0, 16383)]
     logging.debug("remove finished migrations")
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
-    capture = await seeder.capture()
-    assert await seeder.compare(capture, nodes[1].instance.port)
+    assert (await StaticSeeder.capture(nodes[1].client)) == start_capture
 
 
 @pytest.mark.parametrize(
@@ -2205,7 +2229,7 @@ async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_
 
 
 @pytest.mark.skip("Takes more than 10 minutes")
-@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
+@dfly_args({"cluster_mode": "yes"})
 async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFactory):
     # Check data migration from one node to another
     instances = [
@@ -2261,3 +2285,63 @@ async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFact
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
     await check_for_no_state_status([node.admin_client for node in nodes])
+
+
+@pytest.mark.skip("Flaky")
+@pytest.mark.asyncio
+@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
+async def test_migration_timeout_on_sync(df_factory: 
DflyInstanceFactory, df_seeder_factory):
+    # Timeout set to 3 seconds because we must first saturate the socket before we get the timeout
+    instances = [
+        df_factory.create(
+            port=BASE_PORT + i,
+            admin_port=BASE_PORT + i + 1000,
+            replication_timeout=3000,
+            vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=9",
+        )
+        for i in range(2)
+    ]
+
+    df_factory.start_all(instances)
+
+    nodes = [(await create_node_info(instance)) for instance in instances]
+    nodes[0].slots = [(0, 16383)]
+    nodes[1].slots = []
+
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    logging.debug("source node DEBUG POPULATE")
+    await nodes[0].client.execute_command("debug", "populate", "100000", "foo", "5000")
+
+    # await StaticSeeder(key_target=200000, data_size=1000).run(nodes[0].client)
+    start_capture = await StaticSeeder.capture(nodes[0].client)
+
+    logging.debug("Start migration")
+    nodes[0].migrations.append(
+        MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16383)], nodes[1].id)
+    )
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    await asyncio.sleep(random.randint(0, 50) / 100)
+    await wait_for_migration_start(nodes[1].admin_client, nodes[0].id)
+
+    logging.debug("debug migration pause")
+    await nodes[1].client.execute_command("debug migration pause")
+
+    await wait_for_error(
+        nodes[0].admin_client, nodes[1].id, "JournalStreamer write operation timeout"
+    )
+
+    logging.debug("debug migration resume")
+    await nodes[1].client.execute_command("debug migration resume")
+
+    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")
+    await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED")
+
+    nodes[0].migrations = []
+    nodes[0].slots = []
+    nodes[1].slots = [(0, 16383)]
+
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    assert (await StaticSeeder.capture(nodes[1].client)) == start_capture
 
diff --git a/tests/dragonfly/instance.py b/tests/dragonfly/instance.py
index 447c07391..18a0bc832 100644
--- a/tests/dragonfly/instance.py
+++ b/tests/dragonfly/instance.py
@@ -154,7 +154,7 @@ class DflyInstance:
 
     async def close_clients(self):
         for client in self.clients:
-            await client.aclose() if hasattr(client, "aclose") else client.close()
+            await client.aclose() if hasattr(client, "aclose") else await client.close()
 
     def __enter__(self):
         self.start()
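
A quick way to exercise the new behavior outside the test suite is to drive the DEBUG MIGRATION PAUSE/RESUME subcommand by hand against an already-running two-node cluster with an active slot migration. The script below is a minimal sketch, not part of the patch: the ports, the redis-py dependency, and the sleep duration are assumptions; only DEBUG MIGRATION PAUSE/RESUME, DFLYCLUSTER SLOT-MIGRATION-STATUS, and the "JournalStreamer write operation timeout" error text come from the code above. As in the tests, DFLYCLUSTER is issued on the admin port and DEBUG on the regular port.

import asyncio

import redis.asyncio as redis  # assumed client library; any RESP client works


async def main():
    # Assumed topology, mirroring the test above: node 0 (source) with admin
    # port 16379, node 1 (target) on port 6380, with an outgoing migration
    # 0 -> 1 already running.
    source_admin = redis.Redis(port=16379)  # DFLYCLUSTER goes to the admin port
    target = redis.Redis(port=6380)  # DEBUG goes to the regular port

    # Pause all incoming migrations on the target. The source keeps pushing
    # journal data until JournalStreamer::ThrottleIfNeeded() times out and
    # reports "JournalStreamer write operation timeout".
    await target.execute_command("DEBUG", "MIGRATION", "PAUSE")
    await asyncio.sleep(5)  # assumed to exceed the configured streamer timeout

    # With this patch the source reports the error and restarts the migration
    # immediately instead of staying stuck; the status output should show the
    # error and then a return to SYNC.
    print(await source_admin.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS"))

    # Unblock the incoming side so the restarted migration can finish.
    await target.execute_command("DEBUG", "MIGRATION", "RESUME")


asyncio.run(main())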