diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 96db64f88..4b3bfe78f 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -45,11 +45,10 @@ using CI = CommandId; constexpr uint32_t kMaxStrLen = 1 << 28; -size_t CopyValueToBuffer(const PrimeValue& pv, char* dest) { +void CopyValueToBuffer(const PrimeValue& pv, char* dest) { DCHECK_EQ(pv.ObjType(), OBJ_STRING); DCHECK(!pv.IsExternal()); pv.GetString(dest); - return pv.Size(); } string GetString(const PrimeValue& pv) { @@ -420,8 +419,8 @@ OpResult> OpThrottle(const OpArgs& op_args, const string_view return array{limited ? 1 : 0, limit, remaining, retry_after_ms, reset_after_ms}; } -SinkReplyBuilder::MGetResponse OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction* t, - EngineShard* shard) { +SinkReplyBuilder::MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, bool fetch_mcflag, + bool fetch_mcver, const Transaction* t, EngineShard* shard) { ShardArgs keys = t->GetShardArgs(shard->shard_id()); DCHECK(!keys.Empty()); @@ -430,17 +429,18 @@ SinkReplyBuilder::MGetResponse OpMGet(bool fetch_mcflag, bool fetch_mcver, const SinkReplyBuilder::MGetResponse response(keys.Size()); absl::InlinedVector iters(keys.Size()); + // First, fetch all iterators and count total size ahead size_t total_size = 0; unsigned index = 0; for (string_view key : keys) { auto it_res = db_slice.FindReadOnly(t->GetDbContext(), key, OBJ_STRING); - auto& dest = iters[index++]; - if (!it_res) - continue; - dest = *it_res; - total_size += (*it_res)->second.Size(); + if (auto& dest = iters[index++]; it_res) { + dest = *it_res; + total_size += (*it_res)->second.Size(); + } } + // Allocate enough for all values response.storage_list = SinkReplyBuilder::AllocMGetStorage(total_size); char* next = response.storage_list->data; @@ -451,7 +451,19 @@ SinkReplyBuilder::MGetResponse OpMGet(bool fetch_mcflag, bool fetch_mcver, const auto& resp = response.resp_arr[i].emplace(); - size_t size = CopyValueToBuffer(it->second, next); + // Copy to buffer or trigger tiered read that will eventually write to buffer + if (it->second.IsExternal()) { + wait_bc->Add(1); + auto cb = [next, wait_bc](const string& v) mutable { + memcpy(next, v.data(), v.size()); + wait_bc->Dec(); + }; + shard->tiered_storage()->Read(t->GetDbIndex(), it.key(), it->second, std::move(cb)); + } else { + CopyValueToBuffer(it->second, next); + } + + size_t size = it->second.Size(); resp.value = string_view(next, size); next += size; @@ -717,7 +729,6 @@ OpStatus SetGeneric(ConnectionContext* cntx, const SetCmd::SetParams& sparams, s }); } -// With tieringV2 support void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) { facade::CmdArgParser parser{args}; @@ -852,7 +863,6 @@ void StringFamily::SetNx(CmdArgList args, ConnectionContext* cntx) { return builder->SendLong(0); // value do exists, we need to report that we didn't change it } -// With tieringV2 support void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) { auto cb = [key = ArgS(args, 0)](Transaction* tx, EngineShard* es) -> OpResult { auto it_res = es->db_slice().FindReadOnly(tx->GetDbContext(), key, OBJ_STRING); @@ -865,7 +875,6 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) { GetReplies{cntx->reply_builder()}.Send(cntx->transaction->ScheduleSingleHopT(cb)); } -// With tieringV2 support void StringFamily::GetDel(CmdArgList args, ConnectionContext* cntx) { auto cb = [key = ArgS(args, 0)](Transaction* tx, EngineShard* es) -> OpResult { auto it_res = es->db_slice().FindMutable(tx->GetDbContext(), key, OBJ_STRING); @@ -881,7 +890,6 @@ void StringFamily::GetDel(CmdArgList args, ConnectionContext* cntx) { GetReplies{cntx->reply_builder()}.Send(cntx->transaction->ScheduleSingleHopT(cb)); } -// With tieringV2 support void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 0); string_view value = ArgS(args, 1); @@ -905,7 +913,6 @@ void StringFamily::Prepend(CmdArgList args, ConnectionContext* cntx) { ExtendGeneric(args, true, cntx); } -// With tieringV2 support void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContext* cntx) { string_view key = ArgS(args, 0); string_view value = ArgS(args, 1); @@ -939,7 +946,6 @@ void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContex } } -// With tieringV2 support void StringFamily::GetEx(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 0); @@ -1141,32 +1147,31 @@ void StringFamily::SetExGeneric(bool seconds, CmdArgList args, ConnectionContext void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) { DCHECK_GE(args.size(), 1U); - Transaction* transaction = cntx->transaction; - unsigned shard_count = shard_set->size(); - std::vector mget_resp(shard_count); + std::vector mget_resp(shard_set->size()); ConnectionContext* dfly_cntx = static_cast(cntx); bool fetch_mcflag = cntx->protocol() == Protocol::MEMCACHE; bool fetch_mcver = fetch_mcflag && (dfly_cntx->conn_state.memcache_flag & ConnectionState::FETCH_CAS_VER); + // Count of pending tiered reads + util::fb2::BlockingCounter tiering_bc{0}; auto cb = [&](Transaction* t, EngineShard* shard) { - ShardId sid = shard->shard_id(); - mget_resp[sid] = OpMGet(fetch_mcflag, fetch_mcver, t, shard); + mget_resp[shard->shard_id()] = OpMGet(tiering_bc, fetch_mcflag, fetch_mcver, t, shard); return OpStatus::OK; }; - // MGet requires locking as well. For example, if coordinator A applied W(x) and then W(y) - // it necessarily means that whoever observed y, must observe x. - // Without locking, mget x y could read stale x but latest y. OpStatus result = transaction->ScheduleSingleHop(std::move(cb)); CHECK_EQ(OpStatus::OK, result); + // wait for all tiered reads to finish + tiering_bc->Wait(); + // reorder the responses back according to the order of their corresponding keys. SinkReplyBuilder::MGetResponse res(args.size()); - for (ShardId sid = 0; sid < shard_count; ++sid) { + for (ShardId sid = 0; sid < mget_resp.size(); ++sid) { if (!transaction->IsActive(sid)) continue; diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index 941ada55f..4ce9979ae 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -200,6 +200,16 @@ util::fb2::Future TieredStorage::Read(DbIndex dbid, string_view key, return future; } +void TieredStorage::Read(DbIndex dbid, std::string_view key, const PrimeValue& value, + std::function readf) { + DCHECK(value.IsExternal()); + auto cb = [readf = std::move(readf)](string* value) { + readf(*value); + return false; + }; + op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb)); +} + template util::fb2::Future TieredStorage::Modify(DbIndex dbid, std::string_view key, const PrimeValue& value, diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h index 4543f82c4..c50b0da93 100644 --- a/src/server/tiered_storage.h +++ b/src/server/tiered_storage.h @@ -46,6 +46,10 @@ class TieredStorage { // Read offloaded value. It must be of external type util::fb2::Future Read(DbIndex dbid, std::string_view key, const PrimeValue& value); + // Read offloaded value. It must be of external type + void Read(DbIndex dbid, std::string_view key, const PrimeValue& value, + std::function readf); + // Apply modification to offloaded value, return generic result from callback template util::fb2::Future Modify(DbIndex dbid, std::string_view key, const PrimeValue& value, diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc index 766c8712e..061f9d132 100644 --- a/src/server/tiered_storage_test.cc +++ b/src/server/tiered_storage_test.cc @@ -84,6 +84,23 @@ TEST_F(TieredStorageTest, SimpleGetSet) { } } +TEST_F(TieredStorageTest, MGET) { + vector command = {"MGET"}, values = {}; + for (char key = 'A'; key <= 'B'; key++) { + command.emplace_back(1, key); + values.emplace_back(3000, key); + Run({"SET", command.back(), values.back()}); + } + + ExpectConditionWithinTimeout( + [this, &values] { return GetMetrics().tiered_stats.total_stashes >= values.size(); }); + + auto resp = Run(absl::MakeSpan(command)); + auto elements = resp.GetVec(); + for (size_t i = 0; i < elements.size(); i++) + EXPECT_EQ(elements[i], values[i]); +} + TEST_F(TieredStorageTest, SimpleAppend) { // TODO: use pipelines to issue APPEND/GET/APPEND sequence, // currently it's covered only for op_manager_test