From ec78c8a2af7d1a3d202d59469bfccfe90f7a937a Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Wed, 17 Nov 2021 12:51:47 +0200 Subject: [PATCH] Add SET command and thread local db slice --- server/CMakeLists.txt | 3 +- server/db_slice.cc | 116 +++++++++++++++++++++++++++++++++ server/db_slice.h | 84 ++++++++++++++++++++++++ server/dragonfly_connection.cc | 4 ++ server/engine_shard_set.cc | 60 +++++++++++++++++ server/engine_shard_set.h | 98 ++++++++++++++++++++++++++++ server/main_service.cc | 36 +++++++++- server/main_service.h | 13 +++- server/op_status.h | 91 ++++++++++++++++++++++++++ server/resp_expr.h | 1 + 10 files changed, 501 insertions(+), 5 deletions(-) create mode 100644 server/db_slice.cc create mode 100644 server/db_slice.h create mode 100644 server/engine_shard_set.cc create mode 100644 server/engine_shard_set.h create mode 100644 server/op_status.h diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 78144bdca..fb5123ade 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -1,7 +1,8 @@ add_executable(dragonfly dfly_main.cc) cxx_link(dragonfly base dragonfly_lib) -add_library(dragonfly_lib dragonfly_listener.cc dragonfly_connection.cc main_service.cc +add_library(dragonfly_lib db_slice.cc dragonfly_listener.cc dragonfly_connection.cc + main_service.cc engine_shard_set.cc redis_parser.cc resp_expr.cc) cxx_link(dragonfly_lib uring_fiber_lib diff --git a/server/db_slice.cc b/server/db_slice.cc new file mode 100644 index 000000000..36aad1514 --- /dev/null +++ b/server/db_slice.cc @@ -0,0 +1,116 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// + +#include "server/db_slice.h" + +#include +#include + +#include "base/logging.h" +#include "server/engine_shard_set.h" +#include "util/fiber_sched_algo.h" +#include "util/proactor_base.h" + +namespace dfly { + +using namespace boost; +using namespace std; +using namespace util; + +DbSlice::DbSlice(uint32_t index, EngineShard* owner) : shard_id_(index), owner_(owner) { + db_arr_.emplace_back(); + CreateDbRedis(0); +} + +DbSlice::~DbSlice() { + for (auto& db : db_arr_) { + if (!db.main_table) + continue; + db.main_table.reset(); + } +} + +void DbSlice::Reserve(DbIndex db_ind, size_t key_size) { + ActivateDb(db_ind); + + auto& db = db_arr_[db_ind]; + DCHECK(db.main_table); + + db.main_table->reserve(key_size); +} + +auto DbSlice::Find(DbIndex db_index, std::string_view key) const -> OpResult { + DCHECK_LT(db_index, db_arr_.size()); + DCHECK(db_arr_[db_index].main_table); + + auto& db = db_arr_[db_index]; + MainIterator it = db.main_table->find(key); + + if (it == db.main_table->end()) { + return OpStatus::KEY_NOTFOUND; + } + + return it; +} + +auto DbSlice::AddOrFind(DbIndex db_index, std::string_view key) -> pair { + DCHECK_LT(db_index, db_arr_.size()); + DCHECK(db_arr_[db_index].main_table); + + auto& db = db_arr_[db_index]; + + pair res = db.main_table->emplace(key, MainValue{}); + if (res.second) { // new entry + db.stats.obj_memory_usage += res.first->first.capacity(); + + return make_pair(res.first, true); + } + + return res; +} + +void DbSlice::ActivateDb(DbIndex db_ind) { + if (db_arr_.size() <= db_ind) + db_arr_.resize(db_ind + 1); + CreateDbRedis(db_ind); +} + +void DbSlice::CreateDbRedis(unsigned index) { + auto& db = db_arr_[index]; + if (!db.main_table) { + db.main_table.reset(new MainTable); + } +} + +void DbSlice::AddNew(DbIndex db_ind, std::string_view key, MainValue obj, uint64_t expire_at_ms) { + CHECK(AddIfNotExist(db_ind, key, std::move(obj), expire_at_ms)); +} + +bool DbSlice::AddIfNotExist(DbIndex db_ind, std::string_view key, MainValue obj, + uint64_t expire_at_ms) { + auto& db = db_arr_[db_ind]; + + auto [new_entry, success] = db.main_table->emplace(key, obj); + if (!success) + return false; // in this case obj won't be moved and will be destroyed during unwinding. + + db.stats.obj_memory_usage += (new_entry->first.capacity() + new_entry->second.capacity()); + + if (expire_at_ms) { + // TODO + } + + return true; +} + +size_t DbSlice::DbSize(DbIndex db_ind) const { + DCHECK_LT(db_ind, db_array_size()); + + if (IsDbValid(db_ind)) { + return db_arr_[db_ind].main_table->size(); + } + return 0; +} + +} // namespace dfly diff --git a/server/db_slice.h b/server/db_slice.h new file mode 100644 index 000000000..99293172c --- /dev/null +++ b/server/db_slice.h @@ -0,0 +1,84 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// + +#pragma once + +#include +#include + +#include "server/op_status.h" + +namespace util { +class ProactorBase; +} + +namespace dfly { + +class EngineShard; + +class DbSlice { + struct InternalDbStats { + // Object memory usage besides hash-table capacity. + size_t obj_memory_usage = 0; + }; + + public: + using MainValue = std::string; + using MainTable = absl::flat_hash_map; + using MainIterator = MainTable::iterator; + using ShardId = uint16_t; + using DbIndex = uint16_t; + + DbSlice(uint32_t index, EngineShard* owner); + ~DbSlice(); + + + // Activates `db_ind` database if it does not exist (see ActivateDb below). + void Reserve(DbIndex db_ind, size_t key_size); + + OpResult Find(DbIndex db_index, std::string_view key) const; + + // Return .second=true if insertion ocurred, false if we return the existing key. + std::pair AddOrFind(DbIndex db_ind, std::string_view key); + + // Adds a new entry. Requires: key does not exist in this slice. + void AddNew(DbIndex db_ind, std::string_view key, MainValue obj, uint64_t expire_at_ms); + + // Adds a new entry if a key does not exists. Returns true if insertion took place, + // false otherwise. expire_at_ms equal to 0 - means no expiry. + bool AddIfNotExist(DbIndex db_ind, std::string_view key, MainValue obj, uint64_t expire_at_ms); + + // Creates a database with index `db_ind`. If such database exists does nothing. + void ActivateDb(DbIndex db_ind); + + size_t db_array_size() const { + return db_arr_.size(); + } + + bool IsDbValid(DbIndex id) const { + return bool(db_arr_[id].main_table); + } + + // Returns existing keys count in the db. + size_t DbSize(DbIndex db_ind) const; + + ShardId shard_id() const { return shard_id_;} + + private: + + void CreateDbRedis(unsigned index); + + ShardId shard_id_; + + EngineShard* owner_; + + struct DbRedis { + std::unique_ptr main_table; + mutable InternalDbStats stats; + }; + + std::vector db_arr_; +}; + +} // namespace dfly diff --git a/server/dragonfly_connection.cc b/server/dragonfly_connection.cc index 1e4f56618..7d8a57add 100644 --- a/server/dragonfly_connection.cc +++ b/server/dragonfly_connection.cc @@ -132,6 +132,10 @@ auto Connection::ParseRedis(base::IoBuf* io_buf, util::FiberSocketBase* peer) -> string_view sv = ToSV(first.GetBuf()); if (sv == "PING") { ec = peer->Write(io::Buffer("PONG\r\n")); + } else if (sv == "SET") { + CHECK_EQ(3u, args.size()); + service_->Set(ToSV(args[1].GetBuf()), ToSV(args[2].GetBuf())); + ec = peer->Write(io::Buffer("OK\r\n")); } } io_buf->ConsumeInput(consumed); diff --git a/server/engine_shard_set.cc b/server/engine_shard_set.cc new file mode 100644 index 000000000..8438c1957 --- /dev/null +++ b/server/engine_shard_set.cc @@ -0,0 +1,60 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// + +#include "server/engine_shard_set.h" + +#include "base/logging.h" +#include "util/fiber_sched_algo.h" +#include "util/varz.h" + +namespace dfly { + +using namespace std; +using namespace boost; +using util::FiberProps; + +thread_local EngineShard* EngineShard::shard_ = nullptr; +constexpr size_t kQueueLen = 64; + +EngineShard::EngineShard(ShardId index) + : db_slice(index, this), queue_(kQueueLen) { + fiber_q_ = fibers::fiber([this, index] { + this_fiber::properties().set_name(absl::StrCat("shard_queue", index)); + queue_.Run(); + }); +} + +EngineShard::~EngineShard() { + queue_.Shutdown(); + fiber_q_.join(); +} + +void EngineShard::InitThreadLocal(ShardId index) { + CHECK(shard_ == nullptr) << index; + shard_ = new EngineShard(index); +} + +void EngineShard::DestroyThreadLocal() { + if (!shard_) + return; + + uint32_t index = shard_->db_slice.shard_id(); + delete shard_; + shard_ = nullptr; + + DVLOG(1) << "Shard reset " << index; +} + +void EngineShardSet::Init(uint32_t sz) { + CHECK_EQ(0u, size()); + + shard_queue_.resize(sz); +} + +void EngineShardSet::InitThreadLocal(ShardId index) { + EngineShard::InitThreadLocal(index); + shard_queue_[index] = EngineShard::tlocal()->GetQueue(); +} + +} // namespace dfly diff --git a/server/engine_shard_set.h b/server/engine_shard_set.h new file mode 100644 index 000000000..5899b5991 --- /dev/null +++ b/server/engine_shard_set.h @@ -0,0 +1,98 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// + +#pragma once + +#include "server/db_slice.h" +#include "util/fibers/fibers_ext.h" +#include "util/fibers/fiberqueue_threadpool.h" +#include "util/proactor_pool.h" + +namespace dfly { + +using ShardId = uint16_t; + +class EngineShard { + public: + DbSlice db_slice; + + //EngineShard() is private down below. + ~EngineShard(); + + static void InitThreadLocal(ShardId index); + static void DestroyThreadLocal(); + + static EngineShard* tlocal() { + return shard_; + } + + ShardId shard_id() const { + return db_slice.shard_id(); + } + + ::util::fibers_ext::FiberQueue* GetQueue() { + return &queue_; + } + + private: + EngineShard(ShardId index); + + ::util::fibers_ext::FiberQueue queue_; + ::boost::fibers::fiber fiber_q_; + + static thread_local EngineShard* shard_; +}; + +class EngineShardSet { + public: + explicit EngineShardSet(util::ProactorPool* pp) : pp_(pp) { + } + + uint32_t size() const { + return uint32_t(shard_queue_.size()); + } + + util::ProactorPool* pool() { + return pp_; + } + + void Init(uint32_t size); + void InitThreadLocal(ShardId index); + + template auto Await(ShardId sid, F&& f) { + return shard_queue_[sid]->Await(std::forward(f)); + } + + template auto Add(ShardId sid, F&& f) { + assert(sid < shard_queue_.size()); + return shard_queue_[sid]->Add(std::forward(f)); + } + + template void RunBriefInParallel(U&& func); + + private: + util::ProactorPool* pp_; + std::vector shard_queue_; +}; + +/** + * @brief + * + * @tparam U - a function that receives EngineShard* argument and returns void. + * @param func + */ +template void EngineShardSet::RunBriefInParallel(U&& func) { + util::fibers_ext::BlockingCounter bc{size()}; + + for (uint32_t i = 0; i < size(); ++i) { + util::ProactorBase* dest = pp_->at(i); + dest->AsyncBrief([f = std::forward(func), bc]() mutable { + f(EngineShard::tlocal()); + bc.Dec(); + }); + } + bc.Wait(); +} + +} // namespace dfly diff --git a/server/main_service.cc b/server/main_service.cc index 15742a4ad..5a2f67938 100644 --- a/server/main_service.cc +++ b/server/main_service.cc @@ -6,6 +6,7 @@ #include #include +#include #include "base/logging.h" #include "util/uring/uring_fiber_algo.h" @@ -29,34 +30,63 @@ DEFINE_VARZ(VarzMapAverage, request_latency_usec); std::optional engine_varz; +inline ShardId Shard(string_view sv, ShardId shard_num) { + XXH64_hash_t hash = XXH64(sv.data(), sv.size(), 24061983); + return hash % shard_num; +} + } // namespace Service::Service(ProactorPool* pp) - : pp_(*pp) { + : shard_set_(pp), pp_(*pp) { CHECK(pp); engine_varz.emplace("engine", [this] { return GetVarzStats(); }); } Service::~Service() { - engine_varz.reset(); } void Service::Init(util::AcceptServer* acceptor) { + uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size(); + shard_set_.Init(shard_num); + + pp_.AwaitOnAll([&](uint32_t index, ProactorBase* pb) { + if (index < shard_count()) { + shard_set_.InitThreadLocal(index); + } + }); + request_latency_usec.Init(&pp_); } void Service::Shutdown() { + engine_varz.reset(); request_latency_usec.Shutdown(); + + shard_set_.RunBriefInParallel([&](EngineShard*) { EngineShard::DestroyThreadLocal(); }); } void Service::RegisterHttp(HttpListenerBase* listener) { CHECK_NOTNULL(listener); } +void Service::Set(std::string_view key, std::string_view val) { + ShardId sid = Shard(key, shard_count()); + shard_set_.Await(sid, [&] { + EngineShard* es = EngineShard::tlocal(); + auto [it, res] = es->db_slice.AddOrFind(0, key); + it->second = val; + }); +} + VarzValue::Map Service::GetVarzStats() { VarzValue::Map res; - res.emplace_back("keys", VarzValue::FromInt(0)); + atomic_ulong num_keys{0}; + shard_set_.RunBriefInParallel([&](EngineShard* es) { + num_keys += es->db_slice.DbSize(0); + }); + res.emplace_back("keys", VarzValue::FromInt(num_keys.load())); return res; } diff --git a/server/main_service.h b/server/main_service.h index fa1d583a0..62d4fc64c 100644 --- a/server/main_service.h +++ b/server/main_service.h @@ -6,6 +6,7 @@ #include "base/varz_value.h" #include "util/http/http_handler.h" +#include "server/engine_shard_set.h" namespace util { class AcceptServer; @@ -26,14 +27,24 @@ class Service { void Shutdown(); + uint32_t shard_count() const { + return shard_set_.size(); + } + + EngineShardSet& shard_set() { + return shard_set_; + } + util::ProactorPool& proactor_pool() { return pp_; } + void Set(std::string_view key, std::string_view val); private: - + base::VarzValue::Map GetVarzStats(); + EngineShardSet shard_set_; util::ProactorPool& pp_; }; diff --git a/server/op_status.h b/server/op_status.h new file mode 100644 index 000000000..44bd5b8d2 --- /dev/null +++ b/server/op_status.h @@ -0,0 +1,91 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// + +#pragma once + +#include + +namespace dfly { + +enum class OpStatus : uint16_t { + OK, + KEY_NOTFOUND, +}; + +class OpResultBase { + public: + OpResultBase(OpStatus st = OpStatus::OK) : st_(st) { + } + + constexpr explicit operator bool() const { + return st_ == OpStatus::OK; + } + + OpStatus status() const { + return st_; + } + + bool operator==(OpStatus st) const { + return st_ == st; + } + + bool ok() const { + return st_ == OpStatus::OK; + } + + private: + OpStatus st_; +}; + +template class OpResult : public OpResultBase { + public: + OpResult(V v) : v_(std::move(v)) { + } + + using OpResultBase::OpResultBase; + + const V& value() const { + return v_; + } + + V& value() { + return v_; + } + + V value_or(V v) const { + return status() == OpStatus::OK ? v_ : v; + } + + const V* operator->() const { + return &v_; + } + + private: + V v_; +}; + +template <> class OpResult : public OpResultBase { + public: + using OpResultBase::OpResultBase; +}; + +inline bool operator==(OpStatus st, const OpResultBase& ob) { + return ob.operator==(st); +} + +} // namespace dfly + +namespace std { + +template std::ostream& operator<<(std::ostream& os, const dfly::OpResult& res) { + os << int(res.status()); + return os; +} + +inline std::ostream& operator<<(std::ostream& os, const dfly::OpStatus op) { + os << int(op); + return os; +} + +} // namespace std \ No newline at end of file diff --git a/server/resp_expr.h b/server/resp_expr.h index 7975d3a6d..e0fc10132 100644 --- a/server/resp_expr.h +++ b/server/resp_expr.h @@ -46,6 +46,7 @@ inline std::string_view ToSV(const absl::Span& s) { } // namespace dfly namespace std { + ostream& operator<<(ostream& os, const dfly::RespExpr& e); ostream& operator<<(ostream& os, dfly::RespSpan rspan);