diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index 05870dfb2..9ece093a1 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -5,6 +5,7 @@ #include "server/test_utils.h" #include "server/acl/acl_commands_def.h" +#include "util/fibers/fibers.h" extern "C" { #include "redis/zmalloc.h" @@ -661,7 +662,8 @@ void BaseFamilyTest::ExpectConditionWithinTimeout(const std::function& c if (condition()) { break; } - absl::SleepFor(absl::Milliseconds(10)); + ThisFiber::SleepFor(5ms); + // absl::SleepFor(absl::Milliseconds(10)); ?? } EXPECT_LE(absl::Now(), deadline) diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index 7264d7e3b..858cd9cbf 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -6,12 +6,21 @@ #include +#include +#include +#include + #include "absl/cleanup/cleanup.h" +#include "absl/flags/internal/flag.h" #include "base/flags.h" #include "base/logging.h" +#include "server/common.h" #include "server/db_slice.h" #include "server/engine_shard_set.h" -#include "util/fibers/fibers.h" +#include "server/table.h" +#include "server/tiering/common.h" +#include "server/tiering/op_manager.h" +#include "server/tiering/small_bins.h" ABSL_FLAG(uint32_t, tiered_storage_max_pending_writes, 32, "Maximal number of pending writes per thread"); @@ -19,6 +28,9 @@ ABSL_FLAG(uint32_t, tiered_storage_throttle_us, 1, "Slow down tiered storage writes for at most this usec in case of I/O saturation " "specified by tiered_storage_max_pending_writes. 0 - do not throttle."); +ABSL_FLAG(bool, tiered_storage_v2_cache_fetched, true, + "WIP: Load results of offloaded reads to memory"); + namespace dfly { using namespace std; @@ -770,4 +782,123 @@ bool TieredStorage::CanExternalizeEntry(PrimeIterator it) { !it->second.IsExternal() && EligibleForOffload(it->second.Size()); } +class TieredStorageV2::ShardOpManager : public tiering::OpManager { + public: + ShardOpManager(TieredStorageV2* ts, DbSlice* db_slice) : ts_{ts}, db_slice_{db_slice} { + cache_fetched_ = absl::GetFlag(FLAGS_tiered_storage_v2_cache_fetched); + } + + // Find entry by key in db_slice and store external segment in place of original value + void SetExternal(std::string_view key, tiering::DiskSegment segment) { + if (auto pv = Find(key); pv) { + pv->SetIoPending(false); + pv->SetExternal(segment.offset, segment.length); // TODO: Handle memory stats + } + } + + void ClearIoPending(std::string_view key) { + if (auto pv = Find(key); pv) + pv->SetIoPending(false); + } + + // Find entry by key and store it's up-to-date value in place of external segment + void SetInMemory(std::string_view key, std::string_view value) { + if (auto pv = Find(key); pv) { + pv->Reset(); // TODO: account for memory + pv->SetString(value); + } + } + + void ReportStashed(EntryId id, tiering::DiskSegment segment) override { + if (holds_alternative(id)) { + SetExternal(get(id), segment); + } else { + for (const auto& [sub_key, sub_segment] : + ts_->bins_->ReportStashed(get(id), segment)) + SetExternal(string_view{sub_key}, sub_segment); + } + } + + void ReportFetched(EntryId id, std::string_view value, tiering::DiskSegment segment) override { + DCHECK(holds_alternative(id)); // we never issue reads for bins + + if (!cache_fetched_) + return; + + SetInMemory(get(id), value); + + // Delete value + if (segment.length >= 2_KB) { + Delete(segment); + } else { + if (auto bin_segment = ts_->bins_->Delete(segment); bin_segment) + Delete(*bin_segment); + } + } + + private: + PrimeValue* Find(std::string_view key) { + // TODO: Get DbContext for transaction for correct dbid and time + auto it = db_slice_->FindMutable(DbContext{}, key); + return IsValid(it.it) ? &it.it->second : nullptr; + } + + bool cache_fetched_ = false; + TieredStorageV2* ts_; + DbSlice* db_slice_; +}; + +TieredStorageV2::TieredStorageV2(DbSlice* db_slice) + : op_manager_{make_unique(this, db_slice)}, + bins_{make_unique()} { +} + +TieredStorageV2::~TieredStorageV2() { +} + +std::error_code TieredStorageV2::Open(string_view path) { + return op_manager_->Open(path); +} + +void TieredStorageV2::Close() { + op_manager_->Close(); +} + +util::fb2::Future TieredStorageV2::Read(string_view key, const PrimeValue& value) { + DCHECK(value.IsExternal()); + return op_manager_->Read(key, value.GetExternalSlice()); +} + +void TieredStorageV2::Stash(string_view key, PrimeValue* value) { + string buf; + string_view value_sv = value->GetSlice(&buf); + value->SetIoPending(true); + + if (value->Size() >= 2_KB) { + if (auto ec = op_manager_->Stash(key, value_sv); ec) + value->SetIoPending(false); + } else if (auto bin = bins_->Stash(key, value_sv); bin) { + if (auto ec = op_manager_->Stash(bin->first, bin->second); ec) { + for (const string& key : bins_->ReportStashAborted(bin->first)) + op_manager_->ClearIoPending(key); // clear IO_PENDING flag + } + } +} +void TieredStorageV2::Delete(string_view key, PrimeValue* value) { + if (value->IsExternal()) { + tiering::DiskSegment segment = value->GetExternalSlice(); + if (segment.length >= 2_KB) { + op_manager_->Delete(segment); + } else if (auto bin = bins_->Delete(segment); bin) { + op_manager_->Delete(*bin); + } + } else { + if (value->Size() >= 2_KB) { + op_manager_->Delete(key); + } else if (auto bin = bins_->Delete(key); bin) { + op_manager_->Delete(*bin); + } + } +} + } // namespace dfly diff --git a/src/server/tiered_storage.h b/src/server/tiered_storage.h index ea862ef3e..82dc06e49 100644 --- a/src/server/tiered_storage.h +++ b/src/server/tiered_storage.h @@ -3,6 +3,9 @@ // #pragma once +#include + +#include "util/fibers/future.h" #ifdef __linux__ #include @@ -16,6 +19,38 @@ namespace dfly { class DbSlice; +namespace tiering { +class SmallBins; +}; + +// Manages offloaded values +class TieredStorageV2 { + class ShardOpManager; + + public: + explicit TieredStorageV2(DbSlice* db_slice); + ~TieredStorageV2(); // drop forward declared unique_ptrs + + TieredStorageV2(TieredStorageV2&& other) = delete; + TieredStorageV2(const TieredStorageV2& other) = delete; + + std::error_code Open(std::string_view path); + void Close(); + + // Read offloaded value. It must be of external type + util::fb2::Future Read(std::string_view key, const PrimeValue& value); + + // Stash value. Sets IO_PENDING flag and unsets it on error or when finished + void Stash(std::string_view key, PrimeValue* value); + + // Delete value. Must either have pending IO or be offloaded (of external type) + void Delete(std::string_view key, PrimeValue* value); + + private: + std::unique_ptr op_manager_; + std::unique_ptr bins_; +}; + class TieredStorage { public: enum : uint16_t { kMinBlobLen = 64 }; diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc index 1b702426f..dba03178d 100644 --- a/src/server/tiered_storage_test.cc +++ b/src/server/tiered_storage_test.cc @@ -2,13 +2,19 @@ // See LICENSE for licensing terms. // +#include "server/tiered_storage.h" + #include #include +#include +#include "absl/flags/internal/flag.h" #include "base/flags.h" #include "base/logging.h" #include "facade/facade_test.h" +#include "server/engine_shard_set.h" #include "server/test_utils.h" +#include "util/fibers/fibers.h" using namespace std; using namespace testing; @@ -16,6 +22,7 @@ using absl::SetFlag; using absl::StrCat; ABSL_DECLARE_FLAG(string, tiered_prefix); +ABSL_DECLARE_FLAG(bool, tiered_storage_v2_cache_fetched); namespace dfly { @@ -33,6 +40,35 @@ class TieredStorageTest : public BaseFamilyTest { static void SetUpTestSuite(); }; +class TieredStorageV2Test : public BaseFamilyTest { + protected: + TieredStorageV2Test() { + num_threads_ = 1; + } + + void SetUp() override { + // TODO: Use FlagSaver if there is need to run V1 tests after V2 + absl::SetFlag(&FLAGS_tiered_prefix, ""); + absl::SetFlag(&FLAGS_tiered_storage_v2_cache_fetched, true); + + BaseFamilyTest::SetUp(); + auto* shard = shard_set->Await(0, [] { return EngineShard::tlocal(); }); + storage_.emplace(&shard->db_slice()); + shard_set->Await(0, [storage = &*storage_] { + auto ec = storage->Open(absl::StrCat("/tmp/tiered_storage_test", 1)); + EXPECT_FALSE(ec); + }); + } + + void TearDown() override { + shard_set->Await(0, [storage = &*storage_] { storage->Close(); }); + BaseFamilyTest::TearDown(); + } + + public: + std::optional storage_; +}; + void TieredStorageTest::SetUpTestSuite() { BaseFamilyTest::SetUpTestSuite(); SetFlag(&FLAGS_tiered_prefix, "/tmp/spill"); @@ -318,4 +354,55 @@ TEST_F(TieredStorageTest, GetValueValidation) { EXPECT_EQ(m.db_stats[0].tiered_entries, 0); } +TEST_F(TieredStorageV2Test, SimpleStash) { + // Create simple values + vector> values(20); + for (unsigned i = 0; i < values.size(); i++) { + // 3 kb is above small bins size + values[i] = {absl::StrCat("key", i), string(3_KB, char('A' + i))}; + Run({"set", values[i].first, values[i].second}); + } + + vector> futures; + shard_set->Await(0, [this, &values, &futures] { + auto& db_slice = EngineShard::tlocal()->db_slice(); + + // Schedule STASH for values + for (const auto& [key, _] : values) { + auto it = db_slice.FindMutable(DbContext{}, key); + storage_->Stash(key, &it.it->second); + } + + // Wait for all values to be stashed + ExpectConditionWithinTimeout([&values, &db_slice] { + for (auto [key, _] : values) + if (db_slice.FindMutable(DbContext{}, key).it->second.HasIoPending()) + return false; + return true; + }); + + // Now read all the values + for (const auto& [key, _] : values) { + auto it = db_slice.FindMutable(DbContext{}, key); + EXPECT_TRUE(it.it->second.IsExternal()); + futures.emplace_back(storage_->Read(key, it.it->second)); + } + }); + + // Wait for futures and assert correct values were read + for (unsigned i = 0; i < values.size(); i++) + EXPECT_EQ(futures[i].get(), values[i].second); + + shard_set->Await(0, [&values] { + auto& db_slice = EngineShard::tlocal()->db_slice(); + + // Make sure all values were loaded back to memory + for (const auto& [key, value] : values) { + auto it = db_slice.FindMutable(DbContext{}, key); + std::string buf; + EXPECT_EQ(it.it->second.GetSlice(&buf), value); + } + }); +} + } // namespace dfly diff --git a/src/server/tiering/op_manager.cc b/src/server/tiering/op_manager.cc index 219151d52..54d9e500f 100644 --- a/src/server/tiering/op_manager.cc +++ b/src/server/tiering/op_manager.cc @@ -92,18 +92,18 @@ void OpManager::ProcessStashed(EntryId id, unsigned version, DiskSegment segment } void OpManager::ProcessRead(size_t offset, std::string_view value) { - auto node = pending_reads_.extract(offset); - ReadOp& info = node.mapped(); + ReadOp* info = &pending_reads_.at(offset); - for (auto& ko : info.key_ops) { - auto key_value = value.substr(ko.segment.offset - info.segment.offset, ko.segment.length); + for (auto& ko : info->key_ops) { + auto key_value = value.substr(ko.segment.offset - info->segment.offset, ko.segment.length); for (auto& fut : ko.futures) fut.set_value(std::string{key_value}); ReportFetched(Borrowed(ko.id), key_value, ko.segment); } - if (info.delete_requested) - storage_.MarkAsFree(info.segment); + if (info->delete_requested) + storage_.MarkAsFree(info->segment); + pending_reads_.erase(offset); } OpManager::EntryOps& OpManager::ReadOp::ForId(EntryId id, DiskSegment key_segment) {