feat(tiering): add max file size limit (#2344)

* add max file size flag

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2024-01-02 12:07:34 +02:00 committed by GitHub
parent fc1a70598d
commit c60e4accc6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 91 additions and 36 deletions

View file

@ -24,6 +24,7 @@ extern "C" {
#include "server/journal/journal.h"
#include "server/server_state.h"
#include "server/transaction.h"
#include "strings/human_readable.h"
ABSL_FLAG(bool, lock_on_hashtags, false,
"When true, locks are done in the {hashtag} level instead of key level. "
@ -373,4 +374,19 @@ GenericError Context::ReportErrorInternal(GenericError&& err) {
return err_;
}
// Parses a human-readable byte quantity (e.g. "500MB", "1G", "1TB") into the
// flag. On malformed or negative input, fills `err` and returns false.
bool AbslParseFlag(std::string_view in, dfly::MemoryBytesFlag* flag, std::string* err) {
  int64_t parsed = 0;
  if (!dfly::ParseHumanReadableBytes(in, &parsed) || parsed < 0) {
    *err = "Use human-readable format, eg.: 500MB, 1G, 1TB";
    return false;
  }
  flag->value = parsed;
  return true;
}
// Renders the flag's byte count back into a human-readable string.
std::string AbslUnparseFlag(const dfly::MemoryBytesFlag& flag) {
  const uint64_t bytes = flag.value;
  return strings::HumanReadableNumBytes(bytes);
}
} // namespace dfly

View file

@ -359,4 +359,11 @@ inline uint32_t MemberTimeSeconds(uint64_t now_ms) {
return (now_ms / 1000) - kMemberExpiryBase;
}
// Strong-typed wrapper for byte-count command-line flags so they can be
// specified in human-readable form (e.g. "500MB", "1G") via the
// AbslParseFlag/AbslUnparseFlag overloads declared below.
struct MemoryBytesFlag {
  uint64_t value = 0;  // Size in bytes; 0 means "auto-determine" per the flag descriptions.
};
bool AbslParseFlag(std::string_view in, dfly::MemoryBytesFlag* flag, std::string* err);
std::string AbslUnparseFlag(const dfly::MemoryBytesFlag& flag);
} // namespace dfly

View file

@ -10,6 +10,9 @@ extern "C" {
#include "redis/object.h"
#include "redis/zmalloc.h"
}
#include <sys/statvfs.h>
#include <filesystem>
#include "base/flags.h"
#include "base/logging.h"
@ -19,6 +22,7 @@ extern "C" {
#include "server/server_state.h"
#include "server/tiered_storage.h"
#include "server/transaction.h"
#include "strings/human_readable.h"
#include "util/varz.h"
using namespace std;
@ -29,6 +33,11 @@ ABSL_FLAG(string, spill_file_prefix, "",
" associated with tiered storage. E.g,"
"spill_file_prefix=/path/to/file-prefix");
ABSL_FLAG(dfly::MemoryBytesFlag, tiered_max_file_size, dfly::MemoryBytesFlag{},
"Limit on maximum file size that is used by the database for tiered storage. "
"0 - means the program will automatically determine its maximum file size. "
"default: 0");
ABSL_FLAG(uint32_t, hz, 100,
"Base frequency at which the server performs other background tasks. "
"Warning: not advised to decrease in production.");
@ -57,6 +66,7 @@ namespace dfly {
using namespace util;
using absl::GetFlag;
using strings::HumanReadableNumBytes;
namespace {
@ -390,7 +400,7 @@ void EngineShard::StartPeriodicFiber(util::ProactorBase* pb) {
});
}
void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time, size_t max_file_size) {
CHECK(shard_ == nullptr) << pb->GetPoolIndex();
mi_heap_t* data_heap = ServerState::tlocal()->data_heap();
@ -407,7 +417,7 @@ void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
exit(1);
}
shard_->tiered_storage_.reset(new TieredStorage(&shard_->db_slice_));
shard_->tiered_storage_.reset(new TieredStorage(&shard_->db_slice_, max_file_size));
error_code ec = shard_->tiered_storage_->Open(backing_prefix);
CHECK(!ec) << ec.message(); // TODO
}
@ -778,14 +788,52 @@ auto EngineShard::AnalyzeTxQueue() -> TxQueueInfo {
*/
// Returns the total size, in bytes, of the filesystem backing the directory
// of FLAGS_spill_file_prefix, or 0 if the query fails.
// NOTE(review): this uses f_blocks (total filesystem size), not f_bavail
// (space actually available) — confirm total size is the intended limit.
uint64_t GetFsLimit() {
  std::filesystem::path file_path(GetFlag(FLAGS_spill_file_prefix));
  std::string dir_name_str = file_path.parent_path().string();
  struct statvfs stat;
  if (statvfs(dir_name_str.c_str(), &stat) == 0) {
    // Widen before multiplying: f_frsize is `unsigned long`, so on 32-bit
    // builds the product could overflow for large filesystems.
    uint64_t limit = static_cast<uint64_t>(stat.f_frsize) * stat.f_blocks;
    return limit;
  }
  LOG(WARNING) << "Error getting filesystem information";
  return 0;
}
// Initializes the shard set: sizes the per-shard bookkeeping, computes the
// per-shard tiered-storage file budget (only when a spill file prefix is
// configured), then constructs one EngineShard on each proactor thread.
// Exits the process if the per-shard budget falls below the 256MB minimum.
void EngineShardSet::Init(uint32_t sz, bool update_db_time) {
  CHECK_EQ(0u, size());
  cached_stats.resize(sz);
  shard_queue_.resize(sz);
  string file_prefix = GetFlag(FLAGS_spill_file_prefix);
  size_t max_shard_file_size = 0;
  if (!file_prefix.empty()) {
    size_t max_file_size = absl::GetFlag(FLAGS_tiered_max_file_size).value;
    size_t max_file_size_limit = GetFsLimit();
    if (max_file_size == 0) {
      LOG(INFO) << "max_file_size has not been specified. Deciding myself....";
      // Flag unset: take 80% of the backing filesystem, leaving headroom.
      max_file_size = (max_file_size_limit * 0.8);
    } else {
      // User-provided limit larger than the filesystem: warn but proceed.
      if (max_file_size_limit < max_file_size) {
        LOG(WARNING) << "Got max file size " << HumanReadableNumBytes(max_file_size)
                     << ", however only " << HumanReadableNumBytes(max_file_size_limit)
                     << " disk space was found.";
      }
    }
    max_shard_file_size = max_file_size / shard_queue_.size();
    // Refuse to start with less than 256MB of tiered storage per shard.
    if (max_shard_file_size < 256_MB) {
      LOG(ERROR) << "Max tiering file size is too small. Setting: "
                 << HumanReadableNumBytes(max_file_size) << " Required at least "
                 << HumanReadableNumBytes(256_MB * shard_queue_.size()) << ". Exiting..";
      exit(1);
    }
    LOG(INFO) << "Max file size is: " << HumanReadableNumBytes(max_file_size);
  }
  pp_->AwaitFiberOnAll([&](uint32_t index, ProactorBase* pb) {
    if (index < shard_queue_.size()) {
      // Fixed: removed a stale duplicate call to the old two-argument
      // InitThreadLocal(pb, update_db_time) overload that no longer exists;
      // only the three-argument form is invoked.
      InitThreadLocal(pb, update_db_time, max_shard_file_size);
    }
  });
}
@ -794,8 +842,8 @@ void EngineShardSet::Shutdown() {
RunBlockingInParallel([](EngineShard*) { EngineShard::DestroyThreadLocal(); });
}
void EngineShardSet::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
EngineShard::InitThreadLocal(pb, update_db_time);
void EngineShardSet::InitThreadLocal(ProactorBase* pb, bool update_db_time, size_t max_file_size) {
EngineShard::InitThreadLocal(pb, update_db_time, max_file_size);
EngineShard* es = EngineShard::tlocal();
shard_queue_[es->shard_id()] = es->GetFiberQueue();
}

View file

@ -49,7 +49,7 @@ class EngineShard {
// Sets up a new EngineShard in the thread.
// If update_db_time is true, initializes periodic time update for its db_slice.
static void InitThreadLocal(util::ProactorBase* pb, bool update_db_time);
static void InitThreadLocal(util::ProactorBase* pb, bool update_db_time, size_t max_file_size);
static void DestroyThreadLocal();
@ -332,7 +332,7 @@ class EngineShardSet {
void TEST_EnableCacheMode();
private:
void InitThreadLocal(util::ProactorBase* pb, bool update_db_time);
void InitThreadLocal(util::ProactorBase* pb, bool update_db_time, size_t max_file_size);
util::ProactorPool* pp_;
std::vector<FiberQueue*> shard_queue_;

View file

@ -59,13 +59,6 @@ using namespace std;
using facade::ErrorReply;
using dfly::operator""_KB;
struct MaxMemoryFlag {
uint64_t value = 0;
};
static bool AbslParseFlag(std::string_view in, MaxMemoryFlag* flag, std::string* err);
static std::string AbslUnparseFlag(const MaxMemoryFlag& flag);
ABSL_FLAG(int32_t, port, 6379,
"Redis port. 0 disables the port, -1 will bind on a random available port.");
@ -88,7 +81,7 @@ ABSL_FLAG(bool, admin_nopass, false,
"If set, would enable open admin access to console on the assigned port, without "
"authorization needed.");
ABSL_FLAG(MaxMemoryFlag, maxmemory, MaxMemoryFlag{},
ABSL_FLAG(dfly::MemoryBytesFlag, maxmemory, dfly::MemoryBytesFlag{},
"Limit on maximum-memory that is used by the database. "
"0 - means the program will automatically determine its maximum memory usage. "
"default: 0");
@ -96,21 +89,6 @@ ABSL_FLAG(double, oom_deny_ratio, 1.1,
"commands with flag denyoom will return OOM when the ratio between maxmemory and used "
"memory is above this value");
bool AbslParseFlag(std::string_view in, MaxMemoryFlag* flag, std::string* err) {
int64_t val;
if (dfly::ParseHumanReadableBytes(in, &val) && val >= 0) {
flag->value = val;
return true;
}
*err = "Use human-readable format, eg.: 500MB, 1G, 1TB";
return false;
}
std::string AbslUnparseFlag(const MaxMemoryFlag& flag) {
return strings::HumanReadableNumBytes(flag.value);
}
namespace dfly {
#if defined(__linux__)
@ -787,7 +765,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
InitRedisTables();
config_registry.RegisterMutable("maxmemory", [](const absl::CommandLineFlag& flag) {
auto res = flag.TryGet<MaxMemoryFlag>();
auto res = flag.TryGet<MemoryBytesFlag>();
if (!res)
return false;

View file

@ -298,7 +298,8 @@ void TieredStorage::InflightWriteRequest::Undo(PerDb::BinRecord* bin_record, DbS
}
}
// Constructs tiered storage bound to `db_slice`. `max_file_size` caps the
// total backing-file bytes this instance may allocate (see InitiateGrow).
// Fixed: removed the stale single-argument constructor line left in by the
// diff render, which duplicated this constructor and would not compile.
TieredStorage::TieredStorage(DbSlice* db_slice, size_t max_file_size)
    : db_slice_(*db_slice), max_file_size_(max_file_size) {
}
TieredStorage::~TieredStorage() {
@ -312,8 +313,10 @@ error_code TieredStorage::Open(const string& base) {
error_code ec = io_mgr_.Open(path);
if (!ec) {
if (io_mgr_.Span()) { // Add initial storage.
alloc_.AddStorage(0, io_mgr_.Span());
size_t initial_size = io_mgr_.Span();
if (initial_size) { // Add initial storage.
allocated_size_ += initial_size;
alloc_.AddStorage(0, initial_size);
}
}
return ec;
@ -611,7 +614,7 @@ bool TieredStorage::FlushPending(DbIndex db_index, unsigned bin_index) {
}
void TieredStorage::InitiateGrow(size_t grow_size) {
if (io_mgr_.grow_pending())
if (io_mgr_.grow_pending() || allocated_size_ + grow_size > max_file_size_)
return;
DCHECK_GT(grow_size, 0u);
@ -620,6 +623,7 @@ void TieredStorage::InitiateGrow(size_t grow_size) {
auto cb = [start, grow_size, this](int io_res) {
if (io_res == 0) {
alloc_.AddStorage(start, grow_size);
allocated_size_ += grow_size;
} else {
LOG_FIRST_N(ERROR, 10) << "Error enlarging storage " << io_res;
}

View file

@ -21,7 +21,7 @@ class TieredStorage {
public:
enum : uint16_t { kMinBlobLen = 64 };
explicit TieredStorage(DbSlice* db_slice);
explicit TieredStorage(DbSlice* db_slice, size_t max_file_size);
~TieredStorage();
std::error_code Open(const std::string& path);
@ -69,6 +69,8 @@ class TieredStorage {
absl::flat_hash_map<uint32_t, uint8_t> page_refcnt_;
TieredStats stats_;
size_t max_file_size_;
size_t allocated_size_ = 0;
};
} // namespace dfly