diff --git a/src/core/compact_object.h b/src/core/compact_object.h index 35e53adf6..8eadcba7e 100644 --- a/src/core/compact_object.h +++ b/src/core/compact_object.h @@ -238,13 +238,13 @@ class CompactObj { } } - bool HasIoPending() const { + bool DefragIfNeeded(float ratio); + + bool HasStashPending() const { return mask_ & IO_PENDING; } - bool DefragIfNeeded(float ratio); - - void SetIoPending(bool b) { + void SetStashPending(bool b) { if (b) { mask_ |= IO_PENDING; } else { @@ -451,7 +451,7 @@ class CompactObj { // static_assert(sizeof(u_) == 16, ""); - mutable uint8_t mask_ = 0; + uint8_t mask_ = 0; // We currently reserve 5 bits for tags and 3 bits for extending the mask. currently reserved. uint8_t taglen_ = 0; diff --git a/src/server/acl/acl_family_test.cc b/src/server/acl/acl_family_test.cc index 6cdd2b524..cbacb7630 100644 --- a/src/server/acl/acl_family_test.cc +++ b/src/server/acl/acl_family_test.cc @@ -4,10 +4,11 @@ #include "server/acl/acl_family.h" -#include "absl/container/flat_hash_map.h" -#include "absl/flags/internal/flag.h" -#include "absl/strings/ascii.h" -#include "absl/strings/str_cat.h" +#include +#include +#include + +#include "base/flags.h" #include "base/gtest.h" #include "base/logging.h" #include "facade/facade_test.h" diff --git a/src/server/acl/acl_log.cc b/src/server/acl/acl_log.cc index 9e05627a5..7ab4ca75e 100644 --- a/src/server/acl/acl_log.cc +++ b/src/server/acl/acl_log.cc @@ -10,6 +10,7 @@ #include "base/flags.h" #include "base/logging.h" #include "facade/dragonfly_connection.h" +#include "server/conn_context.h" ABSL_FLAG(size_t, acllog_max_len, 32, "Specify the number of log entries. Logs are kept locally for each thread " diff --git a/src/server/acl/acl_log.h b/src/server/acl/acl_log.h index 08297dded..bb206ac9f 100644 --- a/src/server/acl/acl_log.h +++ b/src/server/acl/acl_log.h @@ -8,12 +8,11 @@ #include #include -#include "base/flags.h" -#include "server/conn_context.h" +namespace dfly { -ABSL_DECLARE_FLAG(size_t, acllog_max_len); +class ConnectionContext; -namespace dfly::acl { +namespace acl { class AclLog { public: @@ -49,4 +48,5 @@ class AclLog { size_t total_entries_allowed_; }; -} // namespace dfly::acl +} // namespace acl +} // namespace dfly diff --git a/src/server/acl/validator.h b/src/server/acl/validator.h index 89d6f77f4..d9d6d3745 100644 --- a/src/server/acl/validator.h +++ b/src/server/acl/validator.h @@ -6,17 +6,18 @@ #include +#include "facade/facade_types.h" #include "server/acl/acl_log.h" #include "server/command_registry.h" -#include "server/conn_context.h" namespace dfly::acl { +class AclKeys; + std::pair IsUserAllowedToInvokeCommandGeneric( - const std::vector& acl_commands, const AclKeys& keys, CmdArgList tail_args, + const std::vector& acl_commands, const AclKeys& keys, facade::CmdArgList tail_args, const CommandId& id); bool IsUserAllowedToInvokeCommand(const ConnectionContext& cntx, const CommandId& id, - CmdArgList tail_args); - + facade::CmdArgList tail_args); } // namespace dfly::acl diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc index bd1627d2c..9de3ca9a9 100644 --- a/src/server/cluster/incoming_slot_migration.cc +++ b/src/server/cluster/incoming_slot_migration.cc @@ -4,8 +4,10 @@ #include "server/cluster/incoming_slot_migration.h" -#include "absl/cleanup/cleanup.h" -#include "absl/strings/str_cat.h" +#include +#include + +#include "base/flags.h" #include "base/logging.h" #include "cluster_utility.h" #include "server/error.h" diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index e860fa689..767dc443f 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -479,6 +479,18 @@ OpResult DbSlice::FindInternal(const Context& cntx, std: fetched_items_.insert(res.it->first.AsRef()); } + // If the value has a pending stash, cancel it before any modification are applied. + // Rationale: we either look it up for reads - and then it's hot, or alternatively, + // we follow up with modifications during mutation operations, and in that case storing on disk + // does not make much sense. + if (res.it->second.HasStashPending()) { + owner_->tiered_storage()->CancelStash(cntx.db_index, key, &res.it->second); + } + + // Mark this entry as being looked up. We use key (first) deliberately to preserve the hotness + // attribute of the entry in case of value overtides. + res.it->first.SetTouched(true); + db.top_keys.Touch(key); std::move(update_stats_on_miss).Cancel(); @@ -491,6 +503,11 @@ OpResult DbSlice::FindInternal(const Context& cntx, std: if (cluster::IsClusterEnabled()) { db.slots_stats[cluster::KeySlot(key)].total_reads++; } + if (res.it->second.IsExternal()) { + events_.ram_misses++; + } else { + events_.ram_hits++; + } break; } return res; @@ -976,13 +993,6 @@ void DbSlice::PreUpdate(DbIndex db_ind, Iterator it, std::string_view key) { DVLOG(2) << "Running callbacks in dbid " << db_ind; CallChangeCallbacks(db_ind, ChangeReq{it.GetInnerIt()}); - // If the value has a pending stash, cancel it before any modification are applied. - // Note: we don't delete offloaded values before updates, because a read-modify operation (like - // append) can be applied instead of a full overwrite. Deleting is reponsibility of the commands - if (it.IsOccupied() && it->second.HasIoPending()) { - owner_->tiered_storage()->CancelStash(db_ind, key, &it->second); - } - it.GetInnerIt().SetVersion(NextVersion()); } @@ -1401,7 +1411,7 @@ void DbSlice::ClearOffloadedEntries(absl::Span indices, const DbT cursor = db_ptr->prime.Traverse(cursor, [&](PrimeIterator it) { if (it->second.IsExternal()) { tiered_storage->Delete(index, &it->second); - } else if (it->second.HasIoPending()) { + } else if (it->second.HasStashPending()) { tiered_storage->CancelStash(index, it->first.GetSlice(&scratch), &it->second); } }); diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 6bac876d6..5efe48211 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -53,6 +53,8 @@ void CopyValueToBuffer(const PrimeValue& pv, char* dest) { string GetString(const PrimeValue& pv) { string res; + DCHECK_EQ(pv.ObjType(), OBJ_STRING); + if (pv.ObjType() != OBJ_STRING) return res; res.resize(pv.Size()); diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index 73ea7b527..50308e4fe 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -30,7 +30,7 @@ using namespace std; ABSL_DECLARE_FLAG(string, dbfilename); ABSL_DECLARE_FLAG(uint32_t, num_shards); ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to run tests"); - +ABSL_DECLARE_FLAG(size_t, acllog_max_len); namespace dfly { std::ostream& operator<<(std::ostream& os, const DbStats& stats) { diff --git a/src/server/tiered_storage.cc b/src/server/tiered_storage.cc index 819231614..984c3dc73 100644 --- a/src/server/tiered_storage.cc +++ b/src/server/tiered_storage.cc @@ -17,7 +17,6 @@ #include "base/logging.h" #include "server/common.h" #include "server/db_slice.h" -#include "server/engine_shard_set.h" #include "server/snapshot.h" #include "server/table.h" #include "server/tiering/common.h" @@ -25,8 +24,11 @@ #include "server/tiering/small_bins.h" #include "server/tx_base.h" -ABSL_FLAG(bool, tiered_storage_cache_fetched, true, - "WIP: Load results of offloaded reads to memory"); +using namespace facade; + +ABSL_FLAG(uint32_t, tiered_storage_memory_margin, 1_MB, + "In bytes. If memory budget on a shard goes blow this limit, tiering stops " + "hot-loading values into ram."); ABSL_FLAG(unsigned, tiered_storage_write_depth, 50, "Maximum number of concurrent stash requests issued by background offload"); @@ -36,8 +38,6 @@ namespace dfly { using namespace std; using namespace util; -using namespace tiering::literals; - using KeyRef = tiering::OpManager::KeyRef; namespace { @@ -80,14 +80,14 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { public: ShardOpManager(TieredStorage* ts, DbSlice* db_slice, size_t max_size) - : tiering::OpManager{max_size}, ts_{ts}, db_slice_{db_slice} { - cache_fetched_ = absl::GetFlag(FLAGS_tiered_storage_cache_fetched); + : tiering::OpManager{max_size}, ts_{ts}, db_slice_{*db_slice} { + memory_margin_ = absl::GetFlag(FLAGS_tiered_storage_memory_margin); } // Clear IO pending flag for entry void ClearIoPending(OpManager::KeyRef key) { if (auto pv = Find(key); pv) { - pv->SetIoPending(false); + pv->SetStashPending(false); stats_.total_cancels++; } } @@ -99,14 +99,14 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { } DbTableStats* GetDbTableStats(DbIndex dbid) { - return db_slice_->MutableStats(dbid); + return db_slice_.MutableStats(dbid); } private: PrimeValue* Find(OpManager::KeyRef key) { // TODO: Get DbContext for transaction for correct dbid and time // Bypass all update and stat mechanisms - auto it = db_slice_->GetDBTable(key.first)->prime.Find(key.second); + auto it = db_slice_.GetDBTable(key.first)->prime.Find(key.second); return IsValid(it) ? &it->second : nullptr; } @@ -141,7 +141,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { if (auto pv = Find(key); pv) { RecordAdded(*pv, segment.length, GetDbTableStats(key.first)); - pv->SetIoPending(false); + pv->SetStashPending(false); pv->SetExternal(segment.offset, segment.length); stats_.total_stashes++; @@ -154,7 +154,11 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { SetExternal({sub_dbid, sub_key}, sub_segment); } - bool cache_fetched_ = false; + bool HasEnoughMemoryMargin(int64_t value_len) { + return db_slice_.memory_budget() - memory_margin_ - value_len > 0; + } + + int64_t memory_margin_ = 0; struct { size_t total_stashes = 0, total_cancels = 0, total_fetches = 0; @@ -162,7 +166,7 @@ class TieredStorage::ShardOpManager : public tiering::OpManager { } stats_; TieredStorage* ts_; - DbSlice* db_slice_; + DbSlice& db_slice_; }; void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, string_view page) { @@ -173,7 +177,7 @@ void TieredStorage::ShardOpManager::Defragment(tiering::DiskSegment segment, str auto predicate = [item_segment](const PrimeKey& key, const PrimeValue& probe) { return probe.IsExternal() && tiering::DiskSegment{probe.GetExternalSlice()} == item_segment; }; - auto it = db_slice_->GetDBTable(dbid)->prime.FindFirst(hash, predicate); + auto it = db_slice_.GetDBTable(dbid)->prime.FindFirst(hash, predicate); if (!IsValid(it)) continue; @@ -200,7 +204,8 @@ bool TieredStorage::ShardOpManager::NotifyFetched(EntryId id, string_view value, // the snapshotting. // TODO: to revisit this when we rewrite it with more efficient snapshotting algorithm. - bool should_upload = modified || (cache_fetched_ && !SliceSnapshot::IsSnaphotInProgress()); + bool should_upload = + modified || (HasEnoughMemoryMargin(value.size()) && !SliceSnapshot::IsSnaphotInProgress()); if (!should_upload) return false; @@ -332,7 +337,7 @@ bool TieredStorage::TryStash(DbIndex dbid, string_view key, PrimeValue* value) { } StringOrView raw_string = value->GetRawString(); - value->SetIoPending(true); + value->SetStashPending(true); tiering::OpManager::EntryId id; error_code ec; @@ -364,13 +369,13 @@ void TieredStorage::Delete(DbIndex dbid, PrimeValue* value) { } void TieredStorage::CancelStash(DbIndex dbid, std::string_view key, PrimeValue* value) { - DCHECK(value->HasIoPending()); + DCHECK(value->HasStashPending()); if (OccupiesWholePages(value->Size())) { op_manager_->Delete(KeyRef(dbid, key)); } else if (auto bin = bins_->Delete(dbid, key); bin) { op_manager_->Delete(*bin); } - value->SetIoPending(false); + value->SetStashPending(false); } float TieredStorage::WriteDepthUsage() const { @@ -423,11 +428,18 @@ void TieredStorage::RunOffloading(DbIndex dbid) { disk_stats.max_file_size) return; - auto cb = [this, dbid, tmp = std::string{}](PrimeIterator it) mutable { - TryStash(dbid, it->first.GetSlice(&tmp), &it->second); + string tmp; + auto cb = [this, dbid, &tmp](PrimeIterator it) mutable { + if (ShouldStash(it->second)) { + if (it->first.WasTouched()) { + it->first.SetTouched(false); + } else { + TryStash(dbid, it->first.GetSlice(&tmp), &it->second); + } + } }; - PrimeTable& table = op_manager_->db_slice_->GetDBTable(dbid)->prime; + PrimeTable& table = op_manager_->db_slice_.GetDBTable(dbid)->prime; PrimeTable::Cursor start_cursor{}; // Loop while we haven't traversed all entries or reached our stash io device limit. @@ -442,7 +454,7 @@ void TieredStorage::RunOffloading(DbIndex dbid) { bool TieredStorage::ShouldStash(const PrimeValue& pv) const { auto disk_stats = op_manager_->GetStats().disk_stats; - return !pv.IsExternal() && !pv.HasIoPending() && pv.ObjType() == OBJ_STRING && + return !pv.IsExternal() && !pv.HasStashPending() && pv.ObjType() == OBJ_STRING && pv.Size() >= kMinValueSize && disk_stats.allocated_bytes + tiering::kPageSize + pv.Size() < disk_stats.max_file_size; } diff --git a/src/server/tiered_storage_test.cc b/src/server/tiered_storage_test.cc index 3d42214d3..5194da8df 100644 --- a/src/server/tiered_storage_test.cc +++ b/src/server/tiered_storage_test.cc @@ -23,8 +23,6 @@ using namespace testing; ABSL_DECLARE_FLAG(bool, force_epoll); ABSL_DECLARE_FLAG(string, tiered_prefix); -ABSL_DECLARE_FLAG(bool, tiered_storage_cache_fetched); -ABSL_DECLARE_FLAG(bool, backing_file_direct); ABSL_DECLARE_FLAG(float, tiered_offload_threshold); ABSL_DECLARE_FLAG(unsigned, tiered_storage_write_depth); @@ -53,8 +51,6 @@ class TieredStorageTest : public BaseFamilyTest { if (GetFlag(FLAGS_tiered_prefix).empty()) { SetFlag(&FLAGS_tiered_prefix, "/tmp/tiered_storage_test"); } - SetFlag(&FLAGS_tiered_storage_cache_fetched, true); - SetFlag(&FLAGS_backing_file_direct, true); BaseFamilyTest::SetUp(); } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 2523ea425..393a08b9a 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -6,6 +6,7 @@ #include +#include "base/flags.h" #include "base/logging.h" #include "facade/op_status.h" #include "redis/redis_aux.h"