Mirror of https://github.com/dragonflydb/dragonfly.git, synced 2025-05-11 10:25:47 +02:00
chore: dragonfly_connection fixes (#2160)
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
parent 564e38c05c
commit 3b458c21ee
1 changed file with 44 additions and 47 deletions
@@ -190,35 +190,27 @@ size_t Connection::PipelineMessage::StorageCapacity() const {
   return storage.capacity() + args.capacity();
 }
 
-template <class... Ts> struct Overloaded : Ts... {
-  using Ts::operator()...;
-
-  template <typename T, typename D> size_t operator()(const unique_ptr<T, D>& ptr) {
-    return operator()(*ptr.get());
-  }
-};
-
-template <class... Ts> Overloaded(Ts...) -> Overloaded<Ts...>;
-
 size_t Connection::MessageHandle::UsedMemory() const {
-  // TODO: don't count inline size
-  auto pub_size = [](const PubMessage& msg) -> size_t {
-    return sizeof(PubMessage) + (msg.channel_len + msg.message_len);
+  struct MessageSize {
+    size_t operator()(const PubMessagePtr& msg) {
+      return sizeof(PubMessage) + (msg->channel_len + msg->message_len);
+    }
+    size_t operator()(const PipelineMessagePtr& msg) {
+      return sizeof(PipelineMessage) + msg->args.capacity() * sizeof(MutableSlice) +
+             msg->storage.capacity();
+    }
+    size_t operator()(const MonitorMessage& msg) {
+      return msg.capacity();
+    }
+    size_t operator()(const AclUpdateMessage& msg) {
+      return 0;
+    }
+    size_t operator()(const MigrationRequestMessage& msg) {
+      return 0;
+    }
+  };
-  auto msg_size = [](const PipelineMessage& arg) -> size_t {
-    return sizeof(PipelineMessage) + arg.args.capacity() * sizeof(MutableSlice) +
-           arg.storage.capacity();
-  };
-  auto monitor_size = [](const MonitorMessage& arg) -> size_t { return arg.capacity(); };
-  auto acl_update_size = [](const AclUpdateMessage& msg) -> size_t {
-    return sizeof(AclUpdateMessage);
-  };
-  auto migration_request_size = [](const MigrationRequestMessage& msg) -> size_t {
-    return sizeof(MigrationRequestMessage);
-  };
-  return sizeof(MessageHandle) + visit(Overloaded{pub_size, msg_size, monitor_size, acl_update_size,
-                                                  migration_request_size},
-                                       this->handle);
 
+  return sizeof(MessageHandle) + visit(MessageSize{}, this->handle);
 }
 
 bool Connection::MessageHandle::IsPipelineMsg() const {
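The hunk replaces the set of per-type lambdas dispatched through the variadic Overloaded helper with a single MessageSize functor whose operator() overloads cover every variant alternative, including the smart-pointer ones, so the extra unique_ptr-unwrapping overload and the deduction guide are no longer needed. A minimal standalone sketch of that std::visit pattern, using invented message types rather than Dragonfly's, looks roughly like this:

    // Sketch only: simplified stand-ins for the real message types.
    #include <cstddef>
    #include <memory>
    #include <string>
    #include <variant>

    struct PubMsg {
      std::string channel, message;
    };
    struct PipelineMsg {
      std::string storage;
    };

    using Handle = std::variant<std::unique_ptr<PubMsg>, std::unique_ptr<PipelineMsg>>;

    // One named functor with an overload per alternative replaces
    // Overloaded{lambda1, lambda2, ...} plus the unique_ptr-unwrapping operator().
    struct SizeVisitor {
      std::size_t operator()(const std::unique_ptr<PubMsg>& msg) const {
        return sizeof(PubMsg) + msg->channel.size() + msg->message.size();
      }
      std::size_t operator()(const std::unique_ptr<PipelineMsg>& msg) const {
        return sizeof(PipelineMsg) + msg->storage.capacity();
      }
    };

    std::size_t UsedMemory(const Handle& handle) {
      return std::visit(SizeVisitor{}, handle);  // dispatches to the matching overload
    }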
@@ -585,10 +577,12 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
   // After the client disconnected.
   cc_->conn_closing = true;  // Signal dispatch to close.
   evc_.notify();
+
   VLOG(1) << "Before dispatch_fb.join()";
-  if (dispatch_fb_.IsJoinable())
-    dispatch_fb_.Join();
+  dispatch_fb_.JoinIfNeeded();
   VLOG(1) << "After dispatch_fb.join()";
+  DCHECK(dispatch_q_.empty());
+
   service_->OnClose(cc_.get());
 
   stats_->read_buf_capacity -= io_buf_.Capacity();
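The explicit IsJoinable()/Join() pair collapses into a single JoinIfNeeded() call on the dispatch fiber. The guard it expresses is the familiar one sketched below against std::thread, purely for illustration; helio's fiber type is assumed to provide the same check-then-join semantics:

    #include <thread>

    // Join only if the thread was ever started and has not been joined yet.
    void JoinIfNeeded(std::thread& t) {
      if (t.joinable())
        t.join();
    }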
@@ -938,6 +932,23 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
   uint64_t request_cache_limit = absl::GetFlag(FLAGS_request_cache_limit);
   size_t squashing_threshold = absl::GetFlag(FLAGS_pipeline_squash);
 
+  auto recycle = [this, request_cache_limit](MessageHandle msg) {
+    size_t used_mem = msg.UsedMemory();
+    queue_backpressure_->bytes.fetch_sub(used_mem, memory_order_relaxed);
+
+    stats_->dispatch_queue_bytes -= used_mem;
+    stats_->dispatch_queue_entries--;
+
+    // Retain pipeline message in pool.
+    if (auto* pipe = get_if<PipelineMessagePtr>(&msg.handle); pipe) {
+      dispatch_q_cmds_count_--;
+      if (stats_->pipeline_cmd_cache_bytes < request_cache_limit) {
+        stats_->pipeline_cmd_cache_bytes += (*pipe)->StorageCapacity();
+        pipeline_req_pool_.push_back(move(*pipe));
+      }
+    }
+  };
+
   uint64_t prev_epoch = fb2::FiberSwitchEpoch();
   while (!builder->GetError()) {
     evc_.await(
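The recycle lambda is now defined once, before the dispatch loop, instead of being re-created on every iteration, and the disconnect path at the end of the fiber can reuse it (see the last hunk). Its job is to release the backpressure bytes of a finished message and to retain pipeline buffers in a bounded pool. A self-contained sketch of that accounting pattern, with illustrative names rather than Dragonfly's members, might look like:

    #include <atomic>
    #include <cstddef>
    #include <memory>
    #include <utility>
    #include <vector>

    struct Buffer {
      std::vector<char> storage;
      std::size_t Capacity() const { return storage.capacity(); }
    };

    struct Recycler {
      std::atomic<std::size_t>& queued_bytes;  // shared backpressure counter
      std::size_t cache_limit;                 // cap, in the spirit of FLAGS_request_cache_limit
      std::size_t cached_bytes = 0;
      std::vector<std::unique_ptr<Buffer>> pool;

      void Recycle(std::unique_ptr<Buffer> buf, std::size_t used_mem) {
        // Release backpressure for the message that just finished.
        queued_bytes.fetch_sub(used_mem, std::memory_order_relaxed);
        // Keep the buffer for reuse only while the cache stays under its cap.
        if (cached_bytes < cache_limit) {
          cached_bytes += buf->Capacity();
          pool.push_back(std::move(buf));
        }  // otherwise the unique_ptr frees it here
      }
    };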
@@ -960,23 +971,6 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
     prev_epoch = cur_epoch;
     builder->SetBatchMode(dispatch_q_.size() > 1);
 
-    auto recycle = [this, request_cache_limit](MessageHandle msg) {
-      size_t used_mem = msg.UsedMemory();
-      queue_backpressure_->bytes.fetch_sub(used_mem, memory_order_relaxed);
-
-      stats_->dispatch_queue_bytes -= used_mem;
-      stats_->dispatch_queue_entries--;
-
-      // Retain pipeline message in pool.
-      if (auto* pipe = get_if<PipelineMessagePtr>(&msg.handle); pipe) {
-        dispatch_q_cmds_count_--;
-        if (stats_->pipeline_cmd_cache_bytes < request_cache_limit) {
-          stats_->pipeline_cmd_cache_bytes += (*pipe)->StorageCapacity();
-          pipeline_req_pool_.push_back(move(*pipe));
-        }
-      }
-    };
-
     // Special case: if the dispatch queue accumulated a big number of commands,
     // we can try to squash them
     // It is only enabled if the threshold is reached and the whole dispatch queue
@@ -1031,7 +1025,10 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
 
   cc_->conn_closing = true;
 
   // make sure that we don't have any leftovers!
+  // Recycle messages even from disconnecting client to keep properly track of memory stats
+  for (auto& msg : dispatch_q_) {
+    recycle(std::move(msg));
+  }
   dispatch_q_.clear();
 }
 
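Running the leftover queue entries through recycle() before clearing, rather than simply dropping them, keeps dispatch_queue_bytes, dispatch_queue_entries and the backpressure counter balanced even for a disconnecting client, which also lets the DCHECK(dispatch_q_.empty()) added earlier hold once the fiber has joined. The general shape of that shutdown drain, as a small generic sketch (DrainQueue is an invented helper, not part of the codebase):

    #include <deque>
    #include <utility>

    // Drain every still-queued message through the same recycle path used on
    // the normal path, then clear the queue.
    template <typename Msg, typename RecycleFn>
    void DrainQueue(std::deque<Msg>& q, RecycleFn recycle) {
      for (auto& msg : q)
        recycle(std::move(msg));  // per-message accounting happens inside recycle
      q.clear();
    }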