From 40851f9a2c9b15f9726fe6561929148c3a7655ea Mon Sep 17 00:00:00 2001 From: mkaruza Date: Wed, 7 May 2025 21:11:59 +0200 Subject: [PATCH] Simplify handling error response check --- src/server/cluster/cluster_family.cc | 2 +- src/server/cluster/incoming_slot_migration.cc | 1 - src/server/cluster/outgoing_slot_migration.cc | 28 ++++++------------- src/server/cluster/outgoing_slot_migration.h | 2 +- src/server/protocol_client.cc | 5 ++++ src/server/protocol_client.h | 3 ++ 6 files changed, 19 insertions(+), 22 deletions(-) diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 37419d1c3..3368baf0e 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -835,7 +835,7 @@ ClusterFamily::TakeOutOutgoingMigrations(shared_ptr new_config, removed_slots.Merge(slots); LOG(INFO) << "Outgoing migration cancelled: slots " << slots.ToString() << " to " << migration.GetHostIp() << ":" << migration.GetPort(); - migration.Finish(); + migration.Finish(MigrationState::C_FINISHED); res.migrations.push_back(std::move(*it)); outgoing_migration_jobs_.erase(it); } diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc index bb3c2e9c5..4c0420c71 100644 --- a/src/server/cluster/incoming_slot_migration.cc +++ b/src/server/cluster/incoming_slot_migration.cc @@ -10,7 +10,6 @@ #include "base/flags.h" #include "base/logging.h" #include "cluster_utility.h" -#include "server/cluster/cluster_defs.h" #include "server/error.h" #include "server/journal/executor.h" #include "server/journal/tx_executor.h" diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 58219a92c..60086d784 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -12,7 +12,6 @@ #include "base/logging.h" #include "cluster_family.h" #include "cluster_utility.h" -#include "server/cluster/cluster_defs.h" #include "server/db_slice.h" #include "server/engine_shard_set.h" #include "server/error.h" @@ -37,7 +36,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient { SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots, journal::Journal* journal, OutgoingMigration* om) : ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &exec_st_) { - exec_st_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); }); + exec_st_.SwitchErrorHandler( + [om](auto ge) { om->Finish(MigrationState::C_ERROR, std::move(ge)); }); } ~SliceSlotMigration() { @@ -139,14 +139,8 @@ void OutgoingMigration::OnAllShards( }); } -void OutgoingMigration::Finish(GenericError error) { - auto next_state = MigrationState::C_FINISHED; +void OutgoingMigration::Finish(MigrationState next_state, GenericError error) { if (error) { - if (error.Format() == kIncomingMigrationOOM) { - next_state = MigrationState::C_FATAL; - } else { - next_state = MigrationState::C_ERROR; - } LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": " << migration_info_.node_info.id << " with error: " << error.Format(); exec_st_.ReportError(std::move(error)); @@ -230,8 +224,7 @@ void OutgoingMigration::SyncFb() { // Break outgoing migration if INIT from incoming node responded with OOM. Usually this will // happen on second iteration after first failed with OOM. Sending second INIT is required to // cleanup slots on incoming slot migration node. - if (CheckRespFirstTypes({RespExpr::ERROR}) && - facade::ToSV(LastResponseArgs().front().GetBuf()) == kIncomingMigrationOOM) { + if (CheckRespSimpleError(kIncomingMigrationOOM)) { ChangeState(MigrationState::C_FATAL); break; } @@ -374,13 +367,10 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { return false; } - if (CheckRespFirstTypes({RespExpr::ERROR})) { - auto error = facade::ToSV(LastResponseArgs().front().GetBuf()); - // Check if returned incoming slot OOM and finish migration - if (error == kIncomingMigrationOOM) { - Finish(std::string(error)); - return false; - } + // Check OOM from incoming slot migration on ACK request + if (CheckRespSimpleError(kIncomingMigrationOOM)) { + Finish(MigrationState::C_FATAL, std::string(kIncomingMigrationOOM)); + return false; } if (!CheckRespFirstTypes({RespExpr::INT64})) { @@ -399,7 +389,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { } if (!exec_st_.GetError()) { - Finish(); + Finish(MigrationState::C_FINISHED); keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges); cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges, false); diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index 9ed9ccd4a..95a32feb9 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -30,7 +30,7 @@ class OutgoingMigration : private ProtocolClient { // if is_error = false mark migration as FINISHED and cancel migration if it's not finished yet // can be called from any thread, but only after Start() // if is_error = true and migration is in progress it will be restarted otherwise nothing happens - void Finish(GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_); + void Finish(MigrationState next_state, GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_); MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_); diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc index 8ecd6f65d..895e2af1a 100644 --- a/src/server/protocol_client.cc +++ b/src/server/protocol_client.cc @@ -339,6 +339,11 @@ bool ProtocolClient::CheckRespIsSimpleReply(string_view reply) const { ToSV(resp_args_.front().GetBuf()) == reply; } +bool ProtocolClient::CheckRespSimpleError(string_view error) const { + return resp_args_.size() == 1 && resp_args_.front().type == RespExpr::ERROR && + ToSV(resp_args_.front().GetBuf()) == error; +} + bool ProtocolClient::CheckRespFirstTypes(initializer_list types) const { unsigned i = 0; for (RespExpr::Type type : types) { diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h index 6c08a0bb8..7e7ddda03 100644 --- a/src/server/protocol_client.h +++ b/src/server/protocol_client.h @@ -85,6 +85,9 @@ class ProtocolClient { // Check if reps_args contains a simple reply. bool CheckRespIsSimpleReply(std::string_view reply) const; + // Check if resp_args contains a simple error + bool CheckRespSimpleError(std::string_view error) const; + // Check resp_args contains the following types at front. bool CheckRespFirstTypes(std::initializer_list types) const;