feat(server): support partial sync from last master (#5015)

Signed-off-by: adiholden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2025-04-29 12:12:23 +03:00 committed by GitHub
parent 23d310b607
commit 880cd8d1a5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 188 additions and 40 deletions

View file

@ -5,6 +5,7 @@
#include <absl/random/random.h>
#include <absl/strings/str_cat.h>
#include <absl/strings/str_split.h>
#include <absl/strings/strip.h>
#include <limits>
@ -33,6 +34,8 @@
#include "util/fibers/synchronization.h"
using namespace std;
ABSL_FLAG(uint32_t, allow_partial_sync_with_lsn_diff, 0,
"Do partial sync in case lsn diff is less than the given threshold");
ABSL_DECLARE_FLAG(bool, info_replication_valkey_compatible);
ABSL_DECLARE_FLAG(uint32_t, replication_timeout);
@ -99,6 +102,20 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r
return true;
}
bool IsLSNDiffBellowThreshold(const std::vector<LSN>& lsn_vec1, const std::vector<LSN>& lsn_vec2) {
DCHECK_EQ(lsn_vec1.size(), lsn_vec2.size());
uint32_t allow_diff = absl::GetFlag(FLAGS_allow_partial_sync_with_lsn_diff);
for (size_t i = 0; i < lsn_vec1.size(); ++i) {
uint32_t diff =
lsn_vec1[i] > lsn_vec2[i] ? lsn_vec1[i] - lsn_vec2[i] : lsn_vec2[i] - lsn_vec1[i];
if (diff > allow_diff) {
VLOG(1) << "No partial sync due to diff: " << diff << " allow_diff is: " << allow_diff;
return false;
}
}
return true;
}
} // namespace
void DflyCmd::ReplicaInfo::Cancel() {
@ -140,7 +157,7 @@ void DflyCmd::Run(CmdArgList args, Transaction* tx, facade::RedisReplyBuilder* r
return Thread(args, rb, cntx);
}
if (sub_cmd == "FLOW" && (args.size() == 4 || args.size() == 5)) {
if (sub_cmd == "FLOW" && (args.size() >= 4 && args.size() <= 6)) {
return Flow(args, rb, cntx);
}
@ -233,11 +250,16 @@ void DflyCmd::Flow(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
string_view flow_id_str = ArgS(args, 3);
std::optional<LSN> seqid;
std::optional<string> last_master_id;
std::optional<string> last_master_lsn;
if (args.size() == 5) {
seqid.emplace();
if (!absl::SimpleAtoi(ArgS(args, 4), &seqid.value())) {
return rb->SendError(facade::kInvalidIntErr);
}
} else if (args.size() == 6) {
last_master_id = ArgS(args, 4);
last_master_lsn = ArgS(args, 5);
}
VLOG(1) << "Got DFLY FLOW master_id: " << master_id << " sync_id: " << sync_id_str
@ -257,6 +279,7 @@ void DflyCmd::Flow(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
return;
string eof_token;
std::string_view sync_type{"FULL"};
{
util::fb2::LockGuard lk{replica_ptr->shared_mu};
@ -276,17 +299,6 @@ void DflyCmd::Flow(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
flow.conn = cntx->conn();
flow.eof_token = eof_token;
flow.version = replica_ptr->version;
}
if (!cntx->conn()->Migrate(shard_set->pool()->at(flow_id))) {
// Listener::PreShutdown() triggered
if (cntx->conn()->socket()->IsOpen()) {
return rb->SendError(kInvalidState);
}
return;
}
sf_->journal()->StartInThread();
std::string_view sync_type{"FULL"};
#if 0 // Partial synchronization is disabled
if (seqid.has_value()) {
@ -307,6 +319,42 @@ void DflyCmd::Flow(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
}
#endif
std::optional<Replica::LastMasterSyncData> data = sf_->GetLastMasterData();
// In this flow the master and the registered replica were synced from the same master.
if (last_master_id && data && data.value().id == last_master_id.value()) {
std::vector<std::string_view> lsn_str_vec = absl::StrSplit(last_master_lsn.value(), '-');
if (lsn_str_vec.size() != data.value().last_journal_LSNs.size()) {
return rb->SendError(facade::kSyntaxErr); // Unexpected flow. LSN vector of same master
// should be the same size on all replicas.
}
std::vector<LSN> lsn_vec;
lsn_vec.reserve(lsn_str_vec.size());
for (string_view lsn_str : lsn_str_vec) {
int64_t value;
if (!absl::SimpleAtoi(lsn_str, &value)) {
return rb->SendError(facade::kInvalidIntErr);
}
lsn_vec.push_back(value);
}
if (IsLSNDiffBellowThreshold(data.value().last_journal_LSNs, lsn_vec)) {
sync_type = "PARTIAL";
flow.start_partial_sync_at = sf_->journal()->GetLsn();
VLOG(1) << "Partial sync requested from LSN=" << flow.start_partial_sync_at.value()
<< " and is available. (current_lsn=" << sf_->journal()->GetLsn() << ")";
}
}
}
if (!cntx->conn()->Migrate(shard_set->pool()->at(flow_id))) {
// Listener::PreShutdown() triggered
if (cntx->conn()->socket()->IsOpen()) {
return rb->SendError(kInvalidState);
}
return;
}
sf_->journal()->StartInThread();
rb->StartArray(2);
rb->SendSimpleString(sync_type);
rb->SendSimpleString(eof_token);

View file

@ -126,18 +126,19 @@ GenericError Replica::Start() {
return {};
}
void Replica::StartMainReplicationFiber() {
sync_fb_ = fb2::Fiber("main_replication", &Replica::MainReplicationFb, this);
// Launches the "main_replication" fiber, which runs MainReplicationFb.
// `last_master_sync_data` is the sync state (master id and per-shard LSNs) returned by a
// previous Stop(), or nullopt when there is none; it is moved into the fiber.
void Replica::StartMainReplicationFiber(std::optional<LastMasterSyncData> last_master_sync_data) {
sync_fb_ = fb2::Fiber("main_replication", &Replica::MainReplicationFb, this,
std::move(last_master_sync_data));
}
void Replica::EnableReplication(facade::SinkReplyBuilder* builder) {
VLOG(1) << "Enabling replication";
state_mask_.store(R_ENABLED); // set replica state to enabled
sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this); // call replication fiber
state_mask_.store(R_ENABLED); // set replica state to enabled
sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this, nullopt); // call replication fiber
}
void Replica::Stop() {
std::optional<Replica::LastMasterSyncData> Replica::Stop() {
VLOG(1) << "Stopping replication " << this;
// Stops the loop in MainReplicationFb.
@ -154,6 +155,11 @@ void Replica::Stop() {
for (auto& flow : shard_flows_) {
flow.reset();
}
if (last_journal_LSNs_.has_value()) {
return LastMasterSyncData{master_context_.master_repl_id, last_journal_LSNs_.value()};
}
return nullopt;
}
void Replica::Pause(bool pause) {
@ -183,7 +189,7 @@ std::error_code Replica::TakeOver(std::string_view timeout, bool save_flag) {
return ec;
}
void Replica::MainReplicationFb() {
void Replica::MainReplicationFb(std::optional<LastMasterSyncData> last_master_sync_data) {
VLOG(1) << "Main replication fiber started " << this;
// Switch shard states to replication.
SetShardStates(true);
@ -231,9 +237,9 @@ void Replica::MainReplicationFb() {
// 3. Initiate full sync
if ((state_mask_.load() & R_SYNC_OK) == 0) {
if (HasDflyMaster())
ec = InitiateDflySync();
else
if (HasDflyMaster()) {
ec = InitiateDflySync(std::exchange(last_master_sync_data, nullopt));
} else
ec = InitiatePSync();
if (ec) {
@ -468,7 +474,7 @@ error_code Replica::InitiatePSync() {
}
// Initialize and start sub-replica for each flow.
error_code Replica::InitiateDflySync() {
error_code Replica::InitiateDflySync(std::optional<LastMasterSyncData> last_master_sync_data) {
auto start_time = absl::Now();
// Initialize MultiShardExecution.
@ -530,7 +536,8 @@ error_code Replica::InitiateDflySync() {
auto ec = shard_flows_[id]->StartSyncFlow(sync_block, &exec_st_,
last_journal_LSNs_.has_value()
? std::optional((*last_journal_LSNs_)[id])
: std::nullopt);
: std::nullopt,
last_master_sync_data);
if (ec.has_value())
is_full_sync[id] = ec.value();
else
@ -542,7 +549,7 @@ error_code Replica::InitiateDflySync() {
lock_guard lk{flows_op_mu_};
shard_set->pool()->AwaitFiberOnAll(std::move(shard_cb));
last_journal_LSNs_.reset();
size_t num_full_flows =
std::accumulate(is_full_sync.get(), is_full_sync.get() + num_df_flows, 0);
@ -558,7 +565,6 @@ error_code Replica::InitiateDflySync() {
} else if (num_full_flows == 0) {
sync_type = "partial";
} else {
last_journal_LSNs_.reset();
exec_st_.ReportError(std::make_error_code(errc::state_not_recoverable),
"Won't do a partial sync: some flows must fully resync");
}
@ -734,8 +740,9 @@ error_code Replica::SendNextPhaseRequest(string_view kind) {
return std::error_code{};
}
io::Result<bool> DflyShardReplica::StartSyncFlow(BlockingCounter sb, ExecutionState* cntx,
std::optional<LSN> lsn) {
io::Result<bool> DflyShardReplica::StartSyncFlow(
BlockingCounter sb, ExecutionState* cntx, std::optional<LSN> lsn,
std::optional<Replica::LastMasterSyncData> last_master_data) {
using nonstd::make_unexpected;
DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty());
proactor_index_ = ProactorBase::me()->GetPoolIndex();
@ -746,6 +753,7 @@ io::Result<bool> DflyShardReplica::StartSyncFlow(BlockingCounter sb, ExecutionSt
VLOG(1) << "Sending on flow " << master_context_.master_repl_id << " "
<< master_context_.dfly_session_id << " " << flow_id_;
// DFLY FLOW <master_id> <session_id> <flow_id> [lsn] [last_master_id lsn-vec]
std::string cmd = StrCat("DFLY FLOW ", master_context_.master_repl_id, " ",
master_context_.dfly_session_id, " ", flow_id_);
// Try to negotiate a partial sync if possible.
@ -753,6 +761,12 @@ io::Result<bool> DflyShardReplica::StartSyncFlow(BlockingCounter sb, ExecutionSt
absl::GetFlag(FLAGS_replica_partial_sync)) {
absl::StrAppend(&cmd, " ", *lsn);
}
if (last_master_data && master_context_.version >= DflyVersion::VER5 &&
absl::GetFlag(FLAGS_replica_partial_sync)) {
string lsn_str = absl::StrJoin(last_master_data.value().last_journal_LSNs, "-");
absl::StrAppend(&cmd, " ", last_master_data.value().id, " ", lsn_str);
VLOG(1) << "Sending last master sync flow " << last_master_data.value().id << " " << lsn_str;
}
ResetParser(RedisParser::Mode::CLIENT);
leftover_buf_.emplace(128);

View file

@ -59,14 +59,18 @@ class Replica : ProtocolClient {
// Returns true if initial link with master has been established or
// false if it has failed.
GenericError Start();
void StartMainReplicationFiber();
// Snapshot of the replication position relative to the last master this instance synced from.
// Returned by Stop() and passed to StartMainReplicationFiber() so that a partial sync can be
// negotiated with a new master.
struct LastMasterSyncData {
// Replication id of the last master.
std::string id;
// Last journal LSN per master shard; sent on the wire in DFLY FLOW joined with '-'.
std::vector<LSN> last_journal_LSNs; // lsn for each master shard.
};
void StartMainReplicationFiber(std::optional<LastMasterSyncData> data);
// Sets the server state to have replication enabled.
// It is like Start(), but does not attempt to establish
// a connection right-away, but instead lets MainReplicationFb do the work.
void EnableReplication(facade::SinkReplyBuilder* builder);
void Stop(); // thread-safe
std::optional<LastMasterSyncData> Stop(); // thread-safe
void Pause(bool pause);
@ -78,15 +82,15 @@ class Replica : ProtocolClient {
private: /* Main standalone mode functions */
// Coordinate state transitions. Spawned by start.
void MainReplicationFb();
void MainReplicationFb(std::optional<LastMasterSyncData> data);
std::error_code Greet(); // Send PING and REPLCONF.
std::error_code HandleCapaDflyResp();
std::error_code ConfigureDflyMaster();
std::error_code InitiatePSync(); // Redis full sync.
std::error_code InitiateDflySync(); // Dragonfly full sync.
std::error_code InitiatePSync(); // Redis full sync.
std::error_code InitiateDflySync(std::optional<LastMasterSyncData> data); // Dragonfly full sync.
std::error_code ConsumeRedisStream(); // Redis stable state.
std::error_code ConsumeDflyStream(); // Dragonfly stable state.
@ -185,7 +189,8 @@ class DflyShardReplica : public ProtocolClient {
// Start replica initialized as dfly flow.
// Sets is_full_sync when successful.
io::Result<bool> StartSyncFlow(util::fb2::BlockingCounter block, ExecutionState* cntx,
std::optional<LSN>);
std::optional<LSN>,
std::optional<Replica::LastMasterSyncData> data);
// Transition into stable state mode as dfly flow.
std::error_code StartStableSyncFlow(ExecutionState* cntx);

View file

@ -2956,7 +2956,7 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
cmd_cntx.rb->SendError(ec.Format());
return;
}
add_replica->StartMainReplicationFiber();
add_replica->StartMainReplicationFiber(nullopt);
cluster_replicas_.push_back(std::move(add_replica));
cmd_cntx.rb->SendOk();
}
@ -2964,6 +2964,7 @@ void ServerFamily::AddReplicaOf(CmdArgList args, const CommandContext& cmd_cntx)
void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ActionOnConnectionFail on_err) {
std::shared_ptr<Replica> new_replica;
std::optional<Replica::LastMasterSyncData> last_master_data;
{
util::fb2::LockGuard lk(replicaof_mu_); // Only one REPLICAOF command can run at a time
@ -2987,7 +2988,7 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
CHECK(replica_);
SetMasterFlagOnAllThreads(true); // Flip flag before clearing replica
replica_->Stop();
last_master_data_ = replica_->Stop();
replica_.reset();
StopAllClusterReplicas();
@ -3000,8 +3001,9 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
}
// If any replication is in progress, stop it, cancellation should kick in immediately
if (replica_)
replica_->Stop();
last_master_data = replica_->Stop();
StopAllClusterReplicas();
// First, switch into the loading state
@ -3057,8 +3059,7 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
// If we are called by "Replicate", tx will be null but we do not need
// to flush anything.
if (on_err == ActionOnConnectionFail::kReturnOnError) {
Drakarys(tx, DbSlice::kDbAll);
new_replica->StartMainReplicationFiber();
new_replica->StartMainReplicationFiber(last_master_data);
}
builder->SendOk();
}
@ -3131,7 +3132,7 @@ void ServerFamily::ReplTakeOver(CmdArgList args, const CommandContext& cmd_cntx)
LOG(INFO) << "Takeover successful, promoting this instance to master.";
SetMasterFlagOnAllThreads(true);
replica_->Stop();
last_master_data_ = replica_->Stop();
replica_.reset();
return builder->SendOk();
}

View file

@ -242,6 +242,10 @@ class ServerFamily {
return dfly_cmd_.get();
}
// Returns a copy of the sync data captured from Replica::Stop() when this instance was
// promoted to master, or nullopt if none was captured.
// NOTE(review): no lock is taken here - confirm readers are synchronized with the writers
// that assign last_master_data_.
std::optional<Replica::LastMasterSyncData> GetLastMasterData() const {
return last_master_data_;
}
absl::Span<facade::Listener* const> GetListeners() const {
return listeners_;
}
@ -368,6 +372,7 @@ class ServerFamily {
std::unique_ptr<DflyCmd> dfly_cmd_;
std::string master_replid_;
std::optional<Replica::LastMasterSyncData> last_master_data_;
time_t start_time_ = 0; // in seconds, epoch time.

View file

@ -33,8 +33,11 @@ enum class DflyVersion {
// - Periodic lag checks from master to replica
VER4,
// - Support partial sync from different master
VER5,
// Always points to the latest version
CURRENT_VER = VER4,
CURRENT_VER = VER5,
};
} // namespace dfly

View file

@ -3038,3 +3038,75 @@ async def test_replica_snapshot_with_big_values_while_seeding(df_factory: DflyIn
await wait_available_async(c_node)
assert await c_node.execute_command("dbsize") > 0
await c_node.execute_command("FLUSHALL")
@pytest.mark.parametrize(
    "use_takeover, allowed_diff",
    [(False, 2), (False, 0), (True, 0)],
)
async def test_partial_replication_on_same_source_master(df_factory, use_takeover, allowed_diff):
    """
    Two replicas sync from the same master; the first one is then promoted and the second one
    starts replicating from it.

    Expected outcome per parameter set:
      * (use_takeover=False, allowed_diff=2): replica1 is promoted with REPLICAOF NO ONE and the
        old master receives two more writes. replica1 tolerates an LSN diff of 2, so replica2
        is expected to do a partial sync.
      * (use_takeover=False, allowed_diff=0): same scenario, but no diff is tolerated, so a full
        sync is expected and replica1 logs "No partial sync due to diff".
      * (use_takeover=True, allowed_diff=0): replica1 takes over, no extra writes are sent, so
        a partial sync is expected.
    """
    master = df_factory.create()
    replica1 = df_factory.create(allow_partial_sync_with_lsn_diff=allowed_diff)
    replica2 = df_factory.create()

    df_factory.start_all([master, replica1, replica2])
    c_master = master.client()
    c_replica1 = replica1.client()
    c_replica2 = replica2.client()

    logging.debug("Fill master with test data")
    seeder = DebugPopulateSeeder(key_target=50)
    await seeder.run(c_master)

    logging.debug("Start replication and wait for full sync")
    await c_replica1.execute_command(f"REPLICAOF localhost {master.port}")
    await wait_for_replicas_state(c_replica1)
    await c_replica2.execute_command(f"REPLICAOF localhost {master.port}")
    await wait_for_replicas_state(c_replica2)

    # Send some traffic
    seeder = SeederV2(key_target=8_000)
    await seeder.run(c_master, target_deviation=0.01)

    # Wait for all journal changes to propagate to the replicas
    await check_all_replicas_finished([c_replica1, c_replica2], c_master)

    if use_takeover:
        # Promote first replica to master
        await c_replica1.execute_command("REPLTAKEOVER 5")
    else:
        # Promote first replica to master
        await c_replica1.execute_command("REPLICAOF NO ONE")
        # Send 2 more commands to be propagated to the second replica only.
        # They still allow a partial sync if allow_partial_sync_with_lsn_diff is equal or higher.
        await c_master.set("x", "y")
        await c_master.set("x", "y")
        await check_all_replicas_finished([c_replica2], c_master)

    # Start replication with new master
    await c_replica2.execute_command(f"REPLICAOF localhost {replica1.port}")
    await check_all_replicas_finished([c_replica2], c_replica1)

    # Validate data
    if use_takeover:
        hash1, hash2 = await asyncio.gather(
            *(SeederV2.capture(c) for c in (c_replica1, c_replica2))
        )
        assert hash1 == hash2

    # Check we can takeover to the second replica
    await c_replica2.execute_command("REPLTAKEOVER 5")

    replica1.stop()
    replica2.stop()

    # NOTE(review): the full-sync message is assumed to mirror the partial one
    # ("Started <type> sync with <host>:<port>") - confirm against the replica log statement.
    full_sync_lines = replica2.find_in_logs(f"Started full sync with localhost:{replica1.port}")

    # `use_takeover or (allowed_diff > 0 and not use_takeover)` reduces to the condition below.
    if use_takeover or allowed_diff > 0:
        # Check logs for partial replication
        lines = replica2.find_in_logs(f"Started partial sync with localhost:{replica1.port}")
        assert len(lines) == 1
        # Check no full sync logs
        assert len(full_sync_lines) == 0
    else:
        # The LSN diff exceeds the allowed threshold, so a full sync must have happened.
        assert len(full_sync_lines) == 1
        assert len(replica1.find_in_logs("No partial sync due to diff")) > 0