chore: more performance improvements around MGET and pipelining (#5022)

1. Remove one vector (affects allocation and data locality) in squashing.
2. Stop deduplicating MGET keys by default, but keep deduplication available behind a runtime flag (mget_dedup_keys).

Also, finally fix the TieredStorageTest.FlushAll test.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Roman Gershman committed via GitHub on 2025-04-30 08:15:55 +03:00
parent 84456a2442, commit 10cd22375e
5 changed files with 59 additions and 45 deletions


@@ -1500,6 +1500,11 @@ void Connection::SquashPipeline() {
   size_t dispatched =
       service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), reply_builder_.get(), cc_.get());

+  // async_dispatch is a guard to prevent concurrent writes into reply_builder_, hence
+  // it must guard the Flush() as well.
+  //
+  // TODO: to investigate if always flushing will improve P99 latency because otherwise we
+  // wait for the next batch to finish before fully flushing the current response.
   if (pending_pipeline_cmd_cnt_ == squash_cmds.size()) {  // Flush if no new commands appeared
     reply_builder_->Flush();
     reply_builder_->SetBatchMode(false);  // in case the next dispatch is sync
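
As an aside on the flush heuristic above: the builder keeps batching while more pipelined commands are already queued, and flushes only when the dispatched batch drained the queue. A minimal standalone sketch of that decision, with hypothetical names (ReplyBuilderStub, MaybeFlush) that do not exist in the Dragonfly sources:

#include <cstddef>
#include <string>

struct ReplyBuilderStub {
  std::string buf;
  void Flush() { buf.clear(); }  // stand-in for writing the buffer to the socket
  void SetBatchMode(bool) {}     // stand-in for toggling buffered replies
};

// pending_cnt: commands still queued on the connection;
// batch_size: commands that were just dispatched.
void MaybeFlush(ReplyBuilderStub* rb, size_t pending_cnt, size_t batch_size) {
  if (pending_cnt == batch_size) {  // nothing new arrived while dispatching
    rb->Flush();
    rb->SetBatchMode(false);  // the next dispatch may be synchronous
  }
  // Otherwise more commands are already queued: keep batching and let the
  // next dispatch carry the flush, at some cost to the current batch's P99.
}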


@@ -126,12 +126,12 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(const StoredC
   auto& sinfo = PrepareShardInfo(last_sid);

-  sinfo.cmds.push_back(cmd);
+  sinfo.dispatched.push_back({.cmd = cmd, .reply = {}});
   order_.push_back(last_sid);

   num_squashed_++;

-  bool need_flush = sinfo.cmds.size() >= opts_.max_squash_size;
+  bool need_flush = sinfo.dispatched.size() >= opts_.max_squash_size;
   return need_flush ? SquashResult::SQUASHED_FULL : SquashResult::SQUASHED;
 }
@@ -160,7 +160,7 @@ bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, cons
 OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v) {
   auto& sinfo = sharded_[es->shard_id()];
-  DCHECK(!sinfo.cmds.empty());
+  DCHECK(!sinfo.dispatched.empty());

   auto* local_tx = sinfo.local_tx.get();
   facade::CapturingReplyBuilder crb(ReplyMode::FULL, resp_v);
@@ -171,27 +171,27 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v
   CmdArgVec arg_vec;

-  for (const auto* cmd : sinfo.cmds) {
-    auto args = cmd->ArgList(&arg_vec);
+  for (auto& dispatched : sinfo.dispatched) {
+    auto args = dispatched.cmd->ArgList(&arg_vec);

     if (opts_.verify_commands) {
       // The shared context is used for state verification, the local one is only for replies
-      if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) {
+      if (auto err = service_->VerifyCommandState(dispatched.cmd->Cid(), args, *cntx_); err) {
         crb.SendError(std::move(*err));
-        sinfo.replies.emplace_back(crb.Take());
-        current_reply_size_.fetch_add(Size(sinfo.replies.back()), std::memory_order_relaxed);
+        dispatched.reply = crb.Take();
+        current_reply_size_.fetch_add(Size(dispatched.reply), std::memory_order_relaxed);
         continue;
       }
     }

-    local_cntx.SwitchTxCmd(cmd->Cid());
-    crb.SetReplyMode(cmd->ReplyMode());
+    local_cntx.SwitchTxCmd(dispatched.cmd->Cid());
+    crb.SetReplyMode(dispatched.cmd->ReplyMode());

     local_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args);
-    service_->InvokeCmd(cmd->Cid(), args, &crb, &local_cntx);
+    service_->InvokeCmd(dispatched.cmd->Cid(), args, &crb, &local_cntx);

-    sinfo.replies.emplace_back(crb.Take());
-    current_reply_size_.fetch_add(Size(sinfo.replies.back()), std::memory_order_relaxed);
+    dispatched.reply = crb.Take();
+    current_reply_size_.fetch_add(Size(dispatched.reply), std::memory_order_relaxed);

     // Assert commands made no persistent state changes to stub context state
     const auto& local_state = local_cntx.conn_state;
@@ -210,8 +210,7 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
   unsigned num_shards = 0;

   for (auto& sd : sharded_) {
-    sd.replies.reserve(sd.cmds.size());
-    if (!sd.cmds.empty())
+    if (!sd.dispatched.empty())
       ++num_shards;
   }
@@ -224,7 +223,7 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
   // stubs, non-atomic ones just run the commands in parallel.
   if (IsAtomic()) {
     cntx_->cid = base_cid_;
-    auto cb = [this](ShardId sid) { return !sharded_[sid].cmds.empty(); };
+    auto cb = [this](ShardId sid) { return !sharded_[sid].dispatched.empty(); };
     tx->PrepareSquashedMultiHop(base_cid_, cb);
     tx->ScheduleSingleHop(
         [this, rb](auto* tx, auto* es) { return SquashedHopCb(es, rb->GetRespVersion()); });
@@ -238,7 +237,7 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
     };

     for (unsigned i = 0; i < sharded_.size(); ++i) {
-      if (!sharded_[i].cmds.empty())
+      if (!sharded_[i].dispatched.empty())
         shard_set->AddL2(i, cb);
     }
     bc->Wait();
@@ -249,10 +248,9 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
   for (auto idx : order_) {
     auto& sinfo = sharded_[idx];
-    auto& replies = sinfo.replies;
-    DCHECK_LT(sinfo.reply_id, replies.size());
+    DCHECK_LT(sinfo.reply_id, sinfo.dispatched.size());

-    auto& reply = replies[sinfo.reply_id++];
+    auto& reply = sinfo.dispatched[sinfo.reply_id++].reply;
     aborted |= opts_.error_abort && CapturingReplyBuilder::TryExtractError(reply);

     current_reply_size_.fetch_sub(Size(reply), std::memory_order_relaxed);
@@ -265,8 +263,7 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) {
   ServerState::SafeTLocal()->stats.multi_squash_exec_reply_usec += (after_reply - after_hop) / 1000;

   for (auto& sinfo : sharded_) {
-    sinfo.cmds.clear();
-    sinfo.replies.clear();
+    sinfo.dispatched.clear();
     sinfo.reply_id = 0;
   }
@@ -287,11 +284,12 @@ size_t MultiCommandSquasher::Run(RedisReplyBuilder* rb) {
     if (res == SquashResult::NOT_SQUASHED || res == SquashResult::SQUASHED_FULL) {
       if (!ExecuteSquashed(rb))
         break;
-    }

-    if (res == SquashResult::NOT_SQUASHED) {
-      if (!ExecuteStandalone(rb, &cmd))
-        break;
+      // if the last command was not added - we squash it separately.
+      if (res == SquashResult::NOT_SQUASHED) {
+        if (!ExecuteStandalone(rb, &cmd))
+          break;
+      }
     }
   }


@@ -44,8 +44,11 @@ class MultiCommandSquasher {
     ShardExecInfo() : local_tx{nullptr} {
     }

-    std::vector<const StoredCmd*> cmds;  // accumulated commands
-    std::vector<facade::CapturingReplyBuilder::Payload> replies;
+    struct Command {
+      const StoredCmd* cmd;
+      facade::CapturingReplyBuilder::Payload reply;
+    };
+    std::vector<Command> dispatched;  // Dispatched commands
     unsigned reply_id = 0;
     boost::intrusive_ptr<Transaction> local_tx;  // stub-mode tx for use inside shard
   };
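
This is the "remove one vector" item from the commit message: the two parallel vectors (commands and replies) collapse into a single vector of Command records, so each shard tracks one growable buffer instead of two, and a command sits next to its reply in memory. A minimal sketch of the layout change, using stand-in types instead of the real StoredCmd and CapturingReplyBuilder::Payload:

#include <string>
#include <vector>

struct StoredCmdStub {};          // stand-in for StoredCmd
using PayloadStub = std::string;  // stand-in for CapturingReplyBuilder::Payload

// Before: two parallel vectors; two growth/allocation paths, and cmds[i] and
// replies[i] live in unrelated heap blocks.
struct ShardInfoBefore {
  std::vector<const StoredCmdStub*> cmds;
  std::vector<PayloadStub> replies;
};

// After: one vector of records; a single growth path, and each command is
// adjacent to its reply, which helps locality when replies are consumed in order.
struct ShardInfoAfter {
  struct Command {
    const StoredCmdStub* cmd;
    PayloadStub reply;
  };
  std::vector<Command> dispatched;
};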


@@ -35,6 +35,8 @@
 #include "server/transaction.h"
 #include "util/fibers/future.h"

+ABSL_FLAG(bool, mget_dedup_keys, false, "If true, MGET will deduplicate keys");
+
 namespace dfly {
 namespace {

@@ -51,10 +53,12 @@ constexpr uint32_t kMaxStrLen = 1 << 28;
 // Stores a string, the pending result of a tiered read or nothing
 struct StringValue {
-  StringValue() : v_{} {
+  StringValue() : v_{std::monostate{}} {
   }
+
   StringValue(std::string s) : v_{std::move(s)} {
   }
+
   StringValue(fb2::Future<std::string> f) : v_{std::move(f)} {
   }
@@ -547,23 +551,25 @@ MGetResponse OpMGet(fb2::BlockingCounter wait_bc, uint8_t fetch_mask, const Tran
   absl::InlinedVector<Item, 32> items(keys.Size());

-  // We can not make it thread-local because we may preempt during the Find loop due to
-  // serialization with the bumpup calls.
-  // TODO: consider separating BumpUps from finds because it becomes too complicated
-  // to reason about.
-  absl::flat_hash_map<string_view, unsigned> key_index;

   // First, fetch all iterators and count total size ahead
   size_t total_size = 0;
   unsigned index = 0;
-  key_index.reserve(keys.Size());
+  static bool mget_dedup_keys = absl::GetFlag(FLAGS_mget_dedup_keys);
+  // We can not make it thread-local because we may preempt during the Find loop due to
+  // replication of expiry events.
+  absl::flat_hash_map<string_view, unsigned> key_index;
+  if (mget_dedup_keys) {
+    key_index.reserve(keys.Size());
+  }

   for (string_view key : keys) {
-    auto [it, inserted] = key_index.try_emplace(key, index);
-    if (!inserted) {  // duplicate -> point to the first occurrence.
-      items[index++].source_index = it->second;
-      continue;
+    if (mget_dedup_keys) {
+      auto [it, inserted] = key_index.try_emplace(key, index);
+      if (!inserted) {  // duplicate -> point to the first occurrence.
+        items[index++].source_index = it->second;
+        continue;
+      }
     }

     auto it_res = db_slice.FindReadOnly(t->GetDbContext(), key, OBJ_STRING);
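
Note the gating pattern: the flag is read once into a function-local static, so the dedup choice is fixed at the first MGET on each thread rather than re-read per call, and with dedup off a repeated key is simply looked up once per occurrence. A small self-contained sketch of the same absl flag pattern, with a hypothetical flag name (dedup_example) rather than the real mget_dedup_keys:

#include <cstdio>
#include "absl/flags/flag.h"
#include "absl/flags/parse.h"

// Hypothetical flag mirroring the pattern above (not the real Dragonfly flag).
ABSL_FLAG(bool, dedup_example, false, "If true, deduplicate input keys");

void Lookup(const char* key) {
  // Read once: the function-local static caches the flag value at first call,
  // so later flag mutations are not observed here.
  static bool dedup = absl::GetFlag(FLAGS_dedup_example);
  std::printf("key=%s dedup=%d\n", key, dedup);
}

int main(int argc, char** argv) {
  absl::ParseCommandLine(argc, argv);  // e.g. ./a.out --dedup_example=true
  Lookup("foo");
  Lookup("foo");  // with dedup off, a duplicate key is looked up again
  return 0;
}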
@@ -1074,11 +1080,11 @@ void StringFamily::Set(CmdArgList args, const CommandContext& cmnd_cntx) {
 }

 void StringFamily::SetEx(CmdArgList args, const CommandContext& cmnd_cntx) {
-  SetExGeneric(true, std::move(args), cmnd_cntx.conn_cntx->cid, cmnd_cntx.tx, cmnd_cntx.rb);
+  SetExGeneric(true, args, cmnd_cntx.conn_cntx->cid, cmnd_cntx.tx, cmnd_cntx.rb);
 }

 void StringFamily::PSetEx(CmdArgList args, const CommandContext& cmnd_cntx) {
-  SetExGeneric(false, std::move(args), cmnd_cntx.conn_cntx->cid, cmnd_cntx.tx, cmnd_cntx.rb);
+  SetExGeneric(false, args, cmnd_cntx.conn_cntx->cid, cmnd_cntx.tx, cmnd_cntx.rb);
 }

 void StringFamily::SetNx(CmdArgList args, const CommandContext& cmnd_cntx) {
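
Dropping std::move above is a cleanup, not a behavior change: CmdArgList here is a span-like view passed by value (a non-owning pointer/length pair), so moving it costs exactly as much as copying it. A minimal illustration with std::span (C++20), which has the same property:

#include <span>
#include <utility>

// A span is a non-owning {pointer, size} pair: "moving" one copies two words,
// exactly like passing it by value, so std::move adds nothing but noise.
void ConsumeCopy(std::span<const int> s) { (void)s; }
void ConsumeMove(std::span<const int> s) { (void)s; }

int main() {
  int data[] = {1, 2, 3};
  std::span<const int> view{data};
  ConsumeCopy(view);             // plain copy of the view
  ConsumeMove(std::move(view));  // identical cost; view stays valid afterwards
  ConsumeCopy(view);             // still usable: nothing was "moved out"
  return 0;
}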


@@ -276,7 +276,10 @@ TEST_F(TieredStorageTest, FlushAll) {
   Metrics metrics;
   ExpectConditionWithinTimeout([&] {
     metrics = GetMetrics();
-    return metrics.events.hits > 2;
+    // Note that metrics.events.hits is not consistent with total_fetches
+    // and it can happen that hits is greater than total_fetches due to in-progress reads.
+    return metrics.tiered_stats.total_fetches > 2;
   });

   LOG(INFO) << FormatMetrics(metrics);
@@ -290,7 +293,6 @@ TEST_F(TieredStorageTest, FlushAll) {
   LOG(INFO) << FormatMetrics(metrics);

   EXPECT_EQ(metrics.db_stats.front().tiered_entries, 0u);
-  EXPECT_GT(metrics.tiered_stats.total_fetches, 2u);
 }

 TEST_F(TieredStorageTest, FlushPending) {