From 8f53c94ecb910b3b469bf11b6a904b4a0bc25102 Mon Sep 17 00:00:00 2001 From: kostas Date: Fri, 11 Apr 2025 17:47:45 +0300 Subject: [PATCH 1/3] chore: event count throttle for squashed commands Signed-off-by: kostas --- src/server/multi_command_squasher.cc | 17 ++++++++++++- src/server/multi_command_squasher.h | 13 ++++++++++ tests/dragonfly/memory_test.py | 38 ++++++++++++++++++++++++++++ 3 files changed, 67 insertions(+), 1 deletion(-) diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index 604340a65..5312e08d4 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -6,6 +6,7 @@ #include +#include "base/flags.h" #include "base/logging.h" #include "core/overloaded.h" #include "facade/dragonfly_connection.h" @@ -15,6 +16,8 @@ #include "server/transaction.h" #include "server/tx_base.h" +ABSL_FLAG(size_t, throttle_squashed, 0, ""); + namespace dfly { using namespace std; @@ -63,6 +66,10 @@ size_t Size(const facade::CapturingReplyBuilder::Payload& payload) { } // namespace atomic_uint64_t MultiCommandSquasher::current_reply_size_ = 0; +thread_local size_t MultiCommandSquasher::throttle_size_limit_ = + absl::GetFlag(FLAGS_throttle_squashed); + +thread_local util::fb2::EventCount MultiCommandSquasher::ec_; MultiCommandSquasher::MultiCommandSquasher(absl::Span cmds, ConnectionContext* cntx, Service* service, bool verify_commands, bool error_abort) @@ -201,6 +208,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v crb.SetReplyMode(cmd->ReplyMode()); local_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args); + service_->InvokeCmd(cmd->Cid(), args, &crb, &local_cntx); sinfo.replies.emplace_back(crb.Take()); @@ -222,6 +230,9 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { if (order_.empty()) return true; + MultiCommandSquasher::ec_.await( + []() { return !MultiCommandSquasher::IsMultiCommandSquasherOverLimit(); }); + unsigned num_shards = 0; for (auto& sd : sharded_) { sd.replies.reserve(sd.cmds.size()); @@ -261,19 +272,23 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { uint64_t after_hop = proactor->GetMonotonicTimeNs(); bool aborted = false; + size_t size = 0; for (auto idx : order_) { auto& replies = sharded_[idx].replies; CHECK(!replies.empty()); aborted |= error_abort_ && CapturingReplyBuilder::TryExtractError(replies.back()); - current_reply_size_.fetch_sub(Size(replies.back()), std::memory_order_relaxed); + size += Size(replies.back()); CapturingReplyBuilder::Apply(std::move(replies.back()), rb); replies.pop_back(); if (aborted) break; } + current_reply_size_.fetch_sub(size, std::memory_order_relaxed); + MultiCommandSquasher::ec_.notifyAll(); + uint64_t after_reply = proactor->GetMonotonicTimeNs(); ServerState::SafeTLocal()->stats.multi_squash_exec_hop_usec += (after_hop - start) / 1000; ServerState::SafeTLocal()->stats.multi_squash_exec_reply_usec += (after_reply - after_hop) / 1000; diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index dc4158dd0..183b5afef 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -7,6 +7,7 @@ #include "facade/reply_capture.h" #include "server/conn_context.h" #include "server/main_service.h" +#include "util/fibers/synchronization.h" namespace dfly { @@ -32,6 +33,15 @@ class MultiCommandSquasher { return current_reply_size_.load(std::memory_order_relaxed); } + static bool 
IsMultiCommandSquasherOverLimit() { + const bool over_limit = + throttle_size_limit_ > 0 && + current_reply_size_.load(std::memory_order_relaxed) > throttle_size_limit_; + VLOG_IF(2, over_limit) << "MultiCommandSquasher overlimit: " << throttle_size_limit_ + << " current reply size " << current_reply_size_; + return over_limit; + } + private: // Per-shard execution info. struct ShardExecInfo { @@ -92,6 +102,9 @@ class MultiCommandSquasher { // we increase size in one thread and decrease in another static atomic_uint64_t current_reply_size_; + static thread_local size_t throttle_size_limit_; + // Used to throttle when memory is tight + static thread_local util::fb2::EventCount ec_; }; } // namespace dfly diff --git a/tests/dragonfly/memory_test.py b/tests/dragonfly/memory_test.py index d871371df..e5df23ca4 100644 --- a/tests/dragonfly/memory_test.py +++ b/tests/dragonfly/memory_test.py @@ -1,4 +1,5 @@ import pytest +import asyncio from redis import asyncio as aioredis from .utility import * import logging @@ -222,3 +223,40 @@ async def test_cache_eviction_with_rss_deny_oom( ) stats_info = await async_client.info("stats") logging.info(f'Current evicted: {stats_info["evicted_keys"]}. Total keys: {num_keys}.') + + +@pytest.mark.asyncio +async def test_throttle_on_commands_squashing_replies_bytes(df_factory: DflyInstanceFactory): + df = df_factory.create( + proactor_threads=2, throttle_squashed=1_000_000_000, vmodule="multi_command_squasher=2" + ) + df.start() + + client = df.client() + # 1gb + await client.execute_command("debug populate 1 test 10000 rand type hash elements 100000") + + async def poll(): + # At any point we should not cross this limit + cl = df.client() + await cl.execute_command("multi") + await cl.execute_command("hgetall test:0") + await cl.execute_command("exec") + + # With the current approach this will overshoot + # await client.execute_command("multi") + # await client.execute_command("hgetall test:0") + # await client.execute_command("hgetall test:0") + # await client.execute_command("hgetall test:0") + # await client.execute_command("hgetall test:0") + # res = await client.execute_command("exec") + tasks = [] + for i in range(50): + tasks.append(asyncio.create_task(poll())) + + for task in tasks: + await task + + df.stop() + found = df.find_in_logs("MultiCommandSquasher overlimit: ") + assert len(found) > 0 From 2faa2a034a8a1d27314d3d02f7ea81f0c10858e6 Mon Sep 17 00:00:00 2001 From: kostas Date: Wed, 23 Apr 2025 14:53:39 +0300 Subject: [PATCH 2/3] fixes --- src/server/multi_command_squasher.cc | 5 ++--- src/server/multi_command_squasher.h | 5 +++-- src/server/server_state.cc | 1 + src/server/server_state.h | 5 +++++ tests/dragonfly/memory_test.py | 9 +++++---- 5 files changed, 16 insertions(+), 9 deletions(-) diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index 8b42a5f12..917eaf4c3 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -67,9 +67,8 @@ size_t Size(const facade::CapturingReplyBuilder::Payload& payload) { atomic_uint64_t MultiCommandSquasher::current_reply_size_ = 0; thread_local size_t MultiCommandSquasher::throttle_size_limit_ = - absl::GetFlag(FLAGS_throttle_squashed); - -thread_local util::fb2::EventCount MultiCommandSquasher::ec_; + absl::GetFlag(FLAGS_throttle_squashed) * ServerState::tlocal()->GetTotalShards(); +util::fb2::EventCount MultiCommandSquasher::ec_; MultiCommandSquasher::MultiCommandSquasher(absl::Span cmds, ConnectionContext* cntx, Service* service, const 
Opts& opts) diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index 4c807e991..bebf5a849 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -104,9 +104,10 @@ class MultiCommandSquasher { // we increase size in one thread and decrease in another static atomic_uint64_t current_reply_size_; - static thread_local size_t throttle_size_limit_; // Used to throttle when memory is tight - static thread_local util::fb2::EventCount ec_; + static util::fb2::EventCount ec_; + + static thread_local size_t throttle_size_limit_; }; } // namespace dfly diff --git a/src/server/server_state.cc b/src/server/server_state.cc index 34344ab77..50bb2bd31 100644 --- a/src/server/server_state.cc +++ b/src/server/server_state.cc @@ -127,6 +127,7 @@ void ServerState::Init(uint32_t thread_index, uint32_t num_shards, util::fb2::Launch::post, "ConnectionsWatcher", [state = state_, main_listener] { state->ConnectionsWatcherFb(main_listener); }); } + state_->total_shards_ = num_shards; } void ServerState::Destroy() { diff --git a/src/server/server_state.h b/src/server/server_state.h index 79a8ec885..97c548a2e 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -270,6 +270,10 @@ class ServerState { // public struct - to allow initialization. bool ShouldLogSlowCmd(unsigned latency_usec) const; + size_t GetTotalShards() const { + return total_shards_; + } + Stats stats; bool is_master = true; @@ -351,6 +355,7 @@ class ServerState { // public struct - to allow initialization. uint64_t used_mem_last_update_ = 0; MemoryUsageStats memory_stats_cached_; // thread local cache of used and rss memory current + size_t total_shards_; static __thread ServerState* state_; }; diff --git a/tests/dragonfly/memory_test.py b/tests/dragonfly/memory_test.py index e5df23ca4..b3c367be7 100644 --- a/tests/dragonfly/memory_test.py +++ b/tests/dragonfly/memory_test.py @@ -228,16 +228,17 @@ async def test_cache_eviction_with_rss_deny_oom( @pytest.mark.asyncio async def test_throttle_on_commands_squashing_replies_bytes(df_factory: DflyInstanceFactory): df = df_factory.create( - proactor_threads=2, throttle_squashed=1_000_000_000, vmodule="multi_command_squasher=2" + proactor_threads=2, throttle_squashed=500_000_000, vmodule="multi_command_squasher=2" ) df.start() client = df.client() - # 1gb - await client.execute_command("debug populate 1 test 10000 rand type hash elements 100000") + # 0.5gb + await client.execute_command("debug populate 1 test 10000 rand type hash elements 50000") async def poll(): # At any point we should not cross this limit + assert df.rss < 1_500_000_000 cl = df.client() await cl.execute_command("multi") await cl.execute_command("hgetall test:0") @@ -251,7 +252,7 @@ async def test_throttle_on_commands_squashing_replies_bytes(df_factory: DflyInst # await client.execute_command("hgetall test:0") # res = await client.execute_command("exec") tasks = [] - for i in range(50): + for i in range(20): tasks.append(asyncio.create_task(poll())) for task in tasks: From 44c607aa7f7ef7360aec222b20d0e1d95981bed6 Mon Sep 17 00:00:00 2001 From: kostas Date: Tue, 6 May 2025 15:14:58 +0300 Subject: [PATCH 3/3] comments --- src/server/main_service.cc | 1 + src/server/multi_command_squasher.cc | 18 +++++++++++++----- src/server/multi_command_squasher.h | 18 +++++++++--------- src/server/server_state.cc | 1 - src/server/server_state.h | 5 ----- tests/dragonfly/memory_test.py | 21 +++++++++------------ 6 files changed, 32 insertions(+), 32 deletions(-) 
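
Note on the mechanism: the throttle is a shared-budget pattern. A global byte
counter grows when squashed replies are captured, shrinks after they are
flushed to the client, and dispatchers wait on an EventCount while the counter
is over the limit. Below is a minimal standalone sketch of that pattern; it
substitutes std::mutex/std::condition_variable for util::fb2::EventCount
(which suspends fibers rather than OS threads), and the
ReplyBudget/Charge/Refund names are illustrative only, not Dragonfly APIs.

    #include <condition_variable>
    #include <cstdint>
    #include <mutex>

    // Stand-in for the squashed-reply throttle: `used_` plays the role of
    // MultiCommandSquasher::current_reply_size_, the condition variable the
    // role of util::fb2::EventCount.
    class ReplyBudget {
     public:
      explicit ReplyBudget(uint64_t limit) : limit_(limit) {}

      // Dispatcher side, mirroring ec_.await(...IsReplySizeOverLimit...):
      // block while already-captured replies exceed the limit, then account
      // for the bytes this squashed hop is about to capture.
      void Charge(uint64_t bytes) {
        std::unique_lock lk(mu_);
        cv_.wait(lk, [this] { return limit_ == 0 || used_ <= limit_; });
        used_ += bytes;
      }

      // Flush side, mirroring current_reply_size_.fetch_sub() followed by
      // ec_.notifyAll(): release the budget once replies reached the client.
      void Refund(uint64_t bytes) {
        {
          std::lock_guard lk(mu_);
          used_ -= bytes;
        }
        cv_.notify_all();
      }

     private:
      const uint64_t limit_;  // 0 disables throttling, like the flag default
      uint64_t used_ = 0;
      std::mutex mu_;
      std::condition_variable cv_;
    };

The same reasoning explains why the await below is gated on is_mult_non_atomic:
a pipeline dispatch holds no shard locks yet, so it may safely block here,
whereas multi/exec has already locked ahead by the time it reaches
ExecuteSquashed().
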
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index 20efa1a4a..44926ffeb 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -1435,6 +1435,7 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list, SinkReply
   MultiCommandSquasher::Opts opts;
   opts.verify_commands = true;
   opts.max_squash_size = ss->max_squash_cmd_num;
+  opts.is_mult_non_atomic = true;
 
   size_t squashed_num = MultiCommandSquasher::Execute(absl::MakeSpan(stored_cmds),
                                                       static_cast<RedisReplyBuilder*>(builder),
diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc
index f4065e1c9..1ea877e24 100644
--- a/src/server/multi_command_squasher.cc
+++ b/src/server/multi_command_squasher.cc
@@ -16,7 +16,9 @@
 #include "server/transaction.h"
 #include "server/tx_base.h"
 
-ABSL_FLAG(size_t, throttle_squashed, 0, "");
+ABSL_FLAG(size_t, squashed_reply_size_limit, 0,
+          "Max bytes allowed for squashing_current_reply_size. If this limit is reached, "
+          "connections dispatching via pipelines will block until the size drops below it.");
 
 namespace dfly {
 
@@ -66,8 +68,8 @@ size_t Size(const facade::CapturingReplyBuilder::Payload& payload) {
 }  // namespace
 
 atomic_uint64_t MultiCommandSquasher::current_reply_size_ = 0;
-thread_local size_t MultiCommandSquasher::throttle_size_limit_ =
-    absl::GetFlag(FLAGS_throttle_squashed) * ServerState::tlocal()->GetTotalShards();
+thread_local size_t MultiCommandSquasher::reply_size_limit_ =
+    absl::GetFlag(FLAGS_squashed_reply_size_limit);
 util::fb2::EventCount MultiCommandSquasher::ec_;
 
 MultiCommandSquasher::MultiCommandSquasher(absl::Span<StoredCmd> cmds, ConnectionContext* cntx,
@@ -214,8 +216,14 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
   if (order_.empty())
     return true;
 
-  MultiCommandSquasher::ec_.await(
-      []() { return !MultiCommandSquasher::IsMultiCommandSquasherOverLimit(); });
+  // Non-atomic multi commands do not lock ahead, so it is safe to preempt here,
+  // before the transaction has actually started.
+  // This is not true for `multi/exec`, which also goes through `Execute()` but
+  // locks ahead before it calls `ScheduleSingleHop` below.
+  // TODO: investigate the side effects of allowing this in `lock ahead` mode.
+  if (opts_.is_mult_non_atomic) {
+    MultiCommandSquasher::ec_.await([]() { return !MultiCommandSquasher::IsReplySizeOverLimit(); });
+  }
 
   unsigned num_shards = 0;
   for (auto& sd : sharded_) {
diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h
index 76271c184..638f1b166 100644
--- a/src/server/multi_command_squasher.h
+++ b/src/server/multi_command_squasher.h
@@ -24,11 +24,12 @@ namespace dfly {
 
 class MultiCommandSquasher {
  public:
   struct Opts {
-    bool verify_commands = false;  // Whether commands need to be verified before execution
-    bool error_abort = false;  // Abort upon receiving error
+    bool verify_commands = false;  // Whether commands need to be verified before execution
+    bool error_abort = false;      // Abort upon receiving error
+    // Set when MultiCommandSquasher is used for a pipeline rather than a multi/exec block.
+    bool is_mult_non_atomic = false;
     unsigned max_squash_size = 32;  // How many commands to squash at once
   };
-
   // Returns number of processed commands.
   static size_t Execute(absl::Span<StoredCmd> cmds, facade::RedisReplyBuilder* rb,
                         ConnectionContext* cntx, Service* service, const Opts& opts) {
@@ -39,11 +40,10 @@ class MultiCommandSquasher {
     return current_reply_size_.load(std::memory_order_relaxed);
   }
 
-  static bool IsMultiCommandSquasherOverLimit() {
-    const bool over_limit =
-        throttle_size_limit_ > 0 &&
-        current_reply_size_.load(std::memory_order_relaxed) > throttle_size_limit_;
-    VLOG_IF(2, over_limit) << "MultiCommandSquasher overlimit: " << throttle_size_limit_
+  static bool IsReplySizeOverLimit() {
+    const bool over_limit = reply_size_limit_ > 0 &&
+                            current_reply_size_.load(std::memory_order_relaxed) > reply_size_limit_;
+    VLOG_IF(2, over_limit) << "MultiCommandSquasher overlimit: " << reply_size_limit_
                            << " current reply size " << current_reply_size_;
     return over_limit;
   }
@@ -110,7 +110,7 @@ class MultiCommandSquasher {
   // Used to throttle when memory is tight
   static util::fb2::EventCount ec_;
 
-  static thread_local size_t throttle_size_limit_;
+  static thread_local size_t reply_size_limit_;
 };
 
 }  // namespace dfly
diff --git a/src/server/server_state.cc b/src/server/server_state.cc
index 50bb2bd31..34344ab77 100644
--- a/src/server/server_state.cc
+++ b/src/server/server_state.cc
@@ -127,7 +127,6 @@ void ServerState::Init(uint32_t thread_index, uint32_t num_shards,
         util::fb2::Launch::post, "ConnectionsWatcher",
         [state = state_, main_listener] { state->ConnectionsWatcherFb(main_listener); });
   }
-  state_->total_shards_ = num_shards;
 }
 
 void ServerState::Destroy() {
diff --git a/src/server/server_state.h b/src/server/server_state.h
index 97c548a2e..79a8ec885 100644
--- a/src/server/server_state.h
+++ b/src/server/server_state.h
@@ -270,10 +270,6 @@ class ServerState {  // public struct - to allow initialization.
 
   bool ShouldLogSlowCmd(unsigned latency_usec) const;
 
-  size_t GetTotalShards() const {
-    return total_shards_;
-  }
-
   Stats stats;
 
   bool is_master = true;
@@ -355,7 +351,6 @@ class ServerState {  // public struct - to allow initialization.
   uint64_t used_mem_last_update_ = 0;
 
   MemoryUsageStats memory_stats_cached_;  // thread local cache of used and rss memory current
-  size_t total_shards_;
 
   static __thread ServerState* state_;
 };
 
diff --git a/tests/dragonfly/memory_test.py b/tests/dragonfly/memory_test.py
index b3c367be7..655a7400f 100644
--- a/tests/dragonfly/memory_test.py
+++ b/tests/dragonfly/memory_test.py
@@ -228,29 +228,26 @@ async def test_cache_eviction_with_rss_deny_oom(
 
 @pytest.mark.asyncio
 async def test_throttle_on_commands_squashing_replies_bytes(df_factory: DflyInstanceFactory):
     df = df_factory.create(
-        proactor_threads=2, throttle_squashed=500_000_000, vmodule="multi_command_squasher=2"
+        proactor_threads=2,
+        squashed_reply_size_limit=500_000_000,
+        vmodule="multi_command_squasher=2",
     )
     df.start()
 
     client = df.client()
-    # 0.5gb
-    await client.execute_command("debug populate 1 test 10000 rand type hash elements 50000")
+    # ~100mb in total: 64 hashes with 500 fields of 3125 random bytes each
+    await client.execute_command("debug populate 64 test 3125 rand type hash elements 500")
 
     async def poll():
         # At any point we should not cross this limit
         assert df.rss < 1_500_000_000
         cl = df.client()
-        await cl.execute_command("multi")
-        await cl.execute_command("hgetall test:0")
-        await cl.execute_command("exec")
+        pipe = cl.pipeline(transaction=False)
+        for i in range(64):
+            pipe.execute_command(f"hgetall test:{i}")
+
+        await pipe.execute()
 
-        # With the current approach this will overshoot
-        # await client.execute_command("multi")
-        # await client.execute_command("hgetall test:0")
-        # await client.execute_command("hgetall test:0")
-        # await client.execute_command("hgetall test:0")
-        # await client.execute_command("hgetall test:0")
-        # res = await client.execute_command("exec")
     tasks = []
     for i in range(20):
         tasks.append(asyncio.create_task(poll()))