From 015ed622c57449a2df93f709c31ecb787636f0f8 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Tue, 11 Apr 2023 10:14:36 +0300 Subject: [PATCH] fix(server): Optimize StoredCmd (#1053) Opmitize StoredCmd to allow inline storage --- src/server/conn_context.cc | 38 ++++++++++++++++-------- src/server/conn_context.h | 29 ++++++++++-------- src/server/main_service.cc | 35 ++++++++++++++-------- src/server/multi_command_squasher.cc | 44 ++++++++++++++++++++-------- src/server/multi_test.cc | 2 +- src/server/transaction.cc | 2 +- 6 files changed, 98 insertions(+), 52 deletions(-) diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index 8230219e8..fb00864c0 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -16,22 +16,36 @@ namespace dfly { using namespace std; using namespace facade; -StoredCmd::StoredCmd(const CommandId* d, CmdArgList args) : descr(d) { +StoredCmd::StoredCmd(const CommandId* cid, CmdArgList args) + : cid_{cid}, buffer_{}, sizes_(args.size()) { size_t total_size = 0; - for (auto args : args) { + for (auto args : args) total_size += args.size(); - } - backing_storage_.reset(new char[total_size]); - arg_vec_.resize(args.size()); - char* next = backing_storage_.get(); - for (size_t i = 0; i < args.size(); ++i) { - auto src = args[i]; - memcpy(next, src.data(), src.size()); - arg_vec_[i] = MutableSlice{next, src.size()}; - next += src.size(); + buffer_.resize(total_size); + char* next = buffer_.data(); + for (unsigned i = 0; i < args.size(); i++) { + memcpy(next, args[i].data(), args[i].size()); + sizes_[i] = args[i].size(); + next += args[i].size(); } - arg_list_ = {arg_vec_.data(), arg_vec_.size()}; +} + +void StoredCmd::Fill(CmdArgList args) { + CHECK_GE(args.size(), sizes_.size()); + unsigned offset = 0; + for (unsigned i = 0; i < sizes_.size(); i++) { + args[i] = MutableSlice{buffer_.data() + offset, sizes_[i]}; + offset += sizes_[i]; + } +} + +size_t StoredCmd::NumArgs() const { + return sizes_.size(); +} + +const CommandId* StoredCmd::Cid() const { + return cid_; } void ConnectionContext::ChangeMonitor(bool start) { diff --git a/src/server/conn_context.h b/src/server/conn_context.h index 217a19283..905f650c1 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -4,6 +4,7 @@ #pragma once +#include #include #include "core/fibers.h" @@ -17,20 +18,24 @@ class EngineShardSet; class ConnectionContext; class ChannelStore; -struct StoredCmd { - const CommandId* descr; +// Stores command id and arguments for delayed invocation. +// Used for storing MULTI/EXEC commands. +class StoredCmd { + public: + StoredCmd(const CommandId* cid, CmdArgList args); + + size_t NumArgs() const; + + // Fill the arg list with stored arguments, it should be at least of size NumArgs(). + // Between filling and invocation, cmd should NOT be moved. + void Fill(CmdArgList args); + + const CommandId* Cid() const; private: - std::unique_ptr backing_storage_; - CmdArgVec arg_vec_; - CmdArgList arg_list_; - - public: - StoredCmd(const CommandId* d, CmdArgList args); - - CmdArgList ArgList() const { - return arg_list_; - } + const CommandId* cid_; // underlying command + std::string buffer_; // underlying buffer + absl::FixedArray sizes_; // sizes of arg parts }; struct ConnectionState { diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 38f649353..9b7436d97 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1241,23 +1241,26 @@ template void IterateAllKeys(ConnectionState::ExecInfo* exec_info, for (auto& [dbid, key] : exec_info->watched_keys) f(MutableSlice{key.data(), key.size()}); - for (const auto& scmd : exec_info->body) { - if (!scmd.descr->IsTransactional()) + CmdArgVec arg_vec{}; + + for (auto& scmd : exec_info->body) { + if (!scmd.Cid()->IsTransactional()) continue; - auto args = scmd.ArgList(); + arg_vec.resize(scmd.NumArgs()); + scmd.Fill(absl::MakeSpan(arg_vec)); - auto key_res = DetermineKeys(scmd.descr, args); + auto key_res = DetermineKeys(scmd.Cid(), absl::MakeSpan(arg_vec)); if (!key_res.ok()) continue; auto key_index = key_res.value(); for (unsigned i = key_index.start; i < key_index.end; i += key_index.step) - f(args[i]); + f(arg_vec[i]); if (key_index.bonus) - f(args[*key_index.bonus]); + f(arg_vec[*key_index.bonus]); } } @@ -1286,8 +1289,8 @@ bool StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo* bool global = false; bool transactional = false; for (const auto& scmd : exec_info->body) { - transactional |= scmd.descr->IsTransactional(); - global |= scmd.descr->opt_mask() & CO::GLOBAL_TRANS; + transactional |= scmd.Cid()->IsTransactional(); + global |= scmd.Cid()->opt_mask() & CO::GLOBAL_TRANS; if (global) break; } @@ -1361,18 +1364,24 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { if (absl::GetFlag(FLAGS_multi_exec_squash)) { MultiCommandSquasher::Execute(absl::MakeSpan(exec_info.body), cntx); } else { - CmdArgVec str_list; + CmdArgVec arg_vec{}; for (auto& scmd : exec_info.body) { - cntx->transaction->MultiSwitchCmd(scmd.descr); - if (scmd.descr->IsTransactional()) { - OpStatus st = cntx->transaction->InitByArgs(cntx->conn_state.db_index, scmd.ArgList()); + cntx->transaction->MultiSwitchCmd(scmd.Cid()); + + arg_vec.resize(scmd.NumArgs()); + CmdArgList args = absl::MakeSpan(arg_vec); + scmd.Fill(args); + + if (scmd.Cid()->IsTransactional()) { + OpStatus st = cntx->transaction->InitByArgs(cntx->conn_state.db_index, args); if (st != OpStatus::OK) { (*cntx)->SendError(st); break; } } - bool ok = InvokeCmd(scmd.ArgList(), scmd.descr, cntx, true); + + bool ok = InvokeCmd(args, scmd.Cid(), cntx, true); if (!ok || rb->GetError()) // checks for i/o error, not logical error. break; } diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index 3166100f2..0d2136969 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -1,5 +1,7 @@ #include "server/multi_command_squasher.h" +#include + #include "server/command_registry.h" #include "server/conn_context.h" #include "server/engine_shard_set.h" @@ -43,18 +45,22 @@ MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(Shar } MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cmd) { - if (!cmd->descr->IsTransactional() || (cmd->descr->opt_mask() & CO::BLOCKING) || - (cmd->descr->opt_mask() & CO::GLOBAL_TRANS)) + if (!cmd->Cid()->IsTransactional() || (cmd->Cid()->opt_mask() & CO::BLOCKING) || + (cmd->Cid()->opt_mask() & CO::GLOBAL_TRANS)) return SquashResult::NOT_SQUASHED; - auto keys = DetermineKeys(cmd->descr, cmd->ArgList()); + tmp_keylist_.resize(cmd->NumArgs()); + auto args = absl::MakeSpan(tmp_keylist_); + cmd->Fill(args); + + auto keys = DetermineKeys(cmd->Cid(), args); if (!keys.ok()) return SquashResult::ERROR; // Check if all commands belong to one shard bool found_more = false; ShardId last_sid = kInvalidSid; - IterateKeys(cmd->ArgList(), *keys, [&last_sid, &found_more](MutableSlice key) { + IterateKeys(args, *keys, [&last_sid, &found_more](MutableSlice key) { if (found_more) return; ShardId sid = Shard(facade::ToSV(key), shard_set->size()); @@ -69,11 +75,11 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm return SquashResult::NOT_SQUASHED; if (track_keys_) - IterateKeys(cmd->ArgList(), *keys, [this](MutableSlice key) { collected_keys_.insert(key); }); + IterateKeys(args, *keys, [this](MutableSlice key) { collected_keys_.insert(key); }); auto& sinfo = PrepareShardInfo(last_sid); - sinfo.had_writes |= (cmd->descr->opt_mask() & CO::WRITE); + sinfo.had_writes |= (cmd->Cid()->opt_mask() & CO::WRITE); sinfo.cmds.push_back(cmd); order_.push_back(last_sid); @@ -87,10 +93,15 @@ void MultiCommandSquasher::ExecuteStandalone(StoredCmd* cmd) { DCHECK(order_.empty()); // check no squashed chain is interrupted auto* tx = cntx_->transaction; - tx->MultiSwitchCmd(cmd->descr); - if (cmd->descr->IsTransactional()) - tx->InitByArgs(cntx_->conn_state.db_index, cmd->ArgList()); - cmd->descr->Invoke(cmd->ArgList(), cntx_); + tx->MultiSwitchCmd(cmd->Cid()); + + tmp_keylist_.resize(cmd->NumArgs()); + auto args = absl::MakeSpan(tmp_keylist_); + cmd->Fill(args); + + if (cmd->Cid()->IsTransactional()) + tx->InitByArgs(cntx_->conn_state.db_index, args); + cmd->Cid()->Invoke(args, cntx_); } OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard* es) { @@ -101,10 +112,17 @@ OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard facade::CapturingReplyBuilder crb; ConnectionContext local_cntx{local_tx, &crb}; + absl::InlinedVector arg_vec; + for (auto* cmd : sinfo.cmds) { - local_tx->MultiSwitchCmd(cmd->descr); - local_tx->InitByArgs(parent_tx->GetDbIndex(), cmd->ArgList()); - cmd->descr->Invoke(cmd->ArgList(), &local_cntx); + local_tx->MultiSwitchCmd(cmd->Cid()); + + arg_vec.resize(cmd->NumArgs()); + auto args = absl::MakeSpan(arg_vec); + cmd->Fill(args); + + local_tx->InitByArgs(parent_tx->GetDbIndex(), args); + cmd->Cid()->Invoke(args, &local_cntx); sinfo.reply_chan->Push(crb.Take()); } diff --git a/src/server/multi_test.cc b/src/server/multi_test.cc index 220515228..82230aa1e 100644 --- a/src/server/multi_test.cc +++ b/src/server/multi_test.cc @@ -723,7 +723,7 @@ TEST_F(MultiTest, ContendedList) { // because it runs them within one hop. TEST_F(MultiTest, TestSquashing) { absl::SetFlag(&FLAGS_multi_exec_squash, true); - absl::SetFlag(&FLAGS_multi_exec_mode, Transaction::NON_ATOMIC); + absl::SetFlag(&FLAGS_multi_exec_mode, Transaction::LOCK_AHEAD); const char* keys[] = {kKeySid0, kKeySid1, kKeySid2}; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index dffffabd8..2b0cea9ec 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -236,7 +236,7 @@ void Transaction::InitByKeys(KeyIndex key_index) { auto args = full_args_; if (key_index.start == args.size()) { // eval with 0 keys. - CHECK(absl::StartsWith(cid_->name(), "EVAL")); + CHECK(absl::StartsWith(cid_->name(), "EVAL")) << cid_->name(); return; }