From 81f0b5034dd58673ccb8c32bdfd8351a0f796df0 Mon Sep 17 00:00:00 2001
From: Roman Gershman
Date: Tue, 14 Feb 2023 17:35:01 +0200
Subject: [PATCH] feat(facade): Limit request cache using runtime flag (#793)

---
 src/facade/dragonfly_connection.cc | 28 +++++++++++++++++++++++++---
 src/facade/dragonfly_connection.h  |  4 +++-
 src/server/main_service.cc         |  5 ++++-
 3 files changed, 32 insertions(+), 5 deletions(-)

diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc
index 32c5767ea..533fbaebb 100644
--- a/src/facade/dragonfly_connection.cc
+++ b/src/facade/dragonfly_connection.cc
@@ -37,6 +37,9 @@ ABSL_FLAG(std::string, admin_bind, "",
           "This supports both HTTP and RESP "
           "protocols");
 
+ABSL_FLAG(std::uint64_t, request_cache_limit, 1ULL << 26,  // 64MB
+          "Amount of memory to use for request cache in bytes - per IO thread.");
+
 using namespace util;
 using namespace std;
 using nonstd::make_unexpected;
@@ -94,8 +97,12 @@ constexpr size_t kReqStorageSize = 88;
 constexpr size_t kReqStorageSize = 120;
 #endif
 
+thread_local uint32_t free_req_release_weight = 0;
+
 }  // namespace
 
+thread_local vector<Connection::RequestPtr> Connection::free_req_pool_;
+
 struct Connection::Shutdown {
   absl::flat_hash_map<ShutdownHandle, ShutdownCb> map;
   ShutdownHandle next_handle = 1;
@@ -611,8 +618,17 @@ auto Connection::ParseRedis() -> ParserStatus {
     bool is_sync_dispatch = !cc_->async_dispatch && !cc_->force_dispatch;
     if (dispatch_q_.empty() && is_sync_dispatch && consumed >= io_buf_.InputLen()) {
       // Gradually release the request pool.
+      // The request pool is shared by all the connections in the thread so we do not want
+      // to release it aggressively just because some connection is running in
+      // non-pipelined mode. So we wait at least N times,
+      // where N is the number of connections in the thread.
       if (!free_req_pool_.empty()) {
-        free_req_pool_.pop_back();
+        ++free_req_release_weight;
+        if (free_req_release_weight > stats_->num_conns) {
+          free_req_release_weight = 0;
+          stats_->pipeline_cache_capacity -= free_req_pool_.back()->StorageCapacity();
+          free_req_pool_.pop_back();
+        }
       }
       RespToArgList(parse_args_, &cmd_vec_);
       CmdArgList cmd_list{cmd_vec_.data(), cmd_vec_.size()};
@@ -788,6 +804,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
   SinkReplyBuilder* builder = cc_->reply_builder();
   DispatchOperations dispatch_op{builder, this};
 
+  uint64_t request_cache_limit = absl::GetFlag(FLAGS_request_cache_limit);
   while (!builder->GetError()) {
     evc_.await([this] { return cc_->conn_closing || !dispatch_q_.empty(); });
@@ -798,8 +815,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
     dispatch_q_.pop_front();
     std::visit(dispatch_op, req->payload);
 
-    // Do not cache more than K items.
-    if (free_req_pool_.size() < 16) {
+    if (stats_->pipeline_cache_capacity < request_cache_limit) {
       stats_->pipeline_cache_capacity += req->StorageCapacity();
       free_req_pool_.push_back(std::move(req));
     }
@@ -829,6 +845,7 @@ auto Connection::FromArgs(RespVec args, mi_heap_t* heap) -> RequestPtr {
 
   if (free_req_pool_.empty()) {
     req = Request::New(heap, args, backed_sz);
   } else {
+    free_req_release_weight = 0;  // Reset the release weight.
     req = move(free_req_pool_.back());
     stats_->pipeline_cache_capacity -= req->StorageCapacity();
@@ -837,10 +854,15 @@ auto Connection::FromArgs(RespVec args, mi_heap_t* heap) -> RequestPtr {
   }
   return req;
 }
+
 void Connection::ShutdownSelf() {
   util::Connection::Shutdown();
 }
 
+void Connection::ShutdownThreadLocal() {
+  free_req_pool_.clear();
+}
+
 void RespToArgList(const RespVec& src, CmdArgVec* dest) {
   dest->resize(src.size());
   for (size_t i = 0; i < src.size(); ++i) {
diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h
index ae9e54011..837588108 100644
--- a/src/facade/dragonfly_connection.h
+++ b/src/facade/dragonfly_connection.h
@@ -85,6 +85,8 @@ class Connection : public util::Connection {
 
   void ShutdownSelf();
 
+  static void ShutdownThreadLocal();
+
 protected:
   void OnShutdown() override;
   void OnPreMigrateThread() override;
@@ -137,7 +139,7 @@ class Connection : public util::Connection {
   RequestPtr FromArgs(RespVec args, mi_heap_t* heap);
 
   std::deque<RequestPtr> dispatch_q_;  // coordinated via evc_.
-  std::vector<RequestPtr> free_req_pool_;
+  static thread_local std::vector<RequestPtr> free_req_pool_;
   util::fibers_ext::EventCount evc_;
 
   RespVec parse_args_;
diff --git a/src/server/main_service.cc b/src/server/main_service.cc
index ec6010e3d..eb6a763ef 100644
--- a/src/server/main_service.cc
+++ b/src/server/main_service.cc
@@ -505,7 +505,10 @@ void Service::Shutdown() {
 
   // We mark that we are shutting down. After this incoming requests will be
   // rejected
-  pp_.AwaitFiberOnAll([](ProactorBase* pb) { ServerState::tlocal()->Shutdown(); });
+  pp_.AwaitFiberOnAll([](ProactorBase* pb) {
+    ServerState::tlocal()->Shutdown();
+    facade::Connection::ShutdownThreadLocal();
+  });
 
   // to shutdown all the runtime components that depend on EngineShard.
   server_family_.Shutdown();
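
For readers skimming the diff, the caching policy it introduces can be summarized in isolation. The sketch below is a minimal model, not Dragonfly code: Request, RequestPtr, ThreadLocalPool, Recycle, Acquire, and MaybeRelease are hypothetical stand-ins for the patch's free_req_pool_, the DispatchFiber recycle path, FromArgs, and the ParseRedis release path, with stats_->pipeline_cache_capacity, stats_->num_conns, and free_req_release_weight modeled as plain counters. It illustrates the two rules the patch adds: admission into the pool is bounded by total cached bytes (the new --request_cache_limit flag) instead of a fixed 16-item count, and eviction from an idle pool is deferred until more than num_conns consecutive non-pipelined rounds have passed, so one idle connection cannot drain a pool shared by the whole thread.

#include <cstddef>
#include <cstdint>
#include <iostream>
#include <memory>
#include <utility>
#include <vector>

// Hypothetical stand-in for Connection::Request; only its storage size matters here.
struct Request {
  explicit Request(size_t sz) : storage(sz) {}
  size_t StorageCapacity() const { return storage.size(); }
  std::vector<char> storage;
};

using RequestPtr = std::unique_ptr<Request>;

// Minimal model of the per-IO-thread request cache this patch introduces.
class ThreadLocalPool {
 public:
  explicit ThreadLocalPool(uint64_t limit_bytes) : limit_(limit_bytes) {}

  // DispatchFiber path: cache a finished request only while the pool's total
  // byte footprint stays under the limit (was: free_req_pool_.size() < 16).
  void Recycle(RequestPtr req) {
    if (cached_bytes_ < limit_) {
      cached_bytes_ += req->StorageCapacity();
      pool_.push_back(std::move(req));
    }  // else the request is destroyed here and its memory returned.
  }

  // FromArgs path: prefer reuse; touching the pool resets the release weight,
  // signalling that the cache is still earning its keep.
  RequestPtr Acquire(size_t needed_sz) {
    if (pool_.empty())
      return std::make_unique<Request>(needed_sz);
    release_weight_ = 0;
    RequestPtr req = std::move(pool_.back());
    pool_.pop_back();
    cached_bytes_ -= req->StorageCapacity();
    return req;
  }

  // ParseRedis non-pipelined path: free a single entry only after more than
  // num_conns consecutive non-pipelined rounds, i.e. gradual release.
  void MaybeRelease(uint32_t num_conns) {
    if (pool_.empty())
      return;
    if (++release_weight_ > num_conns) {
      release_weight_ = 0;
      cached_bytes_ -= pool_.back()->StorageCapacity();
      pool_.pop_back();
    }
  }

  uint64_t cached_bytes() const { return cached_bytes_; }

 private:
  std::vector<RequestPtr> pool_;
  uint64_t limit_;               // per-thread cap, mirrors the new flag
  uint64_t cached_bytes_ = 0;    // mirrors stats_->pipeline_cache_capacity
  uint32_t release_weight_ = 0;  // mirrors free_req_release_weight
};

int main() {
  ThreadLocalPool pool(1ULL << 26);  // 64MB, the flag's default
  pool.Recycle(std::make_unique<Request>(1024));
  std::cout << pool.cached_bytes() << '\n';  // 1024: one request cached

  // With 4 connections on the thread, only the 5th consecutive
  // non-pipelined round (weight > num_conns) actually frees an entry.
  for (int i = 0; i < 5; ++i)
    pool.MaybeRelease(/*num_conns=*/4);
  std::cout << pool.cached_bytes() << '\n';  // 0
}

Two operational consequences worth noting. Since the limit applies per IO thread, worst-case cache memory is roughly the flag value multiplied by the number of proactor threads, and it can be tuned at startup with standard ABSL flag syntax, e.g. --request_cache_limit=16777216. And because the pool becomes static thread_local rather than a per-connection member, no connection object owns it anymore, which is why the patch adds the ShutdownThreadLocal() hook that Service::Shutdown() now invokes on every proactor thread to clear the pools.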