feat: Expose replica_reconnect_count for Prometheus metrics (#3370)

Stepan Bagritsevich 2024-08-13 12:34:01 +02:00 committed by GitHub
parent 3f5d4f890d
commit c756023332
6 changed files with 105 additions and 3 deletions
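As a quick sanity check of what this change exposes: once a Dragonfly process is running as a replica, its HTTP /metrics endpoint should include the new counter in the Prometheus text format. A minimal sketch, assuming the endpoint is reachable on the instance's own port (the address, label values, and sample value below are illustrative, not taken from this commit):

import urllib.request

# Scrape the instance's HTTP metrics endpoint; host and port are illustrative.
body = urllib.request.urlopen("http://127.0.0.1:6379/metrics").read().decode()
for line in body.splitlines():
    if "replica_reconnect_count" in line:
        print(line)

# Expected shape of the matching lines (label values and the sample value are made up):
# # HELP dragonfly_replica_reconnect_count Number of replica reconnects
# # TYPE dragonfly_replica_reconnect_count counter
# dragonfly_replica_reconnect_count{replica_host="127.0.0.1",replica_port="6380"} 3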


@@ -209,7 +209,8 @@ void Replica::MainReplicationFb() {
    // Give a lower timeout for connect, because we're
    ec = ConnectAndAuth(absl::GetFlag(FLAGS_master_reconnect_timeout_ms) * 1ms, &cntx_);
    if (ec) {
      LOG(ERROR) << "Error connecting to " << server().Description() << " " << ec;
      reconnect_count_++;
      LOG(WARNING) << "Error connecting to " << server().Description() << " " << ec;
      continue;
    }
    VLOG(1) << "Replica socket connected";
@@ -1086,6 +1087,7 @@ auto Replica::GetSummary() const -> Summary {
    res.full_sync_done = (state_mask_.load() & R_SYNC_OK);
    res.master_last_io_sec = (ProactorBase::GetMonotonicTimeNs() - last_io_time) / 1000000000UL;
    res.master_id = master_context_.master_repl_id;
    res.reconnect_count = reconnect_count_;
    return res;
  };


@@ -119,6 +119,7 @@ class Replica : ProtocolClient {
    bool full_sync_done;
    time_t master_last_io_sec;  // monotonic clock.
    std::string master_id;
    uint32_t reconnect_count;
  };

  Summary GetSummary() const;  // thread-safe, blocks fiber
@@ -159,6 +160,8 @@ class Replica : ProtocolClient {
  std::string id_;
  std::optional<cluster::SlotRange> slot_range_;

  uint32_t reconnect_count_ = 0;
};

class RdbLoader;


@@ -1294,6 +1294,16 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse*
    absl::StrAppend(&resp->body(), replication_lag_metrics);
  }

  if (m.replica_reconnections) {
    auto& replica_reconnections = m.replica_reconnections.value();
    AppendMetricHeader("replica_reconnect_count", "Number of replica reconnects",
                       MetricType::COUNTER, &resp->body());
    AppendMetricValue("replica_reconnect_count", replica_reconnections.reconnect_count,
                      {"replica_host", "replica_port"},
                      {replica_reconnections.host, absl::StrCat(replica_reconnections.port)},
                      &resp->body());
  }

  AppendMetricWithoutLabels("fiber_switch_total", "", m.fiber_switch_cnt, MetricType::COUNTER,
                            &resp->body());
  double delay_seconds = m.fiber_switch_delay_usec * 1e-6;
@@ -1984,8 +1994,14 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
  result.delete_ttl_per_sec /= 6;

  bool is_master = ServerState::tlocal() && ServerState::tlocal()->is_master;
  if (is_master)
  if (is_master) {
    result.replication_metrics = dfly_cmd_->GetReplicasRoleInfo();
  } else {
    auto info = GetReplicaSummary();
    if (info) {
      result.replica_reconnections = {std::move(info->host), info->port, info->reconnect_count};
    }
  }

  // Update peak stats. We rely on the fact that GetMetrics is called frequently enough to
  // update peak_stats_ from it.

@@ -67,6 +67,12 @@ struct ReplicationMemoryStats {
  size_t full_sync_buf_bytes = 0;  // total bytes used for full sync buffers
};

struct ReplicaReconnectionsInfo {
  std::string host;
  uint16_t port;
  uint32_t reconnect_count;
};

// Global peak stats recorded after aggregating metrics over all shards.
// Note that those values are only updated during GetMetrics calls.
struct PeakStats {
@@ -115,6 +121,7 @@ struct Metrics {
  // command call frequencies (count, aggregated latency in usec).
  std::map<std::string, std::pair<uint64_t, uint64_t>> cmd_stats_map;
  std::vector<ReplicaRoleInfo> replication_metrics;
  std::optional<ReplicaReconnectionsInfo> replica_reconnections;
};

struct LastSaveInfo {

@@ -69,8 +69,11 @@ class Proxy:
        if self.server is not None:
            self.server.close()
            self.server = None

        for cb in self.stop_connections:
            cb()
        self.stop_connections = []

        if not task == None:
            try:
                await task

@@ -1012,6 +1012,10 @@ def parse_lag(replication_info: str):
    return int(lags[0])


async def get_metric_value(inst, metric_name, sample_index=0):
    return (await inst.metrics())[metric_name].samples[sample_index].value


async def assert_lag_condition(inst, client, condition):
    """
    Since lag is a bit random, and we want stable tests, we check
@@ -1021,7 +1025,7 @@ async def assert_lag_condition(inst, client, condition):
    prometheus endpoint.
    """
    for _ in range(10):
        lag = (await inst.metrics())["dragonfly_connected_replica_lag_records"].samples[0].value
        lag = await get_metric_value(inst, "dragonfly_connected_replica_lag_records")
        if condition(lag):
            break
        print("current prometheus lag =", lag)
@@ -1038,6 +1042,23 @@ async def assert_lag_condition(inst, client, condition):
    assert False, "Lag has never satisfied condition!"


async def get_replica_reconnects_count(replica_inst):
    return await get_metric_value(replica_inst, "dragonfly_replica_reconnect_count")


async def assert_replica_reconnections(replica_inst, initial_reconnects_count):
    """
    Asserts that the replica has attempted to reconnect at least once since
    initial_reconnects_count was sampled.
    """
    reconnects_count = await get_replica_reconnects_count(replica_inst)
    if reconnects_count > initial_reconnects_count:
        return

    assert (
        False
    ), f"Expected reconnect count to increase by at least 1, but it did not. Initial dragonfly_replica_reconnect_count: {initial_reconnects_count}, current count: {reconnects_count}"


@dfly_args({"proactor_threads": 2})
@pytest.mark.asyncio
async def test_replication_info(df_factory: DflyInstanceFactory, df_seeder_factory, n_keys=2000):
@@ -1812,6 +1833,56 @@ async def test_network_disconnect_small_buffer(df_factory, df_seeder_factory):
    # assert master.is_in_logs("Partial sync requested from stale LSN")


async def test_replica_reconnections_after_network_disconnect(df_factory, df_seeder_factory):
    master = df_factory.create(proactor_threads=6)
    replica = df_factory.create(proactor_threads=4)
    df_factory.start_all([replica, master])

    seeder = df_seeder_factory.create(port=master.port)
    async with replica.client() as c_replica:
        await seeder.run(target_deviation=0.1)

        proxy = Proxy("127.0.0.1", 1115, "127.0.0.1", master.port)
        await proxy.start()
        task = asyncio.create_task(proxy.serve())
        try:
            await c_replica.execute_command(f"REPLICAOF localhost {proxy.port}")
            # Wait for the replica to be up and synchronized with the master
            await wait_for_replica_status(c_replica, status="up")
            await wait_available_async(c_replica)

            initial_reconnects_count = await get_replica_reconnects_count(replica)

            # Fully drop the server
            await proxy.close(task)

            # After dropping the connection, the replica should try to reconnect
            await wait_for_replica_status(c_replica, status="down")
            await asyncio.sleep(2)

            # Restart the proxy
            await proxy.start()
            task = asyncio.create_task(proxy.serve())

            # Wait for the replica to reconnect and resynchronize with the master
            await wait_for_replica_status(c_replica, status="up")
            await wait_available_async(c_replica)

            capture = await seeder.capture()
            assert await seeder.compare(capture, replica.port)

            # Assert that the replica reconnect metric increased
            await assert_replica_reconnections(replica, initial_reconnects_count)
        finally:
            await proxy.close(task)

    master.stop()
    replica.stop()


async def test_search(df_factory):
    master = df_factory.create(proactor_threads=4)
    replica = df_factory.create(proactor_threads=4)