fix: deduplicate mget response (#4175)

* fix: deduplicate mget response

In case of duplicate mget keys, skips fetching the same key twice.
The optimization is straighforward - we just copy the response for the original key,
since the response is a shallow object, we potentially save lots of memory with this
deduplication. Always deduplicate inside OpMGet.

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-11-25 17:29:53 +02:00 committed by GitHub
parent 43c83d29fa
commit 7ac1631424
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 49 additions and 18 deletions

View file

@ -41,6 +41,7 @@ namespace {
using namespace std; using namespace std;
using namespace facade; using namespace facade;
using namespace util;
using CI = CommandId; using CI = CommandId;
@ -54,7 +55,7 @@ struct StringValue {
} }
StringValue(std::string s) : v_{std::move(s)} { StringValue(std::string s) : v_{std::move(s)} {
} }
StringValue(util::fb2::Future<std::string> f) : v_{std::move(f)} { StringValue(fb2::Future<std::string> f) : v_{std::move(f)} {
} }
// Get and consume value. If backed by a future, blocks until resolved. // Get and consume value. If backed by a future, blocks until resolved.
@ -68,11 +69,11 @@ struct StringValue {
EngineShard* es); EngineShard* es);
bool IsFuturized() const { bool IsFuturized() const {
return std::holds_alternative<util::fb2::Future<std::string>>(v_); return std::holds_alternative<fb2::Future<std::string>>(v_);
} }
private: private:
std::variant<std::monostate, std::string, util::fb2::Future<std::string>> v_; std::variant<std::monostate, std::string, fb2::Future<std::string>> v_;
}; };
// Helper for performing SET operations with various options // Helper for performing SET operations with various options
@ -148,12 +149,12 @@ size_t SetRange(std::string* value, size_t start, std::string_view range) {
return value->size(); return value->size();
} }
template <typename T> using TResult = std::variant<T, util::fb2::Future<T>>; template <typename T> using TResult = std::variant<T, fb2::Future<T>>;
template <typename T> T GetResult(TResult<T> v) { template <typename T> T GetResult(TResult<T> v) {
Overloaded ov{ Overloaded ov{
[](T&& t) { return t; }, [](T&& t) { return t; },
[](util::fb2::Future<T>&& future) { return future.Get(); }, [](fb2::Future<T>&& future) { return future.Get(); },
}; };
return std::visit(ov, std::move(v)); return std::visit(ov, std::move(v));
} }
@ -170,7 +171,7 @@ OpResult<TResult<size_t>> OpStrLen(const OpArgs& op_args, string_view key) {
// already pending. // already pending.
// TODO: Optimize to return co.Size() if no modify operations are present // TODO: Optimize to return co.Size() if no modify operations are present
if (const auto& co = it_res.value()->second; co.IsExternal()) { if (const auto& co = it_res.value()->second; co.IsExternal()) {
util::fb2::Future<size_t> fut; fb2::Future<size_t> fut;
op_args.shard->tiered_storage()->Read( op_args.shard->tiered_storage()->Read(
op_args.db_cntx.db_index, key, co, op_args.db_cntx.db_index, key, co,
[fut](const std::string& s) mutable { fut.Resolve(s.size()); }); [fut](const std::string& s) mutable { fut.Resolve(s.size()); });
@ -250,7 +251,7 @@ OpResult<StringValue> OpGetRange(const OpArgs& op_args, string_view key, int32_t
RETURN_ON_BAD_STATUS(it_res); RETURN_ON_BAD_STATUS(it_res);
if (const CompactObj& co = it_res.value()->second; co.IsExternal()) { if (const CompactObj& co = it_res.value()->second; co.IsExternal()) {
util::fb2::Future<std::string> fut; fb2::Future<std::string> fut;
op_args.shard->tiered_storage()->Read( op_args.shard->tiered_storage()->Read(
op_args.db_cntx.db_index, key, co, op_args.db_cntx.db_index, key, co,
[read, fut](const std::string& s) mutable { fut.Resolve(string{read(s)}); }); [read, fut](const std::string& s) mutable { fut.Resolve(string{read(s)}); });
@ -536,7 +537,7 @@ struct MGetResponse {
// fetch_mask values // fetch_mask values
constexpr uint8_t FETCH_MCFLAG = 0x1; constexpr uint8_t FETCH_MCFLAG = 0x1;
constexpr uint8_t FETCH_MCVER = 0x2; constexpr uint8_t FETCH_MCVER = 0x2;
MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, uint8_t fetch_mask, const Transaction* t, MGetResponse OpMGet(fb2::BlockingCounter wait_bc, uint8_t fetch_mask, const Transaction* t,
EngineShard* shard) { EngineShard* shard) {
ShardArgs keys = t->GetShardArgs(shard->shard_id()); ShardArgs keys = t->GetShardArgs(shard->shard_id());
DCHECK(!keys.Empty()); DCHECK(!keys.Empty());
@ -544,29 +545,55 @@ MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, uint8_t fetch_mask, cons
auto& db_slice = t->GetDbSlice(shard->shard_id()); auto& db_slice = t->GetDbSlice(shard->shard_id());
MGetResponse response(keys.Size()); MGetResponse response(keys.Size());
absl::InlinedVector<DbSlice::ConstIterator, 32> iters(keys.Size()); struct Item {
DbSlice::ConstIterator it;
int source_index = -1; // in case of duplicate keys, points to the first occurrence.
};
absl::InlinedVector<Item, 32> items(keys.Size());
// We can not make it thread-local because we may preempt during the Find loop due to
// serialization with the bumpup calls.
// TODO: consider separating BumpUps from finds because it becomes too complicated
// to reason about.
absl::flat_hash_map<string_view, unsigned> key_index;
// First, fetch all iterators and count total size ahead // First, fetch all iterators and count total size ahead
size_t total_size = 0; size_t total_size = 0;
unsigned index = 0; unsigned index = 0;
key_index.reserve(keys.Size());
for (string_view key : keys) { for (string_view key : keys) {
auto [it, inserted] = key_index.try_emplace(key, index);
if (!inserted) { // duplicate -> point to the first occurrence.
items[index++].source_index = it->second;
continue;
}
auto it_res = db_slice.FindReadOnly(t->GetDbContext(), key, OBJ_STRING); auto it_res = db_slice.FindReadOnly(t->GetDbContext(), key, OBJ_STRING);
if (auto& dest = iters[index++]; it_res) { auto& dest = items[index++];
dest = *it_res; if (it_res) {
dest.it = *it_res;
total_size += (*it_res)->second.Size(); total_size += (*it_res)->second.Size();
} }
} }
VLOG_IF(1, total_size > 10000000) << "OpMGet: allocating " << total_size << " bytes";
// Allocate enough for all values // Allocate enough for all values
response.storage = make_unique<char[]>(total_size); response.storage = make_unique<char[]>(total_size);
char* next = response.storage.get(); char* next = response.storage.get();
bool fetch_mcflag = fetch_mask & FETCH_MCFLAG; bool fetch_mcflag = fetch_mask & FETCH_MCFLAG;
bool fetch_mcver = fetch_mask & FETCH_MCVER; bool fetch_mcver = fetch_mask & FETCH_MCVER;
for (size_t i = 0; i < iters.size(); ++i) { for (size_t i = 0; i < items.size(); ++i) {
auto it = iters[i]; auto it = items[i].it;
if (it.is_done()) if (it.is_done()) {
if (items[i].source_index >= 0) {
response.resp_arr[i] = response.resp_arr[items[i].source_index];
}
continue; continue;
}
auto& resp = response.resp_arr[i].emplace(); auto& resp = response.resp_arr[i].emplace();
// Copy to buffer or trigger tiered read that will eventually write to // Copy to buffer or trigger tiered read that will eventually write to
@ -596,6 +623,7 @@ MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, uint8_t fetch_mask, cons
} }
} }
} }
key_index.clear();
return response; return response;
} }
@ -782,7 +810,7 @@ string StringValue::Get() && {
auto prev = exchange(v_, monostate{}); auto prev = exchange(v_, monostate{});
if (holds_alternative<string>(prev)) if (holds_alternative<string>(prev))
return std::move(std::get<string>(prev)); return std::move(std::get<string>(prev));
return std::get<util::fb2::Future<std::string>>(prev).Get(); return std::get<fb2::Future<std::string>>(prev).Get();
} }
bool StringValue::IsEmpty() const { bool StringValue::IsEmpty() const {
@ -1277,7 +1305,7 @@ void StringFamily::MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
} }
// Count of pending tiered reads // Count of pending tiered reads
util::fb2::BlockingCounter tiering_bc{0}; fb2::BlockingCounter tiering_bc{0};
std::vector<MGetResponse> mget_resp(shard_set->size()); std::vector<MGetResponse> mget_resp(shard_set->size());
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
mget_resp[shard->shard_id()] = OpMGet(tiering_bc, fetch_mask, t, shard); mget_resp[shard->shard_id()] = OpMGet(tiering_bc, fetch_mask, t, shard);

View file

@ -325,11 +325,14 @@ TEST_F(StringFamilyTest, MGetCachingModeBug2465) {
Run({"del", vec[1]}); Run({"del", vec[1]});
Run({"lpush", vec[1], "a"}); Run({"lpush", vec[1], "a"});
resp = Run({"get", vec[2]});
string val = resp.GetString();
auto mget_resp = StrArray(Run({"mget", vec[2], vec[2], vec[2]})); auto mget_resp = StrArray(Run({"mget", vec[2], vec[2], vec[2]}));
EXPECT_THAT(mget_resp, ElementsAre(val, val, val));
resp = Run({"info", "stats"}); resp = Run({"info", "stats"});
size_t bumps = get_bump_ups(resp.GetString()); size_t bumps = get_bump_ups(resp.GetString());
EXPECT_EQ(bumps, 2); // one bump for del and one for the mget key EXPECT_EQ(bumps, 3); // one bump for del and one for get and one for mget
} }
TEST_F(StringFamilyTest, MSetGet) { TEST_F(StringFamilyTest, MSetGet) {