mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 09:55:45 +02:00
chore: get rid of kv_args and replace it with slices to full_args (#3101)
* Revert "Revert "chore: get rid of kv_args and replace it with slices to full_… (#3024)"
This reverts commit 25e6930ac3
.
Fixing the performance bug caused by applying equality operator
on two spans inside ShardArgs::Iterator::operator==
---------
Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Co-authored-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
1fb250b64f
commit
9f09104c61
14 changed files with 233 additions and 185 deletions
|
@ -184,12 +184,13 @@ void Transaction::InitGlobal() {
|
|||
void Transaction::BuildShardIndex(const KeyIndex& key_index, std::vector<PerShardCache>* out) {
|
||||
auto& shard_index = *out;
|
||||
|
||||
auto add = [this, rev_mapping = key_index.has_reverse_mapping, &shard_index](uint32_t sid,
|
||||
uint32_t i) {
|
||||
string_view val = ArgS(full_args_, i);
|
||||
shard_index[sid].args.push_back(val);
|
||||
if (rev_mapping)
|
||||
shard_index[sid].original_index.push_back(i);
|
||||
auto add = [this, &shard_index](uint32_t sid, uint32_t b, uint32_t e) {
|
||||
auto& slices = shard_index[sid].slices;
|
||||
if (!slices.empty() && slices.back().second == b) {
|
||||
slices.back().second = e;
|
||||
} else {
|
||||
slices.emplace_back(b, e);
|
||||
}
|
||||
};
|
||||
|
||||
if (key_index.bonus) {
|
||||
|
@ -197,47 +198,39 @@ void Transaction::BuildShardIndex(const KeyIndex& key_index, std::vector<PerShar
|
|||
string_view key = ArgS(full_args_, *key_index.bonus);
|
||||
unique_slot_checker_.Add(key);
|
||||
uint32_t sid = Shard(key, shard_data_.size());
|
||||
add(sid, *key_index.bonus);
|
||||
add(sid, *key_index.bonus, *key_index.bonus + 1);
|
||||
}
|
||||
|
||||
for (unsigned i = key_index.start; i < key_index.end; ++i) {
|
||||
for (unsigned i = key_index.start; i < key_index.end; i += key_index.step) {
|
||||
string_view key = ArgS(full_args_, i);
|
||||
unique_slot_checker_.Add(key);
|
||||
uint32_t sid = Shard(key, shard_data_.size());
|
||||
shard_index[sid].key_step = key_index.step;
|
||||
|
||||
add(sid, i);
|
||||
// Handle values associated with preceding key.
|
||||
for (unsigned j = 1; j < key_index.step; ++j) {
|
||||
add(sid, ++i);
|
||||
}
|
||||
add(sid, i, i + key_index.step);
|
||||
}
|
||||
}
|
||||
|
||||
void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, size_t num_args,
|
||||
bool rev_mapping) {
|
||||
kv_args_.reserve(num_args);
|
||||
void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, size_t num_args) {
|
||||
args_slices_.reserve(num_args);
|
||||
DCHECK(kv_fp_.empty());
|
||||
kv_fp_.reserve(num_args);
|
||||
|
||||
if (rev_mapping)
|
||||
reverse_index_.reserve(num_args);
|
||||
|
||||
// Store the concatenated per-shard arguments from the shard index inside kv_args_
|
||||
// and make each shard data point to its own sub-span inside kv_args_.
|
||||
for (size_t i = 0; i < shard_data_.size(); ++i) {
|
||||
auto& sd = shard_data_[i];
|
||||
const auto& si = shard_index[i];
|
||||
const auto& src = shard_index[i];
|
||||
|
||||
sd.arg_count = si.args.size();
|
||||
sd.arg_start = kv_args_.size();
|
||||
sd.slice_count = src.slices.size();
|
||||
sd.slice_start = args_slices_.size();
|
||||
sd.fp_start = kv_fp_.size();
|
||||
sd.fp_count = 0;
|
||||
|
||||
// Multi transactions can re-initialize on different shards, so clear ACTIVE flag.
|
||||
DCHECK_EQ(sd.local_mask & ACTIVE, 0);
|
||||
|
||||
if (sd.arg_count == 0)
|
||||
if (sd.slice_count == 0)
|
||||
continue;
|
||||
|
||||
sd.local_mask |= ACTIVE;
|
||||
|
@ -245,19 +238,16 @@ void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, siz
|
|||
unique_shard_cnt_++;
|
||||
unique_shard_id_ = i;
|
||||
|
||||
for (size_t j = 0; j < si.args.size(); ++j) {
|
||||
string_view arg = si.args[j];
|
||||
kv_args_.push_back(arg);
|
||||
if (si.key_step == 1 || j % si.key_step == 0) {
|
||||
kv_fp_.push_back(LockTag(arg).Fingerprint());
|
||||
for (size_t j = 0; j < src.slices.size(); ++j) {
|
||||
IndexSlice slice = src.slices[j];
|
||||
args_slices_.push_back(slice);
|
||||
for (uint32_t k = slice.first; k < slice.second; k += src.key_step) {
|
||||
string_view key = ArgS(full_args_, k);
|
||||
kv_fp_.push_back(LockTag(key).Fingerprint());
|
||||
sd.fp_count++;
|
||||
}
|
||||
if (rev_mapping)
|
||||
reverse_index_.push_back(si.original_index[j]);
|
||||
}
|
||||
}
|
||||
|
||||
DCHECK_EQ(kv_args_.size(), num_args);
|
||||
}
|
||||
|
||||
void Transaction::PrepareMultiFps(CmdArgList keys) {
|
||||
|
@ -277,22 +267,13 @@ void Transaction::PrepareMultiFps(CmdArgList keys) {
|
|||
void Transaction::StoreKeysInArgs(const KeyIndex& key_index) {
|
||||
DCHECK(!key_index.bonus);
|
||||
DCHECK(kv_fp_.empty());
|
||||
DCHECK(args_slices_.empty());
|
||||
|
||||
// even for a single key we may have multiple arguments per key (MSET).
|
||||
args_slices_.emplace_back(key_index.start, key_index.end);
|
||||
for (unsigned j = key_index.start; j < key_index.end; j += key_index.step) {
|
||||
string_view arg = ArgS(full_args_, j);
|
||||
kv_args_.push_back(arg);
|
||||
kv_fp_.push_back(LockTag(arg).Fingerprint());
|
||||
|
||||
for (unsigned k = j + 1; k < j + key_index.step; ++k)
|
||||
kv_args_.push_back(ArgS(full_args_, k));
|
||||
}
|
||||
|
||||
if (key_index.has_reverse_mapping) {
|
||||
reverse_index_.resize(kv_args_.size());
|
||||
for (unsigned j = 0; j < reverse_index_.size(); ++j) {
|
||||
reverse_index_[j] = j + key_index.start;
|
||||
}
|
||||
string_view key = ArgS(full_args_, j);
|
||||
kv_fp_.push_back(LockTag(key).Fingerprint());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -314,7 +295,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
|
|||
StoreKeysInArgs(key_index);
|
||||
|
||||
unique_shard_cnt_ = 1;
|
||||
string_view akey = kv_args_.front();
|
||||
string_view akey = ArgS(full_args_, key_index.start);
|
||||
if (is_stub) // stub transactions don't migrate
|
||||
DCHECK_EQ(unique_shard_id_, Shard(akey, shard_set->size()));
|
||||
else {
|
||||
|
@ -340,11 +321,11 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
|
|||
BuildShardIndex(key_index, &shard_index);
|
||||
|
||||
// Initialize shard data based on distributed arguments.
|
||||
InitShardData(shard_index, key_index.num_args(), key_index.has_reverse_mapping);
|
||||
InitShardData(shard_index, key_index.num_args());
|
||||
|
||||
DCHECK(!multi_ || multi_->mode != LOCK_AHEAD || !multi_->tag_fps.empty());
|
||||
|
||||
DVLOG(1) << "InitByArgs " << DebugId() << " " << kv_args_.front();
|
||||
DVLOG(1) << "InitByArgs " << DebugId() << facade::ToSV(full_args_.front());
|
||||
|
||||
// Compress shard data, if we occupy only one shard.
|
||||
if (unique_shard_cnt_ == 1) {
|
||||
|
@ -357,15 +338,8 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
|
|||
sd = &shard_data_.front();
|
||||
sd->local_mask |= ACTIVE;
|
||||
}
|
||||
sd->arg_count = -1;
|
||||
sd->arg_start = -1;
|
||||
}
|
||||
|
||||
// Validation. Check reverse mapping was built correctly.
|
||||
if (key_index.has_reverse_mapping) {
|
||||
for (size_t i = 0; i < kv_args_.size(); ++i) {
|
||||
DCHECK_EQ(kv_args_[i], ArgS(full_args_, reverse_index_[i])) << full_args_;
|
||||
}
|
||||
sd->slice_count = -1;
|
||||
sd->slice_start = -1;
|
||||
}
|
||||
|
||||
// Validation.
|
||||
|
@ -396,7 +370,7 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
|
|||
}
|
||||
|
||||
DCHECK_EQ(unique_shard_cnt_, 0u);
|
||||
DCHECK(kv_args_.empty());
|
||||
DCHECK(args_slices_.empty());
|
||||
DCHECK(kv_fp_.empty());
|
||||
|
||||
OpResult<KeyIndex> key_index = DetermineKeys(cid_, args);
|
||||
|
@ -427,8 +401,8 @@ void Transaction::PrepareSquashedMultiHop(const CommandId* cid,
|
|||
} else {
|
||||
shard_data_[i].local_mask &= ~ACTIVE;
|
||||
}
|
||||
shard_data_[i].arg_start = 0;
|
||||
shard_data_[i].arg_count = 0;
|
||||
shard_data_[i].slice_start = 0;
|
||||
shard_data_[i].slice_count = 0;
|
||||
}
|
||||
|
||||
MultiBecomeSquasher();
|
||||
|
@ -485,15 +459,14 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
|
|||
unique_shard_id_ = 0;
|
||||
unique_shard_cnt_ = 0;
|
||||
|
||||
kv_args_.clear();
|
||||
args_slices_.clear();
|
||||
kv_fp_.clear();
|
||||
reverse_index_.clear();
|
||||
|
||||
cid_ = cid;
|
||||
cb_ptr_ = nullptr;
|
||||
|
||||
for (auto& sd : shard_data_) {
|
||||
sd.arg_count = sd.arg_start = 0;
|
||||
sd.slice_count = sd.slice_start = 0;
|
||||
|
||||
if (multi_->mode == NON_ATOMIC) {
|
||||
sd.local_mask = 0; // Non atomic transactions schedule each time, so remove all flags
|
||||
|
@ -555,7 +528,6 @@ void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdA
|
|||
EnableShard(sid);
|
||||
OpResult<KeyIndex> key_index = DetermineKeys(cid_, args);
|
||||
CHECK(key_index);
|
||||
DCHECK(!key_index->has_reverse_mapping);
|
||||
StoreKeysInArgs(*key_index);
|
||||
}
|
||||
|
||||
|
@ -1181,23 +1153,12 @@ ShardArgs Transaction::GetShardArgs(ShardId sid) const {
|
|||
// We can read unique_shard_cnt_ only because ShardArgsInShard is called after IsArmedInShard
|
||||
// barrier.
|
||||
if (unique_shard_cnt_ == 1) {
|
||||
return kv_args_;
|
||||
return ShardArgs{full_args_, absl::MakeSpan(args_slices_)};
|
||||
}
|
||||
|
||||
const auto& sd = shard_data_[sid];
|
||||
return ShardArgs{kv_args_.data() + sd.arg_start, sd.arg_count};
|
||||
}
|
||||
|
||||
// from local index back to original arg index skipping the command.
|
||||
// i.e. returns (first_key_pos -1) or bigger.
|
||||
size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
|
||||
DCHECK_LT(arg_index, reverse_index_.size());
|
||||
|
||||
if (unique_shard_cnt_ == 1)
|
||||
return reverse_index_[arg_index];
|
||||
|
||||
const auto& sd = shard_data_[shard_id];
|
||||
return reverse_index_[sd.arg_start + arg_index];
|
||||
return ShardArgs{full_args_,
|
||||
absl::MakeSpan(args_slices_.data() + sd.slice_start, sd.slice_count)};
|
||||
}
|
||||
|
||||
OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider,
|
||||
|
@ -1373,7 +1334,7 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid, string_view
|
|||
// Change state to awaked and store index of awakened key
|
||||
sd.local_mask &= ~SUSPENDED_Q;
|
||||
sd.local_mask |= AWAKED_Q;
|
||||
sd.wake_key_pos = it - args.begin();
|
||||
sd.wake_key_pos = it.index();
|
||||
|
||||
blocking_barrier_.Close();
|
||||
return true;
|
||||
|
@ -1384,8 +1345,8 @@ optional<string_view> Transaction::GetWakeKey(ShardId sid) const {
|
|||
if ((sd.local_mask & AWAKED_Q) == 0)
|
||||
return nullopt;
|
||||
|
||||
CHECK_NE(sd.wake_key_pos, UINT16_MAX);
|
||||
return GetShardArgs(sid).at(sd.wake_key_pos);
|
||||
CHECK_LT(sd.wake_key_pos, full_args_.size());
|
||||
return ArgS(full_args_, sd.wake_key_pos);
|
||||
}
|
||||
|
||||
void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult result) {
|
||||
|
@ -1424,10 +1385,11 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul
|
|||
|
||||
journal::Entry::Payload entry_payload;
|
||||
string_view cmd{cid_->name()};
|
||||
if (unique_shard_cnt_ == 1 || kv_args_.empty()) {
|
||||
if (unique_shard_cnt_ == 1 || args_slices_.empty()) {
|
||||
entry_payload = journal::Entry::Payload(cmd, full_args_);
|
||||
} else {
|
||||
entry_payload = journal::Entry::Payload(cmd, GetShardArgs(shard->shard_id()).AsSlice());
|
||||
ShardArgs shard_args = GetShardArgs(shard->shard_id());
|
||||
entry_payload = journal::Entry::Payload(cmd, shard_args);
|
||||
}
|
||||
// Record to journal autojournal commands, here we allow await which anables writing to sync
|
||||
// the journal change.
|
||||
|
@ -1516,10 +1478,6 @@ OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
|
|||
|
||||
int num_custom_keys = -1;
|
||||
|
||||
if (cid->opt_mask() & CO::REVERSE_MAPPING) {
|
||||
key_index.has_reverse_mapping = true;
|
||||
}
|
||||
|
||||
if (cid->opt_mask() & CO::VARIADIC_KEYS) {
|
||||
// ZUNION/INTER <num_keys> <key1> [<key2> ...]
|
||||
// EVAL <script> <num_keys>
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue