refactoring

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adi_holden 2025-04-28 16:40:08 +03:00
parent f04f00ef5a
commit 18fa077f50
2 changed files with 14 additions and 17 deletions

View file

@ -102,6 +102,19 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r
return true;
}
bool IsLSNDiffBellowThreshold(const std::vector<LSN>& lsn_vec1, const std::vector<LSN>& lsn_vec2) {
uint32_t allow_diff = absl::GetFlag(FLAGS_allow_partial_sync_with_lsn_diff);
for (size_t i = 0; i < lsn_vec1.size(); ++i) {
uint32_t diff =
lsn_vec1[i] > lsn_vec2[i] ? lsn_vec1[i] - lsn_vec2[i] : lsn_vec2[i] - lsn_vec1[i];
if (diff > allow_diff) {
VLOG(1) << "No partial sync due to diff: " << diff << " allow_diff is: " << allow_diff;
return false;
}
}
return true;
}
} // namespace
void DflyCmd::ReplicaInfo::Cancel() {
@ -323,22 +336,7 @@ void DflyCmd::Flow(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
lsn_vec.push_back(value);
}
bool partial = true;
uint32_t allow_diff = absl::GetFlag(FLAGS_allow_partial_sync_with_lsn_diff);
for (size_t i = 0; i < lsn_vec.size(); ++i) {
uint32_t diff = lsn_vec[i] > data.value().last_journal_LSNs[i]
? lsn_vec[i] - data.value().last_journal_LSNs[i]
: data.value().last_journal_LSNs[i] - lsn_vec[i];
VLOG(1) << "diff is: " << diff;
if (diff > allow_diff) {
VLOG(1) << "No partial sync due to diff: " << diff
<< " replica_lsn_vec:" << last_master_lsn.value()
<< " my lsn vec: " << absl::StrJoin(data.value().last_journal_LSNs, " ");
partial = false;
break;
}
}
if (partial) {
if (IsLSNDiffBellowThreshold(data.value().last_journal_LSNs, lsn_vec)) {
sync_type = "PARTIAL";
flow.start_partial_sync_at = sf_->journal()->GetLsn();
VLOG(1) << "Partial sync requested from LSN=" << flow.start_partial_sync_at.value()

View file

@ -3056,7 +3056,6 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReply
// If we are called by "Replicate", tx will be null but we do not need
// to flush anything.
if (on_err == ActionOnConnectionFail::kReturnOnError) {
// Drakarys(tx, DbSlice::kDbAll);
new_replica->StartMainReplicationFiber(last_master_data);
}
builder->SendOk();