fix: do not crash with inconsistent watch queue (#2515)

Output more info about the state of things.
Skip the non existent key and continue the execution.
Fixes #2514

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-02-01 09:48:42 +02:00 committed by GitHub
parent 73fe5a4eb2
commit 2b0310db32
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 25 additions and 18 deletions

View file

@ -55,8 +55,7 @@ struct BlockingController::DbWatchTable {
WatchQueueMap queue_map;
// awakened keys point to blocked keys that can potentially be unblocked.
// they reference key objects in queue_map.
absl::flat_hash_set<base::string_view_sso> awakened_keys;
absl::flat_hash_set<std::string> awakened_keys;
// returns true if awake event was added.
// Requires that the key queue be in the required state.
@ -118,9 +117,7 @@ bool BlockingController::DbWatchTable::AddAwakeEvent(string_view key) {
if (it == queue_map.end() || it->second->state != WatchQueue::SUSPENDED)
return false; /// nobody watches this key or state does not match.
string_view dbkey = it->first;
return awakened_keys.insert(dbkey).second;
return awakened_keys.insert(it->first).second;
}
// Optionally removes tx from the front of the watch queues.
@ -212,10 +209,28 @@ void BlockingController::NotifyPending() {
context.db_index = index;
DbWatchTable& wt = *dbit->second;
for (auto key : wt.awakened_keys) {
string_view sv_key = static_cast<string_view>(key);
for (const auto& key : wt.awakened_keys) {
string_view sv_key = key;
DVLOG(1) << "Processing awakened key " << sv_key;
NotifyWatchQueue(sv_key, &wt.queue_map, context);
auto w_it = wt.queue_map.find(sv_key);
if (w_it == wt.queue_map.end()) {
LOG(ERROR) << "Internal error: Key " << sv_key
<< " was not found in the watch queue, wt.awakened_keys len is "
<< wt.awakened_keys.size() << " wt.queue_map len is " << wt.queue_map.size();
for (const auto& item : wt.awakened_keys) {
LOG(ERROR) << "Awakened key: " << item;
}
continue;
}
CHECK(w_it != wt.queue_map.end());
DVLOG(1) << "Notify WQ: [" << owner_->shard_id() << "] " << key;
WatchQueue* wq = w_it->second.get();
NotifyWatchQueue(sv_key, wq, context);
if (wq->items.empty()) {
wt.queue_map.erase(w_it);
}
}
wt.awakened_keys.clear();
@ -270,12 +285,8 @@ void BlockingController::AwakeWatched(DbIndex db_index, string_view db_key) {
}
// Marks the queue as active and notifies the first transaction in the queue.
void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueueMap* wqm,
void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueue* wq,
const DbContext& context) {
auto w_it = wqm->find(key);
CHECK(w_it != wqm->end());
DVLOG(1) << "Notify WQ: [" << owner_->shard_id() << "] " << key;
WatchQueue* wq = w_it->second.get();
DCHECK_EQ(wq->state, WatchQueue::SUSPENDED);
auto& queue = wq->items;
@ -304,10 +315,6 @@ void BlockingController::NotifyWatchQueue(std::string_view key, WatchQueueMap* w
queue.pop_front();
}
std::move(skipped.begin(), skipped.end(), std::back_inserter(queue));
if (wq->items.empty()) {
wqm->erase(w_it);
}
}
size_t BlockingController::NumWatched(DbIndex db_indx) const {