From 6d30baa20bbed0d61335a275630372fa23b2085c Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Sun, 27 Apr 2025 20:48:02 +0300 Subject: [PATCH] chore: Pipelining fixes (#4994) Fixes #4998. 1. Reduces agressive yielding when reading multiple requests since it humpers pipeline efficiency. Now we yield consistently based on cpu time spend since the last resume point (via flag with sane defaults). 2. Increases socket read buffer size effectively allowing processing more requests in bulk. `./dragonfly --cluster_mode=emulated` latencies (usec) for pipeline sizes 80-199: p50: 1887, p75: 2367, p90: 2897, p99: 6266 `./dragonfly --cluster_mode=emulated --experimental_cluster_shard_by_slot` latencies (usec) for pipeline sizes 80-199: p50: 813, p75: 976, p90: 1216, p99: 3528 Signed-off-by: Roman Gershman --- src/facade/dragonfly_connection.cc | 89 ++- src/facade/dragonfly_connection.h | 7 +- src/server/main_service.cc | 4 +- src/server/server_family.cc | 18 +- tests/dragonfly/connection_test.py | 22 +- .../provisioning/dashboards/dragonfly.json | 555 +++++++++++------- 6 files changed, 422 insertions(+), 273 deletions(-) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 4c325e092..822119303 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -13,6 +13,7 @@ #include #include +#include "base/cycle_clock.h" #include "base/flags.h" #include "base/histogram.h" #include "base/io_buf.h" @@ -23,7 +24,9 @@ #include "facade/memcache_parser.h" #include "facade/redis_parser.h" #include "facade/service_interface.h" +#include "glog/logging.h" #include "io/file.h" +#include "util/fibers/fibers.h" #include "util/fibers/proactor_base.h" #ifdef DFLY_USE_SSL @@ -93,6 +96,10 @@ ABSL_FLAG(bool, migrate_connections, true, "they operate. Currently this is only supported for Lua script invocations, and can " "happen at most once per connection."); +ABSL_FLAG(uint32_t, max_busy_read_usec, 100, + "Maximum time we read and parse from " + "a socket without yielding. In microseconds."); + using namespace util; using namespace std; using absl::GetFlag; @@ -146,7 +153,7 @@ struct TrafficLogger { void TrafficLogger::ResetLocked() { if (log_file) { - log_file->Close(); + std::ignore = log_file->Close(); log_file.reset(); } } @@ -196,7 +203,7 @@ void OpenTrafficLogger(string_view base_path) { // Write version, incremental numbering :) uint8_t version[1] = {2}; - tl_traffic_logger.log_file->Write(version); + std::ignore = tl_traffic_logger.log_file->Write(version); } void LogTraffic(uint32_t id, bool has_more, absl::Span resp, @@ -876,6 +883,7 @@ pair Connection::GetClientInfoBeforeAfterTid() const { absl::StrAppend(&after, " irqmatch=", int(cpu == my_cpu_id)); if (dispatch_q_.size()) { absl::StrAppend(&after, " pipeline=", dispatch_q_.size()); + absl::StrAppend(&after, " pbuf=", pending_pipeline_bytes_); } absl::StrAppend(&after, " age=", now - creation_time_, " idle=", now - last_interaction_); string_view phase_name = PHASE_NAMES[phase_]; @@ -1028,7 +1036,7 @@ void Connection::ConnectionFlow() { if (io_buf_.InputLen() > 0) { phase_ = PROCESS; if (redis_parser_) { - parse_status = ParseRedis(); + parse_status = ParseRedis(10000); } else { DCHECK(memcache_parser_); parse_status = ParseMemcache(); @@ -1136,19 +1144,6 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef invoke_ // Dispatch async if we're handling a pipeline or if we can't dispatch sync. if (optimize_for_async || !can_dispatch_sync) { SendAsync(cmd_msg_cb()); - - auto epoch = fb2::FiberSwitchEpoch(); - - if (async_fiber_epoch_ == epoch) { - // If we pushed too many items without context switching - yield - if (++async_streak_len_ >= 10 && !cc_->async_dispatch) { - async_streak_len_ = 0; - ThisFiber::Yield(); - } - } else { - async_streak_len_ = 0; - async_fiber_epoch_ = epoch; - } } else { ShrinkPipelinePool(); // Gradually release pipeline request pool. { @@ -1164,20 +1159,17 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef invoke_ } } -Connection::ParserStatus Connection::ParseRedis() { +Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles) { uint32_t consumed = 0; RedisParser::Result result = RedisParser::OK; - // Re-use connection local resources to reduce allocations - RespVec& parse_args = tmp_parse_args_; - CmdArgVec& cmd_vec = tmp_cmd_vec_; - - auto dispatch_sync = [this, &parse_args, &cmd_vec] { - RespExpr::VecToArgList(parse_args, &cmd_vec); - service_->DispatchCommand(absl::MakeSpan(cmd_vec), reply_builder_.get(), cc_.get()); + auto dispatch_sync = [this] { + RespExpr::VecToArgList(tmp_parse_args_, &tmp_cmd_vec_); + service_->DispatchCommand(absl::MakeSpan(tmp_cmd_vec_), reply_builder_.get(), cc_.get()); }; - auto dispatch_async = [this, &parse_args, tlh = mi_heap_get_backing()]() -> MessageHandle { - return {FromArgs(std::move(parse_args), tlh)}; + + auto dispatch_async = [this, tlh = mi_heap_get_backing()]() -> MessageHandle { + return {FromArgs(std::move(tmp_parse_args_), tlh)}; }; ReadBuffer read_buffer = GetReadBuffer(); @@ -1186,10 +1178,10 @@ Connection::ParserStatus Connection::ParseRedis() { if (read_buffer.ShouldAdvance()) { // can happen only with io_uring/bundles read_buffer.slice = NextBundleBuffer(read_buffer.available_bytes); } - result = redis_parser_->Parse(read_buffer.slice, &consumed, &parse_args); + result = redis_parser_->Parse(read_buffer.slice, &consumed, &tmp_parse_args_); request_consumed_bytes_ += consumed; - if (result == RedisParser::OK && !parse_args.empty()) { - if (RespExpr& first = parse_args.front(); first.type == RespExpr::STRING) + if (result == RedisParser::OK && !tmp_parse_args_.empty()) { + if (RespExpr& first = tmp_parse_args_.front(); first.type == RespExpr::STRING) DVLOG(2) << "Got Args with first token " << ToSV(first.GetBuf()); if (io_req_size_hist) @@ -1198,12 +1190,20 @@ Connection::ParserStatus Connection::ParseRedis() { bool has_more = consumed < read_buffer.available_bytes; if (tl_traffic_logger.log_file && IsMain() /* log only on the main interface */) { - LogTraffic(id_, has_more, absl::MakeSpan(parse_args), service_->GetContextInfo(cc_.get())); + LogTraffic(id_, has_more, absl::MakeSpan(tmp_parse_args_), + service_->GetContextInfo(cc_.get())); } DispatchSingle(has_more, dispatch_sync, dispatch_async); } read_buffer.Consume(consumed); + + // We must yield from time to time to allow other fibers to run. + // Specifically, if a client sends a huge chunk of data resulting in a very long pipeline, + // we want to yield to allow AsyncFiber to actually execute on the pending pipeline. + if (ThisFiber::GetRunningTimeCycles() > max_busy_cycles) { + ThisFiber::Yield(); + } } while (RedisParser::OK == result && read_buffer.available_bytes > 0 && !reply_builder_->GetError()); @@ -1390,6 +1390,9 @@ auto Connection::IoLoop() -> variant { ParserStatus parse_status = OK; size_t max_iobfuf_len = GetFlag(FLAGS_max_client_iobuf_len); + unsigned max_busy_read_cycles = + (base::CycleClock::Frequency() * GetFlag(FLAGS_max_busy_read_usec)) / 1000000U; + auto* peer = socket_.get(); recv_buf_.res_len = 0; @@ -1404,12 +1407,16 @@ auto Connection::IoLoop() -> variant { bool is_iobuf_full = io_buf_.AppendLen() == 0; if (redis_parser_) { - parse_status = ParseRedis(); + parse_status = ParseRedis(max_busy_read_cycles); } else { DCHECK(memcache_parser_); parse_status = ParseMemcache(); } + if (reply_builder_->GetError()) { + return reply_builder_->GetError(); + } + if (parse_status == NEED_MORE) { parse_status = OK; @@ -1429,11 +1436,9 @@ auto Connection::IoLoop() -> variant { [&]() { io_buf_.Reserve(std::min(max_iobfuf_len, parser_hint)); }); } - // If we got a partial request and we couldn't parse the length, just - // double the capacity. // If we got a partial request because iobuf was full, grow it up to // a reasonable limit to save on Recv() calls. - if (io_buf_.AppendLen() < 64u || (is_iobuf_full && capacity < 4096)) { + if (is_iobuf_full && capacity < max_iobfuf_len / 2) { // Last io used most of the io_buf to the end. UpdateIoBufCapacity(io_buf_, stats_, [&]() { io_buf_.Reserve(capacity * 2); // Valid growth range. @@ -1441,21 +1446,11 @@ auto Connection::IoLoop() -> variant { } DCHECK_GT(io_buf_.AppendLen(), 0U); - } else if (io_buf_.AppendLen() == 0) { - // We have a full buffer and we can not progress with parsing. - // This means that we have request too large. - LOG(ERROR) << "Request is too large, closing connection"; - parse_status = ERROR; - break; } } else if (parse_status != OK) { break; } - ec = reply_builder_->GetError(); - } while (peer->IsOpen() && !ec); - - if (ec) - return ec; + } while (peer->IsOpen()); return parse_status; } @@ -1833,6 +1828,7 @@ void Connection::SendAsync(MessageHandle msg) { // Squashing is only applied to redis commands if (std::holds_alternative(msg.handle)) { pending_pipeline_cmd_cnt_++; + pending_pipeline_bytes_ += used_mem; } if (msg.IsControl()) { @@ -1869,7 +1865,10 @@ void Connection::RecycleMessage(MessageHandle msg) { // Retain pipeline message in pool. if (auto* pipe = get_if(&msg.handle); pipe) { + DCHECK_GE(pending_pipeline_bytes_, used_mem); + DCHECK_GE(pending_pipeline_cmd_cnt_, 1u); pending_pipeline_cmd_cnt_--; + pending_pipeline_bytes_ -= used_mem; if (stats_->pipeline_cmd_cache_bytes < qbp.pipeline_cache_limit) { stats_->pipeline_cmd_cache_bytes += (*pipe)->StorageCapacity(); pipeline_req_pool_.push_back(std::move(*pipe)); diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 3b3095a1e..5e1e1be97 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -367,7 +367,7 @@ class Connection : public util::Connection { // Create new pipeline request, re-use from pool when possible. PipelineMessagePtr FromArgs(RespVec args, mi_heap_t* heap); - ParserStatus ParseRedis(); + ParserStatus ParseRedis(unsigned max_busy_cycles); ParserStatus ParseMemcache(); void OnBreakCb(int32_t mask); @@ -427,6 +427,7 @@ class Connection : public util::Connection { util::fb2::Fiber async_fb_; // async fiber (if started) uint64_t pending_pipeline_cmd_cnt_ = 0; // how many queued Redis async commands in dispatch_q + size_t pending_pipeline_bytes_ = 0; // how many bytes of the queued Redis async commands // how many bytes of the current request have been consumed size_t request_consumed_bytes_ = 0; @@ -455,10 +456,6 @@ class Connection : public util::Connection { unsigned parser_error_ = 0; - // amount of times we enqued requests asynchronously during the same async_fiber_epoch_. - unsigned async_streak_len_ = 0; - uint64_t async_fiber_epoch_ = 0; - BreakerCb breaker_cb_; // Used by redis parser to avoid allocations diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 91f003b5a..bf5590b11 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -116,8 +116,8 @@ ABSL_FLAG(size_t, serialization_max_chunk_size, 64_KB, "Maximum size of a value that may be serialized at once during snapshotting or full " "sync. Values bigger than this threshold will be serialized using streaming " "serialization. 0 - to disable streaming mode"); -ABSL_FLAG(uint32_t, max_squashed_cmd_num, 32, - "Max number of commands squashed in command squash optimizaiton"); +ABSL_FLAG(uint32_t, max_squashed_cmd_num, 100, + "Max number of commands squashed in a single shard during squash optimizaiton"); namespace dfly { diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 40c4243fd..155577c2b 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1314,6 +1314,20 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd AppendMetricWithoutLabels("pipeline_commands_duration_seconds", "", conn_stats.pipelined_cmd_latency * 1e-6, MetricType::COUNTER, &resp->body()); + + AppendMetricWithoutLabels("cmd_squash_hop_total", "", m.coordinator_stats.multi_squash_executions, + MetricType::COUNTER, &resp->body()); + + AppendMetricWithoutLabels("cmd_squash_commands_total", "", m.coordinator_stats.squashed_commands, + MetricType::COUNTER, &resp->body()); + + AppendMetricWithoutLabels("cmd_squash_hop_duration_seconds", "", + m.coordinator_stats.multi_squash_exec_hop_usec * 1e-6, + MetricType::COUNTER, &resp->body()); + AppendMetricWithoutLabels("cmd_squash_hop_reply_seconds", "", + m.coordinator_stats.multi_squash_exec_reply_usec * 1e-6, + MetricType::COUNTER, &resp->body()); + AppendMetricWithoutLabels("commands_squashing_replies_bytes", "", MultiCommandSquasher::GetRepliesMemSize(), MetricType::GAUGE, &resp->body()); @@ -2486,7 +2500,6 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio append("total_commands_processed", conn_stats.command_cnt_main + conn_stats.command_cnt_other); append("instantaneous_ops_per_sec", m.qps); append("total_pipelined_commands", conn_stats.pipelined_cmd_cnt); - append("total_pipelined_squashed_commands", m.coordinator_stats.squashed_commands); append("pipeline_throttle_total", conn_stats.pipeline_throttle_count); append("pipelined_latency_usec", conn_stats.pipelined_cmd_latency); append("total_net_input_bytes", conn_stats.io_read_bytes); @@ -2628,9 +2641,6 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio append("eval_shardlocal_coordination_total", m.coordinator_stats.eval_shardlocal_coordination_cnt); append("eval_squashed_flushes", m.coordinator_stats.eval_squashed_flushes); - append("multi_squash_execution_total", m.coordinator_stats.multi_squash_executions); - append("multi_squash_execution_hop_usec", m.coordinator_stats.multi_squash_exec_hop_usec); - append("multi_squash_execution_reply_usec", m.coordinator_stats.multi_squash_exec_reply_usec); }; auto add_repl_info = [&] { diff --git a/tests/dragonfly/connection_test.py b/tests/dragonfly/connection_test.py index 3af54832a..772155538 100755 --- a/tests/dragonfly/connection_test.py +++ b/tests/dragonfly/connection_test.py @@ -561,13 +561,16 @@ async def test_reply_count(async_client: aioredis.Redis): e = async_client.pipeline(transaction=True) for _ in range(100): e.incr("num-1") - assert await measure(e.execute()) == 2 # OK + Response + + # one - for MULTI-OK, one for the rest. Depends on the squashing efficiency, + # can be either 1 or 2 replies. + assert await measure(e.execute()) <= 2 # Just pipeline p = async_client.pipeline(transaction=False) for _ in range(100): p.incr("num-1") - assert await measure(p.execute()) == 1 + assert await measure(p.execute()) <= 2 # Script result assert await measure(async_client.eval('return {1,2,{3,4},5,6,7,8,"nine"}', 0)) == 1 @@ -1118,14 +1121,14 @@ async def test_send_timeout(df_server, async_client: aioredis.Redis): # Test that the cache pipeline does not grow or shrink under constant pipeline load. -@dfly_args({"proactor_threads": 1, "pipeline_squash": 9}) +@dfly_args({"proactor_threads": 1, "pipeline_squash": 9, "max_busy_read_usec": 1000}) async def test_pipeline_cache_only_async_squashed_dispatches(df_factory): server = df_factory.create() server.start() client = server.client() - async def push_pipeline(size=1): + async def push_pipeline(size): p = client.pipeline(transaction=True) for i in range(size): p.info() @@ -1136,14 +1139,15 @@ async def test_pipeline_cache_only_async_squashed_dispatches(df_factory): # should be zero because: # We always dispatch the items that will be squashed, so when `INFO` gets called # the cache is empty because the pipeline consumed it throughout its execution - for i in range(0, 30): + # high max_busy_read_usec ensures that the connection fiber has enough time to push + # all the commands to reach the squashing limit. + for i in range(0, 10): # it's actually 11 commands. 8 INFO + 2 from the MULTI/EXEC block that is injected - # by the client. Connection fiber yields to dispatch/async fiber when - # ++async_streak_len_ >= 10. The minimum to squash is 9 so it will squash the pipeline + # by the client. The minimum to squash is 9 so it will squash the pipeline # and INFO ALL should return zero for all the squashed commands in the pipeline res = await push_pipeline(8) - for i in range(1): - assert res[i]["pipeline_cache_bytes"] == 0 + for r in res: + assert r["pipeline_cache_bytes"] == 0 # Non zero because we reclaimed/recycled the messages back to the cache info = await client.info() diff --git a/tools/local/monitoring/grafana/provisioning/dashboards/dragonfly.json b/tools/local/monitoring/grafana/provisioning/dashboards/dragonfly.json index 88e0476ea..85f1cb9d3 100644 --- a/tools/local/monitoring/grafana/provisioning/dashboards/dragonfly.json +++ b/tools/local/monitoring/grafana/provisioning/dashboards/dragonfly.json @@ -1301,7 +1301,8 @@ "value": 80 } ] - } + }, + "unit": "s" }, "overrides": [] }, @@ -1311,6 +1312,244 @@ "x": 12, "y": 29 }, + "id": 27, + "options": { + "alertThreshold": true, + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "10.1.10", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": true, + "expr": + "irate(dragonfly_pipeline_commands_duration_seconds{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/irate(dragonfly_pipeline_commands_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])", + "format": "time_series", + "interval": "", + "intervalFactor": 2, + "legendFormat": "pipeline", + "range": true, + "refId": "A", + "step": 240 + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": true, + "expr": + "irate(dragonfly_cmd_squash_hop_duration_seconds{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/irate(dragonfly_cmd_squash_hop_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])", + "format": "time_series", + "hide": false, + "interval": "", + "intervalFactor": 2, + "legendFormat": "execute_hop", + "range": true, + "refId": "B", + "step": 240 + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": true, + "expr": + "irate(dragonfly_cmd_squash_hop_reply_seconds{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/irate(dragonfly_cmd_squash_hop_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])", + "format": "time_series", + "hide": false, + "interval": "", + "intervalFactor": 2, + "legendFormat": "reply", + "range": true, + "refId": "C", + "step": 240 + } + ], + "title": "Pipeline Latency", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "links": [], + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 12, + "x": 0, + "y": 36 + }, + "id": 16, + "options": { + "alertThreshold": true, + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "10.1.10", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "exemplar": true, + "expr": "dragonfly_connected_clients{namespace=\"$namespace\",pod=\"$pod_name\"}", + "format": "time_series", + "interval": "", + "intervalFactor": 1, + "legendFormat": "{{pod}}", + "range": true, + "refId": "A" + } + ], + "title": "Dragonfly connected clients", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "links": [], + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 7, + "w": 12, + "x": 12, + "y": 36 + }, "id": 13, "options": { "alertThreshold": true, @@ -1407,7 +1646,6 @@ "mode": "off" } }, - "links": [], "mappings": [], "thresholds": { "mode": "absolute", @@ -1426,14 +1664,13 @@ "overrides": [] }, "gridPos": { - "h": 7, + "h": 8, "w": 12, - "x": 12, - "y": 36 + "x": 0, + "y": 43 }, - "id": 16, + "id": 22, "options": { - "alertThreshold": true, "legend": { "calcs": [], "displayMode": "list", @@ -1453,17 +1690,111 @@ "uid": "${DS_PROMETHEUS}" }, "editorMode": "code", - "exemplar": true, - "expr": "dragonfly_connected_clients{namespace=\"$namespace\",pod=\"$pod_name\"}", - "format": "time_series", - "interval": "", - "intervalFactor": 1, - "legendFormat": "{{pod}}", + "expr": "dragonfly_pipeline_queue_length{namespace=\"$namespace\",pod=~\"$pod_name\"}", + "instant": false, + "legendFormat": "avr_pipeline_depth", "range": true, "refId": "A" } ], - "title": "Dragonfly connected clients", + "title": "Pipeline length", + "type": "timeseries" + }, + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "description": "", + "fieldConfig": { + "defaults": { + "color": { + "mode": "palette-classic" + }, + "custom": { + "axisCenteredZero": false, + "axisColorMode": "text", + "axisLabel": "", + "axisPlacement": "auto", + "barAlignment": 0, + "drawStyle": "line", + "fillOpacity": 0, + "gradientMode": "none", + "hideFrom": { + "legend": false, + "tooltip": false, + "viz": false + }, + "insertNulls": false, + "lineInterpolation": "linear", + "lineWidth": 1, + "pointSize": 5, + "scaleDistribution": { + "type": "linear" + }, + "showPoints": "auto", + "spanNulls": false, + "stacking": { + "group": "A", + "mode": "none" + }, + "thresholdsStyle": { + "mode": "off" + } + }, + "mappings": [], + "thresholds": { + "mode": "absolute", + "steps": [ + { + "color": "green", + "value": null + }, + { + "color": "red", + "value": 80 + } + ] + } + }, + "overrides": [] + }, + "gridPos": { + "h": 8, + "w": 12, + "x": 12, + "y": 43 + }, + "id": 28, + "options": { + "legend": { + "calcs": [], + "displayMode": "list", + "placement": "bottom", + "showLegend": true + }, + "tooltip": { + "mode": "single", + "sort": "none" + } + }, + "pluginVersion": "10.1.10", + "targets": [ + { + "datasource": { + "type": "prometheus", + "uid": "${DS_PROMETHEUS}" + }, + "editorMode": "code", + "expr": + "irate(dragonfly_cmd_squash_commands_total\n{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/irate(dragonfly_cmd_squash_hop_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])", + "instant": false, + "legendFormat": "__auto", + "range": true, + "refId": "A" + } + ], + "title": "Average Squashing Length", "type": "timeseries" }, { @@ -1472,7 +1803,7 @@ "h": 1, "w": 24, "x": 0, - "y": 43 + "y": 51 }, "id": 19, "panels": [], @@ -1542,7 +1873,7 @@ "h": 8, "w": 12, "x": 0, - "y": 44 + "y": 52 }, "id": 18, "options": { @@ -1656,101 +1987,6 @@ "h": 8, "w": 12, "x": 12, - "y": 44 - }, - "id": 22, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "pluginVersion": "10.1.10", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" - }, - "editorMode": "code", - "expr": "dragonfly_pipeline_queue_length{namespace=\"$namespace\",pod=~\"$pod_name\"}", - "instant": false, - "legendFormat": "avr_pipeline_depth", - "range": true, - "refId": "A" - } - ], - "title": "Pipeline length", - "type": "timeseries" - }, - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } - ] - } - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 0, "y": 52 }, "id": 21, @@ -1798,103 +2034,6 @@ ], "title": "Master Replication memory", "type": "timeseries" - }, - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" - }, - "fieldConfig": { - "defaults": { - "color": { - "mode": "palette-classic" - }, - "custom": { - "axisCenteredZero": false, - "axisColorMode": "text", - "axisLabel": "", - "axisPlacement": "auto", - "barAlignment": 0, - "drawStyle": "line", - "fillOpacity": 0, - "gradientMode": "none", - "hideFrom": { - "legend": false, - "tooltip": false, - "viz": false - }, - "insertNulls": false, - "lineInterpolation": "linear", - "lineWidth": 1, - "pointSize": 5, - "scaleDistribution": { - "type": "linear" - }, - "showPoints": "auto", - "spanNulls": false, - "stacking": { - "group": "A", - "mode": "none" - }, - "thresholdsStyle": { - "mode": "off" - } - }, - "mappings": [], - "thresholds": { - "mode": "absolute", - "steps": [ - { - "color": "green", - "value": null - }, - { - "color": "red", - "value": 80 - } - ] - }, - "unit": "s" - }, - "overrides": [] - }, - "gridPos": { - "h": 8, - "w": 12, - "x": 12, - "y": 52 - }, - "id": 23, - "options": { - "legend": { - "calcs": [], - "displayMode": "list", - "placement": "bottom", - "showLegend": true - }, - "tooltip": { - "mode": "single", - "sort": "none" - } - }, - "pluginVersion": "10.1.10", - "targets": [ - { - "datasource": { - "type": "prometheus", - "uid": "${DS_PROMETHEUS}" - }, - "editorMode": "code", - "expr": - "irate(dragonfly_pipeline_commands_duration_seconds{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/irate(dragonfly_pipeline_commands_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])", - "instant": false, - "legendFormat": "{{pod}}", - "range": true, - "refId": "A" - } - ], - "title": "Pipeline latency", - "type": "timeseries" } ], "refresh": "10s",