From de0e5cb0bd7f8357ebc39d22d2882b97dd982ce9 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Mon, 6 May 2024 15:53:30 +0300 Subject: [PATCH] chore: get rid of kv_args and replace it with slices to full_args (#2942) The main change is in tx_base.* where we introduce ShardArgs slice that is only forward iterable. It allows us to go over sub-ranges of the full arguments slice or read an index of any of its elements. Since ShardArgs provide now indices into the original argument list we do not need to build the reverse index in transactions. Signed-off-by: Roman Gershman --- src/server/command_registry.cc | 5 -- src/server/command_registry.h | 7 +- src/server/container_utils.cc | 12 ++- src/server/journal/types.h | 6 +- src/server/json_family.cc | 13 ++-- src/server/list_family.cc | 27 ++++++- src/server/stream_family.cc | 12 +-- src/server/string_family.cc | 50 ++++++++----- src/server/transaction.cc | 132 +++++++++++---------------------- src/server/transaction.h | 26 +++---- src/server/tx_base.cc | 16 +++- src/server/tx_base.h | 83 +++++++++++++++------ src/server/zset_family.cc | 12 ++- 13 files changed, 216 insertions(+), 185 deletions(-) diff --git a/src/server/command_registry.cc b/src/server/command_registry.cc index 1ad459ca9..82548d8d8 100644 --- a/src/server/command_registry.cc +++ b/src/server/command_registry.cc @@ -40,9 +40,6 @@ CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first : facade::CommandId(name, mask, arity, first_key, last_key, acl_categories) { if (mask & CO::ADMIN) opt_mask_ |= CO::NOSCRIPT; - - if (mask & CO::BLOCKING) - opt_mask_ |= CO::REVERSE_MAPPING; } bool CommandId::IsTransactional() const { @@ -173,8 +170,6 @@ const char* OptName(CO::CommandOpt fl) { return "readonly"; case DENYOOM: return "denyoom"; - case REVERSE_MAPPING: - return "reverse-mapping"; case FAST: return "fast"; case LOADING: diff --git a/src/server/command_registry.h b/src/server/command_registry.h index 2b3ccb0c4..d6ba00921 100644 --- a/src/server/command_registry.h +++ b/src/server/command_registry.h @@ -27,16 +27,13 @@ enum CommandOpt : uint32_t { LOADING = 1U << 3, // Command allowed during LOADING state. DENYOOM = 1U << 4, // use-memory in redis. - // marked commands that demand preserve the order of keys to work correctly. - // For example, MGET needs to know the order of keys to return the values in the same order. - // BLPOP needs to know the order of keys to return the first non-empty list from the left. - REVERSE_MAPPING = 1U << 5, + // UNUSED = 1U << 5, VARIADIC_KEYS = 1U << 6, // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc. ADMIN = 1U << 7, // implies NOSCRIPT, NOSCRIPT = 1U << 8, - BLOCKING = 1U << 9, // implies REVERSE_MAPPING + BLOCKING = 1U << 9, HIDDEN = 1U << 10, // does not show in COMMAND command output INTERLEAVED_KEYS = 1U << 11, // keys are interleaved with arguments GLOBAL_TRANS = 1U << 12, diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc index 3b0402c7b..ce79da6c8 100644 --- a/src/server/container_utils.cc +++ b/src/server/container_utils.cc @@ -40,14 +40,12 @@ OpResult> FindFirstReadOnly(const Db int req_obj_type) { DCHECK(!args.Empty()); - unsigned i = 0; - for (string_view key : args) { - OpResult res = db_slice.FindReadOnly(cntx, key, req_obj_type); + for (auto it = args.begin(); it != args.end(); ++it) { + OpResult res = db_slice.FindReadOnly(cntx, *it, req_obj_type); if (res) - return make_pair(res.value(), i); + return make_pair(res.value(), unsigned(it.index())); if (res.status() != OpStatus::KEY_NOTFOUND) return res.status(); - ++i; } VLOG(2) << "FindFirst not found"; @@ -119,8 +117,8 @@ OpResult FindFirstNonEmpty(Transaction* trans, int req_obj_type) auto comp = [trans](const OpResult& lhs, const OpResult& rhs) { if (!lhs || !rhs) return lhs.ok(); - size_t i1 = trans->ReverseArgIndex(std::get(*lhs), std::get(*lhs)); - size_t i2 = trans->ReverseArgIndex(std::get(*rhs), std::get(*rhs)); + size_t i1 = std::get<1>(*lhs); + size_t i2 = std::get<1>(*rhs); return i1 < i2; }; diff --git a/src/server/journal/types.h b/src/server/journal/types.h index 4d86e3325..aeb0286ca 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -42,8 +42,8 @@ struct Entry : public EntryBase { struct Payload { std::string_view cmd; std::variant + ShardArgs, // Shard parts. + ArgSlice> args; Payload() = default; @@ -51,6 +51,8 @@ struct Entry : public EntryBase { } Payload(std::string_view c, const ShardArgs& a) : cmd(c), args(a) { } + Payload(std::string_view c, ArgSlice a) : cmd(c), args(a) { + } }; Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt, diff --git a/src/server/json_family.cc b/src/server/json_family.cc index da5ff13ed..0617ebc55 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -1543,12 +1543,14 @@ void JsonFamily::MGet(CmdArgList args, ConnectionContext* cntx) { continue; vector& res = mget_resp[sid]; - for (size_t j = 0; j < res.size(); ++j) { - if (!res[j]) + ShardArgs shard_args = transaction->GetShardArgs(sid); + unsigned src_index = 0; + for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) { + if (!res[src_index]) continue; - uint32_t indx = transaction->ReverseArgIndex(sid, j); - results[indx] = std::move(res[j]); + uint32_t dst_indx = it.index(); + results[dst_indx] = std::move(res[src_index]); } } @@ -2091,8 +2093,7 @@ void JsonFamily::Register(CommandRegistry* registry) { constexpr size_t kMsetFlags = CO::WRITE | CO::DENYOOM | CO::FAST | CO::INTERLEAVED_KEYS; registry->StartFamily(); *registry << CI{"JSON.GET", CO::READONLY | CO::FAST, -2, 1, 1, acl::JSON}.HFUNC(Get); - *registry << CI{"JSON.MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING, -3, 1, -2, acl::JSON} - .HFUNC(MGet); + *registry << CI{"JSON.MGET", CO::READONLY | CO::FAST, -3, 1, -2, acl::JSON}.HFUNC(MGet); *registry << CI{"JSON.TYPE", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(Type); *registry << CI{"JSON.STRLEN", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(StrLen); *registry << CI{"JSON.OBJLEN", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(ObjLen); diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 79adbf336..9515c7239 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -158,6 +158,22 @@ struct CircularMessages { // Used to recover logs for BLPOP failures. See OpBPop. thread_local CircularMessages debugMessages{50}; +// A bit awkward translation from a single key to ShardArgs. +// We create a mutable slice (which will never be mutated) from the key, then we create +// a CmdArgList of size 1 that references mslice and finally +// we reference the first element in the CmdArgList via islice. +struct SingleArg { + MutableSlice mslice; + IndexSlice islice{0, 1}; + + SingleArg(string_view arg) : mslice(const_cast(arg.data()), arg.size()) { + } + + ShardArgs Get() { + return ShardArgs{CmdArgList{&mslice, 1}, absl::MakeSpan(&islice, 1)}; + } +}; + class BPopPusher { public: BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir); @@ -448,7 +464,9 @@ OpResult MoveTwoShards(Transaction* trans, string_view src, string_view // hack, again. since we hacked which queue we are waiting on (see RunPair) // we must clean-up src key here manually. See RunPair why we do this. // in short- we suspended on "src" on both shards. - shard->blocking_controller()->FinalizeWatched(ArgSlice{&src, 1}, t); + + SingleArg single_arg{src}; + shard->blocking_controller()->FinalizeWatched(single_arg.Get(), t); } } else { DVLOG(1) << "Popping value from list: " << key; @@ -873,7 +891,8 @@ OpResult BPopPusher::RunSingle(ConnectionContext* cntx, time_point tp) { return op_res; } - auto wcb = [&](Transaction* t, EngineShard* shard) { return ShardArgs{&this->pop_key_, 1}; }; + SingleArg single_arg{pop_key_}; + auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); }; const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*, std::string_view key) -> bool { @@ -900,11 +919,13 @@ OpResult BPopPusher::RunPair(ConnectionContext* cntx, time_point tp) { return op_res; } + SingleArg single_arg(this->pop_key_); + // a hack: we watch in both shards for pop_key but only in the source shard it's relevant. // Therefore we follow the regular flow of watching the key but for the destination shard it // will never be triggerred. // This allows us to run Transaction::Execute on watched transactions in both shards. - auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; }; + auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); }; const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*, std::string_view key) -> bool { diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 3274c439c..675871ac6 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -2989,17 +2989,19 @@ void XReadImpl(CmdArgList args, std::optional opts, ConnectionContext* continue; vector& results = xread_resp[sid]; + unsigned src_index = 0; + ShardArgs shard_args = cntx->transaction->GetShardArgs(sid); - for (size_t i = 0; i < results.size(); ++i) { - if (results[i].size() == 0) { + for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) { + if (results[src_index].size() == 0) { continue; } resolved_streams++; // Add the stream records ordered by the original stream arguments. - size_t indx = cntx->transaction->ReverseArgIndex(sid, i); - res[indx - opts->streams_arg] = std::move(results[i]); + size_t dst_indx = it.index(); + res[dst_indx - opts->streams_arg] = std::move(results[src_index]); } } @@ -3323,7 +3325,7 @@ constexpr uint32_t kXAutoClaim = WRITE | STREAM | FAST; void StreamFamily::Register(CommandRegistry* registry) { using CI = CommandId; registry->StartFamily(); - constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS; + constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::VARIADIC_KEYS; *registry << CI{"XADD", CO::WRITE | CO::DENYOOM | CO::FAST, -5, 1, 1, acl::kXAdd}.HFUNC(XAdd) << CI{"XCLAIM", CO::WRITE | CO::FAST, -6, 1, 1, acl::kXClaim}.HFUNC(XClaim) << CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, acl::kXDel}.HFUNC(XDel) diff --git a/src/server/string_family.cc b/src/server/string_family.cc index d10de123e..96db64f88 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -271,6 +271,7 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success) SetCmd sg(op_args, false); size_t index = 0; + bool partial = false; for (auto it = args.begin(); it != args.end(); ++it) { string_view key = *it; ++it; @@ -278,6 +279,7 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success) DVLOG(1) << "MSet " << key << ":" << value; if (sg.Set(params, key, value) != OpStatus::OK) { // OOM for example. success->store(false); + partial = true; break; } index += 2; @@ -286,18 +288,29 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success) if (auto journal = op_args.shard->journal(); journal) { // We write a custom journal because an OOM in the above loop could lead to partial success, so // we replicate only what was changed. - string_view cmd; - ArgSlice cmd_args; - if (index == 0) { - // All shards must record the tx was executed for the replica to execute it, so we send a PING - // in case nothing was changed - cmd = "PING"; + if (partial) { + string_view cmd; + ArgSlice cmd_args; + vector store_args(index); + if (index == 0) { + // All shards must record the tx was executed for the replica to execute it, so we send a + // PING in case nothing was changed + cmd = "PING"; + } else { + // journal [0, i) + cmd = "MSET"; + unsigned i = 0; + for (string_view arg : args) { + store_args[i++] = arg; + if (i >= store_args.size()) + break; + } + cmd_args = absl::MakeSpan(store_args); + } + RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt()); } else { - // journal [0, i) - cmd = "MSET"; - cmd_args = ArgSlice(args.begin(), index); + RecordJournal(op_args, "MSET", args, op_args.tx->GetUniqueShardCnt()); } - RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt()); } } @@ -1161,16 +1174,17 @@ void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) { src.storage_list->next = res.storage_list; res.storage_list = src.storage_list; src.storage_list = nullptr; - - for (size_t j = 0; j < src.resp_arr.size(); ++j) { - if (!src.resp_arr[j]) + ShardArgs shard_args = transaction->GetShardArgs(sid); + unsigned src_indx = 0; + for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_indx) { + if (!src.resp_arr[src_indx]) continue; - uint32_t indx = transaction->ReverseArgIndex(sid, j); + uint32_t indx = it.index(); - res.resp_arr[indx] = std::move(src.resp_arr[j]); + res.resp_arr[indx] = std::move(src.resp_arr[src_indx]); if (cntx->protocol() == Protocol::MEMCACHE) { - res.resp_arr[indx]->key = ArgS(args, indx); + res.resp_arr[indx]->key = *it; } } } @@ -1486,9 +1500,7 @@ void StringFamily::Register(CommandRegistry* registry) { << CI{"GETEX", CO::WRITE | CO::DENYOOM | CO::FAST | CO::NO_AUTOJOURNAL, -1, 1, 1, acl::kGetEx} .HFUNC(GetEx) << CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, acl::kGetSet}.HFUNC(GetSet) - << CI{"MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING | CO::IDEMPOTENT, -2, 1, -1, - acl::kMGet} - .HFUNC(MGet) + << CI{"MGET", CO::READONLY | CO::FAST | CO::IDEMPOTENT, -2, 1, -1, acl::kMGet}.HFUNC(MGet) << CI{"MSET", kMSetMask, -3, 1, -1, acl::kMSet}.HFUNC(MSet) << CI{"MSETNX", kMSetMask, -3, 1, -1, acl::kMSetNx}.HFUNC(MSetNx) << CI{"STRLEN", CO::READONLY | CO::FAST, 2, 1, 1, acl::kStrLen}.HFUNC(StrLen) diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 65795a52e..b09969478 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -184,12 +184,13 @@ void Transaction::InitGlobal() { void Transaction::BuildShardIndex(const KeyIndex& key_index, std::vector* out) { auto& shard_index = *out; - auto add = [this, rev_mapping = key_index.has_reverse_mapping, &shard_index](uint32_t sid, - uint32_t i) { - string_view val = ArgS(full_args_, i); - shard_index[sid].args.push_back(val); - if (rev_mapping) - shard_index[sid].original_index.push_back(i); + auto add = [this, &shard_index](uint32_t sid, uint32_t b, uint32_t e) { + auto& slices = shard_index[sid].slices; + if (!slices.empty() && slices.back().second == b) { + slices.back().second = e; + } else { + slices.emplace_back(b, e); + } }; if (key_index.bonus) { @@ -197,47 +198,39 @@ void Transaction::BuildShardIndex(const KeyIndex& key_index, std::vector shard_index, size_t num_args, - bool rev_mapping) { - kv_args_.reserve(num_args); +void Transaction::InitShardData(absl::Span shard_index, size_t num_args) { + args_slices_.reserve(num_args); DCHECK(kv_fp_.empty()); kv_fp_.reserve(num_args); - if (rev_mapping) - reverse_index_.reserve(num_args); - // Store the concatenated per-shard arguments from the shard index inside kv_args_ // and make each shard data point to its own sub-span inside kv_args_. for (size_t i = 0; i < shard_data_.size(); ++i) { auto& sd = shard_data_[i]; - const auto& si = shard_index[i]; + const auto& src = shard_index[i]; - sd.arg_count = si.args.size(); - sd.arg_start = kv_args_.size(); + sd.slice_count = src.slices.size(); + sd.slice_start = args_slices_.size(); sd.fp_start = kv_fp_.size(); sd.fp_count = 0; // Multi transactions can re-initialize on different shards, so clear ACTIVE flag. DCHECK_EQ(sd.local_mask & ACTIVE, 0); - if (sd.arg_count == 0) + if (sd.slice_count == 0) continue; sd.local_mask |= ACTIVE; @@ -245,19 +238,16 @@ void Transaction::InitShardData(absl::Span shard_index, siz unique_shard_cnt_++; unique_shard_id_ = i; - for (size_t j = 0; j < si.args.size(); ++j) { - string_view arg = si.args[j]; - kv_args_.push_back(arg); - if (si.key_step == 1 || j % si.key_step == 0) { - kv_fp_.push_back(LockTag(arg).Fingerprint()); + for (size_t j = 0; j < src.slices.size(); ++j) { + IndexSlice slice = src.slices[j]; + args_slices_.push_back(slice); + for (uint32_t k = slice.first; k < slice.second; k += src.key_step) { + string_view key = ArgS(full_args_, k); + kv_fp_.push_back(LockTag(key).Fingerprint()); sd.fp_count++; } - if (rev_mapping) - reverse_index_.push_back(si.original_index[j]); } } - - DCHECK_EQ(kv_args_.size(), num_args); } void Transaction::PrepareMultiFps(CmdArgList keys) { @@ -277,22 +267,13 @@ void Transaction::PrepareMultiFps(CmdArgList keys) { void Transaction::StoreKeysInArgs(const KeyIndex& key_index) { DCHECK(!key_index.bonus); DCHECK(kv_fp_.empty()); + DCHECK(args_slices_.empty()); // even for a single key we may have multiple arguments per key (MSET). + args_slices_.emplace_back(key_index.start, key_index.end); for (unsigned j = key_index.start; j < key_index.end; j += key_index.step) { - string_view arg = ArgS(full_args_, j); - kv_args_.push_back(arg); - kv_fp_.push_back(LockTag(arg).Fingerprint()); - - for (unsigned k = j + 1; k < j + key_index.step; ++k) - kv_args_.push_back(ArgS(full_args_, k)); - } - - if (key_index.has_reverse_mapping) { - reverse_index_.resize(kv_args_.size()); - for (unsigned j = 0; j < reverse_index_.size(); ++j) { - reverse_index_[j] = j + key_index.start; - } + string_view key = ArgS(full_args_, j); + kv_fp_.push_back(LockTag(key).Fingerprint()); } } @@ -314,7 +295,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) { StoreKeysInArgs(key_index); unique_shard_cnt_ = 1; - string_view akey = kv_args_.front(); + string_view akey = ArgS(full_args_, key_index.start); if (is_stub) // stub transactions don't migrate DCHECK_EQ(unique_shard_id_, Shard(akey, shard_set->size())); else { @@ -340,11 +321,11 @@ void Transaction::InitByKeys(const KeyIndex& key_index) { BuildShardIndex(key_index, &shard_index); // Initialize shard data based on distributed arguments. - InitShardData(shard_index, key_index.num_args(), key_index.has_reverse_mapping); + InitShardData(shard_index, key_index.num_args()); DCHECK(!multi_ || multi_->mode != LOCK_AHEAD || !multi_->tag_fps.empty()); - DVLOG(1) << "InitByArgs " << DebugId() << " " << kv_args_.front(); + DVLOG(1) << "InitByArgs " << DebugId() << facade::ToSV(full_args_.front()); // Compress shard data, if we occupy only one shard. if (unique_shard_cnt_ == 1) { @@ -357,15 +338,8 @@ void Transaction::InitByKeys(const KeyIndex& key_index) { sd = &shard_data_.front(); sd->local_mask |= ACTIVE; } - sd->arg_count = -1; - sd->arg_start = -1; - } - - // Validation. Check reverse mapping was built correctly. - if (key_index.has_reverse_mapping) { - for (size_t i = 0; i < kv_args_.size(); ++i) { - DCHECK_EQ(kv_args_[i], ArgS(full_args_, reverse_index_[i])) << full_args_; - } + sd->slice_count = -1; + sd->slice_start = -1; } // Validation. @@ -396,7 +370,7 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) { } DCHECK_EQ(unique_shard_cnt_, 0u); - DCHECK(kv_args_.empty()); + DCHECK(args_slices_.empty()); DCHECK(kv_fp_.empty()); OpResult key_index = DetermineKeys(cid_, args); @@ -427,8 +401,8 @@ void Transaction::PrepareSquashedMultiHop(const CommandId* cid, } else { shard_data_[i].local_mask &= ~ACTIVE; } - shard_data_[i].arg_start = 0; - shard_data_[i].arg_count = 0; + shard_data_[i].slice_start = 0; + shard_data_[i].slice_count = 0; } MultiBecomeSquasher(); @@ -485,15 +459,14 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) { unique_shard_id_ = 0; unique_shard_cnt_ = 0; - kv_args_.clear(); + args_slices_.clear(); kv_fp_.clear(); - reverse_index_.clear(); cid_ = cid; cb_ptr_ = nullptr; for (auto& sd : shard_data_) { - sd.arg_count = sd.arg_start = 0; + sd.slice_count = sd.slice_start = 0; if (multi_->mode == NON_ATOMIC) { sd.local_mask = 0; // Non atomic transactions schedule each time, so remove all flags @@ -555,7 +528,6 @@ void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdA EnableShard(sid); OpResult key_index = DetermineKeys(cid_, args); CHECK(key_index); - DCHECK(!key_index->has_reverse_mapping); StoreKeysInArgs(*key_index); } @@ -1181,23 +1153,12 @@ ShardArgs Transaction::GetShardArgs(ShardId sid) const { // We can read unique_shard_cnt_ only because ShardArgsInShard is called after IsArmedInShard // barrier. if (unique_shard_cnt_ == 1) { - return kv_args_; + return ShardArgs{full_args_, absl::MakeSpan(args_slices_)}; } const auto& sd = shard_data_[sid]; - return ShardArgs{kv_args_.data() + sd.arg_start, sd.arg_count}; -} - -// from local index back to original arg index skipping the command. -// i.e. returns (first_key_pos -1) or bigger. -size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const { - DCHECK_LT(arg_index, reverse_index_.size()); - - if (unique_shard_cnt_ == 1) - return reverse_index_[arg_index]; - - const auto& sd = shard_data_[shard_id]; - return reverse_index_[sd.arg_start + arg_index]; + return ShardArgs{full_args_, + absl::MakeSpan(args_slices_.data() + sd.slice_start, sd.slice_count)}; } OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider, @@ -1373,7 +1334,7 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid, string_view // Change state to awaked and store index of awakened key sd.local_mask &= ~SUSPENDED_Q; sd.local_mask |= AWAKED_Q; - sd.wake_key_pos = it - args.begin(); + sd.wake_key_pos = it.index(); blocking_barrier_.Close(); return true; @@ -1384,8 +1345,8 @@ optional Transaction::GetWakeKey(ShardId sid) const { if ((sd.local_mask & AWAKED_Q) == 0) return nullopt; - CHECK_NE(sd.wake_key_pos, UINT16_MAX); - return GetShardArgs(sid).at(sd.wake_key_pos); + CHECK_LT(sd.wake_key_pos, full_args_.size()); + return ArgS(full_args_, sd.wake_key_pos); } void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult result) { @@ -1421,10 +1382,11 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul journal::Entry::Payload entry_payload; string_view cmd{cid_->name()}; - if (unique_shard_cnt_ == 1 || kv_args_.empty()) { + if (unique_shard_cnt_ == 1 || args_slices_.empty()) { entry_payload = journal::Entry::Payload(cmd, full_args_); } else { - entry_payload = journal::Entry::Payload(cmd, GetShardArgs(shard->shard_id()).AsSlice()); + ShardArgs shard_args = GetShardArgs(shard->shard_id()); + entry_payload = journal::Entry::Payload(cmd, shard_args); } LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false, true); } @@ -1511,10 +1473,6 @@ OpResult DetermineKeys(const CommandId* cid, CmdArgList args) { int num_custom_keys = -1; - if (cid->opt_mask() & CO::REVERSE_MAPPING) { - key_index.has_reverse_mapping = true; - } - if (cid->opt_mask() & CO::VARIADIC_KEYS) { // ZUNION/INTER [ ...] // EVAL