feat(tiering): MGET support (#3013)

* feat(tiering): MGET support

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2024-05-07 23:13:36 +03:00 committed by GitHub
parent f27506e678
commit 5c4279c285
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 61 additions and 25 deletions

View file

@@ -45,11 +45,10 @@ using CI = CommandId;
constexpr uint32_t kMaxStrLen = 1 << 28;
size_t CopyValueToBuffer(const PrimeValue& pv, char* dest) {
void CopyValueToBuffer(const PrimeValue& pv, char* dest) {
DCHECK_EQ(pv.ObjType(), OBJ_STRING);
DCHECK(!pv.IsExternal());
pv.GetString(dest);
return pv.Size();
}
string GetString(const PrimeValue& pv) {
@@ -420,8 +419,8 @@ OpResult<array<int64_t, 5>> OpThrottle(const OpArgs& op_args, const string_view
return array<int64_t, 5>{limited ? 1 : 0, limit, remaining, retry_after_ms, reset_after_ms};
}
SinkReplyBuilder::MGetResponse OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction* t,
EngineShard* shard) {
SinkReplyBuilder::MGetResponse OpMGet(util::fb2::BlockingCounter wait_bc, bool fetch_mcflag,
bool fetch_mcver, const Transaction* t, EngineShard* shard) {
ShardArgs keys = t->GetShardArgs(shard->shard_id());
DCHECK(!keys.Empty());
@@ -430,17 +429,18 @@ SinkReplyBuilder::MGetResponse OpMGet(bool fetch_mcflag, bool fetch_mcver, const
SinkReplyBuilder::MGetResponse response(keys.Size());
absl::InlinedVector<DbSlice::ConstIterator, 32> iters(keys.Size());
// First, fetch all iterators and count total size ahead
size_t total_size = 0;
unsigned index = 0;
for (string_view key : keys) {
auto it_res = db_slice.FindReadOnly(t->GetDbContext(), key, OBJ_STRING);
auto& dest = iters[index++];
if (!it_res)
continue;
dest = *it_res;
total_size += (*it_res)->second.Size();
if (auto& dest = iters[index++]; it_res) {
dest = *it_res;
total_size += (*it_res)->second.Size();
}
}
// Allocate enough for all values
response.storage_list = SinkReplyBuilder::AllocMGetStorage(total_size);
char* next = response.storage_list->data;
@@ -451,7 +451,19 @@ SinkReplyBuilder::MGetResponse OpMGet(bool fetch_mcflag, bool fetch_mcver, const
auto& resp = response.resp_arr[i].emplace();
size_t size = CopyValueToBuffer(it->second, next);
// Copy to buffer or trigger tiered read that will eventually write to buffer
if (it->second.IsExternal()) {
wait_bc->Add(1);
auto cb = [next, wait_bc](const string& v) mutable {
memcpy(next, v.data(), v.size());
wait_bc->Dec();
};
shard->tiered_storage()->Read(t->GetDbIndex(), it.key(), it->second, std::move(cb));
} else {
CopyValueToBuffer(it->second, next);
}
size_t size = it->second.Size();
resp.value = string_view(next, size);
next += size;
@@ -717,7 +729,6 @@ OpStatus SetGeneric(ConnectionContext* cntx, const SetCmd::SetParams& sparams, s
});
}
// With tieringV2 support
void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
facade::CmdArgParser parser{args};
@@ -852,7 +863,6 @@ void StringFamily::SetNx(CmdArgList args, ConnectionContext* cntx) {
return builder->SendLong(0); // value do exists, we need to report that we didn't change it
}
// With tieringV2 support
void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
auto cb = [key = ArgS(args, 0)](Transaction* tx, EngineShard* es) -> OpResult<StringValue> {
auto it_res = es->db_slice().FindReadOnly(tx->GetDbContext(), key, OBJ_STRING);
@@ -865,7 +875,6 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) {
GetReplies{cntx->reply_builder()}.Send(cntx->transaction->ScheduleSingleHopT(cb));
}
// With tieringV2 support
void StringFamily::GetDel(CmdArgList args, ConnectionContext* cntx) {
auto cb = [key = ArgS(args, 0)](Transaction* tx, EngineShard* es) -> OpResult<StringValue> {
auto it_res = es->db_slice().FindMutable(tx->GetDbContext(), key, OBJ_STRING);
@@ -881,7 +890,6 @@ void StringFamily::GetDel(CmdArgList args, ConnectionContext* cntx) {
GetReplies{cntx->reply_builder()}.Send(cntx->transaction->ScheduleSingleHopT(cb));
}
// With tieringV2 support
void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view value = ArgS(args, 1);
@@ -905,7 +913,6 @@ void StringFamily::Prepend(CmdArgList args, ConnectionContext* cntx) {
ExtendGeneric(args, true, cntx);
}
// With tieringV2 support
void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
string_view value = ArgS(args, 1);
@@ -939,7 +946,6 @@ void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContex
}
}
// With tieringV2 support
void StringFamily::GetEx(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0);
@@ -1141,32 +1147,31 @@ void StringFamily::SetExGeneric(bool seconds, CmdArgList args, ConnectionContext
void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
DCHECK_GE(args.size(), 1U);
Transaction* transaction = cntx->transaction;
unsigned shard_count = shard_set->size();
std::vector<SinkReplyBuilder::MGetResponse> mget_resp(shard_count);
std::vector<SinkReplyBuilder::MGetResponse> mget_resp(shard_set->size());
ConnectionContext* dfly_cntx = static_cast<ConnectionContext*>(cntx);
bool fetch_mcflag = cntx->protocol() == Protocol::MEMCACHE;
bool fetch_mcver =
fetch_mcflag && (dfly_cntx->conn_state.memcache_flag & ConnectionState::FETCH_CAS_VER);
// Count of pending tiered reads
util::fb2::BlockingCounter tiering_bc{0};
auto cb = [&](Transaction* t, EngineShard* shard) {
ShardId sid = shard->shard_id();
mget_resp[sid] = OpMGet(fetch_mcflag, fetch_mcver, t, shard);
mget_resp[shard->shard_id()] = OpMGet(tiering_bc, fetch_mcflag, fetch_mcver, t, shard);
return OpStatus::OK;
};
// MGet requires locking as well. For example, if coordinator A applied W(x) and then W(y)
// it necessarily means that whoever observed y, must observe x.
// Without locking, mget x y could read stale x but latest y.
OpStatus result = transaction->ScheduleSingleHop(std::move(cb));
CHECK_EQ(OpStatus::OK, result);
// wait for all tiered reads to finish
tiering_bc->Wait();
// reorder the responses back according to the order of their corresponding keys.
SinkReplyBuilder::MGetResponse res(args.size());
for (ShardId sid = 0; sid < shard_count; ++sid) {
for (ShardId sid = 0; sid < mget_resp.size(); ++sid) {
if (!transaction->IsActive(sid))
continue;

View file

@@ -200,6 +200,16 @@ util::fb2::Future<string> TieredStorage::Read(DbIndex dbid, string_view key,
return future;
}
// Read an offloaded value and deliver it to `readf` once the disk read
// completes. The value must reside on external (tiered) storage.
void TieredStorage::Read(DbIndex dbid, std::string_view key, const PrimeValue& value,
                         std::function<void(const std::string&)> readf) {
  DCHECK(value.IsExternal());

  // Adapt the consumer callback to the op manager's interface.
  // NOTE(review): the `false` return presumably tells the op manager not to
  // write the value back / keep it modified — confirm against the Enqueue contract.
  auto on_read = [consumer = std::move(readf)](string* raw) {
    consumer(*raw);
    return false;
  };
  op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(on_read));
}
template <typename T>
util::fb2::Future<T> TieredStorage::Modify(DbIndex dbid, std::string_view key,
const PrimeValue& value,

View file

@@ -46,6 +46,10 @@ class TieredStorage {
// Read offloaded value. It must be of external type
util::fb2::Future<std::string> Read(DbIndex dbid, std::string_view key, const PrimeValue& value);
// Read offloaded value. It must be of external type
void Read(DbIndex dbid, std::string_view key, const PrimeValue& value,
std::function<void(const std::string&)> readf);
// Apply modification to offloaded value, return generic result from callback
template <typename T>
util::fb2::Future<T> Modify(DbIndex dbid, std::string_view key, const PrimeValue& value,

View file

@@ -84,6 +84,23 @@ TEST_F(TieredStorageTest, SimpleGetSet) {
}
}
TEST_F(TieredStorageTest, MGET) {
  // Set keys "A" and "B" to 3000-character values so they get offloaded,
  // while accumulating the MGET command and the expected values.
  vector<string> command{"MGET"};
  vector<string> values;
  for (char c = 'A'; c <= 'B'; c++) {
    string key(1, c);
    string value(3000, c);
    Run({"SET", key, value});
    command.push_back(key);
    values.push_back(std::move(value));
  }

  // Wait until every value has been stashed to tiered storage.
  ExpectConditionWithinTimeout(
      [this, &values] { return GetMetrics().tiered_stats.total_stashes >= values.size(); });

  // MGET must read the offloaded values back in key order.
  auto resp = Run(absl::MakeSpan(command));
  auto elements = resp.GetVec();
  for (size_t i = 0; i < elements.size(); i++)
    EXPECT_EQ(elements[i], values[i]);
}
TEST_F(TieredStorageTest, SimpleAppend) {
// TODO: use pipelines to issue APPEND/GET/APPEND sequence,
// currently it's covered only for op_manager_test