diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc
index 0e675901c..72e873f8b 100644
--- a/src/facade/reply_builder.cc
+++ b/src/facade/reply_builder.cc
@@ -73,6 +73,8 @@ char* write_piece(string_view str, char* dest) {
 
 }  // namespace
 
+thread_local SinkReplyBuilder::PendingList SinkReplyBuilder::pending_list;
+
 SinkReplyBuilder::ReplyAggregator::~ReplyAggregator() {
   rb->batched_ = prev;
   if (!prev)
@@ -150,16 +152,22 @@ void SinkReplyBuilder::Send() {
   auto& reply_stats = tl_facade_stats->reply_stats;
 
   send_active_ = true;
-  uint64_t before_ns = util::fb2::ProactorBase::GetMonotonicTimeNs();
+  PendingPin pin(util::fb2::ProactorBase::GetMonotonicTimeNs());
+
+  pending_list.push_back(pin);
+
   reply_stats.io_write_cnt++;
   reply_stats.io_write_bytes += total_size_;
   DVLOG(2) << "Writing " << total_size_ << " bytes";
   if (auto ec = sink_->Write(vecs_.data(), vecs_.size()); ec)
     ec_ = ec;
+  auto it = PendingList::s_iterator_to(pin);
+  pending_list.erase(it);
+
   uint64_t after_ns = util::fb2::ProactorBase::GetMonotonicTimeNs();
   reply_stats.send_stats.count++;
-  reply_stats.send_stats.total_duration += (after_ns - before_ns) / 1'000;
+  reply_stats.send_stats.total_duration += (after_ns - pin.timestamp_ns) / 1'000;
 
   DVLOG(2) << "Finished writing " << total_size_ << " bytes";
   send_active_ = false;
 }
diff --git a/src/facade/reply_builder.h b/src/facade/reply_builder.h
index 762d2835e..f148bf97f 100644
--- a/src/facade/reply_builder.h
+++ b/src/facade/reply_builder.h
@@ -5,6 +5,7 @@
 
 #include
 
+#include <boost/intrusive/list.hpp>
 #include
 #include
 
@@ -33,6 +34,20 @@ class SinkReplyBuilder {
   constexpr static size_t kMaxInlineSize = 32;
   constexpr static size_t kMaxBufferSize = 8192;
 
+  struct PendingPin : public boost::intrusive::list_base_hook<
+                          ::boost::intrusive::link_mode<::boost::intrusive::normal_link>> {
+    uint64_t timestamp_ns;
+
+    PendingPin(uint64_t v = 0) : timestamp_ns(v) {
+    }
+  };
+
+  using PendingList =
+      boost::intrusive::list<PendingPin, boost::intrusive::constant_time_size<false>,
+                             boost::intrusive::cache_last<false>>;
+
+  static thread_local PendingList pending_list;
+
   explicit SinkReplyBuilder(io::Sink* sink) : sink_(sink) {
   }
 
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index 07d3812a7..2bbfd1df4 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -638,6 +638,15 @@ optional<ReplicaOfArgs> ReplicaOfArgs::FromCmdArgs(CmdArgList args, SinkReplyBui
   return replicaof_args;
 }
 
+uint64_t GetDelayMs(uint64_t ts) {
+  uint64_t now_ns = fb2::ProactorBase::GetMonotonicTimeNs();
+  uint64_t delay_ms = 0;
+  if (ts < now_ns - 1000000) {  // if more than 1ms has passed between ts and now_ns
+    delay_ms = (now_ns - ts) / 1000000;
+  }
+  return delay_ms;
+}
+
 }  // namespace
 
 void SlowLogGet(dfly::CmdArgList args, std::string_view sub_cmd, util::ProactorPool* pp,
@@ -1294,6 +1303,10 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
                             MetricType::GAUGE, &resp->body());
   AppendMetricWithoutLabels("pipeline_queue_length", "", conn_stats.dispatch_queue_entries,
                             MetricType::GAUGE, &resp->body());
+  AppendMetricWithoutLabels("send_delay_seconds", "",
+                            double(GetDelayMs(m.oldest_pending_send_ts)) / 1000.0,
+                            MetricType::GAUGE, &resp->body());
+
   AppendMetricWithoutLabels("pipeline_throttle_total", "", conn_stats.pipeline_throttle_count,
                             MetricType::COUNTER, &resp->body());
   AppendMetricWithoutLabels("pipeline_cmd_cache_bytes", "", conn_stats.pipeline_cmd_cache_bytes,
@@ -2145,6 +2158,17 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
       result.connections_lib_name_ver_map[k] += v;
     }
 
+    auto& send_list = facade::SinkReplyBuilder::pending_list;
+    if (!send_list.empty()) {
+      DCHECK(std::is_sorted(send_list.begin(), send_list.end(),
+                            [](const auto& left, const auto& right) {
+                              return left.timestamp_ns < right.timestamp_ns;
+                            }));
+
+      auto& oldest_member = send_list.front();
+      result.oldest_pending_send_ts =
+          min(result.oldest_pending_send_ts, oldest_member.timestamp_ns);
+    }
     service_.mutable_registry()->MergeCallStats(index, cmd_stat_cb);
   };  // cb
 
@@ -2253,6 +2277,8 @@ void ServerFamily::Info(CmdArgList args, const CommandContext& cmd_cntx) {
     append("client_read_buffer_bytes", m.facade_stats.conn_stats.read_buf_capacity);
     append("blocked_clients", m.facade_stats.conn_stats.num_blocked_clients);
     append("pipeline_queue_length", m.facade_stats.conn_stats.dispatch_queue_entries);
+
+    append("send_delay_ms", GetDelayMs(m.oldest_pending_send_ts));
   }
 
   if (should_enter("MEMORY")) {
diff --git a/src/server/server_family.h b/src/server/server_family.h
index 2e47283c4..a2b7cf7f8 100644
--- a/src/server/server_family.h
+++ b/src/server/server_family.h
@@ -107,6 +107,10 @@ struct Metrics {
   uint32_t blocked_tasks = 0;
   size_t worker_fiber_stack_size = 0;
 
+  // Monotonic timestamp (ProactorBase::GetMonotonicTimeNs) of the connection that has been
+  // stuck on send for the longest time.
+  uint64_t oldest_pending_send_ts = uint64_t(-1);
+
   InterpreterManager::Stats lua_stats;
 
   // command call frequencies (count, aggregated latency in usec).
diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py
index cdd10c958..6b6eb1715 100755
--- a/tests/dragonfly/connection_test.py
+++ b/tests/dragonfly/connection_test.py
@@ -606,6 +606,26 @@ async def test_subscribe_in_pipeline(async_client: aioredis.Redis):
     assert res == ["one", ["subscribe", "ch1", 1], "two", ["subscribe", "ch2", 2], "three"]
 
 
+async def test_send_delay_metric(df_server: DflyInstance):
+    client = df_server.client()
+    await client.client_setname("client1")
+    blob = "A" * 1000
+    for j in range(10):
+        await client.set(f"key-{j}", blob)
+
+    await client.config_set("pipeline_queue_limit", 100)
+    reader, writer = await asyncio.open_connection("localhost", df_server.port)
+    for j in range(1000000):
+        writer.write(f"GET key-{j % 10}\n".encode())
+
+    @assert_eventually
+    async def wait_for_large_delay():
+        info = await client.info("clients")
+        assert int(info["send_delay_ms"]) > 100
+
+    await wait_for_large_delay()
+
+
 async def test_match_http(df_server: DflyInstance):
     client = df_server.client()
     reader, writer = await asyncio.open_connection("localhost", df_server.port)
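
Note on the mechanism (not part of the patch): each SinkReplyBuilder::Send() pins its start
timestamp onto a thread-local list for the duration of the possibly blocking socket write.
Because pins are appended in monotonically increasing time order, the front of the list is
always the oldest in-flight send, so GetMetrics() reads a single element per thread and
GetDelayMs() converts it into the send_delay_ms / send_delay_seconds values exposed above.
The standalone sketch below illustrates the same idea outside Dragonfly; it substitutes
std::list and std::chrono for boost::intrusive and ProactorBase::GetMonotonicTimeNs(), and
the names PendingPinGuard and OldestSendDelayMs are invented for the example.

#include <chrono>
#include <cstdint>
#include <iostream>
#include <list>
#include <thread>

using Clock = std::chrono::steady_clock;

struct PendingPin {
  Clock::time_point start;
};

// One list per thread; the pins themselves are owned by the guards below,
// the list only points at them.
thread_local std::list<PendingPin*> pending_list;

// RAII guard: registers a pin for the duration of one (possibly blocking) send.
class PendingPinGuard {
 public:
  PendingPinGuard() : pin_{Clock::now()} {
    pending_list.push_back(&pin_);
    it_ = std::prev(pending_list.end());
  }
  ~PendingPinGuard() {
    pending_list.erase(it_);
  }
  PendingPinGuard(const PendingPinGuard&) = delete;
  PendingPinGuard& operator=(const PendingPinGuard&) = delete;

 private:
  PendingPin pin_;
  std::list<PendingPin*>::iterator it_;
};

// Counterpart of GetDelayMs(): age of the oldest in-flight send in milliseconds,
// or 0 if nothing is pending or everything is younger than 1 ms.
uint64_t OldestSendDelayMs() {
  if (pending_list.empty())
    return 0;
  auto age = Clock::now() - pending_list.front()->start;
  auto ms = std::chrono::duration_cast<std::chrono::milliseconds>(age).count();
  return ms > 1 ? static_cast<uint64_t>(ms) : 0;
}

int main() {
  std::cout << "idle: " << OldestSendDelayMs() << " ms\n";  // 0, nothing in flight

  PendingPinGuard pin;  // simulate a send stuck on a slow client
  std::this_thread::sleep_for(std::chrono::milliseconds(50));
  std::cout << "stuck: " << OldestSendDelayMs() << " ms\n";  // roughly 50
}

The patch prefers an intrusive list for this, so the pin lives on the stack of the sending
fiber and the hot write path performs no allocation.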