chore: tiering - make Modify work with cool storage (#3395)

1. Fully support tiered_experimental_cooling for all operations
2. Offset cool storage usage when computing memory pressure situations in Hearbeat.
3. Introduce realtime entry counting per db_slice and provide DCHECK to verify it vs the old approach.
   Later we will switch to realtime entry and free memory computations when computing bytes per object,
   and remove the old approach in CacheStats().
4. Show hit rate during the run of dfly_bench loadtest.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-07-27 14:31:29 +03:00 committed by GitHub
parent 9d16bd6f6e
commit 6b67f44e29
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 95 additions and 45 deletions

View file

@ -622,6 +622,7 @@ OpResult<DbSlice::AddOrFindResult> DbSlice::AddOrFindInternal(const Context& cnt
}
table_memory_ += (db.table_memory() - table_before);
entries_count_++;
db.stats.inline_keys += it->first.IsInline();
AccountObjectMemory(key, it->first.ObjType(), it->first.MallocUsed(), &db); // Account for key
@ -756,6 +757,8 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
for (DbIndex index : indexes) {
table_memory_ -= db_arr_[index]->table_memory();
entries_count_ -= db_arr_[index]->prime.size();
InvalidateDbWatches(index);
flush_db_arr[index] = std::move(db_arr_[index]);
@ -764,6 +767,7 @@ void DbSlice::FlushDbIndexes(const std::vector<DbIndex>& indexes) {
}
CHECK(fetched_items_.empty());
auto cb = [indexes, flush_db_arr = std::move(flush_db_arr)]() mutable {
flush_db_arr.clear();
ServerState::tlocal()->DecommitMemory(ServerState::kDataHeap | ServerState::kBackingHeap |
@ -1464,6 +1468,7 @@ void DbSlice::PerformDeletion(Iterator del_it, ExpIterator exp_it, DbTable* tabl
table->prime.Erase(del_it.GetInnerIt());
table_memory_ += (table->table_memory() - table_before);
--entries_count_;
SendInvalidationTrackingMessage(del_it.key());
}

View file

@ -391,7 +391,6 @@ class DbSlice {
// Returns existing keys count in the db.
size_t DbSize(DbIndex db_ind) const;
// Callback functions called upon writing to the existing key.
DbTableStats* MutableStats(DbIndex db_ind) {
return &db_arr_[db_ind]->stats;
}
@ -417,6 +416,10 @@ class DbSlice {
return table_memory_;
}
size_t entries_count() const {
return entries_count_;
}
using ChangeCallback = std::function<void(DbIndex, const ChangeReq&)>;
//! Registers the callback to be called for each change.
@ -567,6 +570,7 @@ class DbSlice {
size_t bytes_per_object_ = 0;
size_t soft_budget_limit_ = 0;
size_t table_memory_ = 0;
uint64_t entries_count_ = 0;
mutable SliceEvents events_; // we may change this even for const operations.

View file

@ -162,6 +162,15 @@ struct ClientStats {
uint64_t hit_count = 0;
uint64_t hit_opportunities = 0;
uint64_t num_errors = 0;
ClientStats& operator+=(const ClientStats& o) {
hist.Merge(o.hist);
num_responses += o.num_responses;
hit_count += o.hit_count;
hit_opportunities += o.hit_opportunities;
num_errors += o.num_errors;
return *this;
}
};
// Per connection driver.
@ -474,29 +483,30 @@ void WatchFiber(absl::Time start_time, atomic_bool* finish_signal, ProactorPool*
ThisFiber::SleepFor(1s);
absl::Time now = absl::Now();
if (now - last_print > absl::Seconds(5)) {
uint64_t num_resp = 0;
uint64_t num_errors = 0;
ClientStats client_stats;
pp->AwaitFiberOnAll([&](auto* p) {
unique_lock lk(mutex);
num_resp += client->stats.num_responses;
num_errors += client->stats.num_errors;
client_stats += client->stats;
lk.unlock();
});
uint64_t total_ms = (now - start_time) / absl::Milliseconds(1);
uint64_t period_ms = (now - last_print) / absl::Milliseconds(1);
uint64_t period_resp_cnt = num_resp - num_last_resp_cnt;
double done_perc = double(num_resp) * 100 / resp_goal;
uint64_t period_resp_cnt = client_stats.num_responses - num_last_resp_cnt;
double done_perc = double(client_stats.num_responses) * 100 / resp_goal;
double hitrate =
client_stats.hit_opportunities > 0
? 100 * double(client_stats.hit_count) / double(client_stats.hit_opportunities)
: 0;
CONSOLE_INFO << total_ms / 1000 << "s: " << absl::StrFormat("%.1f", done_perc)
<< "% done, effective RPS(now/accumulated): "
<< period_resp_cnt * 1000 / period_ms << "/" << num_resp * 1000 / total_ms
<< ", errors: " << num_errors;
<< period_resp_cnt * 1000 / period_ms << "/"
<< client_stats.num_responses * 1000 / total_ms
<< ", errors: " << client_stats.num_errors
<< ", hitrate: " << absl::StrFormat("%.1f", hitrate) << "%";
last_print = now;
num_last_resp_cnt = num_resp;
num_last_resp_cnt = client_stats.num_responses;
}
}
}
@ -599,31 +609,27 @@ int main(int argc, char* argv[]) {
base::Histogram hist;
LOG(INFO) << "Resetting all threads";
uint64_t hit_opportunities = 0, hit_count = 0, num_errors = 0, num_responses = 0;
ClientStats summary;
pp->AwaitFiberOnAll([&](auto* p) {
unique_lock lk(mutex);
hist.Merge(client->stats.hist);
hit_opportunities += client->stats.hit_opportunities;
hit_count += client->stats.hit_count;
num_errors += client->stats.num_errors;
num_responses += client->stats.num_responses;
summary += client->stats;
lk.unlock();
client.reset();
});
CONSOLE_INFO << "\nTotal time: " << duration << ". Overall number of requests: " << num_responses
<< ", QPS: " << num_responses / (duration / absl::Seconds(1));
CONSOLE_INFO << "\nTotal time: " << duration
<< ". Overall number of requests: " << summary.num_responses
<< ", QPS: " << summary.num_responses / (duration / absl::Seconds(1));
if (num_errors) {
CONSOLE_INFO << "Got " << num_errors << " error responses!";
if (summary.num_errors) {
CONSOLE_INFO << "Got " << summary.num_errors << " error responses!";
}
CONSOLE_INFO << "Latency summary, all times are in usec:\n" << hist.ToString();
if (hit_opportunities) {
if (summary.hit_opportunities) {
CONSOLE_INFO << "----------------------------------\nHit rate: "
<< 100 * double(hit_count) / double(hit_opportunities) << "%\n";
<< 100 * double(summary.hit_count) / double(summary.hit_opportunities) << "%\n";
}
pp->Stop();

View file

@ -620,10 +620,15 @@ void EngineShard::Heartbeat() {
}
ssize_t eviction_redline = size_t(max_memory_limit * kRedLimitFactor) / shard_set->size();
// Offset CoolMemoryUsage when consider background offloading.
// TODO: Another approach could be is to align the approach similarly to how we do with
// FreeMemWithEvictionStep, i.e. if memory_budget is below the limit.
size_t tiering_offload_threshold =
tiered_storage_
? size_t(max_memory_limit * GetFlag(FLAGS_tiered_offload_threshold)) / shard_set->size()
: std::numeric_limits<size_t>::max();
tiered_storage_ ? tiered_storage_->CoolMemoryUsage() +
size_t(max_memory_limit * GetFlag(FLAGS_tiered_offload_threshold)) /
shard_set->size()
: std::numeric_limits<size_t>::max();
DbContext db_cntx;
db_cntx.time_now_ms = GetCurrentTimeMs();
@ -738,6 +743,7 @@ void EngineShard::CacheStats() {
}
}
DCHECK_EQ(table_memory, db_slice.table_memory());
DCHECK_EQ(entries, db_slice.entries_count());
if (tiered_storage_) {
table_memory += tiered_storage_->CoolMemoryUsage();
}
@ -764,7 +770,10 @@ bool EngineShard::ShouldThrottleForTiering() const { // see header for formula
size_t tiering_redline =
(max_memory_limit * GetFlag(FLAGS_tiered_offload_threshold)) / shard_set->size();
return UsedMemory() > tiering_redline && tiered_storage_->WriteDepthUsage() > 0.3;
// UsedMemory includes CoolMemoryUsage, so we are offsetting it to remove the cool cache impact.
return tiered_storage_->WriteDepthUsage() > 0.3 &&
(UsedMemory() > tiering_redline + tiered_storage_->CoolMemoryUsage());
}
auto EngineShard::AnalyzeTxQueue() const -> TxQueueInfo {

View file

@ -31,7 +31,7 @@ ABSL_FLAG(uint32_t, tiered_storage_memory_margin, 10_MB,
"In bytes. If memory budget on a shard goes below this limit, tiering stops "
"hot-loading values into ram.");
ABSL_FLAG(bool, tiered_experimental_cooling, false,
ABSL_FLAG(bool, tiered_experimental_cooling, true,
"If true, uses intermidate cooling layer "
"when offloading values to storage");
@ -267,6 +267,8 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value,
}
bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) {
DVLOG(2) << "NotifyDelete [" << segment.offset << "," << segment.length << "]";
if (OccupiesWholePages(segment.length))
return true;
@ -398,22 +400,33 @@ util::fb2::Future<T> TieredStorage::Modify(DbIndex dbid, std::string_view key,
const PrimeValue& value,
std::function<T(std::string*)> modf) {
DCHECK(value.IsExternal());
DCHECK(!value.IsCool()); // TBD
util::fb2::Future<T> future;
PrimeValue decoder;
decoder.ImportExternal(value);
if (value.IsCool()) {
PrimeValue hot = Warmup(dbid, value.GetCool());
string tmp;
auto cb = [future, modf = std::move(modf), decoder = std::move(decoder)](
bool is_raw, std::string* raw_val) mutable {
if (is_raw) {
decoder.Materialize(*raw_val, true);
decoder.GetString(raw_val);
}
future.Resolve(modf(raw_val));
return true;
};
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
DCHECK_EQ(value.Size(), hot.Size());
hot.GetString(&tmp);
future.Resolve(modf(&tmp));
// TODO: An awful hack - to fix later.
const_cast<PrimeValue&>(value).Materialize(tmp, false);
} else {
PrimeValue decoder;
decoder.ImportExternal(value);
auto cb = [future, modf = std::move(modf), decoder = std::move(decoder)](
bool is_raw, std::string* raw_val) mutable {
if (is_raw) {
decoder.Materialize(*raw_val, true);
decoder.GetString(raw_val);
}
future.Resolve(modf(raw_val));
return true;
};
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
}
return future;
}

View file

@ -26,6 +26,7 @@ ABSL_DECLARE_FLAG(bool, force_epoll);
ABSL_DECLARE_FLAG(string, tiered_prefix);
ABSL_DECLARE_FLAG(float, tiered_offload_threshold);
ABSL_DECLARE_FLAG(unsigned, tiered_storage_write_depth);
ABSL_DECLARE_FLAG(bool, tiered_experimental_cooling);
namespace dfly {
@ -189,7 +190,9 @@ TEST_F(TieredStorageTest, Defrag) {
// This tirggers defragmentation, as only 3 < 7/2 remain left
Run({"GET", string(1, 'd')});
Run({"GET", string(1, 'd')});
// Wait that any reads caused by defrags has been finished.
ExpectConditionWithinTimeout([this] { return GetMetrics().tiered_stats.pending_read_cnt == 0; });
metrics = GetMetrics();
EXPECT_EQ(metrics.tiered_stats.total_defrags, 3u);
EXPECT_EQ(metrics.tiered_stats.small_bins_cnt, 0u);
@ -200,6 +203,9 @@ TEST_F(TieredStorageTest, BackgroundOffloading) {
absl::FlagSaver saver;
SetFlag(&FLAGS_tiered_offload_threshold, 0.0f); // offload all values
// The setup works without cooling buffers.
SetFlag(&FLAGS_tiered_experimental_cooling, false);
const int kNum = 500;
max_memory_limit = kNum * 4096;
@ -246,6 +252,13 @@ TEST_F(TieredStorageTest, FlushAll) {
absl::FlagSaver saver;
SetFlag(&FLAGS_tiered_offload_threshold, 0.0f); // offload all values
// We want to cover the interaction of FlushAll with concurrent reads from disk.
// For that we disable tiered_experimental_cooling.
// TODO: seems that our replacement policy will upload the entries to RAM in any case,
// making this test ineffective. We should add the ability to disable promotion of offloaded
// entries to RAM upon reads.
SetFlag(&FLAGS_tiered_experimental_cooling, false);
const int kNum = 500;
for (size_t i = 0; i < kNum; i++) {
Run({"SET", absl::StrCat("k", i), BuildString(3000)});