diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index e4fd48a46..1cdab6cb2 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -52,8 +52,18 @@ ABSL_FLAG(string, admin_bind, "", ABSL_FLAG(uint64_t, request_cache_limit, 64_MB, "Amount of memory to use for request cache in bytes - per IO thread."); -ABSL_FLAG(uint64_t, pipeline_buffer_limit, 8_MB, - "Amount of memory to use for parsing pipeline requests - per IO thread."); +ABSL_FLAG(uint64_t, pipeline_buffer_limit, 128_MB, + "Amount of memory to use for storing pipeline requests - per IO thread." + "Please note that clients that send excecissively huge pipelines, " + "may deadlock themselves. See https://github.com/dragonflydb/dragonfly/discussions/3997" + "for details."); + +ABSL_FLAG(uint32_t, pipeline_queue_limit, 10000, + "Pipeline queue max length, the server will stop reading from the client socket" + " once its pipeline queue crosses this limit, and will resume once it processes " + "excessive requests. This is to prevent OOM states. Users of huge pipelines sizes " + "may require increasing this limit to prevent the risk of deadlocking." + "See https://github.com/dragonflydb/dragonfly/discussions/3997 for details"); ABSL_FLAG(uint64_t, publish_buffer_limit, 128_MB, "Amount of memory to use for storing pub commands in bytes - per IO thread"); @@ -63,10 +73,6 @@ ABSL_FLAG(bool, no_tls_on_admin_port, false, "Allow non-tls connections on admin ABSL_FLAG(uint32_t, pipeline_squash, 10, "Number of queued pipelined commands above which squashing is enabled, 0 means disabled"); -ABSL_FLAG(uint32_t, pipeline_queue_limit, 1000, - "Pipeline queue max length, the server will stop reading from the client socket" - " once the pipeline reaches this limit"); - // When changing this constant, also update `test_large_cmd` test in connection_test.py. ABSL_FLAG(uint32_t, max_multi_bulk_len, 1u << 16, "Maximum multi-bulk (array) length that is " @@ -1020,6 +1026,10 @@ void Connection::DispatchSingle(bool has_more, absl::FunctionRef invoke_ if (optimize_for_async && queue_backpressure_->IsPipelineBufferOverLimit( stats_->dispatch_queue_bytes, dispatch_q_.size())) { + stats_->pipeline_throttle_count++; + LOG_EVERY_T(WARNING, 10) << "Pipeline buffer over limit: pipeline_bytes " + << stats_->dispatch_queue_bytes << " queue_size " << dispatch_q_.size() + << ", consider increasing pipeline_buffer_limit/pipeline_queue_limit"; fb2::NoOpLock noop; queue_backpressure_->pipeline_cnd.wait(noop, [this] { bool over_limits = queue_backpressure_->IsPipelineBufferOverLimit( @@ -1826,6 +1836,12 @@ void Connection::BreakOnce(uint32_t ev_mask) { void Connection::SetMaxQueueLenThreadLocal(uint32_t val) { tl_queue_backpressure_.pipeline_queue_max_len = val; + tl_queue_backpressure_.pipeline_cnd.notify_all(); +} + +void Connection::SetPipelineBufferLimit(size_t val) { + tl_queue_backpressure_.pipeline_buffer_limit = val; + tl_queue_backpressure_.pipeline_cnd.notify_all(); } void Connection::GetRequestSizeHistogramThreadLocal(std::string* hist) { diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 762581e6c..bc1af66c3 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -312,6 +312,7 @@ class Connection : public util::Connection { // Sets max queue length locally in the calling thread. static void SetMaxQueueLenThreadLocal(uint32_t val); + static void SetPipelineBufferLimit(size_t val); static void GetRequestSizeHistogramThreadLocal(std::string* hist); static void TrackRequestSize(bool enable); diff --git a/src/facade/facade.cc b/src/facade/facade.cc index 698a690d6..52a6d7cea 100644 --- a/src/facade/facade.cc +++ b/src/facade/facade.cc @@ -20,7 +20,7 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats); ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { // To break this code deliberately if we add/remove a field to this struct. - static_assert(kSizeConnStats == 112u); + static_assert(kSizeConnStats == 120u); ADD(read_buf_capacity); ADD(dispatch_queue_entries); @@ -37,6 +37,7 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { ADD(num_replicas); ADD(num_blocked_clients); ADD(num_migrations); + ADD(pipeline_throttle_count); return *this; } diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h index acf8c166f..1dbcc9087 100644 --- a/src/facade/facade_types.h +++ b/src/facade/facade_types.h @@ -109,6 +109,9 @@ struct ConnectionStats { uint32_t num_blocked_clients = 0; uint64_t num_migrations = 0; + // Number of events when the pipeline queue was over the limit and was throttled. + uint64_t pipeline_throttle_count = 0; + ConnectionStats& operator+=(const ConnectionStats& o); }; diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc index fea16d66b..68186af5a 100644 --- a/src/facade/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -126,7 +126,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) { send_active_ = true; tl_facade_stats->reply_stats.io_write_cnt++; tl_facade_stats->reply_stats.io_write_bytes += bsize; - DVLOG(2) << "Writing " << bsize << " bytes of len " << len; + DVLOG(2) << "Writing " << bsize + batch_.size() << " bytes of len " << len; if (batch_.empty()) { ec = sink_->Write(v, len); diff --git a/src/server/acl/acl_family.cc b/src/server/acl/acl_family.cc index 06e47ece8..fe1281fb5 100644 --- a/src/server/acl/acl_family.cc +++ b/src/server/acl/acl_family.cc @@ -706,16 +706,6 @@ void AclFamily::Init(facade::Listener* main_listener, UserRegistry* registry) { return; } registry_->Init(&CategoryToIdx(), &reverse_cat_table_, &CategoryToCommandsIndex()); - config_registry.RegisterMutable("aclfile"); - config_registry.RegisterMutable("acllog_max_len", [this](const absl::CommandLineFlag& flag) { - auto res = flag.TryGet(); - if (res.has_value()) { - pool_->AwaitFiberOnAll([&res](auto index, auto* context) { - ServerState::tlocal()->acl_log.SetTotalEntries(res.value()); - }); - } - return res.has_value(); - }); } std::string AclFamily::AclCatToString(uint32_t acl_category, User::Sign sign) const { diff --git a/src/server/acl/acl_log.cc b/src/server/acl/acl_log.cc index 7ab4ca75e..05641cb64 100644 --- a/src/server/acl/acl_log.cc +++ b/src/server/acl/acl_log.cc @@ -12,7 +12,7 @@ #include "facade/dragonfly_connection.h" #include "server/conn_context.h" -ABSL_FLAG(size_t, acllog_max_len, 32, +ABSL_FLAG(uint32_t, acllog_max_len, 32, "Specify the number of log entries. Logs are kept locally for each thread " "and therefore the total number of entries are acllog_max_len * threads"); diff --git a/src/server/config_registry.h b/src/server/config_registry.h index eccd3c987..12c01b94b 100644 --- a/src/server/config_registry.h +++ b/src/server/config_registry.h @@ -26,6 +26,18 @@ class ConfigRegistry { return *this; } + template + ConfigRegistry& RegisterSetter(std::string_view name, std::function f) { + return RegisterMutable(name, [f](const absl::CommandLineFlag& flag) { + auto res = flag.TryGet(); + if (res.has_value()) { + f(*res); + return true; + } + return false; + }); + } + enum class SetResult : uint8_t { OK, UNKNOWN, diff --git a/src/server/main_service.cc b/src/server/main_service.cc index fdb0ede99..69ec162c5 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -883,14 +883,8 @@ Service::~Service() { void Service::Init(util::AcceptServer* acceptor, std::vector listeners) { InitRedisTables(); - config_registry.RegisterMutable("maxmemory", [](const absl::CommandLineFlag& flag) { - auto res = flag.TryGet(); - if (!res) - return false; - - max_memory_limit = res->value; - return true; - }); + config_registry.RegisterSetter( + "maxmemory", [](const MemoryBytesFlag& flag) { max_memory_limit = flag.value; }); config_registry.RegisterMutable("dbfilename"); config_registry.Register("dbnum"); // equivalent to databases in redis. @@ -901,32 +895,24 @@ void Service::Init(util::AcceptServer* acceptor, std::vector config_registry.RegisterMutable("max_eviction_per_heartbeat"); config_registry.RegisterMutable("max_segment_to_consider"); - config_registry.RegisterMutable("oom_deny_ratio", [](const absl::CommandLineFlag& flag) { - auto res = flag.TryGet(); - if (res.has_value()) { - SetOomDenyRatioOnAllThreads(*res); - } - return res.has_value(); + config_registry.RegisterSetter("oom_deny_ratio", + [](double val) { SetOomDenyRatioOnAllThreads(val); }); + + config_registry.RegisterSetter("rss_oom_deny_ratio", + [](double val) { SetRssOomDenyRatioOnAllThreads(val); }); + + config_registry.RegisterMutable("pipeline_squash"); + + config_registry.RegisterSetter("pipeline_queue_limit", [](uint32_t val) { + shard_set->pool()->AwaitBrief( + [val](unsigned, auto*) { facade::Connection::SetMaxQueueLenThreadLocal(val); }); }); - config_registry.RegisterMutable("rss_oom_deny_ratio", [](const absl::CommandLineFlag& flag) { - auto res = flag.TryGet(); - if (res.has_value()) { - SetRssOomDenyRatioOnAllThreads(*res); - } - return res.has_value(); + config_registry.RegisterSetter("pipeline_buffer_limit", [](size_t val) { + shard_set->pool()->AwaitBrief( + [val](unsigned, auto*) { facade::Connection::SetPipelineBufferLimit(val); }); }); - config_registry.RegisterMutable("pipeline_squash"); - config_registry.RegisterMutable("pipeline_queue_limit", - [pool = &pp_](const absl::CommandLineFlag& flag) { - auto res = flag.TryGet(); - if (res.has_value()) { - pool->AwaitBrief([val = *res](unsigned, auto*) { - facade::Connection::SetMaxQueueLenThreadLocal(val); - }); - } - return res.has_value(); - }); + config_registry.RegisterMutable("replica_partial_sync"); config_registry.RegisterMutable("replication_timeout"); config_registry.RegisterMutable("table_growth_margin"); @@ -951,6 +937,12 @@ void Service::Init(util::AcceptServer* acceptor, std::vector return true; }); + config_registry.RegisterMutable("aclfile"); + config_registry.RegisterSetter("acllog_max_len", [](uint32_t val) { + shard_set->pool()->AwaitFiberOnAll( + [val](auto index, auto* context) { ServerState::tlocal()->acl_log.SetTotalEntries(val); }); + }); + serialization_max_chunk_size = GetFlag(FLAGS_serialization_max_chunk_size); uint32_t shard_num = GetFlag(FLAGS_num_shards); if (shard_num == 0 || shard_num > pp_.size()) { diff --git a/src/server/server_family.cc b/src/server/server_family.cc index f39503cdc..76b6e0a2d 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -816,12 +816,8 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vectorpool()->size() << " threads"; SetMaxClients(listeners_, absl::GetFlag(FLAGS_maxclients)); - config_registry.RegisterMutable("maxclients", [this](const absl::CommandLineFlag& flag) { - auto res = flag.TryGet(); - if (res.has_value()) - SetMaxClients(listeners_, res.value()); - return res.has_value(); - }); + config_registry.RegisterSetter( + "maxclients", [this](uint32_t val) { SetMaxClients(listeners_, val); }); SetSlowLogThreshold(service_.proactor_pool(), absl::GetFlag(FLAGS_slowlog_log_slower_than)); config_registry.RegisterMutable("slowlog_log_slower_than", @@ -832,12 +828,8 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector(); - if (res.has_value()) - SetSlowLogMaxLen(service_.proactor_pool(), res.value()); - return res.has_value(); - }); + config_registry.RegisterSetter( + "slowlog_max_len", [this](uint32_t val) { SetSlowLogMaxLen(service_.proactor_pool(), val); }); // We only reconfigure TLS when the 'tls' config key changes. Therefore to // update TLS certs, first update tls_cert_file, then set 'tls true'. @@ -1280,6 +1272,8 @@ void PrintPrometheusMetrics(const Metrics& m, DflyCmd* dfly_cmd, StringResponse* MetricType::GAUGE, &resp->body()); AppendMetricWithoutLabels("pipeline_queue_length", "", conn_stats.dispatch_queue_entries, MetricType::GAUGE, &resp->body()); + AppendMetricWithoutLabels("pipeline_throttle_total", "", conn_stats.pipeline_throttle_count, + MetricType::COUNTER, &resp->body()); AppendMetricWithoutLabels("pipeline_cmd_cache_bytes", "", conn_stats.pipeline_cmd_cache_bytes, MetricType::GAUGE, &resp->body()); AppendMetricWithoutLabels("pipeline_commands_total", "", conn_stats.pipelined_cmd_cnt, @@ -2298,6 +2292,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("instantaneous_ops_per_sec", m.qps); append("total_pipelined_commands", conn_stats.pipelined_cmd_cnt); append("total_pipelined_squashed_commands", m.coordinator_stats.squashed_commands); + append("pipeline_throttle_total", conn_stats.pipeline_throttle_count); append("pipelined_latency_usec", conn_stats.pipelined_cmd_latency); append("total_net_input_bytes", conn_stats.io_read_bytes); append("connection_migrations", conn_stats.num_migrations); @@ -2327,9 +2322,13 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("defrag_task_invocation_total", m.shard_stats.defrag_task_invocation_total); append("reply_count", reply_stats.send_stats.count); append("reply_latency_usec", reply_stats.send_stats.total_duration); + + // Number of connections that are currently blocked on grabbing interpreter. append("blocked_on_interpreter", m.coordinator_stats.blocked_on_interpreter); append("lua_interpreter_cnt", m.lua_stats.interpreter_cnt); - append("lua_blocked", m.lua_stats.blocked_cnt); + + // Total number of events of when a connection was blocked on grabbing interpreter. + append("lua_blocked_total", m.lua_stats.blocked_cnt); } if (should_enter("TIERED", true)) { diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index b24b4f7ef..c3a02285b 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -31,7 +31,7 @@ ABSL_DECLARE_FLAG(string, dbfilename); ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio); ABSL_DECLARE_FLAG(uint32_t, num_shards); ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to run tests"); -ABSL_DECLARE_FLAG(size_t, acllog_max_len); +ABSL_DECLARE_FLAG(uint32_t, acllog_max_len); namespace dfly { std::ostream& operator<<(std::ostream& os, const DbStats& stats) {