mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 02:15:45 +02:00
chore: Introduce LockKey for LockTable (#2463)
This should reduce allocations in a common case (not multi). In addition, rename Transaction::args_ to kv_args_. Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io> Co-authored-by: Vladislav <vlad@dragonflydb.io>
This commit is contained in:
parent
9f4c4353b5
commit
d608ec9c62
8 changed files with 134 additions and 73 deletions
|
@ -191,7 +191,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
|
||||||
string tmp;
|
string tmp;
|
||||||
string_view key = last_slot_it->first.GetSlice(&tmp);
|
string_view key = last_slot_it->first.GetSlice(&tmp);
|
||||||
// do not evict locked keys
|
// do not evict locked keys
|
||||||
if (lt.find(KeyLockArgs::GetLockKey(key)) != lt.end())
|
if (lt.Find(KeyLockArgs::GetLockKey(key)).has_value())
|
||||||
return 0;
|
return 0;
|
||||||
|
|
||||||
// log the evicted keys to journal.
|
// log the evicted keys to journal.
|
||||||
|
@ -732,7 +732,7 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
|
||||||
flush_db_arr[index] = std::move(db);
|
flush_db_arr[index] = std::move(db);
|
||||||
|
|
||||||
CreateDb(index);
|
CreateDb(index);
|
||||||
db_arr_[index]->trans_locks.swap(flush_db_arr[index]->trans_locks);
|
std::swap(db_arr_[index]->trans_locks, flush_db_arr[index]->trans_locks);
|
||||||
if (TieredStorage* tiered = shard_owner()->tiered_storage(); tiered) {
|
if (TieredStorage* tiered = shard_owner()->tiered_storage(); tiered) {
|
||||||
tiered->CancelAllIos(index);
|
tiered->CancelAllIos(index);
|
||||||
}
|
}
|
||||||
|
@ -939,7 +939,7 @@ size_t DbSlice::DbSize(DbIndex db_ind) const {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
||||||
if (lock_args.args.empty()) {
|
if (lock_args.args.empty()) { // Can be empty for NO_KEY_TRANSACTIONAL commands.
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
DCHECK_GT(lock_args.key_step, 0u);
|
DCHECK_GT(lock_args.key_step, 0u);
|
||||||
|
@ -949,7 +949,7 @@ bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
||||||
|
|
||||||
if (lock_args.args.size() == 1) {
|
if (lock_args.args.size() == 1) {
|
||||||
string_view key = KeyLockArgs::GetLockKey(lock_args.args.front());
|
string_view key = KeyLockArgs::GetLockKey(lock_args.args.front());
|
||||||
lock_acquired = lt[key].Acquire(mode);
|
lock_acquired = lt.Acquire(key, mode);
|
||||||
uniq_keys_ = {key}; // needed only for tests.
|
uniq_keys_ = {key}; // needed only for tests.
|
||||||
} else {
|
} else {
|
||||||
uniq_keys_.clear();
|
uniq_keys_.clear();
|
||||||
|
@ -957,8 +957,7 @@ bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
||||||
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
|
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
|
||||||
string_view s = KeyLockArgs::GetLockKey(lock_args.args[i]);
|
string_view s = KeyLockArgs::GetLockKey(lock_args.args[i]);
|
||||||
if (uniq_keys_.insert(s).second) {
|
if (uniq_keys_.insert(s).second) {
|
||||||
bool res = lt[s].Acquire(mode);
|
lock_acquired &= lt.Acquire(s, mode);
|
||||||
lock_acquired &= res;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -969,41 +968,31 @@ bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
||||||
return lock_acquired;
|
return lock_acquired;
|
||||||
}
|
}
|
||||||
|
|
||||||
void DbSlice::ReleaseNormalized(IntentLock::Mode mode, DbIndex db_index, std::string_view key,
|
void DbSlice::ReleaseNormalized(IntentLock::Mode mode, DbIndex db_index, std::string_view key) {
|
||||||
unsigned count) {
|
|
||||||
DCHECK_EQ(key, KeyLockArgs::GetLockKey(key));
|
DCHECK_EQ(key, KeyLockArgs::GetLockKey(key));
|
||||||
DVLOG(1) << "Release " << IntentLock::ModeName(mode) << " " << count << " for " << key;
|
DVLOG(1) << "Release " << IntentLock::ModeName(mode) << " "
|
||||||
|
<< " for " << key;
|
||||||
|
|
||||||
auto& lt = db_arr_[db_index]->trans_locks;
|
auto& lt = db_arr_[db_index]->trans_locks;
|
||||||
auto it = lt.find(KeyLockArgs::GetLockKey(key));
|
lt.Release(KeyLockArgs::GetLockKey(key), mode);
|
||||||
CHECK(it != lt.end()) << key;
|
|
||||||
it->second.Release(mode, count);
|
|
||||||
if (it->second.IsFree()) {
|
|
||||||
lt.erase(it);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
||||||
if (lock_args.args.empty()) {
|
if (lock_args.args.empty()) { // Can be empty for NO_KEY_TRANSACTIONAL commands.
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
DVLOG(2) << "Release " << IntentLock::ModeName(mode) << " for " << lock_args.args[0];
|
DVLOG(2) << "Release " << IntentLock::ModeName(mode) << " for " << lock_args.args[0];
|
||||||
if (lock_args.args.size() == 1) {
|
if (lock_args.args.size() == 1) {
|
||||||
string_view key = KeyLockArgs::GetLockKey(lock_args.args.front());
|
string_view key = KeyLockArgs::GetLockKey(lock_args.args.front());
|
||||||
ReleaseNormalized(mode, lock_args.db_index, key, 1);
|
ReleaseNormalized(mode, lock_args.db_index, key);
|
||||||
} else {
|
} else {
|
||||||
auto& lt = db_arr_[lock_args.db_index]->trans_locks;
|
auto& lt = db_arr_[lock_args.db_index]->trans_locks;
|
||||||
uniq_keys_.clear();
|
uniq_keys_.clear();
|
||||||
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
|
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
|
||||||
auto s = KeyLockArgs::GetLockKey(lock_args.args[i]);
|
string_view s = KeyLockArgs::GetLockKey(lock_args.args[i]);
|
||||||
if (uniq_keys_.insert(s).second) {
|
if (uniq_keys_.insert(s).second) {
|
||||||
auto it = lt.find(s);
|
lt.Release(s, mode);
|
||||||
CHECK(it != lt.end());
|
|
||||||
it->second.Release(mode);
|
|
||||||
if (it->second.IsFree()) {
|
|
||||||
lt.erase(it);
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1021,11 +1010,9 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, string_view key) co
|
||||||
bool DbSlice::CheckLock(IntentLock::Mode mode, const KeyLockArgs& lock_args) const {
|
bool DbSlice::CheckLock(IntentLock::Mode mode, const KeyLockArgs& lock_args) const {
|
||||||
const auto& lt = db_arr_[lock_args.db_index]->trans_locks;
|
const auto& lt = db_arr_[lock_args.db_index]->trans_locks;
|
||||||
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
|
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
|
||||||
auto s = KeyLockArgs::GetLockKey(lock_args.args[i]);
|
string_view s = KeyLockArgs::GetLockKey(lock_args.args[i]);
|
||||||
auto it = lt.find(s);
|
if (auto lock = lt.Find(s); lock && !lock->Check(mode))
|
||||||
if (it != lt.end() && !it->second.Check(mode)) {
|
|
||||||
return false;
|
return false;
|
||||||
}
|
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -1250,7 +1237,7 @@ void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes
|
||||||
// check if the key is locked by looking up transaction table.
|
// check if the key is locked by looking up transaction table.
|
||||||
auto& lt = db_table->trans_locks;
|
auto& lt = db_table->trans_locks;
|
||||||
string_view key = evict_it->first.GetSlice(&tmp);
|
string_view key = evict_it->first.GetSlice(&tmp);
|
||||||
if (lt.find(KeyLockArgs::GetLockKey(key)) != lt.end())
|
if (lt.Find(KeyLockArgs::GetLockKey(key)).has_value())
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
if (auto journal = owner_->journal(); journal) {
|
if (auto journal = owner_->journal(); journal) {
|
||||||
|
|
|
@ -405,8 +405,7 @@ class DbSlice {
|
||||||
void PerformDeletion(PrimeIterator del_it, DbTable* table);
|
void PerformDeletion(PrimeIterator del_it, DbTable* table);
|
||||||
|
|
||||||
// Releases a single key. `key` must have been normalized by GetLockKey().
|
// Releases a single key. `key` must have been normalized by GetLockKey().
|
||||||
void ReleaseNormalized(IntentLock::Mode m, DbIndex db_index, std::string_view key,
|
void ReleaseNormalized(IntentLock::Mode m, DbIndex db_index, std::string_view key);
|
||||||
unsigned count);
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void PreUpdate(DbIndex db_ind, PrimeIterator it);
|
void PreUpdate(DbIndex db_ind, PrimeIterator it);
|
||||||
|
|
|
@ -167,9 +167,7 @@ class RoundRobinSharder {
|
||||||
|
|
||||||
bool HasContendedLocks(unsigned shard_id, Transaction* trx, const DbTable* table) {
|
bool HasContendedLocks(unsigned shard_id, Transaction* trx, const DbTable* table) {
|
||||||
auto is_contended = [table](string_view key) {
|
auto is_contended = [table](string_view key) {
|
||||||
auto it = table->trans_locks.find(key);
|
return table->trans_locks.Find(key)->IsContended();
|
||||||
DCHECK(it != table->trans_locks.end());
|
|
||||||
return it->second.IsContended();
|
|
||||||
};
|
};
|
||||||
|
|
||||||
if (trx->IsMulti()) {
|
if (trx->IsMulti()) {
|
||||||
|
@ -771,13 +769,13 @@ auto EngineShard::AnalyzeTxQueue() const -> TxQueueInfo {
|
||||||
if (table == nullptr)
|
if (table == nullptr)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
info.total_locks += table->trans_locks.size();
|
info.total_locks += table->trans_locks.Size();
|
||||||
for (const auto& k_v : table->trans_locks) {
|
for (const auto& [key, lock] : table->trans_locks) {
|
||||||
if (k_v.second.IsContended()) {
|
if (lock.IsContended()) {
|
||||||
info.contended_locks++;
|
info.contended_locks++;
|
||||||
if (k_v.second.ContentionScore() > info.max_contention_score) {
|
if (lock.ContentionScore() > info.max_contention_score) {
|
||||||
info.max_contention_score = k_v.second.ContentionScore();
|
info.max_contention_score = lock.ContentionScore();
|
||||||
info.max_contention_lock_name = k_v.first;
|
info.max_contention_lock_name = string_view{key};
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -56,6 +56,44 @@ SlotStats& SlotStats::operator+=(const SlotStats& o) {
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void LockTable::Key::MakeOwned() const {
|
||||||
|
if (std::holds_alternative<std::string_view>(val_))
|
||||||
|
val_ = std::string{std::get<std::string_view>(val_)};
|
||||||
|
}
|
||||||
|
|
||||||
|
size_t LockTable::Size() const {
|
||||||
|
return locks_.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
std::optional<const IntentLock> LockTable::Find(std::string_view key) const {
|
||||||
|
DCHECK_EQ(KeyLockArgs::GetLockKey(key), key);
|
||||||
|
|
||||||
|
if (auto it = locks_.find(Key{key}); it != locks_.end())
|
||||||
|
return it->second;
|
||||||
|
return std::nullopt;
|
||||||
|
}
|
||||||
|
|
||||||
|
bool LockTable::Acquire(std::string_view key, IntentLock::Mode mode) {
|
||||||
|
DCHECK_EQ(KeyLockArgs::GetLockKey(key), key);
|
||||||
|
|
||||||
|
auto [it, inserted] = locks_.try_emplace(Key{key});
|
||||||
|
if (!inserted) // If more than one transaction refers to a key
|
||||||
|
it->first.MakeOwned(); // we must fall back to using a self-contained string
|
||||||
|
|
||||||
|
return it->second.Acquire(mode);
|
||||||
|
}
|
||||||
|
|
||||||
|
void LockTable::Release(std::string_view key, IntentLock::Mode mode) {
|
||||||
|
DCHECK_EQ(KeyLockArgs::GetLockKey(key), key);
|
||||||
|
|
||||||
|
auto it = locks_.find(Key{key});
|
||||||
|
CHECK(it != locks_.end()) << key;
|
||||||
|
|
||||||
|
it->second.Release(mode);
|
||||||
|
if (it->second.IsFree())
|
||||||
|
locks_.erase(it);
|
||||||
|
}
|
||||||
|
|
||||||
DbTable::DbTable(PMR_NS::memory_resource* mr, DbIndex db_index)
|
DbTable::DbTable(PMR_NS::memory_resource* mr, DbIndex db_index)
|
||||||
: prime(kInitSegmentLog, detail::PrimeTablePolicy{}, mr),
|
: prime(kInitSegmentLog, detail::PrimeTablePolicy{}, mr),
|
||||||
expire(0, detail::ExpireTablePolicy{}, mr),
|
expire(0, detail::ExpireTablePolicy{}, mr),
|
||||||
|
|
|
@ -79,7 +79,47 @@ struct DbTableStats {
|
||||||
DbTableStats& operator+=(const DbTableStats& o);
|
DbTableStats& operator+=(const DbTableStats& o);
|
||||||
};
|
};
|
||||||
|
|
||||||
using LockTable = absl::flat_hash_map<std::string, IntentLock>;
|
// Table for recording locks that uses string_views where possible. LockTable falls back to
|
||||||
|
// strings for locks that are used by multiple transactions. Keys used with the lock table
|
||||||
|
// should be normalized with GetLockKey
|
||||||
|
class LockTable {
|
||||||
|
public:
|
||||||
|
size_t Size() const;
|
||||||
|
std::optional<const IntentLock> Find(std::string_view key) const;
|
||||||
|
|
||||||
|
bool Acquire(std::string_view key, IntentLock::Mode mode);
|
||||||
|
void Release(std::string_view key, IntentLock::Mode mode);
|
||||||
|
|
||||||
|
auto begin() const {
|
||||||
|
return locks_.cbegin();
|
||||||
|
}
|
||||||
|
|
||||||
|
auto end() const {
|
||||||
|
return locks_.cbegin();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
struct Key {
|
||||||
|
operator std::string_view() const {
|
||||||
|
return visit([](const auto& s) -> std::string_view { return s; }, val_);
|
||||||
|
}
|
||||||
|
|
||||||
|
bool operator==(const Key& o) const {
|
||||||
|
return *this == std::string_view(o);
|
||||||
|
}
|
||||||
|
|
||||||
|
friend std::ostream& operator<<(std::ostream& o, const Key& key) {
|
||||||
|
return o << std::string_view(key);
|
||||||
|
}
|
||||||
|
|
||||||
|
// If the key is backed by a string_view, replace it with a string with the same value
|
||||||
|
void MakeOwned() const;
|
||||||
|
|
||||||
|
mutable std::variant<std::string_view, std::string> val_;
|
||||||
|
};
|
||||||
|
|
||||||
|
absl::flat_hash_map<Key, IntentLock> locks_;
|
||||||
|
};
|
||||||
|
|
||||||
// A single Db table that represents a table that can be chosen with "SELECT" command.
|
// A single Db table that represents a table that can be chosen with "SELECT" command.
|
||||||
struct DbTable : boost::intrusive_ref_counter<DbTable, boost::thread_unsafe_counter> {
|
struct DbTable : boost::intrusive_ref_counter<DbTable, boost::thread_unsafe_counter> {
|
||||||
|
|
|
@ -311,7 +311,7 @@ unsigned BaseFamilyTest::NumLocked() {
|
||||||
if (db == nullptr) {
|
if (db == nullptr) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
count += db->trans_locks.size();
|
count += db->trans_locks.Size();
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
return count;
|
return count;
|
||||||
|
|
|
@ -170,18 +170,18 @@ void Transaction::BuildShardIndex(const KeyIndex& key_index, bool rev_mapping,
|
||||||
|
|
||||||
void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, size_t num_args,
|
void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, size_t num_args,
|
||||||
bool rev_mapping) {
|
bool rev_mapping) {
|
||||||
args_.reserve(num_args);
|
kv_args_.reserve(num_args);
|
||||||
if (rev_mapping)
|
if (rev_mapping)
|
||||||
reverse_index_.reserve(num_args);
|
reverse_index_.reserve(num_args);
|
||||||
|
|
||||||
// Store the concatenated per-shard arguments from the shard index inside args_
|
// Store the concatenated per-shard arguments from the shard index inside kv_args_
|
||||||
// and make each shard data point to its own sub-span inside args_.
|
// and make each shard data point to its own sub-span inside kv_args_.
|
||||||
for (size_t i = 0; i < shard_data_.size(); ++i) {
|
for (size_t i = 0; i < shard_data_.size(); ++i) {
|
||||||
auto& sd = shard_data_[i];
|
auto& sd = shard_data_[i];
|
||||||
auto& si = shard_index[i];
|
const auto& si = shard_index[i];
|
||||||
|
|
||||||
sd.arg_count = si.args.size();
|
sd.arg_count = si.args.size();
|
||||||
sd.arg_start = args_.size();
|
sd.arg_start = kv_args_.size();
|
||||||
|
|
||||||
// Multi transactions can re-initialize on different shards, so clear ACTIVE flag.
|
// Multi transactions can re-initialize on different shards, so clear ACTIVE flag.
|
||||||
DCHECK_EQ(sd.local_mask & ACTIVE, 0);
|
DCHECK_EQ(sd.local_mask & ACTIVE, 0);
|
||||||
|
@ -195,13 +195,13 @@ void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, siz
|
||||||
unique_shard_id_ = i;
|
unique_shard_id_ = i;
|
||||||
|
|
||||||
for (size_t j = 0; j < si.args.size(); ++j) {
|
for (size_t j = 0; j < si.args.size(); ++j) {
|
||||||
args_.push_back(si.args[j]);
|
kv_args_.push_back(si.args[j]);
|
||||||
if (rev_mapping)
|
if (rev_mapping)
|
||||||
reverse_index_.push_back(si.original_index[j]);
|
reverse_index_.push_back(si.original_index[j]);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
CHECK_EQ(args_.size(), num_args);
|
DCHECK_EQ(kv_args_.size(), num_args);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Transaction::LaunderKeyStorage(CmdArgVec* keys) {
|
void Transaction::LaunderKeyStorage(CmdArgVec* keys) {
|
||||||
|
@ -233,13 +233,13 @@ void Transaction::StoreKeysInArgs(KeyIndex key_index, bool rev_mapping) {
|
||||||
|
|
||||||
// even for a single key we may have multiple arguments per key (MSET).
|
// even for a single key we may have multiple arguments per key (MSET).
|
||||||
for (unsigned j = key_index.start; j < key_index.end; j++) {
|
for (unsigned j = key_index.start; j < key_index.end; j++) {
|
||||||
args_.push_back(ArgS(full_args_, j));
|
kv_args_.push_back(ArgS(full_args_, j));
|
||||||
if (key_index.step == 2)
|
if (key_index.step == 2)
|
||||||
args_.push_back(ArgS(full_args_, ++j));
|
kv_args_.push_back(ArgS(full_args_, ++j));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (rev_mapping) {
|
if (rev_mapping) {
|
||||||
reverse_index_.resize(args_.size());
|
reverse_index_.resize(kv_args_.size());
|
||||||
for (unsigned j = 0; j < reverse_index_.size(); ++j) {
|
for (unsigned j = 0; j < reverse_index_.size(); ++j) {
|
||||||
reverse_index_[j] = j + key_index.start;
|
reverse_index_[j] = j + key_index.start;
|
||||||
}
|
}
|
||||||
|
@ -287,9 +287,9 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
|
||||||
|
|
||||||
unique_shard_cnt_ = 1;
|
unique_shard_cnt_ = 1;
|
||||||
if (is_stub) // stub transactions don't migrate
|
if (is_stub) // stub transactions don't migrate
|
||||||
DCHECK_EQ(unique_shard_id_, Shard(args_.front(), shard_set->size()));
|
DCHECK_EQ(unique_shard_id_, Shard(kv_args_.front(), shard_set->size()));
|
||||||
else
|
else
|
||||||
unique_shard_id_ = Shard(args_.front(), shard_set->size());
|
unique_shard_id_ = Shard(kv_args_.front(), shard_set->size());
|
||||||
|
|
||||||
// Multi transactions that execute commands on their own (not stubs) can't shrink the backing
|
// Multi transactions that execute commands on their own (not stubs) can't shrink the backing
|
||||||
// array, as it still might be read by leftover callbacks.
|
// array, as it still might be read by leftover callbacks.
|
||||||
|
@ -314,7 +314,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
|
||||||
|
|
||||||
DCHECK(!multi_ || multi_->mode != LOCK_AHEAD || !multi_->frozen_keys.empty());
|
DCHECK(!multi_ || multi_->mode != LOCK_AHEAD || !multi_->frozen_keys.empty());
|
||||||
|
|
||||||
DVLOG(1) << "InitByArgs " << DebugId() << " " << args_.front();
|
DVLOG(1) << "InitByArgs " << DebugId() << " " << kv_args_.front();
|
||||||
|
|
||||||
// Compress shard data, if we occupy only one shard.
|
// Compress shard data, if we occupy only one shard.
|
||||||
if (unique_shard_cnt_ == 1) {
|
if (unique_shard_cnt_ == 1) {
|
||||||
|
@ -333,8 +333,8 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
|
||||||
|
|
||||||
// Validation. Check reverse mapping was built correctly.
|
// Validation. Check reverse mapping was built correctly.
|
||||||
if (needs_reverse_mapping) {
|
if (needs_reverse_mapping) {
|
||||||
for (size_t i = 0; i < args_.size(); ++i) {
|
for (size_t i = 0; i < kv_args_.size(); ++i) {
|
||||||
DCHECK_EQ(args_[i], ArgS(full_args_, reverse_index_[i])) << full_args_;
|
DCHECK_EQ(kv_args_[i], ArgS(full_args_, reverse_index_[i])) << full_args_;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -366,7 +366,7 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
|
||||||
}
|
}
|
||||||
|
|
||||||
DCHECK_EQ(unique_shard_cnt_, 0u);
|
DCHECK_EQ(unique_shard_cnt_, 0u);
|
||||||
DCHECK(args_.empty());
|
DCHECK(kv_args_.empty());
|
||||||
|
|
||||||
OpResult<KeyIndex> key_index = DetermineKeys(cid_, args);
|
OpResult<KeyIndex> key_index = DetermineKeys(cid_, args);
|
||||||
if (!key_index)
|
if (!key_index)
|
||||||
|
@ -448,7 +448,7 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
|
||||||
unique_shard_id_ = 0;
|
unique_shard_id_ = 0;
|
||||||
unique_shard_cnt_ = 0;
|
unique_shard_cnt_ = 0;
|
||||||
|
|
||||||
args_.clear();
|
kv_args_.clear();
|
||||||
reverse_index_.clear();
|
reverse_index_.clear();
|
||||||
|
|
||||||
cid_ = cid;
|
cid_ = cid;
|
||||||
|
@ -805,10 +805,10 @@ void Transaction::UnlockMulti() {
|
||||||
|
|
||||||
multi_->frozen_keys_set.clear();
|
multi_->frozen_keys_set.clear();
|
||||||
|
|
||||||
auto sharded_keys = make_shared<vector<KeyList>>(shard_set->size());
|
auto sharded_keys = make_shared<vector<vector<string_view>>>(shard_set->size());
|
||||||
for (string& key : multi_->frozen_keys) {
|
for (string& key : multi_->frozen_keys) {
|
||||||
ShardId sid = Shard(key, sharded_keys->size());
|
ShardId sid = Shard(key, sharded_keys->size());
|
||||||
(*sharded_keys)[sid].emplace_back(std::move(key));
|
(*sharded_keys)[sid].emplace_back(key);
|
||||||
}
|
}
|
||||||
|
|
||||||
unsigned shard_journals_cnt =
|
unsigned shard_journals_cnt =
|
||||||
|
@ -1199,11 +1199,11 @@ ArgSlice Transaction::GetShardArgs(ShardId sid) const {
|
||||||
// We can read unique_shard_cnt_ only because ShardArgsInShard is called after IsArmedInShard
|
// We can read unique_shard_cnt_ only because ShardArgsInShard is called after IsArmedInShard
|
||||||
// barrier.
|
// barrier.
|
||||||
if (unique_shard_cnt_ == 1) {
|
if (unique_shard_cnt_ == 1) {
|
||||||
return args_;
|
return kv_args_;
|
||||||
}
|
}
|
||||||
|
|
||||||
const auto& sd = shard_data_[sid];
|
const auto& sd = shard_data_[sid];
|
||||||
return ArgSlice{args_.data() + sd.arg_start, sd.arg_count};
|
return ArgSlice{kv_args_.data() + sd.arg_start, sd.arg_count};
|
||||||
}
|
}
|
||||||
|
|
||||||
// from local index back to original arg index skipping the command.
|
// from local index back to original arg index skipping the command.
|
||||||
|
@ -1319,8 +1319,8 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
|
|
||||||
void Transaction::UnlockMultiShardCb(const KeyList& sharded_keys, EngineShard* shard,
|
void Transaction::UnlockMultiShardCb(absl::Span<const std::string_view> sharded_keys,
|
||||||
uint32_t shard_journals_cnt) {
|
EngineShard* shard, uint32_t shard_journals_cnt) {
|
||||||
DCHECK(multi_ && multi_->lock_mode);
|
DCHECK(multi_ && multi_->lock_mode);
|
||||||
|
|
||||||
auto journal = shard->journal();
|
auto journal = shard->journal();
|
||||||
|
@ -1334,7 +1334,7 @@ void Transaction::UnlockMultiShardCb(const KeyList& sharded_keys, EngineShard* s
|
||||||
shard->shard_lock()->Release(IntentLock::EXCLUSIVE);
|
shard->shard_lock()->Release(IntentLock::EXCLUSIVE);
|
||||||
} else {
|
} else {
|
||||||
for (const auto& key : sharded_keys) {
|
for (const auto& key : sharded_keys) {
|
||||||
shard->db_slice().ReleaseNormalized(*multi_->lock_mode, db_index_, key, 1);
|
shard->db_slice().ReleaseNormalized(*multi_->lock_mode, db_index_, key);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1476,7 +1476,7 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul
|
||||||
journal::Entry::Payload entry_payload;
|
journal::Entry::Payload entry_payload;
|
||||||
|
|
||||||
string_view cmd{cid_->name()};
|
string_view cmd{cid_->name()};
|
||||||
if (unique_shard_cnt_ == 1 || args_.empty()) {
|
if (unique_shard_cnt_ == 1 || kv_args_.empty()) {
|
||||||
entry_payload = make_pair(cmd, full_args_);
|
entry_payload = make_pair(cmd, full_args_);
|
||||||
} else {
|
} else {
|
||||||
entry_payload = make_pair(cmd, GetShardArgs(shard->shard_id()));
|
entry_payload = make_pair(cmd, GetShardArgs(shard->shard_id()));
|
||||||
|
|
|
@ -362,9 +362,6 @@ class Transaction {
|
||||||
unsigned cnt[2] = {0, 0};
|
unsigned cnt[2] = {0, 0};
|
||||||
};
|
};
|
||||||
|
|
||||||
// owned std::string because callbacks its used in run fully async and can outlive the entries.
|
|
||||||
using KeyList = std::vector<std::string>;
|
|
||||||
|
|
||||||
struct alignas(64) PerShardData {
|
struct alignas(64) PerShardData {
|
||||||
PerShardData(PerShardData&&) noexcept {
|
PerShardData(PerShardData&&) noexcept {
|
||||||
}
|
}
|
||||||
|
@ -489,7 +486,7 @@ class Transaction {
|
||||||
// Run callback inline as part of multi stub.
|
// Run callback inline as part of multi stub.
|
||||||
OpStatus RunSquashedMultiCb(RunnableType cb);
|
OpStatus RunSquashedMultiCb(RunnableType cb);
|
||||||
|
|
||||||
void UnlockMultiShardCb(const KeyList& sharded_keys, EngineShard* shard,
|
void UnlockMultiShardCb(absl::Span<const std::string_view> sharded_keys, EngineShard* shard,
|
||||||
uint32_t shard_journals_cnt);
|
uint32_t shard_journals_cnt);
|
||||||
|
|
||||||
// In a multi-command transaction, we determine the number of shard journals that we wrote entries
|
// In a multi-command transaction, we determine the number of shard journals that we wrote entries
|
||||||
|
@ -563,8 +560,10 @@ class Transaction {
|
||||||
// Never access directly with index, always use SidToId.
|
// Never access directly with index, always use SidToId.
|
||||||
absl::InlinedVector<PerShardData, 4> shard_data_; // length = shard_count
|
absl::InlinedVector<PerShardData, 4> shard_data_; // length = shard_count
|
||||||
|
|
||||||
// Stores arguments of the transaction (i.e. keys + values) partitioned by shards.
|
// Stores keys/values of the transaction partitioned by shards.
|
||||||
absl::InlinedVector<std::string_view, 4> args_;
|
// We need values as well since we reorder keys, and we need to know what value corresponds
|
||||||
|
// to what key.
|
||||||
|
absl::InlinedVector<std::string_view, 4> kv_args_;
|
||||||
|
|
||||||
// Stores the full undivided command.
|
// Stores the full undivided command.
|
||||||
CmdArgList full_args_;
|
CmdArgList full_args_;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue