feat: retry ACK if the configs are different #2833 (#2906)

* feat: retry ACK if the configs are different #2833
This commit is contained in:
Borys 2024-04-16 15:03:30 +03:00 committed by GitHub
parent f58ded488c
commit d99b0eda16
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 69 additions and 16 deletions

View file

@ -102,6 +102,10 @@ class ClusterConfig {
std::vector<MigrationInfo> GetFinishedIncomingMigrations(
const std::shared_ptr<ClusterConfig>& prev) const;
std::vector<MigrationInfo> GetIncomingMigrations() const {
return my_incoming_migrations_;
}
private:
struct SlotEntry {
const ClusterShardInfo* shard = nullptr;

View file

@ -834,6 +834,14 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
}
VLOG(1) << "DFLYMIGRATE ACK" << args;
auto in_migrations = tl_cluster_config->GetIncomingMigrations();
auto m_it = std::find_if(in_migrations.begin(), in_migrations.end(),
[source_id](const auto& m) { return m.node_id == source_id; });
if (m_it == in_migrations.end()) {
LOG(WARNING) << "migration isn't in config";
return cntx->SendLong(OutgoingMigration::kInvalidAttempt);
}
auto migration = GetIncomingMigration(source_id);
if (!migration)
return cntx->SendError(kIdNotFound);
@ -847,8 +855,9 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
}
UpdateConfig(migration->GetSlots(), true);
VLOG(1) << "Config is updated for " << MyID();
cntx->SendLong(attempt);
return cntx->SendLong(attempt);
}
using EngineFunc = void (ClusterFamily::*)(CmdArgList args, ConnectionContext* cntx);

View file

@ -95,6 +95,7 @@ void IncomingSlotMigration::Join() {
void IncomingSlotMigration::StartFlow(uint32_t shard, io::Source* source) {
VLOG(1) << "Start flow for shard: " << shard;
state_.store(MigrationState::C_SYNC);
shard_flows_[shard]->Start(&cntx_, source);
bc_->Dec();

View file

@ -25,7 +25,7 @@ class IncomingSlotMigration {
void Join();
MigrationState GetState() const {
return state_;
return state_.load();
}
const SlotRanges& GetSlots() const {
@ -41,7 +41,7 @@ class IncomingSlotMigration {
Service& service_;
std::vector<std::unique_ptr<ClusterShardMigration>> shard_flows_;
SlotRanges slots_;
MigrationState state_ = MigrationState::C_NO_STATE;
std::atomic<MigrationState> state_ = MigrationState::C_NO_STATE;
Context cntx_;
util::fb2::BlockingCounter bc_;

View file

@ -17,8 +17,7 @@
#include "server/journal/streamer.h"
#include "server/server_family.h"
ABSL_FLAG(int, source_connect_timeout_ms, 20000,
"Timeout for establishing connection to a source node");
ABSL_FLAG(int, slot_migration_connection_timeout_ms, 2000, "Timeout for network operations");
using namespace std;
using namespace facade;
@ -37,7 +36,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
}
std::error_code Start(const std::string& node_id, uint32_t shard_id) {
RETURN_ON_ERR(ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_));
RETURN_ON_ERR(
ConnectAndAuth(absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms, &cntx_));
ResetParser(/*server_mode=*/false);
std::string cmd = absl::StrCat("DFLYMIGRATE FLOW ", node_id, " ", shard_id);
@ -86,6 +86,7 @@ MigrationState OutgoingMigration::GetState() const {
}
void OutgoingMigration::SyncFb() {
state_.store(MigrationState::C_SYNC);
auto start_cb = [this](util::ProactorBase* pb) {
if (auto* shard = EngineShard::tlocal(); shard) {
server_family_->journal()->StartInThread();
@ -146,10 +147,10 @@ bool OutgoingMigration::FinishMigration(long attempt) {
LOG_IF(WARNING, err) << err;
if (!err) {
long attempt_res = -1;
long attempt_res = kInvalidAttempt;
do { // we may still have a response left over from a previous attempt, so keep reading
// until we get the response for the last attempt
auto resp = ReadRespReply(absl::GetFlag(FLAGS_source_connect_timeout_ms));
auto resp = ReadRespReply(absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms));
if (!resp) {
LOG(WARNING) << resp.error();
@ -163,6 +164,9 @@ bool OutgoingMigration::FinishMigration(long attempt) {
return false;
}
attempt_res = get<int64_t>(LastResponseArgs().front().u);
if (attempt_res == kInvalidAttempt) {
return false;
}
} while (attempt_res != attempt);
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
@ -172,6 +176,7 @@ bool OutgoingMigration::FinishMigration(long attempt) {
state_.store(MigrationState::C_FINISHED);
cf_->UpdateConfig(migration_info_.slot_ranges, false);
VLOG(1) << "Config is updated for " << cf_->MyID();
return true;
} else {
// TODO implement connection issue error processing
@ -196,7 +201,7 @@ std::error_code OutgoingMigration::Start(ConnectionContext* cntx) {
RETURN_ON_ERR(check_connection_error(ec, "could not resolve host dns"));
VLOG(1) << "Connecting to source";
ec = ConnectAndAuth(absl::GetFlag(FLAGS_source_connect_timeout_ms) * 1ms, &cntx_);
ec = ConnectAndAuth(absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms, &cntx_);
RETURN_ON_ERR(check_connection_error(ec, "couldn't connect to source"));
VLOG(1) << "Migration initiating";

View file

@ -49,6 +49,8 @@ class OutgoingMigration : private ProtocolClient {
return migration_info_;
}
static constexpr long kInvalidAttempt = -1;
private:
MigrationState GetStateImpl() const;
// SliceSlotMigration manages state and data transferring for the corresponding shard

View file

@ -892,9 +892,8 @@ async def test_cluster_native_client(
await close_clients(client, *c_masters, *c_masters_admin, *c_replicas, *c_replicas_admin)
@pytest.mark.skip(reason="Test needs refactoring because of cluster design change")
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
async def test_config_consistency(df_local_factory: DflyInstanceFactory):
# Check slot migration from one node to another
nodes = [
df_local_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000)
@ -928,10 +927,8 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
c_nodes_admin,
)
status = await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", "127.0.0.1", str(nodes[0].admin_port)
)
assert "NO_STATE" == status
for node in c_nodes_admin:
assert await node.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == "NO_STATE"
migation_config = f"""
[
@ -950,12 +947,47 @@ async def test_cluster_slot_migration(df_local_factory: DflyInstanceFactory):
]
"""
# push config only to source node
await push_config(
migation_config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
[c_nodes_admin[0]],
)
assert await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == [
f"""out {node_ids[1]} SYNC keys:0"""
]
assert await c_nodes_admin[1].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == [
f"""in {node_ids[0]} SYNC keys:0"""
]
# the migration shouldn't finish until we push the same config to the target node
await asyncio.sleep(0.5)
# push config to target node
await push_config(
migation_config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
[c_nodes_admin[1]],
)
while "FINISHED" not in await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[0]
):
logging.debug("SLOT-MIGRATION-STATUS is not FINISHED")
await asyncio.sleep(0.05)
assert await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == [
f"""out {node_ids[1]} FINISHED keys:0"""
]
# remove finished migrations
await push_config(
config.replace("LAST_SLOT_CUTOFF", "5199").replace("NEXT_SLOT_CUTOFF", "5200"),
c_nodes_admin,
)
# TODO: add a check for correct results after applying the same config
for node in c_nodes_admin:
assert await node.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == "NO_STATE"
await close_clients(*c_nodes, *c_nodes_admin)