mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
feat(facade): Limit request cache using runtime flag (#793)
This commit is contained in:
parent
72bad6c5ab
commit
81f0b5034d
3 changed files with 32 additions and 5 deletions
|
@ -37,6 +37,9 @@ ABSL_FLAG(std::string, admin_bind, "",
|
|||
"This supports both HTTP and RESP "
|
||||
"protocols");
|
||||
|
||||
ABSL_FLAG(std::uint64_t, request_cache_limit, 1ULL << 26, // 64MB
|
||||
"Amount of memory to use for request cache in bytes - per IO thread.");
|
||||
|
||||
using namespace util;
|
||||
using namespace std;
|
||||
using nonstd::make_unexpected;
|
||||
|
@ -94,8 +97,12 @@ constexpr size_t kReqStorageSize = 88;
|
|||
constexpr size_t kReqStorageSize = 120;
|
||||
#endif
|
||||
|
||||
thread_local uint32_t free_req_release_weight = 0;
|
||||
|
||||
} // namespace
|
||||
|
||||
thread_local vector<Connection::RequestPtr> Connection::free_req_pool_;
|
||||
|
||||
struct Connection::Shutdown {
|
||||
absl::flat_hash_map<ShutdownHandle, ShutdownCb> map;
|
||||
ShutdownHandle next_handle = 1;
|
||||
|
@ -611,8 +618,17 @@ auto Connection::ParseRedis() -> ParserStatus {
|
|||
bool is_sync_dispatch = !cc_->async_dispatch && !cc_->force_dispatch;
|
||||
if (dispatch_q_.empty() && is_sync_dispatch && consumed >= io_buf_.InputLen()) {
|
||||
// Gradually release the request pool.
|
||||
// The request pool is shared by all the connections in the thread so we do not want
|
||||
// to release it aggressively just because some connection is running in
|
||||
// non-pipelined mode. So we wait at least N times,
|
||||
// where N is the number of connections in the thread.
|
||||
if (!free_req_pool_.empty()) {
|
||||
free_req_pool_.pop_back();
|
||||
++free_req_release_weight;
|
||||
if (free_req_release_weight > stats_->num_conns) {
|
||||
free_req_release_weight = 0;
|
||||
stats_->pipeline_cache_capacity -= free_req_pool_.back()->StorageCapacity();
|
||||
free_req_pool_.pop_back();
|
||||
}
|
||||
}
|
||||
RespToArgList(parse_args_, &cmd_vec_);
|
||||
CmdArgList cmd_list{cmd_vec_.data(), cmd_vec_.size()};
|
||||
|
@ -788,6 +804,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
|
|||
|
||||
SinkReplyBuilder* builder = cc_->reply_builder();
|
||||
DispatchOperations dispatch_op{builder, this};
|
||||
uint64_t request_cache_limit = absl::GetFlag(FLAGS_request_cache_limit);
|
||||
|
||||
while (!builder->GetError()) {
|
||||
evc_.await([this] { return cc_->conn_closing || !dispatch_q_.empty(); });
|
||||
|
@ -798,8 +815,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
|
|||
dispatch_q_.pop_front();
|
||||
std::visit(dispatch_op, req->payload);
|
||||
|
||||
// Do not cache more than K items.
|
||||
if (free_req_pool_.size() < 16) {
|
||||
if (stats_->pipeline_cache_capacity < request_cache_limit) {
|
||||
stats_->pipeline_cache_capacity += req->StorageCapacity();
|
||||
free_req_pool_.push_back(std::move(req));
|
||||
}
|
||||
|
@ -829,6 +845,7 @@ auto Connection::FromArgs(RespVec args, mi_heap_t* heap) -> RequestPtr {
|
|||
if (free_req_pool_.empty()) {
|
||||
req = Request::New(heap, args, backed_sz);
|
||||
} else {
|
||||
free_req_release_weight = 0; // Reset the release weight.
|
||||
req = move(free_req_pool_.back());
|
||||
stats_->pipeline_cache_capacity -= req->StorageCapacity();
|
||||
|
||||
|
@ -837,10 +854,15 @@ auto Connection::FromArgs(RespVec args, mi_heap_t* heap) -> RequestPtr {
|
|||
}
|
||||
return req;
|
||||
}
|
||||
|
||||
void Connection::ShutdownSelf() {
|
||||
util::Connection::Shutdown();
|
||||
}
|
||||
|
||||
void Connection::ShutdownThreadLocal() {
|
||||
free_req_pool_.clear();
|
||||
}
|
||||
|
||||
void RespToArgList(const RespVec& src, CmdArgVec* dest) {
|
||||
dest->resize(src.size());
|
||||
for (size_t i = 0; i < src.size(); ++i) {
|
||||
|
|
|
@ -85,6 +85,8 @@ class Connection : public util::Connection {
|
|||
|
||||
void ShutdownSelf();
|
||||
|
||||
static void ShutdownThreadLocal();
|
||||
|
||||
protected:
|
||||
void OnShutdown() override;
|
||||
void OnPreMigrateThread() override;
|
||||
|
@ -137,7 +139,7 @@ class Connection : public util::Connection {
|
|||
RequestPtr FromArgs(RespVec args, mi_heap_t* heap);
|
||||
|
||||
std::deque<RequestPtr> dispatch_q_; // coordinated via evc_.
|
||||
std::vector<RequestPtr> free_req_pool_;
|
||||
static thread_local std::vector<RequestPtr> free_req_pool_;
|
||||
util::fibers_ext::EventCount evc_;
|
||||
|
||||
RespVec parse_args_;
|
||||
|
|
|
@ -505,7 +505,10 @@ void Service::Shutdown() {
|
|||
|
||||
// We mark that we are shutting down. After this incoming requests will be
|
||||
// rejected
|
||||
pp_.AwaitFiberOnAll([](ProactorBase* pb) { ServerState::tlocal()->Shutdown(); });
|
||||
pp_.AwaitFiberOnAll([](ProactorBase* pb) {
|
||||
ServerState::tlocal()->Shutdown();
|
||||
facade::Connection::ShutdownThreadLocal();
|
||||
});
|
||||
|
||||
// to shutdown all the runtime components that depend on EngineShard.
|
||||
server_family_.Shutdown();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue