chore: Pipelining fixes (#4994)

Fixes #4998.
1. Reduces agressive yielding when reading multiple requests since it humpers pipeline efficiency.
   Now we yield consistently based on cpu time spend since the last resume point (via flag with sane defaults).
2. Increases socket read buffer size effectively allowing processing more requests in bulk.

`./dragonfly  --cluster_mode=emulated`
latencies (usec) for pipeline sizes 80-199:
p50: 1887, p75: 2367, p90: 2897, p99: 6266

`./dragonfly  --cluster_mode=emulated --experimental_cluster_shard_by_slot`
latencies (usec) for pipeline sizes 80-199:
p50: 813, p75: 976, p90: 1216, p99: 3528

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2025-04-27 20:48:02 +03:00 committed by GitHub
parent ff7d9b79c6
commit 6d30baa20b
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 422 additions and 273 deletions

View file

@ -13,6 +13,7 @@
#include <numeric>
#include <variant>
#include "base/cycle_clock.h"
#include "base/flags.h"
#include "base/histogram.h"
#include "base/io_buf.h"
@ -23,7 +24,9 @@
#include "facade/memcache_parser.h"
#include "facade/redis_parser.h"
#include "facade/service_interface.h"
#include "glog/logging.h"
#include "io/file.h"
#include "util/fibers/fibers.h"
#include "util/fibers/proactor_base.h"
#ifdef DFLY_USE_SSL
@ -93,6 +96,10 @@ ABSL_FLAG(bool, migrate_connections, true,
"they operate. Currently this is only supported for Lua script invocations, and can "
"happen at most once per connection.");
ABSL_FLAG(uint32_t, max_busy_read_usec, 100,
"Maximum time we read and parse from "
"a socket without yielding. In microseconds.");
using namespace util;
using namespace std;
using absl::GetFlag;
@ -146,7 +153,7 @@ struct TrafficLogger {
void TrafficLogger::ResetLocked() {
if (log_file) {
log_file->Close();
std::ignore = log_file->Close();
log_file.reset();
}
}
@ -196,7 +203,7 @@ void OpenTrafficLogger(string_view base_path) {
// Write version, incremental numbering :)
uint8_t version[1] = {2};
tl_traffic_logger.log_file->Write(version);
std::ignore = tl_traffic_logger.log_file->Write(version);
}
void LogTraffic(uint32_t id, bool has_more, absl::Span<RespExpr> resp,
@ -876,6 +883,7 @@ pair<string, string> Connection::GetClientInfoBeforeAfterTid() const {
absl::StrAppend(&after, " irqmatch=", int(cpu == my_cpu_id));
if (dispatch_q_.size()) {
absl::StrAppend(&after, " pipeline=", dispatch_q_.size());
absl::StrAppend(&after, " pbuf=", pending_pipeline_bytes_);
}
absl::StrAppend(&after, " age=", now - creation_time_, " idle=", now - last_interaction_);
string_view phase_name = PHASE_NAMES[phase_];
@ -1028,7 +1036,7 @@ void Connection::ConnectionFlow() {
if (io_buf_.InputLen() > 0) {
phase_ = PROCESS;
if (redis_parser_) {
parse_status = ParseRedis();
parse_status = ParseRedis(10000);
} else {
DCHECK(memcache_parser_);
parse_status = ParseMemcache();
@ -1136,19 +1144,6 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_
// Dispatch async if we're handling a pipeline or if we can't dispatch sync.
if (optimize_for_async || !can_dispatch_sync) {
SendAsync(cmd_msg_cb());
auto epoch = fb2::FiberSwitchEpoch();
if (async_fiber_epoch_ == epoch) {
// If we pushed too many items without context switching - yield
if (++async_streak_len_ >= 10 && !cc_->async_dispatch) {
async_streak_len_ = 0;
ThisFiber::Yield();
}
} else {
async_streak_len_ = 0;
async_fiber_epoch_ = epoch;
}
} else {
ShrinkPipelinePool(); // Gradually release pipeline request pool.
{
@ -1164,20 +1159,17 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef<void()> invoke_
}
}
Connection::ParserStatus Connection::ParseRedis() {
Connection::ParserStatus Connection::ParseRedis(unsigned max_busy_cycles) {
uint32_t consumed = 0;
RedisParser::Result result = RedisParser::OK;
// Re-use connection local resources to reduce allocations
RespVec& parse_args = tmp_parse_args_;
CmdArgVec& cmd_vec = tmp_cmd_vec_;
auto dispatch_sync = [this, &parse_args, &cmd_vec] {
RespExpr::VecToArgList(parse_args, &cmd_vec);
service_->DispatchCommand(absl::MakeSpan(cmd_vec), reply_builder_.get(), cc_.get());
auto dispatch_sync = [this] {
RespExpr::VecToArgList(tmp_parse_args_, &tmp_cmd_vec_);
service_->DispatchCommand(absl::MakeSpan(tmp_cmd_vec_), reply_builder_.get(), cc_.get());
};
auto dispatch_async = [this, &parse_args, tlh = mi_heap_get_backing()]() -> MessageHandle {
return {FromArgs(std::move(parse_args), tlh)};
auto dispatch_async = [this, tlh = mi_heap_get_backing()]() -> MessageHandle {
return {FromArgs(std::move(tmp_parse_args_), tlh)};
};
ReadBuffer read_buffer = GetReadBuffer();
@ -1186,10 +1178,10 @@ Connection::ParserStatus Connection::ParseRedis() {
if (read_buffer.ShouldAdvance()) { // can happen only with io_uring/bundles
read_buffer.slice = NextBundleBuffer(read_buffer.available_bytes);
}
result = redis_parser_->Parse(read_buffer.slice, &consumed, &parse_args);
result = redis_parser_->Parse(read_buffer.slice, &consumed, &tmp_parse_args_);
request_consumed_bytes_ += consumed;
if (result == RedisParser::OK && !parse_args.empty()) {
if (RespExpr& first = parse_args.front(); first.type == RespExpr::STRING)
if (result == RedisParser::OK && !tmp_parse_args_.empty()) {
if (RespExpr& first = tmp_parse_args_.front(); first.type == RespExpr::STRING)
DVLOG(2) << "Got Args with first token " << ToSV(first.GetBuf());
if (io_req_size_hist)
@ -1198,12 +1190,20 @@ Connection::ParserStatus Connection::ParseRedis() {
bool has_more = consumed < read_buffer.available_bytes;
if (tl_traffic_logger.log_file && IsMain() /* log only on the main interface */) {
LogTraffic(id_, has_more, absl::MakeSpan(parse_args), service_->GetContextInfo(cc_.get()));
LogTraffic(id_, has_more, absl::MakeSpan(tmp_parse_args_),
service_->GetContextInfo(cc_.get()));
}
DispatchSingle(has_more, dispatch_sync, dispatch_async);
}
read_buffer.Consume(consumed);
// We must yield from time to time to allow other fibers to run.
// Specifically, if a client sends a huge chunk of data resulting in a very long pipeline,
// we want to yield to allow AsyncFiber to actually execute on the pending pipeline.
if (ThisFiber::GetRunningTimeCycles() > max_busy_cycles) {
ThisFiber::Yield();
}
} while (RedisParser::OK == result && read_buffer.available_bytes > 0 &&
!reply_builder_->GetError());
@ -1390,6 +1390,9 @@ auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
ParserStatus parse_status = OK;
size_t max_iobfuf_len = GetFlag(FLAGS_max_client_iobuf_len);
unsigned max_busy_read_cycles =
(base::CycleClock::Frequency() * GetFlag(FLAGS_max_busy_read_usec)) / 1000000U;
auto* peer = socket_.get();
recv_buf_.res_len = 0;
@ -1404,12 +1407,16 @@ auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
bool is_iobuf_full = io_buf_.AppendLen() == 0;
if (redis_parser_) {
parse_status = ParseRedis();
parse_status = ParseRedis(max_busy_read_cycles);
} else {
DCHECK(memcache_parser_);
parse_status = ParseMemcache();
}
if (reply_builder_->GetError()) {
return reply_builder_->GetError();
}
if (parse_status == NEED_MORE) {
parse_status = OK;
@ -1429,11 +1436,9 @@ auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
[&]() { io_buf_.Reserve(std::min(max_iobfuf_len, parser_hint)); });
}
// If we got a partial request and we couldn't parse the length, just
// double the capacity.
// If we got a partial request because iobuf was full, grow it up to
// a reasonable limit to save on Recv() calls.
if (io_buf_.AppendLen() < 64u || (is_iobuf_full && capacity < 4096)) {
if (is_iobuf_full && capacity < max_iobfuf_len / 2) {
// Last io used most of the io_buf to the end.
UpdateIoBufCapacity(io_buf_, stats_, [&]() {
io_buf_.Reserve(capacity * 2); // Valid growth range.
@ -1441,21 +1446,11 @@ auto Connection::IoLoop() -> variant<error_code, ParserStatus> {
}
DCHECK_GT(io_buf_.AppendLen(), 0U);
} else if (io_buf_.AppendLen() == 0) {
// We have a full buffer and we can not progress with parsing.
// This means that we have request too large.
LOG(ERROR) << "Request is too large, closing connection";
parse_status = ERROR;
break;
}
} else if (parse_status != OK) {
break;
}
ec = reply_builder_->GetError();
} while (peer->IsOpen() && !ec);
if (ec)
return ec;
} while (peer->IsOpen());
return parse_status;
}
@ -1833,6 +1828,7 @@ void Connection::SendAsync(MessageHandle msg) {
// Squashing is only applied to redis commands
if (std::holds_alternative<PipelineMessagePtr>(msg.handle)) {
pending_pipeline_cmd_cnt_++;
pending_pipeline_bytes_ += used_mem;
}
if (msg.IsControl()) {
@ -1869,7 +1865,10 @@ void Connection::RecycleMessage(MessageHandle msg) {
// Retain pipeline message in pool.
if (auto* pipe = get_if<PipelineMessagePtr>(&msg.handle); pipe) {
DCHECK_GE(pending_pipeline_bytes_, used_mem);
DCHECK_GE(pending_pipeline_cmd_cnt_, 1u);
pending_pipeline_cmd_cnt_--;
pending_pipeline_bytes_ -= used_mem;
if (stats_->pipeline_cmd_cache_bytes < qbp.pipeline_cache_limit) {
stats_->pipeline_cmd_cache_bytes += (*pipe)->StorageCapacity();
pipeline_req_pool_.push_back(std::move(*pipe));

View file

@ -367,7 +367,7 @@ class Connection : public util::Connection {
// Create new pipeline request, re-use from pool when possible.
PipelineMessagePtr FromArgs(RespVec args, mi_heap_t* heap);
ParserStatus ParseRedis();
ParserStatus ParseRedis(unsigned max_busy_cycles);
ParserStatus ParseMemcache();
void OnBreakCb(int32_t mask);
@ -427,6 +427,7 @@ class Connection : public util::Connection {
util::fb2::Fiber async_fb_; // async fiber (if started)
uint64_t pending_pipeline_cmd_cnt_ = 0; // how many queued Redis async commands in dispatch_q
size_t pending_pipeline_bytes_ = 0; // how many bytes of the queued Redis async commands
// how many bytes of the current request have been consumed
size_t request_consumed_bytes_ = 0;
@ -455,10 +456,6 @@ class Connection : public util::Connection {
unsigned parser_error_ = 0;
// amount of times we enqued requests asynchronously during the same async_fiber_epoch_.
unsigned async_streak_len_ = 0;
uint64_t async_fiber_epoch_ = 0;
BreakerCb breaker_cb_;
// Used by redis parser to avoid allocations

View file

@ -116,8 +116,8 @@ ABSL_FLAG(size_t, serialization_max_chunk_size, 64_KB,
"Maximum size of a value that may be serialized at once during snapshotting or full "
"sync. Values bigger than this threshold will be serialized using streaming "
"serialization. 0 - to disable streaming mode");
ABSL_FLAG(uint32_t, max_squashed_cmd_num, 32,
"Max number of commands squashed in command squash optimizaiton");
ABSL_FLAG(uint32_t, max_squashed_cmd_num, 100,
"Max number of commands squashed in a single shard during squash optimizaiton");
namespace dfly {

View file

@ -1314,6 +1314,20 @@ void PrintPrometheusMetrics(uint64_t uptime, const Metrics& m, DflyCmd* dfly_cmd
AppendMetricWithoutLabels("pipeline_commands_duration_seconds", "",
conn_stats.pipelined_cmd_latency * 1e-6, MetricType::COUNTER,
&resp->body());
AppendMetricWithoutLabels("cmd_squash_hop_total", "", m.coordinator_stats.multi_squash_executions,
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("cmd_squash_commands_total", "", m.coordinator_stats.squashed_commands,
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("cmd_squash_hop_duration_seconds", "",
m.coordinator_stats.multi_squash_exec_hop_usec * 1e-6,
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("cmd_squash_hop_reply_seconds", "",
m.coordinator_stats.multi_squash_exec_reply_usec * 1e-6,
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("commands_squashing_replies_bytes", "",
MultiCommandSquasher::GetRepliesMemSize(), MetricType::GAUGE,
&resp->body());
@ -2486,7 +2500,6 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
append("total_commands_processed", conn_stats.command_cnt_main + conn_stats.command_cnt_other);
append("instantaneous_ops_per_sec", m.qps);
append("total_pipelined_commands", conn_stats.pipelined_cmd_cnt);
append("total_pipelined_squashed_commands", m.coordinator_stats.squashed_commands);
append("pipeline_throttle_total", conn_stats.pipeline_throttle_count);
append("pipelined_latency_usec", conn_stats.pipelined_cmd_latency);
append("total_net_input_bytes", conn_stats.io_read_bytes);
@ -2628,9 +2641,6 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
append("eval_shardlocal_coordination_total",
m.coordinator_stats.eval_shardlocal_coordination_cnt);
append("eval_squashed_flushes", m.coordinator_stats.eval_squashed_flushes);
append("multi_squash_execution_total", m.coordinator_stats.multi_squash_executions);
append("multi_squash_execution_hop_usec", m.coordinator_stats.multi_squash_exec_hop_usec);
append("multi_squash_execution_reply_usec", m.coordinator_stats.multi_squash_exec_reply_usec);
};
auto add_repl_info = [&] {

View file

@ -561,13 +561,16 @@ async def test_reply_count(async_client: aioredis.Redis):
e = async_client.pipeline(transaction=True)
for _ in range(100):
e.incr("num-1")
assert await measure(e.execute()) == 2 # OK + Response
# one - for MULTI-OK, one for the rest. Depends on the squashing efficiency,
# can be either 1 or 2 replies.
assert await measure(e.execute()) <= 2
# Just pipeline
p = async_client.pipeline(transaction=False)
for _ in range(100):
p.incr("num-1")
assert await measure(p.execute()) == 1
assert await measure(p.execute()) <= 2
# Script result
assert await measure(async_client.eval('return {1,2,{3,4},5,6,7,8,"nine"}', 0)) == 1
@ -1118,14 +1121,14 @@ async def test_send_timeout(df_server, async_client: aioredis.Redis):
# Test that the cache pipeline does not grow or shrink under constant pipeline load.
@dfly_args({"proactor_threads": 1, "pipeline_squash": 9})
@dfly_args({"proactor_threads": 1, "pipeline_squash": 9, "max_busy_read_usec": 1000})
async def test_pipeline_cache_only_async_squashed_dispatches(df_factory):
server = df_factory.create()
server.start()
client = server.client()
async def push_pipeline(size=1):
async def push_pipeline(size):
p = client.pipeline(transaction=True)
for i in range(size):
p.info()
@ -1136,14 +1139,15 @@ async def test_pipeline_cache_only_async_squashed_dispatches(df_factory):
# should be zero because:
# We always dispatch the items that will be squashed, so when `INFO` gets called
# the cache is empty because the pipeline consumed it throughout its execution
for i in range(0, 30):
# high max_busy_read_usec ensures that the connection fiber has enough time to push
# all the commands to reach the squashing limit.
for i in range(0, 10):
# it's actually 11 commands. 8 INFO + 2 from the MULTI/EXEC block that is injected
# by the client. Connection fiber yields to dispatch/async fiber when
# ++async_streak_len_ >= 10. The minimum to squash is 9 so it will squash the pipeline
# by the client. The minimum to squash is 9 so it will squash the pipeline
# and INFO ALL should return zero for all the squashed commands in the pipeline
res = await push_pipeline(8)
for i in range(1):
assert res[i]["pipeline_cache_bytes"] == 0
for r in res:
assert r["pipeline_cache_bytes"] == 0
# Non zero because we reclaimed/recycled the messages back to the cache
info = await client.info()

View file

@ -1301,7 +1301,8 @@
"value": 80
}
]
}
},
"unit": "s"
},
"overrides": []
},
@ -1311,6 +1312,244 @@
"x": 12,
"y": 29
},
"id": 27,
"options": {
"alertThreshold": true,
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "10.1.10",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"exemplar": true,
"expr":
"irate(dragonfly_pipeline_commands_duration_seconds{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/irate(dragonfly_pipeline_commands_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])",
"format": "time_series",
"interval": "",
"intervalFactor": 2,
"legendFormat": "pipeline",
"range": true,
"refId": "A",
"step": 240
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"exemplar": true,
"expr":
"irate(dragonfly_cmd_squash_hop_duration_seconds{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/irate(dragonfly_cmd_squash_hop_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])",
"format": "time_series",
"hide": false,
"interval": "",
"intervalFactor": 2,
"legendFormat": "execute_hop",
"range": true,
"refId": "B",
"step": 240
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"exemplar": true,
"expr":
"irate(dragonfly_cmd_squash_hop_reply_seconds{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/irate(dragonfly_cmd_squash_hop_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])",
"format": "time_series",
"hide": false,
"interval": "",
"intervalFactor": 2,
"legendFormat": "reply",
"range": true,
"refId": "C",
"step": 240
}
],
"title": "Pipeline Latency",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"links": [],
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 7,
"w": 12,
"x": 0,
"y": 36
},
"id": 16,
"options": {
"alertThreshold": true,
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "10.1.10",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"exemplar": true,
"expr": "dragonfly_connected_clients{namespace=\"$namespace\",pod=\"$pod_name\"}",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{pod}}",
"range": true,
"refId": "A"
}
],
"title": "Dragonfly connected clients",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"links": [],
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 7,
"w": 12,
"x": 12,
"y": 36
},
"id": 13,
"options": {
"alertThreshold": true,
@ -1407,7 +1646,6 @@
"mode": "off"
}
},
"links": [],
"mappings": [],
"thresholds": {
"mode": "absolute",
@ -1426,14 +1664,13 @@
"overrides": []
},
"gridPos": {
"h": 7,
"h": 8,
"w": 12,
"x": 12,
"y": 36
"x": 0,
"y": 43
},
"id": 16,
"id": 22,
"options": {
"alertThreshold": true,
"legend": {
"calcs": [],
"displayMode": "list",
@ -1453,17 +1690,111 @@
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"exemplar": true,
"expr": "dragonfly_connected_clients{namespace=\"$namespace\",pod=\"$pod_name\"}",
"format": "time_series",
"interval": "",
"intervalFactor": 1,
"legendFormat": "{{pod}}",
"expr": "dragonfly_pipeline_queue_length{namespace=\"$namespace\",pod=~\"$pod_name\"}",
"instant": false,
"legendFormat": "avr_pipeline_depth",
"range": true,
"refId": "A"
}
],
"title": "Dragonfly connected clients",
"title": "Pipeline length",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"description": "",
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 43
},
"id": 28,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "10.1.10",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr":
"irate(dragonfly_cmd_squash_commands_total\n{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/irate(dragonfly_cmd_squash_hop_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])",
"instant": false,
"legendFormat": "__auto",
"range": true,
"refId": "A"
}
],
"title": "Average Squashing Length",
"type": "timeseries"
},
{
@ -1472,7 +1803,7 @@
"h": 1,
"w": 24,
"x": 0,
"y": 43
"y": 51
},
"id": 19,
"panels": [],
@ -1542,7 +1873,7 @@
"h": 8,
"w": 12,
"x": 0,
"y": 44
"y": 52
},
"id": 18,
"options": {
@ -1656,101 +1987,6 @@
"h": 8,
"w": 12,
"x": 12,
"y": 44
},
"id": 22,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "10.1.10",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr": "dragonfly_pipeline_queue_length{namespace=\"$namespace\",pod=~\"$pod_name\"}",
"instant": false,
"legendFormat": "avr_pipeline_depth",
"range": true,
"refId": "A"
}
],
"title": "Pipeline length",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
}
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 0,
"y": 52
},
"id": 21,
@ -1798,103 +2034,6 @@
],
"title": "Master Replication memory",
"type": "timeseries"
},
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"fieldConfig": {
"defaults": {
"color": {
"mode": "palette-classic"
},
"custom": {
"axisCenteredZero": false,
"axisColorMode": "text",
"axisLabel": "",
"axisPlacement": "auto",
"barAlignment": 0,
"drawStyle": "line",
"fillOpacity": 0,
"gradientMode": "none",
"hideFrom": {
"legend": false,
"tooltip": false,
"viz": false
},
"insertNulls": false,
"lineInterpolation": "linear",
"lineWidth": 1,
"pointSize": 5,
"scaleDistribution": {
"type": "linear"
},
"showPoints": "auto",
"spanNulls": false,
"stacking": {
"group": "A",
"mode": "none"
},
"thresholdsStyle": {
"mode": "off"
}
},
"mappings": [],
"thresholds": {
"mode": "absolute",
"steps": [
{
"color": "green",
"value": null
},
{
"color": "red",
"value": 80
}
]
},
"unit": "s"
},
"overrides": []
},
"gridPos": {
"h": 8,
"w": 12,
"x": 12,
"y": 52
},
"id": 23,
"options": {
"legend": {
"calcs": [],
"displayMode": "list",
"placement": "bottom",
"showLegend": true
},
"tooltip": {
"mode": "single",
"sort": "none"
}
},
"pluginVersion": "10.1.10",
"targets": [
{
"datasource": {
"type": "prometheus",
"uid": "${DS_PROMETHEUS}"
},
"editorMode": "code",
"expr":
"irate(dragonfly_pipeline_commands_duration_seconds{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])/irate(dragonfly_pipeline_commands_total{namespace=\"$namespace\",pod=~\"$pod_name\"}[$__rate_interval])",
"instant": false,
"legendFormat": "{{pod}}",
"range": true,
"refId": "A"
}
],
"title": "Pipeline latency",
"type": "timeseries"
}
],
"refresh": "10s",