diff --git a/src/server/cluster/cluster_config.h b/src/server/cluster/cluster_config.h index 73866219b..3c5844572 100644 --- a/src/server/cluster/cluster_config.h +++ b/src/server/cluster/cluster_config.h @@ -102,6 +102,10 @@ class ClusterConfig { std::vector GetFinishedIncomingMigrations( const std::shared_ptr& prev) const; + std::vector GetIncomingMigrations() const { + return my_incoming_migrations_; + } + private: struct SlotEntry { const ClusterShardInfo* shard = nullptr; diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index fbde4ae22..f12217e30 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -834,6 +834,14 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) { } VLOG(1) << "DFLYMIGRATE ACK" << args; + auto in_migrations = tl_cluster_config->GetIncomingMigrations(); + auto m_it = std::find_if(in_migrations.begin(), in_migrations.end(), + [source_id](const auto& m) { return m.node_id == source_id; }); + if (m_it == in_migrations.end()) { + LOG(WARNING) << "migration isn't in config"; + return cntx->SendLong(OutgoingMigration::kInvalidAttempt); + } + auto migration = GetIncomingMigration(source_id); if (!migration) return cntx->SendError(kIdNotFound); @@ -847,8 +855,9 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) { } UpdateConfig(migration->GetSlots(), true); + VLOG(1) << "Config is updated for " << MyID(); - cntx->SendLong(attempt); + return cntx->SendLong(attempt); } using EngineFunc = void (ClusterFamily::*)(CmdArgList args, ConnectionContext* cntx); diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc index 3a04d92ee..485b02eb3 100644 --- a/src/server/cluster/incoming_slot_migration.cc +++ b/src/server/cluster/incoming_slot_migration.cc @@ -95,6 +95,7 @@ void IncomingSlotMigration::Join() { void IncomingSlotMigration::StartFlow(uint32_t shard, io::Source* source) { VLOG(1) << "Start flow for shard: " << shard; + state_.store(MigrationState::C_SYNC); shard_flows_[shard]->Start(&cntx_, source); bc_->Dec(); diff --git a/src/server/cluster/incoming_slot_migration.h b/src/server/cluster/incoming_slot_migration.h index 4d7fceddb..304a8b14b 100644 --- a/src/server/cluster/incoming_slot_migration.h +++ b/src/server/cluster/incoming_slot_migration.h @@ -25,7 +25,7 @@ class IncomingSlotMigration { void Join(); MigrationState GetState() const { - return state_; + return state_.load(); } const SlotRanges& GetSlots() const { @@ -41,7 +41,7 @@ class IncomingSlotMigration { Service& service_; std::vector> shard_flows_; SlotRanges slots_; - MigrationState state_ = MigrationState::C_NO_STATE; + std::atomic state_ = MigrationState::C_NO_STATE; Context cntx_; util::fb2::BlockingCounter bc_; diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index a9ff610bd..bc571c154 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -17,8 +17,7 @@ #include "server/journal/streamer.h" #include "server/server_family.h" -ABSL_FLAG(int, source_connect_timeout_ms, 20000, - "Timeout for establishing connection to a source node"); +ABSL_FLAG(int, slot_migration_connection_timeout_ms, 2000, "Timeout for network operations"); using namespace std; using namespace facade; @@ -37,7 +36,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient { } std::error_code Start(const std::string& node_id, uint32_t shard_id) { - RETURN_ON_ERR(ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_)); + RETURN_ON_ERR( + ConnectAndAuth(absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms, &cntx_)); ResetParser(/*server_mode=*/false); std::string cmd = absl::StrCat("DFLYMIGRATE FLOW ", node_id, " ", shard_id); @@ -86,6 +86,7 @@ MigrationState OutgoingMigration::GetState() const { } void OutgoingMigration::SyncFb() { + state_.store(MigrationState::C_SYNC); auto start_cb = [this](util::ProactorBase* pb) { if (auto* shard = EngineShard::tlocal(); shard) { server_family_->journal()->StartInThread(); @@ -146,10 +147,10 @@ bool OutgoingMigration::FinishMigration(long attempt) { LOG_IF(WARNING, err) << err; if (!err) { - long attempt_res = -1; + long attempt_res = kInvalidAttempt; do { // we can have response from previos time so we need to read until get response for the // last attempt - auto resp = ReadRespReply(absl::GetFlag(FLAGS_source_connect_timeout_ms)); + auto resp = ReadRespReply(absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms)); if (!resp) { LOG(WARNING) << resp.error(); @@ -163,6 +164,9 @@ bool OutgoingMigration::FinishMigration(long attempt) { return false; } attempt_res = get(LastResponseArgs().front().u); + if (attempt_res == kInvalidAttempt) { + return false; + } } while (attempt_res != attempt); shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) { @@ -172,6 +176,7 @@ bool OutgoingMigration::FinishMigration(long attempt) { state_.store(MigrationState::C_FINISHED); cf_->UpdateConfig(migration_info_.slot_ranges, false); + VLOG(1) << "Config is updated for " << cf_->MyID(); return true; } else { // TODO implement connection issue error processing @@ -196,7 +201,7 @@ std::error_code OutgoingMigration::Start(ConnectionContext* cntx) { RETURN_ON_ERR(check_connection_error(ec, "could not resolve host dns")); VLOG(1) << "Connecting to source"; - ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_); + ec = ConnectAndAuth(absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms, &cntx_); RETURN_ON_ERR(check_connection_error(ec, "couldn't connect to source")); VLOG(1) << "Migration initiating"; diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index 048bbb046..da0c78c51 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -49,6 +49,8 @@ class OutgoingMigration : private ProtocolClient { return migration_info_; } + static constexpr long kInvalidAttempt = -1; + private: MigrationState GetStateImpl() const; // SliceSlotMigration manages state and data transfering for the corresponding shard diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 0ece4391c..88165fe0b 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -892,9 +892,8 @@ async def test_cluster_native_client( await close_clients(client, *c_masters, *c_masters_admin, *c_replicas, *c_replicas_admin) -@pytest.mark.skip(reason="Test needs refactoring because of cluster design change") @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) -async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory): +async def test_config_consistency(df_local_factory: DflyInstanceFactory): # Check slot migration from one node to another nodes = [ df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) @@ -928,10 +927,8 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory): c_nodes_admin, ) - status = await c_nodes_admin[1].execute_command( - "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[0].admin_port) - ) - assert "NO_STATE" == status + for node in c_nodes_admin: + assert await node.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == "NO_STATE" migation_config = f""" [ @@ -950,12 +947,47 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory): ] """ + # push config only to source node await push_config( migation_config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"), + [c_nodes_admin[0]], + ) + + assert await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == [ + f"""out {node_ids[1]} SYNC keys:0""" + ] + + assert await c_nodes_admin[1].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == [ + f"""in {node_ids[0]} SYNC keys:0""" + ] + + # migration shouldn't be finished until we set the same config to target node + await asyncio.sleep(0.5) + + # push config to target node + await push_config( + migation_config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"), + [c_nodes_admin[1]], + ) + + while "FINISHED" not in await c_nodes_admin[1].execute_command( + "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[0] + ): + logging.debug("SLOT-MIGRATION-STATUS is not FINISHED") + await asyncio.sleep(0.05) + + assert await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == [ + f"""out {node_ids[1]} FINISHED keys:0""" + ] + + # remove finished migrations + await push_config( + config.replace("LAST_SLOT_CUTOFF", "5199").replace("NEXT_SLOT_CUTOFF", "5200"), c_nodes_admin, ) - # TODO add a check for correct results after the same config apply + for node in c_nodes_admin: + assert await node.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == "NO_STATE" await close_clients(*c_nodes, *c_nodes_admin)