refactor: add ability to reinit incoming migration object (#4756)

This commit is contained in:
Borys 2025-04-10 13:06:50 +03:00 committed by GitHub
parent 09882f4150
commit d57a581303
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 76 additions and 51 deletions

View file

@ -594,7 +594,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, SinkReplyBuilder* builder
new_config = new_config->CloneWithChanges(enable_slots, disable_slots);
StartSlotMigrations(new_config->GetNewOutgoingMigrations(ClusterConfig::Current()));
StartNewSlotMigrations(*new_config);
SlotSet before =
ClusterConfig::Current() ? ClusterConfig::Current()->GetOwnedSlots() : SlotSet(true);
@ -700,11 +700,23 @@ void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, SinkReplyBuilder* bui
return builder->SendOk();
}
void ClusterFamily::StartSlotMigrations(std::vector<MigrationInfo> migrations) {
// Add validating and error processing
for (auto& m : migrations) {
auto outgoing_migration = CreateOutgoingMigration(std::move(m));
outgoing_migration->Start();
// Creates migration jobs for the outgoing and incoming migrations that new_config reports
// as new relative to ClusterConfig::Current().
//
// Outgoing migrations are registered in outgoing_migration_jobs_ and started immediately.
// Incoming migrations are only registered in incoming_migrations_jobs_; InitMigration later
// finds them there by source id and slot ranges and calls Init() on them.
//
// migration_mu_ is held for the whole registration, including the Start() calls.
void ClusterFamily::StartNewSlotMigrations(const ClusterConfig& new_config) {
  // TODO: add validation and error handling.
  auto out_migrations = new_config.GetNewOutgoingMigrations(ClusterConfig::Current());
  auto in_migrations = new_config.GetNewIncomingMigrations(ClusterConfig::Current());

  util::fb2::LockGuard lk(migration_mu_);

  for (auto& m : out_migrations) {
    auto migration = make_shared<OutgoingMigration>(std::move(m), this, server_family_);
    outgoing_migration_jobs_.emplace_back(migration);
    migration->Start();
  }

  // in_migrations is a local copy that is dropped when this function returns, so its
  // elements can be moved from instead of copied (the constructor takes both by value).
  for (auto& m : in_migrations) {
    incoming_migrations_jobs_.emplace_back(make_shared<IncomingSlotMigration>(
        std::move(m.node_info.id), &server_family_->service(), std::move(m.slot_ranges)));
  }
}
@ -900,35 +912,39 @@ void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {
SlotRanges slot_ranges(std::move(slots));
const auto& incoming_migrations = ClusterConfig::Current()->GetIncomingMigrations();
bool found = any_of(incoming_migrations.begin(), incoming_migrations.end(),
[source_id = source_id, &slot_ranges](const MigrationInfo& info) {
return info.node_info.id == source_id && info.slot_ranges == slot_ranges;
std::shared_ptr<IncomingSlotMigration> migration;
{
util::fb2::LockGuard lk(migration_mu_);
auto it = find_if(incoming_migrations_jobs_.begin(), incoming_migrations_jobs_.end(),
[source_id = source_id, &slot_ranges](const auto& migration) {
return migration->GetSourceID() == source_id &&
migration->GetSlots() == slot_ranges;
});
if (!found) {
if (it != incoming_migrations_jobs_.end()) {
migration = *it;
}
}
if (!migration) {
VLOG(1) << "Unrecognized incoming migration from " << source_id;
return builder->SendSimpleString(OutgoingMigration::kUnknownMigration);
}
VLOG(1) << "Init migration " << source_id;
if (migration->GetState() != MigrationState::C_CONNECTING) {
migration->Stop();
auto slots = migration->GetSlots();
LOG(INFO) << "Flushing slots during migration reinitialization " << migration->GetSourceID()
<< ", slots: " << slots.ToString();
DeleteSlots(slots);
}
util::fb2::LockGuard lk(migration_mu_);
auto was_removed = RemoveIncomingMigrationImpl(incoming_migrations_jobs_, source_id);
LOG_IF(WARNING, was_removed) << "Reinit issued for migration from:" << source_id;
incoming_migrations_jobs_.emplace_back(make_shared<IncomingSlotMigration>(
string(source_id), &server_family_->service(), std::move(slot_ranges), flows_num));
migration->Init(flows_num);
return builder->SendOk();
}
// Builds an OutgoingMigration from `info` and records it in outgoing_migration_jobs_.
// Both steps run while migration_mu_ is held; the new job is returned to the caller.
std::shared_ptr<OutgoingMigration> ClusterFamily::CreateOutgoingMigration(MigrationInfo info) {
  util::fb2::LockGuard lk(migration_mu_);

  std::shared_ptr<OutgoingMigration> job =
      make_shared<OutgoingMigration>(std::move(info), this, server_family_);
  outgoing_migration_jobs_.push_back(job);

  return job;
}
void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
CmdArgParser parser{args};

View file

@ -90,7 +90,7 @@ class ClusterFamily {
std::shared_ptr<IncomingSlotMigration> GetIncomingMigration(std::string_view source_id)
ABSL_LOCKS_EXCLUDED(migration_mu_);
void StartSlotMigrations(std::vector<MigrationInfo> migrations);
void StartNewSlotMigrations(const ClusterConfig& new_config);
// must be destroyed while neither the set_config_mu nor the migration_mu_ lock is held
struct PreparedToRemoveOutgoingMigrations {
@ -105,10 +105,6 @@ class ClusterFamily {
void RemoveIncomingMigrations(const std::vector<MigrationInfo>& migrations)
ABSL_LOCKS_EXCLUDED(migration_mu_);
// store info about migration and create unique session id
std::shared_ptr<OutgoingMigration> CreateOutgoingMigration(MigrationInfo info)
ABSL_LOCKS_EXCLUDED(migration_mu_);
mutable util::fb2::Mutex migration_mu_; // guard migrations operations
// holds all incoming slots migrations that are currently in progress.
std::vector<std::shared_ptr<IncomingSlotMigration>> incoming_migrations_jobs_

View file

@ -151,17 +151,8 @@ class ClusterShardMigration {
atomic_bool pause_ = false;
};
IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, SlotRanges slots,
uint32_t shards_num)
: source_id_(std::move(source_id)),
service_(*se),
slots_(std::move(slots)),
state_(MigrationState::C_CONNECTING),
bc_(shards_num) {
shard_flows_.resize(shards_num);
for (unsigned i = 0; i < shards_num; ++i) {
shard_flows_[i].reset(new ClusterShardMigration(i, &service_, this, bc_));
}
// Creates the descriptor of a migration arriving from node `source_id` for `slots`.
// The blocking counter starts at zero and shard_flows_ is left empty here; the per-shard
// flows are created in Init().
IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, SlotRanges slots)
    : source_id_(std::move(source_id)),
      service_(*se),
      slots_(std::move(slots)),
      bc_(0) {
}
IncomingSlotMigration::~IncomingSlotMigration() {
@ -203,7 +194,8 @@ bool IncomingSlotMigration::Join(long attempt) {
auto wait_res = bc_->WaitFor(wait_time);
if (is_attempt_correct) {
if (wait_res) {
state_.store(MigrationState::C_FINISHED);
util::fb2::LockGuard lk(state_mu_);
state_ = MigrationState::C_FINISHED;
keys_number_ = cluster::GetKeyCount(slots_);
} else {
LOG(WARNING) << "Can't join migration because of data after LSN for " << source_id_;
@ -215,7 +207,8 @@ bool IncomingSlotMigration::Join(long attempt) {
}
void IncomingSlotMigration::Stop() {
string_view log_state = state_.load() == MigrationState::C_FINISHED ? "Finishing" : "Cancelling";
util::fb2::LockGuard lk(state_mu_);
string_view log_state = state_ == MigrationState::C_FINISHED ? "Finishing" : "Cancelling";
LOG(INFO) << log_state << " incoming migration of slots " << slots_.ToString();
cntx_.Cancel();
@ -244,17 +237,31 @@ void IncomingSlotMigration::Stop() {
}
}
void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* source) {
state_.store(MigrationState::C_SYNC);
// Initializes (or reinitializes) the migration for `shards_num` flows.
// While holding state_mu_ it resets the execution state, switches the migration state to
// C_SYNC, installs a fresh blocking counter and recreates one ClusterShardMigration per shard.
void IncomingSlotMigration::Init(uint32_t shards_num) {
  util::fb2::LockGuard lk(state_mu_);

  cntx_.Reset(nullptr);
  state_ = MigrationState::C_SYNC;
  bc_ = BlockingCounter(shards_num);

  // Every slot is overwritten below, so flows left over from a previous Init are replaced.
  shard_flows_.resize(shards_num);
  unsigned shard_id = 0;
  for (auto& flow : shard_flows_) {
    flow.reset(new ClusterShardMigration(shard_id, &service_, this, bc_));
    ++shard_id;
  }
}
// Runs the flow of shard `shard` on the `source` socket by delegating to that shard's
// ClusterShardMigration::Start, then logs once the call has returned.
// NOTE(review): `shard` indexes shard_flows_ without a bounds check here — presumably the
// caller validates it against the flow count passed to Init(); confirm.
void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* source) {
  auto& flow = shard_flows_[shard];
  flow->Start(&cntx_, source);

  VLOG(1) << "Incoming flow " << shard << " finished for " << source_id_;
}
size_t IncomingSlotMigration::GetKeyCount() const {
if (state_.load() == MigrationState::C_FINISHED) {
return keys_number_;
{
util::fb2::LockGuard lk(state_mu_);
if (state_ == MigrationState::C_FINISHED) {
return keys_number_;
}
}
return cluster::GetKeyCount(slots_);
}

View file

@ -19,7 +19,7 @@ class ClusterShardMigration;
// manage migration process state and data
class IncomingSlotMigration {
public:
IncomingSlotMigration(std::string source_id, Service* se, SlotRanges slots, uint32_t shards_num);
IncomingSlotMigration(std::string source_id, Service* se, SlotRanges slots);
~IncomingSlotMigration();
// process data from DFLYMIGRATE FLOW cmd
@ -34,8 +34,12 @@ class IncomingSlotMigration {
// Stop and join the migration, can be called even after migration is finished
void Stop();
// Init/Reinit migration
void Init(uint32_t shards_num);
MigrationState GetState() const {
return state_.load();
util::fb2::LockGuard lk(state_mu_);
return state_;
}
const SlotRanges& GetSlots() const {
@ -70,12 +74,14 @@ class IncomingSlotMigration {
Service& service_;
std::vector<std::unique_ptr<ClusterShardMigration>> shard_flows_;
SlotRanges slots_;
std::atomic<MigrationState> state_ = MigrationState::C_CONNECTING;
ExecutionState cntx_;
mutable util::fb2::Mutex error_mu_;
dfly::GenericError last_error_ ABSL_GUARDED_BY(error_mu_);
std::atomic<size_t> errors_count_ = 0;
mutable util::fb2::Mutex state_mu_;
MigrationState state_ ABSL_GUARDED_BY(state_mu_) = MigrationState::C_CONNECTING;
// When the migration is finished we need to store the number of migrated keys,
// because new requests can add or remove keys and we would get incorrect statistics.
size_t keys_number_ = 0;

View file

@ -8,7 +8,7 @@
#include "server/transaction.h"
namespace dfly {
class DbSlice;
class ServerFamily;
namespace journal {