diff --git a/src/server/command_registry.cc b/src/server/command_registry.cc index c38e1a858..9c9e058ad 100644 --- a/src/server/command_registry.cc +++ b/src/server/command_registry.cc @@ -42,7 +42,7 @@ CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first } bool CommandId::IsTransactional() const { - if (first_key_ > 0 || (opt_mask_ & CO::GLOBAL_TRANS) || (opt_mask_ & CO::NO_KEY_JOURNAL)) + if (first_key_ > 0 || (opt_mask_ & CO::GLOBAL_TRANS) || (opt_mask_ & CO::NO_KEY_TRANSACTIONAL)) return true; if (name_ == "EVAL" || name_ == "EVALSHA" || name_ == "EXEC") @@ -154,8 +154,10 @@ const char* OptName(CO::CommandOpt fl) { return "variadic-keys"; case NO_AUTOJOURNAL: return "custom-journal"; - case NO_KEY_JOURNAL: - return "no-key-journal"; + case NO_KEY_TRANSACTIONAL: + return "no-key-transactional"; + case NO_KEY_TX_SPAN_ALL: + return "no-key-tx-span-all"; } return "unknown"; } diff --git a/src/server/command_registry.h b/src/server/command_registry.h index c388fbf97..097ea9e2c 100644 --- a/src/server/command_registry.h +++ b/src/server/command_registry.h @@ -33,10 +33,15 @@ enum CommandOpt : uint32_t { NOSCRIPT = 1U << 8, BLOCKING = 1U << 9, // implies REVERSE_MAPPING HIDDEN = 1U << 10, // does not show in COMMAND command output + GLOBAL_TRANS = 1U << 12, NO_AUTOJOURNAL = 1U << 15, // Skip automatically logging command to journal inside transaction. - NO_KEY_JOURNAL = 1U << 16, // Command with no keys that need to be journaled + + // Allows commands without keys to respect transaction ordering and enables journaling by default + NO_KEY_TRANSACTIONAL = 1U << 16, + NO_KEY_TX_SPAN_ALL = + 1U << 17, // If set, all shards are active for the no-key-transactional command }; const char* OptName(CommandOpt fl); diff --git a/src/server/script_mgr.cc b/src/server/script_mgr.cc index 1e2427fa7..fc57f1ea1 100644 --- a/src/server/script_mgr.cc +++ b/src/server/script_mgr.cc @@ -132,9 +132,8 @@ void ScriptMgr::LoadCmd(CmdArgList args, ConnectionContext* cntx) { return (*cntx)->SendError(res.error().Format()); // Schedule empty callback inorder to journal command via transaction framework. - auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; + cntx->transaction->ScheduleSingleHop([](auto* t, auto* shard) { return OpStatus::OK; }); - cntx->transaction->ScheduleSingleHop(std::move(cb)); return (*cntx)->SendBulkString(res.value()); } @@ -150,6 +149,9 @@ void ScriptMgr::ConfigCmd(CmdArgList args, ConnectionContext* cntx) { UpdateScriptCaches(key, data); + // Schedule empty callback inorder to journal command via transaction framework. + cntx->transaction->ScheduleSingleHop([](auto* t, auto* shard) { return OpStatus::OK; }); + return (*cntx)->SendOk(); } diff --git a/src/server/search/search_family.cc b/src/server/search/search_family.cc index e63e32ac1..09475e45d 100644 --- a/src/server/search/search_family.cc +++ b/src/server/search/search_family.cc @@ -345,6 +345,7 @@ void SearchFamily::FtInfo(CmdArgList args, ConnectionContext* cntx) { atomic_uint num_notfound{0}; vector infos(shard_set->size()); + cntx->transaction->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { auto* index = es->search_indices()->GetIndex(idx_name); if (index == nullptr) @@ -452,6 +453,7 @@ void SearchFamily::FtProfile(CmdArgList args, ConnectionContext* cntx) { atomic_uint total_serialized = 0; vector> results(shard_set->size()); + cntx->transaction->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { auto* index = es->search_indices()->GetIndex(index_name); if (!index) @@ -520,14 +522,19 @@ void SearchFamily::FtProfile(CmdArgList args, ConnectionContext* cntx) { void SearchFamily::Register(CommandRegistry* registry) { using CI = CommandId; + + // Disable journaling, because no-key-transactional enables it by default + const uint32_t kReadOnlyMask = + CO::NO_KEY_TRANSACTIONAL | CO::NO_KEY_TX_SPAN_ALL | CO::NO_AUTOJOURNAL; + registry->StartFamily(); *registry << CI{"FT.CREATE", CO::GLOBAL_TRANS, -2, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtCreate) << CI{"FT.DROPINDEX", CO::GLOBAL_TRANS, -2, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtDropIndex) - << CI{"FT.INFO", CO::GLOBAL_TRANS, 2, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtInfo) + << CI{"FT.INFO", kReadOnlyMask, 2, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtInfo) // Underscore same as in RediSearch because it's "temporary" (long time already) - << CI{"FT._LIST", CO::GLOBAL_TRANS, 1, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtList) - << CI{"FT.SEARCH", CO::GLOBAL_TRANS, -3, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtSearch) - << CI{"FT.PROFILE", CO::GLOBAL_TRANS, -4, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtProfile); + << CI{"FT._LIST", kReadOnlyMask, 1, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtList) + << CI{"FT.SEARCH", kReadOnlyMask, -3, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtSearch) + << CI{"FT.PROFILE", kReadOnlyMask, -4, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtProfile); } } // namespace dfly diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 435afa168..6e630e7dc 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -2023,7 +2023,8 @@ void ServerFamily::Register(CommandRegistry* registry) { << CI{"REPLCONF", CO::ADMIN | CO::LOADING, -1, 0, 0, 0, acl::kReplConf}.HFUNC(ReplConf) << CI{"ROLE", CO::LOADING | CO::FAST | CO::NOSCRIPT, 1, 0, 0, 0, acl::kRole}.HFUNC(Role) << CI{"SLOWLOG", CO::ADMIN | CO::FAST, -2, 0, 0, 0, acl::kSlowLog}.SetHandler(SlowLog) - << CI{"SCRIPT", CO::NOSCRIPT | CO::NO_KEY_JOURNAL, -2, 0, 0, 0, acl::kScript}.HFUNC(Script) + << CI{"SCRIPT", CO::NOSCRIPT | CO::NO_KEY_TRANSACTIONAL, -2, 0, 0, 0, acl::kScript}.HFUNC( + Script) << CI{"DFLY", CO::ADMIN | CO::GLOBAL_TRANS | CO::HIDDEN, -2, 0, 0, 0, acl::kDfly}.HFUNC(Dfly); } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 73eeee18a..c5fa4a874 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -76,18 +76,7 @@ void Transaction::InitGlobal() { DCHECK(!multi_ || (multi_->mode == GLOBAL || multi_->mode == NON_ATOMIC)); global_ = true; - unique_shard_cnt_ = shard_set->size(); - shard_data_.resize(unique_shard_cnt_); - for (auto& sd : shard_data_) - sd.local_mask = ACTIVE; -} - -void Transaction::InitNoKey() { - // No key command will use the first shard. - unique_shard_cnt_ = 1; - unique_shard_id_ = 0; - shard_data_.resize(1); - shard_data_.front().local_mask |= ACTIVE; + EnableAllShards(); } void Transaction::BuildShardIndex(KeyIndex key_index, bool rev_mapping, @@ -315,8 +304,11 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) { return OpStatus::OK; } - if ((cid_->opt_mask() & CO::NO_KEY_JOURNAL) > 0) { - InitNoKey(); + if ((cid_->opt_mask() & CO::NO_KEY_TRANSACTIONAL) > 0) { + if ((cid_->opt_mask() & CO::NO_KEY_TX_SPAN_ALL) > 0) + EnableAllShards(); + else + EnableShard(0); return OpStatus::OK; } @@ -894,6 +886,21 @@ void Transaction::Conclude() { Execute(std::move(cb), true); } +void Transaction::EnableShard(ShardId sid) { + unique_shard_cnt_ = 1; + unique_shard_id_ = sid; + shard_data_.resize(1); + shard_data_.front().local_mask |= ACTIVE; +} + +void Transaction::EnableAllShards() { + unique_shard_cnt_ = shard_set->size(); + unique_shard_id_ = kInvalidSid; + shard_data_.resize(shard_set->size()); + for (auto& sd : shard_data_) + sd.local_mask |= ACTIVE; +} + void Transaction::RunQuickie(EngineShard* shard) { DCHECK(!IsAtomicMulti()); DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC); @@ -956,7 +963,7 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const { res.db_index = db_index_; res.key_step = cid_->key_arg_step(); res.args = GetShardArgs(sid); - DCHECK(!res.args.empty() || (cid_->opt_mask() & CO::NO_KEY_JOURNAL)); + DCHECK(!res.args.empty() || (cid_->opt_mask() & CO::NO_KEY_TRANSACTIONAL)); return res; } @@ -1326,18 +1333,13 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) { if (multi_ && multi_->role == SQUASHER) return; - bool journal_by_cmd_mask = true; - if ((cid_->opt_mask() & CO::NO_KEY_JOURNAL) > 0) { - journal_by_cmd_mask = true; // Enforce journaling for commands that dont change the db. - } else if ((cid_->opt_mask() & CO::WRITE) == 0) { - journal_by_cmd_mask = false; // Non-write command are not journaled. - } else if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) > 0 && - !renabled_auto_journal_.load(memory_order_relaxed)) { - journal_by_cmd_mask = false; // Command disabled auto journal. - } - if (!journal_by_cmd_mask) { + // Only write commands and/or no-key-transactional commands are logged + if ((cid_->opt_mask() & CO::WRITE) == 0 && (cid_->opt_mask() & CO::NO_KEY_TRANSACTIONAL) == 0) + return; + + // If autojournaling was disabled and not re-enabled, skip it + if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !renabled_auto_journal_.load(memory_order_relaxed)) return; - } auto journal = shard->journal(); if (journal == nullptr) diff --git a/src/server/transaction.h b/src/server/transaction.h index 05d4fec79..67e222fbe 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -399,12 +399,12 @@ class Transaction { // Init as a global transaction. void InitGlobal(); - // Init when command has no keys and it need to use transaction framework - void InitNoKey(); - // Init with a set of keys. void InitByKeys(KeyIndex keys); + void EnableShard(ShardId sid); + void EnableAllShards(); + // Build shard index by distributing the arguments by shards based on the key index. void BuildShardIndex(KeyIndex keys, bool rev_mapping, std::vector* out);