mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
feat(tiering): Simplest small bins (#2810)
Simplest small bins to unite values significantly less than 2k into separate bins --------- Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
0d91e0313d
commit
5fcd64aea9
4 changed files with 246 additions and 1 deletions
|
@ -14,13 +14,14 @@ set_property(SOURCE dfly_main.cc APPEND PROPERTY COMPILE_DEFINITIONS
|
|||
SOURCE_PATH_FROM_BUILD_ENV=${CMAKE_SOURCE_DIR})
|
||||
|
||||
if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux")
|
||||
SET(TX_LINUX_SRCS io_mgr.cc tiered_storage.cc tiering/disk_storage.cc tiering/op_manager.cc)
|
||||
SET(TX_LINUX_SRCS io_mgr.cc tiered_storage.cc tiering/disk_storage.cc tiering/op_manager.cc tiering/small_bins.cc)
|
||||
|
||||
add_executable(dfly_bench dfly_bench.cc)
|
||||
cxx_link(dfly_bench dfly_facade fibers2 absl::random_random)
|
||||
cxx_test(tiered_storage_test dfly_test_lib LABELS DFLY)
|
||||
cxx_test(tiering/disk_storage_test dfly_test_lib LABELS DFLY)
|
||||
cxx_test(tiering/op_manager_test dfly_test_lib LABELS DFLY)
|
||||
cxx_test(tiering/small_bins_test dfly_test_lib LABELS DFLY)
|
||||
endif()
|
||||
|
||||
|
||||
|
|
101
src/server/tiering/small_bins.cc
Normal file
101
src/server/tiering/small_bins.cc
Normal file
|
@ -0,0 +1,101 @@
|
|||
// Copyright 2024, DragonflyDB authors. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
|
||||
#include "server/tiering/small_bins.h"
|
||||
|
||||
#include <algorithm>
|
||||
#include <optional>
|
||||
#include <utility>
|
||||
|
||||
#include "absl/base/internal/endian.h"
|
||||
#include "base/logging.h"
|
||||
#include "core/compact_object.h"
|
||||
#include "server/tiering/disk_storage.h"
|
||||
|
||||
namespace dfly::tiering {
|
||||
|
||||
std::optional<SmallBins::FilledBin> SmallBins::Stash(std::string_view key, std::string_view value) {
|
||||
DCHECK_LT(value.size(), 2_KB);
|
||||
|
||||
// See FlushBin() for format details
|
||||
size_t value_bytes = 8 /* hash */ + value.size();
|
||||
|
||||
std::optional<FilledBin> filled_bin;
|
||||
if (2 /* num entries */ + current_bin_bytes_ + value_bytes >= 4_KB) {
|
||||
filled_bin = FlushBin();
|
||||
}
|
||||
|
||||
current_bin_bytes_ += value_bytes;
|
||||
current_bin_[key] = value;
|
||||
return filled_bin;
|
||||
}
|
||||
|
||||
// Serializes the bin that is currently being filled and resets it.
//
// Blob layout:
//   [ number of entries : 2 bytes, little endian ]
//   [ key hash          : 8 bytes, little endian ] * n
//   [ value bytes                               ] * n
//
// The location of every value inside the blob is remembered in pending_bins_
// under the freshly assigned bin id.
SmallBins::FilledBin SmallBins::FlushBin() {
  constexpr size_t kCountBytes = sizeof(uint16_t);
  constexpr size_t kHashBytes = sizeof(uint64_t);

  std::string blob(current_bin_bytes_ + kCountBytes, '\0');
  char* const base = blob.data();

  const BinId bin_id = ++last_bin_id_;
  auto& locations = pending_bins_[bin_id];

  // Header: entry counter.
  absl::little_endian::Store16(base, current_bin_.size());
  size_t pos = kCountBytes;

  // Hash section: one hash per key, in map iteration order.
  for (const auto& entry : current_bin_) {
    absl::little_endian::Store64(base + pos, CompactObj::HashCode(entry.first));
    pos += kHashBytes;
  }

  // Value section: same iteration order, recording where each value landed.
  for (const auto& entry : current_bin_) {
    const auto& value = entry.second;
    locations[entry.first] = {pos, value.size()};

    memcpy(base + pos, value.data(), value.size());
    pos += value.size();
  }

  // Start over with an empty bin.
  current_bin_bytes_ = 0;
  current_bin_.clear();

  return {bin_id, std::move(blob)};
}
|
||||
|
||||
// Reports that the bin `id` was written to `segment`.
//
// Returns the keys that are still part of the bin (keys deleted while the write
// was pending are gone) together with the location of their values. Locations
// are shifted by segment.offset, i.e. they are positions inside the storage the
// bin was written to, not positions inside the serialized blob.
//
// The page is registered in stashed_bins_ with one reference per returned key so
// that Delete(DiskSegment) can tell when the last value of the page is gone.
// Returns an empty list for an unknown id.
SmallBins::KeySegmentList SmallBins::ReportStashed(BinId id, DiskSegment segment) {
  auto node = pending_bins_.extract(id);
  if (node.empty())  // unknown id: calling mapped() on an empty handle is undefined
    return {};

  KeySegmentList list{node.mapped().begin(), node.mapped().end()};
  for (auto& entry : list)
    entry.second.offset += segment.offset;

  // A bin without live keys gets no refcount entry: nothing would ever decrement it.
  if (!list.empty())
    stashed_bins_[segment.offset] = static_cast<unsigned>(list.size());

  return list;
}
|
||||
|
||||
// Reports that writing the bin `id` was aborted and forgets the bin.
// Returns the keys the bin still contained; empty for an unknown id.
std::vector<std::string> SmallBins::ReportStashAborted(BinId id) {
  std::vector<std::string> out;

  auto node = pending_bins_.extract(id);
  if (node.empty())  // unknown id: calling mapped() on an empty handle is undefined
    return out;

  auto& entries = node.mapped();
  out.reserve(entries.size());

  // Extract node by node so the key strings can be moved out instead of copied.
  while (!entries.empty())
    out.emplace_back(std::move(entries.extract(entries.begin()).key()));

  return out;
}
|
||||
|
||||
std::optional<SmallBins::BinId> SmallBins::Delete(std::string_view key) {
|
||||
for (auto& [id, keys] : pending_bins_) {
|
||||
if (keys.erase(key))
|
||||
return keys.empty() ? std::make_optional(id) : std::nullopt;
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
// Drops one reference from the stashed bin that `segment` belongs to.
// Returns the page-filled segment once the last reference is gone and the bin
// has been forgotten; std::nullopt if the bin is unknown or still referenced.
std::optional<DiskSegment> SmallBins::Delete(DiskSegment segment) {
  DiskSegment page = segment.FillPages();

  auto it = stashed_bins_.find(page.offset);
  if (it == stashed_bins_.end())
    return std::nullopt;

  it->second -= 1;
  if (it->second != 0)
    return std::nullopt;

  stashed_bins_.erase(it);
  return page;
}
|
||||
|
||||
} // namespace dfly::tiering
|
63
src/server/tiering/small_bins.h
Normal file
63
src/server/tiering/small_bins.h
Normal file
|
@ -0,0 +1,63 @@
|
|||
// Copyright 2024, DragonflyDB authors. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <absl/container/flat_hash_map.h>
|
||||
|
||||
#include <optional>
|
||||
#include <string>
|
||||
#include <unordered_map>
|
||||
#include <vector>
|
||||
|
||||
#include "server/tiering/disk_storage.h"
|
||||
|
||||
namespace dfly::tiering {
|
||||
|
||||
// Small bins accumulate small values into larger bins that fill up 4kb pages.
|
||||
// SIMPLEST VERSION for now.
|
||||
class SmallBins {
|
||||
public:
|
||||
using BinId = unsigned;
|
||||
|
||||
// Bin filled with blob of serialized entries
|
||||
using FilledBin = std::pair<BinId, std::string>;
|
||||
|
||||
// List of locations of values for corresponding keys of previously filled bin
|
||||
using KeySegmentList = std::vector<std::pair<std::string /* key*/, DiskSegment>>;
|
||||
|
||||
// Enqueue key/value pair for stash. Returns page to be stashed if it filled up.
|
||||
std::optional<FilledBin> Stash(std::string_view key, std::string_view value);
|
||||
|
||||
// Report that a stash succeeeded. Returns list of stored keys with calculated value locations.
|
||||
KeySegmentList ReportStashed(BinId id, DiskSegment segment);
|
||||
|
||||
// Report that a stash was aborted. Returns list of keys that the entry contained.
|
||||
std::vector<std::string /* key */> ReportStashAborted(BinId id);
|
||||
|
||||
// Delete a key with pending io. Returns entry id if needs to be deleted.
|
||||
std::optional<BinId> Delete(std::string_view key);
|
||||
|
||||
// Delete a stored segment. Returns page segment if it became emtpy and needs to be deleted.
|
||||
std::optional<DiskSegment> Delete(DiskSegment segment);
|
||||
|
||||
private:
|
||||
// Flush current bin
|
||||
FilledBin FlushBin();
|
||||
|
||||
private:
|
||||
BinId last_bin_id_ = 0;
|
||||
|
||||
unsigned current_bin_bytes_ = 0;
|
||||
absl::flat_hash_map<std::string, std::string> current_bin_;
|
||||
|
||||
// Pending stashes, their keys and value sizes
|
||||
absl::flat_hash_map<unsigned /* id */, absl::flat_hash_map<std::string /* key*/, DiskSegment>>
|
||||
pending_bins_;
|
||||
|
||||
// Map of bins that were stashed and should be deleted when refcount reaches 0
|
||||
absl::flat_hash_map<size_t /*offset*/, unsigned /* refcount*/> stashed_bins_;
|
||||
};
|
||||
|
||||
}; // namespace dfly::tiering
|
80
src/server/tiering/small_bins_test.cc
Normal file
80
src/server/tiering/small_bins_test.cc
Normal file
|
@ -0,0 +1,80 @@
|
|||
// Copyright 2024, DragonflyDB authors. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
|
||||
#include "server/tiering/small_bins.h"
|
||||
|
||||
#include <absl/strings/str_cat.h>
|
||||
|
||||
#include <algorithm>
|
||||
|
||||
#include "base/gtest.h"
|
||||
#include "base/logging.h"
|
||||
#include "server/tiering/disk_storage.h"
|
||||
|
||||
namespace dfly::tiering {
|
||||
|
||||
using namespace std;
|
||||
using namespace std::string_literals;
|
||||
|
||||
// Fills exactly one bin and checks that every reported location cuts the
// matching value out of the serialized blob.
TEST(SmallBins, SimpleStashRead) {
  SmallBins bins;

  // Keep stashing "k<n>" -> "v<n>" until the first bin is handed out
  std::optional<SmallBins::FilledBin> bin;
  unsigned next = 0;
  while (!bin) {
    bin = bins.Stash(absl::StrCat("k", next), absl::StrCat("v", next));
    ++next;
  }

  // Each key "k<n>" must map to a slice holding "v<n>"
  const auto segments = bins.ReportStashed(bin->first, DiskSegment{0, 4_KB});
  for (const auto& [key, location] : segments) {
    const auto expected = "v"s + key.substr(1);
    EXPECT_EQ(expected, bin->second.substr(location.offset, location.length));
  }
}
|
||||
|
||||
// Fills one bin, deletes every even key while the bin is pending and checks
// that aborting the stash reports exactly the odd keys.
TEST(SmallBins, SimpleDeleteAbort) {
  SmallBins bins;

  // Keep stashing until the first bin is handed out; i ends up as number of stashed keys
  std::optional<SmallBins::FilledBin> bin;
  unsigned i = 0;
  while (!bin) {
    bin = bins.Stash(absl::StrCat("k", i), absl::StrCat("v", i));
    ++i;
  }

  // Remove k0, k2, k4, ...
  for (unsigned even = 0; even <= i; even += 2)
    bins.Delete(absl::StrCat("k", even));

  auto remaining = bins.ReportStashAborted(bin->first);
  sort(remaining.begin(), remaining.end());

  // Only k1, k3, k5, ... may be left
  EXPECT_EQ(remaining.size(), i / 2);
  for (unsigned odd = 1; odd < i; odd += 2) {
    const auto key = absl::StrCat("k", odd);
    EXPECT_TRUE(binary_search(remaining.begin(), remaining.end(), key)) << odd;
  }
}
|
||||
|
||||
// Fills one bin, deletes every even key while the bin is pending and checks
// that a successful stash reports only the odd keys with matching locations.
TEST(SmallBins, PartialStash) {
  SmallBins bins;

  // Keep stashing until the first bin is handed out; i ends up as number of stashed keys
  std::optional<SmallBins::FilledBin> bin;
  unsigned i = 0;
  while (!bin) {
    bin = bins.Stash(absl::StrCat("k", i), absl::StrCat("v", i));
    ++i;
  }

  // Remove k0, k2, k4, ...
  for (unsigned even = 0; even <= i; even += 2)
    bins.Delete(absl::StrCat("k", even));

  const auto segments = bins.ReportStashed(bin->first, DiskSegment{0, 4_KB});

  // Only the odd keys survive, each pointing at its own "v<n>" slice
  EXPECT_EQ(segments.size(), i / 2);
  for (const auto& [key, segment] : segments) {
    const auto stored_value = bin->second.substr(segment.offset, segment.length);
    EXPECT_EQ(key, "k"s + stored_value.substr(1));
  }
}
|
||||
|
||||
} // namespace dfly::tiering
|
Loading…
Add table
Add a link
Reference in a new issue