From 33584b9e9699ca0f352f587910ce9fc72a6f84a5 Mon Sep 17 00:00:00 2001
From: Borys
Date: Thu, 6 Feb 2025 08:47:20 +0200
Subject: [PATCH] refactor: rename Context into ExecutionState (#4562)

---
 src/server/cluster/incoming_slot_migration.cc |  6 ++---
 src/server/cluster/incoming_slot_migration.h  |  2 +-
 src/server/common.cc                          | 16 +++------
 src/server/common.h                           | 18 +++++------
 src/server/detail/save_stages_controller.h    |  2 +-
 src/server/dflycmd.cc                         |  8 +++---
 src/server/dflycmd.h                          | 10 +++----
 src/server/journal/streamer.cc                |  4 +--
 src/server/journal/streamer.h                 |  7 ++---
 src/server/journal/tx_executor.cc             |  3 ++-
 src/server/journal/tx_executor.h              |  2 +-
 src/server/protocol_client.cc                 |  2 +-
 src/server/protocol_client.h                  |  5 ++--
 src/server/rdb_save.cc                        | 27 ++++++++++---------
 src/server/rdb_save.h                         |  6 ++---
 src/server/replica.cc                         | 19 ++++++-------
 src/server/replica.h                          | 13 ++++-----
 src/server/snapshot.cc                        |  2 +-
 src/server/snapshot.h                         |  6 ++---
 19 files changed, 84 insertions(+), 74 deletions(-)

diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc
index 0ca02598b..2d7fce99e 100644
--- a/src/server/cluster/incoming_slot_migration.cc
+++ b/src/server/cluster/incoming_slot_migration.cc
@@ -42,7 +42,7 @@ class ClusterShardMigration {
     pause_ = pause;
   }
 
-  void Start(Context* cntx, util::FiberSocketBase* source) ABSL_LOCKS_EXCLUDED(mu_) {
+  void Start(ExecutionState* cntx, util::FiberSocketBase* source) ABSL_LOCKS_EXCLUDED(mu_) {
     {
       util::fb2::LockGuard lk(mu_);
       if (is_finished_) {
@@ -125,7 +125,7 @@ class ClusterShardMigration {
   }
 
  private:
-  void ExecuteTx(TransactionData&& tx_data, Context* cntx) {
+  void ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx) {
     if (cntx->IsCancelled()) {
       return;
     }
@@ -205,7 +205,7 @@ bool IncomingSlotMigration::Join(long attempt) {
 void IncomingSlotMigration::Stop() {
   string_view log_state = state_.load() == MigrationState::C_FINISHED ? "Finishing" : "Cancelling";
   LOG(INFO) << log_state << " incoming migration of slots " << slots_.ToString();
-  cntx_.Cancel();
+  cntx_.ReportCancelError();
 
   for (auto& flow : shard_flows_) {
     if (auto err = flow->Cancel(); err) {
diff --git a/src/server/cluster/incoming_slot_migration.h b/src/server/cluster/incoming_slot_migration.h
index d8af4fce8..2f97f4a03 100644
--- a/src/server/cluster/incoming_slot_migration.h
+++ b/src/server/cluster/incoming_slot_migration.h
@@ -67,7 +67,7 @@ class IncomingSlotMigration {
   std::vector<std::unique_ptr<ClusterShardMigration>> shard_flows_;
   SlotRanges slots_;
   std::atomic<MigrationState> state_ = MigrationState::C_CONNECTING;
-  Context cntx_;
+  ExecutionState cntx_;
   mutable util::fb2::Mutex error_mu_;
   dfly::GenericError last_error_ ABSL_GUARDED_BY(error_mu_);
 
diff --git a/src/server/common.cc b/src/server/common.cc
index e84bc5d44..7285ba5b1 100644
--- a/src/server/common.cc
+++ b/src/server/common.cc
@@ -339,25 +339,25 @@ std::string GenericError::Format() const {
   return absl::StrCat(ec_.message(), ": ", details_);
 }
 
-Context::~Context() {
+ExecutionState::~ExecutionState() {
   DCHECK(!err_handler_fb_.IsJoinable());
   err_handler_fb_.JoinIfNeeded();
 }
 
-GenericError Context::GetError() const {
+GenericError ExecutionState::GetError() const {
   std::lock_guard lk(err_mu_);
   return err_;
 }
 
-const Cancellation* Context::GetCancellation() const {
+const Cancellation* ExecutionState::GetCancellation() const {
   return this;
 }
 
-void Context::Cancel() {
+void ExecutionState::ReportCancelError() {
   ReportError(std::make_error_code(errc::operation_canceled), "Context cancelled");
 }
 
-void Context::Reset(ErrHandler handler) {
+void ExecutionState::Reset(ErrHandler handler) {
   fb2::Fiber fb;
 
   unique_lock lk{err_mu_};
@@ -369,7 +369,7 @@ void Context::Reset(ErrHandler handler) {
   fb.JoinIfNeeded();
 }
 
-GenericError Context::SwitchErrorHandler(ErrHandler handler) {
+GenericError ExecutionState::SwitchErrorHandler(ErrHandler handler) {
   std::lock_guard lk{err_mu_};
   if (!err_) {
     // No need to check for the error handler - it can't be running
@@ -379,7 +379,7 @@ GenericError Context::SwitchErrorHandler(ErrHandler handler) {
   return err_;
 }
 
-void Context::JoinErrorHandler() {
+void ExecutionState::JoinErrorHandler() {
   fb2::Fiber fb;
   unique_lock lk{err_mu_};
   fb.swap(err_handler_fb_);
@@ -387,7 +387,7 @@ void Context::JoinErrorHandler() {
   fb.JoinIfNeeded();
 }
 
-GenericError Context::ReportErrorInternal(GenericError&& err) {
+GenericError ExecutionState::ReportErrorInternal(GenericError&& err) {
   lock_guard lk{err_mu_};
   if (err_)
     return err_;
diff --git a/src/server/common.h b/src/server/common.h
index 28bfd52e4..f6aec672d 100644
--- a/src/server/common.h
+++ b/src/server/common.h
@@ -246,25 +246,27 @@ class GenericError {
 // Thread safe utility to store the first non null generic error.
 using AggregateGenericError = AggregateValue<GenericError>;
 
-// Context is a utility for managing error reporting and cancellation for complex tasks.
+// ExecutionState is a utility for managing error reporting and cancellation for complex tasks.
 //
 // When submitting an error with `Error`, only the first is stored (as in aggregate values).
-// Then a special error handler is run, if present, and the context is cancelled. The error handler
-// is run in a separate handler to free up the caller.
+// Then a special error handler is run, if present, and the ExecutionState is cancelled. The error
+// handler is run in a separate handler to free up the caller.
 //
 // Manual cancellation with `Cancel` is simulated by reporting an `errc::operation_canceled` error.
 // This allows running the error handler and representing this scenario as an error.
-class Context : protected Cancellation {
+class ExecutionState : protected Cancellation {
 public:
   using ErrHandler = std::function<void(const GenericError&)>;
 
-  Context() = default;
-  Context(ErrHandler err_handler) : Cancellation{}, err_{}, err_handler_{std::move(err_handler)} {
+  ExecutionState() = default;
+  ExecutionState(ErrHandler err_handler)
+      : Cancellation{}, err_{}, err_handler_{std::move(err_handler)} {
   }
 
-  ~Context();
+  ~ExecutionState();
 
-  void Cancel();  // Cancels the context by submitting an `errc::operation_canceled` error.
+  void
+  ReportCancelError();  // Cancels the context by submitting an `errc::operation_canceled` error.
 
   using Cancellation::IsCancelled;
   const Cancellation* GetCancellation() const;
diff --git a/src/server/detail/save_stages_controller.h b/src/server/detail/save_stages_controller.h
index 7446dffc3..abf8bd6bd 100644
--- a/src/server/detail/save_stages_controller.h
+++ b/src/server/detail/save_stages_controller.h
@@ -72,7 +72,7 @@ class RdbSnapshot {
   unique_ptr<RdbSaver> saver_;
   RdbTypeFreqMap freq_map_;
 
-  Context cntx_{};
+  ExecutionState cntx_{};
 };
 
 struct SaveStagesController : public SaveStagesInputs {
diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc
index f15b4e5e9..302e6bc22 100644
--- a/src/server/dflycmd.cc
+++ b/src/server/dflycmd.cc
@@ -111,7 +111,7 @@ void DflyCmd::ReplicaInfo::Cancel() {
   // Update state and cancel context.
   replica_state = SyncState::CANCELLED;
-  cntx.Cancel();
+  cntx.ReportCancelError();
 
   // Wait for tasks to finish.
   shard_set->RunBlockingInParallel([this](EngineShard* shard) {
     VLOG(2) << "Disconnecting flow " << shard->shard_id();
@@ -548,7 +548,7 @@ void DflyCmd::Load(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cn
   rb->SendOk();
 }
 
-OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
+OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, ExecutionState* cntx, EngineShard* shard) {
   DCHECK(shard);
   DCHECK(flow->conn);
 
@@ -589,7 +589,7 @@
   return OpStatus::OK;
 }
 
-OpStatus DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
+OpStatus DflyCmd::StopFullSyncInThread(FlowInfo* flow, ExecutionState* cntx, EngineShard* shard) {
   DCHECK(shard);
   error_code ec = flow->saver->StopFullSyncInShard(shard);
 
@@ -610,7 +610,7 @@ OpStatus DflyCmd::StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShar
   return OpStatus::OK;
 }
 
-void DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
+void DflyCmd::StartStableSyncInThread(FlowInfo* flow, ExecutionState* cntx, EngineShard* shard) {
   // Create streamer for shard flows.
   DCHECK(shard);
   DCHECK(flow->conn);
diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h
index 4a8129e7d..80ac570e1 100644
--- a/src/server/dflycmd.h
+++ b/src/server/dflycmd.h
@@ -103,7 +103,7 @@ class DflyCmd {
   // Stores information related to a single replica.
   struct ABSL_LOCKABLE ReplicaInfo {
     ReplicaInfo(unsigned flow_count, std::string address, uint32_t listening_port,
-                Context::ErrHandler err_handler)
+                ExecutionState::ErrHandler err_handler)
         : replica_state{SyncState::PREPARATION},
           cntx{std::move(err_handler)},
           address{std::move(address)},
@@ -115,7 +115,7 @@ class DflyCmd {
     void Cancel();
 
     SyncState replica_state;  // always guarded by shared_mu
-    Context cntx;
+    ExecutionState cntx;
 
     std::string id;
     std::string address;
@@ -198,13 +198,13 @@ class DflyCmd {
   void Load(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cntx);
 
   // Start full sync in thread. Start FullSyncFb. Called for each flow.
-  facade::OpStatus StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);
+  facade::OpStatus StartFullSyncInThread(FlowInfo* flow, ExecutionState* cntx, EngineShard* shard);
 
   // Stop full sync in thread. Run state switch cleanup.
-  facade::OpStatus StopFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);
+  facade::OpStatus StopFullSyncInThread(FlowInfo* flow, ExecutionState* cntx, EngineShard* shard);
 
   // Start stable sync in thread. Called for each flow.
-  void StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);
+  void StartStableSyncInThread(FlowInfo* flow, ExecutionState* cntx, EngineShard* shard);
 
   // Get ReplicaInfo by sync_id.
   std::shared_ptr<ReplicaInfo> GetReplicaInfo(uint32_t sync_id) ABSL_LOCKS_EXCLUDED(mu_);
diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index 6cfacaa80..18210f1b7 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -35,7 +35,7 @@ uint32_t replication_stream_output_limit_cached = 64_KB;
 
 }  // namespace
 
-JournalStreamer::JournalStreamer(journal::Journal* journal, Context* cntx)
+JournalStreamer::JournalStreamer(journal::Journal* journal, ExecutionState* cntx)
     : cntx_(cntx), journal_(journal) {
   // cache the flag to avoid accessing it later.
   replication_stream_output_limit_cached = absl::GetFlag(FLAGS_replication_stream_output_limit);
@@ -181,7 +181,7 @@ bool JournalStreamer::IsStalled() const {
 }
 
 RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal,
-                                 Context* cntx)
+                                 ExecutionState* cntx)
     : JournalStreamer(journal, cntx), db_slice_(slice), my_slots_(std::move(slots)) {
   DCHECK(slice != nullptr);
   db_array_ = slice->databases();  // Inc ref to make sure DB isn't deleted while we use it
diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h
index e46713dd3..73bc2a834 100644
--- a/src/server/journal/streamer.h
+++ b/src/server/journal/streamer.h
@@ -19,7 +19,7 @@ namespace dfly {
 // journal listener and writes them to a destination sink in a separate fiber.
 class JournalStreamer {
  public:
-  JournalStreamer(journal::Journal* journal, Context* cntx);
+  JournalStreamer(journal::Journal* journal, ExecutionState* cntx);
   virtual ~JournalStreamer();
 
   // Self referential.
@@ -53,7 +53,7 @@ class JournalStreamer {
   void WaitForInflightToComplete();
 
   util::FiberSocketBase* dest_ = nullptr;
-  Context* cntx_;
+  ExecutionState* cntx_;
 
 private:
   void AsyncWrite();
@@ -79,7 +79,8 @@ class JournalStreamer {
 // Only handles relevant slots, while ignoring all others.
 class RestoreStreamer : public JournalStreamer {
  public:
-  RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal, Context* cntx);
+  RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal,
+                  ExecutionState* cntx);
   ~RestoreStreamer() override;
 
   void Start(util::FiberSocketBase* dest, bool send_lsn = false) override;
diff --git a/src/server/journal/tx_executor.cc b/src/server/journal/tx_executor.cc
index f163f6973..33a54cbce 100644
--- a/src/server/journal/tx_executor.cc
+++ b/src/server/journal/tx_executor.cc
@@ -90,7 +90,8 @@ TransactionData TransactionData::FromEntry(journal::ParsedEntry&& entry) {
   return data;
 }
 
-std::optional<TransactionData> TransactionReader::NextTxData(JournalReader* reader, Context* cntx) {
+std::optional<TransactionData> TransactionReader::NextTxData(JournalReader* reader,
+                                                             ExecutionState* cntx) {
   io::Result<journal::ParsedEntry> res;
   if (res = reader->ReadEntry(); !res) {
     cntx->ReportError(res.error());
diff --git a/src/server/journal/tx_executor.h b/src/server/journal/tx_executor.h
index 4ed9d2ec9..633d7e880 100644
--- a/src/server/journal/tx_executor.h
+++ b/src/server/journal/tx_executor.h
@@ -59,7 +59,7 @@ struct TransactionData {
 struct TransactionReader {
   TransactionReader(std::optional<LSN> lsn = std::nullopt) : lsn_(lsn) {
   }
-  std::optional<TransactionData> NextTxData(JournalReader* reader, Context* cntx);
+  std::optional<TransactionData> NextTxData(JournalReader* reader, ExecutionState* cntx);
 
 private:
   std::optional<LSN> lsn_ = 0;
diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc
index aeb46e69c..719ee02b1 100644
--- a/src/server/protocol_client.cc
+++ b/src/server/protocol_client.cc
@@ -188,7 +188,7 @@ error_code ProtocolClient::ResolveHostDns() {
 }
 
 error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms,
-                                          Context* cntx) {
+                                          ExecutionState* cntx) {
   ProactorBase* mythread = ProactorBase::me();
   CHECK(mythread);
   {
diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h
index c5a46dffa..642b2a2a1 100644
--- a/src/server/protocol_client.h
+++ b/src/server/protocol_client.h
@@ -60,7 +60,8 @@ class ProtocolClient {
   std::error_code ResolveHostDns();
 
   // Connect to master and authenticate if needed.
-  std::error_code ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms, Context* cntx);
+  std::error_code ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms,
+                                 ExecutionState* cntx);
 
   void DefaultErrorHandler(const GenericError& err);
 
@@ -121,7 +122,7 @@ class ProtocolClient {
   util::fb2::Mutex sock_mu_;
 
 protected:
-  Context cntx_;  // context for tasks in replica.
+  ExecutionState cntx_;  // context for tasks in replica.
 
   std::string last_cmd_;
   std::string last_resp_;
diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc
index 8ae8d5289..18bf45448 100644
--- a/src/server/rdb_save.cc
+++ b/src/server/rdb_save.cc
@@ -49,9 +49,10 @@ ABSL_FLAG(dfly::CompressionMode, compression_mode, dfly::CompressionMode::MULTI_
           "set 2 for multi entry zstd compression on df snapshot and single entry on rdb snapshot,"
           "set 3 for multi entry lz4 compression on df snapshot and single entry on rdb snapshot");
 
-ABSL_RETIRED_FLAG(bool, list_rdb_encode_v2, true,
-                  "V2 rdb encoding of list uses listpack encoding format, compatible with redis 7. V1 rdb "
-                  "enconding of list uses ziplist encoding compatible with redis 6");
+ABSL_RETIRED_FLAG(
+    bool, list_rdb_encode_v2, true,
+    "V2 rdb encoding of list uses listpack encoding format, compatible with redis 7. V1 rdb "
+    "encoding of list uses ziplist encoding compatible with redis 6");
 
 // TODO: to retire this flag in v1.31
 ABSL_FLAG(bool, stream_rdb_encode_v2, true,
@@ -1048,14 +1049,14 @@ class RdbSaver::Impl final : public SliceSnapshot::SnapshotDataConsumerInterface
 
   ~Impl();
 
-  void StartSnapshotting(bool stream_journal, Context* cntx, EngineShard* shard);
-  void StartIncrementalSnapshotting(LSN start_lsn, Context* cntx, EngineShard* shard);
+  void StartSnapshotting(bool stream_journal, ExecutionState* cntx, EngineShard* shard);
+  void StartIncrementalSnapshotting(LSN start_lsn, ExecutionState* cntx, EngineShard* shard);
 
   void StopSnapshotting(EngineShard* shard);
   void WaitForSnapshottingFinish(EngineShard* shard);
 
   // Pushes snapshot data. Called from SliceSnapshot
-  void ConsumeData(std::string data, Context* cntx) override;
+  void ConsumeData(std::string data, ExecutionState* cntx) override;
 
   // Finalizes the snapshot writing. Called from SliceSnapshot
   void Finalize() override;
@@ -1226,7 +1227,8 @@ error_code RdbSaver::Impl::WriteRecord(io::Bytes src) {
   return ec;
 }
 
-void RdbSaver::Impl::StartSnapshotting(bool stream_journal, Context* cntx, EngineShard* shard) {
+void RdbSaver::Impl::StartSnapshotting(bool stream_journal, ExecutionState* cntx,
+                                       EngineShard* shard) {
   auto& s = GetSnapshot(shard);
   auto& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id());
 
@@ -1238,7 +1240,7 @@ void RdbSaver::Impl::StartSnapshotting(bool stream_journal, Context* cntx, Engin
   s->Start(stream_journal, allow_flush);
 }
 
-void RdbSaver::Impl::StartIncrementalSnapshotting(LSN start_lsn, Context* cntx,
+void RdbSaver::Impl::StartIncrementalSnapshotting(LSN start_lsn, ExecutionState* cntx,
                                                   EngineShard* shard) {
   auto& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id());
   auto& s = GetSnapshot(shard);
@@ -1255,7 +1257,7 @@ void RdbSaver::Impl::WaitForSnapshottingFinish(EngineShard* shard) {
   snapshot->WaitSnapshotting();
 }
 
-void RdbSaver::Impl::ConsumeData(std::string data, Context* cntx) {
+void RdbSaver::Impl::ConsumeData(std::string data, ExecutionState* cntx) {
   if (cntx->IsCancelled()) {
     return;
   }
@@ -1429,11 +1431,12 @@ RdbSaver::~RdbSaver() {
   tlocal->DecommitMemory(ServerState::kAllMemory);
 }
 
-void RdbSaver::StartSnapshotInShard(bool stream_journal, Context* cntx, EngineShard* shard) {
+void RdbSaver::StartSnapshotInShard(bool stream_journal, ExecutionState* cntx, EngineShard* shard) {
   impl_->StartSnapshotting(stream_journal, cntx, shard);
 }
 
-void RdbSaver::StartIncrementalSnapshotInShard(LSN start_lsn, Context* cntx, EngineShard* shard) {
+void RdbSaver::StartIncrementalSnapshotInShard(LSN start_lsn, ExecutionState* cntx,
+                                               EngineShard* shard) {
   impl_->StartIncrementalSnapshotting(start_lsn, cntx, shard);
 }
 
@@ -1460,7 +1463,7 @@ error_code RdbSaver::SaveHeader(const GlobalData& glob_state) {
   return error_code{};
 }
 
-error_code RdbSaver::SaveBody(const Context& cntx) {
+error_code RdbSaver::SaveBody(const ExecutionState& cntx) {
   RETURN_ON_ERR(impl_->FlushSerializer());
 
   if (save_mode_ == SaveMode::RDB) {
diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h
index 7f3bb703b..e2dd71d11 100644
--- a/src/server/rdb_save.h
+++ b/src/server/rdb_save.h
@@ -91,10 +91,10 @@ class RdbSaver {
 
   // Initiates the serialization in the shard's thread.
   // cll allows breaking in the middle.
-  void StartSnapshotInShard(bool stream_journal, Context* cntx, EngineShard* shard);
+  void StartSnapshotInShard(bool stream_journal, ExecutionState* cntx, EngineShard* shard);
 
   // Send only the incremental snapshot since start_lsn.
-  void StartIncrementalSnapshotInShard(LSN start_lsn, Context* cntx, EngineShard* shard);
+  void StartIncrementalSnapshotInShard(LSN start_lsn, ExecutionState* cntx, EngineShard* shard);
 
   // Stops full-sync serialization for replication in the shard's thread.
   std::error_code StopFullSyncInShard(EngineShard* shard);
@@ -107,7 +107,7 @@ class RdbSaver {
 
   // Writes the RDB file into sink. Waits for the serialization to finish.
   // Called only for save rdb flow and save df on summary file.
-  std::error_code SaveBody(const Context& cntx);
+  std::error_code SaveBody(const ExecutionState& cntx);
 
   // Fills freq_map with the histogram of rdb types.
   void FillFreqMap(RdbTypeFreqMap* freq_map);
diff --git a/src/server/replica.cc b/src/server/replica.cc
index 5a627777d..ba13db531 100644
--- a/src/server/replica.cc
+++ b/src/server/replica.cc
@@ -98,7 +98,7 @@ error_code Replica::Start(facade::SinkReplyBuilder* builder) {
     }
     if (ec) {
       builder->SendError(absl::StrCat(msg, ec.message()));
-      cntx_.Cancel();
+      cntx_.ReportCancelError();
     }
     return ec;
   };
@@ -145,8 +145,8 @@ void Replica::Stop() {
 
   // Stops the loop in MainReplicationFb.
   proactor_->Await([this] {
-    state_mask_.store(0);  // Specifically ~R_ENABLED.
-    cntx_.Cancel();        // Context is fully resposible for cleanup.
+    state_mask_.store(0);       // Specifically ~R_ENABLED.
+    cntx_.ReportCancelError();  // Context is fully responsible for cleanup.
   });
 
   // Make sure the replica fully stopped and did all cleanup,
@@ -732,7 +732,7 @@ error_code Replica::SendNextPhaseRequest(string_view kind) {
   return std::error_code{};
 }
 
-io::Result<bool> DflyShardReplica::StartSyncFlow(BlockingCounter sb, Context* cntx,
+io::Result<bool> DflyShardReplica::StartSyncFlow(BlockingCounter sb, ExecutionState* cntx,
                                                  std::optional<LSN> lsn) {
   using nonstd::make_unexpected;
   DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty());
@@ -781,7 +781,7 @@ io::Result<bool> DflyShardReplica::StartSyncFlow(BlockingCounter sb, Context* cn
   return is_full_sync;
 }
 
-error_code DflyShardReplica::StartStableSyncFlow(Context* cntx) {
+error_code DflyShardReplica::StartStableSyncFlow(ExecutionState* cntx) {
   DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty());
   ProactorBase* mythread = ProactorBase::me();
   CHECK(mythread);
@@ -796,7 +796,8 @@ error_code DflyShardReplica::StartStableSyncFlow(Context* cntx) {
   return std::error_code{};
 }
 
-void DflyShardReplica::FullSyncDflyFb(std::string eof_token, BlockingCounter bc, Context* cntx) {
+void DflyShardReplica::FullSyncDflyFb(std::string eof_token, BlockingCounter bc,
+                                      ExecutionState* cntx) {
   DCHECK(leftover_buf_);
   io::PrefixSource ps{leftover_buf_->InputBuffer(), Sock()};
 
@@ -846,7 +847,7 @@ void DflyShardReplica::FullSyncDflyFb(std::string eof_token, BlockingCounter bc,
   VLOG(1) << "FullSyncDflyFb finished after reading " << rdb_loader_->bytes_read() << " bytes";
 }
 
-void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) {
+void DflyShardReplica::StableSyncDflyReadFb(ExecutionState* cntx) {
   DCHECK_EQ(proactor_index_, ProactorBase::me()->GetPoolIndex());
 
   // Check leftover from full sync.
@@ -907,7 +908,7 @@ void Replica::RedisStreamAcksFb() {
   }
 }
 
-void DflyShardReplica::StableSyncDflyAcksFb(Context* cntx) {
+void DflyShardReplica::StableSyncDflyAcksFb(ExecutionState* cntx) {
   DCHECK_EQ(proactor_index_, ProactorBase::me()->GetPoolIndex());
 
   constexpr size_t kAckRecordMaxInterval = 1024;
@@ -958,7 +959,7 @@ DflyShardReplica::~DflyShardReplica() {
   JoinFlow();
 }
 
-void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, Context* cntx) {
+void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx) {
   if (cntx->IsCancelled()) {
     return;
   }
diff --git a/src/server/replica.h b/src/server/replica.h
index ef293e0f1..dbb976b3d 100644
--- a/src/server/replica.h
+++ b/src/server/replica.h
@@ -179,21 +179,22 @@ class DflyShardReplica : public ProtocolClient {
 
   // Start replica initialized as dfly flow.
   // Sets is_full_sync when successful.
-  io::Result<bool> StartSyncFlow(util::fb2::BlockingCounter block, Context* cntx,
+  io::Result<bool> StartSyncFlow(util::fb2::BlockingCounter block, ExecutionState* cntx,
                                  std::optional<LSN>);
 
   // Transition into stable state mode as dfly flow.
-  std::error_code StartStableSyncFlow(Context* cntx);
+  std::error_code StartStableSyncFlow(ExecutionState* cntx);
 
   // Single flow full sync fiber spawned by StartFullSyncFlow.
-  void FullSyncDflyFb(std::string eof_token, util::fb2::BlockingCounter block, Context* cntx);
+  void FullSyncDflyFb(std::string eof_token, util::fb2::BlockingCounter block,
+                      ExecutionState* cntx);
 
   // Single flow stable state sync fiber spawned by StartStableSyncFlow.
-  void StableSyncDflyReadFb(Context* cntx);
+  void StableSyncDflyReadFb(ExecutionState* cntx);
 
-  void StableSyncDflyAcksFb(Context* cntx);
+  void StableSyncDflyAcksFb(ExecutionState* cntx);
 
-  void ExecuteTx(TransactionData&& tx_data, Context* cntx);
+  void ExecuteTx(TransactionData&& tx_data, ExecutionState* cntx);
 
   uint32_t FlowId() const;
 
diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index a7d8812c5..289870105 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -37,7 +37,7 @@ constexpr size_t kMinBlobSize = 32_KB;
 }  // namespace
 
 SliceSnapshot::SliceSnapshot(CompressionMode compression_mode, DbSlice* slice,
-                             SnapshotDataConsumerInterface* consumer, Context* cntx)
+                             SnapshotDataConsumerInterface* consumer, ExecutionState* cntx)
     : db_slice_(slice),
       db_array_(slice->databases()),
       compression_mode_(compression_mode),
diff --git a/src/server/snapshot.h b/src/server/snapshot.h
index 250e61bc6..53cfaae22 100644
--- a/src/server/snapshot.h
+++ b/src/server/snapshot.h
@@ -54,13 +54,13 @@ class SliceSnapshot {
     virtual ~SnapshotDataConsumerInterface() = default;
 
     // Receives a chunk of snapshot data for processing
-    virtual void ConsumeData(std::string data, Context* cntx) = 0;
+    virtual void ConsumeData(std::string data, ExecutionState* cntx) = 0;
     // Finalizes the snapshot writing
     virtual void Finalize() = 0;
   };
 
   SliceSnapshot(CompressionMode compression_mode, DbSlice* slice,
-                SnapshotDataConsumerInterface* consumer, Context* cntx);
+                SnapshotDataConsumerInterface* consumer, ExecutionState* cntx);
   ~SliceSnapshot();
 
   static size_t GetThreadLocalMemoryUsage();
@@ -178,7 +178,7 @@ class SliceSnapshot {
   ThreadLocalMutex big_value_mu_;
 
   SnapshotDataConsumerInterface* consumer_;
-  Context* cntx_;
+  ExecutionState* cntx_;
 };
 
 }  // namespace dfly
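
Usage note (illustrative sketch, not part of the patch): the rename keeps the class's contract intact — a reported error and a manual cancellation travel the same path, and long-running tasks poll IsCancelled() between units of work. The standalone C++17 program below models that contract. The Mini* names are invented for this example; the real ExecutionState in src/server/common.h additionally runs an ErrHandler callback on a separate fiber and stores a full GenericError, both omitted here.

// Minimal model of the ExecutionState contract shown in the diff above.
// MiniCancellation/MiniExecutionState are simplified stand-ins, not the
// real dfly classes.
#include <atomic>
#include <iostream>
#include <mutex>
#include <string>
#include <system_error>

class MiniCancellation {
 public:
  bool IsCancelled() const {
    return flag_.load(std::memory_order_relaxed);
  }

 protected:
  void Cancel() {
    flag_.store(true, std::memory_order_relaxed);
  }

 private:
  std::atomic_bool flag_{false};
};

// Cancellation is expressed by reporting an errc::operation_canceled error,
// so failure handling and manual abort share a single code path.
class MiniExecutionState : protected MiniCancellation {
 public:
  using MiniCancellation::IsCancelled;

  void ReportError(std::error_code ec, std::string details) {
    std::lock_guard<std::mutex> lk(mu_);
    if (!ec_) {  // only the first error is kept, as in AggregateValue
      ec_ = ec;
      details_ = std::move(details);
    }
    Cancel();
  }

  // The renamed entry point; before this patch it was Context::Cancel().
  void ReportCancelError() {
    ReportError(std::make_error_code(std::errc::operation_canceled), "Context cancelled");
  }

  std::error_code GetError() const {
    std::lock_guard<std::mutex> lk(mu_);
    return ec_;
  }

 private:
  mutable std::mutex mu_;
  std::error_code ec_;
  std::string details_;
};

int main() {
  MiniExecutionState state;

  // Long-running tasks poll IsCancelled() between steps, the way
  // ExecuteTx() and ConsumeData() do in the diff above.
  for (int step = 0; step < 5; ++step) {
    if (state.IsCancelled())
      break;
    if (step == 2)
      state.ReportCancelError();  // e.g. another fiber running Stop()
  }

  std::cout << "cancelled=" << state.IsCancelled()
            << " error=" << state.GetError().message() << '\n';
  return 0;
}

Compiled with any C++17 compiler, this prints cancelled=1 followed by the platform's message for operation_canceled (typically "Operation canceled"), mirroring how Stop()/Cancel() sites in this patch now call ReportCancelError() and let the polling loops unwind.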