mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 09:55:45 +02:00
chore: Make KeyIndex iterable (#3326)
This commit is contained in:
parent
2b54fd985f
commit
be59b5eeb4
8 changed files with 107 additions and 137 deletions
|
@ -191,32 +191,23 @@ void Transaction::InitGlobal() {
|
|||
}
|
||||
|
||||
void Transaction::BuildShardIndex(const KeyIndex& key_index, std::vector<PerShardCache>* out) {
|
||||
// Because of the way we iterate in InitShardData
|
||||
DCHECK(!key_index.bonus || key_index.step == 1);
|
||||
|
||||
auto& shard_index = *out;
|
||||
|
||||
auto add = [&shard_index](uint32_t sid, uint32_t b, uint32_t e) {
|
||||
auto& slices = shard_index[sid].slices;
|
||||
if (!slices.empty() && slices.back().second == b) {
|
||||
slices.back().second = e;
|
||||
} else {
|
||||
slices.emplace_back(b, e);
|
||||
}
|
||||
};
|
||||
|
||||
if (key_index.bonus) {
|
||||
DCHECK(key_index.step == 1);
|
||||
string_view key = ArgS(full_args_, *key_index.bonus);
|
||||
unique_slot_checker_.Add(key);
|
||||
uint32_t sid = Shard(key, shard_data_.size());
|
||||
add(sid, *key_index.bonus, *key_index.bonus + 1);
|
||||
}
|
||||
|
||||
for (unsigned i = key_index.start; i < key_index.end; i += key_index.step) {
|
||||
for (unsigned i : key_index.Range()) {
|
||||
string_view key = ArgS(full_args_, i);
|
||||
unique_slot_checker_.Add(key);
|
||||
uint32_t sid = Shard(key, shard_data_.size());
|
||||
shard_index[sid].key_step = key_index.step;
|
||||
ShardId sid = Shard(key, shard_data_.size());
|
||||
|
||||
add(sid, i, i + key_index.step);
|
||||
unsigned step = key_index.bonus ? 1 : key_index.step;
|
||||
shard_index[sid].key_step = step;
|
||||
auto& slices = shard_index[sid].slices;
|
||||
if (!slices.empty() && slices.back().second == i) {
|
||||
slices.back().second = i + step;
|
||||
} else {
|
||||
slices.emplace_back(i, i + step);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -247,11 +238,9 @@ void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, siz
|
|||
unique_shard_cnt_++;
|
||||
unique_shard_id_ = i;
|
||||
|
||||
for (size_t j = 0; j < src.slices.size(); ++j) {
|
||||
IndexSlice slice = src.slices[j];
|
||||
args_slices_.push_back(slice);
|
||||
for (uint32_t k = slice.first; k < slice.second; k += src.key_step) {
|
||||
string_view key = ArgS(full_args_, k);
|
||||
for (const auto& [start, end] : src.slices) {
|
||||
args_slices_.emplace_back(start, end);
|
||||
for (string_view key : KeyIndex(start, end, src.key_step).Range(full_args_)) {
|
||||
kv_fp_.push_back(LockTag(key).Fingerprint());
|
||||
sd.fp_count++;
|
||||
}
|
||||
|
@ -279,10 +268,8 @@ void Transaction::StoreKeysInArgs(const KeyIndex& key_index) {
|
|||
|
||||
// even for a single key we may have multiple arguments per key (MSET).
|
||||
args_slices_.emplace_back(key_index.start, key_index.end);
|
||||
for (unsigned j = key_index.start; j < key_index.end; j += key_index.step) {
|
||||
string_view key = ArgS(full_args_, j);
|
||||
for (string_view key : key_index.Range(full_args_))
|
||||
kv_fp_.push_back(LockTag(key).Fingerprint());
|
||||
}
|
||||
}
|
||||
|
||||
void Transaction::InitByKeys(const KeyIndex& key_index) {
|
||||
|
@ -296,14 +283,14 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
|
|||
// Stub transactions always operate only on single shard.
|
||||
bool is_stub = multi_ && multi_->role == SQUASHED_STUB;
|
||||
|
||||
if ((key_index.HasSingleKey() && !IsAtomicMulti()) || is_stub) {
|
||||
if ((key_index.NumArgs() == 1 && !IsAtomicMulti()) || is_stub) {
|
||||
DCHECK(!IsActiveMulti() || multi_->mode == NON_ATOMIC);
|
||||
|
||||
// We don't have to split the arguments by shards, so we can copy them directly.
|
||||
StoreKeysInArgs(key_index);
|
||||
|
||||
unique_shard_cnt_ = 1;
|
||||
string_view akey = ArgS(full_args_, key_index.start);
|
||||
string_view akey = *key_index.Range(full_args_).begin();
|
||||
if (is_stub) // stub transactions don't migrate
|
||||
DCHECK_EQ(unique_shard_id_, Shard(akey, shard_set->size()));
|
||||
else {
|
||||
|
@ -329,7 +316,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
|
|||
BuildShardIndex(key_index, &shard_index);
|
||||
|
||||
// Initialize shard data based on distributed arguments.
|
||||
InitShardData(shard_index, key_index.num_args());
|
||||
InitShardData(shard_index, key_index.NumArgs());
|
||||
|
||||
DCHECK(!multi_ || multi_->mode != LOCK_AHEAD || !multi_->tag_fps.empty());
|
||||
|
||||
|
@ -441,7 +428,7 @@ void Transaction::StartMultiLockedAhead(Namespace* ns, DbIndex dbid, CmdArgList
|
|||
PrepareMultiFps(keys);
|
||||
|
||||
InitBase(ns, dbid, keys);
|
||||
InitByKeys(KeyIndex::Range(0, keys.size()));
|
||||
InitByKeys(KeyIndex(0, keys.size()));
|
||||
|
||||
if (!skip_scheduling)
|
||||
ScheduleInternal();
|
||||
|
@ -1433,23 +1420,24 @@ bool Transaction::CanRunInlined() const {
|
|||
}
|
||||
|
||||
OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
|
||||
KeyIndex key_index;
|
||||
|
||||
if (cid->opt_mask() & (CO::GLOBAL_TRANS | CO::NO_KEY_TRANSACTIONAL))
|
||||
return key_index;
|
||||
return KeyIndex{};
|
||||
|
||||
int num_custom_keys = -1;
|
||||
|
||||
if (cid->opt_mask() & CO::VARIADIC_KEYS) {
|
||||
unsigned start = 0, end = 0, step = 0;
|
||||
std::optional<unsigned> bonus = std::nullopt;
|
||||
|
||||
if (cid->opt_mask() & CO::VARIADIC_KEYS) { // number of keys is not trivially deducable
|
||||
// ZUNION/INTER <num_keys> <key1> [<key2> ...]
|
||||
// EVAL <script> <num_keys>
|
||||
// XREAD ... STREAMS ...
|
||||
if (args.size() < 2) {
|
||||
if (args.size() < 2)
|
||||
return OpStatus::SYNTAX_ERR;
|
||||
}
|
||||
|
||||
string_view name{cid->name()};
|
||||
|
||||
// Determine based on STREAMS argument position
|
||||
if (name == "XREAD" || name == "XREADGROUP") {
|
||||
for (size_t i = 0; i < args.size(); ++i) {
|
||||
string_view arg = ArgS(args, i);
|
||||
|
@ -1458,24 +1446,20 @@ OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
|
|||
if (left < 2 || left % 2 != 0)
|
||||
return OpStatus::SYNTAX_ERR;
|
||||
|
||||
key_index.start = i + 1;
|
||||
key_index.end = key_index.start + (left / 2);
|
||||
key_index.step = 1;
|
||||
|
||||
return key_index;
|
||||
return KeyIndex(i + 1, i + 1 + (left / 2));
|
||||
}
|
||||
}
|
||||
return OpStatus::SYNTAX_ERR;
|
||||
}
|
||||
|
||||
if (absl::EndsWith(name, "STORE"))
|
||||
key_index.bonus = 0; // Z<xxx>STORE <key> commands
|
||||
bonus = 0; // Z<xxx>STORE <key> commands
|
||||
|
||||
unsigned num_keys_index;
|
||||
if (absl::StartsWith(name, "EVAL"))
|
||||
num_keys_index = 1;
|
||||
else
|
||||
num_keys_index = key_index.bonus ? *key_index.bonus + 1 : 0;
|
||||
num_keys_index = bonus ? *bonus + 1 : 0;
|
||||
|
||||
string_view num = ArgS(args, num_keys_index);
|
||||
if (!absl::SimpleAtoi(num, &num_custom_keys) || num_custom_keys < 0)
|
||||
|
@ -1492,22 +1476,22 @@ OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
|
|||
}
|
||||
|
||||
if (cid->first_key_pos() > 0) {
|
||||
key_index.start = cid->first_key_pos() - 1;
|
||||
start = cid->first_key_pos() - 1;
|
||||
int last = cid->last_key_pos();
|
||||
|
||||
if (num_custom_keys >= 0) {
|
||||
key_index.end = key_index.start + num_custom_keys;
|
||||
end = start + num_custom_keys;
|
||||
} else {
|
||||
key_index.end = last > 0 ? last : (int(args.size()) + last + 1);
|
||||
end = last > 0 ? last : (int(args.size()) + last + 1);
|
||||
}
|
||||
if (cid->opt_mask() & CO::INTERLEAVED_KEYS) {
|
||||
if (cid->name() == "JSON.MSET") {
|
||||
key_index.step = 3;
|
||||
step = 3;
|
||||
} else {
|
||||
key_index.step = 2;
|
||||
step = 2;
|
||||
}
|
||||
} else {
|
||||
key_index.step = 1;
|
||||
step = 1;
|
||||
}
|
||||
|
||||
if (cid->opt_mask() & CO::STORE_LAST_KEY) {
|
||||
|
@ -1517,17 +1501,16 @@ OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
|
|||
// key member radius .. STORE destkey
|
||||
string_view opt = ArgS(args, args.size() - 2);
|
||||
if (absl::EqualsIgnoreCase(opt, "STORE") || absl::EqualsIgnoreCase(opt, "STOREDIST")) {
|
||||
key_index.bonus = args.size() - 1;
|
||||
bonus = args.size() - 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return key_index;
|
||||
return KeyIndex{start, end, step, bonus};
|
||||
}
|
||||
|
||||
LOG(FATAL) << "TBD: Not supported " << cid->name();
|
||||
|
||||
return key_index;
|
||||
return {};
|
||||
}
|
||||
|
||||
std::vector<Transaction::PerShardCache>& Transaction::TLTmpSpace::GetShardIndex(unsigned size) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue