feat: cluster migrations restart immediately if a timeout happens (#4081)

* feat: cluster migrations restart immediately if a timeout happens

* feat: add DEBUG MIGRATION PAUSE command
Borys 2024-11-25 16:02:22 +02:00 committed by GitHub
parent 3c65651c69
commit 43c83d29fa
15 changed files with 199 additions and 70 deletions
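
The DEBUG MIGRATION subcommand introduced by this commit can be driven from any Redis client. A minimal sketch using redis-py follows; the host and port are illustrative, and the command should be sent to the node receiving the incoming migration:

import redis

# Address of the target (incoming) node; assumed here for illustration.
r = redis.Redis(host="127.0.0.1", port=6379)

r.execute_command("DEBUG", "MIGRATION", "PAUSE")   # stall all incoming migration flows
# ... exercise the source node's timeout/restart path here ...
r.execute_command("DEBUG", "MIGRATION", "RESUME")  # let the paused flows drain again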


@@ -35,7 +35,6 @@ ABSL_FLAG(std::string, cluster_node_id, "",
 ABSL_FLAG(bool, managed_service_info, false,
           "Hides some implementation details from users when true (i.e. in managed service env)");
-
 ABSL_DECLARE_FLAG(int32_t, port);
 ABSL_DECLARE_FLAG(uint16_t, announce_port);

@@ -1007,6 +1006,14 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
   return builder->SendLong(attempt);
 }

+void ClusterFamily::PauseAllIncomingMigrations(bool pause) {
+  util::fb2::LockGuard lk(migration_mu_);
+  LOG_IF(ERROR, incoming_migrations_jobs_.empty()) << "No incoming migrations!";
+  for (auto& im : incoming_migrations_jobs_) {
+    im->Pause(pause);
+  }
+}
+
 using EngineFunc = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* builder,
                                            ConnectionContext* cntx);


@@ -44,6 +44,9 @@ class ClusterFamily {
     return id_;
   }

+  // Only for debug purposes. Pause/resume all incoming migrations.
+  void PauseAllIncomingMigrations(bool pause);
+
  private:
   using SinkReplyBuilder = facade::SinkReplyBuilder;

@@ -64,7 +67,7 @@ class ClusterFamily {
   // Custom Dragonfly commands for cluster management
   void DflyCluster(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
-  void DflyClusterConfig(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
+  void DflyClusterConfig(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx)
+      ABSL_LOCKS_EXCLUDED(set_config_mu, migration_mu_);
   void DflyClusterGetSlotInfo(CmdArgList args, SinkReplyBuilder* builder)
       ABSL_LOCKS_EXCLUDED(migration_mu_);
   void DflyClusterFlushSlots(CmdArgList args, SinkReplyBuilder* builder);


@@ -38,6 +38,10 @@ class ClusterShardMigration {
         bc_(bc) {
   }

+  void Pause(bool pause) {
+    pause_ = pause;
+  }
+
   void Start(Context* cntx, util::FiberSocketBase* source) ABSL_LOCKS_EXCLUDED(mu_) {
     {
       util::fb2::LockGuard lk(mu_);

@@ -56,6 +60,11 @@ class ClusterShardMigration {
     TransactionReader tx_reader;

     while (!cntx->IsCancelled()) {
+      if (pause_) {
+        ThisFiber::SleepFor(100ms);
+        continue;
+      }
+
       auto tx_data = tx_reader.NextTxData(&reader, cntx);
       if (!tx_data) {
         in_migration_->ReportError(GenericError("No tx data"));

@@ -135,6 +144,7 @@ class ClusterShardMigration {
   IncomingSlotMigration* in_migration_;
   util::fb2::BlockingCounter bc_;
   atomic_long last_attempt_{-1};
+  atomic_bool pause_ = false;
 };

 IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, SlotRanges slots,

@@ -153,6 +163,13 @@ IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, Slot
 IncomingSlotMigration::~IncomingSlotMigration() {
 }

+void IncomingSlotMigration::Pause(bool pause) {
+  VLOG(1) << "Pausing migration " << pause;
+  for (auto& flow : shard_flows_) {
+    flow->Pause(pause);
+  }
+}
+
 bool IncomingSlotMigration::Join(long attempt) {
   const absl::Time start = absl::Now();
   const absl::Duration timeout =

@@ -161,8 +178,7 @@ bool IncomingSlotMigration::Join(long attempt) {
   while (true) {
     const absl::Time now = absl::Now();
     const absl::Duration passed = now - start;
-    VLOG_EVERY_N(1, 1000) << "Checking whether to continue with join " << passed << " vs "
-                          << timeout;
+    VLOG(1) << "Checking whether to continue with join " << passed << " vs " << timeout;
     if (passed >= timeout) {
       LOG(WARNING) << "Can't join migration in time";
       ReportError(GenericError("Can't join migration in time"));

@@ -198,8 +214,7 @@ void IncomingSlotMigration::Stop() {
   while (true) {
     const absl::Time now = absl::Now();
     const absl::Duration passed = now - start;
-    VLOG_EVERY_N(1, 1000) << "Checking whether to continue with stop " << passed << " vs "
-                          << timeout;
+    VLOG(1) << "Checking whether to continue with stop " << passed << " vs " << timeout;
     if (bc_->WaitFor(absl::ToInt64Milliseconds(timeout - passed) * 1ms)) {
       return;


@@ -59,6 +59,8 @@ class IncomingSlotMigration {
   size_t GetKeyCount() const;

+  void Pause(bool pause);
+
  private:
   std::string source_id_;
   Service& service_;


@@ -31,12 +31,14 @@ namespace dfly::cluster {
 class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
  public:
   SliceSlotMigration(DbSlice* slice, ServerContext server_context, SlotSet slots,
-                     journal::Journal* journal)
+                     journal::Journal* journal, OutgoingMigration* om)
       : ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &cntx_) {
+    cntx_.SwitchErrorHandler([om](auto ge) { om->Finish(std::move(ge)); });
   }

   ~SliceSlotMigration() {
     streamer_.Cancel();
+    cntx_.JoinErrorHandler();
   }

   // Send DFLYMIGRATE FLOW
@@ -107,6 +109,7 @@ OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, Serv
 OutgoingMigration::~OutgoingMigration() {
   main_sync_fb_.JoinIfNeeded();
+  cntx_.JoinErrorHandler();

   // Destroy each flow in its dedicated thread, because we could be the last
   // owner of the db tables
   OnAllShards([](auto& migration) { migration.reset(); });
@@ -131,9 +134,18 @@ void OutgoingMigration::OnAllShards(
   });
 }

-void OutgoingMigration::Finish(bool is_error) {
-  VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << " : "
-          << migration_info_.node_info.id;
+void OutgoingMigration::Finish(GenericError error) {
+  auto next_state = MigrationState::C_FINISHED;
+  if (error) {
+    next_state = MigrationState::C_ERROR;
+    VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << ": "
+            << migration_info_.node_info.id << " with error: " << error.Format();
+    cntx_.ReportError(std::move(error));
+  } else {
+    VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << ": "
+            << migration_info_.node_info.id;
+  }

   bool should_cancel_flows = false;
   {

@@ -151,8 +163,7 @@ void OutgoingMigration::Finish(bool is_error) {
         should_cancel_flows = true;
         break;
     }
-
-    state_ = is_error ? MigrationState::C_ERROR : MigrationState::C_FINISHED;
+    state_ = next_state;
   }

   if (should_cancel_flows) {

@@ -160,6 +171,7 @@ void OutgoingMigration::Finish(bool is_error) {
       CHECK(migration != nullptr);
       migration->Cancel();
     });
+    cntx_.JoinErrorHandler();
   }
 }
@@ -185,15 +197,15 @@ void OutgoingMigration::SyncFb() {
       ThisFiber::SleepFor(1000ms);  // wait some time before next retry
     }

-    VLOG(2) << "Connecting to source";
+    VLOG(1) << "Connecting to target node";
     auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
     if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
-      VLOG(1) << "Can't connect to source";
+      LOG(WARNING) << "Can't connect to target node";
       cntx_.ReportError(GenericError(ec, "Couldn't connect to source."));
       continue;
     }

-    VLOG(2) << "Migration initiating";
+    VLOG(1) << "Migration initiating";
     ResetParser(false);
     auto cmd = absl::StrCat("DFLYMIGRATE INIT ", cf_->MyID(), " ", slot_migrations_.size());
     for (const auto& s : migration_info_.slot_ranges) {

@@ -201,7 +213,7 @@ void OutgoingMigration::SyncFb() {
     }

     if (auto ec = SendCommandAndReadResponse(cmd); ec) {
-      VLOG(1) << "Unable to initialize migration";
+      LOG(WARNING) << "Can't connect to target node";
       cntx_.ReportError(GenericError(ec, "Could not send INIT command."));
       continue;
     }

@@ -211,7 +223,7 @@ void OutgoingMigration::SyncFb() {
       VLOG(2) << "Target node does not recognize migration; retrying";
       ThisFiber::SleepFor(1000ms);
     } else {
-      VLOG(1) << "Unable to initialize migration";
+      LOG(WARNING) << "Unable to initialize migration";
       cntx_.ReportError(GenericError(std::string(ToSV(LastResponseArgs().front().GetBuf()))));
     }
     continue;

@@ -221,7 +233,7 @@ void OutgoingMigration::SyncFb() {
       DbSlice& db_slice = namespaces->GetDefaultNamespace().GetCurrentDbSlice();
       server_family_->journal()->StartInThread();
       migration = std::make_unique<SliceSlotMigration>(
-          &db_slice, server(), migration_info_.slot_ranges, server_family_->journal());
+          &db_slice, server(), migration_info_.slot_ranges, server_family_->journal(), this);
     });

     if (!ChangeState(MigrationState::C_SYNC)) {
@@ -229,8 +241,7 @@ void OutgoingMigration::SyncFb() {
     }

     OnAllShards([this](auto& migration) { migration->PrepareFlow(cf_->MyID()); });
-    if (CheckFlowsForErrors()) {
-      LOG(WARNING) << "Preparation error detected, retrying outgoing migration";
+    if (cntx_.GetError()) {
       continue;
     }

@@ -241,14 +252,13 @@ void OutgoingMigration::SyncFb() {
       OnAllShards([](auto& migration) { migration->PrepareSync(); });
     }

-    OnAllShards([this](auto& migration) {
-      migration->RunSync();
-      if (migration->GetError())
-        Finish(true);
-    });
+    if (cntx_.GetError()) {
+      continue;
+    }

-    if (CheckFlowsForErrors()) {
-      LOG(WARNING) << "Errors detected, retrying outgoing migration";
+    OnAllShards([this](auto& migration) { migration->RunSync(); });
+
+    if (cntx_.GetError()) {
       continue;
     }

@@ -258,8 +268,7 @@ void OutgoingMigration::SyncFb() {
       VLOG(1) << "Waiting for migration to finalize...";
       ThisFiber::SleepFor(500ms);
     }
-    if (CheckFlowsForErrors()) {
-      LOG(WARNING) << "Errors detected, retrying outgoing migration";
+    if (cntx_.GetError()) {
       continue;
     }
     break;
@@ -273,14 +282,13 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
   // reconnect and ACK one more time
   VLOG(1) << "FinalizeMigration for " << cf_->MyID() << " : " << migration_info_.node_info.id;
   if (attempt > 1) {
-    if (CheckFlowsForErrors()) {
-      Finish(true);
+    if (cntx_.GetError()) {
       return true;
     }

     VLOG(1) << "Reconnecting to source";
     auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
     if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
-      cntx_.ReportError(GenericError(ec, "Couldn't connect to source."));
+      LOG(WARNING) << "Couldn't connect to source.";
       return false;
     }
   }

@@ -291,8 +299,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
   bool is_block_active = true;
   auto is_pause_in_progress = [&is_block_active] { return is_block_active; };
   auto pause_fb_opt =
-      Pause(server_family_->GetNonPriviligedListeners(), &namespaces->GetDefaultNamespace(),
-            nullptr, ClientPause::WRITE, is_pause_in_progress);
+      dfly::Pause(server_family_->GetNonPriviligedListeners(), &namespaces->GetDefaultNamespace(),
+                  nullptr, ClientPause::WRITE, is_pause_in_progress);

   if (!pause_fb_opt) {
     LOG(WARNING) << "Cluster migration finalization time out";

@@ -346,9 +354,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
     }
   }

-  auto is_error = CheckFlowsForErrors();
-  Finish(is_error);
-  if (!is_error) {
+  if (!cntx_.GetError()) {
+    Finish();
     keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges);
     cf_->ApplyMigrationSlotRangeToConfig(migration_info_.node_info.id, migration_info_.slot_ranges,
                                          false);

@@ -366,16 +373,6 @@ void OutgoingMigration::Start() {
   main_sync_fb_ = fb2::Fiber("outgoing_migration", &OutgoingMigration::SyncFb, this);
 }

-bool OutgoingMigration::CheckFlowsForErrors() {
-  for (const auto& flow : slot_migrations_) {
-    if (flow->GetError()) {
-      cntx_.ReportError(flow->GetError());
-      return true;
-    }
-  }
-  return false;
-}
-
 size_t OutgoingMigration::GetKeyCount() const {
   util::fb2::LockGuard lk(state_mu_);
   if (state_ == MigrationState::C_FINISHED) {


@@ -30,9 +30,10 @@ class OutgoingMigration : private ProtocolClient {
   // start migration process, sends INIT command to the target node
   void Start();

-  // mark migration as FINISHED and cancel migration if it's not finished yet
+  // If no error is given, marks the migration as FINISHED and cancels it if it isn't finished yet.
   // can be called from any thread, but only after Start()
-  void Finish(bool is_error = false) ABSL_LOCKS_EXCLUDED(state_mu_);
+  // If an error is given and the migration is in progress, it is restarted; otherwise nothing happens.
+  void Finish(GenericError error = {}) ABSL_LOCKS_EXCLUDED(state_mu_);

   MigrationState GetState() const ABSL_LOCKS_EXCLUDED(state_mu_);

@@ -65,9 +66,6 @@ class OutgoingMigration : private ProtocolClient {
   // should be run for all shards
   void StartFlow(journal::Journal* journal, io::Sink* dest);

-  // if we have an error reports it into cntx_ and return true
-  bool CheckFlowsForErrors();
-
   MigrationState GetStateImpl() const;

   // SliceSlotMigration manages state and data transferring for the corresponding shard
   class SliceSlotMigration;


@@ -367,7 +367,8 @@ OpResult<ValueCompressInfo> EstimateCompression(ConnectionContext* cntx, string_
 }  // namespace

-DebugCmd::DebugCmd(ServerFamily* owner, ConnectionContext* cntx) : sf_(*owner), cntx_(cntx) {
+DebugCmd::DebugCmd(ServerFamily* owner, cluster::ClusterFamily* cf, ConnectionContext* cntx)
+    : sf_(*owner), cf_(*cf), cntx_(cntx) {
 }

 void DebugCmd::Run(CmdArgList args, facade::SinkReplyBuilder* builder) {

@@ -437,6 +438,10 @@ void DebugCmd::Run(CmdArgList args, facade::SinkReplyBuilder* builder) {
     return Replica(args, builder);
   }

+  if (subcmd == "MIGRATION" && args.size() == 2) {
+    return Migration(args, builder);
+  }
+
   if (subcmd == "WATCHED") {
     return Watched(builder);
   }

@@ -550,6 +555,18 @@ void DebugCmd::Replica(CmdArgList args, facade::SinkReplyBuilder* builder) {
   return builder->SendError(UnknownSubCmd("replica", "DEBUG"));
 }

+void DebugCmd::Migration(CmdArgList args, facade::SinkReplyBuilder* builder) {
+  args.remove_prefix(1);
+  string opt = absl::AsciiStrToUpper(ArgS(args, 0));
+
+  if (opt == "PAUSE" || opt == "RESUME") {
+    cf_.PauseAllIncomingMigrations(opt == "PAUSE");
+    return builder->SendOk();
+  }
+
+  return builder->SendError(UnknownSubCmd("MIGRATION", "DEBUG"));
+}
+
 optional<DebugCmd::PopulateOptions> DebugCmd::ParsePopulateArgs(CmdArgList args,
                                                                 facade::SinkReplyBuilder* builder) {
   if (args.size() < 2) {


@@ -9,6 +9,10 @@
 namespace dfly {

+namespace cluster {
+class ClusterFamily;
+}
+
 class EngineShardSet;
 class ServerFamily;

@@ -26,7 +30,7 @@ class DebugCmd {
   };

  public:
-  DebugCmd(ServerFamily* owner, ConnectionContext* cntx);
+  DebugCmd(ServerFamily* owner, cluster::ClusterFamily* cf, ConnectionContext* cntx);

   void Run(CmdArgList args, facade::SinkReplyBuilder* builder);

@@ -40,6 +44,7 @@ class DebugCmd {
   void Reload(CmdArgList args, facade::SinkReplyBuilder* builder);
   void Replica(CmdArgList args, facade::SinkReplyBuilder* builder);
+  void Migration(CmdArgList args, facade::SinkReplyBuilder* builder);
   void Exec(facade::SinkReplyBuilder* builder);
   void Inspect(std::string_view key, CmdArgList args, facade::SinkReplyBuilder* builder);

@@ -52,6 +57,7 @@ class DebugCmd {
   void RecvSize(std::string_view param, facade::SinkReplyBuilder* builder);

   ServerFamily& sf_;
+  cluster::ClusterFamily& cf_;
   ConnectionContext* cntx_;
 };


@@ -167,7 +167,7 @@ void JournalStreamer::ThrottleIfNeeded() {
   if (status == std::cv_status::timeout) {
     LOG(WARNING) << "Stream timed out, inflight bytes/sent start: " << inflight_start << "/"
                  << sent_start << ", end: " << in_flight_bytes_ << "/" << total_sent_;
-    cntx_->ReportError(make_error_code(errc::stream_timeout));
+    cntx_->ReportError("JournalStreamer write operation timeout");
   }
 }


@@ -865,7 +865,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
     acl_family_.Init(listeners.front(), &user_registry_);
   }

-  // Initialize shard_set with a global callback running once in a while in the shard threads.
+  // Initialize shard_set with a callback running once in a while in the shard threads.
   shard_set->Init(shard_num, [this] {
     server_family_.GetDflyCmd()->BreakStalledFlowsInShard();
     server_family_.UpdateMemoryGlobalStats();


@@ -114,6 +114,10 @@ class Service : public facade::ServiceInterface {
     return server_family_;
   }

+  cluster::ClusterFamily& cluster_family() {
+    return cluster_family_;
+  }
+
   // Utility function used in unit tests
   // Do not use in production, only meant to be used by unit tests
   const acl::AclFamily* TestInit();


@@ -1989,7 +1989,7 @@ void ServerFamily::Config(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
 void ServerFamily::Debug(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
                          ConnectionContext* cntx) {
-  DebugCmd dbg_cmd{this, cntx};
+  DebugCmd dbg_cmd{this, &service_.cluster_family(), cntx};
   return dbg_cmd.Run(args, builder);
 }


@@ -39,10 +39,6 @@ namespace journal {
 class Journal;
 }  // namespace journal

-namespace cluster {
-class ClusterFamily;
-}
-
 class ConnectionContext;
 class CommandRegistry;
 class Service;


@@ -13,6 +13,7 @@ from .replication_test import check_all_replicas_finished
 from redis.cluster import RedisCluster
 from redis.cluster import ClusterNode
 from .proxy import Proxy
+from .seeder import SeederBase
 from .seeder import StaticSeeder

 from . import dfly_args

@@ -152,9 +153,30 @@ async def wait_for_status(admin_client, node_id, status, timeout=10):
         "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_id
     )

+    if not isinstance(status, list):
+        status = [status]
+
     async for states, breaker in tick_timer(get_status, timeout=timeout):
         with breaker:
-            assert len(states) != 0 and all(status == state[2] for state in states), states
+            assert len(states) != 0 and all(state[2] in status for state in states), states
+
+
+async def wait_for_error(admin_client, node_id, error, timeout=10):
+    get_status = lambda: admin_client.execute_command(
+        "DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_id
+    )
+
+    async for states, breaker in tick_timer(get_status, timeout=timeout):
+        with breaker:
+            assert len(states) != 0 and all(error == state[4] for state in states), states
+
+
+async def wait_for_migration_start(admin_client, node_id):
+    while (
+        len(await admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_id))
+        == 0
+    ):
+        await asyncio.sleep(0.1)


 async def check_for_no_state_status(admin_clients):
@@ -1281,9 +1303,8 @@ async def test_network_disconnect_during_migration(df_factory, df_seeder_factory
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

-    seeder = df_seeder_factory.create(keys=30000, port=nodes[0].instance.port, cluster_mode=True)
-    await seeder.run(target_deviation=0.1)
+    await StaticSeeder(key_target=200000).run(nodes[0].client)
+    start_capture = await StaticSeeder.capture(nodes[0].client)

     proxy = Proxy("127.0.0.1", 1111, "127.0.0.1", nodes[1].instance.admin_port)
     await proxy.start()
@@ -1295,23 +1316,26 @@
         await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

         for _ in range(10):
-            await asyncio.sleep(random.randint(0, 10) / 20)
+            await asyncio.sleep(random.randint(0, 10) / 100)
             logging.debug("drop connections")
             proxy.drop_connection()
             logging.debug(
                 await nodes[0].admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
             )
     finally:
+        await wait_for_status(nodes[0].admin_client, nodes[1].id, "SYNC")
         await proxy.close(task)
+        await proxy.start()
+        await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED", 20)

     nodes[0].migrations = []
     nodes[0].slots = []
     nodes[1].slots = [(0, 16383)]
     logging.debug("remove finished migrations")
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

-    capture = await seeder.capture()
-    assert await seeder.compare(capture, nodes[1].instance.port)
+    assert (await StaticSeeder.capture(nodes[1].client)) == start_capture


 @pytest.mark.parametrize(
@@ -2205,7 +2229,7 @@ async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_
 @pytest.mark.skip("Takes more than 10 minutes")
-@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
+@dfly_args({"cluster_mode": "yes"})
 async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFactory):
     # Check data migration from one node to another
     instances = [
@@ -2261,3 +2285,63 @@
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
     await check_for_no_state_status([node.admin_client for node in nodes])
+
+
+@pytest.mark.skip("Flaky")
+@pytest.mark.asyncio
+@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
+async def test_migration_timeout_on_sync(df_factory: DflyInstanceFactory, df_seeder_factory):
+    # Timeout is set to 3 seconds because we must first saturate the socket before we get the timeout
+    instances = [
+        df_factory.create(
+            port=BASE_PORT + i,
+            admin_port=BASE_PORT + i + 1000,
+            replication_timeout=3000,
+            vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9,streamer=9",
+        )
+        for i in range(2)
+    ]
+
+    df_factory.start_all(instances)
+
+    nodes = [(await create_node_info(instance)) for instance in instances]
+    nodes[0].slots = [(0, 16383)]
+    nodes[1].slots = []
+
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    logging.debug("source node DEBUG POPULATE")
+    await nodes[0].client.execute_command("debug", "populate", "100000", "foo", "5000")
+    # await StaticSeeder(key_target=200000, data_size=1000).run(nodes[0].client)
+
+    start_capture = await StaticSeeder.capture(nodes[0].client)
+
+    logging.debug("Start migration")
+    nodes[0].migrations.append(
+        MigrationInfo("127.0.0.1", nodes[1].instance.admin_port, [(0, 16383)], nodes[1].id)
+    )
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    await asyncio.sleep(random.randint(0, 50) / 100)
+    await wait_for_migration_start(nodes[1].admin_client, nodes[0].id)
+
+    logging.debug("debug migration pause")
+    await nodes[1].client.execute_command("debug migration pause")
+
+    await wait_for_error(
+        nodes[0].admin_client, nodes[1].id, "JournalStreamer write operation timeout"
+    )
+
+    logging.debug("debug migration resume")
+    await nodes[1].client.execute_command("debug migration resume")
+
+    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")
+    await wait_for_status(nodes[1].admin_client, nodes[0].id, "FINISHED")
+
+    nodes[0].migrations = []
+    nodes[0].slots = []
+    nodes[1].slots = [(0, 16383)]
+
+    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
+
+    assert (await StaticSeeder.capture(nodes[1].client)) == start_capture
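
For reference, the updated helpers compose as in this minimal sketch (node objects as produced by create_node_info above; the function name is hypothetical, and the error string matches the message now reported by the journal streamer):

async def assert_timeout_then_recovery(source_node, target_node):
    # wait_for_status now accepts a single status or a list of acceptable statuses.
    await wait_for_status(source_node.admin_client, target_node.id, ["SYNC", "FINISHED"])

    # wait_for_error matches the error column of DFLYCLUSTER SLOT-MIGRATION-STATUS.
    await wait_for_error(
        source_node.admin_client, target_node.id, "JournalStreamer write operation timeout"
    )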


@@ -154,7 +154,7 @@ class DflyInstance:
     async def close_clients(self):
         for client in self.clients:
-            await client.aclose() if hasattr(client, "aclose") else client.close()
+            await client.aclose() if hasattr(client, "aclose") else await client.close()

     def __enter__(self):
         self.start()