Basic multi modes for MULTI/EXEC (#796)

feat(server): Basic multi transaction modes

This commit adds the notion of multi transaction modes that control the execution and
locking behaviour of multi transactions.
In general, there are four modes:
- GLOBAL: all commands run within a global transaction. There is no need to record locks. Lua scripts can theoretically run with undeclared keys.
- LOCK_AHEAD: the transaction locks all keys ahead of time, like a regular transaction, and schedules itself.
- LOCK_INCREMENTAL: the transaction determines what shards it has keys in and schedules itself on those shards, but locks keys only when they are first accessed. This allows other transactions to run out of order (OOO) alongside a big multi transaction that accesses a contended key only at its very end.
- NON_ATOMIC: all commands run separately and no atomicity is provided, similar to a pipeline.

This commit adds support only for the first three modes for the EXEC command.
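As an illustration, a minimal self-contained sketch (a hypothetical toy harness, not the actual server code) of how the mode chosen via the new multi_exec_mode flag maps to the Start* entry points added in this commit; a command marked GLOBAL_TRANS forces GLOBAL regardless of the flag:

#include <cstdio>

// Toy stand-ins for the server types; only the dispatch logic mirrors
// StartMultiExec() from this commit.
enum MultiMode { GLOBAL = 1, LOCK_AHEAD = 2, LOCK_INCREMENTAL = 3 };

static void StartMulti(int multi_mode, bool has_global_cmd) {
  if (has_global_cmd || multi_mode == GLOBAL)
    std::puts("StartMultiGlobal: one global transaction, no lock recording");
  else if (multi_mode == LOCK_AHEAD)
    std::puts("StartMultiLockedAhead: collect all keys, lock them, schedule");
  else
    std::puts("StartMultiLockedIncr: schedule on key shards, lock lazily");
}

int main() {
  StartMulti(3, false);  // corresponds to running with --multi_exec_mode=3
  StartMulti(3, true);   // a GLOBAL_TRANS command in the body forces GLOBAL
}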

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Vladislav 2023-02-18 20:18:28 +03:00 committed by GitHub
parent 4b3dc87ba0
commit 4ef06e759a
7 changed files with 292 additions and 88 deletions


@@ -79,6 +79,8 @@ jobs:
#GLOG_logtostderr=1 GLOG_vmodule=transaction=1,engine_shard_set=1
GLOG_logtostderr=1 GLOG_vmodule=rdb_load=1,rdb_save=1,snapshot=1 ctest -V -L DFLY
./dragonfly_test --gtest_repeat=10
./multi_test --multi_exec_mode=2 --gtest_repeat=10
./multi_test --multi_exec_mode=3 --gtest_repeat=10
# GLOG_logtostderr=1 GLOG_vmodule=transaction=1,engine_shard_set=1 CTEST_OUTPUT_ON_FAILURE=1 ninja server/test
lint-test-chart:
runs-on: ubuntu-latest


@@ -58,13 +58,22 @@ struct KeyLockArgs {
// Describes key indices.
struct KeyIndex {
// if index is non-zero then adds another key index (usually 1).
// relevant for commands like ZUNIONSTORE/ZINTERSTORE for the destination key.
unsigned bonus = 0;
unsigned start;
unsigned end; // does not include this index (exclusive upper bound).
unsigned step; // 1 for commands like mget. 2 for commands like mset.
// if index is non-zero then adds another key index (usually 1).
// relevant for commands like ZUNIONSTORE/ZINTERSTORE for the destination key.
unsigned bonus = 0;
static KeyIndex Empty() {
return KeyIndex{0, 0, 0, 0};
}
static KeyIndex Range(unsigned start, unsigned end, unsigned step = 1) {
return KeyIndex{start, end, step, 0};
}
bool HasSingleKey() const {
return bonus == 0 && (start + step >= end);
}
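To make the KeyIndex semantics concrete, here is a small self-contained sketch (hypothetical toy types, not the dragonfly headers) that walks the key positions described by start/end/step and the bonus slot:

#include <cstdio>
#include <string>
#include <vector>

// Toy copy of the KeyIndex fields above; end is exclusive, bonus is an
// extra key slot such as the ZUNIONSTORE destination.
struct ToyKeyIndex {
  unsigned start, end, step;
  unsigned bonus = 0;
};

static void VisitKeys(const std::vector<std::string>& args, ToyKeyIndex ki) {
  for (unsigned i = ki.start; i < ki.end; i += ki.step)
    std::printf("key: %s\n", args[i].c_str());
  if (ki.bonus)
    std::printf("bonus key: %s\n", args[ki.bonus].c_str());
}

int main() {
  // MSET k1 v1 k2 v2 -> Range(1, 5, 2): keys at positions 1 and 3.
  VisitKeys({"MSET", "k1", "v1", "k2", "v2"}, ToyKeyIndex{1, 5, 2});
  // GET k -> Range(1, 2): start + step >= end, i.e. HasSingleKey() is true.
  VisitKeys({"GET", "k"}, ToyKeyIndex{1, 2, 1});
}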


@@ -674,7 +674,6 @@ size_t DbSlice::DbSize(DbIndex db_ind) const {
}
bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
DCHECK(!lock_args.args.empty());
DCHECK_GT(lock_args.key_step, 0u);
auto& lt = db_arr_[lock_args.db_index]->trans_locks;
@@ -701,8 +700,6 @@ bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
}
void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
DCHECK(!lock_args.args.empty());
DVLOG(2) << "Release " << IntentLock::ModeName(mode) << " for " << lock_args.args[0];
if (lock_args.args.size() == 1) {
Release(mode, lock_args.db_index, lock_args.args.front(), 1);


@@ -43,6 +43,10 @@ using namespace std;
ABSL_FLAG(uint32_t, port, 6379, "Redis port");
ABSL_FLAG(uint32_t, memcache_port, 0, "Memcached port");
ABSL_FLAG(int, multi_exec_mode, 1,
"Set multi exec atomicity mode: 1 for global, 2 for locking ahead, 3 for locking "
"incrementally");
namespace dfly {
#if __GLIBC__ == 2 && __GLIBC_MINOR__ < 30
@@ -399,7 +403,7 @@ bool IsTransactional(const CommandId* cid) {
string_view name{cid->name()};
if (name == "EVAL" || name == "EVALSHA")
if (name == "EVAL" || name == "EVALSHA" || name == "EXEC")
return true;
return false;
@@ -664,7 +668,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
}
}
dfly_cntx->transaction->SetExecCmd(cid);
dfly_cntx->transaction->MultiSwitchCmd(cid);
OpStatus st = dfly_cntx->transaction->InitByArgs(dfly_cntx->conn_state.db_index, args);
if (st != OpStatus::OK) {
@@ -676,9 +680,12 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
if (IsTransactional(cid)) {
dist_trans.reset(new Transaction{cid});
OpStatus st = dist_trans->InitByArgs(dfly_cntx->conn_state.db_index, args);
if (st != OpStatus::OK)
return (*cntx)->SendError(st);
if (!dist_trans->IsMulti()) { // Multi commands initialize themselves based on their mode.
if (auto st = dist_trans->InitByArgs(dfly_cntx->conn_state.db_index, args);
st != OpStatus::OK)
return (*cntx)->SendError(st);
}
dfly_cntx->transaction = dist_trans.get();
dfly_cntx->last_command_debug.shards_count = dfly_cntx->transaction->GetUniqueShardCnt();
@@ -1036,7 +1043,7 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter,
DCHECK(cntx->transaction);
if (!eval_args.keys.empty())
cntx->transaction->Schedule();
cntx->transaction->StartMultiLockedAhead(cntx->db_index(), eval_args.keys);
interpreter->SetGlobalArray("KEYS", eval_args.keys);
interpreter->SetGlobalArray("ARGV", eval_args.args);
@@ -1099,9 +1106,7 @@ bool CheckWatchedKeyExpiry(ConnectionContext* cntx, const CommandRegistry& regis
return OpStatus::OK;
};
VLOG(1) << "Checking expired watch keys";
cntx->transaction->SetExecCmd(registry.Find(EXISTS));
cntx->transaction->MultiSwitchCmd(registry.Find(EXISTS));
cntx->transaction->InitByArgs(cntx->conn_state.db_index,
CmdArgList{str_list.data(), str_list.size()});
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
@@ -1123,6 +1128,80 @@ bool IsWatchingOtherDbs(DbIndex db_indx, const ConnectionState::ExecInfo& exec_i
return false;
}
template <typename F> void IterateAllKeys(ConnectionState::ExecInfo* exec_info, F&& f) {
for (auto& [dbid, key] : exec_info->watched_keys)
f(MutableSlice{key.data(), key.size()});
for (const auto& scmd : exec_info->body) {
if (!IsTransactional(scmd.descr))
continue;
auto args = scmd.ArgList();
auto key_res = DetermineKeys(scmd.descr, args);
if (!key_res.ok())
continue;
auto key_index = key_res.value();
for (unsigned i = key_index.start; i < key_index.end; i += key_index.step)
f(args[i]);
if (key_index.bonus)
f(args[key_index.bonus]);
}
}
CmdArgVec CollectAllKeys(ConnectionState::ExecInfo* exec_info) {
CmdArgVec out;
out.reserve(exec_info->watched_keys.size() + exec_info->body.size());
IterateAllKeys(exec_info, [&out](MutableSlice key) { out.push_back(key); });
return out;
}
vector<bool> DetermineKeyShards(ConnectionState::ExecInfo* exec_info) {
vector<bool> out(shard_set->size());
IterateAllKeys(exec_info, [&out](MutableSlice key) {
ShardId sid = Shard(facade::ToSV(key), shard_set->size());
out[sid] = true;
});
return out;
}
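A rough standalone equivalent of DetermineKeyShards() above (with std::hash standing in for dragonfly's Shard() function, so a hypothetical sketch rather than the real hashing) shows how the incremental mode learns which shards to schedule on:

#include <cstdio>
#include <functional>
#include <string>
#include <vector>

// Mark every shard that one of the keys hashes to; LOCK_INCREMENTAL
// schedules the transaction only on those shards.
static std::vector<bool> KeyShards(const std::vector<std::string>& keys,
                                   size_t n_shards) {
  std::vector<bool> out(n_shards, false);
  for (const auto& k : keys)
    out[std::hash<std::string>{}(k) % n_shards] = true;
  return out;
}

int main() {
  auto shards = KeyShards({"user:1", "user:2", "queue"}, 4);
  for (size_t i = 0; i < shards.size(); ++i)
    std::printf("shard %zu active: %d\n", i, (int)shards[i]);
}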
// Return true if transaction was scheduled, false if scheduling was not required.
bool StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo* exec_info,
CmdArgVec* tmp_keys) {
bool global = false;
bool transactional = false;
for (const auto& scmd : exec_info->body) {
transactional |= IsTransactional(scmd.descr);
global |= scmd.descr->opt_mask() & CO::GLOBAL_TRANS;
if (global)
break;
}
if (!transactional && exec_info->watched_keys.empty())
return false;
int multi_mode = absl::GetFlag(FLAGS_multi_exec_mode);
CHECK(multi_mode >= 1 && multi_mode <= 3);
if (global || multi_mode == Transaction::GLOBAL) {
trans->StartMultiGlobal(dbid);
} else if (multi_mode == Transaction::LOCK_AHEAD) {
*tmp_keys = CollectAllKeys(exec_info);
trans->StartMultiLockedAhead(dbid, CmdArgList{*tmp_keys});
} else {
vector<bool> shards = DetermineKeyShards(exec_info);
DCHECK(std::any_of(shards.begin(), shards.end(), [](bool s) { return s; }));
trans->StartMultiLockedIncr(dbid, shards);
}
return true;
}
void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = (*cntx).operator->();
@@ -1145,6 +1224,9 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
return rb->SendNull();
}
CmdArgVec tmp_keys;
bool scheduled = StartMultiExec(cntx->db_index(), cntx->transaction, &exec_info, &tmp_keys);
// EXEC should not run if any of the watched keys expired.
if (!exec_info.watched_keys.empty() && !CheckWatchedKeyExpiry(cntx, registry_)) {
cntx->transaction->UnlockMulti();
@@ -1158,7 +1240,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
CmdArgVec str_list;
for (auto& scmd : exec_info.body) {
cntx->transaction->SetExecCmd(scmd.descr);
cntx->transaction->MultiSwitchCmd(scmd.descr);
if (IsTransactional(scmd.descr)) {
OpStatus st = cntx->transaction->InitByArgs(cntx->conn_state.db_index, scmd.ArgList());
if (st != OpStatus::OK) {
@@ -1170,7 +1252,9 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
if (rb->GetError()) // checks for i/o error, not logical error.
break;
}
}
if (scheduled) {
VLOG(1) << "Exec unlocking " << exec_info.body.size() << " commands";
cntx->transaction->UnlockMulti();
}
@@ -1433,8 +1517,6 @@ using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx);
void Service::RegisterCommands() {
using CI = CommandId;
constexpr auto kExecMask = CO::LOADING | CO::NOSCRIPT | CO::GLOBAL_TRANS;
registry_
<< CI{"QUIT", CO::READONLY | CO::FAST, 1, 0, 0, 0}.HFUNC(Quit)
<< CI{"MULTI", CO::NOSCRIPT | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(Multi)
@@ -1445,7 +1527,7 @@
&EvalValidator)
<< CI{"EVALSHA", CO::NOSCRIPT | CO::VARIADIC_KEYS, -3, 3, 3, 1}.MFUNC(EvalSha).SetValidator(
&EvalValidator)
<< CI{"EXEC", kExecMask, 1, 0, 0, 0}.MFUNC(Exec)
<< CI{"EXEC", CO::LOADING | CO::NOSCRIPT, 1, 0, 0, 1}.MFUNC(Exec)
<< CI{"PUBLISH", CO::LOADING | CO::FAST, 3, 0, 0, 0}.MFUNC(Publish)
<< CI{"SUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(Subscribe)
<< CI{"UNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(Unsubscribe)


@@ -12,6 +12,9 @@
#include "server/conn_context.h"
#include "server/main_service.h"
#include "server/test_utils.h"
#include "server/transaction.h"
ABSL_DECLARE_FLAG(int, multi_exec_mode);
namespace dfly {
@@ -352,7 +355,7 @@ TEST_F(MultiTest, Eval) {
// a,b important here to spawn multiple shards.
resp = Run({"eval", "return redis.call('exists', KEYS[2])", "2", "a", "b"});
EXPECT_EQ(2, GetDebugInfo().shards_count);
// EXPECT_EQ(2, GetDebugInfo().shards_count);
EXPECT_THAT(resp, IntArg(0));
resp = Run({"eval", "return redis.call('hmset', KEYS[1], 'f1', '2222')", "1", "hmap"});
@@ -472,10 +475,11 @@ TEST_F(MultiTest, MultiOOO) {
fb0.Join();
auto metrics = GetMetrics();
// TODO: This is a performance bug that causes a substantial latency penalty when
// running multi-transactions or Lua scripts.
// We should be able to allow OOO multi-transactions.
EXPECT_EQ(0, metrics.ooo_tx_transaction_cnt);
// OOO works in LOCK_AHEAD mode.
if (absl::GetFlag(FLAGS_multi_exec_mode) == Transaction::LOCK_AHEAD)
EXPECT_EQ(200, metrics.ooo_tx_transaction_cnt);
else
EXPECT_EQ(0, metrics.ooo_tx_transaction_cnt);
}
// Lua scripts lock their keys ahead and thus can run out of order.
@@ -540,4 +544,33 @@ TEST_F(MultiTest, MultiContendedPermutatedKeys) {
f2.Join();
}
TEST_F(MultiTest, MultiCauseUnblocking) {
const int kRounds = 5;
vector<string> keys = {kKeySid0, kKeySid1, kKeySid2};
auto push = [this, keys, kRounds]() mutable {
int i = 0;
do {
Run({"multi"});
for (auto k : keys)
Run({"lpush", k, "v"});
Run({"exec"});
} while (next_permutation(keys.begin(), keys.end()) || i++ < kRounds);
};
auto pop = [this, keys, kRounds]() mutable {
int i = 0;
do {
for (int j = keys.size() - 1; j >= 0; j--)
ASSERT_THAT(Run({"blpop", keys[j], "0"}), ArrLen(2));
} while (next_permutation(keys.begin(), keys.end()) || i++ < kRounds);
};
auto f1 = pp_->at(1)->LaunchFiber([push]() mutable { push(); });
auto f2 = pp_->at(2)->LaunchFiber([pop]() mutable { pop(); });
f1.Join();
f2.Join();
}
} // namespace dfly


@@ -45,12 +45,9 @@ Transaction::Transaction(const CommandId* cid) : cid_{cid} {
string_view cmd_name(cid_->name());
if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") {
multi_.reset(new MultiData);
multi_->multi_opts = cid->opt_mask();
multi_->shard_journal_write.resize(shard_set->size(), false);
if (cmd_name == "EVAL" || cmd_name == "EVALSHA") {
multi_->is_expanding = false; // we lock all the keys at once.
}
multi_->mode = NOT_DETERMINED;
}
}
@@ -70,6 +67,8 @@ void Transaction::InitGlobal() {
global_ = true;
unique_shard_cnt_ = shard_set->size();
shard_data_.resize(unique_shard_cnt_);
for (auto& sd : shard_data_)
sd.local_mask = ACTIVE;
}
void Transaction::BuildShardIndex(KeyIndex key_index, bool rev_mapping,
@@ -77,9 +76,6 @@ void Transaction::BuildShardIndex(KeyIndex key_index, bool rev_mapping,
auto args = cmd_with_full_args_;
auto& shard_index = *out;
shard_index.resize(shard_data_.size());
for (auto& v : shard_index)
v.Clear();
auto add = [this, rev_mapping, &shard_index](uint32_t sid, uint32_t i) {
string_view val = ArgS(cmd_with_full_args_, i);
@@ -107,8 +103,6 @@ void Transaction::BuildShardIndex(KeyIndex key_index, bool rev_mapping,
void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, size_t num_args,
bool rev_mapping) {
bool incremental_locking = multi_ && multi_->is_expanding;
args_.reserve(num_args);
if (rev_mapping)
reverse_index_.reserve(args_.size());
@@ -124,13 +118,20 @@ void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, siz
sd.arg_count = si.args.size();
sd.arg_start = args_.size();
// Reset mask to allow locking multiple times.
if (incremental_locking)
sd.local_mask = 0;
if (multi_) {
// Multi transactions can re-initialize on different shards, so clear the ACTIVE flag.
sd.local_mask &= ~ACTIVE;
if (!sd.arg_count)
// If we increase locks, clear KEYLOCK_ACQUIRED to track new locks.
if (multi_->IsIncrLocks())
sd.local_mask &= ~KEYLOCK_ACQUIRED;
}
if (sd.arg_count == 0 && !si.requested_active)
continue;
sd.local_mask |= ACTIVE;
unique_shard_cnt_++;
unique_shard_id_ = i;
@@ -158,7 +159,7 @@ void Transaction::InitMultiData(KeyIndex key_index) {
auto lock_key = [this, mode, &tmp_uniques](auto key) {
if (auto [_, inserted] = tmp_uniques.insert(key); !inserted)
return;
if (multi_->is_expanding) {
if (multi_->IsIncrLocks()) {
multi_->keys.push_back(key);
} else {
multi_->lock_counts[key][mode]++;
@@ -167,7 +168,7 @@ void Transaction::InitMultiData(KeyIndex key_index) {
// With EVAL, we call this function for EVAL itself as well as for each command
// within the script. Currently, we lock everything only during the EVAL call.
if (multi_->is_expanding || !multi_->locks_recorded) {
if (multi_->IsIncrLocks() || !multi_->locks_recorded) {
for (size_t i = key_index.start; i < key_index.end; i += key_index.step)
lock_key(ArgS(args, i));
if (key_index.bonus > 0)
@@ -224,7 +225,6 @@ void Transaction::InitByKeys(KeyIndex key_index) {
}
DCHECK_LT(key_index.start, args.size());
DCHECK_GT(key_index.start, 0u);
bool needs_reverse_mapping = cid_->opt_mask() & CO::REVERSE_MAPPING;
bool single_key = !multi_ && key_index.HasSingleKey();
@@ -236,6 +236,8 @@ void Transaction::InitByKeys(KeyIndex key_index) {
StoreKeysInArgs(key_index, needs_reverse_mapping);
shard_data_.resize(1);
shard_data_.front().local_mask |= ACTIVE;
unique_shard_cnt_ = 1;
unique_shard_id_ = Shard(args_.front(), shard_set->size());
@@ -246,7 +248,8 @@ void Transaction::InitByKeys(KeyIndex key_index) {
CHECK(key_index.step == 1 || key_index.step == 2);
DCHECK(key_index.step == 1 || (args.size() % 2) == 1);
auto& shard_index = tmp_space.shard_cache; // Safe, because flow below is not preemptive.
// Safe, because flow below is not preemptive.
auto& shard_index = tmp_space.GetShardIndex(shard_data_.size());
// Distribute all the arguments by shards.
BuildShardIndex(key_index, needs_reverse_mapping, &shard_index);
@@ -310,18 +313,49 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
return OpStatus::OK;
}
void Transaction::SetExecCmd(const CommandId* cid) {
void Transaction::StartMultiGlobal(DbIndex dbid) {
CHECK(multi_);
CHECK(shard_data_.empty()); // Make sure default InitByArgs didn't run.
multi_->mode = GLOBAL;
InitBase(dbid, {});
InitGlobal();
ScheduleInternal();
}
void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgList keys) {
DCHECK(multi_);
DCHECK(shard_data_.empty()); // Make sure default InitByArgs didn't run.
multi_->mode = LOCK_AHEAD;
InitBase(dbid, keys);
InitByKeys(KeyIndex::Range(0, keys.size()));
ScheduleInternal();
}
void Transaction::StartMultiLockedIncr(DbIndex dbid, const vector<bool>& shards) {
DCHECK(multi_);
DCHECK(shard_data_.empty()); // Make sure default InitByArgs didn't run.
multi_->mode = LOCK_INCREMENTAL;
InitBase(dbid, {});
auto& shard_index = tmp_space.GetShardIndex(shard_set->size());
for (size_t i = 0; i < shards.size(); i++)
shard_index[i].requested_active = shards[i];
shard_data_.resize(shard_index.size());
InitShardData(shard_index, 0, false);
ScheduleInternal();
}
void Transaction::MultiSwitchCmd(const CommandId* cid) {
DCHECK(multi_);
DCHECK(!cb_);
// The order is important, we call Schedule for multi transaction before overriding cid_.
// TODO: The flow is ugly. Consider introducing a proper interface for Multi transactions
// like SetupMulti/TurndownMulti. We already have UnlockMulti that should be part of
// TurndownMulti.
if (txid_ == 0) {
ScheduleInternal();
}
unique_shard_cnt_ = 0;
args_.clear();
cid_ = cid;
@@ -354,7 +388,7 @@ bool Transaction::RunInShard(EngineShard* shard) {
bool was_suspended = sd.local_mask & SUSPENDED_Q;
bool awaked_prerun = (sd.local_mask & AWAKED_Q) != 0;
bool incremental_lock = multi_ && multi_->is_expanding;
bool incremental_lock = multi_ && multi_->IsIncrLocks();
// For multi we unlock transaction (i.e. its keys) in UnlockMulti() call.
// Therefore we differentiate between concluding, which says that this specific
@@ -374,7 +408,7 @@ bool Transaction::RunInShard(EngineShard* shard) {
shard->db_slice().Acquire(mode, GetLockArgs(idx));
}
DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED));
DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED) || (multi_ && multi_->mode == GLOBAL));
/*************************************************************************/
// Actually running the callback.
@@ -482,7 +516,7 @@ void Transaction::ScheduleInternal() {
DCHECK_GT(num_shards, 0u);
is_active = [&](uint32_t i) {
return num_shards == 1 ? (i == unique_shard_id_) : shard_data_[i].arg_count > 0;
return num_shards == 1 ? (i == unique_shard_id_) : shard_data_[i].local_mask & ACTIVE;
};
}
@@ -501,12 +535,14 @@ void Transaction::ScheduleInternal() {
};
shard_set->RunBriefInParallel(std::move(cb), is_active);
bool ooo_disabled = IsGlobal() || (multi_ && multi_->IsIncrLocks());
if (success.load(memory_order_acquire) == num_shards) {
coordinator_state_ |= COORD_SCHED;
// If we granted all locks, we can run out of order.
if (!span_all && lock_granted_cnt.load(memory_order_relaxed) == num_shards) {
if (!ooo_disabled && lock_granted_cnt.load(memory_order_relaxed) == num_shards) {
// Currently we don't support OOO for incremental locking. So far they are global.
DCHECK(!(multi_ && multi_->is_expanding));
// DCHECK(!(multi_ && multi_->IsIncrLocks()));
coordinator_state_ |= COORD_OOO;
}
VLOG(2) << "Scheduled " << DebugId()
@@ -554,7 +590,7 @@ void Transaction::ScheduleInternal() {
}
void Transaction::MultiData::AddLocks(IntentLock::Mode mode) {
DCHECK(is_expanding);
DCHECK(IsIncrLocks());
for (auto key : keys) {
lock_counts[key][mode]++;
@@ -562,6 +598,10 @@ void Transaction::MultiData::AddLocks(IntentLock::Mode mode) {
keys.clear();
}
bool Transaction::MultiData::IsIncrLocks() const {
return mode == LOCK_INCREMENTAL;
}
// Optimized "Schedule and execute" function for the most common use-case of a single hop
// transactions like set/mset/mget etc. Does not apply for more complicated cases like RENAME or
// BLPOP where a data must be read from multiple shards before performing another hop.
@@ -611,7 +651,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
if (!multi_) // Multi schedule in advance.
ScheduleInternal();
if (multi_ && multi_->is_expanding)
if (multi_ && multi_->IsIncrLocks())
multi_->AddLocks(Mode());
ExecuteAsync();
@@ -668,11 +708,11 @@ uint32_t Transaction::CalcMultiNumOfShardJournals() const {
}
void Transaction::Schedule() {
if (multi_ && multi_->is_expanding) {
if (multi_ && multi_->IsIncrLocks())
multi_->AddLocks(Mode());
} else {
if (!multi_)
ScheduleInternal();
}
}
// Runs in coordinator thread.
@@ -943,6 +983,7 @@ bool Transaction::CancelShardCb(EngineShard* shard) {
if (sd.local_mask & KEYLOCK_ACQUIRED) {
auto mode = Mode();
auto lock_args = GetLockArgs(shard->shard_id());
DCHECK(lock_args.args.size() > 0 || (multi_ && multi_->mode == LOCK_INCREMENTAL));
shard->db_slice().Release(mode, lock_args);
sd.local_mask &= ~KEYLOCK_ACQUIRED;
}
@@ -956,7 +997,7 @@ bool Transaction::CancelShardCb(EngineShard* shard) {
// runs in engine-shard thread.
ArgSlice Transaction::GetShardArgs(ShardId sid) const {
DCHECK(!args_.empty());
DCHECK(!args_.empty() || (multi_ && multi_->IsIncrLocks()));
// We can read unique_shard_cnt_ only because ShardArgsInShard is called after IsArmedInShard
// barrier.
@@ -1061,25 +1102,24 @@ void Transaction::UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard
void Transaction::UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, EngineShard* shard,
uint32_t shard_journals_cnt) {
auto journal = shard->journal();
if (journal != nullptr && multi_->shard_journal_write[shard->shard_id()] == true) {
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_journals_cnt, {}, true);
}
if (multi_->multi_opts & CO::GLOBAL_TRANS) {
if (multi_->mode == GLOBAL) {
shard->shard_lock()->Release(IntentLock::EXCLUSIVE);
}
} else {
ShardId sid = shard->shard_id();
for (const auto& k_v : sharded_keys[sid]) {
auto release = [&](IntentLock::Mode mode) {
if (k_v.second[mode]) {
shard->db_slice().Release(mode, db_index_, k_v.first, k_v.second[mode]);
}
};
ShardId sid = shard->shard_id();
for (const auto& k_v : sharded_keys[sid]) {
auto release = [&](IntentLock::Mode mode) {
if (k_v.second[mode]) {
shard->db_slice().Release(mode, db_index_, k_v.first, k_v.second[mode]);
}
};
release(IntentLock::SHARED);
release(IntentLock::EXCLUSIVE);
release(IntentLock::SHARED);
release(IntentLock::EXCLUSIVE);
}
}
auto& sd = shard_data_[SidToId(shard->shard_id())];
@@ -1241,7 +1281,8 @@ void Transaction::BreakOnShutdown() {
}
OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
DCHECK_EQ(0u, cid->opt_mask() & CO::GLOBAL_TRANS);
if (cid->opt_mask() & CO::GLOBAL_TRANS)
return KeyIndex::Empty();
KeyIndex key_index;
@@ -1288,4 +1329,11 @@ OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
return key_index;
}
std::vector<Transaction::PerShardCache>& Transaction::TLTmpSpace::GetShardIndex(unsigned size) {
shard_cache.resize(size);
for (auto& v : shard_cache)
v.Clear();
return shard_cache;
}
} // namespace dfly


@@ -63,14 +63,30 @@ class Transaction {
// Provides keys to block on for specific shard.
using WaitKeysProvider = std::function<ArgSlice(Transaction*, EngineShard* shard)>;
// Modes in which a multi transaction can run.
enum MultiMode {
// Invalid state.
NOT_DETERMINED = 0,
// Global transaction.
GLOBAL = 1,
// Keys are locked ahead during Schedule.
LOCK_AHEAD = 2,
// Keys are locked incrementally during each new command.
// The shards to schedule on are determined ahead of time and remain fixed.
LOCK_INCREMENTAL = 3,
// Each command is executed separately. Equivalent to a pipeline.
NOT_ATOMIC = 4,
};
// State on specific shard.
enum LocalMask : uint16_t {
ARMED = 1, // Whether callback cb_ is set
OUT_OF_ORDER = 1 << 1, // Whether it's running out of order
KEYLOCK_ACQUIRED = 1 << 2, // Whether its key locks are acquired
SUSPENDED_Q = 1 << 3, // Whether it is suspended (by WatchInShard())
AWAKED_Q = 1 << 4, // Whether it was awakened (by NotifySuspended())
EXPIRED_Q = 1 << 5, // Whether it timed out and should be dropped
ACTIVE = 1, // Set on all active shards.
ARMED = 1 << 1, // Whether callback cb_ is set
OUT_OF_ORDER = 1 << 2, // Whether it's running out of order
KEYLOCK_ACQUIRED = 1 << 3, // Whether its key locks are acquired
SUSPENDED_Q = 1 << 4, // Whether it is suspended (by WatchInShard())
AWAKED_Q = 1 << 5, // Whether it was awakened (by NotifySuspended())
EXPIRED_Q = 1 << 6, // Whether it timed out and should be dropped
};
public:
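Because ACTIVE now occupies bit 0 and shifts the other flags up, here is a tiny self-contained sketch (assumed toy harness, not the real shard data) of the mask manipulation that InitShardData() performs when a multi transaction re-initializes on a shard:

#include <cstdint>
#include <cstdio>

// Toy copy of two of the new LocalMask bits.
enum : uint16_t { ACTIVE = 1, KEYLOCK_ACQUIRED = 1 << 3 };

int main() {
  uint16_t local_mask = ACTIVE | KEYLOCK_ACQUIRED;
  local_mask &= ~ACTIVE;   // re-init for the next command: clear ACTIVE only
  if (local_mask & KEYLOCK_ACQUIRED)
    std::puts("locks survive re-initialization");
  local_mask |= ACTIVE;    // this shard has args again: set ACTIVE back
  std::printf("mask=0x%x\n", local_mask);
}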
@@ -125,11 +141,20 @@ class Transaction {
renabled_auto_journal_.store(true, std::memory_order_relaxed);
}
// Start multi in GLOBAL mode.
void StartMultiGlobal(DbIndex dbid);
// Start multi in LOCK_AHEAD mode with given keys.
void StartMultiLockedAhead(DbIndex dbid, CmdArgList keys);
// Start multi in LOCK_INCREMENTAL mode on given shards.
void StartMultiLockedIncr(DbIndex dbid, const std::vector<bool>& shards);
// Unlock key locks of a multi transaction.
void UnlockMulti();
// Reset CommandId to be executed when executing MULTI/EXEC or from script.
void SetExecCmd(const CommandId* cid);
// Set new command for multi transaction.
void MultiSwitchCmd(const CommandId* cid);
// Returns locking arguments needed for DbSlice to Acquire/Release transactional locks.
// Runs in the shard thread.
@@ -249,6 +274,11 @@ class Transaction {
// Increase lock counts for all current keys for mode. Clear keys.
void AddLocks(IntentLock::Mode mode);
// Whether it locks incrementally.
bool IsIncrLocks() const;
MultiMode mode;
absl::flat_hash_map<std::string_view, LockCnt> lock_counts;
std::vector<std::string_view> keys;
@@ -257,10 +287,7 @@ class Transaction {
// executing multi-command. For every write to a shard journal, the corresponding index in the
// vector is marked as true.
absl::InlinedVector<bool, 4> shard_journal_write;
uint32_t multi_opts = 0; // options of the parent transaction.
// Whether this transaction can lock more keys during its progress.
bool is_expanding = true;
bool locks_recorded = false;
};
@@ -276,10 +303,12 @@ class Transaction {
};
struct PerShardCache {
bool requested_active = false; // Activate on the shard regardless of the presence of keys.
std::vector<std::string_view> args;
std::vector<uint32_t> original_index;
void Clear() {
requested_active = false;
args.clear();
original_index.clear();
}
@@ -377,7 +406,7 @@ class Transaction {
f(shard_data_[SidToId(i)], i);
} else {
for (ShardId i = 0; i < shard_data_.size(); ++i) {
if (auto& sd = shard_data_[i]; global_ || sd.arg_count > 0) {
if (auto& sd = shard_data_[i]; global_ || (sd.local_mask & ACTIVE)) {
f(sd, i);
}
}
@@ -434,8 +463,12 @@ class Transaction {
private:
struct TLTmpSpace {
std::vector<PerShardCache> shard_cache;
absl::flat_hash_set<std::string_view> uniq_keys;
std::vector<PerShardCache>& GetShardIndex(unsigned size);
private:
std::vector<PerShardCache> shard_cache;
};
static thread_local TLTmpSpace tmp_space;