mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
Simplify handling error response check
This commit is contained in:
parent
f080a5208a
commit
40851f9a2c
6 changed files with 19 additions and 22 deletions
|
@ -835,7 +835,7 @@ ClusterFamily::TakeOutOutgoingMigrations(shared_ptr<ClusterConfig> new_config,
|
|||
removed_slots.Merge(slots);
|
||||
LOG(INFO) << "Outgoing migration cancelled: slots " << slots.ToString() << " to "
|
||||
<< migration.GetHostIp() << ":" << migration.GetPort();
|
||||
migration.Finish();
|
||||
migration.Finish(MigrationState::C_FINISHED);
|
||||
res.migrations.push_back(std::move(*it));
|
||||
outgoing_migration_jobs_.erase(it);
|
||||
}
|
||||
|
|
|
@ -10,7 +10,6 @@
|
|||
#include "base/flags.h"
|
||||
#include "base/logging.h"
|
||||
#include "cluster_utility.h"
|
||||
#include "server/cluster/cluster_defs.h"
|
||||
#include "server/error.h"
|
||||
#include "server/journal/executor.h"
|
||||
#include "server/journal/tx_executor.h"
|
||||
|
|
|
@ -12,7 +12,6 @@
|
|||
#include "base/logging.h"
|
||||
#include "cluster_family.h"
|
||||
#include "cluster_utility.h"
|
||||
#include "server/cluster/cluster_defs.h"
|
||||
#include "server/db_slice.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/error.h"
|
||||
|
@ -37,7 +36,8 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
|
|||
SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots,
|
||||
journal::Journal* journal, OutgoingMigration* om)
|
||||
: ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &exec_st_) {
|
||||
exec_st_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); });
|
||||
exec_st_.SwitchErrorHandler(
|
||||
[om](auto ge) { om->Finish(MigrationState::C_ERROR, std::move(ge)); });
|
||||
}
|
||||
|
||||
~SliceSlotMigration() {
|
||||
|
@ -139,14 +139,8 @@ void OutgoingMigration::OnAllShards(
|
|||
});
|
||||
}
|
||||
|
||||
void OutgoingMigration::Finish(GenericError error) {
|
||||
auto next_state = MigrationState::C_FINISHED;
|
||||
void OutgoingMigration::Finish(MigrationState next_state, GenericError error) {
|
||||
if (error) {
|
||||
if (error.Format() == kIncomingMigrationOOM) {
|
||||
next_state = MigrationState::C_FATAL;
|
||||
} else {
|
||||
next_state = MigrationState::C_ERROR;
|
||||
}
|
||||
LOG(WARNING) << "Finish outgoing migration for " << cf_->MyID() << ": "
|
||||
<< migration_info_.node_info.id << " with error: " << error.Format();
|
||||
exec_st_.ReportError(std::move(error));
|
||||
|
@ -230,8 +224,7 @@ void OutgoingMigration::SyncFb() {
|
|||
// Break outgoing migration if INIT from incoming node responded with OOM. Usually this will
|
||||
// happen on second iteration after first failed with OOM. Sending second INIT is required to
|
||||
// cleanup slots on incoming slot migration node.
|
||||
if (CheckRespFirstTypes({RespExpr::ERROR}) &&
|
||||
facade::ToSV(LastResponseArgs().front().GetBuf()) == kIncomingMigrationOOM) {
|
||||
if (CheckRespSimpleError(kIncomingMigrationOOM)) {
|
||||
ChangeState(MigrationState::C_FATAL);
|
||||
break;
|
||||
}
|
||||
|
@ -374,13 +367,10 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
|
|||
return false;
|
||||
}
|
||||
|
||||
if (CheckRespFirstTypes({RespExpr::ERROR})) {
|
||||
auto error = facade::ToSV(LastResponseArgs().front().GetBuf());
|
||||
// Check if returned incoming slot OOM and finish migration
|
||||
if (error == kIncomingMigrationOOM) {
|
||||
Finish(std::string(error));
|
||||
return false;
|
||||
}
|
||||
// Check OOM from incoming slot migration on ACK request
|
||||
if (CheckRespSimpleError(kIncomingMigrationOOM)) {
|
||||
Finish(MigrationState::C_FATAL, std::string(kIncomingMigrationOOM));
|
||||
return false;
|
||||
}
|
||||
|
||||
if (!CheckRespFirstTypes({RespExpr::INT64})) {
|
||||
|
@ -399,7 +389,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
|
|||
}
|
||||
|
||||
if (!exec_st_.GetError()) {
|
||||
Finish();
|
||||
Finish(MigrationState::C_FINISHED);
|
||||
keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges);
|
||||
cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges,
|
||||
false);
|
||||
|
|
|
@ -30,7 +30,7 @@ class OutgoingMigration : private ProtocolClient {
|
|||
// if is_error = false mark migration as FINISHED and cancel migration if it's not finished yet
|
||||
// can be called from any thread, but only after Start()
|
||||
// if is_error = true and migration is in progress it will be restarted otherwise nothing happens
|
||||
void Finish(GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_);
|
||||
void Finish(MigrationState next_state, GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_);
|
||||
|
||||
MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_);
|
||||
|
||||
|
|
|
@ -339,6 +339,11 @@ bool ProtocolClient::CheckRespIsSimpleReply(string_view reply) const {
|
|||
ToSV(resp_args_.front().GetBuf()) == reply;
|
||||
}
|
||||
|
||||
bool ProtocolClient::CheckRespSimpleError(string_view error) const {
|
||||
return resp_args_.size() == 1 && resp_args_.front().type == RespExpr::ERROR &&
|
||||
ToSV(resp_args_.front().GetBuf()) == error;
|
||||
}
|
||||
|
||||
bool ProtocolClient::CheckRespFirstTypes(initializer_list<RespExpr::Type> types) const {
|
||||
unsigned i = 0;
|
||||
for (RespExpr::Type type : types) {
|
||||
|
|
|
@ -85,6 +85,9 @@ class ProtocolClient {
|
|||
// Check if reps_args contains a simple reply.
|
||||
bool CheckRespIsSimpleReply(std::string_view reply) const;
|
||||
|
||||
// Check if resp_args contains a simple error
|
||||
bool CheckRespSimpleError(std::string_view error) const;
|
||||
|
||||
// Check resp_args contains the following types at front.
|
||||
bool CheckRespFirstTypes(std::initializer_list<facade::RespExpr::Type> types) const;
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue