diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 822119303..fa77ab2cb 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -1500,6 +1500,11 @@ void Connection::SquashPipeline() { size_t dispatched = service_->DispatchManyCommands(absl::MakeSpan(squash_cmds), reply_builder_.get(), cc_.get()); + // async_dispatch is a guard to prevent concurrent writes into reply_builder_, hence + // it must guard the Flush() as well. + // + // TODO: to investigate if always flushing will improve P99 latency because otherwise we + // wait for the next batch to finish before fully flushing the current response. if (pending_pipeline_cmd_cnt_ == squash_cmds.size()) { // Flush if no new commands appeared reply_builder_->Flush(); reply_builder_->SetBatchMode(false); // in case the next dispatch is sync diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index f6bd2a596..0a9369f3f 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -126,12 +126,12 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(const StoredC auto& sinfo = PrepareShardInfo(last_sid); - sinfo.cmds.push_back(cmd); + sinfo.dispatched.push_back({.cmd = cmd, .reply = {}}); order_.push_back(last_sid); num_squashed_++; - bool need_flush = sinfo.cmds.size() >= opts_.max_squash_size; + bool need_flush = sinfo.dispatched.size() >= opts_.max_squash_size; return need_flush ? SquashResult::SQUASHED_FULL : SquashResult::SQUASHED; } @@ -160,7 +160,7 @@ bool MultiCommandSquasher::ExecuteStandalone(facade::RedisReplyBuilder* rb, cons OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v) { auto& sinfo = sharded_[es->shard_id()]; - DCHECK(!sinfo.cmds.empty()); + DCHECK(!sinfo.dispatched.empty()); auto* local_tx = sinfo.local_tx.get(); facade::CapturingReplyBuilder crb(ReplyMode::FULL, resp_v); @@ -171,27 +171,27 @@ OpStatus MultiCommandSquasher::SquashedHopCb(EngineShard* es, RespVersion resp_v CmdArgVec arg_vec; - for (const auto* cmd : sinfo.cmds) { - auto args = cmd->ArgList(&arg_vec); + for (auto& dispatched : sinfo.dispatched) { + auto args = dispatched.cmd->ArgList(&arg_vec); if (opts_.verify_commands) { // The shared context is used for state verification, the local one is only for replies - if (auto err = service_->VerifyCommandState(cmd->Cid(), args, *cntx_); err) { + if (auto err = service_->VerifyCommandState(dispatched.cmd->Cid(), args, *cntx_); err) { crb.SendError(std::move(*err)); - sinfo.replies.emplace_back(crb.Take()); - current_reply_size_.fetch_add(Size(sinfo.replies.back()), std::memory_order_relaxed); + dispatched.reply = crb.Take(); + current_reply_size_.fetch_add(Size(dispatched.reply), std::memory_order_relaxed); continue; } } - local_cntx.SwitchTxCmd(cmd->Cid()); - crb.SetReplyMode(cmd->ReplyMode()); + local_cntx.SwitchTxCmd(dispatched.cmd->Cid()); + crb.SetReplyMode(dispatched.cmd->ReplyMode()); local_tx->InitByArgs(cntx_->ns, local_cntx.conn_state.db_index, args); - service_->InvokeCmd(cmd->Cid(), args, &crb, &local_cntx); + service_->InvokeCmd(dispatched.cmd->Cid(), args, &crb, &local_cntx); - sinfo.replies.emplace_back(crb.Take()); - current_reply_size_.fetch_add(Size(sinfo.replies.back()), std::memory_order_relaxed); + dispatched.reply = crb.Take(); + current_reply_size_.fetch_add(Size(dispatched.reply), std::memory_order_relaxed); // Assert commands made no persistent state changes to stub context state const auto& local_state = local_cntx.conn_state; @@ -210,8 +210,7 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { unsigned num_shards = 0; for (auto& sd : sharded_) { - sd.replies.reserve(sd.cmds.size()); - if (!sd.cmds.empty()) + if (!sd.dispatched.empty()) ++num_shards; } @@ -224,7 +223,7 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { // stubs, non-atomic ones just run the commands in parallel. if (IsAtomic()) { cntx_->cid = base_cid_; - auto cb = [this](ShardId sid) { return !sharded_[sid].cmds.empty(); }; + auto cb = [this](ShardId sid) { return !sharded_[sid].dispatched.empty(); }; tx->PrepareSquashedMultiHop(base_cid_, cb); tx->ScheduleSingleHop( [this, rb](auto* tx, auto* es) { return SquashedHopCb(es, rb->GetRespVersion()); }); @@ -238,7 +237,7 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { }; for (unsigned i = 0; i < sharded_.size(); ++i) { - if (!sharded_[i].cmds.empty()) + if (!sharded_[i].dispatched.empty()) shard_set->AddL2(i, cb); } bc->Wait(); @@ -249,10 +248,9 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { for (auto idx : order_) { auto& sinfo = sharded_[idx]; - auto& replies = sinfo.replies; - DCHECK_LT(sinfo.reply_id, replies.size()); + DCHECK_LT(sinfo.reply_id, sinfo.dispatched.size()); - auto& reply = replies[sinfo.reply_id++]; + auto& reply = sinfo.dispatched[sinfo.reply_id++].reply; aborted |= opts_.error_abort && CapturingReplyBuilder::TryExtractError(reply); current_reply_size_.fetch_sub(Size(reply), std::memory_order_relaxed); @@ -265,8 +263,7 @@ bool MultiCommandSquasher::ExecuteSquashed(facade::RedisReplyBuilder* rb) { ServerState::SafeTLocal()->stats.multi_squash_exec_reply_usec += (after_reply - after_hop) / 1000; for (auto& sinfo : sharded_) { - sinfo.cmds.clear(); - sinfo.replies.clear(); + sinfo.dispatched.clear(); sinfo.reply_id = 0; } @@ -287,11 +284,12 @@ size_t MultiCommandSquasher::Run(RedisReplyBuilder* rb) { if (res == SquashResult::NOT_SQUASHED || res == SquashResult::SQUASHED_FULL) { if (!ExecuteSquashed(rb)) break; - } - if (res == SquashResult::NOT_SQUASHED) { - if (!ExecuteStandalone(rb, &cmd)) - break; + // if the last command was not added - we squash it separately. + if (res == SquashResult::NOT_SQUASHED) { + if (!ExecuteStandalone(rb, &cmd)) + break; + } } } diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index 657ecef1e..9aea56880 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -44,8 +44,11 @@ class MultiCommandSquasher { ShardExecInfo() : local_tx{nullptr} { } - std::vector cmds; // accumulated commands - std::vector replies; + struct Command { + const StoredCmd* cmd; + facade::CapturingReplyBuilder::Payload reply; + }; + std::vector dispatched; // Dispatched commands unsigned reply_id = 0; boost::intrusive_ptr local_tx; // stub-mode tx for use inside shard }; diff --git a/src/server/string_family.cc b/src/server/string_family.cc index e3a598d07..6ad5210bf 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -35,6 +35,8 @@ #include "server/transaction.h" #include "util/fibers/future.h" +ABSL_FLAG(bool, mget_dedup_keys, false, "If true, MGET will deduplicate keys"); + namespace dfly { namespace { @@ -51,10 +53,12 @@ constexpr uint32_t kMaxStrLen = 1 << 28; // Stores a string, the pending result of a tiered read or nothing struct StringValue { - StringValue() : v_{} { + StringValue() : v_{std::monostate{}} { } + StringValue(std::string s) : v_{std::move(s)} { } + StringValue(fb2::Future f) : v_{std::move(f)} { } @@ -547,23 +551,25 @@ MGetResponse OpMGet(fb2::BlockingCounter wait_bc, uint8_t fetch_mask, const Tran absl::InlinedVector items(keys.Size()); - // We can not make it thread-local because we may preempt during the Find loop due to - // serialization with the bumpup calls. - - // TODO: consider separating BumpUps from finds because it becomes too complicated - // to reason about. - absl::flat_hash_map key_index; - // First, fetch all iterators and count total size ahead size_t total_size = 0; unsigned index = 0; - key_index.reserve(keys.Size()); + static bool mget_dedup_keys = absl::GetFlag(FLAGS_mget_dedup_keys); + + // We can not make it thread-local because we may preempt during the Find loop due to + // replication of expiry events. + absl::flat_hash_map key_index; + if (mget_dedup_keys) { + key_index.reserve(keys.Size()); + } for (string_view key : keys) { - auto [it, inserted] = key_index.try_emplace(key, index); - if (!inserted) { // duplicate -> point to the first occurrence. - items[index++].source_index = it->second; - continue; + if (mget_dedup_keys) { + auto [it, inserted] = key_index.try_emplace(key, index); + if (!inserted) { // duplicate -> point to the first occurrence. + items[index++].source_index = it->second; + continue; + } } auto it_res = db_slice.FindReadOnly(t->GetDbContext(), key, OBJ_STRING); @@ -1074,11 +1080,11 @@ void StringFamily::Set(CmdArgList args, const CommandContext& cmnd_cntx) { } void StringFamily::SetEx(CmdArgList args, const CommandContext& cmnd_cntx) { - SetExGeneric(true, std::move(args), cmnd_cntx.conn_cntx->cid, cmnd_cntx.tx, cmnd_cntx.rb); + SetExGeneric(true, args, cmnd_cntx.conn_cntx->cid, cmnd_cntx.tx, cmnd_cntx.rb); } void StringFamily::PSetEx(CmdArgList args, const CommandContext& cmnd_cntx) { - SetExGeneric(false, std::move(args), cmnd_cntx.conn_cntx->cid, cmnd_cntx.tx, cmnd_cntx.rb); + SetExGeneric(false, args, cmnd_cntx.conn_cntx->cid, cmnd_cntx.tx, cmnd_cntx.rb); } void StringFamily::SetNx(CmdArgList args, const CommandContext& cmnd_cntx) { diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc index ac2be9e63..467a8a2e6 100644 --- a/src/server/tiered_storage_test.cc +++ b/src/server/tiered_storage_test.cc @@ -276,7 +276,10 @@ TEST_F(TieredStorageTest, FlushAll) { Metrics metrics; ExpectConditionWithinTimeout([&] { metrics = GetMetrics(); - return metrics.events.hits > 2; + + // Note that metrics.events.hits is not consistent with total_fetches + // and it can happen that hits is greater than total_fetches due to in-progress reads. + return metrics.tiered_stats.total_fetches > 2; }); LOG(INFO) << FormatMetrics(metrics); @@ -290,7 +293,6 @@ TEST_F(TieredStorageTest, FlushAll) { LOG(INFO) << FormatMetrics(metrics); EXPECT_EQ(metrics.db_stats.front().tiered_entries, 0u); - EXPECT_GT(metrics.tiered_stats.total_fetches, 2u); } TEST_F(TieredStorageTest, FlushPending) {