refactor: clean cluster code (#4707)

This commit is contained in:
Borys 2025-03-06 10:08:16 +02:00 committed by GitHub
parent a39d777b82
commit 700d375ffc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 17 additions and 19 deletions

View file

@ -941,8 +941,6 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder,
return builder->SendError(err->MakeReply());
}
auto host_ip = cntx->conn()->RemoteEndpointAddress();
VLOG(1) << "Create flow " << source_id << " shard_id: " << shard_id;
cntx->conn()->SetName(absl::StrCat("migration_flow_", source_id));

View file

@ -4,8 +4,6 @@
#pragma once
#include <absl/container/btree_map.h>
#include <string>
#include "facade/conn_context.h"

View file

@ -217,7 +217,7 @@ bool IncomingSlotMigration::Join(long attempt) {
void IncomingSlotMigration::Stop() {
string_view log_state = state_.load() == MigrationState::C_FINISHED ? "Finishing" : "Cancelling";
LOG(INFO) << log_state << " incoming migration of slots " << slots_.ToString();
cntx_.ReportCancelError();
cntx_.Cancel();
for (auto& flow : shard_flows_) {
if (auto err = flow->Cancel(); err) {

View file

@ -3,7 +3,6 @@
//
#pragma once
#include "helio/io/io.h"
#include "helio/util/fiber_socket_base.h"
#include "server/cluster/cluster_defs.h"
#include "server/common.h"

View file

@ -309,8 +309,11 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
nullptr, ClientPause::WRITE, is_pause_in_progress);
if (!pause_fb_opt) {
LOG(WARNING) << "Migration finalization time out " << cf_->MyID() << " : "
<< migration_info_.node_info.id << " attempt " << attempt;
auto err = absl::StrCat("Migration finalization time out ", cf_->MyID(), " : ",
migration_info_.node_info.id, " attempt ", attempt);
LOG(WARNING) << err;
SetLastError(std::move(err));
}
absl::Cleanup cleanup([&is_block_active, &pause_fb_opt]() {

View file

@ -3,9 +3,6 @@
//
#pragma once
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include "io/io.h"
#include "server/cluster/cluster_defs.h"
#include "server/protocol_client.h"
#include "server/transaction.h"
@ -53,18 +50,21 @@ class OutgoingMigration : private ProtocolClient {
return migration_info_;
}
void ResetError() ABSL_LOCKS_EXCLUDED(error_mu_) {
void ResetError() {
if (cntx_.IsError()) {
errors_count_.fetch_add(1, std::memory_order_relaxed);
auto err = cntx_.GetError();
{
util::fb2::LockGuard lk(error_mu_);
last_error_ = std::move(err);
}
SetLastError(cntx_.GetError());
cntx_.Reset(nullptr);
}
}
void SetLastError(dfly::GenericError err) ABSL_LOCKS_EXCLUDED(error_mu_) {
if (!err)
return;
errors_count_.fetch_add(1, std::memory_order_relaxed);
util::fb2::LockGuard lk(error_mu_);
last_error_ = std::move(err);
}
std::string GetErrorStr() const ABSL_LOCKS_EXCLUDED(error_mu_) {
util::fb2::LockGuard lk(error_mu_);
return last_error_.Format();

View file

@ -397,7 +397,7 @@ GenericError ExecutionState::ReportErrorInternal(GenericError&& err) {
// This context is either new or was Reset, where the handler was joined
CHECK(!err_handler_fb_.IsJoinable());
LOG(ERROR) << "ReportError: " << err_.Format();
LOG(WARNING) << "ReportError: " << err_.Format();
// We can move err_handler_ because it should run at most once.
if (err_handler_)