fix: big value serialization corner cases (#3430)

There are some problematic flows. First we did not handle deletions, so all sorts of consistency issues could arise while calling DbSlice::Traverse() and DbSlice::Del(). Second, we did not handle FlushAll (same as before, Traverse() preempts and FlushAll() kicks in. Third we did not handle expirations.

---------

Signed-off-by: kostas <kostas@dragonflydb.io>
This commit is contained in:
Kostas Kyrimis 2024-08-11 14:17:32 +03:00 committed by GitHub
parent 41ebb075a1
commit 1c9e9c5922
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 59 additions and 38 deletions

View file

@ -607,6 +607,9 @@ void EngineShard::Heartbeat() {
RetireExpiredAndEvict();
}
// TODO: iterate over all namespaces
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
// Offset CoolMemoryUsage when consider background offloading.
// TODO: Another approach could be is to align the approach similarly to how we do with
// FreeMemWithEvictionStep, i.e. if memory_budget is below the limit.
@ -621,7 +624,6 @@ void EngineShard::Heartbeat() {
<< " tiering_threshold: " << tiering_offload_threshold
<< ", cool memory: " << tiered_storage_->CoolMemoryUsage();
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
for (unsigned i = 0; i < db_slice.db_array_size(); ++i) {
if (!db_slice.IsDbValid(i))
continue;
@ -631,6 +633,10 @@ void EngineShard::Heartbeat() {
}
void EngineShard::RetireExpiredAndEvict() {
// TODO: iterate over all namespaces
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
// Some of the functions below might acquire the lock again so we need to unlock it
std::unique_lock lk(db_slice.GetSerializationMutex());
constexpr double kTtlDeleteLimit = 200;
constexpr double kRedLimitFactor = 0.1;
@ -651,8 +657,6 @@ void EngineShard::RetireExpiredAndEvict() {
DbContext db_cntx;
db_cntx.time_now_ms = GetCurrentTimeMs();
// TODO: iterate over all namespaces
DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id());
for (unsigned i = 0; i < db_slice.db_array_size(); ++i) {
if (!db_slice.IsDbValid(i))
continue;
@ -674,6 +678,8 @@ void EngineShard::RetireExpiredAndEvict() {
}
}
// Because TriggerOnJournalWriteToSink will lock the same lock leading to a deadlock.
lk.unlock();
// Journal entries for expired entries are not writen to socket in the loop above.
// Trigger write to socket when loop finishes.
if (auto journal = EngineShard::tlocal()->journal(); journal) {