From b5b50931650c64c18aa82ca135d2fe019d9259b2 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 1 Feb 2024 17:39:14 +0200 Subject: [PATCH] fix: fix BLOCKING/REVERSE_MAPPING flags for some commands (#2516) * fix: BLOCKING/REVERSE_MAPPING flags for some commands Also, simplify interfaces around REVERSE_MAPPING in the internal tx code. --------- Signed-off-by: Roman Gershman --- src/server/command_registry.h | 4 ++++ src/server/common.h | 6 +++--- src/server/script_mgr.cc | 2 +- src/server/set_family.cc | 5 ++--- src/server/stream_family.cc | 9 +++------ src/server/transaction.cc | 35 ++++++++++++++++++++--------------- src/server/transaction.h | 4 ++-- src/server/zset_family.cc | 6 +++--- 8 files changed, 38 insertions(+), 33 deletions(-) diff --git a/src/server/command_registry.h b/src/server/command_registry.h index e19203db4..20069ebec 100644 --- a/src/server/command_registry.h +++ b/src/server/command_registry.h @@ -26,6 +26,10 @@ enum CommandOpt : uint32_t { WRITE = 1U << 2, LOADING = 1U << 3, // Command allowed during LOADING state. DENYOOM = 1U << 4, // use-memory in redis. + + // marked commands that demand preserve the order of keys to work correctly. + // For example, MGET needs to know the order of keys to return the values in the same order. + // BLPOP needs to know the order of keys to return the first non-empty list from the left. REVERSE_MAPPING = 1U << 5, VARIADIC_KEYS = 1U << 6, // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc. diff --git a/src/server/common.h b/src/server/common.h index 6a9ca386b..325a4ea7d 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -71,13 +71,13 @@ struct KeyIndex { // if index is non-zero then adds another key index (usually 0). // relevant for for commands like ZUNIONSTORE/ZINTERSTORE for destination key. std::optional bonus{}; + bool has_reverse_mapping = false; - static KeyIndex Empty() { - return KeyIndex{0, 0, 0, std::nullopt}; + KeyIndex(unsigned s = 0, unsigned e = 0, unsigned step = 0) : start(s), end(e), step(step) { } static KeyIndex Range(unsigned start, unsigned end, unsigned step = 1) { - return KeyIndex{start, end, step, std::nullopt}; + return KeyIndex{start, end, step}; } bool HasSingleKey() const { diff --git a/src/server/script_mgr.cc b/src/server/script_mgr.cc index fe30f928e..cdd2cf91f 100644 --- a/src/server/script_mgr.cc +++ b/src/server/script_mgr.cc @@ -199,7 +199,7 @@ void ScriptMgr::LatencyCmd(ConnectionContext* cntx) const { for (const auto& k_v : result) { rb->StartArray(2); rb->SendBulkString(k_v.first); - rb->SendBulkString(k_v.second.ToString()); + rb->SendVerbatimString(k_v.second.ToString()); } } diff --git a/src/server/set_family.cc b/src/server/set_family.cc index 64207ff83..bf3f2fa85 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -1699,9 +1699,8 @@ void SetFamily::Register(CommandRegistry* registry) { << CI{"SINTERSTORE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, -3, 1, -1, acl::kSInterStore} .HFUNC(SInterStore) - << CI{"SINTERCARD", CO::READONLY | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS, -3, 2, 2, - acl::kSInterCard} - .HFUNC(SInterCard) + << CI{"SINTERCARD", CO::READONLY | CO::VARIADIC_KEYS, -3, 2, 2, acl::kSInterCard}.HFUNC( + SInterCard) << CI{"SMEMBERS", CO::READONLY, 2, 1, 1, acl::kSMembers}.HFUNC(SMembers) << CI{"SISMEMBER", CO::FAST | CO::READONLY, 3, 1, 1, acl::kSIsMember}.HFUNC(SIsMember) << CI{"SMISMEMBER", CO::READONLY, -3, 1, 1, acl::kSMIsMember}.HFUNC(SMIsMember) diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 512a284a8..be37a2908 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -3327,6 +3327,7 @@ constexpr uint32_t kXAutoClaim = WRITE | STREAM | FAST; void StreamFamily::Register(CommandRegistry* registry) { using CI = CommandId; registry->StartFamily(); + constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS; *registry << CI{"XADD", CO::WRITE | CO::DENYOOM | CO::FAST, -5, 1, 1, acl::kXAdd}.HFUNC(XAdd) << CI{"XCLAIM", CO::WRITE | CO::FAST, -6, 1, 1, acl::kXClaim}.HFUNC(XClaim) << CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, acl::kXDel}.HFUNC(XDel) @@ -3336,12 +3337,8 @@ void StreamFamily::Register(CommandRegistry* registry) { << CI{"XPENDING", CO::READONLY, -2, 1, 1, acl::kXPending}.HFUNC(XPending) << CI{"XRANGE", CO::READONLY, -4, 1, 1, acl::kXRange}.HFUNC(XRange) << CI{"XREVRANGE", CO::READONLY, -4, 1, 1, acl::kXRevRange}.HFUNC(XRevRange) - << CI{"XREAD", CO::READONLY | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS, -3, 3, 3, - acl::kXRead} - .HFUNC(XRead) - << CI{"XREADGROUP", CO::READONLY | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS, -6, 6, 6, - acl::kXReadGroup} - .HFUNC(XReadGroup) + << CI{"XREAD", kReadFlags, -3, 3, 3, acl::kXRead}.HFUNC(XRead) + << CI{"XREADGROUP", kReadFlags, -6, 6, 6, acl::kXReadGroup}.HFUNC(XReadGroup) << CI{"XSETID", CO::WRITE, 3, 1, 1, acl::kXSetId}.HFUNC(XSetId) << CI{"XTRIM", CO::WRITE | CO::FAST, -4, 1, 1, acl::kXTrim}.HFUNC(XTrim) << CI{"_XGROUP_HELP", CO::NOSCRIPT | CO::HIDDEN, 2, 0, 0, acl::kXGroupHelp}.SetHandler( diff --git a/src/server/transaction.cc b/src/server/transaction.cc index a56ba42b7..945843a30 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -173,13 +173,13 @@ void Transaction::InitGlobal() { EnableAllShards(); } -void Transaction::BuildShardIndex(const KeyIndex& key_index, bool rev_mapping, - std::vector* out) { +void Transaction::BuildShardIndex(const KeyIndex& key_index, std::vector* out) { auto args = full_args_; auto& shard_index = *out; - auto add = [this, rev_mapping, &shard_index](uint32_t sid, uint32_t i) { + auto add = [this, rev_mapping = key_index.has_reverse_mapping, &shard_index](uint32_t sid, + uint32_t i) { string_view val = ArgS(full_args_, i); shard_index[sid].args.push_back(val); if (rev_mapping) @@ -266,7 +266,7 @@ void Transaction::LaunderKeyStorage(CmdArgVec* keys) { keys->emplace_back(key.data(), key.size()); } -void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) { +void Transaction::StoreKeysInArgs(const KeyIndex& key_index) { DCHECK(!key_index.bonus); DCHECK(key_index.step == 1u || key_index.step == 2u); @@ -277,7 +277,7 @@ void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) { kv_args_.push_back(ArgS(full_args_, ++j)); } - if (rev_mapping) { + if (key_index.has_reverse_mapping) { reverse_index_.resize(kv_args_.size()); for (unsigned j = 0; j < reverse_index_.size(); ++j) { reverse_index_[j] = j + key_index.start; @@ -293,8 +293,6 @@ void Transaction::InitByKeys(const KeyIndex& key_index) { DCHECK_LT(key_index.start, full_args_.size()); - bool needs_reverse_mapping = cid_->opt_mask() & CO::REVERSE_MAPPING; - // Stub transactions always operate only on single shard. bool is_stub = multi_ && multi_->role == SQUASHED_STUB; @@ -302,7 +300,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) { DCHECK(!IsActiveMulti() || multi_->mode == NON_ATOMIC); // We don't have to split the arguments by shards, so we can copy them directly. - StoreKeysInArgs(key_index, needs_reverse_mapping); + StoreKeysInArgs(key_index); unique_shard_cnt_ = 1; if (is_stub) // stub transactions don't migrate @@ -328,10 +326,10 @@ void Transaction::InitByKeys(const KeyIndex& key_index) { auto& shard_index = tmp_space.GetShardIndex(shard_data_.size()); // Distribute all the arguments by shards. - BuildShardIndex(key_index, needs_reverse_mapping, &shard_index); + BuildShardIndex(key_index, &shard_index); // Initialize shard data based on distributed arguments. - InitShardData(shard_index, key_index.num_args(), needs_reverse_mapping); + InitShardData(shard_index, key_index.num_args(), key_index.has_reverse_mapping); DCHECK(!multi_ || multi_->mode != LOCK_AHEAD || !multi_->frozen_keys.empty()); @@ -353,7 +351,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) { } // Validation. Check reverse mapping was built correctly. - if (needs_reverse_mapping) { + if (key_index.has_reverse_mapping) { for (size_t i = 0; i < kv_args_.size(); ++i) { DCHECK_EQ(kv_args_[i], ArgS(full_args_, reverse_index_[i])) << full_args_; } @@ -515,7 +513,8 @@ void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdA EnableShard(sid); OpResult key_index = DetermineKeys(cid_, args); CHECK(key_index); - StoreKeysInArgs(*key_index, false); + DCHECK(!key_index->has_reverse_mapping); + StoreKeysInArgs(*key_index); } // Runs in the dbslice thread. Returns true if the transaction continues running in the thread. @@ -1195,6 +1194,8 @@ ArgSlice Transaction::GetShardArgs(ShardId sid) const { // from local index back to original arg index skipping the command. // i.e. returns (first_key_pos -1) or bigger. size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const { + DCHECK_LT(arg_index, reverse_index_.size()); + if (unique_shard_cnt_ == 1) return reverse_index_[arg_index]; @@ -1495,13 +1496,17 @@ void Transaction::CancelBlocking(std::function status_cb) { } OpResult DetermineKeys(const CommandId* cid, CmdArgList args) { - if (cid->opt_mask() & (CO::GLOBAL_TRANS | CO::NO_KEY_TRANSACTIONAL)) - return KeyIndex::Empty(); - KeyIndex key_index; + if (cid->opt_mask() & (CO::GLOBAL_TRANS | CO::NO_KEY_TRANSACTIONAL)) + return key_index; + int num_custom_keys = -1; + if (cid->opt_mask() & CO::REVERSE_MAPPING) { + key_index.has_reverse_mapping = true; + } + if (cid->opt_mask() & CO::VARIADIC_KEYS) { // ZUNION/INTER [ ...] // EVAL