bug(replication): snapshot cleanup fix in transition to stable sync (#4211)

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2024-11-28 08:38:36 +02:00 committed by GitHub
parent 369899a40a
commit 90b4fea0d9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 13 additions and 10 deletions

View file

@ -370,8 +370,11 @@ void DflyCmd::StartStable(CmdArgList args, Transaction* tx, RedisReplyBuilder* r
auto cb = [this, &status, replica_ptr = replica_ptr](EngineShard* shard) {
FlowInfo* flow = &replica_ptr->flows[shard->shard_id()];
StopFullSyncInThread(flow, &replica_ptr->cntx, shard);
status = StartStableSyncInThread(flow, &replica_ptr->cntx, shard);
status = StopFullSyncInThread(flow, &replica_ptr->cntx, shard);
if (*status != OpStatus::OK) {
return;
}
StartStableSyncInThread(flow, &replica_ptr->cntx, shard);
};
shard_set->RunBlockingInParallel(std::move(cb));
@ -590,26 +593,27 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
return OpStatus::OK;
}
void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
OpStatus DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
DCHECK(shard);
error_code ec = flow->saver->StopFullSyncInShard(shard);
if (ec) {
cntx->ReportError(ec);
return;
return OpStatus::CANCELLED;
}
ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token));
if (ec) {
cntx->ReportError(ec);
return;
return OpStatus::CANCELLED;
}
// Reset cleanup and saver
flow->cleanup = []() {};
flow->saver.reset();
return OpStatus::OK;
}
OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
void DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
// Create streamer for shard flows.
DCHECK(shard);
DCHECK(flow->conn);
@ -625,8 +629,6 @@ OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineS
flow->streamer->Cancel();
}
};
return OpStatus::OK;
}
auto DflyCmd::CreateSyncSession(ConnectionState* state) -> std::pair<uint32_t, unsigned> {

View file

@ -209,10 +209,10 @@ class DflyCmd {
facade::OpStatus StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);
// Stop full sync in thread. Run state switch cleanup.
void StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);
facade::OpStatus StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);
// Start stable sync in thread. Called for each flow.
facade::OpStatus StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);
void StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);
// Get ReplicaInfo by sync_id.
std::shared_ptr<ReplicaInfo> GetReplicaInfo(uint32_t sync_id) ABSL_LOCKS_EXCLUDED(mu_);

View file

@ -48,6 +48,7 @@ SliceSnapshot::SliceSnapshot(DbSlice* slice, CompressionMode compression_mode,
}
SliceSnapshot::~SliceSnapshot() {
DCHECK(db_slice_->shard_owner()->IsMyThread());
tl_slice_snapshots.erase(this);
}