diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 0d9f23289..c0554918f 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -41,6 +41,7 @@ namespace { using namespace std; using namespace facade; +using namespace util; using CI = CommandId; @@ -54,7 +55,7 @@ struct StringValue { } StringValue(std::string s) : v_{std::move(s)} { } - StringValue(util::fb2::Future f) : v_{std::move(f)} { + StringValue(fb2::Future f) : v_{std::move(f)} { } // Get and consume value. If backed by a future, blocks until resolved. @@ -68,11 +69,11 @@ struct StringValue { EngineShard* es); bool IsFuturized() const { - return std::holds_alternative>(v_); + return std::holds_alternative>(v_); } private: - std::variant> v_; + std::variant> v_; }; // Helper for performing SET operations with various options @@ -148,12 +149,12 @@ size_t SetRange(std::string* value, size_t start, std::string_view range) { return value->size(); } -template using TResult = std::variant>; +template using TResult = std::variant>; template T GetResult(TResult v) { Overloaded ov{ [](T&& t) { return t; }, - [](util::fb2::Future&& future) { return future.Get(); }, + [](fb2::Future&& future) { return future.Get(); }, }; return std::visit(ov, std::move(v)); } @@ -170,7 +171,7 @@ OpResult> OpStrLen(const OpArgs& op_args, string_view key) { // already pending. // TODO: Optimize to return co.Size() if no modify operations are present if (const auto& co = it_res.value()->second; co.IsExternal()) { - util::fb2::Future fut; + fb2::Future fut; op_args.shard->tiered_storage()->Read( op_args.db_cntx.db_index, key, co, [fut](const std::string& s) mutable { fut.Resolve(s.size()); }); @@ -250,7 +251,7 @@ OpResult OpGetRange(const OpArgs& op_args, string_view key, int32_t RETURN_ON_BAD_STATUS(it_res); if (const CompactObj& co = it_res.value()->second; co.IsExternal()) { - util::fb2::Future fut; + fb2::Future fut; op_args.shard->tiered_storage()->Read( op_args.db_cntx.db_index, key, co, [read, fut](const std::string& s) mutable { fut.Resolve(string{read(s)}); }); @@ -536,7 +537,7 @@ struct MGetResponse { // fetch_mask values constexpr uint8_t FETCH_MCFLAG = 0x1; constexpr uint8_t FETCH_MCVER = 0x2; -MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, uint8_t fetch_mask, const Transaction* t, +MGetResponse OpMGet(fb2::BlockingCounter wait_bc, uint8_t fetch_mask, const Transaction* t, EngineShard* shard) { ShardArgs keys = t->GetShardArgs(shard->shard_id()); DCHECK(!keys.Empty()); @@ -544,29 +545,55 @@ MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, uint8_t fetch_mask, cons auto& db_slice = t->GetDbSlice(shard->shard_id()); MGetResponse response(keys.Size()); - absl::InlinedVector iters(keys.Size()); + struct Item { + DbSlice::ConstIterator it; + int source_index = -1; // in case of duplicate keys, points to the first occurrence. + }; + + absl::InlinedVector items(keys.Size()); + + // We can not make it thread-local because we may preempt during the Find loop due to + // serialization with the bumpup calls. + + // TODO: consider separating BumpUps from finds because it becomes too complicated + // to reason about. + absl::flat_hash_map key_index; // First, fetch all iterators and count total size ahead size_t total_size = 0; unsigned index = 0; + key_index.reserve(keys.Size()); + for (string_view key : keys) { + auto [it, inserted] = key_index.try_emplace(key, index); + if (!inserted) { // duplicate -> point to the first occurrence. + items[index++].source_index = it->second; + continue; + } + auto it_res = db_slice.FindReadOnly(t->GetDbContext(), key, OBJ_STRING); - if (auto& dest = iters[index++]; it_res) { - dest = *it_res; + auto& dest = items[index++]; + if (it_res) { + dest.it = *it_res; total_size += (*it_res)->second.Size(); } } + VLOG_IF(1, total_size > 10000000) << "OpMGet: allocating " << total_size << " bytes"; + // Allocate enough for all values response.storage = make_unique(total_size); char* next = response.storage.get(); bool fetch_mcflag = fetch_mask & FETCH_MCFLAG; bool fetch_mcver = fetch_mask & FETCH_MCVER; - for (size_t i = 0; i < iters.size(); ++i) { - auto it = iters[i]; - if (it.is_done()) + for (size_t i = 0; i < items.size(); ++i) { + auto it = items[i].it; + if (it.is_done()) { + if (items[i].source_index >= 0) { + response.resp_arr[i] = response.resp_arr[items[i].source_index]; + } continue; - + } auto& resp = response.resp_arr[i].emplace(); // Copy to buffer or trigger tiered read that will eventually write to @@ -596,6 +623,7 @@ MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, uint8_t fetch_mask, cons } } } + key_index.clear(); return response; } @@ -782,7 +810,7 @@ string StringValue::Get() && { auto prev = exchange(v_, monostate{}); if (holds_alternative(prev)) return std::move(std::get(prev)); - return std::get>(prev).Get(); + return std::get>(prev).Get(); } bool StringValue::IsEmpty() const { @@ -1277,7 +1305,7 @@ void StringFamily::MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil } // Count of pending tiered reads - util::fb2::BlockingCounter tiering_bc{0}; + fb2::BlockingCounter tiering_bc{0}; std::vector mget_resp(shard_set->size()); auto cb = [&](Transaction* t, EngineShard* shard) { mget_resp[shard->shard_id()] = OpMGet(tiering_bc, fetch_mask, t, shard); diff --git a/src/server/string_family_test.cc b/src/server/string_family_test.cc index 2e4c5066c..fa4057582 100644 --- a/src/server/string_family_test.cc +++ b/src/server/string_family_test.cc @@ -325,11 +325,14 @@ TEST_F(StringFamilyTest, MGetCachingModeBug2465) { Run({"del", vec[1]}); Run({"lpush", vec[1], "a"}); + resp = Run({"get", vec[2]}); + string val = resp.GetString(); auto mget_resp = StrArray(Run({"mget", vec[2], vec[2], vec[2]})); + EXPECT_THAT(mget_resp, ElementsAre(val, val, val)); resp = Run({"info", "stats"}); size_t bumps = get_bump_ups(resp.GetString()); - EXPECT_EQ(bumps, 2); // one bump for del and one for the mget key + EXPECT_EQ(bumps, 3); // one bump for del and one for get and one for mget } TEST_F(StringFamilyTest, MSetGet) {