Revert "chore: get rid of kv_args and replace it with slices to full_… (#3024)

Revert "chore: get rid of kv_args and replace it with slices to full_args (#2942)"

This reverts commit de0e5cb0bd.
Roman Gershman 2024-05-08 03:01:37 +03:00 committed by GitHub
parent 5c4279c285
commit 25e6930ac3
13 changed files with 185 additions and 216 deletions

View file

@ -40,6 +40,9 @@ CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first
: facade::CommandId(name, mask, arity, first_key, last_key, acl_categories) {
if (mask & CO::ADMIN)
opt_mask_ |= CO::NOSCRIPT;
if (mask & CO::BLOCKING)
opt_mask_ |= CO::REVERSE_MAPPING;
}
bool CommandId::IsTransactional() const {
@ -170,6 +173,8 @@ const char* OptName(CO::CommandOpt fl) {
return "readonly";
case DENYOOM:
return "denyoom";
case REVERSE_MAPPING:
return "reverse-mapping";
case FAST:
return "fast";
case LOADING:

View file

@ -27,13 +27,16 @@ enum CommandOpt : uint32_t {
LOADING = 1U << 3, // Command allowed during LOADING state.
DENYOOM = 1U << 4, // use-memory in redis.
// UNUSED = 1U << 5,
// Marks commands that must preserve the order of keys to work correctly.
// For example, MGET needs to know the order of keys to return the values in the same order.
// BLPOP needs to know the order of keys to return the first non-empty list from the left.
REVERSE_MAPPING = 1U << 5,
VARIADIC_KEYS = 1U << 6, // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc.
ADMIN = 1U << 7, // implies NOSCRIPT,
NOSCRIPT = 1U << 8,
BLOCKING = 1U << 9,
BLOCKING = 1U << 9, // implies REVERSE_MAPPING
HIDDEN = 1U << 10, // does not show in COMMAND command output
INTERLEAVED_KEYS = 1U << 11, // keys are interleaved with arguments
GLOBAL_TRANS = 1U << 12,
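
The REVERSE_MAPPING comment above is the crux of this revert: a multi-key command is split across shards, each shard answers in shard-local order, and the caller must scatter the answers back into the original argument order. A minimal, self-contained sketch of that gather step; per_shard and reverse_index are invented stand-ins for the transaction state, and reverse_index[sid][j] plays the role of Transaction::ReverseArgIndex(sid, j):

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <optional>
#include <string>
#include <vector>

int main() {
  // MGET k0 k1 k2 k3 split across two shards; k3 was missing.
  std::vector<std::vector<std::optional<std::string>>> per_shard = {
      {"v0", std::nullopt},  // shard 0 owned k0 and k3
      {"v1", "v2"},          // shard 1 owned k1 and k2
  };
  // For each shard-local result, the original argument it belongs to.
  std::vector<std::vector<uint32_t>> reverse_index = {{0, 3}, {1, 2}};

  std::vector<std::optional<std::string>> results(4);
  for (size_t sid = 0; sid < per_shard.size(); ++sid)
    for (size_t j = 0; j < per_shard[sid].size(); ++j)
      results[reverse_index[sid][j]] = std::move(per_shard[sid][j]);

  for (const auto& r : results)
    std::cout << (r ? *r : std::string("(nil)")) << '\n';  // v0 v1 v2 (nil)
}

Without a reverse index (or, pre-revert, the it.index() of the IndexSlice iterator), results would come back in shard order rather than argument order.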

View file

@ -40,12 +40,14 @@ OpResult<std::pair<DbSlice::ConstIterator, unsigned>> FindFirstReadOnly(const Db
int req_obj_type) {
DCHECK(!args.Empty());
for (auto it = args.begin(); it != args.end(); ++it) {
OpResult<DbSlice::ConstIterator> res = db_slice.FindReadOnly(cntx, *it, req_obj_type);
unsigned i = 0;
for (string_view key : args) {
OpResult<DbSlice::ConstIterator> res = db_slice.FindReadOnly(cntx, key, req_obj_type);
if (res)
return make_pair(res.value(), unsigned(it.index()));
return make_pair(res.value(), i);
if (res.status() != OpStatus::KEY_NOTFOUND)
return res.status();
++i;
}
VLOG(2) << "FindFirst not found";
@ -117,8 +119,8 @@ OpResult<ShardFFResult> FindFirstNonEmpty(Transaction* trans, int req_obj_type)
auto comp = [trans](const OpResult<FFResult>& lhs, const OpResult<FFResult>& rhs) {
if (!lhs || !rhs)
return lhs.ok();
size_t i1 = std::get<1>(*lhs);
size_t i2 = std::get<1>(*rhs);
size_t i1 = trans->ReverseArgIndex(std::get<ShardId>(*lhs), std::get<unsigned>(*lhs));
size_t i2 = trans->ReverseArgIndex(std::get<ShardId>(*rhs), std::get<unsigned>(*rhs));
return i1 < i2;
};
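
The comparator above serves blocking commands: each shard reports the shard-local index of its first non-empty key, and the candidate whose original argument position is smallest wins, so BLPOP k1 k2 k3 pops from the leftmost non-empty list no matter which shard holds it. A toy model of the tie-break; Candidate and reverse_index are invented, and the lambda mirrors the ReverseArgIndex-based comparison:

#include <algorithm>
#include <cstddef>
#include <iostream>
#include <vector>

struct Candidate {
  size_t shard_id;
  size_t local_index;  // index within that shard's ShardArgs
};

int main() {
  // reverse_index[sid][local] -> original argument position.
  std::vector<std::vector<size_t>> reverse_index = {{1}, {0, 2}};
  // Shard 0 found its key at local index 0 (original position 1);
  // shard 1 found its key at local index 1 (original position 2).
  std::vector<Candidate> found = {{0, 0}, {1, 1}};

  auto leftmost = *std::min_element(
      found.begin(), found.end(), [&](const Candidate& a, const Candidate& b) {
        return reverse_index[a.shard_id][a.local_index] <
               reverse_index[b.shard_id][b.local_index];
      });
  std::cout << "winner: shard " << leftmost.shard_id << '\n';  // shard 0
}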

View file

@ -42,8 +42,8 @@ struct Entry : public EntryBase {
struct Payload {
std::string_view cmd;
std::variant<CmdArgList, // Parts of a full command.
ShardArgs, // Shard parts.
ArgSlice>
ShardArgs // Command and its shard parts.
>
args;
Payload() = default;
@ -51,8 +51,6 @@ struct Entry : public EntryBase {
}
Payload(std::string_view c, const ShardArgs& a) : cmd(c), args(a) {
}
Payload(std::string_view c, ArgSlice a) : cmd(c), args(a) {
}
};
Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt,

View file

@ -1543,14 +1543,12 @@ void JsonFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
continue;
vector<OptString>& res = mget_resp[sid];
ShardArgs shard_args = transaction->GetShardArgs(sid);
unsigned src_index = 0;
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) {
if (!res[src_index])
for (size_t j = 0; j < res.size(); ++j) {
if (!res[j])
continue;
uint32_t dst_indx = it.index();
results[dst_indx] = std::move(res[src_index]);
uint32_t indx = transaction->ReverseArgIndex(sid, j);
results[indx] = std::move(res[j]);
}
}
@ -2093,7 +2091,8 @@ void JsonFamily::Register(CommandRegistry* registry) {
constexpr size_t kMsetFlags = CO::WRITE | CO::DENYOOM | CO::FAST | CO::INTERLEAVED_KEYS;
registry->StartFamily();
*registry << CI{"JSON.GET", CO::READONLY | CO::FAST, -2, 1, 1, acl::JSON}.HFUNC(Get);
*registry << CI{"JSON.MGET", CO::READONLY | CO::FAST, -3, 1, -2, acl::JSON}.HFUNC(MGet);
*registry << CI{"JSON.MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING, -3, 1, -2, acl::JSON}
.HFUNC(MGet);
*registry << CI{"JSON.TYPE", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(Type);
*registry << CI{"JSON.STRLEN", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(StrLen);
*registry << CI{"JSON.OBJLEN", CO::READONLY | CO::FAST, 3, 1, 1, acl::JSON}.HFUNC(ObjLen);

View file

@ -158,22 +158,6 @@ struct CircularMessages {
// Used to recover logs for BLPOP failures. See OpBPop.
thread_local CircularMessages debugMessages{50};
// A bit awkward translation from a single key to ShardArgs.
// We create a mutable slice (which will never be mutated) from the key, then we create
// a CmdArgList of size 1 that references mslice and finally
// we reference the first element in the CmdArgList via islice.
struct SingleArg {
MutableSlice mslice;
IndexSlice islice{0, 1};
SingleArg(string_view arg) : mslice(const_cast<char*>(arg.data()), arg.size()) {
}
ShardArgs Get() {
return ShardArgs{CmdArgList{&mslice, 1}, absl::MakeSpan(&islice, 1)};
}
};
class BPopPusher {
public:
BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir);
@ -464,9 +448,7 @@ OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view
// A hack, again: since we hacked which queue we are waiting on (see RunPair),
// we must clean up the src key here manually. See RunPair for why we do this.
// In short: we suspended on "src" on both shards.
SingleArg single_arg{src};
shard->blocking_controller()->FinalizeWatched(single_arg.Get(), t);
shard->blocking_controller()->FinalizeWatched(ArgSlice{&src, 1}, t);
}
} else {
DVLOG(1) << "Popping value from list: " << key;
@ -891,8 +873,7 @@ OpResult<string> BPopPusher::RunSingle(ConnectionContext* cntx, time_point tp) {
return op_res;
}
SingleArg single_arg{pop_key_};
auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); };
auto wcb = [&](Transaction* t, EngineShard* shard) { return ShardArgs{&this->pop_key_, 1}; };
const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
std::string_view key) -> bool {
@ -919,13 +900,11 @@ OpResult<string> BPopPusher::RunPair(ConnectionContext* cntx, time_point tp) {
return op_res;
}
SingleArg single_arg(this->pop_key_);
// a hack: we watch in both shards for pop_key but only in the source shard it's relevant.
// Therefore we follow the regular flow of watching the key but for the destination shard it
// will never be triggered.
// This allows us to run Transaction::Execute on watched transactions in both shards.
auto wcb = [&](Transaction* t, EngineShard* shard) { return single_arg.Get(); };
auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; };
const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
std::string_view key) -> bool {

View file

@ -2989,19 +2989,17 @@ void XReadImpl(CmdArgList args, std::optional<ReadOpts> opts, ConnectionContext*
continue;
vector<RecordVec>& results = xread_resp[sid];
unsigned src_index = 0;
ShardArgs shard_args = cntx->transaction->GetShardArgs(sid);
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_index) {
if (results[src_index].size() == 0) {
for (size_t i = 0; i < results.size(); ++i) {
if (results[i].size() == 0) {
continue;
}
resolved_streams++;
// Add the stream records ordered by the original stream arguments.
size_t dst_indx = it.index();
res[dst_indx - opts->streams_arg] = std::move(results[src_index]);
size_t indx = cntx->transaction->ReverseArgIndex(sid, i);
res[indx - opts->streams_arg] = std::move(results[i]);
}
}
@ -3325,7 +3323,7 @@ constexpr uint32_t kXAutoClaim = WRITE | STREAM | FAST;
void StreamFamily::Register(CommandRegistry* registry) {
using CI = CommandId;
registry->StartFamily();
constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::VARIADIC_KEYS;
constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS;
*registry << CI{"XADD", CO::WRITE | CO::DENYOOM | CO::FAST, -5, 1, 1, acl::kXAdd}.HFUNC(XAdd)
<< CI{"XCLAIM", CO::WRITE | CO::FAST, -6, 1, 1, acl::kXClaim}.HFUNC(XClaim)
<< CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, acl::kXDel}.HFUNC(XDel)

View file

@ -270,7 +270,6 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
SetCmd sg(op_args, false);
size_t index = 0;
bool partial = false;
for (auto it = args.begin(); it != args.end(); ++it) {
string_view key = *it;
++it;
@ -278,7 +277,6 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
DVLOG(1) << "MSet " << key << ":" << value;
if (sg.Set(params, key, value) != OpStatus::OK) { // OOM for example.
success->store(false);
partial = true;
break;
}
index += 2;
@ -287,29 +285,18 @@ void OpMSet(const OpArgs& op_args, const ShardArgs& args, atomic_bool* success)
if (auto journal = op_args.shard->journal(); journal) {
// We write a custom journal because an OOM in the above loop could lead to partial success, so
// we replicate only what was changed.
if (partial) {
string_view cmd;
ArgSlice cmd_args;
vector<string_view> store_args(index);
if (index == 0) {
// All shards must record the tx was executed for the replica to execute it, so we send a
// PING in case nothing was changed
// All shards must record the tx was executed for the replica to execute it, so we send a PING
// in case nothing was changed
cmd = "PING";
} else {
// journal [0, i)
cmd = "MSET";
unsigned i = 0;
for (string_view arg : args) {
store_args[i++] = arg;
if (i >= store_args.size())
break;
}
cmd_args = absl::MakeSpan(store_args);
cmd_args = ArgSlice(args.begin(), index);
}
RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt());
} else {
RecordJournal(op_args, "MSET", args, op_args.tx->GetUniqueShardCnt());
}
}
}
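
Worked through, the restored journaling branch above behaves as follows: for MSET k1 v1 k2 v2 k3 v3, if Set() fails on k3 then index stopped at 4 and the shard journals only MSET k1 v1 k2 v2; if nothing was applied (index == 0) it journals PING so that every shard still records the transaction for the replica. A standalone sketch of that decision (toy types, not the ShardArgs/journal API):

#include <iostream>
#include <string>
#include <vector>

int main() {
  // Flat key/value sequence this shard received, as in OpMSet above.
  std::vector<std::string> args = {"k1", "v1", "k2", "v2", "k3", "v3"};
  size_t index = 4;  // Set() failed on k3, so only two pairs were applied.

  std::string entry;
  if (index == 0) {
    entry = "PING";  // nothing changed, but the replica must see the tx
  } else {
    entry = "MSET";  // journal only the applied prefix [0, index)
    for (size_t i = 0; i < index; ++i)
      entry += ' ' + args[i];
  }
  std::cout << "journal: " << entry << '\n';  // journal: MSET k1 v1 k2 v2
}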
@ -1179,17 +1166,16 @@ void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
src.storage_list->next = res.storage_list;
res.storage_list = src.storage_list;
src.storage_list = nullptr;
ShardArgs shard_args = transaction->GetShardArgs(sid);
unsigned src_indx = 0;
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_indx) {
if (!src.resp_arr[src_indx])
for (size_t j = 0; j < src.resp_arr.size(); ++j) {
if (!src.resp_arr[j])
continue;
uint32_t indx = it.index();
uint32_t indx = transaction->ReverseArgIndex(sid, j);
res.resp_arr[indx] = std::move(src.resp_arr[src_indx]);
res.resp_arr[indx] = std::move(src.resp_arr[j]);
if (cntx->protocol() == Protocol::MEMCACHE) {
res.resp_arr[indx]->key = *it;
res.resp_arr[indx]->key = ArgS(args, indx);
}
}
}
@ -1505,7 +1491,9 @@ void StringFamily::Register(CommandRegistry* registry) {
<< CI{"GETEX", CO::WRITE | CO::DENYOOM | CO::FAST | CO::NO_AUTOJOURNAL, -1, 1, 1, acl::kGetEx}
.HFUNC(GetEx)
<< CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, acl::kGetSet}.HFUNC(GetSet)
<< CI{"MGET", CO::READONLY | CO::FAST | CO::IDEMPOTENT, -2, 1, -1, acl::kMGet}.HFUNC(MGet)
<< CI{"MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING | CO::IDEMPOTENT, -2, 1, -1,
acl::kMGet}
.HFUNC(MGet)
<< CI{"MSET", kMSetMask, -3, 1, -1, acl::kMSet}.HFUNC(MSet)
<< CI{"MSETNX", kMSetMask, -3, 1, -1, acl::kMSetNx}.HFUNC(MSetNx)
<< CI{"STRLEN", CO::READONLY | CO::FAST, 2, 1, 1, acl::kStrLen}.HFUNC(StrLen)

View file

@ -184,13 +184,12 @@ void Transaction::InitGlobal() {
void Transaction::BuildShardIndex(const KeyIndex& key_index, std::vector<PerShardCache>* out) {
auto& shard_index = *out;
auto add = [this, &shard_index](uint32_t sid, uint32_t b, uint32_t e) {
auto& slices = shard_index[sid].slices;
if (!slices.empty() && slices.back().second == b) {
slices.back().second = e;
} else {
slices.emplace_back(b, e);
}
auto add = [this, rev_mapping = key_index.has_reverse_mapping, &shard_index](uint32_t sid,
uint32_t i) {
string_view val = ArgS(full_args_, i);
shard_index[sid].args.push_back(val);
if (rev_mapping)
shard_index[sid].original_index.push_back(i);
};
if (key_index.bonus) {
@ -198,39 +197,47 @@ void Transaction::BuildShardIndex(const KeyIndex& key_index, std::vector<PerShar
string_view key = ArgS(full_args_, *key_index.bonus);
unique_slot_checker_.Add(key);
uint32_t sid = Shard(key, shard_data_.size());
add(sid, *key_index.bonus, *key_index.bonus + 1);
add(sid, *key_index.bonus);
}
for (unsigned i = key_index.start; i < key_index.end; i += key_index.step) {
for (unsigned i = key_index.start; i < key_index.end; ++i) {
string_view key = ArgS(full_args_, i);
unique_slot_checker_.Add(key);
uint32_t sid = Shard(key, shard_data_.size());
shard_index[sid].key_step = key_index.step;
add(sid, i, i + key_index.step);
add(sid, i);
// Handle values associated with preceding key.
for (unsigned j = 1; j < key_index.step; ++j) {
add(sid, ++i);
}
}
}
void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, size_t num_args) {
args_slices_.reserve(num_args);
void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, size_t num_args,
bool rev_mapping) {
kv_args_.reserve(num_args);
DCHECK(kv_fp_.empty());
kv_fp_.reserve(num_args);
if (rev_mapping)
reverse_index_.reserve(num_args);
// Store the concatenated per-shard arguments from the shard index inside kv_args_
// and make each shard data point to its own sub-span inside kv_args_.
for (size_t i = 0; i < shard_data_.size(); ++i) {
auto& sd = shard_data_[i];
const auto& src = shard_index[i];
const auto& si = shard_index[i];
sd.slice_count = src.slices.size();
sd.slice_start = args_slices_.size();
sd.arg_count = si.args.size();
sd.arg_start = kv_args_.size();
sd.fp_start = kv_fp_.size();
sd.fp_count = 0;
// Multi transactions can re-initialize on different shards, so clear ACTIVE flag.
DCHECK_EQ(sd.local_mask & ACTIVE, 0);
if (sd.slice_count == 0)
if (sd.arg_count == 0)
continue;
sd.local_mask |= ACTIVE;
@ -238,16 +245,19 @@ void Transaction::InitShardData(absl::Span<const PerShardCache> shard_index, siz
unique_shard_cnt_++;
unique_shard_id_ = i;
for (size_t j = 0; j < src.slices.size(); ++j) {
IndexSlice slice = src.slices[j];
args_slices_.push_back(slice);
for (uint32_t k = slice.first; k < slice.second; k += src.key_step) {
string_view key = ArgS(full_args_, k);
kv_fp_.push_back(LockTag(key).Fingerprint());
for (size_t j = 0; j < si.args.size(); ++j) {
string_view arg = si.args[j];
kv_args_.push_back(arg);
if (si.key_step == 1 || j % si.key_step == 0) {
kv_fp_.push_back(LockTag(arg).Fingerprint());
sd.fp_count++;
}
if (rev_mapping)
reverse_index_.push_back(si.original_index[j]);
}
}
DCHECK_EQ(kv_args_.size(), num_args);
}
void Transaction::PrepareMultiFps(CmdArgList keys) {
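
A compact model of what BuildShardIndex and InitShardData above do together after the revert: each key is hashed to a shard, trailing values travel with their key (key_step == 2 for MSET), and original_index records where every copied argument came from so that ReverseArgIndex can invert the shuffle later. All names and the hash below are invented for illustration:

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <string>
#include <string_view>
#include <vector>

int main() {
  // MSET k1 v1 k2 v2 k3 v3 -> key_step == 2, keys at even positions.
  std::vector<std::string> full_args = {"k1", "v1", "k2", "v2", "k3", "v3"};
  const uint32_t key_step = 2, num_shards = 2;
  auto shard_of = [](std::string_view key) { return key.back() % 2; };

  struct PerShardCache {
    std::vector<std::string_view> args;    // shard-local copy of the args
    std::vector<uint32_t> original_index;  // where each arg came from
  };
  std::vector<PerShardCache> shard_index(num_shards);

  for (uint32_t i = 0; i < full_args.size(); i += key_step) {
    uint32_t sid = shard_of(full_args[i]);
    for (uint32_t j = 0; j < key_step; ++j) {  // the key, then its value
      shard_index[sid].args.push_back(full_args[i + j]);
      shard_index[sid].original_index.push_back(i + j);
    }
  }
  for (uint32_t sid = 0; sid < num_shards; ++sid) {
    std::cout << "shard " << sid << ':';
    for (size_t j = 0; j < shard_index[sid].args.size(); ++j)
      std::cout << ' ' << shard_index[sid].args[j] << '@'
                << shard_index[sid].original_index[j];
    std::cout << '\n';  // e.g. shard 1: k1@0 v1@1 k3@4 v3@5
  }
}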
@ -267,13 +277,22 @@ void Transaction::PrepareMultiFps(CmdArgList keys) {
void Transaction::StoreKeysInArgs(const KeyIndex& key_index) {
DCHECK(!key_index.bonus);
DCHECK(kv_fp_.empty());
DCHECK(args_slices_.empty());
// Even for a single key we may have multiple arguments (e.g. MSET key value).
args_slices_.emplace_back(key_index.start, key_index.end);
for (unsigned j = key_index.start; j < key_index.end; j += key_index.step) {
string_view key = ArgS(full_args_, j);
kv_fp_.push_back(LockTag(key).Fingerprint());
string_view arg = ArgS(full_args_, j);
kv_args_.push_back(arg);
kv_fp_.push_back(LockTag(arg).Fingerprint());
for (unsigned k = j + 1; k < j + key_index.step; ++k)
kv_args_.push_back(ArgS(full_args_, k));
}
if (key_index.has_reverse_mapping) {
reverse_index_.resize(kv_args_.size());
for (unsigned j = 0; j < reverse_index_.size(); ++j) {
reverse_index_[j] = j + key_index.start;
}
}
}
@ -295,7 +314,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
StoreKeysInArgs(key_index);
unique_shard_cnt_ = 1;
string_view akey = ArgS(full_args_, key_index.start);
string_view akey = kv_args_.front();
if (is_stub) // stub transactions don't migrate
DCHECK_EQ(unique_shard_id_, Shard(akey, shard_set->size()));
else {
@ -321,11 +340,11 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
BuildShardIndex(key_index, &shard_index);
// Initialize shard data based on distributed arguments.
InitShardData(shard_index, key_index.num_args());
InitShardData(shard_index, key_index.num_args(), key_index.has_reverse_mapping);
DCHECK(!multi_ || multi_->mode != LOCK_AHEAD || !multi_->tag_fps.empty());
DVLOG(1) << "InitByArgs " << DebugId() << facade::ToSV(full_args_.front());
DVLOG(1) << "InitByArgs " << DebugId() << " " << kv_args_.front();
// Compress shard data, if we occupy only one shard.
if (unique_shard_cnt_ == 1) {
@ -338,8 +357,15 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
sd = &shard_data_.front();
sd->local_mask |= ACTIVE;
}
sd->slice_count = -1;
sd->slice_start = -1;
sd->arg_count = -1;
sd->arg_start = -1;
}
// Validation. Check reverse mapping was built correctly.
if (key_index.has_reverse_mapping) {
for (size_t i = 0; i < kv_args_.size(); ++i) {
DCHECK_EQ(kv_args_[i], ArgS(full_args_, reverse_index_[i])) << full_args_;
}
}
// Validation.
@ -370,7 +396,7 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
}
DCHECK_EQ(unique_shard_cnt_, 0u);
DCHECK(args_slices_.empty());
DCHECK(kv_args_.empty());
DCHECK(kv_fp_.empty());
OpResult<KeyIndex> key_index = DetermineKeys(cid_, args);
@ -401,8 +427,8 @@ void Transaction::PrepareSquashedMultiHop(const CommandId* cid,
} else {
shard_data_[i].local_mask &= ~ACTIVE;
}
shard_data_[i].slice_start = 0;
shard_data_[i].slice_count = 0;
shard_data_[i].arg_start = 0;
shard_data_[i].arg_count = 0;
}
MultiBecomeSquasher();
@ -459,14 +485,15 @@ void Transaction::MultiSwitchCmd(const CommandId* cid) {
unique_shard_id_ = 0;
unique_shard_cnt_ = 0;
args_slices_.clear();
kv_args_.clear();
kv_fp_.clear();
reverse_index_.clear();
cid_ = cid;
cb_ptr_ = nullptr;
for (auto& sd : shard_data_) {
sd.slice_count = sd.slice_start = 0;
sd.arg_count = sd.arg_start = 0;
if (multi_->mode == NON_ATOMIC) {
sd.local_mask = 0; // Non atomic transactions schedule each time, so remove all flags
@ -528,6 +555,7 @@ void Transaction::PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdA
EnableShard(sid);
OpResult<KeyIndex> key_index = DetermineKeys(cid_, args);
CHECK(key_index);
DCHECK(!key_index->has_reverse_mapping);
StoreKeysInArgs(*key_index);
}
@ -1153,12 +1181,23 @@ ShardArgs Transaction::GetShardArgs(ShardId sid) const {
// We can read unique_shard_cnt_ only because ShardArgsInShard is called after IsArmedInShard
// barrier.
if (unique_shard_cnt_ == 1) {
return ShardArgs{full_args_, absl::MakeSpan(args_slices_)};
return kv_args_;
}
const auto& sd = shard_data_[sid];
return ShardArgs{full_args_,
absl::MakeSpan(args_slices_.data() + sd.slice_start, sd.slice_count)};
return ShardArgs{kv_args_.data() + sd.arg_start, sd.arg_count};
}
// Maps a shard-local arg index back to the original argument index, skipping the command,
// i.e. returns (first_key_pos - 1) or bigger.
size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
DCHECK_LT(arg_index, reverse_index_.size());
if (unique_shard_cnt_ == 1)
return reverse_index_[arg_index];
const auto& sd = shard_data_[shard_id];
return reverse_index_[sd.arg_start + arg_index];
}
OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider,
@ -1334,7 +1373,7 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid, string_view
// Change state to awaked and store index of awakened key
sd.local_mask &= ~SUSPENDED_Q;
sd.local_mask |= AWAKED_Q;
sd.wake_key_pos = it.index();
sd.wake_key_pos = it - args.begin();
blocking_barrier_.Close();
return true;
@ -1345,8 +1384,8 @@ optional<string_view> Transaction::GetWakeKey(ShardId sid) const {
if ((sd.local_mask & AWAKED_Q) == 0)
return nullopt;
CHECK_LT(sd.wake_key_pos, full_args_.size());
return ArgS(full_args_, sd.wake_key_pos);
CHECK_NE(sd.wake_key_pos, UINT16_MAX);
return GetShardArgs(sid).at(sd.wake_key_pos);
}
void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult result) {
@ -1384,11 +1423,10 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul
journal::Entry::Payload entry_payload;
string_view cmd{cid_->name()};
if (unique_shard_cnt_ == 1 || args_slices_.empty()) {
if (unique_shard_cnt_ == 1 || kv_args_.empty()) {
entry_payload = journal::Entry::Payload(cmd, full_args_);
} else {
ShardArgs shard_args = GetShardArgs(shard->shard_id());
entry_payload = journal::Entry::Payload(cmd, shard_args);
entry_payload = journal::Entry::Payload(cmd, GetShardArgs(shard->shard_id()).AsSlice());
}
LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false, true);
}
@ -1475,6 +1513,10 @@ OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
int num_custom_keys = -1;
if (cid->opt_mask() & CO::REVERSE_MAPPING) {
key_index.has_reverse_mapping = true;
}
if (cid->opt_mask() & CO::VARIADIC_KEYS) {
// ZUNION/INTER <num_keys> <key1> [<key2> ...]
// EVAL <script> <num_keys>

View file

@ -180,6 +180,9 @@ class Transaction {
// Get command arguments for specific shard. Called from shard thread.
ShardArgs GetShardArgs(ShardId sid) const;
// Map arg_index from GetShardArgs slice to index in original command slice from InitByArgs.
size_t ReverseArgIndex(ShardId shard_id, size_t arg_index) const;
// Execute transaction hop. If conclude is true, it is removed from the pending queue.
void Execute(RunnableType cb, bool conclude);
@ -386,8 +389,8 @@ class Transaction {
// Set when the shard is prepared for another hop. Sync point. Cleared when execution starts.
std::atomic_bool is_armed = false;
uint32_t slice_start = 0; // Subspan in kv_args_ with local arguments.
uint32_t slice_count = 0;
uint32_t arg_start = 0; // Subspan in kv_args_ with local arguments.
uint32_t arg_count = 0;
// span into kv_fp_
uint32_t fp_start = 0;
@ -397,7 +400,7 @@ class Transaction {
TxQueue::Iterator pq_pos = TxQueue::kEnd;
// Index of the key, relative to the shard's args, on which the shard was woken up after a blocking wait.
uint32_t wake_key_pos = UINT32_MAX;
uint16_t wake_key_pos = UINT16_MAX;
// Irrational stats purely for debugging purposes.
struct Stats {
@ -440,11 +443,13 @@ class Transaction {
// Auxiliary structure used during initialization
struct PerShardCache {
std::vector<IndexSlice> slices;
std::vector<std::string_view> args;
std::vector<uint32_t> original_index;
unsigned key_step = 1;
void Clear() {
slices.clear();
args.clear();
original_index.clear();
}
};
@ -483,7 +488,8 @@ class Transaction {
void BuildShardIndex(const KeyIndex& keys, std::vector<PerShardCache>* out);
// Init shard data from shard index.
void InitShardData(absl::Span<const PerShardCache> shard_index, size_t num_args);
void InitShardData(absl::Span<const PerShardCache> shard_index, size_t num_args,
bool rev_mapping);
// Store all key index keys in args_. Used only for single shard initialization.
void StoreKeysInArgs(const KeyIndex& key_index);
@ -582,11 +588,10 @@ class Transaction {
// TODO: explore dense packing
absl::InlinedVector<PerShardData, 4> shard_data_;
// Stores slices of key/values partitioned by shards.
// Slices reference full_args_.
// Stores keys/values of the transaction partitioned by shards.
// We need values as well since we reorder keys, and we need to know what value corresponds
// to what key.
absl::InlinedVector<IndexSlice, 4> args_slices_;
absl::InlinedVector<std::string_view, 4> kv_args_;
// Fingerprints of keys, precomputed once during the transaction initialization.
absl::InlinedVector<LockFp, 4> kv_fp_;
@ -597,6 +602,9 @@ class Transaction {
// Set if a NO_AUTOJOURNAL command asked to enable auto journal again
bool re_enabled_auto_journal_ = false;
// Reverse argument mapping for ReverseArgIndex to convert from shard index to original index.
std::vector<uint32_t> reverse_index_;
RunnableType* cb_ptr_ = nullptr; // Run on shard threads
const CommandId* cid_ = nullptr; // Underlying command
std::unique_ptr<MultiData> multi_; // Initialized when the transaction is multi/exec.

View file

@ -15,21 +15,7 @@ namespace dfly {
using namespace std;
using Payload = journal::Entry::Payload;
size_t ShardArgs::Size() const {
size_t sz = 0;
for (const auto& s : slice_.second)
sz += (s.second - s.first);
return sz;
}
void RecordJournal(const OpArgs& op_args, string_view cmd, const ShardArgs& args,
uint32_t shard_cnt, bool multi_commands) {
VLOG(2) << "Logging command " << cmd << " from txn " << op_args.tx->txid();
op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, multi_commands,
false);
}
void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args, uint32_t shard_cnt,
void RecordJournal(const OpArgs& op_args, string_view cmd, ArgSlice args, uint32_t shard_cnt,
bool multi_commands) {
VLOG(2) << "Logging command " << cmd << " from txn " << op_args.tx->txid();
op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, multi_commands,

View file

@ -39,6 +39,7 @@ struct KeyIndex {
// if index is non-zero then adds another key index (usually 0).
// relevant for commands like ZUNIONSTORE/ZINTERSTORE with a destination key.
std::optional<uint16_t> bonus{};
bool has_reverse_mapping = false;
KeyIndex(unsigned s = 0, unsigned e = 0, unsigned step = 0) : start(s), end(e), step(step) {
}
@ -106,94 +107,52 @@ using KeyReadyChecker =
std::function<bool(EngineShard*, const DbContext& context, Transaction* tx, std::string_view)>;
// References arguments in another array.
using IndexSlice = std::pair<uint32_t, uint32_t>; // [begin, end)
// ShardArgs - holds a span over the full arguments and a span of sub-ranges
// referencing those arguments.
class ShardArgs {
using ArgsIndexPair = std::pair<facade::CmdArgList, absl::Span<const IndexSlice>>;
ArgsIndexPair slice_;
using IndexSlice = std::pair<uint32_t, uint32_t>; // (begin, end)
class ShardArgs : protected ArgSlice {
public:
class Iterator {
facade::CmdArgList arglist_;
absl::Span<const IndexSlice>::const_iterator index_it_;
uint32_t delta_ = 0;
using ArgSlice::ArgSlice;
using ArgSlice::at;
using ArgSlice::operator=;
using Iterator = ArgSlice::iterator;
public:
using iterator_category = std::input_iterator_tag;
using value_type = std::string_view;
using difference_type = ptrdiff_t;
using pointer = value_type*;
using reference = value_type&;
// First version, corresponds to spans over arguments.
Iterator(facade::CmdArgList list, absl::Span<const IndexSlice>::const_iterator it)
: arglist_(list), index_it_(it) {
ShardArgs(const ArgSlice& o) : ArgSlice(o) {
}
bool operator==(const Iterator& o) const {
return arglist_ == o.arglist_ && index_it_ == o.index_it_ && delta_ == o.delta_;
size_t Size() const {
return ArgSlice::size();
}
bool operator!=(const Iterator& o) const {
return !(*this == o);
auto cbegin() const {
return ArgSlice::cbegin();
}
std::string_view operator*() const {
return facade::ArgS(arglist_, index());
auto cend() const {
return ArgSlice::cend();
}
Iterator& operator++() {
++delta_;
if (index() >= index_it_->second) {
++index_it_;
delta_ = 0;
}
return *this;
}
size_t index() const {
return index_it_->first + delta_;
}
};
ShardArgs(facade::CmdArgList fa, absl::Span<const IndexSlice> s) : slice_(ArgsIndexPair(fa, s)) {
}
ShardArgs() : slice_(ArgsIndexPair{}) {
}
size_t Size() const;
Iterator cbegin() const {
return Iterator{slice_.first, slice_.second.begin()};
}
Iterator cend() const {
return Iterator{slice_.first, slice_.second.end()};
}
Iterator begin() const {
auto begin() const {
return cbegin();
}
Iterator end() const {
auto end() const {
return cend();
}
bool Empty() const {
return slice_.second.empty();
return ArgSlice::empty();
}
std::string_view Front() const {
return *cbegin();
}
ArgSlice AsSlice() const {
return ArgSlice(*this);
}
};
// Record non auto journal command with own txid and dbid.
void RecordJournal(const OpArgs& op_args, std::string_view cmd, const ShardArgs& args,
uint32_t shard_cnt = 1, bool multi_commands = false);
void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args,
uint32_t shard_cnt = 1, bool multi_commands = false);
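
Post-revert, the ShardArgs above is a thin veneer over ArgSlice that keeps the old method names so call sites compile unchanged. A minimal stand-alone model, using std::span where Dragonfly uses its own ArgSlice alias:

#include <cstddef>
#include <iostream>
#include <span>
#include <string_view>

using ArgSlice = std::span<const std::string_view>;

class ShardArgs : protected ArgSlice {
 public:
  using ArgSlice::ArgSlice;
  ShardArgs(const ArgSlice& o) : ArgSlice(o) {}
  size_t Size() const { return size(); }
  auto begin() const { return ArgSlice::begin(); }
  auto end() const { return ArgSlice::end(); }
  bool Empty() const { return empty(); }
  std::string_view Front() const { return *begin(); }
  ArgSlice AsSlice() const { return ArgSlice(*this); }
};

int main() {
  std::string_view keys[] = {"k1", "k2", "k3"};
  ShardArgs args{ArgSlice{keys}};
  for (std::string_view key : args)  // iterates directly over the span
    std::cout << key << '\n';
  std::cout << "front=" << args.Front() << " size=" << args.Size() << '\n';
}

The pre-revert class, by contrast, had to walk IndexSlice sub-ranges of full_args_, which is why it needed the custom input iterator deleted above.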

View file

@ -879,7 +879,7 @@ double GetKeyWeight(Transaction* t, ShardId shard_id, const vector<double>& weig
return 1;
}
unsigned windex = key_index - cmdargs_keys_offset;
unsigned windex = t->ReverseArgIndex(shard_id, key_index) - cmdargs_keys_offset;
DCHECK_LT(windex, weights.size());
return weights[windex];
}
@ -920,8 +920,8 @@ OpResult<ScoredMap> OpUnion(EngineShard* shard, Transaction* t, string_view dest
++index;
continue;
}
key_weight_vec[index] = {
*it_res, GetKeyWeight(t, shard->shard_id(), weights, start.index(), cmdargs_keys_offset)};
key_weight_vec[index] = {*it_res, GetKeyWeight(t, shard->shard_id(), weights,
index + removed_keys, cmdargs_keys_offset)};
++index;
}
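
The weight lookup above needs the same index inversion. For ZUNIONSTORE dest 2 k1 k2 WEIGHTS 10 20, cmdargs_keys_offset is the original position of k1, and a shard that sees k2 at shard-local index 0 must still read weights[1]. A tiny sketch of the arithmetic (all positions hypothetical):

#include <cassert>
#include <cstddef>
#include <vector>

int main() {
  // ZUNIONSTORE dest 2 k1 k2 WEIGHTS 10 20
  std::vector<double> weights = {10.0, 20.0};
  size_t cmdargs_keys_offset = 2;  // original position of k1 (hypothetical)
  size_t reverse_arg_index = 3;    // what ReverseArgIndex would return for k2
  size_t windex = reverse_arg_index - cmdargs_keys_offset;
  assert(windex == 1 && weights[windex] == 20.0);
  return 0;
}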
@ -3329,7 +3329,7 @@ constexpr uint32_t kGeoRadiusByMember = WRITE | GEO | SLOW;
} // namespace acl
void ZSetFamily::Register(CommandRegistry* registry) {
constexpr uint32_t kStoreMask = CO::WRITE | CO::VARIADIC_KEYS | CO::DENYOOM;
constexpr uint32_t kStoreMask = CO::WRITE | CO::VARIADIC_KEYS | CO::REVERSE_MAPPING | CO::DENYOOM;
registry->StartFamily();
// TODO: to add support for SCRIPT for BZPOPMIN, BZPOPMAX similarly to BLPOP.
*registry
@ -3368,7 +3368,9 @@ void ZSetFamily::Register(CommandRegistry* registry) {
ZRevRangeByScore)
<< CI{"ZREVRANK", CO::READONLY | CO::FAST, 3, 1, 1, acl::kZRevRank}.HFUNC(ZRevRank)
<< CI{"ZSCAN", CO::READONLY, -3, 1, 1, acl::kZScan}.HFUNC(ZScan)
<< CI{"ZUNION", CO::READONLY | CO::VARIADIC_KEYS, -3, 2, 2, acl::kZUnion}.HFUNC(ZUnion)
<< CI{"ZUNION", CO::READONLY | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS, -3, 2, 2,
acl::kZUnion}
.HFUNC(ZUnion)
<< CI{"ZUNIONSTORE", kStoreMask, -4, 3, 3, acl::kZUnionStore}.HFUNC(ZUnionStore)
// GEO functions