fix: improve consistency around brpop flow

1. Added a test that was breaking earlier.
2. Made sure that multiple awakened brpop transactions do not
   snatch items from one another.
3. Fixed watched-queues clean-up logic inside blocking_controller that caused deadlocks.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-03-17 09:07:10 +02:00 committed by Roman Gershman
parent c96f637f73
commit f4081f3979
7 changed files with 214 additions and 155 deletions

View file

@ -40,6 +40,11 @@ struct BlockingController::WatchQueue {
state = SUSPENDED;
notify_txid = UINT64_MAX;
}
// Looks up the watch item that wraps `tx`.
// Returns an iterator to it, or items.end() when `tx` is not in this queue.
auto Find(Transaction* tx) const {
  const auto wraps_tx = [tx](const WatchItem& item) { return item.get() == tx; };
  return find_if(items.begin(), items.end(), wraps_tx);
}
};
// Watch state per db.
@ -50,30 +55,64 @@ struct BlockingController::DbWatchTable {
// they reference key objects in queue_map.
absl::flat_hash_set<base::string_view_sso> awakened_keys;
void RemoveEntry(WatchQueueMap::iterator it);
// returns true if awake event was added.
// Requires that the key queue be in the required state.
bool AddAwakeEvent(WatchQueue::State cur_state, string_view key);
bool AddAwakeEvent(string_view key);
// Returns true if awakened tx was removed from the queue.
bool UnwatchTx(string_view key, Transaction* tx);
};
// Detaches `tx` from the watch queue registered for `key`.
//
// - If the queue is ACTIVE and `tx` is its head, `tx` is popped, the queue is put back
//   into SUSPENDED state and, if other watchers remain, the key is added to
//   `awakened_keys` so that it gets re-validated.
// - Otherwise `tx` is erased from wherever it sits in the queue (if it is there at all).
// - A queue that is left without watchers is dropped from `queue_map` together with
//   its `awakened_keys` entry.
//
// Returns true only if `tx` was removed as the head of an ACTIVE queue.
bool BlockingController::DbWatchTable::UnwatchTx(string_view key, Transaction* tx) {
  auto wq_it = queue_map.find(key);

  // With multiple same keys we may have misses because the first iteration
  // on the same key could remove the queue.
  if (wq_it == queue_map.end())
    return false;

  WatchQueue* wq = wq_it->second.get();
  DCHECK(!wq->items.empty());

  bool res = false;
  if (wq->state == WatchQueue::ACTIVE && wq->items.front().get() == tx) {
    wq->items.pop_front();

    // We suspend the queue and add keys to re-verification.
    // If they are still present, this queue will be reactivated later.
    wq->state = WatchQueue::SUSPENDED;

    if (!wq->items.empty())
      awakened_keys.insert(wq_it->first);  // send for further validation.
    res = true;
  } else {
    // tx may have been awakened through a different key while this queue is still
    // suspended, or it may not have been awakened on this shard at all.
    // In both cases tx is not necessarily the first item of this queue,
    // so search the whole queue and remove it if it is there.
    if (auto it = wq->Find(tx); it != wq->items.end()) {
      wq->items.erase(it);
    }
  }

  if (wq->items.empty()) {
    // Same clean-up as RemoveEntry(): awakened_keys stores views that reference the key
    // objects owned by queue_map, so the awakened entry must be dropped before the queue
    // (and its key) is destroyed; otherwise a dangling view would be left behind.
    awakened_keys.erase(wq_it->first);
    queue_map.erase(wq_it);
  }
  return res;
}
BlockingController::BlockingController(EngineShard* owner) : owner_(owner) {
}
// Nothing to release explicitly; members clean up after themselves.
BlockingController::~BlockingController() = default;
// Drops the watch queue referenced by `it`.
// The matching awakened_keys entry is erased first, while the key stored in
// queue_map is still alive.
void BlockingController::DbWatchTable::RemoveEntry(WatchQueueMap::iterator it) {
  const auto& watched_key = it->first;
  DVLOG(2) << "Erasing watchqueue key " << watched_key;

  awakened_keys.erase(watched_key);
  queue_map.erase(it);
}
bool BlockingController::DbWatchTable::AddAwakeEvent(WatchQueue::State cur_state, string_view key) {
bool BlockingController::DbWatchTable::AddAwakeEvent(string_view key) {
auto it = queue_map.find(key);
if (it == queue_map.end() || it->second->state != cur_state)
if (it == queue_map.end() || it->second->state != WatchQueue::SUSPENDED)
return false; /// nobody watches this key or state does not match.
string_view dbkey = it->first;
@ -81,31 +120,82 @@ bool BlockingController::DbWatchTable::AddAwakeEvent(WatchQueue::State cur_state
return awakened_keys.insert(dbkey).second;
}
// Processes potentially awakened keys and verifies that these are indeed
// awakened to eliminate false positives.
// In addition, optionally removes completed_t from the front of the watch queues.
void BlockingController::RunStep(Transaction* completed_t) {
VLOG(1) << "RunStep [" << owner_->shard_id() << "] " << completed_t;
// Optionally removes tx from the front of the watch queues.
void BlockingController::FinalizeWatched(KeyLockArgs lock_args, Transaction* tx) {
DCHECK(tx);
if (completed_t) {
awakened_transactions_.erase(completed_t);
ShardId sid = owner_->shard_id();
auto dbit = watched_dbs_.find(completed_t->GetDbIndex());
if (dbit != watched_dbs_.end()) {
DbWatchTable& wt = *dbit->second;
uint16_t local_mask = tx->GetLocalMask(sid);
VLOG(1) << "FinalizeBlocking [" << sid << "]" << tx->DebugId() << " " << local_mask;
ShardId sid = owner_->shard_id();
KeyLockArgs lock_args = completed_t->GetLockArgs(sid);
bool is_awakened = local_mask & Transaction::AWAKED_Q;
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
string_view key = lock_args.args[i];
if (wt.AddAwakeEvent(WatchQueue::ACTIVE, key)) {
awakened_indices_.emplace(completed_t->GetDbIndex());
}
}
if (is_awakened)
awakened_transactions_.erase(tx);
auto dbit = watched_dbs_.find(tx->GetDbIndex());
// Can happen if it was the only transaction in the queue and it was notified and removed.
if (dbit == watched_dbs_.end())
return;
DbWatchTable& wt = *dbit->second;
// Add keys of processed transaction so we could awake the next one in the queue
// in case those keys still exist.
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
string_view key = lock_args.args[i];
bool removed_awakened = wt.UnwatchTx(key, tx);
if (removed_awakened) {
CHECK(is_awakened) << tx->DebugId() << " " << key << " " << local_mask;
}
}
if (wt.queue_map.empty()) {
watched_dbs_.erase(dbit);
}
awakened_indices_.emplace(tx->GetDbIndex());
}
// ArgSlice flavour of FinalizeWatched. TODO: to fix the duplication.
//
// Unregisters `tx` from the watch queues of every key in `args` on this shard.
// If `tx` carries the AWAKED_Q flag for this shard it is also dropped from
// awakened_transactions_. The db index of `tx` is recorded in awakened_indices_
// whenever a watch table for that db existed on entry.
void BlockingController::FinalizeWatched(ArgSlice args, Transaction* tx) {
  DCHECK(tx);

  const ShardId shard = owner_->shard_id();
  VLOG(1) << "FinalizeBlocking [" << shard << "]" << tx->DebugId();

  const uint16_t mask = tx->GetLocalMask(shard);
  const bool awakened = mask & Transaction::AWAKED_Q;
  if (awakened) {
    awakened_transactions_.erase(tx);
  }

  auto db_it = watched_dbs_.find(tx->GetDbIndex());

  // Can happen if it was the only transaction in the queue and it was notified and removed.
  if (db_it == watched_dbs_.end())
    return;

  DbWatchTable& table = *db_it->second;

  // Detach tx from each of its keys. Only a transaction that was awakened
  // may be removed from the head of an active queue.
  for (string_view key : args) {
    if (table.UnwatchTx(key, tx)) {
      CHECK(awakened) << tx->DebugId() << " " << key << " " << mask;
    }
  }

  if (table.queue_map.empty()) {
    watched_dbs_.erase(db_it);
  }
  awakened_indices_.emplace(tx->GetDbIndex());
}
void BlockingController::NotifyPending() {
DbContext context;
context.time_now_ms = GetCurrentTimeMs();
@ -163,57 +253,19 @@ void BlockingController::AddWatched(ArgSlice keys, Transaction* trans) {
}
}
// Removes `trans` from the watch queues of the given keys.
// Each queue is scanned linearly, so this runs in O(N) in the worst case.
void BlockingController::RemoveWatched(ArgSlice keys, Transaction* trans) {
  VLOG(1) << "RemoveWatched [" << owner_->shard_id() << "] " << trans->DebugId();

  auto db_it = watched_dbs_.find(trans->GetDbIndex());
  if (db_it == watched_dbs_.end())
    return;

  DbWatchTable& table = *db_it->second;
  for (auto key : keys) {
    auto queue_it = table.queue_map.find(key);

    // A miss can happen in case of duplicate keys or when we do not watch on all the
    // argument keys like with BLPOPRPUSH.
    if (queue_it != table.queue_map.end()) {
      auto& items = queue_it->second->items;

      // Locate the first item that belongs to trans, if there is one.
      auto pos = items.begin();
      while (pos != items.end() && !(pos->trans == trans)) {
        ++pos;
      }
      if (pos != items.end()) {
        items.erase(pos);
      }

      // Again, we may not find trans if we searched for the same key several times.
      if (items.empty()) {
        table.RemoveEntry(queue_it);
      }
    }
  }

  if (table.queue_map.empty()) {
    watched_dbs_.erase(db_it);
  }
}
// Called from commands like lpush.
void BlockingController::AwakeWatched(DbIndex db_index, string_view db_key) {
auto it = watched_dbs_.find(db_index);
if (it == watched_dbs_.end())
return;
VLOG(1) << "AwakeWatched: db(" << db_index << ") " << db_key;
DbWatchTable& wt = *it->second;
DCHECK(!wt.queue_map.empty());
if (wt.AddAwakeEvent(WatchQueue::SUSPENDED, db_key)) {
if (wt.AddAwakeEvent(db_key)) {
VLOG(1) << "AwakeWatched: db(" << db_index << ") " << db_key;
awakened_indices_.insert(db_index);
} else {
DVLOG(1) << "Skipped awakening " << db_index;
}
}
@ -224,6 +276,7 @@ void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueueMap* w
DVLOG(1) << "Notify WQ: [" << owner_->shard_id() << "] " << key;
WatchQueue* wq = w_it->second.get();
DCHECK_EQ(wq->state, WatchQueue::SUSPENDED);
wq->state = WatchQueue::ACTIVE;
auto& queue = wq->items;
@ -232,15 +285,17 @@ void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueueMap* w
do {
WatchItem& wi = queue.front();
Transaction* head = wi.get();
DVLOG(2) << "Pop " << head << " from key " << key;
queue.pop_front();
DVLOG(2) << "WQ-Pop " << head->DebugId() << " from key " << key;
if (head->NotifySuspended(owner_->committed_txid(), sid)) {
// We deliberately keep the notified transaction in the queue to know which queue
// must be handled when this transaction finishes.
wq->notify_txid = owner_->committed_txid();
awakened_transactions_.insert(head);
break;
}
queue.pop_front();
} while (!queue.empty());
if (wq->items.empty()) {
@ -248,51 +303,6 @@ void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueueMap* w
}
}
#if 0
void BlockingController::OnTxFinish() {
VLOG(1) << "OnTxFinish [" << owner_->shard_id() << "]";
if (waiting_convergence_.empty())
return;
TxQueue* txq = owner_->txq();
if (txq->Empty()) {
for (const auto& k_v : waiting_convergence_) {
NotifyConvergence(k_v.second);
}
waiting_convergence_.clear();
return;
}
TxId txq_score = txq->HeadScore();
do {
auto tx_waiting = waiting_convergence_.begin();
Transaction* trans = tx_waiting->second;
// Instead of taking the map key, we use upto date notify_txid
// which could meanwhile improve (decrease). Not important though.
TxId notifyid = trans->notify_txid();
if (owner_->committed_txid() < notifyid && txq_score <= notifyid)
break; // we can not converge for notifyid so we can not converge for larger ts as well.
waiting_convergence_.erase(tx_waiting);
NotifyConvergence(trans);
} while (!waiting_convergence_.empty());
}
// Registers `t` in waiting_convergence_ under the notify txid it has right now.
void BlockingController::RegisterAwaitForConverge(Transaction* t) {
  // t->notify_txid might improve in parallel. It does not matter since convergence
  // will happen even with a stale id, so a single snapshot is taken and used throughout.
  const TxId snapshot_id = t->notify_txid();

  DVLOG(1) << "RegisterForConverge " << t->DebugId() << " at notify " << snapshot_id;

  waiting_convergence_.emplace(snapshot_id, t);
}
#endif
size_t BlockingController::NumWatched(DbIndex db_indx) const {
auto it = watched_dbs_.find(db_indx);
if (it == watched_dbs_.end())