Remove blpop FindFirst hop after wakeup (#1168)

Remove BLPOP hop after wake
This commit is contained in:
Vladislav 2023-05-03 19:45:06 +03:00 committed by GitHub
parent e91cb6b153
commit cb82680aca
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 83 additions and 156 deletions

View file

@ -287,7 +287,7 @@ void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueueMap* w
Transaction* head = wi.get();
DVLOG(2) << "WQ-Pop " << head->DebugId() << " from key " << key;
if (head->NotifySuspended(owner_->committed_txid(), sid)) {
if (head->NotifySuspended(owner_->committed_txid(), sid, key)) {
// We deliberately keep the notified transaction in the queue to know which queue
// must handled when this transaction finished.
wq->notify_txid = owner_->committed_txid();

View file

@ -136,7 +136,7 @@ struct ShardFFResult {
};
// Used by bpopper.
OpResult<ShardFFResult> FindFirst(bool awaked_only, Transaction* trans) {
OpResult<ShardFFResult> FindFirst(Transaction* trans) {
VLOG(2) << "FindFirst::Find " << trans->DebugId();
// Holds Find results: (iterator to a found key, and its index in the passed arguments).
@ -145,31 +145,17 @@ OpResult<ShardFFResult> FindFirst(bool awaked_only, Transaction* trans) {
std::vector<OpResult<FFResult>> find_res(shard_set->size());
fill(find_res.begin(), find_res.end(), OpStatus::KEY_NOTFOUND);
// We must capture notify_txid before we spawn callbacks.
// Otherwise, consider the following scenario:
// 0. The key is added in shard 0, with notify_txid = 100
// 1. The cb runs first on shard1 and does not find anything.
// 2. A tx 99 runs on shard 1, adds a key, updates notify_txid to 99.
// 3. the cb on shard 0 runs and ignores the key due to lower notify_txid.
uint64_t notify_txid = trans->GetNotifyTxid();
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->GetShardArgs(shard->shard_id());
// if requested to consider awaked shards only, we check the AWAKED_Q flag.
if (awaked_only && (t->GetLocalMask(shard->shard_id()) & Transaction::AWAKED_Q) == 0) {
return OpStatus::OK;
}
if (shard->committed_txid() <= notify_txid) {
OpResult<pair<PrimeIterator, unsigned>> ff_res =
shard->db_slice().FindFirst(t->GetDbContext(), args);
OpResult<pair<PrimeIterator, unsigned>> ff_res =
shard->db_slice().FindFirst(t->GetDbContext(), args);
if (ff_res) {
FFResult ff_result(ff_res->first->first.AsRef(), ff_res->second);
find_res[shard->shard_id()] = move(ff_result);
} else {
find_res[shard->shard_id()] = ff_res.status();
}
if (ff_res) {
FFResult ff_result(ff_res->first->first.AsRef(), ff_res->second);
find_res[shard->shard_id()] = move(ff_result);
} else {
find_res[shard->shard_id()] = ff_res.status();
}
return OpStatus::OK;
@ -229,6 +215,7 @@ class BPopper {
private:
void Pop(Transaction* t, EngineShard* shard);
void OpPop(Transaction* t, EngineShard* shard);
ListDir dir_;
@ -258,25 +245,25 @@ BPopper::BPopper(ListDir dir) : dir_(dir) {
}
OpStatus BPopper::Run(Transaction* trans, unsigned msec) {
time_point tp =
msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max();
auto tp = msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max();
bool is_multi = trans->IsMulti();
trans->Schedule();
auto* stats = ServerState::tl_connection_stats();
OpResult<ShardFFResult> result = FindFirst(false, trans);
OpResult<ShardFFResult> result = FindFirst(trans);
if (result.status() == OpStatus::KEY_NOTFOUND) {
if (result.ok()) {
ff_result_ = move(result.value());
} else if (result.status() == OpStatus::KEY_NOTFOUND) {
// Close transaction and return.
if (is_multi) {
// close transaction and return.
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
trans->Execute(std::move(cb), true);
return OpStatus::TIMED_OUT;
}
// Block
auto wcb = [](Transaction* t, EngineShard* shard) {
return t->GetShardArgs(shard->shard_id());
};
@ -288,25 +275,16 @@ OpStatus BPopper::Run(Transaction* trans, unsigned msec) {
if (!wait_succeeded)
return OpStatus::TIMED_OUT;
// Now we have something for sure.
result = FindFirst(true, trans); // retry - must find something.
}
if (!result) {
} else {
// Could be the wrong-type error.
// cleanups, locks removal etc.
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
trans->Execute(std::move(cb), true);
DCHECK_NE(result.status(), OpStatus::KEY_NOTFOUND);
return result.status();
}
VLOG(1) << "Popping an element " << trans->DebugId();
ff_result_ = move(result.value());
auto cb = [this](Transaction* t, EngineShard* shard) {
Pop(t, shard);
return OpStatus::OK;
@ -317,27 +295,35 @@ OpStatus BPopper::Run(Transaction* trans, unsigned msec) {
}
void BPopper::Pop(Transaction* t, EngineShard* shard) {
if (shard->shard_id() == ff_result_.sid) {
if (auto wake_key = t->GetWakeKey(shard->shard_id()); wake_key) {
key_ = *wake_key;
OpPop(t, shard);
} else if (shard->shard_id() == ff_result_.sid) {
ff_result_.key.GetString(&key_);
auto& db_slice = shard->db_slice();
auto it_res = db_slice.Find(t->GetDbContext(), key_, OBJ_LIST);
CHECK(it_res) << t->DebugId() << " " << key_; // must exist and must be ok.
PrimeIterator it = *it_res;
quicklist* ql = GetQL(it->second);
OpPop(t, shard);
}
}
DVLOG(2) << "popping from " << key_ << " " << t->DebugId();
db_slice.PreUpdate(t->GetDbIndex(), it);
value_ = ListPop(dir_, ql);
db_slice.PostUpdate(t->GetDbIndex(), it, key_);
if (quicklistCount(ql) == 0) {
DVLOG(1) << "deleting key " << key_ << " " << t->DebugId();
CHECK(shard->db_slice().Del(t->GetDbIndex(), it));
}
OpArgs op_args = t->GetOpArgs(shard);
if (op_args.shard->journal()) {
string command = dir_ == ListDir::LEFT ? "LPOP" : "RPOP";
RecordJournal(op_args, command, ArgSlice{key_}, 1);
}
void BPopper::OpPop(Transaction* t, EngineShard* shard) {
auto& db_slice = shard->db_slice();
auto it_res = db_slice.Find(t->GetDbContext(), key_, OBJ_LIST);
CHECK(it_res) << t->DebugId() << " " << key_; // must exist and must be ok.
PrimeIterator it = *it_res;
quicklist* ql = GetQL(it->second);
DVLOG(2) << "popping from " << key_ << " " << t->DebugId();
db_slice.PreUpdate(t->GetDbIndex(), it);
value_ = ListPop(dir_, ql);
db_slice.PostUpdate(t->GetDbIndex(), it, key_);
if (quicklistCount(ql) == 0) {
DVLOG(1) << "deleting key " << key_ << " " << t->DebugId();
CHECK(shard->db_slice().Del(t->GetDbIndex(), it));
}
OpArgs op_args = t->GetOpArgs(shard);
if (op_args.shard->journal()) {
string command = dir_ == ListDir::LEFT ? "LPOP" : "RPOP";
RecordJournal(op_args, command, ArgSlice{key_}, 1);
}
}
@ -1007,7 +993,6 @@ OpResult<string> BPopPusher::RunSingle(Transaction* t, time_point tp) {
// Block
++stats->num_blocked_clients;
bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb));
--stats->num_blocked_clients;

View file

@ -233,82 +233,13 @@ TEST_F(ListFamilyTest, BLPopMultiPush) {
pop_fb.Join();
// We can't determine what key was popped, so only check result presence.
// It might not be first kKey3 "C" because of squashing and re-ordering.
ASSERT_THAT(blpop_resp, ArrLen(2));
auto resp_arr = blpop_resp.GetVec();
EXPECT_THAT(resp_arr, ElementsAre(kKey1, "A"));
ASSERT_THAT(Run({"exists", kKey1, kKey2, kKey3}), IntArg(2));
ASSERT_EQ(0, NumWatched());
}
TEST_F(ListFamilyTest, BLPopSerialize) {
RespExpr blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"});
});
WaitUntilLocked(0, kKey1);
LOG(INFO) << "Starting multi";
TxClock cl1, cl2;
auto p1_fb = pp_->at(1)->LaunchFiber([&] {
// auto resp = Run({"multi"}); // We use multi to assign ts to lpush.
// ASSERT_EQ(resp, "OK");
Run({"lpush", kKey1, "A"});
/*for (unsigned i = 0; i < 10; ++i) {
// dummy command to prolong this transaction and make convergence more complicated.
Run({"exists", kKey1, kKey2, kKey3});
}
resp = Run({"exec"});
// Either this lpush has run first or the one below.
// In any case it must be that between 2 invocations of lpush (wrapped in multi)
// blpop will be triggered and it will empty the list again. Hence, in any case
// lpush kKey1 here and below should return 1.
ASSERT_THAT(resp, ArrLen(11));*/
cl1 = GetDebugInfo("IO1").clock;
LOG(INFO) << "push1 ts: " << cl1;
});
auto p2_fb = pp_->at(2)->LaunchFiber([&] {
auto resp = Run({"multi"}); // We use multi to assign ts to lpush.
ASSERT_EQ(resp, "OK");
for (unsigned i = 0; i < 10; ++i) {
// dummy command to prolong this transaction and make convergence more complicated.
Run({"exists", kKey1, kKey2, kKey3});
}
Run({"lpush", kKey1, "B"});
Run({"lpush", kKey2, "C"});
resp = Run({"exec"});
ASSERT_THAT(resp, ArrLen(12));
/*auto sub_arr = resp.GetVec();
EXPECT_THAT(sub_arr[0], IntArg(1));
EXPECT_THAT(sub_arr[1], IntArg(1));*/
cl2 = GetDebugInfo("IO2").clock;
LOG(INFO) << "push2 ts: " << cl2;
});
p1_fb.Join();
p2_fb.Join();
pop_fb.Join();
ASSERT_THAT(blpop_resp, ArrLen(2));
auto resp_arr = blpop_resp.GetVec();
EXPECT_THAT(resp_arr, ElementsAre(kKey1, ArgType(RespExpr::STRING)));
if (cl2 < cl1) {
EXPECT_EQ(resp_arr[1], "B");
} else {
EXPECT_EQ(resp_arr[1], "A");
}
}
TEST_F(ListFamilyTest, WrongTypeDoesNotWake) {
RespExpr blpop_resp;

View file

@ -1107,7 +1107,7 @@ bool Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provi
auto wake_cb = [this] {
return (coordinator_state_ & COORD_CANCELLED) ||
notify_txid_.load(memory_order_relaxed) != kuint64max;
wakeup_requested_.load(memory_order_relaxed) > 0;
};
cv_status status = cv_status::no_timeout;
@ -1257,7 +1257,7 @@ bool Transaction::IsGlobal() const {
// Runs only in the shard thread.
// Returns true if the transacton has changed its state from suspended to awakened,
// false, otherwise.
bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid, string_view key) {
unsigned idx = SidToId(sid);
auto& sd = shard_data_[idx];
unsigned local_mask = sd.local_mask;
@ -1266,6 +1266,12 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
return false;
}
// Wake a transaction only once on the first notify.
// We don't care about preserving the strict order with multiple operations running on blocking
// keys in parallel, because the internal order is not observable from outside either way.
if (wakeup_requested_.fetch_add(1, memory_order_relaxed) > 0)
return false;
DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask << " by commited_id "
<< committed_txid;
@ -1277,20 +1283,25 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
sd.local_mask &= ~SUSPENDED_Q;
sd.local_mask |= AWAKED_Q;
TxId notify_id = notify_txid_.load(memory_order_relaxed);
while (committed_txid < notify_id) {
if (notify_txid_.compare_exchange_weak(notify_id, committed_txid, memory_order_relaxed)) {
// if we improved notify_txid_ - break.
blocking_ec_.notify(); // release barrier.
break;
}
}
return true;
// Find index of awakened key.
auto args = GetShardArgs(sid);
auto it =
find_if(args.begin(), args.end(), [key](auto arg) { return facade::ToSV(arg) == key; });
DCHECK(it != args.end());
sd.wake_key_pos = it - args.begin();
}
CHECK(sd.local_mask & AWAKED_Q);
return false;
blocking_ec_.notify();
return true;
}
optional<string_view> Transaction::GetWakeKey(ShardId sid) const {
auto& sd = shard_data_[SidToId(sid)];
if ((sd.local_mask & AWAKED_Q) == 0)
return nullopt;
CHECK_NE(sd.wake_key_pos, UINT16_MAX);
return GetShardArgs(sid).at(sd.wake_key_pos);
}
void Transaction::LogAutoJournalOnShard(EngineShard* shard) {

View file

@ -177,10 +177,8 @@ class Transaction {
bool WaitOnWatch(const time_point& tp, WaitKeysProvider cb);
// Returns true if transaction is awaked, false if it's timed-out and can be removed from the
// blocking queue. NotifySuspended may be called from (multiple) shard threads and
// with each call potentially improving the minimal wake_txid at which
// this transaction has been awaked.
bool NotifySuspended(TxId committed_ts, ShardId sid);
// blocking queue.
bool NotifySuspended(TxId committed_ts, ShardId sid, std::string_view key);
// Cancel all blocking watches on shutdown. Set COORD_CANCELLED.
void BreakOnShutdown();
@ -257,10 +255,6 @@ class Transaction {
return unique_shard_cnt_;
}
TxId GetNotifyTxid() const {
return notify_txid_.load(std::memory_order_relaxed);
}
bool IsMulti() const {
return bool(multi_);
}
@ -275,6 +269,9 @@ class Transaction {
return coordinator_state_ & COORD_OOO;
}
// If blocking tx was woken up on this shard, get wake key.
std::optional<std::string_view> GetWakeKey(ShardId sid) const;
OpArgs GetOpArgs(EngineShard* shard) const {
return OpArgs{shard, this, GetDbContext()};
}
@ -329,7 +326,7 @@ class Transaction {
std::atomic_bool is_armed{false};
// We pad with some memory so that atomic loads won't cause false sharing betweem threads.
char pad[48]; // to make sure PerShardData is 64 bytes and takes full cacheline.
char pad[46]; // to make sure PerShardData is 64 bytes and takes full cacheline.
uint32_t arg_start = 0; // Indices into args_ array.
uint16_t arg_count = 0;
@ -341,6 +338,9 @@ class Transaction {
// Needed to rollback inconsistent schedulings or remove OOO transactions from
// tx queue.
uint32_t pq_pos = TxQueue::kEnd;
// Index of key relative to args in shard that the shard was woken up after blocking wait.
uint16_t wake_key_pos = UINT16_MAX;
};
static_assert(sizeof(PerShardData) == 64); // cacheline
@ -537,7 +537,7 @@ class Transaction {
DbIndex db_index_{0};
uint64_t time_now_ms_{0};
std::atomic<TxId> notify_txid_{UINT64_MAX};
std::atomic<uint32_t> wakeup_requested_{0}; // whether tx was woken up
std::atomic_uint32_t use_count_{0}, run_count_{0}, seqlock_{0};
// unique_shard_cnt_ and unique_shard_id_ are accessed only by coordinator thread.