kostas 2025-05-06 15:14:58 +03:00
parent 8e3134e7ba
commit 44c607aa7f
No known key found for this signature in database
GPG key ID: 1860AC7B1177CACB
6 changed files with 32 additions and 32 deletions

[changed file 1 of 6]

@@ -1435,6 +1435,7 @@ size_t Service::DispatchManyCommands(absl::Span<CmdArgList> args_list, SinkReply
   MultiCommandSquasher::Opts opts;
   opts.verify_commands = true;
   opts.max_squash_size = ss->max_squash_cmd_num;
+  opts.is_mult_non_atomic = true;
   size_t squashed_num = MultiCommandSquasher::Execute(absl::MakeSpan(stored_cmds),
                                                       static_cast<RedisReplyBuilder*>(builder),
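
DispatchManyCommands is the pipeline dispatch path, which is why this call site alone opts into non-atomic squashing: the multi/exec path keeps the default is_mult_non_atomic = false, for the lock-ahead reason documented in the squasher change below.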

[changed file 2 of 6]

@@ -16,7 +16,9 @@
 #include "server/transaction.h"
 #include "server/tx_base.h"

-ABSL_FLAG(size_t, throttle_squashed, 0, "");
+ABSL_FLAG(size_t, squashed_reply_size_limit, 0,
+          "Max bytes allowed for squashing_current_reply_size. If this limit is reached, "
+          "connections dispatching via pipelines will block until this value is decremented.");

 namespace dfly {
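
Since the old flag is removed rather than aliased, deployments that set throttle_squashed must switch to the new name on upgrade. A hypothetical invocation matching the test configuration at the bottom of this diff (binary name and exact syntax assumed, not taken from this diff):

    dragonfly --squashed_reply_size_limit=500000000 --vmodule=multi_command_squasher=2

Note the semantics change too: the old value was multiplied by the shard count per thread, while the new value is used as-is (see the next hunk).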
@@ -66,8 +68,8 @@ size_t Size(const facade::CapturingReplyBuilder::Payload& payload) {
 }  // namespace

 atomic_uint64_t MultiCommandSquasher::current_reply_size_ = 0;
-thread_local size_t MultiCommandSquasher::throttle_size_limit_ =
-    absl::GetFlag(FLAGS_throttle_squashed) * ServerState::tlocal()->GetTotalShards();
+thread_local size_t MultiCommandSquasher::reply_size_limit_ =
+    absl::GetFlag(FLAGS_squashed_reply_size_limit);
 util::fb2::EventCount MultiCommandSquasher::ec_;

 MultiCommandSquasher::MultiCommandSquasher(absl::Span<StoredCmd> cmds, ConnectionContext* cntx,
@@ -214,8 +216,14 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
   if (order_.empty())
     return true;

-  MultiCommandSquasher::ec_.await(
-      []() { return !MultiCommandSquasher::IsMultiCommandSquasherOverLimit(); });
+  // Multi non-atomic does not lock ahead, so it is safe to preempt while we haven't
+  // really started the transaction.
+  // This is not true for `multi/exec`, which uses `Execute()` but locks ahead before it
+  // calls `ScheduleSingleHop` below.
+  // TODO: investigate the side effects of allowing this in `lock ahead` mode.
+  if (opts_.is_mult_non_atomic) {
+    MultiCommandSquasher::ec_.await([]() { return !MultiCommandSquasher::IsReplySizeOverLimit(); });
+  }

   unsigned num_shards = 0;
   for (auto& sd : sharded_) {
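
That await call is the whole back-pressure mechanism: dispatchers account captured reply bytes into current_reply_size_, block while the total exceeds reply_size_limit_, and are woken once flushed replies are subtracted. A minimal self-contained sketch of the same pattern, using std::condition_variable in place of util::fb2::EventCount (the function names and the kLimit constant are illustrative, not Dragonfly's):

    #include <atomic>
    #include <condition_variable>
    #include <cstdint>
    #include <mutex>

    std::atomic<uint64_t> current_reply_size{0};  // like MultiCommandSquasher::current_reply_size_
    constexpr uint64_t kLimit = 500'000'000;      // like reply_size_limit_; 0 would mean "no limit"

    std::mutex mu;
    std::condition_variable cv;

    // Dispatcher side: block while over the limit, then account for the new reply bytes.
    void OnReplyCaptured(uint64_t bytes) {
      std::unique_lock lk(mu);
      cv.wait(lk, [] { return current_reply_size.load() <= kLimit; });
      current_reply_size.fetch_add(bytes);
    }

    // Flush side: once a reply hits the socket, release its bytes and wake throttled
    // dispatchers (EventCount::notifyAll() plays this role in the real code).
    void OnReplyFlushed(uint64_t bytes) {
      {
        std::lock_guard lk(mu);  // held so a concurrent waiter cannot miss the wakeup
        current_reply_size.fetch_sub(bytes);
      }
      cv.notify_all();
    }

Like the real code, the sketch checks the limit before adding, so a single oversized batch can still overshoot; that is why the test below asserts RSS against 1.5 GB rather than the 0.5 GB limit itself.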

[changed file 3 of 6]

@@ -24,11 +24,12 @@ namespace dfly {
 class MultiCommandSquasher {
  public:
   struct Opts {
-    bool verify_commands = false; // Whether commands need to be verified before execution
-    bool error_abort = false;     // Abort upon receiving error
+    bool verify_commands = false;  // Whether commands need to be verified before execution
+    bool error_abort = false;      // Abort upon receiving error
+    // If MultiCommandSquasher was used from a pipeline and not from a multi/exec block
+    bool is_mult_non_atomic = false;
     unsigned max_squash_size = 32;  // How many commands to squash at once
   };

   // Returns number of processed commands.
   static size_t Execute(absl::Span<StoredCmd> cmds, facade::RedisReplyBuilder* rb,
                         ConnectionContext* cntx, Service* service, const Opts& opts) {
@@ -39,11 +40,10 @@
     return current_reply_size_.load(std::memory_order_relaxed);
   }

-  static bool IsMultiCommandSquasherOverLimit() {
-    const bool over_limit =
-        throttle_size_limit_ > 0 &&
-        current_reply_size_.load(std::memory_order_relaxed) > throttle_size_limit_;
-    VLOG_IF(2, over_limit) << "MultiCommandSquasher overlimit: " << throttle_size_limit_
+  static bool IsReplySizeOverLimit() {
+    const bool over_limit = reply_size_limit_ > 0 &&
+                            current_reply_size_.load(std::memory_order_relaxed) > reply_size_limit_;
+    VLOG_IF(2, over_limit) << "MultiCommandSquasher overlimit: " << reply_size_limit_
                            << " current reply size " << current_reply_size_;
     return over_limit;
   }
@@ -110,7 +110,7 @@
   // Used to throttle when memory is tight
   static util::fb2::EventCount ec_;
-  static thread_local size_t throttle_size_limit_;
+  static thread_local size_t reply_size_limit_;
 };

 }  // namespace dfly
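
With the policy carried in Opts, each caller now declares whether it may be throttled; a sketch of the two modes (a hypothetical call site, not verbatim from this commit — rb, cntx, service and stored_cmds assumed in scope):

    MultiCommandSquasher::Opts opts;
    opts.verify_commands = true;
    opts.is_mult_non_atomic = true;  // pipeline path: may block on reply-size pressure
    // A multi/exec caller leaves is_mult_non_atomic = false: it has already locked ahead,
    // so preempting inside ExecuteSquashed would not be safe there.
    size_t processed =
        MultiCommandSquasher::Execute(absl::MakeSpan(stored_cmds), rb, cntx, service, opts);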

[changed file 4 of 6]

@@ -127,7 +127,6 @@ void ServerState::Init(uint32_t thread_index, uint32_t num_shards,
         util::fb2::Launch::post, "ConnectionsWatcher",
         [state = state_, main_listener] { state->ConnectionsWatcherFb(main_listener); });
   }
-  state_->total_shards_ = num_shards;
 }

 void ServerState::Destroy() {

[changed file 5 of 6]

@@ -270,10 +270,6 @@ class ServerState { // public struct - to allow initialization.
   bool ShouldLogSlowCmd(unsigned latency_usec) const;

-  size_t GetTotalShards() const {
-    return total_shards_;
-  }
-
   Stats stats;

   bool is_master = true;
@@ -355,7 +351,6 @@ class ServerState { // public struct - to allow initialization.
   uint64_t used_mem_last_update_ = 0;
   MemoryUsageStats memory_stats_cached_;  // thread local cache of used and rss memory current
-  size_t total_shards_;

   static __thread ServerState* state_;
 };
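
The ServerState changes are pure fallout from the limit's new semantics: the old throttle_size_limit_ was computed as the flag value times GetTotalShards(), so once the limit became a flat byte cap, total_shards_ and its accessor had no remaining users.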

[changed file 6 of 6]

@@ -228,29 +228,26 @@ async def test_cache_eviction_with_rss_deny_oom(
 @pytest.mark.asyncio
 async def test_throttle_on_commands_squashing_replies_bytes(df_factory: DflyInstanceFactory):
     df = df_factory.create(
-        proactor_threads=2, throttle_squashed=500_000_000, vmodule="multi_command_squasher=2"
+        proactor_threads=2,
+        squashed_reply_size_limit=500_000_000,
+        vmodule="multi_command_squasher=2",
     )
     df.start()

     client = df.client()
     # 0.5gb
-    await client.execute_command("debug populate 1 test 10000 rand type hash elements 50000")
+    await client.execute_command("debug populate 64 test 3125 rand type hash elements 500")

     async def poll():
         # At any point we should not cross this limit
         assert df.rss < 1_500_000_000
         cl = df.client()
-        await cl.execute_command("multi")
-        await cl.execute_command("hgetall test:0")
-        await cl.execute_command("exec")
-        # With the current approach this will overshoot
-        # await client.execute_command("multi")
-        # await client.execute_command("hgetall test:0")
-        # await client.execute_command("hgetall test:0")
-        # await client.execute_command("hgetall test:0")
-        # await client.execute_command("hgetall test:0")
-        # res = await client.execute_command("exec")
+        pipe = cl.pipeline(transaction=False)
+        for i in range(64):
+            pipe.execute_command(f"hgetall test:{i}")
+        await pipe.execute()

     tasks = []
     for i in range(20):
         tasks.append(asyncio.create_task(poll()))
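
Rough sizing, assuming debug populate <count> <prefix> <size> ... elements <n> creates <count> hashes of <n> fields with <size>-byte values: the old dataset was 1 x 50000 x 10000 ≈ 0.5 GB behind a single key, so one hgetall reply could blow past the limit at once, while the new one is 64 x 500 x 3125 ≈ 100 MB spread over 64 keys. With 20 concurrent pollers each pipelining all 64 hgetalls, roughly 2 GB of replies could be buffered unthrottled, so the 500 MB squashed_reply_size_limit is what keeps the asserted RSS under 1.5 GB.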