diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index a344667f3..91cdacb7c 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -1,8 +1,8 @@ add_executable(dragonfly dfly_main.cc) cxx_link(dragonfly base dragonfly_lib) -add_library(dragonfly_lib command_registry.cc config_flags.cc conn_context.cc db_slice.cc - dragonfly_listener.cc +add_library(dragonfly_lib command_registry.cc common_types.cc config_flags.cc + conn_context.cc db_slice.cc dragonfly_listener.cc dragonfly_connection.cc engine_shard_set.cc main_service.cc memcache_parser.cc redis_parser.cc resp_expr.cc reply_builder.cc) diff --git a/server/command_registry.h b/server/command_registry.h index 16ff2935e..a4a742994 100644 --- a/server/command_registry.h +++ b/server/command_registry.h @@ -10,6 +10,7 @@ #include #include "base/function2.hpp" +#include "server/common_types.h" namespace dfly { @@ -31,9 +32,6 @@ const char* OptName(CommandOpt fl); }; // namespace CO -using MutableStrSpan = absl::Span; -using CmdArgList = absl::Span; - class CommandId { public: using CmdFunc = std::function; @@ -143,9 +141,4 @@ class CommandRegistry { void Command(CmdArgList args, ConnectionContext* cntx); }; -inline std::string_view ArgS(CmdArgList args, size_t i) { - auto arg = args[i]; - return std::string_view(arg.data(), arg.size()); -} - } // namespace dfly diff --git a/server/common_types.cc b/server/common_types.cc new file mode 100644 index 000000000..ab56b5007 --- /dev/null +++ b/server/common_types.cc @@ -0,0 +1,48 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// + +#include "server/common_types.h" + +#include + +#include "base/logging.h" + +namespace dfly { + +using std::string; + + +string WrongNumArgsError(std::string_view cmd) { + return absl::StrCat("wrong number of arguments for '", cmd, "' command"); +} + +const char kSyntaxErr[] = "syntax error"; +const char kWrongTypeErr[] = "-WRONGTYPE Operation against a key holding the wrong kind of value"; +const char kKeyNotFoundErr[] = "no such key"; +const char kInvalidIntErr[] = "value is not an integer or out of range"; +const char kUintErr[] = "value is out of range, must be positive"; +const char kInvalidFloatErr[] = "value is not a valid float"; +const char kInvalidScoreErr[] = "resulting score is not a number (NaN)"; +const char kDbIndOutOfRangeErr[] = "DB index is out of range"; +const char kInvalidDbIndErr[] = "invalid DB index"; +const char kSameObjErr[] = "source and destination objects are the same"; + +} // namespace dfly + +namespace std { + +ostream& operator<<(ostream& os, dfly::CmdArgList ras) { + os << "["; + if (!ras.empty()) { + for (size_t i = 0; i < ras.size() - 1; ++i) { + os << dfly::ArgS(ras, i) << ","; + } + os << dfly::ArgS(ras, ras.size() - 1); + } + os << "]"; + + return os; +} + +} // namespace std \ No newline at end of file diff --git a/server/common_types.h b/server/common_types.h new file mode 100644 index 000000000..6300dd88c --- /dev/null +++ b/server/common_types.h @@ -0,0 +1,77 @@ +// Copyright 2021, Beeri 15. All rights reserved. +// Author: Roman Gershman (romange@gmail.com) +// + +#pragma once + +#include +#include +#include +#include + +#include +#include + +namespace dfly { + +using DbIndex = uint16_t; +using ShardId = uint16_t; + +using MutableStrSpan = absl::Span; +using CmdArgList = absl::Span; +using CmdArgVec = std::vector; + +constexpr DbIndex kInvalidDbId = DbIndex(-1); +constexpr ShardId kInvalidSid = ShardId(-1); + +struct ConnectionState { + enum Mask : uint32_t { + ASYNC_DISPATCH = 1, // whether a command is handled via async dispatch. + CONN_CLOSING = 2, // could be because of unrecoverable error or planned action. + }; + + uint32_t mask = 0; // A bitmask of Mask values. + + bool IsClosing() const { + return mask & CONN_CLOSING; + } + + bool IsRunViaDispatch() const { + return mask & ASYNC_DISPATCH; + } +}; + +template inline ShardId Shard(const View& v, ShardId shard_num) { + XXH64_hash_t hash = XXH64(v.data(), v.size(), 120577); + return hash % shard_num; +} + +using MainValue = std::string; +using MainTable = absl::flat_hash_map; +using MainIterator = MainTable::iterator; + +class EngineShard; + +inline std::string_view ArgS(CmdArgList args, size_t i) { + auto arg = args[i]; + return std::string_view(arg.data(), arg.size()); +} + +inline void ToUpper(const MutableStrSpan* val) { + for (auto& c : *val) { + c = absl::ascii_toupper(c); + } +} + +inline MutableStrSpan ToMSS(absl::Span span) { + return MutableStrSpan{reinterpret_cast(span.data()), span.size()}; +} + +std::string WrongNumArgsError(std::string_view cmd); + +} // namespace dfly + +namespace std { +ostream& operator<<(ostream& os, dfly::CmdArgList args); + +} // namespace std \ No newline at end of file diff --git a/server/conn_context.h b/server/conn_context.h index 7958066db..9cb70e2be 100644 --- a/server/conn_context.h +++ b/server/conn_context.h @@ -5,6 +5,7 @@ #pragma once #include "server/reply_builder.h" +#include "server/common_types.h" namespace dfly { @@ -26,6 +27,8 @@ class ConnectionContext : public ReplyBuilder { Protocol protocol() const; + ConnectionState conn_state; + private: Connection* owner_; }; diff --git a/server/db_slice.h b/server/db_slice.h index 99293172c..2dc599645 100644 --- a/server/db_slice.h +++ b/server/db_slice.h @@ -4,9 +4,7 @@ #pragma once -#include -#include - +#include "server/common_types.h" #include "server/op_status.h" namespace util { @@ -15,8 +13,6 @@ class ProactorBase; namespace dfly { -class EngineShard; - class DbSlice { struct InternalDbStats { // Object memory usage besides hash-table capacity. @@ -24,12 +20,6 @@ class DbSlice { }; public: - using MainValue = std::string; - using MainTable = absl::flat_hash_map; - using MainIterator = MainTable::iterator; - using ShardId = uint16_t; - using DbIndex = uint16_t; - DbSlice(uint32_t index, EngineShard* owner); ~DbSlice(); diff --git a/server/dragonfly_connection.cc b/server/dragonfly_connection.cc index 12ea3c83f..9724f52ef 100644 --- a/server/dragonfly_connection.cc +++ b/server/dragonfly_connection.cc @@ -4,6 +4,8 @@ #include "server/dragonfly_connection.h" +#include + #include #include "base/io_buf.h" @@ -76,7 +78,7 @@ Connection::Connection(Protocol protocol, Service* service, SSL_CTX* ctx) switch (protocol) { case Protocol::REDIS: - redis_parser_.reset(new RedisParser); + redis_parser_.reset(new RedisParser); break; case Protocol::MEMCACHE: memcache_parser_.reset(new MemcacheParser); @@ -140,6 +142,8 @@ void Connection::HandleRequests() { void Connection::InputLoop(FiberSocketBase* peer) { base::IoBuf io_buf{kMinReadSize}; + + auto dispatch_fb = fibers::fiber(fibers::launch::dispatch, [&] { DispatchFiber(peer); }); ParserStatus status = OK; std::error_code ec; @@ -156,7 +160,7 @@ void Connection::InputLoop(FiberSocketBase* peer) { io_buf.CommitWrite(*recv_sz); if (redis_parser_) - status = ParseRedis(&io_buf); + status = ParseRedis(&io_buf); else { DCHECK(memcache_parser_); status = ParseMemcache(&io_buf); @@ -169,6 +173,10 @@ void Connection::InputLoop(FiberSocketBase* peer) { } } while (peer->IsOpen() && !cc_->ec()); + cc_->conn_state.mask |= ConnectionState::CONN_CLOSING; // Signal dispatch to close. + evc_.notify(); + dispatch_fb.join(); + if (cc_->ec()) { ec = cc_->ec(); } else { @@ -208,8 +216,24 @@ auto Connection::ParseRedis(base::IoBuf* io_buf) -> ParserStatus { DVLOG(2) << "Got Args with first token " << ToSV(first.GetBuf()); } - RespToArgList(args, &arg_vec); - service_->DispatchCommand(CmdArgList{arg_vec.data(), arg_vec.size()}, cc_.get()); + // An optimization to skip dispatch_q_ if no pipelining is identified. + // We use ASYNC_DISPATCH as a lock to avoid out-of-order replies when the + // dispatch fiber pulls the last record but is still processing the command and then this + // fiber enters the condition below and executes out of order. + bool is_sync_dispatch = !cc_->conn_state.IsRunViaDispatch(); + if (dispatch_q_.empty() && is_sync_dispatch && consumed >= io_buf->InputLen()) { + RespToArgList(args, &arg_vec); + service_->DispatchCommand(CmdArgList{arg_vec.data(), arg_vec.size()}, cc_.get()); + } else { + // Dispatch via queue to speedup input reading, + Request* req = FromArgs(std::move(args)); + dispatch_q_.emplace_back(req); + if (dispatch_q_.size() == 1) { + evc_.notify(); + } else if (dispatch_q_.size() > 10) { + this_fiber::yield(); + } + } } io_buf->ConsumeInput(consumed); } while (RedisParser::OK == result && !cc_->ec()); @@ -249,8 +273,15 @@ auto Connection::ParseMemcache(base::IoBuf* io_buf) -> ParserStatus { } } - service_->DispatchMC(cmd, value, cc_.get()); - io_buf->ConsumeInput(total_len); + // An optimization to skip dispatch_q_ if no pipelining is identified. + // We use ASYNC_DISPATCH as a lock to avoid out-of-order replies when the + // dispatch fiber pulls the last record but is still processing the command and then this + // fiber enters the condition below and executes out of order. + bool is_sync_dispatch = (cc_->conn_state.mask & ConnectionState::ASYNC_DISPATCH) == 0; + if (dispatch_q_.empty() && is_sync_dispatch && consumed >= io_buf->InputLen()) { + service_->DispatchMC(cmd, value, cc_.get()); + } + io_buf->ConsumeInput(consumed); } while (!cc_->ec()); parser_error_ = result; @@ -264,4 +295,50 @@ auto Connection::ParseMemcache(base::IoBuf* io_buf) -> ParserStatus { return ERROR; } +// DispatchFiber handles commands coming from the InputLoop. +// Thus, InputLoop can quickly read data from the input buffer, parse it and push +// into the dispatch queue and DispatchFiber will run those commands asynchronously with InputLoop. +// Note: in some cases, InputLoop may decide to dispatch directly and bypass the DispatchFiber. +void Connection::DispatchFiber(util::FiberSocketBase* peer) { + this_fiber::properties().set_name("DispatchFiber"); + + while (!cc_->ec()) { + evc_.await([this] { return cc_->conn_state.IsClosing() || !dispatch_q_.empty(); }); + if (cc_->conn_state.IsClosing()) + break; + + std::unique_ptr req{dispatch_q_.front()}; + dispatch_q_.pop_front(); + + cc_->conn_state.mask |= ConnectionState::ASYNC_DISPATCH; + service_->DispatchCommand(CmdArgList{req->args.data(), req->args.size()}, cc_.get()); + cc_->conn_state.mask &= ~ConnectionState::ASYNC_DISPATCH; + } + + cc_->conn_state.mask |= ConnectionState::CONN_CLOSING; +} + +auto Connection::FromArgs(RespVec args) -> Request* { + DCHECK(!args.empty()); + size_t backed_sz = 0; + for (const auto& arg : args) { + CHECK_EQ(RespExpr::STRING, arg.type); + backed_sz += arg.GetBuf().size(); + } + DCHECK(backed_sz); + + Request* req = new Request{args.size(), backed_sz}; + + auto* next = req->storage.data(); + for (size_t i = 0; i < args.size(); ++i) { + auto buf = args[i].GetBuf(); + size_t s = buf.size(); + memcpy(next, buf.data(), s); + req->args[i] = MutableStrSpan(next, s); + next += s; + } + + return req; +} + } // namespace dfly diff --git a/server/dragonfly_connection.h b/server/dragonfly_connection.h index 7d580cb1f..427a98a10 100644 --- a/server/dragonfly_connection.h +++ b/server/dragonfly_connection.h @@ -4,10 +4,16 @@ #pragma once -#include "util/connection.h" +#include + +#include #include "base/io_buf.h" +#include "server/common_types.h" #include "server/dfly_protocol.h" +#include "server/resp_expr.h" +#include "util/connection.h" +#include "util/fibers/event_count.h" typedef struct ssl_ctx_st SSL_CTX; @@ -30,7 +36,9 @@ class Connection : public util::Connection { ShutdownHandle RegisterShutdownHook(ShutdownCb cb); void UnregisterShutdownHook(ShutdownHandle id); - Protocol protocol() const { return protocol_;} + Protocol protocol() const { + return protocol_; + } protected: void OnShutdown() override; @@ -41,6 +49,7 @@ class Connection : public util::Connection { void HandleRequests() final; void InputLoop(util::FiberSocketBase* peer); + void DispatchFiber(util::FiberSocketBase* peer); ParserStatus ParseRedis(base::IoBuf* buf); ParserStatus ParseMemcache(base::IoBuf* buf); @@ -51,6 +60,19 @@ class Connection : public util::Connection { SSL_CTX* ctx_; std::unique_ptr cc_; + struct Request { + absl::FixedArray args; + absl::FixedArray storage; + + Request(size_t nargs, size_t capacity) : args(nargs), storage(capacity) { + } + Request(const Request&) = delete; + }; + + static Request* FromArgs(RespVec args); + + std::deque dispatch_q_; // coordinated via evc_. + util::fibers_ext::EventCount evc_; unsigned parser_error_ = 0; Protocol protocol_; struct Shutdown; diff --git a/server/main_service.cc b/server/main_service.cc index a1eec8547..8ba5cf0ef 100644 --- a/server/main_service.cc +++ b/server/main_service.cc @@ -18,23 +18,6 @@ DEFINE_uint32(port, 6380, "Redis port"); DEFINE_uint32(memcache_port, 0, "Memcached port"); -namespace std { - -ostream& operator<<(ostream& os, dfly::CmdArgList args) { - os << "["; - if (!args.empty()) { - for (size_t i = 0; i < args.size() - 1; ++i) { - os << dfly::ArgS(args, i) << ","; - } - os << dfly::ArgS(args, args.size() - 1); - } - os << "]"; - - return os; -} - -} // namespace std - namespace dfly { using namespace std; @@ -52,21 +35,6 @@ DEFINE_VARZ(VarzQps, set_qps); std::optional engine_varz; -inline ShardId Shard(string_view sv, ShardId shard_num) { - XXH64_hash_t hash = XXH64(sv.data(), sv.size(), 24061983); - return hash % shard_num; -} - -inline void ToUpper(const MutableStrSpan* val) { - for (auto& c : *val) { - c = absl::ascii_toupper(c); - } -} - -string WrongNumArgsError(string_view cmd) { - return absl::StrCat("wrong number of arguments for '", cmd, "' command"); -} - } // namespace Service::Service(ProactorPool* pp) : shard_set_(pp), pp_(*pp) { @@ -215,7 +183,7 @@ void Service::Get(CmdArgList args, ConnectionContext* cntx) { shard_set_.Await(sid, [&] { EngineShard* es = EngineShard::tlocal(); - OpResult res = es->db_slice.Find(0, key); + OpResult res = es->db_slice.Find(0, key); if (res) { opres.value() = res.value()->second; } else {