chore: bypass decoding/encoding of the data when performing offloading (#3315)

It's more efficient to offload raw blobs kept in CompactObject than decoded strings.
Unfortunately, it's also more complicated. The complexity arises around Read/Modify operations
that must convert a raw blob into a processed string. Moreover, Modify decodes the raw string, therefore
subsequent callbacks already see the decoded string.

Finally, this PR fixes the logic in NotifyFetched flow:
before it could skip uploading the modified value back to memory, which breaks correctness.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-07-15 13:16:43 +03:00 committed by GitHub
parent f20318d88a
commit 35ef143200
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 132 additions and 76 deletions

View file

@ -942,24 +942,43 @@ void CompactObj::GetString(char* dest) const {
}
void CompactObj::SetExternal(size_t offset, size_t sz) {
SetMeta(EXTERNAL_TAG, mask_ & ~kEncMask);
SetMeta(EXTERNAL_TAG, mask_);
u_.ext_ptr.page_index = offset / 4096;
u_.ext_ptr.page_offset = offset % 4096;
u_.ext_ptr.size = sz;
}
void CompactObj::ImportExternal(const CompactObj& src) {
DCHECK(src.IsExternal());
SetMeta(EXTERNAL_TAG, src.mask_ & kEncMask);
u_.ext_ptr = src.u_.ext_ptr;
}
std::pair<size_t, size_t> CompactObj::GetExternalSlice() const {
DCHECK_EQ(EXTERNAL_TAG, taglen_);
size_t offset = size_t(u_.ext_ptr.page_index) * 4096 + u_.ext_ptr.page_offset;
return pair<size_t, size_t>(offset, size_t(u_.ext_ptr.size));
}
void CompactObj::Materialize(std::string_view str) {
CHECK(IsExternal());
CHECK_GT(str.size(), 20u);
void CompactObj::Materialize(std::string_view blob, bool is_raw) {
CHECK(IsExternal()) << int(taglen_);
EncodeString(str);
DCHECK_GT(blob.size(), kInlineLen);
if (is_raw) {
uint8_t mask = mask_;
if (kUseSmallStrings && SmallString::CanAllocate(blob.size())) {
SetMeta(SMALL_TAG, mask);
tl.small_str_bytes += u_.small_str.Assign(blob);
} else {
SetMeta(ROBJ_TAG, mask);
u_.r_obj.SetString(blob, tl.local_mr);
}
} else {
EncodeString(blob);
}
}
void CompactObj::Reset() {
@ -1186,40 +1205,25 @@ void CompactObj::EncodeString(string_view str) {
u_.r_obj.SetString(encoded, tl.local_mr);
}
pair<StringOrView, uint8_t> CompactObj::GetRawString() const {
StringOrView CompactObj::GetRawString() const {
DCHECK(!IsExternal());
if (taglen_ == ROBJ_TAG) {
CHECK_EQ(OBJ_STRING, u_.r_obj.type());
DCHECK_EQ(OBJ_ENCODING_RAW, u_.r_obj.encoding());
return {StringOrView::FromView(u_.r_obj.AsView()), mask_ & kEncMask};
return StringOrView::FromView(u_.r_obj.AsView());
}
if (taglen_ == SMALL_TAG) {
string tmp;
u_.small_str.Get(&tmp);
return {StringOrView::FromString(std::move(tmp)), mask_ & kEncMask};
return StringOrView::FromString(std::move(tmp));
}
LOG(FATAL) << "Unsupported tag for GetRawString(): " << taglen_;
return {};
}
void CompactObj::SetRawString(std::string_view blob, uint8_t enc_mask) {
DCHECK_GT(blob.size(), kInlineLen);
// Current implementation assumes that the object is External, and switches to string.
CHECK_EQ(taglen_, EXTERNAL_TAG);
uint8_t mask = (mask_ & ~kEncMask) | enc_mask;
if (kUseSmallStrings && SmallString::CanAllocate(blob.size())) {
SetMeta(SMALL_TAG, mask);
tl.small_str_bytes += u_.small_str.Assign(blob);
} else {
SetMeta(ROBJ_TAG, mask);
u_.r_obj.SetString(blob, tl.local_mr);
}
}
size_t CompactObj::DecodedLen(size_t sz) const {
return ascii_len(sz) - ((mask_ & ASCII1_ENC_BIT) ? 1 : 0);
}

View file

@ -324,10 +324,15 @@ class CompactObj {
}
void SetExternal(size_t offset, size_t sz);
void ImportExternal(const CompactObj& src);
std::pair<size_t, size_t> GetExternalSlice() const;
// The opposite of SetExternal, changes the external entry to be an in-memory string.
void Materialize(std::string_view str);
// Injects either the the raw string (extracted with GetRawString()) or the usual string
// back to the compact object. In the latter case, encoding is performed.
// Precondition: The object must be in the EXTERNAL state.
// Postcondition: The object is an in-memory string.
void Materialize(std::string_view str, bool is_raw);
// In case this object a single blob, returns number of bytes allocated on heap
// for that blob. Otherwise returns 0.
@ -380,12 +385,7 @@ class CompactObj {
// returns raw (non-decoded) string together with the encoding mask.
// Used to bypass decoding layer.
// Precondition: the object is a non-inline string.
std::pair<StringOrView, uint8_t> GetRawString() const;
// (blob, enc_mask) must be the same as returned by GetRawString
// NOTE: current implementation assumes that the object is of external type
// though the functionality may be extended to other states if needed.
void SetRawString(std::string_view blob, uint8_t enc_mask);
StringOrView GetRawString() const;
private:
void EncodeString(std::string_view str);

View file

@ -578,26 +578,27 @@ TEST_F(CompactObjectTest, RawInterface) {
string str(50, 'a'), tmp, owned;
cobj_.SetString(str);
{
auto [raw_blob, mask] = cobj_.GetRawString();
auto raw_blob = cobj_.GetRawString();
EXPECT_LT(raw_blob.view().size(), str.size());
EXPECT_TRUE(mask != 0);
raw_blob.MakeOwned();
cobj_.SetExternal(0, 10); // dummy external pointer
cobj_.SetRawString(raw_blob.view(), mask);
cobj_.Materialize(raw_blob.view(), true);
EXPECT_EQ(str, cobj_.GetSlice(&tmp));
}
str.assign(50, char(200)); // non ascii
cobj_.SetString(str);
{
auto [raw_blob, mask] = cobj_.GetRawString();
auto raw_blob = cobj_.GetRawString();
EXPECT_EQ(raw_blob.view(), str);
EXPECT_EQ(mask, 0);
raw_blob.MakeOwned();
cobj_.SetExternal(0, 10); // dummy external pointer
cobj_.SetRawString(raw_blob.view(), mask);
cobj_.Materialize(raw_blob.view(), true);
EXPECT_EQ(str, cobj_.GetSlice(&tmp));
}

View file

@ -489,7 +489,7 @@ OpResult<variant<size_t, util::fb2::Future<size_t>>> OpExtend(const OpArgs& op_a
if (it_res->it->second.ObjType() != OBJ_STRING)
return OpStatus::WRONG_TYPE;
if (PrimeValue& pv = it_res->it->second; pv.IsExternal()) {
if (const PrimeValue& pv = it_res->it->second; pv.IsExternal()) {
auto modf = [value = string{value}, prepend](std::string* v) {
*v = prepend ? absl::StrCat(value, *v) : absl::StrCat(*v, value);
return v->size();

View file

@ -63,6 +63,16 @@ void RecordAdded(const PrimeValue& pv, size_t tiered_len, DbTableStats* stats) {
stats->tiered_used_bytes += tiered_len;
}
string DecodeString(bool is_raw, string_view str, PrimeValue decoder) {
if (is_raw) {
decoder.Materialize(str, true);
string tmp;
decoder.GetString(&tmp);
return tmp;
}
return string{str};
}
} // anonymous namespace
class TieredStorage::ShardOpManager : public tiering::OpManager {
@ -117,9 +127,11 @@ class TieredStorage::ShardOpManager : public tiering::OpManager {
bool NotifyDelete(tiering::DiskSegment segment) override;
// Set value to be an in-memory type again, either empty or with a value. Update memory stats
void Upload(DbIndex dbid, string_view value, size_t serialized_len, PrimeValue* pv) {
pv->Materialize(value);
// Set value to be an in-memory type again. Update memory stats.
void Upload(DbIndex dbid, string_view value, bool is_raw, size_t serialized_len, PrimeValue* pv) {
DCHECK(!value.empty());
pv->Materialize(value, is_raw);
RecordDeleted(*pv, serialized_len, GetDbTableStats(dbid));
}
@ -169,7 +181,7 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str
// Cut out relevant part of value and restore it to memory
string_view value = page.substr(item_segment.offset - segment.offset, item_segment.length);
Upload(dbid, value, item_segment.length, &it->second);
Upload(dbid, value, true, item_segment.length, &it->second);
}
}
@ -182,19 +194,22 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value,
return true; // delete
}
if (!modified && !cache_fetched_)
return false;
// A workaround - to avoid polluting in-memory table by reads that go into a snapshot.
// It's not precise because we may handle reads coming from client requests.
// 1. When modified is true we MUST upload the value back to memory.
// 2. On the other hand, if read is caused by snapshotting we do not want to fetch it.
// Currently, our heuristic is not very smart, because we stop uploading any reads during
// the snapshotting.
// TODO: to revisit this when we rewrite it with more efficient snapshotting algorithm.
if (SliceSnapshot::IsSnaphotInProgress())
bool should_upload = modified || (cache_fetched_ && !SliceSnapshot::IsSnaphotInProgress());
if (!should_upload)
return false;
auto key = get<OpManager::KeyRef>(id);
auto* pv = Find(key);
if (pv && pv->IsExternal() && segment == pv->GetExternalSlice()) {
Upload(key.first, value, segment.length, pv);
bool is_raw = !modified;
Upload(key.first, value, is_raw, segment.length, pv);
return true;
}
@ -214,7 +229,11 @@ bool TieredStorage::ShardOpManager::NotifyDelete(tiering::DiskSegment segment) {
if (bin.fragmented) {
// Trigger read to signal need for defragmentation. NotifyFetched will handle it.
VLOG(1) << "Enqueueing bin defragmentation for: x" << bin.segment.offset;
Enqueue(kFragmentedBin, bin.segment, [](std::string*) { return false; });
auto cb = [dummy = 5](bool, std::string*) -> bool {
(void)dummy; // a hack to make cb non constexpr that confuses some old) compilers.
return false;
};
Enqueue(kFragmentedBin, bin.segment, std::move(cb));
}
return false;
@ -241,10 +260,17 @@ util::fb2::Future<string> TieredStorage::Read(DbIndex dbid, string_view key,
const PrimeValue& value) {
DCHECK(value.IsExternal());
util::fb2::Future<string> future;
auto cb = [future](string* value) mutable {
future.Resolve(*value);
return false;
// The raw_val passed to cb might need decoding based on the encoding mask of the "value" object.
// We save the mask in decoder and use it to decode the final string that Read should resolve.
PrimeValue decoder;
decoder.ImportExternal(value);
auto cb = [future, decoder = std::move(decoder)](bool is_raw, const string* raw_val) mutable {
future.Resolve(DecodeString(is_raw, *raw_val, std::move(decoder)));
return false; // was not modified
};
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
return future;
}
@ -252,8 +278,13 @@ util::fb2::Future<string> TieredStorage::Read(DbIndex dbid, string_view key,
void TieredStorage::Read(DbIndex dbid, std::string_view key, const PrimeValue& value,
std::function<void(const std::string&)> readf) {
DCHECK(value.IsExternal());
auto cb = [readf = std::move(readf)](string* value) {
readf(*value);
PrimeValue decoder;
decoder.ImportExternal(value);
auto cb = [readf = std::move(readf), decoder = std::move(decoder)](
bool is_raw, const string* raw_val) mutable {
readf(DecodeString(is_raw, *raw_val, std::move(decoder)));
return false;
};
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
@ -265,14 +296,23 @@ util::fb2::Future<T> TieredStorage::Modify(DbIndex dbid, std::string_view key,
std::function<T(std::string*)> modf) {
DCHECK(value.IsExternal());
util::fb2::Future<T> future;
auto cb = [future, modf = std::move(modf)](std::string* value) mutable {
future.Resolve(modf(value));
PrimeValue decoder;
decoder.ImportExternal(value);
auto cb = [future, modf = std::move(modf), decoder = std::move(decoder)](
bool is_raw, std::string* raw_val) mutable {
if (is_raw) {
decoder.Materialize(*raw_val, true);
decoder.GetString(raw_val);
}
future.Resolve(modf(raw_val));
return true;
};
op_manager_->Enqueue(KeyRef(dbid, key), value.GetExternalSlice(), std::move(cb));
return future;
}
// Instantiate for size_t only - used in string_family's OpExtend.
template util::fb2::Future<size_t> TieredStorage::Modify(DbIndex dbid, std::string_view key,
const PrimeValue& value,
std::function<size_t(std::string*)> modf);
@ -291,16 +331,15 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) {
return false;
}
string buf;
string_view value_sv = value->GetSlice(&buf);
StringOrView raw_string = value->GetRawString();
value->SetIoPending(true);
tiering::OpManager::EntryId id;
error_code ec;
if (OccupiesWholePages(value->Size())) { // large enough for own page
id = KeyRef(dbid, key);
ec = op_manager_->Stash(id, value_sv, {});
} else if (auto bin = bins_->Stash(dbid, key, value_sv, {}); bin) {
ec = op_manager_->Stash(id, raw_string.view(), {});
} else if (auto bin = bins_->Stash(dbid, key, raw_string.view(), {}); bin) {
id = bin->first;
ec = op_manager_->Stash(id, bin->second, {});
}

View file

@ -50,7 +50,9 @@ class TieredStorage {
void Read(DbIndex dbid, std::string_view key, const PrimeValue& value,
std::function<void(const std::string&)> readf);
// Apply modification to offloaded value, return generic result from callback
// Apply modification to offloaded value, return generic result from callback.
// Unlike immutable Reads - the modified value must be uploaded back to memory.
// This is handled by OpManager when modf completes.
template <typename T>
util::fb2::Future<T> Modify(DbIndex dbid, std::string_view key, const PrimeValue& value,
std::function<T(std::string*)> modf);

View file

@ -76,18 +76,18 @@ TEST_F(TieredStorageTest, SimpleGetSet) {
size_t stashes = 0;
ExpectConditionWithinTimeout([this, &stashes] {
stashes = GetMetrics().tiered_stats.total_stashes;
return stashes >= kMax - 256 - 1;
return stashes >= kMax - kMin - 1;
});
// All entries were accounted for except that one (see comment above)
auto metrics = GetMetrics();
EXPECT_EQ(metrics.db_stats[0].tiered_entries, kMax - kMin - 1);
EXPECT_EQ(metrics.db_stats[0].tiered_used_bytes, (kMax - 1 + kMin) * (kMax - kMin) / 2 - 2047);
EXPECT_LE(metrics.db_stats[0].tiered_used_bytes, (kMax - 1 + kMin) * (kMax - kMin) / 2 - 2047);
// Perform GETSETs
for (size_t i = kMin; i < kMax; i++) {
auto resp = Run({"GETSET", absl::StrCat("k", i), string(i, 'B')});
ASSERT_EQ(resp, string(i, 'A')) << i;
ASSERT_EQ(resp, BuildString(i)) << i;
}
// Perform GETs
@ -126,7 +126,7 @@ TEST_F(TieredStorageTest, SimpleAppend) {
if (sleep)
util::ThisFiber::SleepFor(sleep * 1us);
EXPECT_THAT(Run({"APPEND", "k0", "B"}), IntArg(3001));
EXPECT_EQ(Run({"GET", "k0"}), BuildString(3000) + 'B');
ASSERT_EQ(Run({"GET", "k0"}), BuildString(3000) + 'B') << sleep;
}
}
@ -148,16 +148,18 @@ TEST_F(TieredStorageTest, MultiDb) {
TEST_F(TieredStorageTest, Defrag) {
for (char k = 'a'; k < 'a' + 8; k++) {
Run({"SET", string(1, k), string(512, k)});
Run({"SET", string(1, k), string(600, k)});
}
ExpectConditionWithinTimeout([this] { return GetMetrics().tiered_stats.total_stashes >= 1; });
// 7 out 8 are in one bin, the last one made if flush and is now filling
auto metrics = GetMetrics();
EXPECT_EQ(metrics.tiered_stats.small_bins_cnt, 1u);
EXPECT_EQ(metrics.tiered_stats.small_bins_entries_cnt, 7u);
EXPECT_EQ(metrics.tiered_stats.small_bins_filling_bytes, 512 + 12);
ASSERT_EQ(metrics.tiered_stats.small_bins_cnt, 1u);
ASSERT_EQ(metrics.tiered_stats.small_bins_entries_cnt, 7u);
// Distorted due to encoded values.
ASSERT_EQ(metrics.tiered_stats.small_bins_filling_bytes, 537);
// Reading 3 values still leaves the bin more than half occupied
Run({"GET", string(1, 'a')});

View file

@ -142,7 +142,7 @@ void OpManager::ProcessRead(size_t offset, std::string_view page) {
bool modified = false;
for (auto& cb : ko.callbacks)
modified |= cb(&key_value);
modified |= cb(!modified, &key_value);
bool delete_from_storage = ko.deleting;

View file

@ -35,8 +35,16 @@ class OpManager {
using EntryId = std::variant<unsigned, KeyRef>;
using OwnedEntryId = std::variant<unsigned, std::pair<DbIndex, std::string>>;
// Callback for post-read completion. Returns whether the value was modified
using ReadCallback = std::function<bool(std::string*)>;
// Callback for post-read completion. Returns whether the value was modified.
// We use fu2 function to allow moveable semantics. The arguments are:
// bool - true if the string is raw as it was extracted from the prime value.
// string* - the string that may potentially be modified by the callbacks that subsribed to this
// read. The callback run in the same order as the order of invocation, guaranteeing
// consistent read after modifications.
using ReadCallback =
fu2::function_base<true /*owns*/, false /*moveable*/, fu2::capacity_fixed<40, 8>,
false /* non-throwing*/, false /* strong exceptions guarantees*/,
bool(bool, std::string*)>;
explicit OpManager(size_t max_size);
virtual ~OpManager();

View file

@ -35,7 +35,7 @@ struct OpManagerTest : PoolTestBase, OpManager {
util::fb2::Future<std::string> Read(EntryId id, DiskSegment segment) {
util::fb2::Future<std::string> future;
Enqueue(id, segment, [future](std::string* value) mutable {
Enqueue(id, segment, [future](bool, std::string* value) mutable {
future.Resolve(*value);
return false;
});
@ -151,7 +151,7 @@ TEST_F(OpManagerTest, Modify) {
// Atomically issue sequence of modify-read operations
std::vector<util::fb2::Future<std::string>> futures;
for (size_t i = 0; i < 10; i++) {
Enqueue(0u, stashed_[0u], [i](std::string* v) {
Enqueue(0u, stashed_[0u], [i](bool, std::string* v) {
absl::StrAppend(v, i);
return true;
});