diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index f67cf5cda..29089a5be 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -270,11 +270,12 @@ SliceEvents& SliceEvents::operator+=(const SliceEvents& o) { #undef ADD -DbSlice::DbSlice(uint32_t index, bool caching_mode, EngineShard* owner) +DbSlice::DbSlice(uint32_t index, bool cache_mode, EngineShard* owner) : shard_id_(index), - caching_mode_(caching_mode), + cache_mode_(cache_mode), owner_(owner), client_tracking_map_(owner->memory_resource()) { + load_in_progress_ = false; db_arr_.emplace_back(); CreateDb(0); expire_base_[0] = expire_base_[1] = 0; @@ -471,7 +472,7 @@ OpResult DbSlice::FindInternal(const Context& cntx, std: } } - if (caching_mode_ && IsValid(res.it)) { + if (IsCacheMode() && IsValid(res.it)) { if (!change_cb_.empty()) { FetchedItemsRestorer fetched_restorer(&fetched_items_); auto bump_cb = [&](PrimeTable::bucket_iterator bit) { @@ -600,14 +601,14 @@ OpResult DbSlice::AddOrFindInternal(const Context& cnt !owner_->IsReplica() && !(ServerState::tlocal()->gstate() == GlobalState::LOADING); // If we are over limit in non-cache scenario, just be conservative and throw. - if (apply_memory_limit && !caching_mode_ && memory_budget_ + memory_offset < 0) { + if (apply_memory_limit && !IsCacheMode() && memory_budget_ + memory_offset < 0) { LOG_EVERY_T(WARNING, 1) << "AddOrFind: over limit, budget: " << memory_budget_ << " reclaimed: " << reclaimed << " offset: " << memory_offset; events_.insertion_rejections++; return OpStatus::OUT_OF_MEMORY; } - PrimeEvictionPolicy evp{cntx, (bool(caching_mode_) && !owner_->IsReplica()), + PrimeEvictionPolicy evp{cntx, (IsCacheMode() && !owner_->IsReplica()), memory_offset, ssize_t(soft_budget_limit_), this, apply_memory_limit}; @@ -1272,7 +1273,7 @@ pair DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t s return {0, evicted_bytes}; } - if ((!caching_mode_) || !expire_allowed_) + if ((!IsCacheMode()) || !expire_allowed_) return {0, 0}; auto max_eviction_per_hb = GetFlag(FLAGS_max_eviction_per_heartbeat); diff --git a/src/server/db_slice.h b/src/server/db_slice.h index b6457bb8c..55458cfe1 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -229,7 +229,7 @@ class DbSlice { int32_t expire_options = 0; // ExpireFlags }; - DbSlice(uint32_t index, bool caching_mode, EngineShard* owner); + DbSlice(uint32_t index, bool cache_mode, EngineShard* owner); ~DbSlice(); // Activates `db_ind` database if it does not exist (see ActivateDb below). @@ -468,7 +468,16 @@ class DbSlice { } void TEST_EnableCacheMode() { - caching_mode_ = 1; + cache_mode_ = 1; + } + + bool IsCacheMode() const { + // During loading time we never bump elements. + return cache_mode_ && !load_in_progress_; + } + + void SetLoadInProgress(bool in_progress) { + load_in_progress_ = in_progress; } // Test hook to inspect last locked keys. @@ -575,7 +584,8 @@ class DbSlice { mutable LocalBlockingCounter block_counter_; ShardId shard_id_; - uint8_t caching_mode_ : 1; + uint8_t cache_mode_ : 1; + uint8_t load_in_progress_ : 1; EngineShard* owner_; diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index d49555009..192df90b0 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -2034,6 +2034,9 @@ error_code RdbLoader::Load(io::Source* src) { auto cleanup = absl::Cleanup([&] { FinishLoad(start, &keys_loaded); }); + shard_set->AwaitRunningOnShardQueue([](EngineShard* es) { + namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(true); + }); while (!stop_early_.load(memory_order_relaxed)) { if (pause_) { ThisFiber::SleepFor(100ms); @@ -2223,7 +2226,10 @@ void RdbLoader::FinishLoad(absl::Time start_time, size_t* keys_loaded) { FlushShardAsync(i); // Send sentinel callbacks to ensure that all previous messages have been processed. - shard_set->Add(i, [bc]() mutable { bc->Dec(); }); + shard_set->Add(i, [bc]() mutable { + namespaces->GetDefaultNamespace().GetCurrentDbSlice().SetLoadInProgress(false); + bc->Dec(); + }); } bc->Wait(); // wait for sentinels to report. diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index 8d1ad4029..455ce5b1e 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -722,4 +722,16 @@ TEST_F(RdbTest, SnapshotTooBig) { ASSERT_THAT(resp, ErrArg("Out of memory")); } +TEST_F(RdbTest, HugeKeyIssue4497) { + SetTestFlag("cache_mode", "true"); + ResetService(); + + EXPECT_EQ(Run({"flushall"}), "OK"); + EXPECT_EQ(Run({"debug", "populate", "1", "k", "1000", "rand", "type", "set", "elements", "5000"}), + "OK"); + EXPECT_EQ(Run({"save", "rdb", "hugekey.rdb"}), "OK"); + EXPECT_EQ(Run({"dfly", "load", "hugekey.rdb"}), "OK"); + EXPECT_EQ(Run({"flushall"}), "OK"); +} + } // namespace dfly