From 25e6930ac36f048887dcd39909e9cbef88824715 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Wed, 8 May 2024 03:01:37 +0300 Subject: [PATCH] =?UTF-8?q?Revert=20"chore:=20get=20rid=20of=20kv=5Fargs?= =?UTF-8?q?=20and=20replace=20it=20with=20slices=20to=20full=5F=E2=80=A6?= =?UTF-8?q?=20(#3024)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Revert "chore: get rid of kv_args and replace it with slices to full_args (#2942)" This reverts commit de0e5cb0bd7f8357ebc39d22d2882b97dd982ce9. --- src/server/command_registry.cc | 5 ++ src/server/command_registry.h | 7 +- src/server/container_utils.cc | 12 +-- src/server/journal/types.h | 6 +- src/server/json_family.cc | 13 ++-- src/server/list_family.cc | 27 +------ src/server/stream_family.cc | 12 ++- src/server/string_family.cc | 50 +++++-------- src/server/transaction.cc | 132 ++++++++++++++++++++++----------- src/server/transaction.h | 26 ++++--- src/server/tx_base.cc | 16 +--- src/server/tx_base.h | 83 ++++++--------------- src/server/zset_family.cc | 12 +-- 13 files changed, 185 insertions(+), 216 deletions(-) diff --git a/src/server/command_registry.cc b/src/server/command_registry.cc index 82548d8d8..1ad459ca9 100644 --- a/src/server/command_registry.cc +++ b/src/server/command_registry.cc @@ -40,6 +40,9 @@ CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first : facade::CommandId(name, mask, arity, first_key, last_key, acl_categories) { if (mask & CO::ADMIN) opt_mask_ |= CO::NOSCRIPT; + + if (mask & CO::BLOCKING) + opt_mask_ |= CO::REVERSE_MAPPING; } bool CommandId::IsTransactional() const { @@ -170,6 +173,8 @@ const char* OptName(CO::CommandOpt fl) { return "readonly"; case DENYOOM: return "denyoom"; + case REVERSE_MAPPING: + return "reverse-mapping"; case FAST: return "fast"; case LOADING: diff --git a/src/server/command_registry.h b/src/server/command_registry.h index d6ba00921..2b3ccb0c4 100644 --- a/src/server/command_registry.h +++ b/src/server/command_registry.h @@ -27,13 +27,16 @@ enum CommandOpt : uint32_t { LOADING = 1U << 3, // Command allowed during LOADING state. DENYOOM = 1U << 4, // use-memory in redis. - // UNUSED = 1U << 5, + // marked commands that demand preserve the order of keys to work correctly. + // For example, MGET needs to know the order of keys to return the values in the same order. + // BLPOP needs to know the order of keys to return the first non-empty list from the left. + REVERSE_MAPPING = 1U << 5, VARIADIC_KEYS = 1U << 6, // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc. ADMIN = 1U << 7, // implies NOSCRIPT, NOSCRIPT = 1U << 8, - BLOCKING = 1U << 9, + BLOCKING = 1U << 9, // implies REVERSE_MAPPING HIDDEN = 1U << 10, // does not show in COMMAND command output INTERLEAVED_KEYS = 1U << 11, // keys are interleaved with arguments GLOBAL_TRANS = 1U << 12, diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc index ce79da6c8..3b0402c7b 100644 --- a/src/server/container_utils.cc +++ b/src/server/container_utils.cc @@ -40,12 +40,14 @@ OpResult> FindFirstReadOnly(const Db int req_obj_type) { DCHECK(!args.Empty()); - for (auto it = args.begin(); it != args.end(); ++it) { - OpResult res = db_slice.FindReadOnly(cntx, *it, req_obj_type); + unsigned i = 0; + for (string_view key : args) { + OpResult res = db_slice.FindReadOnly(cntx, key, req_obj_type); if (res) - return make_pair(res.value(), unsigned(it.index())); + return make_pair(res.value(), i); if (res.status() != OpStatus::KEY_NOTFOUND) return res.status(); + ++i; } VLOG(2) << "FindFirst not found"; @@ -117,8 +119,8 @@ OpResult FindFirstNonEmpty(Transaction* trans, int req_obj_type) auto comp = [trans](const OpResult& lhs, const OpResult& rhs) { if (!lhs || !rhs) return lhs.ok(); - size_t i1 = std::get<1>(*lhs); - size_t i2 = std::get<1>(*rhs); + size_t i1 = trans->ReverseArgIndex(std::get(*lhs), std::get(*lhs)); + size_t i2 = trans->ReverseArgIndex(std::get(*rhs), std::get(*rhs)); return i1 < i2; }; diff --git a/src/server/journal/types.h b/src/server/journal/types.h index aeb0286ca..4d86e3325 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -42,8 +42,8 @@ struct Entry : public EntryBase { struct Payload { std::string_view cmd; std::variant + ShardArgs // Command and its shard parts. + > args; Payload() = default; @@ -51,8 +51,6 @@ struct Entry : public EntryBase { } Payload(std::string_view c, const ShardArgs& a) : cmd(c), args(a) { } - Payload(std::string_view c, ArgSlice a) : cmd(c), args(a) { - } }; Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt, diff --git a/src/server/json_family.cc b/src/server/json_family.cc index 0617ebc55..da5ff13ed 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -1543,14 +1543,12 @@ void JsonFamily::MGet(CmdArgList args, ConnectionContext* cntx) { continue; vector& res = mget_resp[sid]; - ShardArgs shard_args = transaction->GetShardArgs(sid); - unsigned src_index = 0; - for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) { - if (!res[src_index]) + for (size_t j = 0; j < res.size(); ++j) { + if (!res[j]) continue; - uint32_t dst_indx = it.index(); - results[dst_indx] = std::move(res[src_index]); + uint32_t indx = transaction->ReverseArgIndex(sid, j); + results[indx] = std::move(res[j]); } } @@ -2093,7 +2091,8 @@ void JsonFamily::Register(CommandRegistry* registry) { constexpr size_t kMsetFlags = CO::WRITE | CO::DENYOOM | CO::FAST | CO::INTERLEAVED_KEYS; registry->StartFamily(); *registry << CI{"JSON.GET", CO::READONLY | CO::FAST, -2, 1, 1, acl::JSON}.HFUNC(Get); - *registry << CI{"JSON.MGET", CO::READONLY | CO::FAST, -3, 1, -2, acl::JSON}.HFUNC(MGet); + *registry << CI{"JSON.MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING, -3, 1, -2, acl::JSON} + .HFUNC(MGet); *registry << CI{"JSON.TYPE", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(Type); *registry << CI{"JSON.STRLEN", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(StrLen); *registry << CI{"JSON.OBJLEN", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(ObjLen); diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 9515c7239..79adbf336 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -158,22 +158,6 @@ struct CircularMessages { // Used to recover logs for BLPOP failures. See OpBPop. thread_local CircularMessages debugMessages{50}; -// A bit awkward translation from a single key to ShardArgs. -// We create a mutable slice (which will never be mutated) from the key, then we create -// a CmdArgList of size 1 that references mslice and finally -// we reference the first element in the CmdArgList via islice. -struct SingleArg { - MutableSlice mslice; - IndexSlice islice{0, 1}; - - SingleArg(string_view arg) : mslice(const_cast(arg.data()), arg.size()) { - } - - ShardArgs Get() { - return ShardArgs{CmdArgList{&mslice, 1}, absl::MakeSpan(&islice, 1)}; - } -}; - class BPopPusher { public: BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir); @@ -464,9 +448,7 @@ OpResult MoveTwoShards(Transaction* trans, string_view src, string_view // hack, again. since we hacked which queue we are waiting on (see RunPair) // we must clean-up src key here manually. See RunPair why we do this. // in short- we suspended on "src" on both shards. - - SingleArg single_arg{src}; - shard->blocking_controller()->FinalizeWatched(single_arg.Get(), t); + shard->blocking_controller()->FinalizeWatched(ArgSlice{&src, 1}, t); } } else { DVLOG(1) << "Popping value from list: " << key; @@ -891,8 +873,7 @@ OpResult BPopPusher::RunSingle(ConnectionContext* cntx, time_point tp) { return op_res; } - SingleArg single_arg{pop_key_}; - auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); }; + auto wcb = [&](Transaction* t, EngineShard* shard) { return ShardArgs{&this->pop_key_, 1}; }; const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*, std::string_view key) -> bool { @@ -919,13 +900,11 @@ OpResult BPopPusher::RunPair(ConnectionContext* cntx, time_point tp) { return op_res; } - SingleArg single_arg(this->pop_key_); - // a hack: we watch in both shards for pop_key but only in the source shard it's relevant. // Therefore we follow the regular flow of watching the key but for the destination shard it // will never be triggerred. // This allows us to run Transaction::Execute on watched transactions in both shards. - auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); }; + auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; }; const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*, std::string_view key) -> bool { diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 675871ac6..3274c439c 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -2989,19 +2989,17 @@ void XReadImpl(CmdArgList args, std::optional opts, ConnectionContext* continue; vector& results = xread_resp[sid]; - unsigned src_index = 0; - ShardArgs shard_args = cntx->transaction->GetShardArgs(sid); - for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) { - if (results[src_index].size() == 0) { + for (size_t i = 0; i < results.size(); ++i) { + if (results[i].size() == 0) { continue; } resolved_streams++; // Add the stream records ordered by the original stream arguments. - size_t dst_indx = it.index(); - res[dst_indx - opts->streams_arg] = std::move(results[src_index]); + size_t indx = cntx->transaction->ReverseArgIndex(sid, i); + res[indx - opts->streams_arg] = std::move(results[i]); } } @@ -3325,7 +3323,7 @@ constexpr uint32_t kXAutoClaim = WRITE | STREAM | FAST; void StreamFamily::Register(CommandRegistry* registry) { using CI = CommandId; registry->StartFamily(); - constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::VARIADIC_KEYS; + constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS; *registry << CI{"XADD", CO::WRITE | CO::DENYOOM | CO::FAST, -5, 1, 1, acl::kXAdd}.HFUNC(XAdd) << CI{"XCLAIM", CO::WRITE | CO::FAST, -6, 1, 1, acl::kXClaim}.HFUNC(XClaim) << CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, acl::kXDel}.HFUNC(XDel) diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 4b3bfe78f..f9e0cd65a 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -270,7 +270,6 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success) SetCmd sg(op_args, false); size_t index = 0; - bool partial = false; for (auto it = args.begin(); it != args.end(); ++it) { string_view key = *it; ++it; @@ -278,7 +277,6 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success) DVLOG(1) << "MSet " << key << ":" << value; if (sg.Set(params, key, value) != OpStatus::OK) { // OOM for example. success->store(false); - partial = true; break; } index += 2; @@ -287,29 +285,18 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success) if (auto journal = op_args.shard->journal(); journal) { // We write a custom journal because an OOM in the above loop could lead to partial success, so // we replicate only what was changed. - if (partial) { - string_view cmd; - ArgSlice cmd_args; - vector store_args(index); - if (index == 0) { - // All shards must record the tx was executed for the replica to execute it, so we send a - // PING in case nothing was changed - cmd = "PING"; - } else { - // journal [0, i) - cmd = "MSET"; - unsigned i = 0; - for (string_view arg : args) { - store_args[i++] = arg; - if (i >= store_args.size()) - break; - } - cmd_args = absl::MakeSpan(store_args); - } - RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt()); + string_view cmd; + ArgSlice cmd_args; + if (index == 0) { + // All shards must record the tx was executed for the replica to execute it, so we send a PING + // in case nothing was changed + cmd = "PING"; } else { - RecordJournal(op_args, "MSET", args, op_args.tx->GetUniqueShardCnt()); + // journal [0, i) + cmd = "MSET"; + cmd_args = ArgSlice(args.begin(), index); } + RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt()); } } @@ -1179,17 +1166,16 @@ void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) { src.storage_list->next = res.storage_list; res.storage_list = src.storage_list; src.storage_list = nullptr; - ShardArgs shard_args = transaction->GetShardArgs(sid); - unsigned src_indx = 0; - for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_indx) { - if (!src.resp_arr[src_indx]) + + for (size_t j = 0; j < src.resp_arr.size(); ++j) { + if (!src.resp_arr[j]) continue; - uint32_t indx = it.index(); + uint32_t indx = transaction->ReverseArgIndex(sid, j); - res.resp_arr[indx] = std::move(src.resp_arr[src_indx]); + res.resp_arr[indx] = std::move(src.resp_arr[j]); if (cntx->protocol() == Protocol::MEMCACHE) { - res.resp_arr[indx]->key = *it; + res.resp_arr[indx]->key = ArgS(args, indx); } } } @@ -1505,7 +1491,9 @@ void StringFamily::Register(CommandRegistry* registry) { << CI{"GETEX", CO::WRITE | CO::DENYOOM | CO::FAST | CO::NO_AUTOJOURNAL, -1, 1, 1, acl::kGetEx} .HFUNC(GetEx) << CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, acl::kGetSet}.HFUNC(GetSet) - << CI{"MGET", CO::READONLY | CO::FAST | CO::IDEMPOTENT, -2, 1, -1, acl::kMGet}.HFUNC(MGet) + << CI{"MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING | CO::IDEMPOTENT, -2, 1, -1, + acl::kMGet} + .HFUNC(MGet) << CI{"MSET", kMSetMask, -3, 1, -1, acl::kMSet}.HFUNC(MSet) << CI{"MSETNX", kMSetMask, -3, 1, -1, acl::kMSetNx}.HFUNC(MSetNx) << CI{"STRLEN", CO::READONLY | CO::FAST, 2, 1, 1, acl::kStrLen}.HFUNC(StrLen) diff --git a/src/server/transaction.cc b/src/server/transaction.cc index dfe603a20..f591eb61f 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -184,13 +184,12 @@ void Transaction::InitGlobal() { void Transaction::BuildShardIndex(const KeyIndex& key_index, std::vector* out) { auto& shard_index = *out; - auto add = [this, &shard_index](uint32_t sid, uint32_t b, uint32_t e) { - auto& slices = shard_index[sid].slices; - if (!slices.empty() && slices.back().second == b) { - slices.back().second = e; - } else { - slices.emplace_back(b, e); - } + auto add = [this, rev_mapping = key_index.has_reverse_mapping, &shard_index](uint32_t sid, + uint32_t i) { + string_view val = ArgS(full_args_, i); + shard_index[sid].args.push_back(val); + if (rev_mapping) + shard_index[sid].original_index.push_back(i); }; if (key_index.bonus) { @@ -198,39 +197,47 @@ void Transaction::BuildShardIndex(const KeyIndex& key_index, std::vector shard_index, size_t num_args) { - args_slices_.reserve(num_args); +void Transaction::InitShardData(absl::Span shard_index, size_t num_args, + bool rev_mapping) { + kv_args_.reserve(num_args); DCHECK(kv_fp_.empty()); kv_fp_.reserve(num_args); + if (rev_mapping) + reverse_index_.reserve(num_args); + // Store the concatenated per-shard arguments from the shard index inside kv_args_ // and make each shard data point to its own sub-span inside kv_args_. for (size_t i = 0; i < shard_data_.size(); ++i) { auto& sd = shard_data_[i]; - const auto& src = shard_index[i]; + const auto& si = shard_index[i]; - sd.slice_count = src.slices.size(); - sd.slice_start = args_slices_.size(); + sd.arg_count = si.args.size(); + sd.arg_start = kv_args_.size(); sd.fp_start = kv_fp_.size(); sd.fp_count = 0; // Multi transactions can re-initialize on different shards, so clear ACTIVE flag. DCHECK_EQ(sd.local_mask & ACTIVE, 0); - if (sd.slice_count == 0) + if (sd.arg_count == 0) continue; sd.local_mask |= ACTIVE; @@ -238,16 +245,19 @@ void Transaction::InitShardData(absl::Span shard_index, siz unique_shard_cnt_++; unique_shard_id_ = i; - for (size_t j = 0; j < src.slices.size(); ++j) { - IndexSlice slice = src.slices[j]; - args_slices_.push_back(slice); - for (uint32_t k = slice.first; k < slice.second; k += src.key_step) { - string_view key = ArgS(full_args_, k); - kv_fp_.push_back(LockTag(key).Fingerprint()); + for (size_t j = 0; j < si.args.size(); ++j) { + string_view arg = si.args[j]; + kv_args_.push_back(arg); + if (si.key_step == 1 || j % si.key_step == 0) { + kv_fp_.push_back(LockTag(arg).Fingerprint()); sd.fp_count++; } + if (rev_mapping) + reverse_index_.push_back(si.original_index[j]); } } + + DCHECK_EQ(kv_args_.size(), num_args); } void Transaction::PrepareMultiFps(CmdArgList keys) { @@ -267,13 +277,22 @@ void Transaction::PrepareMultiFps(CmdArgList keys) { void Transaction::StoreKeysInArgs(const KeyIndex& key_index) { DCHECK(!key_index.bonus); DCHECK(kv_fp_.empty()); - DCHECK(args_slices_.empty()); // even for a single key we may have multiple arguments per key (MSET). - args_slices_.emplace_back(key_index.start, key_index.end); for (unsigned j = key_index.start; j < key_index.end; j += key_index.step) { - string_view key = ArgS(full_args_, j); - kv_fp_.push_back(LockTag(key).Fingerprint()); + string_view arg = ArgS(full_args_, j); + kv_args_.push_back(arg); + kv_fp_.push_back(LockTag(arg).Fingerprint()); + + for (unsigned k = j + 1; k < j + key_index.step; ++k) + kv_args_.push_back(ArgS(full_args_, k)); + } + + if (key_index.has_reverse_mapping) { + reverse_index_.resize(kv_args_.size()); + for (unsigned j = 0; j < reverse_index_.size(); ++j) { + reverse_index_[j] = j + key_index.start; + } } } @@ -295,7 +314,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) { StoreKeysInArgs(key_index); unique_shard_cnt_ = 1; - string_view akey = ArgS(full_args_, key_index.start); + string_view akey = kv_args_.front(); if (is_stub) // stub transactions don't migrate DCHECK_EQ(unique_shard_id_, Shard(akey, shard_set->size())); else { @@ -321,11 +340,11 @@ void Transaction::InitByKeys(const KeyIndex& key_index) { BuildShardIndex(key_index, &shard_index); // Initialize shard data based on distributed arguments. - InitShardData(shard_index, key_index.num_args()); + InitShardData(shard_index, key_index.num_args(), key_index.has_reverse_mapping); DCHECK(!multi_ || multi_->mode != LOCK_AHEAD || !multi_->tag_fps.empty()); - DVLOG(1) << "InitByArgs " << DebugId() << facade::ToSV(full_args_.front()); + DVLOG(1) << "InitByArgs " << DebugId() << " " << kv_args_.front(); // Compress shard data, if we occupy only one shard. if (unique_shard_cnt_ == 1) { @@ -338,8 +357,15 @@ void Transaction::InitByKeys(const KeyIndex& key_index) { sd = &shard_data_.front(); sd->local_mask |= ACTIVE; } - sd->slice_count = -1; - sd->slice_start = -1; + sd->arg_count = -1; + sd->arg_start = -1; + } + + // Validation. Check reverse mapping was built correctly. + if (key_index.has_reverse_mapping) { + for (size_t i = 0; i < kv_args_.size(); ++i) { + DCHECK_EQ(kv_args_[i], ArgS(full_args_, reverse_index_[i])) << full_args_; + } } // Validation. @@ -370,7 +396,7 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) { } DCHECK_EQ(unique_shard_cnt_, 0u); - DCHECK(args_slices_.empty()); + DCHECK(kv_args_.empty()); DCHECK(kv_fp_.empty()); OpResult key_index = DetermineKeys(cid_, args); @@ -401,8 +427,8 @@ void Transaction::PrepareSquashedMultiHop(const CommandId* cid, } else { shard_data_[i].local_mask &= ~ACTIVE; } - shard_data_[i].slice_start = 0; - shard_data_[i].slice_count = 0; + shard_data_[i].arg_start = 0; + shard_data_[i].arg_count = 0; } MultiBecomeSquasher(); @@ -459,14 +485,15 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) { unique_shard_id_ = 0; unique_shard_cnt_ = 0; - args_slices_.clear(); + kv_args_.clear(); kv_fp_.clear(); + reverse_index_.clear(); cid_ = cid; cb_ptr_ = nullptr; for (auto& sd : shard_data_) { - sd.slice_count = sd.slice_start = 0; + sd.arg_count = sd.arg_start = 0; if (multi_->mode == NON_ATOMIC) { sd.local_mask = 0; // Non atomic transactions schedule each time, so remove all flags @@ -528,6 +555,7 @@ void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdA EnableShard(sid); OpResult key_index = DetermineKeys(cid_, args); CHECK(key_index); + DCHECK(!key_index->has_reverse_mapping); StoreKeysInArgs(*key_index); } @@ -1153,12 +1181,23 @@ ShardArgs Transaction::GetShardArgs(ShardId sid) const { // We can read unique_shard_cnt_ only because ShardArgsInShard is called after IsArmedInShard // barrier. if (unique_shard_cnt_ == 1) { - return ShardArgs{full_args_, absl::MakeSpan(args_slices_)}; + return kv_args_; } const auto& sd = shard_data_[sid]; - return ShardArgs{full_args_, - absl::MakeSpan(args_slices_.data() + sd.slice_start, sd.slice_count)}; + return ShardArgs{kv_args_.data() + sd.arg_start, sd.arg_count}; +} + +// from local index back to original arg index skipping the command. +// i.e. returns (first_key_pos -1) or bigger. +size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const { + DCHECK_LT(arg_index, reverse_index_.size()); + + if (unique_shard_cnt_ == 1) + return reverse_index_[arg_index]; + + const auto& sd = shard_data_[shard_id]; + return reverse_index_[sd.arg_start + arg_index]; } OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider, @@ -1334,7 +1373,7 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid, string_view // Change state to awaked and store index of awakened key sd.local_mask &= ~SUSPENDED_Q; sd.local_mask |= AWAKED_Q; - sd.wake_key_pos = it.index(); + sd.wake_key_pos = it - args.begin(); blocking_barrier_.Close(); return true; @@ -1345,8 +1384,8 @@ optional Transaction::GetWakeKey(ShardId sid) const { if ((sd.local_mask & AWAKED_Q) == 0) return nullopt; - CHECK_LT(sd.wake_key_pos, full_args_.size()); - return ArgS(full_args_, sd.wake_key_pos); + CHECK_NE(sd.wake_key_pos, UINT16_MAX); + return GetShardArgs(sid).at(sd.wake_key_pos); } void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult result) { @@ -1384,11 +1423,10 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul journal::Entry::Payload entry_payload; string_view cmd{cid_->name()}; - if (unique_shard_cnt_ == 1 || args_slices_.empty()) { + if (unique_shard_cnt_ == 1 || kv_args_.empty()) { entry_payload = journal::Entry::Payload(cmd, full_args_); } else { - ShardArgs shard_args = GetShardArgs(shard->shard_id()); - entry_payload = journal::Entry::Payload(cmd, shard_args); + entry_payload = journal::Entry::Payload(cmd, GetShardArgs(shard->shard_id()).AsSlice()); } LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false, true); } @@ -1475,6 +1513,10 @@ OpResult DetermineKeys(const CommandId* cid, CmdArgList args) { int num_custom_keys = -1; + if (cid->opt_mask() & CO::REVERSE_MAPPING) { + key_index.has_reverse_mapping = true; + } + if (cid->opt_mask() & CO::VARIADIC_KEYS) { // ZUNION/INTER [ ...] // EVAL