chore(transaction): Launder copied keys in multi transactions (#2478)

* chore(transaction): Launder copied keys in multi transactions

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2024-01-27 13:24:42 +03:00 committed by GitHub
parent 8a3ccff0bc
commit 675b3889a4
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 68 additions and 61 deletions

View file

@ -234,7 +234,7 @@ size_t ConnectionState::ExecInfo::UsedMemory() const {
}
size_t ConnectionState::ScriptInfo::UsedMemory() const {
return dfly::HeapSize(keys);
return dfly::HeapSize(keys) + async_cmds_heap_mem;
}
size_t ConnectionState::SubscribeInfo::UsedMemory() const {

View file

@ -166,31 +166,27 @@ class RoundRobinSharder {
};
bool HasContendedLocks(unsigned shard_id, Transaction* trx, const DbTable* table) {
bool has_contended_locks = false;
auto is_contended = [table](string_view key) {
auto it = table->trans_locks.find(key);
DCHECK(it != table->trans_locks.end());
return it->second.IsContended();
};
if (trx->IsMulti()) {
trx->IterateMultiLocks(shard_id, [&](const string& key) {
auto it = table->trans_locks.find(key);
DCHECK(it != table->trans_locks.end());
if (it->second.IsContended()) {
has_contended_locks = true;
}
});
auto keys = trx->GetMultiKeys();
for (string_view key : keys) {
if (Shard(key, shard_set->size()) == shard_id && is_contended(key))
return true;
}
} else {
KeyLockArgs lock_args = trx->GetLockArgs(shard_id);
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
string_view s = KeyLockArgs::GetLockKey(lock_args.args[i]);
auto it = table->trans_locks.find(s);
DCHECK(it != table->trans_locks.end());
if (it != table->trans_locks.end()) {
if (it->second.IsContended()) {
has_contended_locks = true;
break;
}
}
if (is_contended(KeyLockArgs::GetLockKey(lock_args.args[i])))
return true;
}
}
return has_contended_locks;
return false;
}
thread_local string RoundRobinSharder::round_robin_prefix_;

View file

@ -902,17 +902,21 @@ OpStatus CheckKeysDeclared(const ConnectionState::ScriptInfo& eval_info, const C
if (!key_index_res)
return key_index_res.status();
// TODO: Switch to transaction internal locked keys once single hop multi transactions are merged
// const auto& locked_keys = trans->GetMultiKeys();
const auto& locked_keys = eval_info.keys;
const auto& key_index = *key_index_res;
for (unsigned i = key_index.start; i < key_index.end; ++i) {
string_view key = KeyLockArgs::GetLockKey(ArgS(args, i));
if (!eval_info.keys.contains(key)) {
if (!locked_keys.contains(key)) {
VLOG(1) << "Key " << key << " is not declared for command " << cid->name();
return OpStatus::KEY_NOTFOUND;
}
}
if (key_index.bonus &&
!eval_info.keys.contains(KeyLockArgs::GetLockKey(ArgS(args, *key_index.bonus))))
!locked_keys.contains(KeyLockArgs::GetLockKey(ArgS(args, *key_index.bonus))))
return OpStatus::KEY_NOTFOUND;
return OpStatus::OK;
@ -1714,7 +1718,7 @@ optional<bool> StartMultiEval(DbIndex dbid, CmdArgList keys, ScriptMgr::ScriptPa
trans->StartMultiGlobal(dbid);
return true;
case Transaction::LOCK_AHEAD:
trans->StartMultiLockedAhead(dbid, keys);
trans->StartMultiLockedAhead(dbid, CmdArgVec{keys.begin(), keys.end()});
return true;
case Transaction::NON_ATOMIC:
trans->StartMultiNonAtomic();
@ -1985,14 +1989,12 @@ CmdArgVec CollectAllKeys(ConnectionState::ExecInfo* exec_info) {
// Return true if transaction was scheduled, false if scheduling was not required.
void StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo* exec_info,
Transaction::MultiMode multi_mode) {
CmdArgVec tmp_keys;
switch (multi_mode) {
case Transaction::GLOBAL:
trans->StartMultiGlobal(dbid);
break;
case Transaction::LOCK_AHEAD:
tmp_keys = CollectAllKeys(exec_info);
trans->StartMultiLockedAhead(dbid, CmdArgList{tmp_keys});
trans->StartMultiLockedAhead(dbid, CollectAllKeys(exec_info));
break;
case Transaction::NON_ATOMIC:
trans->StartMultiNonAtomic();

View file

@ -204,23 +204,27 @@ void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, siz
CHECK_EQ(args_.size(), num_args);
}
void Transaction::RecordMultiLocks(const KeyIndex& key_index) {
DCHECK(multi_);
DCHECK(!multi_->lock_mode);
void Transaction::LaunderKeyStorage(CmdArgVec* keys) {
DCHECK_EQ(multi_->mode, LOCK_AHEAD);
DCHECK_GT(keys->size(), 0u);
if (multi_->mode == NON_ATOMIC)
return;
auto& m_keys = multi_->frozen_keys;
auto& m_keys_set = multi_->frozen_keys_set;
auto lock_key = [this](string_view key) { multi_->locks.emplace(KeyLockArgs::GetLockKey(key)); };
// Reserve enough space, so pointers from frozen_keys_set are not invalidated
m_keys.reserve(keys->size());
multi_->lock_mode.emplace(LockMode());
for (size_t i = key_index.start; i < key_index.end; i += key_index.step)
lock_key(ArgS(full_args_, i));
if (key_index.bonus)
lock_key(ArgS(full_args_, *key_index.bonus));
for (MutableSlice key : *keys) {
string_view key_s = KeyLockArgs::GetLockKey(facade::ToSV(key));
// Insert copied string view, not original. This is why "try insert" is not allowed
if (!m_keys_set.contains(key_s))
m_keys_set.insert(m_keys.emplace_back(key_s));
}
DCHECK(IsAtomicMulti());
DCHECK(multi_->mode == GLOBAL || !multi_->locks.empty());
// Copy mutable pointers into keys
keys->clear();
for (string& key : m_keys)
keys->emplace_back(key.data(), key.size());
}
void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) {
@ -308,8 +312,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
// Initialize shard data based on distributed arguments.
InitShardData(shard_index, key_index.num_args(), needs_reverse_mapping);
if (multi_ && !multi_->lock_mode)
RecordMultiLocks(key_index);
DCHECK(!multi_ || multi_->mode != LOCK_AHEAD || !multi_->frozen_keys.empty());
DVLOG(1) << "InitByArgs " << DebugId() << " " << args_.front();
@ -411,17 +414,23 @@ void Transaction::StartMultiGlobal(DbIndex dbid) {
ScheduleInternal();
}
void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgList keys) {
void Transaction::StartMultiLockedAhead(DbIndex dbid, CmdArgVec keys) {
DVLOG(1) << "StartMultiLockedAhead on " << keys.size() << " keys";
DCHECK(multi_);
DCHECK(shard_data_.empty()); // Make sure default InitByArgs didn't run.
multi_->mode = LOCK_AHEAD;
InitBase(dbid, keys);
multi_->lock_mode = LockMode();
LaunderKeyStorage(&keys); // Filter uniques and normalize
InitBase(dbid, absl::MakeSpan(keys));
InitByKeys(KeyIndex::Range(0, keys.size()));
ScheduleInternal();
full_args_ = {nullptr, 0}; // InitBase set it to temporary keys, now we reset it.
}
void Transaction::StartMultiNonAtomic() {
@ -794,11 +803,12 @@ void Transaction::UnlockMulti() {
if (multi_->mode == NON_ATOMIC)
return;
multi_->frozen_keys_set.clear();
auto sharded_keys = make_shared<vector<KeyList>>(shard_set->size());
while (!multi_->locks.empty()) {
auto entry = multi_->locks.extract(multi_->locks.begin());
ShardId sid = Shard(entry.value(), sharded_keys->size());
(*sharded_keys)[sid].emplace_back(std::move(entry.value()));
for (string& key : multi_->frozen_keys) {
ShardId sid = Shard(key, sharded_keys->size());
(*sharded_keys)[sid].emplace_back(std::move(key));
}
unsigned shard_journals_cnt =
@ -922,14 +932,9 @@ void Transaction::Refurbish() {
cb_ptr_ = nullptr;
}
void Transaction::IterateMultiLocks(ShardId sid, std::function<void(const std::string&)> cb) const {
unsigned shard_num = shard_set->size();
for (const auto& key : multi_->locks) {
ShardId key_sid = Shard(key, shard_num);
if (key_sid == sid) {
cb(key);
}
}
const absl::flat_hash_set<std::string_view>& Transaction::GetMultiKeys() const {
DCHECK(multi_);
return multi_->frozen_keys_set;
}
void Transaction::EnableShard(ShardId sid) {

View file

@ -233,7 +233,7 @@ class Transaction {
void StartMultiGlobal(DbIndex dbid);
// Start multi in LOCK_AHEAD mode with given keys.
void StartMultiLockedAhead(DbIndex dbid, CmdArgList keys);
void StartMultiLockedAhead(DbIndex dbid, CmdArgVec keys);
// Start multi in NON_ATOMIC mode.
void StartMultiNonAtomic();
@ -344,7 +344,8 @@ class Transaction {
void Refurbish();
void IterateMultiLocks(ShardId sid, std::function<void(const std::string&)> cb) const;
// Get keys multi transaction was initialized with, normalized and unique
const absl::flat_hash_set<std::string_view>& GetMultiKeys() const;
private:
// Holds number of locks for each IntentLock::Mode: shared and exlusive.
@ -397,9 +398,11 @@ class Transaction {
struct MultiData {
MultiRole role;
MultiMode mode;
std::optional<IntentLock::Mode> lock_mode;
absl::flat_hash_set<std::string> locks;
// Unique normalized keys used for scheduling the multi transaction.
std::vector<std::string> frozen_keys;
absl::flat_hash_set<std::string_view> frozen_keys_set; // point to frozen_keys
// Set if the multi command is concluding to avoid ambiguity with COORD_CONCLUDING
bool concluding = false;
@ -449,12 +452,13 @@ class Transaction {
void InitShardData(absl::Span<const PerShardCache> shard_index, size_t num_args,
bool rev_mapping);
// Init multi. Record locks if needed.
void RecordMultiLocks(const KeyIndex& keys);
// Store all key index keys in args_. Used only for single shard initialization.
void StoreKeysInArgs(KeyIndex keys, bool rev_mapping);
// Multi transactions unlock asynchronously, so they need to keep a copy of all they keys.
// "Launder" keys by filtering uniques and replacing pointers with same lifetime as transaction.
void LaunderKeyStorage(CmdArgVec* keys);
// Generic schedule used from Schedule() and ScheduleSingleHop() on slow path.
void ScheduleInternal();