Implement single shard use-case for rpoplpush. Some BLPOP related refactoring

This commit is contained in:
Roman Gershman 2022-04-28 19:05:51 +03:00
parent d3764efbca
commit b36c16b314
7 changed files with 167 additions and 130 deletions

View file

@ -27,72 +27,6 @@ std::atomic_uint64_t op_seq{1};
} // namespace
struct Transaction::FindFirstProcessor {
public:
FindFirstProcessor(TxId notify, unsigned size)
: find_res_(size, OpStatus::KEY_NOTFOUND), notify_txid_(notify) {
}
void Find(Transaction* t);
OpResult<Transaction::FindFirstResult> Process(Transaction* t);
private:
OpStatus RunInShard(Transaction* t, EngineShard* shard);
// Holds Find results: (iterator to a found key, and its index in the passed arguments).
// See DbSlice::FindFirst for more details.
// spans all the shards for now.
std::vector<OpResult<std::pair<PrimeIterator, unsigned>>> find_res_;
TxId notify_txid_;
};
void Transaction::FindFirstProcessor::Find(Transaction* t) {
VLOG(2) << "FindFirst::Find " << t->DebugId();
t->Execute([this](auto* t, auto* s) { return RunInShard(t, s); }, false);
}
OpStatus Transaction::FindFirstProcessor::RunInShard(Transaction* t, EngineShard* shard) {
if (notify_txid_ == kuint64max || shard->committed_txid() == notify_txid_) {
// TODO: to add timestamp logic that provides consistency guarantees for blocking transactions.
auto args = t->ShardArgsInShard(shard->shard_id());
find_res_[shard->shard_id()] = shard->db_slice().FindFirst(t->db_index(), args);
}
return OpStatus::OK;
}
OpResult<Transaction::FindFirstResult> Transaction::FindFirstProcessor::Process(Transaction* t) {
uint32_t min_arg_indx = UINT32_MAX;
FindFirstResult result;
for (size_t sid = 0; sid < find_res_.size(); ++sid) {
const auto& fr = find_res_[sid];
auto status = fr.status();
if (status == OpStatus::KEY_NOTFOUND)
continue;
if (status == OpStatus::WRONG_TYPE) {
return status;
}
DCHECK(fr && IsValid(fr->first));
const auto& it_pos = fr.value();
size_t arg_indx = t->ReverseArgIndex(sid, it_pos.second);
if (arg_indx < min_arg_indx) {
min_arg_indx = arg_indx;
result.sid = sid;
result.find_res = it_pos.first;
}
}
if (result.sid == kInvalidSid) {
return OpStatus::KEY_NOTFOUND;
}
return result;
}
IntentLock::Mode Transaction::Mode() const {
return (cid_->opt_mask() & CO::READONLY) ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
}
@ -1194,12 +1128,4 @@ void Transaction::BreakOnClose() {
}
}
auto Transaction::FindFirst() -> OpResult<FindFirstResult> {
FindFirstProcessor processor(notify_txid_.load(memory_order_relaxed), ess_->size());
processor.Find(this);
return processor.Process(this);
}
} // namespace dfly