tiering: add --tiered_max_file_size and cap growth of the per-shard backing file

Summary
  * common.h/.cc: the human-readable byte-size flag type moves out of main_service.cc and
    becomes dfly::MemoryBytesFlag, together with its AbslParseFlag/AbslUnparseFlag hooks.
  * main_service.cc: --maxmemory now uses dfly::MemoryBytesFlag; the local MaxMemoryFlag is gone.
  * engine_shard_set.cc/.h: new flag --tiered_max_file_size. EngineShardSet::Init() resolves the
    total limit (0 means 80% of the size reported by GetFsLimit()), divides it by the number of
    shards, exits if a shard would get less than 256MB, and passes the per-shard value down
    through InitThreadLocal() to TieredStorage.
  * tiered_storage.cc/.h: TieredStorage tracks allocated_size_ and InitiateGrow() refuses to grow
    past max_file_size_.

Review notes
  * GetFsLimit() falls back to "." when the spill prefix has no directory component; statvfs("")
    fails, which previously made the automatic limit 0 and aborted start-up.
  * GetFsLimit() widens f_frsize to uint64_t before multiplying.
  * The "only ... disk space was found" warning is skipped when the filesystem size is unknown (0).
  * NOTE(review): GetFsLimit() uses f_blocks, i.e. the size of the whole filesystem rather than
    the free space (f_bavail) - confirm that this is what the limit should be based on.
  * This patch text was recovered from a whitespace-collapsed copy in which everything between
    angle brackets was lost. The two system includes and the TryGet template arguments were
    restored from the code that uses them. Two context lines whose template arguments could not
    be recovered were dropped from their hunks (engine_shard_set.h, tiered_storage.h).
    Indentation and blank lines were reconstructed, so apply with --ignore-whitespace if a hunk
    is rejected. The index lines are those of the original change.

diff --git a/src/server/common.cc b/src/server/common.cc
index e33ade8dc..3de826c4b 100644
--- a/src/server/common.cc
+++ b/src/server/common.cc
@@ -24,6 +24,7 @@ extern "C" {
 #include "server/journal/journal.h"
 #include "server/server_state.h"
 #include "server/transaction.h"
+#include "strings/human_readable.h"
 
 ABSL_FLAG(bool, lock_on_hashtags, false,
           "When true, locks are done in the {hashtag} level instead of key level. "
@@ -373,4 +374,21 @@ GenericError Context::ReportErrorInternal(GenericError&& err) {
   return err_;
 }
 
+// Abseil flag hook: parses a non-negative human-readable byte size (e.g. "1G") into flag.
+bool AbslParseFlag(std::string_view in, dfly::MemoryBytesFlag* flag, std::string* err) {
+  int64_t val;
+  if (dfly::ParseHumanReadableBytes(in, &val) && val >= 0) {
+    flag->value = val;
+    return true;
+  }
+
+  *err = "Use human-readable format, eg.: 500MB, 1G, 1TB";
+  return false;
+}
+
+// Abseil flag hook: renders the stored byte count in human-readable form.
+std::string AbslUnparseFlag(const dfly::MemoryBytesFlag& flag) {
+  return strings::HumanReadableNumBytes(flag.value);
+}
+
 }  // namespace dfly
diff --git a/src/server/common.h b/src/server/common.h
index 5a95f99b4..19644c1d8 100644
--- a/src/server/common.h
+++ b/src/server/common.h
@@ -359,4 +359,13 @@ inline uint32_t MemberTimeSeconds(uint64_t now_ms) {
   return (now_ms / 1000) - kMemberExpiryBase;
 }
 
+// Byte quantity for command-line flags. AbslParseFlag() below accepts human-readable
+// strings such as 500MB, 1G or 1TB and rejects negative values.
+struct MemoryBytesFlag {
+  uint64_t value = 0;
+};
+
+bool AbslParseFlag(std::string_view in, dfly::MemoryBytesFlag* flag, std::string* err);
+std::string AbslUnparseFlag(const dfly::MemoryBytesFlag& flag);
+
 }  // namespace dfly
diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc
index 45f086833..a8e86df9a 100644
--- a/src/server/engine_shard_set.cc
+++ b/src/server/engine_shard_set.cc
@@ -10,6 +10,9 @@ extern "C" {
 #include "redis/object.h"
 #include "redis/zmalloc.h"
 }
+#include <sys/statvfs.h>
+
+#include <filesystem>
 
 #include "base/flags.h"
 #include "base/logging.h"
@@ -19,6 +22,7 @@ extern "C" {
 #include "server/server_state.h"
 #include "server/tiered_storage.h"
 #include "server/transaction.h"
+#include "strings/human_readable.h"
 #include "util/varz.h"
 
 using namespace std;
@@ -29,6 +33,11 @@ ABSL_FLAG(string, spill_file_prefix, "",
           " associated with tiered storage. E.g,"
           "spill_file_prefix=/path/to/file-prefix");
 
+ABSL_FLAG(dfly::MemoryBytesFlag, tiered_max_file_size, dfly::MemoryBytesFlag{},
+          "Limit on maximum file size that is used by the database for tiered storage. "
+          "0 - means the program will automatically determine its maximum file size. "
+          "default: 0");
+
 ABSL_FLAG(uint32_t, hz, 100,
           "Base frequency at which the server performs other background tasks. "
           "Warning: not advised to decrease in production.");
@@ -57,6 +66,7 @@ namespace dfly {
 
 using namespace util;
 using absl::GetFlag;
+using strings::HumanReadableNumBytes;
 
 namespace {
 
@@ -390,7 +400,7 @@ void EngineShard::StartPeriodicFiber(util::ProactorBase* pb) {
   });
 }
 
-void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
+void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time, size_t max_file_size) {
   CHECK(shard_ == nullptr) << pb->GetPoolIndex();
 
   mi_heap_t* data_heap = ServerState::tlocal()->data_heap();
@@ -407,7 +417,7 @@ void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
       exit(1);
     }
 
-    shard_->tiered_storage_.reset(new TieredStorage(&shard_->db_slice_));
+    shard_->tiered_storage_.reset(new TieredStorage(&shard_->db_slice_, max_file_size));
     error_code ec = shard_->tiered_storage_->Open(backing_prefix);
     CHECK(!ec) << ec.message();  // TODO
   }
@@ -778,14 +788,60 @@ auto EngineShard::AnalyzeTxQueue() -> TxQueueInfo {
 
  */
 
+// Returns the total size in bytes of the filesystem holding the directory part of
+// --spill_file_prefix, or 0 (after logging a warning) if it cannot be queried.
+uint64_t GetFsLimit() {
+  std::filesystem::path file_path(GetFlag(FLAGS_spill_file_prefix));
+  std::string dir_name_str = file_path.parent_path().string();
+  if (dir_name_str.empty()) {
+    // A prefix without a directory component refers to the current directory;
+    // statvfs("") would fail.
+    dir_name_str = ".";
+  }
+
+  struct statvfs stat;
+  if (statvfs(dir_name_str.c_str(), &stat) == 0) {
+    // Widen before multiplying so the product cannot wrap where these fields are 32-bit.
+    return static_cast<uint64_t>(stat.f_frsize) * stat.f_blocks;
+  }
+  LOG(WARNING) << "Error getting filesystem information";
+  return 0;
+}
+
 void EngineShardSet::Init(uint32_t sz, bool update_db_time) {
   CHECK_EQ(0u, size());
   cached_stats.resize(sz);
   shard_queue_.resize(sz);
 
+  // Derive the per-shard tiering file limit; skipped when no spill file prefix is set.
+  string file_prefix = GetFlag(FLAGS_spill_file_prefix);
+  size_t max_shard_file_size = 0;
+  if (!file_prefix.empty()) {
+    size_t max_file_size = absl::GetFlag(FLAGS_tiered_max_file_size).value;
+    size_t max_file_size_limit = GetFsLimit();
+    if (max_file_size == 0) {
+      LOG(INFO) << "max_file_size has not been specified. Deciding myself....";
+      max_file_size = (max_file_size_limit * 0.8);
+    } else {
+      if (max_file_size_limit > 0 && max_file_size_limit < max_file_size) {
+        LOG(WARNING) << "Got max file size " << HumanReadableNumBytes(max_file_size)
+                     << ", however only " << HumanReadableNumBytes(max_file_size_limit)
+                     << " disk space was found.";
+      }
+    }
+    max_shard_file_size = max_file_size / shard_queue_.size();
+    if (max_shard_file_size < 256_MB) {
+      LOG(ERROR) << "Max tiering file size is too small. Setting: "
+                 << HumanReadableNumBytes(max_file_size) << " Required at least "
+                 << HumanReadableNumBytes(256_MB * shard_queue_.size()) << ". Exiting..";
+      exit(1);
+    }
+    LOG(INFO) << "Max file size is: " << HumanReadableNumBytes(max_file_size);
+  }
+
   pp_->AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) {
     if (index < shard_queue_.size()) {
-      InitThreadLocal(pb, update_db_time);
+      InitThreadLocal(pb, update_db_time, max_shard_file_size);
     }
   });
 }
@@ -794,8 +850,8 @@ void EngineShardSet::Shutdown() {
   RunBlockingInParallel([](EngineShard*) { EngineShard::DestroyThreadLocal(); });
 }
 
-void EngineShardSet::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
-  EngineShard::InitThreadLocal(pb, update_db_time);
+void EngineShardSet::InitThreadLocal(ProactorBase* pb, bool update_db_time, size_t max_file_size) {
+  EngineShard::InitThreadLocal(pb, update_db_time, max_file_size);
   EngineShard* es = EngineShard::tlocal();
   shard_queue_[es->shard_id()] = es->GetFiberQueue();
 }
diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h
index d45eb42c6..2e3555989 100644
--- a/src/server/engine_shard_set.h
+++ b/src/server/engine_shard_set.h
@@ -49,7 +49,8 @@ class EngineShard {
 
   // Sets up a new EngineShard in the thread.
   // If update_db_time is true, initializes periodic time update for its db_slice.
-  static void InitThreadLocal(util::ProactorBase* pb, bool update_db_time);
+  // max_file_size is forwarded to the shard's TieredStorage as its backing file size limit.
+  static void InitThreadLocal(util::ProactorBase* pb, bool update_db_time, size_t max_file_size);
 
   static void DestroyThreadLocal();
 
@@ -332,6 +333,6 @@ class EngineShardSet {
   void TEST_EnableCacheMode();
 
  private:
-  void InitThreadLocal(util::ProactorBase* pb, bool update_db_time);
+  void InitThreadLocal(util::ProactorBase* pb, bool update_db_time, size_t max_file_size);
 
   util::ProactorPool* pp_;
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index a988bf24e..38cd7ac7f 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -59,13 +59,6 @@ using namespace std;
 using facade::ErrorReply;
 using dfly::operator""_KB;
 
-struct MaxMemoryFlag {
-  uint64_t value = 0;
-};
-
-static bool AbslParseFlag(std::string_view in, MaxMemoryFlag* flag, std::string* err);
-static std::string AbslUnparseFlag(const MaxMemoryFlag& flag);
-
 ABSL_FLAG(int32_t, port, 6379,
           "Redis port. 0 disables the port, -1 will bind on a random available port.");
 
@@ -88,7 +81,7 @@ ABSL_FLAG(bool, admin_nopass, false,
           "If set, would enable open admin access to console on the assigned port, without "
           "authorization needed.");
 
-ABSL_FLAG(MaxMemoryFlag, maxmemory, MaxMemoryFlag{},
+ABSL_FLAG(dfly::MemoryBytesFlag, maxmemory, dfly::MemoryBytesFlag{},
           "Limit on maximum-memory that is used by the database. "
           "0 - means the program will automatically determine its maximum memory usage. "
           "default: 0");
@@ -96,21 +89,6 @@ ABSL_FLAG(double, oom_deny_ratio, 1.1,
           "commands with flag denyoom will return OOM when the ratio between maxmemory and used "
           "memory is above this value");
 
-bool AbslParseFlag(std::string_view in, MaxMemoryFlag* flag, std::string* err) {
-  int64_t val;
-  if (dfly::ParseHumanReadableBytes(in, &val) && val >= 0) {
-    flag->value = val;
-    return true;
-  }
-
-  *err = "Use human-readable format, eg.: 500MB, 1G, 1TB";
-  return false;
-}
-
-std::string AbslUnparseFlag(const MaxMemoryFlag& flag) {
-  return strings::HumanReadableNumBytes(flag.value);
-}
-
 namespace dfly {
 
 #if defined(__linux__)
@@ -787,7 +765,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector
   InitRedisTables();
 
   config_registry.RegisterMutable("maxmemory", [](const absl::CommandLineFlag& flag) {
-    auto res = flag.TryGet<MaxMemoryFlag>();
+    auto res = flag.TryGet<MemoryBytesFlag>();
     if (!res)
       return false;
 
diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc
index 75f5c491a..d5f9947da 100644
--- a/src/server/tiered_storage.cc
+++ b/src/server/tiered_storage.cc
@@ -298,7 +298,8 @@ void TieredStorage::InflightWriteRequest::Undo(PerDb::BinRecord* bin_record, DbS
   }
 }
 
-TieredStorage::TieredStorage(DbSlice* db_slice) : db_slice_(*db_slice) {
+TieredStorage::TieredStorage(DbSlice* db_slice, size_t max_file_size)
+    : db_slice_(*db_slice), max_file_size_(max_file_size) {
 }
 
 TieredStorage::~TieredStorage() {
@@ -312,8 +313,10 @@ error_code TieredStorage::Open(const string& base) {
   error_code ec = io_mgr_.Open(path);
 
   if (!ec) {
-    if (io_mgr_.Span()) {  // Add initial storage.
-      alloc_.AddStorage(0, io_mgr_.Span());
+    size_t initial_size = io_mgr_.Span();
+    if (initial_size) {  // Add initial storage.
+      allocated_size_ += initial_size;
+      alloc_.AddStorage(0, initial_size);
     }
   }
   return ec;
@@ -611,7 +614,8 @@ bool TieredStorage::FlushPending(DbIndex db_index, unsigned bin_index) {
 }
 
 void TieredStorage::InitiateGrow(size_t grow_size) {
-  if (io_mgr_.grow_pending())
+  // Do nothing while a grow is pending or if growing would exceed the file size limit.
+  if (io_mgr_.grow_pending() || allocated_size_ + grow_size > max_file_size_)
     return;
 
   DCHECK_GT(grow_size, 0u);
@@ -620,6 +624,7 @@ void TieredStorage::InitiateGrow(size_t grow_size) {
   auto cb = [start, grow_size, this](int io_res) {
     if (io_res == 0) {
       alloc_.AddStorage(start, grow_size);
+      allocated_size_ += grow_size;
     } else {
       LOG_FIRST_N(ERROR, 10) << "Error enlarging storage " << io_res;
     }
diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h
index 719886cab..21ce982b6 100644
--- a/src/server/tiered_storage.h
+++ b/src/server/tiered_storage.h
@@ -21,7 +21,7 @@ class TieredStorage {
  public:
   enum : uint16_t { kMinBlobLen = 64 };
 
-  explicit TieredStorage(DbSlice* db_slice);
+  explicit TieredStorage(DbSlice* db_slice, size_t max_file_size);
   ~TieredStorage();
 
   std::error_code Open(const std::string& path);
@@ -71,4 +71,6 @@ class TieredStorage {
   TieredStats stats_;
+  size_t max_file_size_;       // Upper bound for allocated_size_; InitiateGrow() enforces it.
+  size_t allocated_size_ = 0;  // Total storage handed to alloc_ so far.
 };
 
 }  // namespace dfly