diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 7c1ba2cf1..2d9a28c73 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -370,8 +370,11 @@ void DflyCmd::StartStable(CmdArgList args, Transaction* tx, RedisReplyBuilder* r auto cb = [this, &status, replica_ptr = replica_ptr](EngineShard* shard) { FlowInfo* flow = &replica_ptr->flows[shard->shard_id()]; - StopFullSyncInThread(flow, &replica_ptr->cntx, shard); - status = StartStableSyncInThread(flow, &replica_ptr->cntx, shard); + status = StopFullSyncInThread(flow, &replica_ptr->cntx, shard); + if (*status != OpStatus::OK) { + return; + } + StartStableSyncInThread(flow, &replica_ptr->cntx, shard); }; shard_set->RunBlockingInParallel(std::move(cb)); @@ -590,26 +593,27 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha return OpStatus::OK; } -void DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { +OpStatus DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { DCHECK(shard); error_code ec = flow->saver->StopFullSyncInShard(shard); if (ec) { cntx->ReportError(ec); - return; + return OpStatus::CANCELLED; } ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token)); if (ec) { cntx->ReportError(ec); - return; + return OpStatus::CANCELLED; } // Reset cleanup and saver flow->cleanup = []() {}; flow->saver.reset(); + return OpStatus::OK; } -OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { +void DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { // Create streamer for shard flows. DCHECK(shard); DCHECK(flow->conn); @@ -625,8 +629,6 @@ OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineS flow->streamer->Cancel(); } }; - - return OpStatus::OK; } auto DflyCmd::CreateSyncSession(ConnectionState* state) -> std::pair { diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index f67d7fee1..328f2b13c 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -209,10 +209,10 @@ class DflyCmd { facade::OpStatus StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard); // Stop full sync in thread. Run state switch cleanup. - void StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard); + facade::OpStatus StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard); // Start stable sync in thread. Called for each flow. - facade::OpStatus StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard); + void StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard); // Get ReplicaInfo by sync_id. std::shared_ptr GetReplicaInfo(uint32_t sync_id) ABSL_LOCKS_EXCLUDED(mu_); diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 77cec6d9e..552624006 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -48,6 +48,7 @@ SliceSnapshot::SliceSnapshot(DbSlice* slice, CompressionMode compression_mode, } SliceSnapshot::~SliceSnapshot() { + DCHECK(db_slice_->shard_owner()->IsMyThread()); tl_slice_snapshots.erase(this); }