diff --git a/src/server/blocking_controller_test.cc b/src/server/blocking_controller_test.cc index 43988b40b..9f133c891 100644 --- a/src/server/blocking_controller_test.cc +++ b/src/server/blocking_controller_test.cc @@ -61,7 +61,7 @@ void BlockingControllerTest::SetUp() { arg_vec_.emplace_back(s); } - trans_->InitByArgs(&namespaces.GetDefaultNamespace(), 0, {arg_vec_.data(), arg_vec_.size()}); + trans_->InitByArgs(&namespaces->GetDefaultNamespace(), 0, {arg_vec_.data(), arg_vec_.size()}); CHECK_EQ(0u, Shard("x", shard_set->size())); CHECK_EQ(2u, Shard("z", shard_set->size())); @@ -71,7 +71,6 @@ void BlockingControllerTest::SetUp() { void BlockingControllerTest::TearDown() { shard_set->PreShutdown(); - namespaces.Clear(); shard_set->Shutdown(); delete shard_set; @@ -81,7 +80,7 @@ void BlockingControllerTest::TearDown() { TEST_F(BlockingControllerTest, Basic) { trans_->ScheduleSingleHop([&](Transaction* t, EngineShard* shard) { - BlockingController bc(shard, &namespaces.GetDefaultNamespace()); + BlockingController bc(shard, &namespaces->GetDefaultNamespace()); auto keys = t->GetShardArgs(shard->shard_id()); bc.AddWatched( keys, [](auto...) { return true; }, t); @@ -107,7 +106,7 @@ TEST_F(BlockingControllerTest, Timeout) { unsigned num_watched = shard_set->Await( 0, [&] { - return namespaces.GetDefaultNamespace() + return namespaces->GetDefaultNamespace() .GetBlockingController(EngineShard::tlocal()->shard_id()) ->NumWatched(0); }); diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index a195dc99c..042faa976 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -477,7 +477,7 @@ void DeleteSlots(const SlotRanges& slots_ranges) { if (shard == nullptr) return; - namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()).FlushSlots(slots_ranges); + namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).FlushSlots(slots_ranges); }; shard_set->pool()->AwaitFiberOnAll(std::move(cb)); } @@ -633,7 +633,7 @@ void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, SinkReplyBuilder* bu util::fb2::LockGuard lk(mu); for (auto& [slot, data] : slots_stats) { - data += namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()).GetSlotStats(slot); + data += namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).GetSlotStats(slot); } }; diff --git a/src/server/cluster/cluster_utility.cc b/src/server/cluster/cluster_utility.cc index c56ad919f..9fe22876b 100644 --- a/src/server/cluster/cluster_utility.cc +++ b/src/server/cluster/cluster_utility.cc @@ -50,7 +50,7 @@ uint64_t GetKeyCount(const SlotRanges& slots) { uint64_t shard_keys = 0; for (const SlotRange& range : slots) { for (SlotId slot = range.start; slot <= range.end; slot++) { - shard_keys += namespaces.GetDefaultNamespace() + shard_keys += namespaces->GetDefaultNamespace() .GetDbSlice(shard->shard_id()) .GetSlotStats(slot) .key_count; diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index f0b7554be..a8fb21db3 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -101,7 +101,7 @@ OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, Serv server_family_(sf), cf_(cf), tx_(new Transaction{sf->service().FindCmd("DFLYCLUSTER")}) { - tx_->InitByArgs(&namespaces.GetDefaultNamespace(), 0, {}); + tx_->InitByArgs(&namespaces->GetDefaultNamespace(), 0, {}); } OutgoingMigration::~OutgoingMigration() { @@ -218,7 +218,7 @@ void OutgoingMigration::SyncFb() { } OnAllShards([this](auto& migration) { - DbSlice& db_slice = namespaces.GetDefaultNamespace().GetCurrentDbSlice(); + DbSlice& db_slice = namespaces->GetDefaultNamespace().GetCurrentDbSlice(); server_family_->journal()->StartInThread(); migration = std::make_unique( &db_slice, server(), migration_info_.slot_ranges, server_family_->journal()); @@ -291,8 +291,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { bool is_block_active = true; auto is_pause_in_progress = [&is_block_active] { return is_block_active; }; auto pause_fb_opt = - Pause(server_family_->GetNonPriviligedListeners(), &namespaces.GetDefaultNamespace(), nullptr, - ClientPause::WRITE, is_pause_in_progress); + Pause(server_family_->GetNonPriviligedListeners(), &namespaces->GetDefaultNamespace(), + nullptr, ClientPause::WRITE, is_pause_in_progress); if (!pause_fb_opt) { LOG(WARNING) << "Cluster migration finalization time out"; diff --git a/src/server/common.cc b/src/server/common.cc index bb78d4e46..56520ff05 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -120,6 +120,7 @@ atomic_uint64_t rss_mem_peak(0); unsigned kernel_version = 0; size_t max_memory_limit = 0; size_t serialization_max_chunk_size = 0; +Namespaces* namespaces = nullptr; const char* GlobalStateName(GlobalState s) { switch (s) { diff --git a/src/server/common.h b/src/server/common.h index 1ab29b270..13e6db336 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -49,6 +49,7 @@ class Transaction; class EngineShard; class ConnectionState; class Interpreter; +class Namespaces; struct LockTagOptions { bool enabled = false; @@ -132,6 +133,8 @@ extern std::atomic_uint64_t rss_mem_peak; extern size_t max_memory_limit; +extern Namespaces* namespaces; + // version 5.11 maps to 511 etc. // set upon server start. extern unsigned kernel_version; diff --git a/src/server/detail/save_stages_controller.cc b/src/server/detail/save_stages_controller.cc index ddfa4eb37..c03ff0807 100644 --- a/src/server/detail/save_stages_controller.cc +++ b/src/server/detail/save_stages_controller.cc @@ -434,7 +434,7 @@ void SaveStagesController::CloseCb(unsigned index) { } if (auto* es = EngineShard::tlocal(); use_dfs_format_ && es) - namespaces.GetDefaultNamespace().GetDbSlice(es->shard_id()).ResetUpdateEvents(); + namespaces->GetDefaultNamespace().GetDbSlice(es->shard_id()).ResetUpdateEvents(); } void SaveStagesController::RunStage(void (SaveStagesController::*cb)(unsigned)) { diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 0d6251d3c..7c1ba2cf1 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -77,7 +77,7 @@ bool WaitReplicaFlowToCatchup(absl::Time end_time, const DflyCmd::ReplicaInfo* r EngineShard* shard) { // We don't want any writes to the journal after we send the `PING`, // and expirations could ruin that. - namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(false); + namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(false); shard->journal()->RecordEntry(0, journal::Op::PING, 0, 0, nullopt, {}, true); const FlowInfo* flow = &replica->flows[shard->shard_id()]; @@ -455,7 +455,7 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext absl::Cleanup cleanup([] { VLOG(2) << "Enabling expiration"; shard_set->RunBriefInParallel([](EngineShard* shard) { - namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(true); + namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(true); }); }); diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index ee61612c1..0a1b131b4 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -379,7 +379,7 @@ TEST_F(DflyEngineTest, MemcacheFlags) { ASSERT_EQ(Run("resp", {"flushdb"}), "OK"); pp_->AwaitFiberOnAll([](auto*) { if (auto* shard = EngineShard::tlocal(); shard) { - EXPECT_EQ(namespaces.GetDefaultNamespace() + EXPECT_EQ(namespaces->GetDefaultNamespace() .GetDbSlice(shard->shard_id()) .GetDBTable(0) ->mcflag.size(), @@ -600,7 +600,7 @@ TEST_F(DflyEngineTest, Bug468) { TEST_F(DflyEngineTest, Bug496) { shard_set->RunBlockingInParallel([](EngineShard* shard) { - auto& db = namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()); + auto& db = namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()); int cb_hits = 0; uint32_t cb_id = diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc index 53de0b720..29c7957f2 100644 --- a/src/server/engine_shard.cc +++ b/src/server/engine_shard.cc @@ -309,7 +309,7 @@ bool EngineShard::DoDefrag() { const float threshold = GetFlag(FLAGS_mem_defrag_page_utilization_threshold); // TODO: enable tiered storage on non-default db slice - DbSlice& slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_->shard_id()); + DbSlice& slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_->shard_id()); // If we moved to an invalid db, skip as long as it's not the last one while (!slice.IsDbValid(defrag_state_.dbid) && defrag_state_.dbid + 1 < slice.db_array_size()) @@ -339,7 +339,7 @@ bool EngineShard::DoDefrag() { } }); traverses_count++; - } while (traverses_count < kMaxTraverses && cur && namespaces.IsInitialized()); + } while (traverses_count < kMaxTraverses && cur && namespaces); defrag_state_.UpdateScanState(cur.value()); @@ -370,7 +370,7 @@ bool EngineShard::DoDefrag() { // priority. // otherwise lower the task priority so that it would not use the CPU when not required uint32_t EngineShard::DefragTask() { - if (!namespaces.IsInitialized()) { + if (!namespaces) { return util::ProactorBase::kOnIdleMaxLevel; } @@ -392,7 +392,6 @@ EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap) txq_([](const Transaction* t) { return t->txid(); }), mi_resource_(heap), shard_id_(pb->GetPoolIndex()) { - defrag_task_ = pb->AddOnIdleTask([this]() { return DefragTask(); }); queue_.Start(absl::StrCat("shard_queue_", shard_id())); queue2_.Start(absl::StrCat("l2_queue_", shard_id())); } @@ -452,6 +451,7 @@ void EngineShard::StartPeriodicHeartbeatFiber(util::ProactorBase* pb) { ThisFiber::SetName(absl::StrCat("heartbeat_periodic", index)); RunFPeriodically(heartbeat, period_ms, "heartbeat", &fiber_heartbeat_periodic_done_); }); + defrag_task_ = pb->AddOnIdleTask([this]() { return DefragTask(); }); } void EngineShard::StartPeriodicShardHandlerFiber(util::ProactorBase* pb, @@ -492,7 +492,7 @@ void EngineShard::InitTieredStorage(ProactorBase* pb, size_t max_file_size) { << "Only ioring based backing storage is supported. Exiting..."; // TODO: enable tiered storage on non-default namespace - DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id()); + DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); auto* shard = EngineShard::tlocal(); shard->tiered_storage_ = make_unique(max_file_size, &db_slice); error_code ec = shard->tiered_storage_->Open(backing_prefix); @@ -657,7 +657,7 @@ void EngineShard::RemoveContTx(Transaction* tx) { void EngineShard::Heartbeat() { DVLOG(2) << " Hearbeat"; - DCHECK(namespaces.IsInitialized()); + DCHECK(namespaces); CacheStats(); @@ -666,7 +666,7 @@ void EngineShard::Heartbeat() { } // TODO: iterate over all namespaces - DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id()); + DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); // Offset CoolMemoryUsage when consider background offloading. // TODO: Another approach could be is to align the approach similarly to how we do with @@ -692,7 +692,7 @@ void EngineShard::Heartbeat() { void EngineShard::RetireExpiredAndEvict() { // TODO: iterate over all namespaces - DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id()); + DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); // Some of the functions below might acquire the same lock again so we need to unlock it // asap. We won't yield before we relock the mutex again, so the code below is atomic // in respect to preemptions of big values. An example of that is the call to @@ -758,7 +758,7 @@ void EngineShard::CacheStats() { cache_stats_time_ = now; // Used memory for this shard. size_t used_mem = UsedMemory(); - DbSlice& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id()); + DbSlice& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id()); // delta can wrap if used_memory is smaller than last_cached_used_memory_ and it's fine. size_t delta = used_mem - last_cached_used_memory_; @@ -808,7 +808,7 @@ EngineShard::TxQueueInfo EngineShard::AnalyzeTxQueue() const { info.tx_total = queue->size(); unsigned max_db_id = 0; - auto& db_slice = namespaces.GetDefaultNamespace().GetCurrentDbSlice(); + auto& db_slice = namespaces->GetDefaultNamespace().GetCurrentDbSlice(); { auto value = queue->At(cur); diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index a057abef4..631b66404 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -103,7 +103,10 @@ EngineShardSet* shard_set = nullptr; void EngineShardSet::Init(uint32_t sz, std::function shard_handler) { CHECK_EQ(0u, size()); + CHECK(namespaces == nullptr); + shards_.reset(new EngineShard*[sz]); + size_ = sz; size_t max_shard_file_size = GetTieredFileLimit(sz); pp_->AwaitFiberOnAll([this](uint32_t index, ProactorBase* pb) { @@ -112,7 +115,8 @@ void EngineShardSet::Init(uint32_t sz, std::function shard_handler) { } }); - namespaces.Init(); + // The order is important here. We must initialize namespaces after shards_. + namespaces = new Namespaces(); pp_->AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) { if (index < size_) { @@ -139,7 +143,13 @@ void EngineShardSet::PreShutdown() { } void EngineShardSet::Shutdown() { + // Calling Namespaces::Clear before destroying engine shards, because it accesses them + // internally. + namespaces->Clear(); RunBlockingInParallel([](EngineShard*) { EngineShard::DestroyThreadLocal(); }); + + delete namespaces; + namespaces = nullptr; } void EngineShardSet::InitThreadLocal(ProactorBase* pb) { @@ -150,7 +160,7 @@ void EngineShardSet::InitThreadLocal(ProactorBase* pb) { void EngineShardSet::TEST_EnableCacheMode() { RunBlockingInParallel([](EngineShard* shard) { - namespaces.GetDefaultNamespace().GetCurrentDbSlice().TEST_EnableCacheMode(); + namespaces->GetDefaultNamespace().GetCurrentDbSlice().TEST_EnableCacheMode(); }); } diff --git a/src/server/journal/executor.cc b/src/server/journal/executor.cc index 5f11e2891..ea1b2dd84 100644 --- a/src/server/journal/executor.cc +++ b/src/server/journal/executor.cc @@ -42,7 +42,7 @@ JournalExecutor::JournalExecutor(Service* service) conn_context_.is_replicating = true; conn_context_.journal_emulated = true; conn_context_.skip_acl_validation = true; - conn_context_.ns = &namespaces.GetDefaultNamespace(); + conn_context_.ns = &namespaces->GetDefaultNamespace(); } JournalExecutor::~JournalExecutor() { diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index b9ef3ab71..f904a8d53 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -33,7 +33,7 @@ class ListFamilyTest : public BaseFamilyTest { static unsigned NumWatched() { atomic_uint32_t sum{0}; - auto ns = &namespaces.GetDefaultNamespace(); + auto ns = &namespaces->GetDefaultNamespace(); shard_set->RunBriefInParallel([&](EngineShard* es) { auto* bc = ns->GetBlockingController(es->shard_id()); if (bc) @@ -45,7 +45,7 @@ class ListFamilyTest : public BaseFamilyTest { static bool HasAwakened() { atomic_uint32_t sum{0}; - auto ns = &namespaces.GetDefaultNamespace(); + auto ns = &namespaces->GetDefaultNamespace(); shard_set->RunBriefInParallel([&](EngineShard* es) { auto* bc = ns->GetBlockingController(es->shard_id()); if (bc) diff --git a/src/server/main_service.cc b/src/server/main_service.cc index cf765c627..ee0cf4846 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -468,7 +468,7 @@ void Topkeys(const http::QueryArgs& args, HttpContext* send) { shard_set->RunBriefInParallel([&](EngineShard* shard) { for (const auto& db : - namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()).databases()) { + namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).databases()) { if (db->top_keys.IsEnabled()) { is_enabled = true; for (const auto& [key, count] : db->top_keys.GetTopKeys()) { @@ -823,7 +823,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector auto* shard = EngineShard::tlocal(); if (shard) { auto shard_id = shard->shard_id(); - auto& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard_id); + auto& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard_id); db_slice.SetNotifyKeyspaceEvents(*res); } }); @@ -897,7 +897,6 @@ void Service::Shutdown() { ChannelStore::Destroy(); shard_set->PreShutdown(); - namespaces.Clear(); shard_set->Shutdown(); Transaction::Shutdown(); @@ -1586,7 +1585,7 @@ facade::ConnectionContext* Service::CreateContext(util::FiberSocketBase* peer, facade::Connection* owner) { auto cred = user_registry_.GetCredentials("default"); ConnectionContext* res = new ConnectionContext{peer, owner, std::move(cred)}; - res->ns = &namespaces.GetOrInsert(""); + res->ns = &namespaces->GetOrInsert(""); if (peer->IsUDS()) { res->req_auth = false; @@ -2449,7 +2448,7 @@ void Service::Command(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde VarzValue::Map Service::GetVarzStats() { VarzValue::Map res; - Metrics m = server_family_.GetMetrics(&namespaces.GetDefaultNamespace()); + Metrics m = server_family_.GetMetrics(&namespaces->GetDefaultNamespace()); DbStats db_stats; for (const auto& s : m.db_stats) { db_stats += s; diff --git a/src/server/namespaces.cc b/src/server/namespaces.cc index a6fd2de54..ad2cefa09 100644 --- a/src/server/namespaces.cc +++ b/src/server/namespaces.cc @@ -1,3 +1,7 @@ +// Copyright 2024, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + #include "server/namespaces.h" #include "base/flags.h" @@ -45,19 +49,12 @@ BlockingController* Namespace::GetBlockingController(ShardId sid) { return shard_blocking_controller_[sid].get(); } -Namespaces namespaces; - -Namespaces::~Namespaces() { - Clear(); -} - -void Namespaces::Init() { - DCHECK(default_namespace_ == nullptr); +Namespaces::Namespaces() { default_namespace_ = &GetOrInsert(""); } -bool Namespaces::IsInitialized() const { - return default_namespace_ != nullptr; +Namespaces::~Namespaces() { + Clear(); } void Namespaces::Clear() { diff --git a/src/server/namespaces.h b/src/server/namespaces.h index 66f924ea8..b3d8d52be 100644 --- a/src/server/namespaces.h +++ b/src/server/namespaces.h @@ -49,11 +49,9 @@ class Namespace { // mutual dependencies. class Namespaces { public: - Namespaces() = default; + Namespaces(); ~Namespaces(); - void Init(); - bool IsInitialized() const; void Clear() ABSL_LOCKS_EXCLUDED(mu_); // Thread unsafe, use in tear-down or tests Namespace& GetDefaultNamespace() const; // No locks @@ -65,6 +63,4 @@ class Namespaces { Namespace* default_namespace_ = nullptr; }; -extern Namespaces namespaces; - } // namespace dfly diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index fa99a64f8..8340d7792 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -2258,7 +2258,7 @@ error_code RdbLoader::Load(io::Source* src) { // Active database if not existed before. shard_set->Add( - i, [dbid] { namespaces.GetDefaultNamespace().GetCurrentDbSlice().ActivateDb(dbid); }); + i, [dbid] { namespaces->GetDefaultNamespace().GetCurrentDbSlice().ActivateDb(dbid); }); } cur_db_index_ = dbid; @@ -2656,7 +2656,7 @@ std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, LoadConfig co void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) { EngineShard* es = EngineShard::tlocal(); - DbContext db_cntx{&namespaces.GetDefaultNamespace(), db_ind, GetCurrentTimeMs()}; + DbContext db_cntx{&namespaces->GetDefaultNamespace(), db_ind, GetCurrentTimeMs()}; DbSlice& db_slice = db_cntx.GetDbSlice(es->shard_id()); auto error_msg = [](const auto* item, auto db_ind) { @@ -2860,7 +2860,7 @@ void RdbLoader::LoadSearchIndexDefFromAux(string&& def) { cntx.is_replicating = true; cntx.journal_emulated = true; cntx.skip_acl_validation = true; - cntx.ns = &namespaces.GetDefaultNamespace(); + cntx.ns = &namespaces->GetDefaultNamespace(); uint32_t consumed = 0; facade::RespVec resp_vec; @@ -2897,7 +2897,7 @@ void RdbLoader::PerformPostLoad(Service* service) { // Rebuild all search indices as only their definitions are extracted from the snapshot shard_set->AwaitRunningOnShardQueue([](EngineShard* es) { es->search_indices()->RebuildAllIndices( - OpArgs{es, nullptr, DbContext{&namespaces.GetDefaultNamespace(), 0, GetCurrentTimeMs()}}); + OpArgs{es, nullptr, DbContext{&namespaces->GetDefaultNamespace(), 0, GetCurrentTimeMs()}}); }); } diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index f65aa8475..9bab44b8f 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -1327,7 +1327,7 @@ void RdbSaver::Impl::FinalizeSnapshotWriting() { void RdbSaver::Impl::StartSnapshotting(bool stream_journal, Context* cntx, EngineShard* shard) { auto& s = GetSnapshot(shard); - auto& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()); + auto& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()); auto on_snapshot_finish = std::bind(&RdbSaver::Impl::FinalizeSnapshotWriting, this); auto push_cb = std::bind(&RdbSaver::Impl::PushSnapshotData, this, cntx, std::placeholders::_1); @@ -1341,7 +1341,7 @@ void RdbSaver::Impl::StartSnapshotting(bool stream_journal, Context* cntx, Engin void RdbSaver::Impl::StartIncrementalSnapshotting(Context* cntx, EngineShard* shard, LSN start_lsn) { - auto& db_slice = namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()); + auto& db_slice = namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()); auto& s = GetSnapshot(shard); auto on_finalize_cb = std::bind(&RdbSaver::Impl::FinalizeSnapshotWriting, this); auto push_cb = std::bind(&RdbSaver::Impl::PushSnapshotData, this, cntx, std::placeholders::_1); diff --git a/src/server/replica.cc b/src/server/replica.cc index 58fb53bb7..a33a49a04 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -597,7 +597,7 @@ error_code Replica::ConsumeRedisStream() { conn_context.is_replicating = true; conn_context.journal_emulated = true; conn_context.skip_acl_validation = true; - conn_context.ns = &namespaces.GetDefaultNamespace(); + conn_context.ns = &namespaces->GetDefaultNamespace(); // we never reply back on the commands. facade::CapturingReplyBuilder null_builder{facade::ReplyMode::NONE}; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 39f1f7c88..34e8ff150 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1533,7 +1533,7 @@ void ServerFamily::ConfigureMetrics(util::HttpListenerBase* http_base) { auto cb = [this](const util::http::QueryArgs& args, util::HttpContext* send) { StringResponse resp = util::http::MakeStringResponse(boost::beast::http::status::ok); - PrintPrometheusMetrics(this->GetMetrics(&namespaces.GetDefaultNamespace()), + PrintPrometheusMetrics(this->GetMetrics(&namespaces->GetDefaultNamespace()), this->dfly_cmd_.get(), &resp); return send->Invoke(std::move(resp)); @@ -1608,7 +1608,7 @@ void ServerFamily::StatsMC(std::string_view section, SinkReplyBuilder* builder) double utime = dbl_time(ru.ru_utime); double systime = dbl_time(ru.ru_stime); - Metrics m = GetMetrics(&namespaces.GetDefaultNamespace()); + Metrics m = GetMetrics(&namespaces->GetDefaultNamespace()); ADD_LINE(pid, getpid()); ADD_LINE(uptime, m.uptime); @@ -1638,7 +1638,7 @@ GenericError ServerFamily::DoSave(bool ignore_state) { const CommandId* cid = service().FindCmd("SAVE"); CHECK_NOTNULL(cid); boost::intrusive_ptr trans(new Transaction{cid}); - trans->InitByArgs(&namespaces.GetDefaultNamespace(), 0, {}); + trans->InitByArgs(&namespaces->GetDefaultNamespace(), 0, {}); return DoSave(absl::GetFlag(FLAGS_df_snapshot_format), {}, trans.get(), ignore_state); } @@ -1826,7 +1826,7 @@ bool ServerFamily::DoAuth(ConnectionContext* cntx, std::string_view username, cntx->acl_commands = cred.acl_commands; cntx->keys = std::move(cred.keys); cntx->pub_sub = std::move(cred.pub_sub); - cntx->ns = &namespaces.GetOrInsert(cred.ns); + cntx->ns = &namespaces->GetOrInsert(cred.ns); cntx->authenticated = true; } return is_authorized; diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index d86d8404f..f1a4e7791 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -82,7 +82,7 @@ void TransactionSuspension::Start() { transaction_ = new dfly::Transaction{&cid}; - auto st = transaction_->InitByArgs(&namespaces.GetDefaultNamespace(), 0, {}); + auto st = transaction_->InitByArgs(&namespaces->GetDefaultNamespace(), 0, {}); CHECK_EQ(st, OpStatus::OK); transaction_->Execute([](Transaction* t, EngineShard* shard) { return OpStatus::OK; }, false); @@ -109,7 +109,7 @@ class BaseFamilyTest::TestConnWrapper { ConnectionContext* cmd_cntx() { auto cntx = static_cast(dummy_conn_->cntx()); - cntx->ns = &namespaces.GetDefaultNamespace(); + cntx->ns = &namespaces->GetDefaultNamespace(); return cntx; } @@ -213,7 +213,7 @@ void BaseFamilyTest::ResetService() { used_mem_current = 0; TEST_current_time_ms = absl::GetCurrentTimeNanos() / 1000000; - auto default_ns = &namespaces.GetDefaultNamespace(); + auto default_ns = &namespaces->GetDefaultNamespace(); auto cb = [&](EngineShard* s) { default_ns->GetDbSlice(s->shard_id()).UpdateExpireBase(TEST_current_time_ms - 1000, 0); }; @@ -250,7 +250,7 @@ void BaseFamilyTest::ResetService() { } LOG(ERROR) << "TxLocks for shard " << es->shard_id(); - for (const auto& k_v : namespaces.GetDefaultNamespace() + for (const auto& k_v : namespaces->GetDefaultNamespace() .GetDbSlice(es->shard_id()) .GetDBTable(0) ->trans_locks) { @@ -305,7 +305,7 @@ void BaseFamilyTest::CleanupSnapshots() { unsigned BaseFamilyTest::NumLocked() { atomic_uint count = 0; - auto default_ns = &namespaces.GetDefaultNamespace(); + auto default_ns = &namespaces->GetDefaultNamespace(); shard_set->RunBriefInParallel([&](EngineShard* shard) { for (const auto& db : default_ns->GetDbSlice(shard->shard_id()).databases()) { if (db == nullptr) { @@ -386,7 +386,7 @@ RespExpr BaseFamilyTest::Run(std::string_view id, ArgSlice slice) { CmdArgVec args = conn_wrapper->Args(slice); auto* context = conn_wrapper->cmd_cntx(); - context->ns = &namespaces.GetDefaultNamespace(); + context->ns = &namespaces->GetDefaultNamespace(); DCHECK(context->transaction == nullptr) << id; @@ -566,7 +566,7 @@ BaseFamilyTest::TestConnWrapper::GetInvalidationMessage(size_t index) const { } bool BaseFamilyTest::IsLocked(DbIndex db_index, std::string_view key) const { - return service_->IsLocked(&namespaces.GetDefaultNamespace(), db_index, key); + return service_->IsLocked(&namespaces->GetDefaultNamespace(), db_index, key); } string BaseFamilyTest::GetId() const { @@ -654,7 +654,7 @@ vector BaseFamilyTest::GetLastFps() { lock_guard lk(mu); for (auto fp : - namespaces.GetDefaultNamespace().GetDbSlice(shard->shard_id()).TEST_GetLastLockedFps()) { + namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).TEST_GetLastLockedFps()) { result.push_back(fp); } }; diff --git a/src/server/test_utils.h b/src/server/test_utils.h index 3656def8a..b0d86f50b 100644 --- a/src/server/test_utils.h +++ b/src/server/test_utils.h @@ -117,7 +117,7 @@ class BaseFamilyTest : public ::testing::Test { static std::vector StrArray(const RespExpr& expr); Metrics GetMetrics() const { - return service_->server_family().GetMetrics(&namespaces.GetDefaultNamespace()); + return service_->server_family().GetMetrics(&namespaces->GetDefaultNamespace()); } void ClearMetrics();