fix: fix BLOCKING/REVERSE_MAPPING flags for some commands (#2516)

* fix: BLOCKING/REVERSE_MAPPING flags for some commands

Also, simplify interfaces around REVERSE_MAPPING in the internal tx code.
---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-02-01 17:39:14 +02:00 committed by GitHub
parent 5189dae118
commit b5b5093165
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 38 additions and 33 deletions

View file

@ -26,6 +26,10 @@ enum CommandOpt : uint32_t {
WRITE = 1U << 2, WRITE = 1U << 2,
LOADING = 1U << 3, // Command allowed during LOADING state. LOADING = 1U << 3, // Command allowed during LOADING state.
DENYOOM = 1U << 4, // use-memory in redis. DENYOOM = 1U << 4, // use-memory in redis.
// marked commands that demand preserve the order of keys to work correctly.
// For example, MGET needs to know the order of keys to return the values in the same order.
// BLPOP needs to know the order of keys to return the first non-empty list from the left.
REVERSE_MAPPING = 1U << 5, REVERSE_MAPPING = 1U << 5,
VARIADIC_KEYS = 1U << 6, // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc. VARIADIC_KEYS = 1U << 6, // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc.

View file

@ -71,13 +71,13 @@ struct KeyIndex {
// if index is non-zero then adds another key index (usually 0). // if index is non-zero then adds another key index (usually 0).
// relevant for for commands like ZUNIONSTORE/ZINTERSTORE for destination key. // relevant for for commands like ZUNIONSTORE/ZINTERSTORE for destination key.
std::optional<uint16_t> bonus{}; std::optional<uint16_t> bonus{};
bool has_reverse_mapping = false;
static KeyIndex Empty() { KeyIndex(unsigned s = 0, unsigned e = 0, unsigned step = 0) : start(s), end(e), step(step) {
return KeyIndex{0, 0, 0, std::nullopt};
} }
static KeyIndex Range(unsigned start, unsigned end, unsigned step = 1) { static KeyIndex Range(unsigned start, unsigned end, unsigned step = 1) {
return KeyIndex{start, end, step, std::nullopt}; return KeyIndex{start, end, step};
} }
bool HasSingleKey() const { bool HasSingleKey() const {

View file

@ -199,7 +199,7 @@ void ScriptMgr::LatencyCmd(ConnectionContext* cntx) const {
for (const auto& k_v : result) { for (const auto& k_v : result) {
rb->StartArray(2); rb->StartArray(2);
rb->SendBulkString(k_v.first); rb->SendBulkString(k_v.first);
rb->SendBulkString(k_v.second.ToString()); rb->SendVerbatimString(k_v.second.ToString());
} }
} }

View file

@ -1699,9 +1699,8 @@ void SetFamily::Register(CommandRegistry* registry) {
<< CI{"SINTERSTORE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, -3, 1, -1, << CI{"SINTERSTORE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, -3, 1, -1,
acl::kSInterStore} acl::kSInterStore}
.HFUNC(SInterStore) .HFUNC(SInterStore)
<< CI{"SINTERCARD", CO::READONLY | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS, -3, 2, 2, << CI{"SINTERCARD", CO::READONLY | CO::VARIADIC_KEYS, -3, 2, 2, acl::kSInterCard}.HFUNC(
acl::kSInterCard} SInterCard)
.HFUNC(SInterCard)
<< CI{"SMEMBERS", CO::READONLY, 2, 1, 1, acl::kSMembers}.HFUNC(SMembers) << CI{"SMEMBERS", CO::READONLY, 2, 1, 1, acl::kSMembers}.HFUNC(SMembers)
<< CI{"SISMEMBER", CO::FAST | CO::READONLY, 3, 1, 1, acl::kSIsMember}.HFUNC(SIsMember) << CI{"SISMEMBER", CO::FAST | CO::READONLY, 3, 1, 1, acl::kSIsMember}.HFUNC(SIsMember)
<< CI{"SMISMEMBER", CO::READONLY, -3, 1, 1, acl::kSMIsMember}.HFUNC(SMIsMember) << CI{"SMISMEMBER", CO::READONLY, -3, 1, 1, acl::kSMIsMember}.HFUNC(SMIsMember)

View file

@ -3327,6 +3327,7 @@ constexpr uint32_t kXAutoClaim = WRITE | STREAM | FAST;
void StreamFamily::Register(CommandRegistry* registry) { void StreamFamily::Register(CommandRegistry* registry) {
using CI = CommandId; using CI = CommandId;
registry->StartFamily(); registry->StartFamily();
constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS;
*registry << CI{"XADD", CO::WRITE | CO::DENYOOM | CO::FAST, -5, 1, 1, acl::kXAdd}.HFUNC(XAdd) *registry << CI{"XADD", CO::WRITE | CO::DENYOOM | CO::FAST, -5, 1, 1, acl::kXAdd}.HFUNC(XAdd)
<< CI{"XCLAIM", CO::WRITE | CO::FAST, -6, 1, 1, acl::kXClaim}.HFUNC(XClaim) << CI{"XCLAIM", CO::WRITE | CO::FAST, -6, 1, 1, acl::kXClaim}.HFUNC(XClaim)
<< CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, acl::kXDel}.HFUNC(XDel) << CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, acl::kXDel}.HFUNC(XDel)
@ -3336,12 +3337,8 @@ void StreamFamily::Register(CommandRegistry* registry) {
<< CI{"XPENDING", CO::READONLY, -2, 1, 1, acl::kXPending}.HFUNC(XPending) << CI{"XPENDING", CO::READONLY, -2, 1, 1, acl::kXPending}.HFUNC(XPending)
<< CI{"XRANGE", CO::READONLY, -4, 1, 1, acl::kXRange}.HFUNC(XRange) << CI{"XRANGE", CO::READONLY, -4, 1, 1, acl::kXRange}.HFUNC(XRange)
<< CI{"XREVRANGE", CO::READONLY, -4, 1, 1, acl::kXRevRange}.HFUNC(XRevRange) << CI{"XREVRANGE", CO::READONLY, -4, 1, 1, acl::kXRevRange}.HFUNC(XRevRange)
<< CI{"XREAD", CO::READONLY | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS, -3, 3, 3, << CI{"XREAD", kReadFlags, -3, 3, 3, acl::kXRead}.HFUNC(XRead)
acl::kXRead} << CI{"XREADGROUP", kReadFlags, -6, 6, 6, acl::kXReadGroup}.HFUNC(XReadGroup)
.HFUNC(XRead)
<< CI{"XREADGROUP", CO::READONLY | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS, -6, 6, 6,
acl::kXReadGroup}
.HFUNC(XReadGroup)
<< CI{"XSETID", CO::WRITE, 3, 1, 1, acl::kXSetId}.HFUNC(XSetId) << CI{"XSETID", CO::WRITE, 3, 1, 1, acl::kXSetId}.HFUNC(XSetId)
<< CI{"XTRIM", CO::WRITE | CO::FAST, -4, 1, 1, acl::kXTrim}.HFUNC(XTrim) << CI{"XTRIM", CO::WRITE | CO::FAST, -4, 1, 1, acl::kXTrim}.HFUNC(XTrim)
<< CI{"_XGROUP_HELP", CO::NOSCRIPT | CO::HIDDEN, 2, 0, 0, acl::kXGroupHelp}.SetHandler( << CI{"_XGROUP_HELP", CO::NOSCRIPT | CO::HIDDEN, 2, 0, 0, acl::kXGroupHelp}.SetHandler(

View file

@ -173,13 +173,13 @@ void Transaction::InitGlobal() {
EnableAllShards(); EnableAllShards();
} }
void Transaction::BuildShardIndex(const KeyIndex& key_index, bool rev_mapping, void Transaction::BuildShardIndex(const KeyIndex& key_index, std::vector<PerShardCache>* out) {
std::vector<PerShardCache>* out) {
auto args = full_args_; auto args = full_args_;
auto& shard_index = *out; auto& shard_index = *out;
auto add = [this, rev_mapping, &shard_index](uint32_t sid, uint32_t i) { auto add = [this, rev_mapping = key_index.has_reverse_mapping, &shard_index](uint32_t sid,
uint32_t i) {
string_view val = ArgS(full_args_, i); string_view val = ArgS(full_args_, i);
shard_index[sid].args.push_back(val); shard_index[sid].args.push_back(val);
if (rev_mapping) if (rev_mapping)
@ -266,7 +266,7 @@ void Transaction::LaunderKeyStorage(CmdArgVec* keys) {
keys->emplace_back(key.data(), key.size()); keys->emplace_back(key.data(), key.size());
} }
void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) { void Transaction::StoreKeysInArgs(const KeyIndex& key_index) {
DCHECK(!key_index.bonus); DCHECK(!key_index.bonus);
DCHECK(key_index.step == 1u || key_index.step == 2u); DCHECK(key_index.step == 1u || key_index.step == 2u);
@ -277,7 +277,7 @@ void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) {
kv_args_.push_back(ArgS(full_args_, ++j)); kv_args_.push_back(ArgS(full_args_, ++j));
} }
if (rev_mapping) { if (key_index.has_reverse_mapping) {
reverse_index_.resize(kv_args_.size()); reverse_index_.resize(kv_args_.size());
for (unsigned j = 0; j < reverse_index_.size(); ++j) { for (unsigned j = 0; j < reverse_index_.size(); ++j) {
reverse_index_[j] = j + key_index.start; reverse_index_[j] = j + key_index.start;
@ -293,8 +293,6 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
DCHECK_LT(key_index.start, full_args_.size()); DCHECK_LT(key_index.start, full_args_.size());
bool needs_reverse_mapping = cid_->opt_mask() & CO::REVERSE_MAPPING;
// Stub transactions always operate only on single shard. // Stub transactions always operate only on single shard.
bool is_stub = multi_ && multi_->role == SQUASHED_STUB; bool is_stub = multi_ && multi_->role == SQUASHED_STUB;
@ -302,7 +300,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
DCHECK(!IsActiveMulti() || multi_->mode == NON_ATOMIC); DCHECK(!IsActiveMulti() || multi_->mode == NON_ATOMIC);
// We don't have to split the arguments by shards, so we can copy them directly. // We don't have to split the arguments by shards, so we can copy them directly.
StoreKeysInArgs(key_index, needs_reverse_mapping); StoreKeysInArgs(key_index);
unique_shard_cnt_ = 1; unique_shard_cnt_ = 1;
if (is_stub) // stub transactions don't migrate if (is_stub) // stub transactions don't migrate
@ -328,10 +326,10 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
auto& shard_index = tmp_space.GetShardIndex(shard_data_.size()); auto& shard_index = tmp_space.GetShardIndex(shard_data_.size());
// Distribute all the arguments by shards. // Distribute all the arguments by shards.
BuildShardIndex(key_index, needs_reverse_mapping, &shard_index); BuildShardIndex(key_index, &shard_index);
// Initialize shard data based on distributed arguments. // Initialize shard data based on distributed arguments.
InitShardData(shard_index, key_index.num_args(), needs_reverse_mapping); InitShardData(shard_index, key_index.num_args(), key_index.has_reverse_mapping);
DCHECK(!multi_ || multi_->mode != LOCK_AHEAD || !multi_->frozen_keys.empty()); DCHECK(!multi_ || multi_->mode != LOCK_AHEAD || !multi_->frozen_keys.empty());
@ -353,7 +351,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
} }
// Validation. Check reverse mapping was built correctly. // Validation. Check reverse mapping was built correctly.
if (needs_reverse_mapping) { if (key_index.has_reverse_mapping) {
for (size_t i = 0; i < kv_args_.size(); ++i) { for (size_t i = 0; i < kv_args_.size(); ++i) {
DCHECK_EQ(kv_args_[i], ArgS(full_args_, reverse_index_[i])) << full_args_; DCHECK_EQ(kv_args_[i], ArgS(full_args_, reverse_index_[i])) << full_args_;
} }
@ -515,7 +513,8 @@ void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdA
EnableShard(sid); EnableShard(sid);
OpResult<KeyIndex> key_index = DetermineKeys(cid_, args); OpResult<KeyIndex> key_index = DetermineKeys(cid_, args);
CHECK(key_index); CHECK(key_index);
StoreKeysInArgs(*key_index, false); DCHECK(!key_index->has_reverse_mapping);
StoreKeysInArgs(*key_index);
} }
// Runs in the dbslice thread. Returns true if the transaction continues running in the thread. // Runs in the dbslice thread. Returns true if the transaction continues running in the thread.
@ -1195,6 +1194,8 @@ ArgSlice Transaction::GetShardArgs(ShardId sid) const {
// from local index back to original arg index skipping the command. // from local index back to original arg index skipping the command.
// i.e. returns (first_key_pos -1) or bigger. // i.e. returns (first_key_pos -1) or bigger.
size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const { size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
DCHECK_LT(arg_index, reverse_index_.size());
if (unique_shard_cnt_ == 1) if (unique_shard_cnt_ == 1)
return reverse_index_[arg_index]; return reverse_index_[arg_index];
@ -1495,13 +1496,17 @@ void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {
} }
OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) { OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
if (cid->opt_mask() & (CO::GLOBAL_TRANS | CO::NO_KEY_TRANSACTIONAL))
return KeyIndex::Empty();
KeyIndex key_index; KeyIndex key_index;
if (cid->opt_mask() & (CO::GLOBAL_TRANS | CO::NO_KEY_TRANSACTIONAL))
return key_index;
int num_custom_keys = -1; int num_custom_keys = -1;
if (cid->opt_mask() & CO::REVERSE_MAPPING) {
key_index.has_reverse_mapping = true;
}
if (cid->opt_mask() & CO::VARIADIC_KEYS) { if (cid->opt_mask() & CO::VARIADIC_KEYS) {
// ZUNION/INTER <num_keys> <key1> [<key2> ...] // ZUNION/INTER <num_keys> <key1> [<key2> ...]
// EVAL <script> <num_keys> // EVAL <script> <num_keys>

View file

@ -457,14 +457,14 @@ class Transaction {
void EnableAllShards(); void EnableAllShards();
// Build shard index by distributing the arguments by shards based on the key index. // Build shard index by distributing the arguments by shards based on the key index.
void BuildShardIndex(const KeyIndex& keys, bool rev_mapping, std::vector<PerShardCache>* out); void BuildShardIndex(const KeyIndex& keys, std::vector<PerShardCache>* out);
// Init shard data from shard index. // Init shard data from shard index.
void InitShardData(absl::Span<const PerShardCache> shard_index, size_t num_args, void InitShardData(absl::Span<const PerShardCache> shard_index, size_t num_args,
bool rev_mapping); bool rev_mapping);
// Store all key index keys in args_. Used only for single shard initialization. // Store all key index keys in args_. Used only for single shard initialization.
void StoreKeysInArgs(KeyIndex keys, bool rev_mapping); void StoreKeysInArgs(const KeyIndex& key_index);
// Multi transactions unlock asynchronously, so they need to keep a copy of all they keys. // Multi transactions unlock asynchronously, so they need to keep a copy of all they keys.
// "Launder" keys by filtering uniques and replacing pointers with same lifetime as transaction. // "Launder" keys by filtering uniques and replacing pointers with same lifetime as transaction.

View file

@ -3244,6 +3244,7 @@ constexpr uint32_t kGeoRadiusByMember = WRITE | GEO | SLOW;
void ZSetFamily::Register(CommandRegistry* registry) { void ZSetFamily::Register(CommandRegistry* registry) {
constexpr uint32_t kStoreMask = CO::WRITE | CO::VARIADIC_KEYS | CO::REVERSE_MAPPING | CO::DENYOOM; constexpr uint32_t kStoreMask = CO::WRITE | CO::VARIADIC_KEYS | CO::REVERSE_MAPPING | CO::DENYOOM;
registry->StartFamily(); registry->StartFamily();
// TODO: to add support for SCRIPT for BZPOPMIN, BZPOPMAX similarly to BLPOP.
*registry *registry
<< CI{"ZADD", CO::FAST | CO::WRITE | CO::DENYOOM, -4, 1, 1, acl::kZAdd}.HFUNC(ZAdd) << CI{"ZADD", CO::FAST | CO::WRITE | CO::DENYOOM, -4, 1, 1, acl::kZAdd}.HFUNC(ZAdd)
<< CI{"BZPOPMIN", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING | CO::NO_AUTOJOURNAL, -3, 1, -2, << CI{"BZPOPMIN", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING | CO::NO_AUTOJOURNAL, -3, 1, -2,
@ -3258,9 +3259,8 @@ void ZSetFamily::Register(CommandRegistry* registry) {
<< CI{"ZINCRBY", CO::FAST | CO::WRITE, 4, 1, 1, acl::kZIncrBy}.HFUNC(ZIncrBy) << CI{"ZINCRBY", CO::FAST | CO::WRITE, 4, 1, 1, acl::kZIncrBy}.HFUNC(ZIncrBy)
<< CI{"ZINTERSTORE", kStoreMask, -4, 3, 3, acl::kZInterStore}.HFUNC(ZInterStore) << CI{"ZINTERSTORE", kStoreMask, -4, 3, 3, acl::kZInterStore}.HFUNC(ZInterStore)
<< CI{"ZINTER", kStoreMask, -3, 2, 2, acl::kZInter}.HFUNC(ZInter) << CI{"ZINTER", kStoreMask, -3, 2, 2, acl::kZInter}.HFUNC(ZInter)
<< CI{"ZINTERCARD", CO::READONLY | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS, -3, 2, 2, << CI{"ZINTERCARD", CO::READONLY | CO::VARIADIC_KEYS, -3, 2, 2, acl::kZInterCard}.HFUNC(
acl::kZInterCard} ZInterCard)
.HFUNC(ZInterCard)
<< CI{"ZLEXCOUNT", CO::READONLY, 4, 1, 1, acl::kZLexCount}.HFUNC(ZLexCount) << CI{"ZLEXCOUNT", CO::READONLY, 4, 1, 1, acl::kZLexCount}.HFUNC(ZLexCount)
<< CI{"ZPOPMAX", CO::FAST | CO::WRITE, -2, 1, 1, acl::kZPopMax}.HFUNC(ZPopMax) << CI{"ZPOPMAX", CO::FAST | CO::WRITE, -2, 1, 1, acl::kZPopMax}.HFUNC(ZPopMax)
<< CI{"ZPOPMIN", CO::FAST | CO::WRITE, -2, 1, 1, acl::kZPopMin}.HFUNC(ZPopMin) << CI{"ZPOPMIN", CO::FAST | CO::WRITE, -2, 1, 1, acl::kZPopMin}.HFUNC(ZPopMin)