chore: prefetch keys in opmget

Also, stop deduplicating keys by default, but keep deduplication available behind a runtime flag (mget_dedup_keys).
All in all, this PR improves MGET (100% miss) throughput by at least 7%:
from 1.23M to 1.32M qps.
(100% miss traffic makes it easier to isolate the impact of this code path.)

Also, finally fix the flaky TieredStorageTest.FlushAll test.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Author: Roman Gershman
Date: 2025-03-16 18:24:06 +02:00
parent 70be62c6a1
commit eb69d2b101
2 changed files with 36 additions and 11 deletions
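For context on the prefetch part of the change: the key list is walked twice, first issuing hash-table prefetch hints so bucket memory is pulled toward the CPU cache, then doing the real lookups once that latency has been overlapped. Below is a minimal sketch of the pattern, assuming a hash-table type with a non-blocking Prefetch(key) hint (as PrimeTable has in the diff below) and some Find(key) lookup; the Table template, the BatchedFind name and its return type are illustrative, not Dragonfly code.

#include <string_view>
#include <type_traits>
#include <vector>

// Sketch only: `Table` stands in for a hash table that offers a non-blocking
// Prefetch(key) hint and a Find(key) lookup, similar in spirit to the
// PrimeTable calls in the diff below.
template <typename Table>
auto BatchedFind(Table& table, const std::vector<std::string_view>& keys) {
  // Cap the prefetch batch; prefetching far ahead of the lookups just evicts
  // other useful cache lines.
  constexpr unsigned kPrefetchLimit = 32;

  // Pass 1: hint the hardware about the buckets we are about to touch.
  unsigned prefetched = 0;
  for (std::string_view key : keys) {
    table.Prefetch(key);
    if (++prefetched >= kPrefetchLimit)
      break;
  }

  // Pass 2: the real lookups now find (some of) their buckets already in cache,
  // because the memory latency was overlapped with the prefetch loop above.
  using Result = std::decay_t<decltype(table.Find(std::string_view{}))>;
  std::vector<Result> results;
  results.reserve(keys.size());
  for (std::string_view key : keys) {
    results.push_back(table.Find(key));
  }
  return results;
}

In the OpMGet hunk below, the same idea shows up as the pt.Prefetch() loop over the first kPrefetchLimit keys, followed by the existing FindReadOnly loop.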


@@ -35,6 +35,10 @@
 #include "server/transaction.h"
 #include "util/fibers/future.h"
 
+ABSL_FLAG(bool, mget_prefetch_keys, true, "If true, MGET will prefetch keys before reading them");
+ABSL_FLAG(bool, mget_dedup_keys, false, "If true, MGET will deduplicate keys");
+
 namespace dfly {
 namespace {
@@ -547,23 +551,42 @@ MGetResponse OpMGet(fb2::BlockingCounter wait_bc, uint8_t fetch_mask, const Tran
   absl::InlinedVector<Item, 32> items(keys.Size());
 
-  // First, fetch all iterators and count total size ahead
-  size_t total_size = 0;
-  unsigned index = 0;
+  static bool mget_prefetch_keys = absl::GetFlag(FLAGS_mget_prefetch_keys);
+  static bool mget_dedup_keys = absl::GetFlag(FLAGS_mget_dedup_keys);
+
   // We can not make it thread-local because we may preempt during the Find loop due to
   // serialization with the bumpup calls.
   // TODO: consider separating BumpUps from finds because it becomes too complicated
   // to reason about.
   absl::flat_hash_map<string_view, unsigned> key_index;
-  key_index.reserve(keys.Size());
+  if (mget_dedup_keys) {
+    key_index.reserve(keys.Size());
+  }
+
+  // First, fetch all iterators and count total size ahead
+  size_t total_size = 0;
+  unsigned index = 0;
+
+  PrimeTable& pt = db_slice.GetDBTable(t->GetDbIndex())->prime;
+  constexpr unsigned kPrefetchLimit = 32;
+  if (mget_prefetch_keys) {
+    unsigned prefetched = 0;
+    for (string_view key : keys) {
+      pt.Prefetch(key);
+      if (++prefetched >= kPrefetchLimit) {
+        break;
+      }
+    }
+  }
 
   for (string_view key : keys) {
-    auto [it, inserted] = key_index.try_emplace(key, index);
-    if (!inserted) {  // duplicate -> point to the first occurrence.
-      items[index++].source_index = it->second;
-      continue;
+    if (mget_dedup_keys) {
+      auto [it, inserted] = key_index.try_emplace(key, index);
+      if (!inserted) {  // duplicate -> point to the first occurrence.
+        items[index++].source_index = it->second;
+        continue;
+      }
     }
     auto it_res = db_slice.FindReadOnly(t->GetDbContext(), key, OBJ_STRING);
@@ -618,8 +641,8 @@ MGetResponse OpMGet(fb2::BlockingCounter wait_bc, uint8_t fetch_mask, const Tran
       }
     }
   }
 
   key_index.clear();
 
   return response;
 }
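The deduplication path that is now gated behind mget_dedup_keys boils down to first-occurrence bookkeeping: a duplicate key never triggers a second lookup, it only records the index of the earlier result via source_index. Here is a standalone sketch of just that bookkeeping, mirroring the hunk above; the Item fields and the DedupPlan name are illustrative stand-ins, not the server's actual types.

#include <string_view>
#include <vector>

#include "absl/container/flat_hash_map.h"

// Illustrative stand-in for the per-key bookkeeping in OpMGet.
struct Item {
  unsigned source_index = 0;  // for duplicates: index of the first occurrence
  bool needs_lookup = false;  // true only for the first occurrence of a key
};

// Builds the dedup plan: one real lookup per distinct key, duplicates point back.
std::vector<Item> DedupPlan(const std::vector<std::string_view>& keys) {
  std::vector<Item> items(keys.size());
  absl::flat_hash_map<std::string_view, unsigned> key_index;
  key_index.reserve(keys.size());

  unsigned index = 0;
  for (std::string_view key : keys) {
    auto [it, inserted] = key_index.try_emplace(key, index);
    if (inserted) {
      items[index].needs_lookup = true;        // first time we see this key
    } else {
      items[index].source_index = it->second;  // duplicate -> reuse earlier result
    }
    ++index;
  }
  return items;
}

With the flag off (the new default), MGET skips this map entirely, saving a hash insert per key on the common path where clients do not send duplicate keys; together with the prefetching, that is the source of the throughput gain quoted in the commit message.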


@@ -276,7 +276,10 @@ TEST_F(TieredStorageTest, FlushAll) {
   Metrics metrics;
   ExpectConditionWithinTimeout([&] {
     metrics = GetMetrics();
-    return metrics.events.hits > 2;
+    // Note that metrics.events.hits is not consistent with total_fetches
+    // and it can happen that hits is greater than total_fetches due to in-progress reads.
+    return metrics.tiered_stats.total_fetches > 2;
   });
 
   LOG(INFO) << FormatMetrics(metrics);
@@ -290,7 +293,6 @@ TEST_F(TieredStorageTest, FlushAll) {
   LOG(INFO) << FormatMetrics(metrics);
   EXPECT_EQ(metrics.db_stats.front().tiered_entries, 0u);
-  EXPECT_GT(metrics.tiered_stats.total_fetches, 2u);
 }
 
 TEST_F(TieredStorageTest, FlushPending) {
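On the test fix: the old test waited on metrics.events.hits and then asserted on tiered_stats.total_fetches, but, per the new comment, the two counters are not updated consistently while reads are in flight, so the fetch counter could still be behind when the assertion ran. The fix polls the counter the test actually depends on. The surrounding pattern is plain poll-until-condition-with-timeout; the sketch below is a generic illustration of it, where PollUntil and its defaults are illustrative and not Dragonfly's ExpectConditionWithinTimeout helper.

#include <chrono>
#include <functional>
#include <thread>

// Illustrative helper, not Dragonfly's ExpectConditionWithinTimeout: evaluate
// `cond` repeatedly until it holds or the deadline passes. The real test helper
// plays the same role inside the fiber-based test framework.
bool PollUntil(const std::function<bool()>& cond,
               std::chrono::milliseconds timeout = std::chrono::seconds(10),
               std::chrono::milliseconds step = std::chrono::milliseconds(5)) {
  const auto deadline = std::chrono::steady_clock::now() + timeout;
  while (std::chrono::steady_clock::now() < deadline) {
    if (cond())
      return true;
    std::this_thread::sleep_for(step);
  }
  return cond();  // one final check at the deadline
}

Because total_fetches only grows, the condition stays satisfied once reached, which is what makes the rewritten wait reliable.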