diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 5643384fd..da6f1e4e4 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -78,13 +78,6 @@ bool MatchHttp11Line(string_view line) { constexpr size_t kMinReadSize = 256; constexpr size_t kMaxReadSize = 32_KB; -struct PubMsgRecord { - Connection::PubMessage pub_msg; - - PubMsgRecord(Connection::PubMessage pmsg) : pub_msg(move(pmsg)) { - } -}; - #ifdef ABSL_HAVE_ADDRESS_SANITIZER constexpr size_t kReqStorageSize = 88; #else @@ -95,7 +88,7 @@ thread_local uint32_t free_req_release_weight = 0; } // namespace -thread_local vector Connection::free_req_pool_; +thread_local vector Connection::pipeline_req_pool_; struct Connection::Shutdown { absl::flat_hash_map map; @@ -116,19 +109,19 @@ struct Connection::RequestDeleter { void operator()(Request* req) const; }; +using PubMessage = Connection::PubMessage; +using MonitorMessage = std::string; + // Please note: The call to the Dtor is mandatory for this!! // This class contain types that don't have trivial destructed objects class Connection::Request { public: - using MonitorMessage = std::string; - - struct PipelineMsg { - // I do not use mi_heap_t explicitly but mi_stl_allocator at the end does the same job - // of using the thread's heap. + struct PipelineMessage { + // mi_stl_allocator uses mi heap internally. // The capacity is chosen so that we allocate a fully utilized (256 bytes) block. 
using StorageType = absl::InlinedVector>; - PipelineMsg(size_t nargs, size_t capacity) : args(nargs), storage(capacity) { + PipelineMessage(size_t nargs, size_t capacity) : args(nargs), storage(capacity) { } void Reset(size_t nargs, size_t capacity); @@ -137,7 +130,7 @@ class Connection::Request { StorageType storage; }; - using MessagePayload = std::variant; + using MessagePayload = std::variant; // Overload to create the a new pipeline message static RequestPtr New(mi_heap_t* heap, const RespVec& args, size_t capacity); @@ -150,26 +143,29 @@ class Connection::Request { void Emplace(const RespVec& args, size_t capacity); - MessagePayload payload; - size_t StorageCapacity() const; bool IsPipelineMsg() const; private: - static constexpr size_t kSizeOfPipelineMsg = sizeof(PipelineMsg); + static constexpr size_t kSizeOfPipelineMsg = sizeof(PipelineMessage); - Request(size_t nargs, size_t capacity) : payload(PipelineMsg{nargs, capacity}) { + Request(size_t nargs, size_t capacity) : payload(PipelineMessage{nargs, capacity}) { } - Request(PubMsgRecord msg) : payload(std::move(msg)) { + Request(PubMessage msg) : payload(move(msg)) { } - Request(MonitorMessage msg) : payload(std::move(msg)) { + Request(MonitorMessage msg) : payload(move(msg)) { } Request(const Request&) = delete; + + // Store arguments for pipeline message. 
void SetArgs(const RespVec& args); + + public: + MessagePayload payload; }; Connection::PubMessage::PubMessage(string pattern, shared_ptr channel, @@ -186,18 +182,18 @@ struct Connection::DispatchOperations { : stats{me->service_->GetThreadLocalConnectionStats()}, builder{b}, self(me) { } - void operator()(const PubMsgRecord& msg); - void operator()(Request::PipelineMsg& msg); - void operator()(const Request::MonitorMessage& msg); + void operator()(const PubMessage& msg); + void operator()(Request::PipelineMessage& msg); + void operator()(const MonitorMessage& msg); ConnectionStats* stats = nullptr; SinkReplyBuilder* builder = nullptr; Connection* self = nullptr; }; -Connection::RequestPtr Connection::Request::New(std::string msg) { +Connection::RequestPtr Connection::Request::New(MonitorMessage msg) { void* ptr = mi_malloc(sizeof(Request)); - Request* req = new (ptr) Request(std::move(msg)); + Request* req = new (ptr) Request(move(msg)); return Connection::RequestPtr{req, Connection::RequestDeleter{}}; } @@ -213,9 +209,20 @@ Connection::RequestPtr Connection::Request::New(mi_heap_t* heap, const RespVec& return Connection::RequestPtr{req, Connection::RequestDeleter{}}; } +Connection::RequestPtr Connection::Request::New(PubMessage pub_msg) { + // This will generate a new request for pubsub message + // Please note that unlike the above case, we don't need to "protect", the internals here + // since we are currently using a borrow token for it - i.e. the BlockingCounter will + // ensure that the message is not deleted until we finish sending it at the other + // side of the queue + void* ptr = mi_malloc(sizeof(Request)); + Request* req = new (ptr) Request(move(pub_msg)); + return Connection::RequestPtr{req, Connection::RequestDeleter{}}; +} + void Connection::Request::SetArgs(const RespVec& args) { - // At this point we know that we have PipelineMsg in Request so next op is safe. 
- PipelineMsg& pipeline_msg = std::get(payload); + // At this point we know that we have PipelineMessage in Request so next op is safe. + PipelineMessage& pipeline_msg = std::get(payload); auto* next = pipeline_msg.storage.data(); for (size_t i = 0; i < args.size(); ++i) { auto buf = args[i].GetBuf(); @@ -226,34 +233,22 @@ void Connection::Request::SetArgs(const RespVec& args) { } } -Connection::RequestPtr Connection::Request::New(PubMessage pub_msg) { - // This will generate a new request for pubsub message - // Please note that unlike the above case, we don't need to "protect", the internals here - // since we are currently using a borrow token for it - i.e. the BlockingCounter will - // ensure that the message is not deleted until we are finish sending it at the other - // side of the queue - PubMsgRecord new_msg{move(pub_msg)}; - void* ptr = mi_malloc(sizeof(Request)); - Request* req = new (ptr) Request(std::move(new_msg)); - return Connection::RequestPtr{req, Connection::RequestDeleter{}}; -} - void Connection::RequestDeleter::operator()(Request* req) const { req->~Request(); mi_free(req); } void Connection::Request::Emplace(const RespVec& args, size_t capacity) { - PipelineMsg* msg = get_if(&payload); + PipelineMessage* msg = get_if(&payload); if (msg) { msg->Reset(args.size(), capacity); } else { - payload = PipelineMsg{args.size(), capacity}; + payload = PipelineMessage{args.size(), capacity}; } SetArgs(args); } -void Connection::Request::PipelineMsg::Reset(size_t nargs, size_t capacity) { +void Connection::Request::PipelineMessage::Reset(size_t nargs, size_t capacity) { storage.resize(capacity); args.resize(nargs); } @@ -262,8 +257,8 @@ template struct Overloaded : Ts... { using Ts::operator()...; }; template Overloaded(Ts...) 
-> Overloaded; size_t Connection::Request::StorageCapacity() const { - return std::visit(Overloaded{[](const PubMsgRecord& msg) -> size_t { return 0; }, - [](const PipelineMsg& arg) -> size_t { + return std::visit(Overloaded{[](const PubMessage& msg) -> size_t { return 0; }, + [](const PipelineMessage& arg) -> size_t { return arg.storage.capacity() + arg.args.capacity(); }, [](const MonitorMessage& arg) -> size_t { return arg.capacity(); }}, @@ -271,18 +266,17 @@ size_t Connection::Request::StorageCapacity() const { } bool Connection::Request::IsPipelineMsg() const { - return std::get_if(&payload) != nullptr; + return std::get_if(&payload) != nullptr; } -void Connection::DispatchOperations::operator()(const Request::MonitorMessage& msg) { +void Connection::DispatchOperations::operator()(const MonitorMessage& msg) { RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder; rbuilder->SendSimpleString(msg); } -void Connection::DispatchOperations::operator()(const PubMsgRecord& msg) { +void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) { RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder; ++stats->async_writes_cnt; - const PubMessage& pub_msg = msg.pub_msg; string_view arr[4]; if (pub_msg.type == PubMessage::kPublish) { if (pub_msg.pattern.empty()) { @@ -309,7 +303,7 @@ void Connection::DispatchOperations::operator()(const PubMsgRecord& msg) { } } -void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) { +void Connection::DispatchOperations::operator()(Request::PipelineMessage& msg) { ++stats->pipelined_cmd_cnt; self->pipeline_msg_cnt_--; @@ -325,7 +319,7 @@ void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) { Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx, ServiceInterface* service) - : io_buf_(kMinReadSize), http_listener_(http_listener), ctx_(ctx), service_(service) { + : io_buf_(kMinReadSize), http_listener_(http_listener), ctx_(ctx), 
service_(service), name_{} { static atomic_uint32_t next_id{1}; protocol_ = protocol; @@ -344,8 +338,6 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, creation_time_ = time(nullptr); last_interaction_ = creation_time_; - memset(name_, 0, sizeof(name_)); - memset(phase_, 0, sizeof(phase_)); id_ = next_id.fetch_add(1, memory_order_relaxed); } @@ -356,8 +348,8 @@ Connection::~Connection() { void Connection::OnShutdown() { VLOG(1) << "Connection::OnShutdown"; - if (shutdown_) { - for (const auto& k_v : shutdown_->map) { + if (shutdown_cb_) { + for (const auto& k_v : shutdown_cb_->map) { k_v.second(); } } @@ -384,17 +376,17 @@ void Connection::OnPostMigrateThread() { } auto Connection::RegisterShutdownHook(ShutdownCb cb) -> ShutdownHandle { - if (!shutdown_) { - shutdown_ = make_unique(); + if (!shutdown_cb_) { + shutdown_cb_ = make_unique(); } - return shutdown_->Add(std::move(cb)); + return shutdown_cb_->Add(std::move(cb)); } void Connection::UnregisterShutdownHook(ShutdownHandle id) { - if (shutdown_) { - shutdown_->Remove(id); - if (shutdown_->map.empty()) - shutdown_.reset(); + if (shutdown_cb_) { + shutdown_cb_->Remove(id); + if (shutdown_cb_->map.empty()) + shutdown_cb_.reset(); } } @@ -465,11 +457,11 @@ void Connection::HandleRequests() { VLOG(1) << "Closed connection for peer " << remote_ep; } -void Connection::RegisterOnBreak(BreakerCb breaker_cb) { +void Connection::RegisterBreakHook(BreakerCb breaker_cb) { breaker_cb_ = breaker_cb; } -void Connection::SendMsgVecAsync(PubMessage pub_msg) { +void Connection::SendPubMessageAsync(PubMessage pub_msg) { DCHECK(cc_); if (cc_->conn_closing) { @@ -502,12 +494,15 @@ string Connection::GetClientInfo(unsigned thread_id) const { getsockopt(lsb->native_handle(), SOL_SOCKET, SO_INCOMING_CPU, &cpu, &len); int my_cpu_id = sched_getcpu(); + static constexpr string_view PHASE_NAMES[] = {"readsock", "process"}; + static_assert(PHASE_NAMES[PROCESS] == "process"); + absl::StrAppend(&res, "id=", 
id_, " addr=", re.address().to_string(), ":", re.port()); absl::StrAppend(&res, " laddr=", le.address().to_string(), ":", le.port()); absl::StrAppend(&res, " fd=", lsb->native_handle(), " name=", name_); absl::StrAppend(&res, " tid=", thread_id, " irqmatch=", int(cpu == my_cpu_id)); absl::StrAppend(&res, " age=", now - creation_time_, " idle=", now - last_interaction_); - absl::StrAppend(&res, " phase=", phase_); + absl::StrAppend(&res, " phase=", PHASE_NAMES[phase_]); if (cc_) { string cc_info = service_->GetContextInfo(cc_.get()); @@ -575,7 +570,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { // At the start we read from the socket to determine the HTTP/Memstore protocol. // Therefore we may already have some data in the buffer. if (io_buf_.InputLen() > 0) { - SetPhase("process"); + phase_ = PROCESS; if (redis_parser_) { parse_status = ParseRedis(); } else { @@ -647,10 +642,10 @@ auto Connection::ParseRedis() -> ParserStatus { mi_heap_t* tlh = mi_heap_get_backing(); do { - result = redis_parser_->Parse(io_buf_.InputBuffer(), &consumed, &parse_args_); + result = redis_parser_->Parse(io_buf_.InputBuffer(), &consumed, &tmp_parse_args_); - if (result == RedisParser::OK && !parse_args_.empty()) { - RespExpr& first = parse_args_.front(); + if (result == RedisParser::OK && !tmp_parse_args_.empty()) { + RespExpr& first = tmp_parse_args_.front(); if (first.type == RespExpr::STRING) { DVLOG(2) << "Got Args with first token " << ToSV(first.GetBuf()); } @@ -662,28 +657,18 @@ auto Connection::ParseRedis() -> ParserStatus { bool is_sync_dispatch = !cc_->async_dispatch && !cc_->force_dispatch; if (dispatch_q_.empty() && is_sync_dispatch && consumed >= io_buf_.InputLen()) { // Gradually release the request pool. - // The request pool is shared by all the connections in the thread so we do not want - // to release it aggressively just because some connection is running in - // non-pipelined mode. 
So we wait at least N times, - // where N is the number of connections in the thread. - if (!free_req_pool_.empty()) { - ++free_req_release_weight; - if (free_req_release_weight > stats_->num_conns) { - free_req_release_weight = 0; - stats_->pipeline_cache_capacity -= free_req_pool_.back()->StorageCapacity(); - free_req_pool_.pop_back(); - } - } - RespToArgList(parse_args_, &cmd_vec_); + ShrinkPipelinePool(); - DVLOG(2) << "Sync dispatch " << ToSV(cmd_vec_.front()); + RespToArgList(tmp_parse_args_, &tmp_cmd_vec_); - CmdArgList cmd_list{cmd_vec_.data(), cmd_vec_.size()}; + DVLOG(2) << "Sync dispatch " << ToSV(tmp_cmd_vec_.front()); + + CmdArgList cmd_list{tmp_cmd_vec_.data(), tmp_cmd_vec_.size()}; service_->DispatchCommand(cmd_list, cc_.get()); last_interaction_ = time(nullptr); } else { // Dispatch via queue to speedup input reading. - RequestPtr req = FromArgs(std::move(parse_args_), tlh); + RequestPtr req = FromArgs(std::move(tmp_parse_args_), tlh); ++pipeline_msg_cnt_; dispatch_q_.push_back(std::move(req)); if (dispatch_q_.size() == 1) { @@ -783,7 +768,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant recv_sz = peer->Recv(append_buf); last_interaction_ = time(nullptr); @@ -797,7 +782,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variantio_read_bytes += *recv_sz; ++stats_->io_read_cnt; - SetPhase("process"); + phase_ = PROCESS; if (redis_parser_) { parse_status = ParseRedis(); @@ -864,7 +849,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) { if (req->IsPipelineMsg() && stats_->pipeline_cache_capacity < request_cache_limit) { stats_->pipeline_cache_capacity += req->StorageCapacity(); - free_req_pool_.push_back(std::move(req)); + pipeline_req_pool_.push_back(std::move(req)); } } @@ -889,25 +874,48 @@ auto Connection::FromArgs(RespVec args, mi_heap_t* heap) -> RequestPtr { RequestPtr req; - if (free_req_pool_.empty()) { - req = Request::New(heap, args, backed_sz); - } else { - free_req_release_weight = 0; // Reset 
the release weight. - req = move(free_req_pool_.back()); - stats_->pipeline_cache_capacity -= req->StorageCapacity(); - - free_req_pool_.pop_back(); + if (req = GetFromPipelinePool(); req) { req->Emplace(move(args), backed_sz); + } else { + req = Request::New(heap, args, backed_sz); } return req; } +void Connection::ShrinkPipelinePool() { + if (pipeline_req_pool_.empty()) + return; + + // The request pool is shared by all the connections in the thread so we do not want + // to release it aggressively just because some connection is running in + // non-pipelined mode. So by using free_req_release_weight we wait at least N times, + // where N is the number of connections in the thread. + ++free_req_release_weight; + + if (free_req_release_weight > stats_->num_conns) { + free_req_release_weight = 0; + stats_->pipeline_cache_capacity -= pipeline_req_pool_.back()->StorageCapacity(); + pipeline_req_pool_.pop_back(); + } +} + +Connection::RequestPtr Connection::GetFromPipelinePool() { + if (pipeline_req_pool_.empty()) + return {}; + + free_req_release_weight = 0; // Reset the release weight. 
+ RequestPtr req = move(pipeline_req_pool_.back()); + stats_->pipeline_cache_capacity -= req->StorageCapacity(); + pipeline_req_pool_.pop_back(); + return req; +} + void Connection::ShutdownSelf() { util::Connection::Shutdown(); } void Connection::ShutdownThreadLocal() { - free_req_pool_.clear(); + pipeline_req_pool_.clear(); } void RespToArgList(const RespVec& src, CmdArgVec* dest) { @@ -919,7 +927,7 @@ void RespToArgList(const RespVec& src, CmdArgVec* dest) { } } -void Connection::SendMonitorMsg(std::string monitor_msg) { +void Connection::SendMonitorMessageAsync(std::string monitor_msg) { DCHECK(cc_); if (!cc_->conn_closing) { diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 0ff26f465..d7e783d60 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -38,40 +38,30 @@ class RedisParser; class ServiceInterface; class MemcacheParser; +// Connection represents an active connection for a client. +// +// It directly dispatches regular commands from the io-loop. +// For pipelined requests, monitor and pubsub messages it uses +// a separate dispatch queue that is processed on a separate fiber. class Connection : public util::Connection { public: Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx, ServiceInterface* service); ~Connection(); - using error_code = std::error_code; + using BreakerCb = std::function; using ShutdownCb = std::function; using ShutdownHandle = unsigned; - ShutdownHandle RegisterShutdownHook(ShutdownCb cb); - void UnregisterShutdownHook(ShutdownHandle id); - - Protocol protocol() const { - return protocol_; - } - - using BreakerCb = std::function; - void RegisterOnBreak(BreakerCb breaker_cb); - - // This interface is used to pass a published message directly to the socket without - // copying strings. - // Once the msg is sent "bc" will be decreased so that caller could release the underlying - // storage for the message. 
- // virtual - to allow the testing code to override it. - + // PubSub message, either incoming message for active subscription or reply for new subscription. struct PubMessage { enum Type { kSubscribe, kUnsubscribe, kPublish } type; - std::string pattern{}; // if empty - means its a regular message, otherwise it's pmessage. + std::string pattern{}; // non-empty for pattern subscriber std::shared_ptr channel{}; - std::shared_ptr message{}; // ensure that this message would out live passing - // between different threads/fibers - uint32_t channel_cnt = 0; // relevant only for kSubscribe and kUnsubscribe + std::shared_ptr message{}; + + uint32_t channel_cnt = 0; PubMessage(bool add, std::shared_ptr channel, uint32_t channel_cnt); PubMessage(std::string pattern, std::shared_ptr channel, @@ -82,23 +72,28 @@ class Connection : public util::Connection { PubMessage(PubMessage&&) = default; }; - // this function is overriden at test_utils TestConnection - virtual void SendMsgVecAsync(PubMessage pub_msg); + enum Phase { READ_SOCKET, PROCESS }; - // Note that this is accepted by value because the message is processed asynchronously. - void SendMonitorMsg(std::string monitor_msg); + public: + // Add PubMessage to dispatch queue. + // Virtual because behaviour is overridden in test_utils. + virtual void SendPubMessageAsync(PubMessage pub_msg); - void SetName(std::string_view name) { - CopyCharBuf(name, sizeof(name_), name_); - } + // Add monitor message to dispatch queue. + void SendMonitorMessageAsync(std::string monitor_msg); - const char* GetName() const { - return name_; - } + // Register hook that is executed on connection shutdown. + ShutdownHandle RegisterShutdownHook(ShutdownCb cb); - void SetPhase(std::string_view phase) { - CopyCharBuf(phase, sizeof(phase_), phase_); - } + void UnregisterShutdownHook(ShutdownHandle id); + + // Register hook that is executed when the connection breaks. + void RegisterBreakHook(BreakerCb breaker_cb); + + // Manually shutdown self. 
+ void ShutdownSelf(); + + static void ShutdownThreadLocal(); std::string GetClientInfo(unsigned thread_id) const; std::string RemoteEndpointStr() const; @@ -106,9 +101,17 @@ class Connection : public util::Connection { std::string LocalBindAddress() const; uint32_t GetClientId() const; - void ShutdownSelf(); + Protocol protocol() const { + return protocol_; + } - static void ShutdownThreadLocal(); + void SetName(std::string name) { + name_ = name; + } + + std::string_view GetName() const { + return name_; + } protected: void OnShutdown() override; @@ -118,68 +121,84 @@ class Connection : public util::Connection { private: enum ParserStatus { OK, NEED_MORE, ERROR }; - void HandleRequests() final; - - static void CopyCharBuf(std::string_view src, unsigned dest_len, char* dest) { - src = src.substr(0, dest_len - 1); - if (!src.empty()) - memcpy(dest, src.data(), src.size()); - dest[src.size()] = '\0'; - } - - // - io::Result CheckForHttpProto(util::FiberSocketBase* peer); - - void ConnectionFlow(util::FiberSocketBase* peer); - std::variant IoLoop(util::FiberSocketBase* peer); - - void DispatchFiber(util::FiberSocketBase* peer); - - ParserStatus ParseRedis(); - ParserStatus ParseMemcache(); - void OnBreakCb(int32_t mask); - - base::IoBuf io_buf_; - std::unique_ptr redis_parser_; - std::unique_ptr memcache_parser_; - util::HttpListenerBase* http_listener_; - SSL_CTX* ctx_; - ServiceInterface* service_; - time_t creation_time_, last_interaction_; - char name_[16]; - char phase_[16]; - - std::unique_ptr cc_; - class Request; struct DispatchOperations; struct DispatchCleanup; struct RequestDeleter; + struct Shutdown; + // Requests are allocated on the mimalloc heap and thus require a custom deleter. using RequestPtr = std::unique_ptr; - // args are passed deliberately by value - to pass the ownership. + private: + // Check protocol and handle connection. + void HandleRequests() final; + + // Start dispatch fiber and run IoLoop. 
+ void ConnectionFlow(util::FiberSocketBase* peer); + + // Main loop reading client messages and passing requests to dispatch queue. + std::variant IoLoop(util::FiberSocketBase* peer); + + // Returns true if HTTP header is detected. + io::Result CheckForHttpProto(util::FiberSocketBase* peer); + + // Handles events from dispatch queue. + void DispatchFiber(util::FiberSocketBase* peer); + + // Create new pipeline request, re-use from pool when possible. RequestPtr FromArgs(RespVec args, mi_heap_t* heap); - std::deque dispatch_q_; // coordinated via evc_. - uint32_t pipeline_msg_cnt_ = 0; + ParserStatus ParseRedis(); + ParserStatus ParseMemcache(); - static thread_local std::vector free_req_pool_; - dfly::EventCount evc_; + void OnBreakCb(int32_t mask); - RespVec parse_args_; - CmdArgVec cmd_vec_; + // Shrink pipeline pool by a little while handling regular commands. + void ShrinkPipelinePool(); + + // Returns non-null request ptr if pool has vacant entries. + RequestPtr GetFromPipelinePool(); + + private: + std::deque dispatch_q_; // dispatch queue + dfly::EventCount evc_; // dispatch queue waker + + base::IoBuf io_buf_; // used in io loop and parsers + std::unique_ptr redis_parser_; + std::unique_ptr memcache_parser_; - unsigned parser_error_ = 0; uint32_t id_; - uint32_t break_poll_id_ = UINT32_MAX; + Protocol protocol_; ConnectionStats* stats_ = nullptr; - Protocol protocol_; + util::HttpListenerBase* http_listener_; + SSL_CTX* ctx_; + + ServiceInterface* service_; + + time_t creation_time_, last_interaction_; + + Phase phase_; + std::string name_; + + std::unique_ptr cc_; + + unsigned parser_error_ = 0; + uint32_t pipeline_msg_cnt_ = 0; + + uint32_t break_poll_id_ = UINT32_MAX; - struct Shutdown; - std::unique_ptr shutdown_; BreakerCb breaker_cb_; + std::unique_ptr shutdown_cb_; + + RespVec tmp_parse_args_; + CmdArgVec tmp_cmd_vec_; + + // Pooled pipeline messages per-thread. 
+ // Aggregated while handling pipelines, + // gradually released while handling regular commands. + static thread_local std::vector pipeline_req_pool_; }; void RespToArgList(const RespVec& src, CmdArgVec* dest); diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index 17daa81f9..90b1fe766 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -127,7 +127,7 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis if (to_reply) { for (size_t i = 0; i < result.size(); ++i) { - owner()->SendMsgVecAsync({to_add, make_shared(ArgS(args, i)), result[i]}); + owner()->SendPubMessageAsync({to_add, make_shared(ArgS(args, i)), result[i]}); } } } diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 486c91d05..6e2bbfc9e 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -183,7 +183,7 @@ void SendMonitor(const std::string& msg) { for (auto monitor_conn : monitors) { // never preempts, so we can iterate safely. - monitor_conn->SendMonitorMsg(msg); + monitor_conn->SendMonitorMessageAsync(msg); } } } @@ -921,7 +921,7 @@ facade::ConnectionContext* Service::CreateContext(util::FiberSocketBase* peer, // a bit of a hack. I set up breaker callback here for the owner. // Should work though it's confusing to have it here. 
- owner->RegisterOnBreak([res, this](uint32_t) { + owner->RegisterBreakHook([res, this](uint32_t) { if (res->transaction) { res->transaction->BreakOnShutdown(); } @@ -1463,7 +1463,7 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) { while (it != subscribers_ptr->end() && it->thread_id == idx) { facade::Connection* conn = it->conn_cntx->owner(); DCHECK(conn); - conn->SendMsgVecAsync({move(it->pattern), move(channel_ptr), move(msg_ptr)}); + conn->SendPubMessageAsync({move(it->pattern), move(channel_ptr), move(msg_ptr)}); it->borrow_token.Dec(); it++; } diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 2f63aef40..5d55fd825 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1176,13 +1176,13 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) { string_view sub_cmd = ArgS(args, 0); if (sub_cmd == "SETNAME" && args.size() == 2) { - cntx->owner()->SetName(ArgS(args, 1)); + cntx->owner()->SetName(string{ArgS(args, 1)}); return (*cntx)->SendOk(); } if (sub_cmd == "GETNAME") { - const char* name = cntx->owner()->GetName(); - if (*name != 0) { + auto name = cntx->owner()->GetName(); + if (!name.empty()) { return (*cntx)->SendBulkString(name); } else { return (*cntx)->SendNull(); diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index 815dc7e70..4805f2cb5 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -61,7 +61,7 @@ TestConnection::TestConnection(Protocol protocol, io::StringSink* sink) : facade::Connection(protocol, nullptr, nullptr, nullptr), sink_(sink) { } -void TestConnection::SendMsgVecAsync(PubMessage pmsg) { +void TestConnection::SendPubMessageAsync(PubMessage pmsg) { if (pmsg.type == PubMessage::kPublish) { messages.push_back(move(pmsg)); } else { diff --git a/src/server/test_utils.h b/src/server/test_utils.h index ea89c986d..e36a8ab71 100644 --- a/src/server/test_utils.h +++ b/src/server/test_utils.h @@ -21,7 +21,7 @@ class TestConnection 
: public facade::Connection { public: TestConnection(Protocol protocol, io::StringSink* sink); - void SendMsgVecAsync(PubMessage pmsg) final; + void SendPubMessageAsync(PubMessage pmsg) final; std::vector messages;