feat server: support partial sync from last master

Signed-off-by: adi_holden <adi@dragonflydb.io>
adi_holden 2025-04-28 11:58:55 +03:00
parent 6d30baa20b
commit 71ef9b58ee
7 changed files with 138 additions and 40 deletions


@@ -140,7 +140,7 @@ void DflyCmd::Run(CmdArgList args, Transaction* tx, facade::RedisReplyBuilder* r
return Thread(args, rb, cntx);
}
if (sub_cmd == "FLOW" && (args.size() == 4 || args.size() == 5)) {
if (sub_cmd == "FLOW" && (args.size() == 4 || args.size() == 5 || args.size() == 6)) {
return Flow(args, rb, cntx);
}
@@ -233,11 +233,16 @@ void DflyCmd::Flow(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
string_view flow_id_str = ArgS(args, 3);
std::optional<LSN> seqid;
std::optional<string> last_master_id;
std::optional<string> last_master_lsn;
if (args.size() == 5) {
seqid.emplace();
if (!absl::SimpleAtoi(ArgS(args, 4), &seqid.value())) {
return rb->SendError(facade::kInvalidIntErr);
}
} else if (args.size() == 6) {
last_master_id = ArgS(args, 4);
last_master_lsn = ArgS(args, 5);
}
VLOG(1) << "Got DFLY FLOW master_id: " << master_id << " sync_id: " << sync_id_str
@@ -257,6 +262,7 @@ void DflyCmd::Flow(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
return;
string eof_token;
std::string_view sync_type{"FULL"};
{
util::fb2::LockGuard lk{replica_ptr->shared_mu};
@@ -276,17 +282,6 @@ void DflyCmd::Flow(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
flow.conn = cntx->conn();
flow.eof_token = eof_token;
flow.version = replica_ptr->version;
}
if (!cntx->conn()->Migrate(shard_set->pool()->at(flow_id))) {
// Listener::PreShutdown() triggered
if (cntx->conn()->socket()->IsOpen()) {
return rb->SendError(kInvalidState);
}
return;
}
sf_->journal()->StartInThread();
std::string_view sync_type{"FULL"};
#if 0 // Partial synchronization is disabled
if (seqid.has_value()) {
@@ -307,6 +302,27 @@ void DflyCmd::Flow(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
}
#endif
std::optional<Replica::LastMasterSyncData> data = sf_->GetLastMasterData();
if (last_master_id.has_value() && data.has_value()) {
string last_master_lsn_str = absl::StrJoin(data.value().last_journal_LSNs, "-");
if (data.value().id == last_master_id.value() && last_master_lsn_str == last_master_lsn) {
sync_type = "PARTIAL";
flow.start_partial_sync_at = sf_->journal()->GetLsn();
VLOG(1) << "Partial sync requested from LSN=" << flow.start_partial_sync_at.value()
<< " and is available. (current_lsn=" << sf_->journal()->GetLsn() << ")";
}
}
}
if (!cntx->conn()->Migrate(shard_set->pool()->at(flow_id))) {
// Listener::PreShutdown() triggered
if (cntx->conn()->socket()->IsOpen()) {
return rb->SendError(kInvalidState);
}
return;
}
sf_->journal()->StartInThread();
rb->StartArray(2);
rb->SendSimpleString(sync_type);
rb->SendSimpleString(eof_token);
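
Taken together, the Flow() changes above mean the master now distinguishes three argument shapes of DFLY FLOW and answers with the chosen sync type. A condensed summary (argument names follow the parsing code above; illustrative, not an exact protocol spec):

DFLY FLOW <masterid> <syncid> <flowid>                                    -> FULL
DFLY FLOW <masterid> <syncid> <flowid> <seqid>                            -> partial sync from this master's own journal (currently compiled out via #if 0)
DFLY FLOW <masterid> <syncid> <flowid> <last_master_id> <last_master_lsn> -> PARTIAL, only if both values match GetLastMasterData()

In the PARTIAL case the flow records start_partial_sync_at = journal()->GetLsn(), i.e. the replica continues from the master's current journal position instead of receiving a new snapshot.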


@@ -126,21 +126,22 @@ GenericError Replica::Start() {
return {};
}
void Replica::StartMainReplicationFiber() {
sync_fb_ = fb2::Fiber("main_replication", &Replica::MainReplicationFb, this);
void Replica::StartMainReplicationFiber(std::optional<LastMasterSyncData> data) {
sync_fb_ = fb2::Fiber("main_replication", &Replica::MainReplicationFb, this, data);
}
void Replica::EnableReplication(facade::SinkReplyBuilder* builder) {
VLOG(1) << "Enabling replication";
state_mask_.store(R_ENABLED); // set replica state to enabled
sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this); // call replication fiber
state_mask_.store(R_ENABLED); // set replica state to enabled
sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this, nullopt); // call replication fiber
}
void Replica::Stop() {
std::optional<Replica::LastMasterSyncData> Replica::Stop() {
VLOG(1) << "Stopping replication " << this;
// Stops the loop in MainReplicationFb.
// bool is_stable_sync = state_mask_.load() & R_SYNC_OK;
proactor_->Await([this] {
state_mask_.store(0); // Specifically ~R_ENABLED.
exec_st_.ReportCancelError(); // Context is fully responsible for cleanup.
@@ -154,6 +155,13 @@ void Replica::Stop() {
for (auto& flow : shard_flows_) {
flow.reset();
}
if (last_journal_LSNs_.has_value()) {
LastMasterSyncData data;
data.id = master_context_.master_repl_id;
data.last_journal_LSNs = last_journal_LSNs_.value();
return data;
}
return nullopt;
}
void Replica::Pause(bool pause) {
@@ -183,7 +191,7 @@ std::error_code Replica::TakeOver(std::string_view timeout, bool save_flag) {
return ec;
}
void Replica::MainReplicationFb() {
void Replica::MainReplicationFb(std::optional<LastMasterSyncData> data) {
VLOG(1) << "Main replication fiber started " << this;
// Switch shard states to replication.
SetShardStates(true);
@@ -231,9 +239,10 @@ void Replica::MainReplicationFb() {
// 3. Initiate full sync
if ((state_mask_.load() & R_SYNC_OK) == 0) {
if (HasDflyMaster())
ec = InitiateDflySync();
else
if (HasDflyMaster()) {
ec = InitiateDflySync(data);
data = nullopt; // use old master data only once.
} else
ec = InitiatePSync();
if (ec) {
@@ -468,7 +477,7 @@ error_code Replica::InitiatePSync() {
}
// Initialize and start sub-replica for each flow.
error_code Replica::InitiateDflySync() {
error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> data) {
auto start_time = absl::Now();
// Initialize MultiShardExecution.
@@ -530,7 +539,8 @@ error_code Replica::InitiateDflySync() {
auto ec = shard_flows_[id]->StartSyncFlow(sync_block, &exec_st_,
last_journal_LSNs_.has_value()
? std::optional((*last_journal_LSNs_)[id])
: std::nullopt);
: std::nullopt,
data);
if (ec.has_value())
is_full_sync[id] = ec.value();
else
@@ -542,7 +552,7 @@ error_code Replica::InitiateDflySync() {
lock_guard lk{flows_op_mu_};
shard_set->pool()->AwaitFiberOnAll(std::move(shard_cb));
last_journal_LSNs_.reset();
size_t num_full_flows =
std::accumulate(is_full_sync.get(), is_full_sync.get() + num_df_flows, 0);
@@ -558,7 +568,6 @@ error_code Replica::InitiateDflySync() {
} else if (num_full_flows == 0) {
sync_type = "partial";
} else {
last_journal_LSNs_.reset();
exec_st_.ReportError(std::make_error_code(errc::state_not_recoverable),
"Won't do a partial sync: some flows must fully resync");
}
@@ -734,8 +743,9 @@ error_code Replica::SendNextPhaseRequest(string_view kind) {
return std::error_code{};
}
io::Result<bool> DflyShardReplica::StartSyncFlow(BlockingCounter sb, ExecutionState* cntx,
std::optional<LSN> lsn) {
io::Result<bool> DflyShardReplica::StartSyncFlow(
BlockingCounter sb, ExecutionState* cntx, std::optional<LSN> lsn,
std::optional<Replica::LastMasterSyncData> last_master_data) {
using nonstd::make_unexpected;
DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty());
proactor_index_ = ProactorBase::me()->GetPoolIndex();
@@ -753,6 +763,12 @@ io::Result<bool> DflyShardReplica::StartSyncFlow(BlockingCounter sb, ExecutionSt
absl::GetFlag(FLAGS_replica_partial_sync)) {
absl::StrAppend(&cmd, " ", *lsn);
}
if (last_master_data.has_value() && master_context_.version >= DflyVersion::VER5 &&
absl::GetFlag(FLAGS_replica_partial_sync)) {
string lsn_str = absl::StrJoin(last_master_data.value().last_journal_LSNs, "-");
absl::StrAppend(&cmd, " ", last_master_data.value().id, " ", lsn_str);
VLOG(1) << "Sending last master sync flow " << last_master_data.value().id << " " << lsn_str;
}
ResetParser(RedisParser::Mode::CLIENT);
leftover_buf_.emplace(128);
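
On the replica side (StartSyncFlow above), the previous-master data is attached only when the new master reports at least DflyVersion::VER5 and the replica_partial_sync flag is enabled, and the per-flow LSN vector is encoded by joining it with '-'. A minimal sketch of the resulting request string, using made-up example values and assuming the absl string headers already used by this file:

// Example values only; the identifiers mirror the diff above.
Replica::LastMasterSyncData d;
d.id = "f1c2d3e4a5b6";                   // previous master's replication id (made up)
d.last_journal_LSNs = {120, 98, 133};    // one journal LSN per shard flow (made up)
std::string cmd = "DFLY FLOW <masterid> <syncid> 2";
absl::StrAppend(&cmd, " ", d.id, " ", absl::StrJoin(d.last_journal_LSNs, "-"));
// cmd == "DFLY FLOW <masterid> <syncid> 2 f1c2d3e4a5b6 120-98-133"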


@@ -59,14 +59,18 @@ class Replica : ProtocolClient {
// Returns true if initial link with master has been established or
// false if it has failed.
GenericError Start();
void StartMainReplicationFiber();
struct LastMasterSyncData {
std::string id;
std::vector<LSN> last_journal_LSNs;
};
void StartMainReplicationFiber(std::optional<LastMasterSyncData> data);
// Sets the server state to have replication enabled.
// It is like Start(), but does not attempt to establish
// a connection right away; instead it lets MainReplicationFb do the work.
void EnableReplication(facade::SinkReplyBuilder* builder);
void Stop(); // thread-safe
std::optional<LastMasterSyncData> Stop(); // thread-safe
void Pause(bool pause);
@@ -78,15 +82,15 @@ class Replica : ProtocolClient {
private: /* Main standalone mode functions */
// Coordinate state transitions. Spawned by start.
void MainReplicationFb();
void MainReplicationFb(std::optional<LastMasterSyncData> data);
std::error_code Greet(); // Send PING and REPLCONF.
std::error_code HandleCapaDflyResp();
std::error_code ConfigureDflyMaster();
std::error_code InitiatePSync(); // Redis full sync.
std::error_code InitiateDflySync(); // Dragonfly full sync.
std::error_code InitiatePSync(); // Redis full sync.
std::error_code InitiateDflySync(std::optional<LastMasterSyncData> data); // Dragonfly full sync.
std::error_code ConsumeRedisStream(); // Redis stable state.
std::error_code ConsumeDflyStream(); // Dragonfly stable state.
@@ -185,7 +189,8 @@ class DflyShardReplica : public ProtocolClient {
// Start replica initialized as dfly flow.
// Sets is_full_sync when successful.
io::Result<bool> StartSyncFlow(util::fb2::BlockingCounter block, ExecutionState* cntx,
std::optional<LSN>);
std::optional<LSN>,
std::optional<Replica::LastMasterSyncData> data);
// Transition into stable state mode as dfly flow.
std::error_code StartStableSyncFlow(ExecutionState* cntx);


@@ -2953,7 +2953,7 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
cmd_cntx.rb->SendError(ec.Format());
return;
}
add_replica->StartMainReplicationFiber();
add_replica->StartMainReplicationFiber(nullopt);
cluster_replicas_.push_back(std::move(add_replica));
cmd_cntx.rb->SendOk();
}
@@ -2961,6 +2961,7 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ActionOnConnectionFail on_err) {
std::shared_ptr<Replica> new_replica;
std::optional<Replica::LastMasterSyncData> last_master_data;
{
util::fb2::LockGuard lk(replicaof_mu_); // Only one REPLICAOF command can run at a time
@@ -2984,7 +2985,7 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
CHECK(replica_);
SetMasterFlagOnAllThreads(true); // Flip flag before clearing replica
replica_->Stop();
last_master_data_ = replica_->Stop();
replica_.reset();
StopAllClusterReplicas();
@@ -2997,8 +2998,9 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
}
// If any replication is in progress, stop it, cancellation should kick in immediately
if (replica_)
replica_->Stop();
last_master_data = replica_->Stop();
StopAllClusterReplicas();
// First, switch into the loading state
@@ -3054,8 +3056,8 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
// If we are called by "Replicate", tx will be null but we do not need
// to flush anything.
if (on_err == ActionOnConnectionFail::kReturnOnError) {
Drakarys(tx, DbSlice::kDbAll);
new_replica->StartMainReplicationFiber();
// Drakarys(tx, DbSlice::kDbAll);
new_replica->StartMainReplicationFiber(last_master_data);
}
builder->SendOk();
}
@@ -3128,7 +3130,7 @@ void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx)
LOG(INFO) << "Takeover successful, promoting this instance to master.";
SetMasterFlagOnAllThreads(true);
replica_->Stop();
last_master_data_ = replica_->Stop();
replica_.reset();
return builder->SendOk();
}
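
The ServerFamily changes wire the two ends together: stopping the running replication session now returns the captured LastMasterSyncData, ReplicaOfInternal keeps it (and the Drakarys call that used to flush local data before restarting replication is commented out), and the data is handed to the new session's main fiber exactly once. A condensed sketch of the hand-off, not the exact code:

// Condensed from the hunks above; locking and error handling omitted.
std::optional<Replica::LastMasterSyncData> last_master_data;
if (replica_)
  last_master_data = replica_->Stop();  // previous master's repl id + per-flow journal LSNs
// ... build new_replica pointing at the new master ...
new_replica->StartMainReplicationFiber(last_master_data);
// MainReplicationFb consumes the data once and forwards it to
// InitiateDflySync() -> StartSyncFlow(), which adds it to the DFLY FLOW request.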


@@ -242,6 +242,10 @@ class ServerFamily {
return dfly_cmd_.get();
}
std::optional<Replica::LastMasterSyncData> GetLastMasterData() const {
return last_master_data_;
}
absl::Span<facade::Listener* const> GetListeners() const {
return listeners_;
}
@@ -368,6 +372,7 @@ class ServerFamily {
std::unique_ptr<DflyCmd> dfly_cmd_;
std::string master_replid_;
std::optional<Replica::LastMasterSyncData> last_master_data_;
time_t start_time_ = 0; // in seconds, epoch time.


@@ -33,8 +33,11 @@ enum class DflyVersion {
// - Periodic lag checks from master to replica
VER4,
// - Support partial sync from different master
VER5,
// Always points to the latest version
CURRENT_VER = VER4,
CURRENT_VER = VER5,
};
} // namespace dfly


@@ -3038,3 +3038,54 @@ async def test_replica_snapshot_with_big_values_while_seeding(df_factory: DflyIn
await wait_available_async(c_node)
assert await c_node.execute_command("dbsize") > 0
await c_node.execute_command("FLUSHALL")
@pytest.mark.parametrize("use_takeover", [False, True])
async def test_partial_replication_on_same_source_master(df_factory, use_takeover):
master = df_factory.create()
replica1 = df_factory.create()
replica2 = df_factory.create()
df_factory.start_all([master, replica1, replica2])
c_master = master.client()
c_replica1 = replica1.client()
c_replica2 = replica2.client()
logging.debug("Fill master with test data")
seeder = DebugPopulateSeeder(key_target=50)
await seeder.run(c_master)
logging.debug("Start replication and wait for full sync")
await c_replica1.execute_command(f"REPLICAOF localhost {master.port}")
await wait_for_replicas_state(c_replica1)
await c_replica2.execute_command(f"REPLICAOF localhost {master.port}")
await wait_for_replicas_state(c_replica2)
# Send some traffic
seeder = SeederV2(key_target=8_000)
await seeder.run(c_master, target_deviation=0.01)
# Wait for all journal changes to propagate to replicas
await check_all_replicas_finished([c_replica1, c_replica2], c_master)
# Promote first replica to master
if use_takeover:
await c_replica1.execute_command(f"REPLTAKEOVER 5")
else:
await c_replica1.execute_command(f"REPLICAOF NO ONE")
# Start replication with new master
await c_replica2.execute_command(f"REPLICAOF localhost {replica1.port}")
await check_all_replicas_finished([c_replica2], c_replica1)
# Validate data
hash1, hash2 = await asyncio.gather(*(SeederV2.capture(c) for c in (c_replica1, c_replica2)))
assert hash1 == hash2
replica2.stop()
# Check logs for partial replication
lines = replica2.find_in_logs(f"Started partial sync with localhost:{replica1.port}")
assert len(lines) == 1
# Check no full sync logs
lines = replica2.find_in_logs(f"Started partial full with localhost:{replica1.port}")
assert len(lines) == 0
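
Assuming the repository's usual pytest harness for the replication tests, the new case can be run in isolation with pytest -k test_partial_replication_on_same_source_master; the use_takeover parametrization exercises both the REPLTAKEOVER and the REPLICAOF NO ONE promotion paths.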