feat: Span-all no-key transactional commands (#1864)

* feat: Span-all no-key transactional commands

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2023-09-19 11:10:56 +03:00 committed by GitHub
parent 890761989c
commit 769f5a19cd
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 59 additions and 40 deletions

View file

@ -42,7 +42,7 @@ CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first
}
bool CommandId::IsTransactional() const {
if (first_key_ > 0 || (opt_mask_ & CO::GLOBAL_TRANS) || (opt_mask_ & CO::NO_KEY_JOURNAL))
if (first_key_ > 0 || (opt_mask_ & CO::GLOBAL_TRANS) || (opt_mask_ & CO::NO_KEY_TRANSACTIONAL))
return true;
if (name_ == "EVAL" || name_ == "EVALSHA" || name_ == "EXEC")
@ -154,8 +154,10 @@ const char* OptName(CO::CommandOpt fl) {
return "variadic-keys";
case NO_AUTOJOURNAL:
return "custom-journal";
case NO_KEY_JOURNAL:
return "no-key-journal";
case NO_KEY_TRANSACTIONAL:
return "no-key-transactional";
case NO_KEY_TX_SPAN_ALL:
return "no-key-tx-span-all";
}
return "unknown";
}

View file

@ -33,10 +33,15 @@ enum CommandOpt : uint32_t {
NOSCRIPT = 1U << 8,
BLOCKING = 1U << 9, // implies REVERSE_MAPPING
HIDDEN = 1U << 10, // does not show in COMMAND command output
GLOBAL_TRANS = 1U << 12,
NO_AUTOJOURNAL = 1U << 15, // Skip automatically logging command to journal inside transaction.
NO_KEY_JOURNAL = 1U << 16, // Command with no keys that need to be journaled
// Allows commands without keys to respect transaction ordering and enables journaling by default
NO_KEY_TRANSACTIONAL = 1U << 16,
NO_KEY_TX_SPAN_ALL =
1U << 17, // If set, all shards are active for the no-key-transactional command
};
const char* OptName(CommandOpt fl);

View file

@ -132,9 +132,8 @@ void ScriptMgr::LoadCmd(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendError(res.error().Format());
// Schedule empty callback inorder to journal command via transaction framework.
auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
cntx->transaction->ScheduleSingleHop([](auto* t, auto* shard) { return OpStatus::OK; });
cntx->transaction->ScheduleSingleHop(std::move(cb));
return (*cntx)->SendBulkString(res.value());
}
@ -150,6 +149,9 @@ void ScriptMgr::ConfigCmd(CmdArgList args, ConnectionContext* cntx) {
UpdateScriptCaches(key, data);
// Schedule empty callback inorder to journal command via transaction framework.
cntx->transaction->ScheduleSingleHop([](auto* t, auto* shard) { return OpStatus::OK; });
return (*cntx)->SendOk();
}

View file

@ -345,6 +345,7 @@ void SearchFamily::FtInfo(CmdArgList args, ConnectionContext* cntx) {
atomic_uint num_notfound{0};
vector<DocIndexInfo> infos(shard_set->size());
cntx->transaction->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
auto* index = es->search_indices()->GetIndex(idx_name);
if (index == nullptr)
@ -452,6 +453,7 @@ void SearchFamily::FtProfile(CmdArgList args, ConnectionContext* cntx) {
atomic_uint total_serialized = 0;
vector<pair<search::AlgorithmProfile, absl::Duration>> results(shard_set->size());
cntx->transaction->ScheduleSingleHop([&](Transaction* t, EngineShard* es) {
auto* index = es->search_indices()->GetIndex(index_name);
if (!index)
@ -520,14 +522,19 @@ void SearchFamily::FtProfile(CmdArgList args, ConnectionContext* cntx) {
void SearchFamily::Register(CommandRegistry* registry) {
using CI = CommandId;
// Disable journaling, because no-key-transactional enables it by default
const uint32_t kReadOnlyMask =
CO::NO_KEY_TRANSACTIONAL | CO::NO_KEY_TX_SPAN_ALL | CO::NO_AUTOJOURNAL;
registry->StartFamily();
*registry << CI{"FT.CREATE", CO::GLOBAL_TRANS, -2, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtCreate)
<< CI{"FT.DROPINDEX", CO::GLOBAL_TRANS, -2, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtDropIndex)
<< CI{"FT.INFO", CO::GLOBAL_TRANS, 2, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtInfo)
<< CI{"FT.INFO", kReadOnlyMask, 2, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtInfo)
// Underscore same as in RediSearch because it's "temporary" (long time already)
<< CI{"FT._LIST", CO::GLOBAL_TRANS, 1, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtList)
<< CI{"FT.SEARCH", CO::GLOBAL_TRANS, -3, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtSearch)
<< CI{"FT.PROFILE", CO::GLOBAL_TRANS, -4, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtProfile);
<< CI{"FT._LIST", kReadOnlyMask, 1, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtList)
<< CI{"FT.SEARCH", kReadOnlyMask, -3, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtSearch)
<< CI{"FT.PROFILE", kReadOnlyMask, -4, 0, 0, 0, acl::FT_SEARCH}.HFUNC(FtProfile);
}
} // namespace dfly

View file

@ -2023,7 +2023,8 @@ void ServerFamily::Register(CommandRegistry* registry) {
<< CI{"REPLCONF", CO::ADMIN | CO::LOADING, -1, 0, 0, 0, acl::kReplConf}.HFUNC(ReplConf)
<< CI{"ROLE", CO::LOADING | CO::FAST | CO::NOSCRIPT, 1, 0, 0, 0, acl::kRole}.HFUNC(Role)
<< CI{"SLOWLOG", CO::ADMIN | CO::FAST, -2, 0, 0, 0, acl::kSlowLog}.SetHandler(SlowLog)
<< CI{"SCRIPT", CO::NOSCRIPT | CO::NO_KEY_JOURNAL, -2, 0, 0, 0, acl::kScript}.HFUNC(Script)
<< CI{"SCRIPT", CO::NOSCRIPT | CO::NO_KEY_TRANSACTIONAL, -2, 0, 0, 0, acl::kScript}.HFUNC(
Script)
<< CI{"DFLY", CO::ADMIN | CO::GLOBAL_TRANS | CO::HIDDEN, -2, 0, 0, 0, acl::kDfly}.HFUNC(Dfly);
}

View file

@ -76,18 +76,7 @@ void Transaction::InitGlobal() {
DCHECK(!multi_ || (multi_->mode == GLOBAL || multi_->mode == NON_ATOMIC));
global_ = true;
unique_shard_cnt_ = shard_set->size();
shard_data_.resize(unique_shard_cnt_);
for (auto& sd : shard_data_)
sd.local_mask = ACTIVE;
}
void Transaction::InitNoKey() {
// No key command will use the first shard.
unique_shard_cnt_ = 1;
unique_shard_id_ = 0;
shard_data_.resize(1);
shard_data_.front().local_mask |= ACTIVE;
EnableAllShards();
}
void Transaction::BuildShardIndex(KeyIndex key_index, bool rev_mapping,
@ -315,8 +304,11 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
return OpStatus::OK;
}
if ((cid_->opt_mask() & CO::NO_KEY_JOURNAL) > 0) {
InitNoKey();
if ((cid_->opt_mask() & CO::NO_KEY_TRANSACTIONAL) > 0) {
if ((cid_->opt_mask() & CO::NO_KEY_TX_SPAN_ALL) > 0)
EnableAllShards();
else
EnableShard(0);
return OpStatus::OK;
}
@ -894,6 +886,21 @@ void Transaction::Conclude() {
Execute(std::move(cb), true);
}
void Transaction::EnableShard(ShardId sid) {
unique_shard_cnt_ = 1;
unique_shard_id_ = sid;
shard_data_.resize(1);
shard_data_.front().local_mask |= ACTIVE;
}
void Transaction::EnableAllShards() {
unique_shard_cnt_ = shard_set->size();
unique_shard_id_ = kInvalidSid;
shard_data_.resize(shard_set->size());
for (auto& sd : shard_data_)
sd.local_mask |= ACTIVE;
}
void Transaction::RunQuickie(EngineShard* shard) {
DCHECK(!IsAtomicMulti());
DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC);
@ -956,7 +963,7 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const {
res.db_index = db_index_;
res.key_step = cid_->key_arg_step();
res.args = GetShardArgs(sid);
DCHECK(!res.args.empty() || (cid_->opt_mask() & CO::NO_KEY_JOURNAL));
DCHECK(!res.args.empty() || (cid_->opt_mask() & CO::NO_KEY_TRANSACTIONAL));
return res;
}
@ -1326,18 +1333,13 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) {
if (multi_ && multi_->role == SQUASHER)
return;
bool journal_by_cmd_mask = true;
if ((cid_->opt_mask() & CO::NO_KEY_JOURNAL) > 0) {
journal_by_cmd_mask = true; // Enforce journaling for commands that dont change the db.
} else if ((cid_->opt_mask() & CO::WRITE) == 0) {
journal_by_cmd_mask = false; // Non-write command are not journaled.
} else if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) > 0 &&
!renabled_auto_journal_.load(memory_order_relaxed)) {
journal_by_cmd_mask = false; // Command disabled auto journal.
}
if (!journal_by_cmd_mask) {
// Only write commands and/or no-key-transactional commands are logged
if ((cid_->opt_mask() & CO::WRITE) == 0 && (cid_->opt_mask() & CO::NO_KEY_TRANSACTIONAL) == 0)
return;
// If autojournaling was disabled and not re-enabled, skip it
if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !renabled_auto_journal_.load(memory_order_relaxed))
return;
}
auto journal = shard->journal();
if (journal == nullptr)

View file

@ -399,12 +399,12 @@ class Transaction {
// Init as a global transaction.
void InitGlobal();
// Init when command has no keys and it need to use transaction framework
void InitNoKey();
// Init with a set of keys.
void InitByKeys(KeyIndex keys);
void EnableShard(ShardId sid);
void EnableAllShards();
// Build shard index by distributing the arguments by shards based on the key index.
void BuildShardIndex(KeyIndex keys, bool rev_mapping, std::vector<PerShardCache>* out);