From 0ee52c9d35b202110839213bba62c55e1746a837 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Wed, 28 Aug 2024 18:21:53 +0300 Subject: [PATCH] chore: remove DflyVersion::VER0 (#3593) Stop supporting DflyVersion::VER0 from more than a year ago. In addition, rename Metrics fields to make them clearer. General improvements and a fix for the reconnect metric. Signed-off-by: Roman Gershman --- src/server/conn_context.h | 2 +- src/server/dflycmd.h | 5 +- src/server/replica.cc | 101 ++++++++++++++++++------------------ src/server/replica.h | 20 ++++--- src/server/server_family.cc | 43 ++++++++------- src/server/server_family.h | 18 ++++--- src/server/version.h | 3 -- 7 files changed, 98 insertions(+), 94 deletions(-) diff --git a/src/server/conn_context.h b/src/server/conn_context.h index 11acf9852..50a099837 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -133,7 +133,7 @@ struct ConnectionState { uint32_t repl_flow_id = UINT32_MAX; std::string repl_ip_address; uint32_t repl_listening_port = 0; - DflyVersion repl_version = DflyVersion::VER0; + DflyVersion repl_version = DflyVersion::VER1; }; struct SquashingInfo { diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index 160f31907..14b6561bc 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -43,7 +43,7 @@ struct FlowInfo { std::unique_ptr streamer; // Streamer for stable sync phase std::string eof_token; - DflyVersion version = DflyVersion::VER0; + DflyVersion version = DflyVersion::VER1; std::optional start_partial_sync_at; uint64_t last_acked_lsn = 0; @@ -128,7 +128,7 @@ class DflyCmd { std::string id; std::string address; uint32_t listening_port; - DflyVersion version = DflyVersion::VER0; + DflyVersion version = DflyVersion::VER1; // Flows describe the state of shard-local flow. // They are always indexed by the shard index on the master. @@ -153,6 +153,7 @@ class DflyCmd { // Master side acces method to replication info of that connection.
std::shared_ptr GetReplicaInfoFromConnection(ConnectionContext* cntx); + // Master-side command. Provides Replica info. std::vector GetReplicasRoleInfo() const ABSL_LOCKS_EXCLUDED(mu_); void GetReplicationMemoryStats(ReplicationMemoryStats* out) const ABSL_LOCKS_EXCLUDED(mu_); diff --git a/src/server/replica.cc b/src/server/replica.cc index 426df939a..084957e08 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -162,15 +162,15 @@ void Replica::Pause(bool pause) { VLOG(1) << "Pausing replication"; Proactor()->Await([&] { is_paused_ = pause; - if (num_df_flows_ > 0) { - auto partition = Partition(num_df_flows_); - auto cb = [&](unsigned index, auto*) { - for (auto id : partition[index]) { - shard_flows_[id]->Pause(pause); - } - }; - shard_set->pool()->AwaitBrief(cb); - } + if (shard_flows_.empty()) + return; + + auto cb = [&](unsigned index, auto*) { + for (auto id : thread_flow_map_[index]) { + shard_flows_[id]->Pause(pause); + } + }; + shard_set->pool()->AwaitBrief(cb); }); } @@ -347,14 +347,15 @@ std::error_code Replica::HandleCapaDflyResp() { } master_context_.master_repl_id = master_repl_id; master_context_.dfly_session_id = ToSV(LastResponseArgs()[1].GetBuf()); - num_df_flows_ = param_num_flows; + master_context_.num_flows = param_num_flows; if (LastResponseArgs().size() >= 4) { PC_RETURN_ON_BAD_RESPONSE(LastResponseArgs()[3].type == RespExpr::INT64); master_context_.version = DflyVersion(get(LastResponseArgs()[3].u)); } VLOG(1) << "Master id: " << master_context_.master_repl_id - << ", sync id: " << master_context_.dfly_session_id << ", num journals: " << num_df_flows_ + << ", sync id: " << master_context_.dfly_session_id + << ", num journals: " << param_num_flows << ", version: " << unsigned(master_context_.version); return error_code{}; @@ -369,12 +370,9 @@ std::error_code Replica::ConfigureDflyMaster() { LOG(WARNING) << "Bad REPLCONF CLIENT-ID response"; } - // Tell the master our version if it supports REPLCONF CLIENT-VERSION - if 
(master_context_.version > DflyVersion::VER0) { - RETURN_ON_ERR( - SendCommandAndReadResponse(StrCat("REPLCONF CLIENT-VERSION ", DflyVersion::CURRENT_VER))); - PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK")); - } + RETURN_ON_ERR( + SendCommandAndReadResponse(StrCat("REPLCONF CLIENT-VERSION ", DflyVersion::CURRENT_VER))); + PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK")); return error_code{}; } @@ -476,14 +474,16 @@ error_code Replica::InitiateDflySync() { multi_shard_exe_.reset(new MultiShardExecution()); // Initialize shard flows. - shard_flows_.resize(num_df_flows_); - for (unsigned i = 0; i < num_df_flows_; ++i) { + shard_flows_.resize(master_context_.num_flows); + DCHECK(!shard_flows_.empty()); + for (unsigned i = 0; i < shard_flows_.size(); ++i) { shard_flows_[i].reset( new DflyShardReplica(server(), master_context_, i, &service_, multi_shard_exe_)); } + thread_flow_map_ = Partition(shard_flows_.size()); // Blocked on until all flows got full sync cut. - BlockingCounter sync_block{num_df_flows_}; + BlockingCounter sync_block{unsigned(shard_flows_.size())}; // Switch to new error handler that closes flow sockets. auto err_handler = [this, sync_block](const auto& ge) mutable { @@ -516,12 +516,12 @@ error_code Replica::InitiateDflySync() { std::string_view sync_type = "full"; { + unsigned num_df_flows = shard_flows_.size(); // Going out of the way to avoid using std::vector... - auto is_full_sync = std::make_unique(num_df_flows_); - auto partition = Partition(num_df_flows_); - CHECK(!last_journal_LSNs_ || last_journal_LSNs_->size() == shard_flows_.size()); + auto is_full_sync = std::make_unique(num_df_flows); + DCHECK(!last_journal_LSNs_ || last_journal_LSNs_->size() == num_df_flows); auto shard_cb = [&](unsigned index, auto*) { - for (auto id : partition[index]) { + for (auto id : thread_flow_map_[index]) { auto ec = shard_flows_[id]->StartSyncFlow(sync_block, &cntx_, last_journal_LSNs_.has_value() ? 
std::optional((*last_journal_LSNs_)[id]) @@ -538,9 +538,9 @@ error_code Replica::InitiateDflySync() { shard_set->pool()->AwaitFiberOnAll(std::move(shard_cb)); size_t num_full_flows = - std::accumulate(is_full_sync.get(), is_full_sync.get() + num_df_flows_, 0); + std::accumulate(is_full_sync.get(), is_full_sync.get() + num_df_flows, 0); - if (num_full_flows == num_df_flows_) { + if (num_full_flows == num_df_flows) { if (slot_range_.has_value()) { JournalExecutor{&service_}.FlushSlots(slot_range_.value()); } else { @@ -668,9 +668,8 @@ error_code Replica::ConsumeDflyStream() { LOG(INFO) << "Transitioned into stable sync"; // Transition flows into stable sync. { - auto partition = Partition(num_df_flows_); auto shard_cb = [&](unsigned index, auto*) { - const auto& local_ids = partition[index]; + const auto& local_ids = thread_flow_map_[index]; for (unsigned id : local_ids) { auto ec = shard_flows_[id]->StartStableSyncFlow(&cntx_); if (ec) @@ -723,6 +722,7 @@ io::Result DflyShardReplica::StartSyncFlow(BlockingCounter sb, Context* cn std::optional lsn) { using nonstd::make_unexpected; DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty()); + proactor_index_ = ProactorBase::me()->GetPoolIndex(); RETURN_ON_ERR_T(make_unexpected, ConnectAndAuth(absl::GetFlag(FLAGS_master_connect_timeout_ms) * 1ms, &cntx_)); @@ -826,14 +826,15 @@ void DflyShardReplica::FullSyncDflyFb(std::string eof_token, BlockingCounter bc, if (auto jo = rdb_loader_->journal_offset(); jo.has_value()) { this->journal_rec_executed_.store(*jo); } else { - if (master_context_.version > DflyVersion::VER0) - cntx->ReportError(std::make_error_code(errc::protocol_error), - "Error finding journal offset in stream"); + cntx->ReportError(std::make_error_code(errc::protocol_error), + "Error finding journal offset in stream"); } VLOG(1) << "FullSyncDflyFb finished after reading " << rdb_loader_->bytes_read() << " bytes"; } void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) { 
+ DCHECK_EQ(proactor_index_, ProactorBase::me()->GetPoolIndex()); + // Check leftover from full sync. io::Bytes prefix{}; if (leftover_buf_ && leftover_buf_->InputLen() > 0) { @@ -846,15 +847,15 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) { DCHECK_GE(journal_rec_executed_, 1u); TransactionReader tx_reader{journal_rec_executed_.load(std::memory_order_relaxed) - 1}; - if (master_context_.version > DflyVersion::VER0) { - acks_fb_ = fb2::Fiber("shard_acks", &DflyShardReplica::StableSyncDflyAcksFb, this, cntx); - } + acks_fb_ = fb2::Fiber("shard_acks", &DflyShardReplica::StableSyncDflyAcksFb, this, cntx); while (!cntx->IsCancelled()) { auto tx_data = tx_reader.NextTxData(&reader, cntx); if (!tx_data) break; + DVLOG(3) << "Lsn: " << tx_data->lsn; + last_io_time_ = Proactor()->GetMonotonicTimeNs(); if (tx_data->opcode == journal::Op::LSN) { // Do nothing @@ -895,6 +896,8 @@ void Replica::RedisStreamAcksFb() { } void DflyShardReplica::StableSyncDflyAcksFb(Context* cntx) { + DCHECK_EQ(proactor_index_, ProactorBase::me()->GetPoolIndex()); + constexpr size_t kAckRecordMaxInterval = 1024; std::chrono::duration ack_time_max_interval = 1ms * absl::GetFlag(FLAGS_replication_acks_interval); @@ -1077,6 +1080,9 @@ error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* d auto Replica::GetSummary() const -> Summary { auto f = [this]() { auto last_io_time = LastIoTime(); + + // Note: we access LastIoTime from a foreign thread in an unsafe manner. However, specifically here + // it's unlikely to cause a real bug. for (const auto& flow : shard_flows_) { // Get last io time from all sub flows. last_io_time = std::max(last_io_time, flow->LastIoTime()); } @@ -1095,18 +1101,17 @@ auto Replica::GetSummary() const -> Summary { if (Sock()) return Proactor()->AwaitBrief(f); - else { - /** - * when this branch happens: there is a very short grace period - * where Sock() is not initialized, yet the server can - * receive ROLE/INFO commands.
That period happens when launching - * an instance with '--replicaof' and then immediately - * sending a command. - * - * In that instance, we have to run f() on the current fiber. - */ - return f(); - } + + /** + * when this branch happens: there is a very short grace period + * where Sock() is not initialized, yet the server can + * receive ROLE/INFO commands. That period happens when launching + * an instance with '--replicaof' and then immediately + * sending a command. + * + * In that instance, we have to run f() on the current fiber. + */ + return f(); } std::vector Replica::GetReplicaOffset() const { @@ -1129,10 +1134,6 @@ uint32_t DflyShardReplica::FlowId() const { return flow_id_; } -uint64_t DflyShardReplica::JournalExecutedCount() const { - return journal_rec_executed_.load(std::memory_order_relaxed); -} - void DflyShardReplica::Pause(bool pause) { if (rdb_loader_) { rdb_loader_->Pause(pause); diff --git a/src/server/replica.h b/src/server/replica.h index bf207b19d..87ce82d31 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -36,7 +36,8 @@ class DflyShardReplica; struct MasterContext { std::string master_repl_id; std::string dfly_session_id; // Sync session id for dfly sync. - DflyVersion version = DflyVersion::VER0; + unsigned num_flows = 0; + DflyVersion version = DflyVersion::VER1; }; // This class manages replication from both Dragonfly and Redis masters. @@ -122,7 +123,7 @@ class Replica : ProtocolClient { uint32_t reconnect_count; }; - Summary GetSummary() const; // thread-safe, blocks fiber + Summary GetSummary() const; // thread-safe, blocks fiber, makes a hop. bool HasDflyMaster() const { return !master_context_.dfly_session_id.empty(); @@ -142,6 +143,8 @@ class Replica : ProtocolClient { util::fb2::EventCount replica_waker_; std::vector> shard_flows_; + std::vector> thread_flow_map_; // a map from proactor id to flow list. + // A vector of the last executer LSNs when a replication is interrupted. 
// Allows partial sync on reconnects. std::optional> last_journal_LSNs_; @@ -154,7 +157,6 @@ class Replica : ProtocolClient { // ack_offs_ last acknowledged offset. size_t repl_offs_ = 0, ack_offs_ = 0; std::atomic state_mask_ = 0; - unsigned num_df_flows_ = 0; bool is_paused_ = false; std::string id_; @@ -196,8 +198,11 @@ class DflyShardReplica : public ProtocolClient { uint32_t FlowId() const; - uint64_t JournalExecutedCount() const; + uint64_t JournalExecutedCount() const { + return journal_rec_executed_.load(std::memory_order_relaxed); + } + // Can be called from any thread. void Pause(bool pause); private: @@ -218,13 +223,12 @@ class DflyShardReplica : public ProtocolClient { // Note: This is not 1-to-1 the LSN in the master, because this counts // **executed** records, which might be received interleaved when commands // run out-of-order on the master instance. + // Atomic, because JournalExecutedCount() can be called from any thread. std::atomic_uint64_t journal_rec_executed_ = 0; - util::fb2::Fiber sync_fb_; - - util::fb2::Fiber acks_fb_; + util::fb2::Fiber sync_fb_, acks_fb_; size_t ack_offs_ = 0; - + int proactor_index_ = -1; bool force_ping_ = false; std::shared_ptr multi_shard_exe_; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index bd217862d..b00cd15fe 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1261,7 +1261,7 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse* if (added) absl::StrAppend(&resp->body(), type_used_memory_metric); } - if (!m.replication_metrics.empty()) { + if (!m.master_side_replicas_info.empty()) { ReplicationMemoryStats repl_mem; dfly_cmd->GetReplicationMemoryStats(&repl_mem); AppendMetricWithoutLabels( @@ -1342,11 +1342,11 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse* absl::StrAppend(&resp->body(), command_metrics); } - if (!m.replication_metrics.empty()) { + if (!m.master_side_replicas_info.empty()) { 
string replication_lag_metrics; AppendMetricHeader("connected_replica_lag_records", "Lag in records of a connected replica.", MetricType::GAUGE, &replication_lag_metrics); - for (const auto& replica : m.replication_metrics) { + for (const auto& replica : m.master_side_replicas_info) { AppendMetricValue("connected_replica_lag_records", replica.lsn_lag, {"replica_ip", "replica_port", "replica_state"}, {replica.address, absl::StrCat(replica.listening_port), replica.state}, @@ -1355,14 +1355,10 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse* absl::StrAppend(&resp->body(), replication_lag_metrics); } - if (m.replica_reconnections) { - auto& replica_reconnections = m.replica_reconnections.value(); - AppendMetricHeader("replica_reconnect_count", "Number of replica reconnects", - MetricType::COUNTER, &resp->body()); - AppendMetricValue("replica_reconnect_count", replica_reconnections.reconnect_count, - {"replica_host", "replica_port"}, - {replica_reconnections.host, absl::StrCat(replica_reconnections.port)}, - &resp->body()); + if (m.replica_side_info) { + auto& replica_info = *m.replica_side_info; + AppendMetricWithoutLabels("replica_reconnect_count", "Number of replica reconnects", + replica_info.reconnect_count, MetricType::COUNTER, &resp->body()); } AppendMetricWithoutLabels("fiber_switch_total", "", m.fiber_switch_cnt, MetricType::COUNTER, @@ -2037,7 +2033,7 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const { result.delete_ttl_per_sec += shard->GetMovingSum6(EngineShard::TTL_DELETE); if (result.tx_queue_len < shard->txq()->size()) result.tx_queue_len = shard->txq()->size(); - } + } // if (shard) result.tls_bytes += Listener::TLSUsedMemoryThreadLocal(); result.refused_conn_max_clients_reached_count += Listener::RefusedConnectionMaxClientsCount(); @@ -2045,7 +2041,7 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const { result.lua_stats += InterpreterManager::tl_stats(); service_.mutable_registry()->MergeCallStats(index, 
cmd_stat_cb); - }; + }; // cb service_.proactor_pool().AwaitFiberOnAll(std::move(cb)); @@ -2056,11 +2052,13 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const { bool is_master = ServerState::tlocal() && ServerState::tlocal()->is_master; if (is_master) { - result.replication_metrics = dfly_cmd_->GetReplicasRoleInfo(); + result.master_side_replicas_info = dfly_cmd_->GetReplicasRoleInfo(); } else { auto info = GetReplicaSummary(); if (info) { - result.replica_reconnections = {std::move(info->host), info->port, info->reconnect_count}; + result.replica_side_info = { + .reconnect_count = info->reconnect_count, + }; } } @@ -2197,7 +2195,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("maxmemory_policy", "noeviction"); } - if (!m.replication_metrics.empty()) { + if (!m.master_side_replicas_info.empty()) { ReplicationMemoryStats repl_mem; dfly_cmd_->GetReplicationMemoryStats(&repl_mem); append("replication_streaming_buffer_bytes", repl_mem.streamer_buf_capacity_bytes); @@ -2361,7 +2359,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { if (!replica_) { append("role", "master"); append("connected_slaves", m.facade_stats.conn_stats.num_replicas); - const auto& replicas = m.replication_metrics; + const auto& replicas = m.master_side_replicas_info; for (size_t i = 0; i < replicas.size(); i++) { auto& r = replicas[i]; // e.g. slave0:ip=172.19.0.3,port=6379,state=full_sync @@ -2372,7 +2370,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { } else { append("role", GetFlag(FLAGS_info_replication_valkey_compatible) ? 
"slave" : "replica"); - auto replication_info_cb = [&](Replica::Summary rinfo) { + auto replication_info_cb = [&](const Replica::Summary& rinfo) { append("master_host", rinfo.host); append("master_port", rinfo.port); @@ -2385,6 +2383,8 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("slave_read_only", 1); }; replication_info_cb(replica_->GetSummary()); + + // Special case, when multiple masters replicate to a single replica. for (const auto& replica : cluster_replicas_) { replication_info_cb(replica->GetSummary()); } @@ -2700,8 +2700,6 @@ void ServerFamily::Replicate(string_view host, string_view port) { void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) { VLOG(1) << "ReplTakeOver start"; - util::fb2::LockGuard lk(replicaof_mu_); - CmdArgParser parser{args}; int timeout_sec = parser.Next(); @@ -2722,6 +2720,8 @@ void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) { if (ServerState::tlocal()->is_master) return cntx->SendOk(); + util::fb2::LockGuard lk(replicaof_mu_); + auto repl_ptr = replica_; CHECK(repl_ptr); @@ -2735,8 +2735,7 @@ void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) { return cntx->SendError("Couldn't execute takeover"); LOG(INFO) << "Takeover successful, promoting this instance to master."; - service_.proactor_pool().AwaitFiberOnAll( - [&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = true; }); + SetMasterFlagOnAllThreads(true); replica_->Stop(); replica_.reset(); return cntx->SendOk(); diff --git a/src/server/server_family.h b/src/server/server_family.h index c8578ce9c..8e78f76c1 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -67,12 +67,6 @@ struct ReplicationMemoryStats { size_t full_sync_buf_bytes = 0; // total bytes used for full sync buffers }; -struct ReplicaReconnectionsInfo { - std::string host; - uint16_t port; - uint32_t reconnect_count; -}; - struct LoadingStats { size_t restore_count = 0; size_t 
failed_restore_count = 0; @@ -128,8 +122,16 @@ struct Metrics { // command call frequencies (count, aggregated latency in usec). std::map> cmd_stats_map; - std::vector replication_metrics; - std::optional replica_reconnections; + + // Replica info on the master side. + std::vector master_side_replicas_info; + + struct ReplicaInfo { + uint32_t reconnect_count; + }; + + // Replica reconnect stats on the replica side. Undefined for master + std::optional replica_side_info; LoadingStats loading_stats; }; diff --git a/src/server/version.h b/src/server/version.h index f5f8ea6ff..e89d19bb7 100644 --- a/src/server/version.h +++ b/src/server/version.h @@ -17,9 +17,6 @@ const char* GetVersion(); // Please document for each new entry what the behavior changes are // and to which released versions this corresponds. enum class DflyVersion { - // ver <= 1.3 - VER0, - // 1.4 <= ver <= 1.10 // - Supports receiving ACKs from replicas // - Sends version back on REPLCONF capa dragonfly