diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index bca4cedab..32c5767ea 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -117,7 +117,8 @@ struct Connection::RequestDeleter { // Please note: The call to the Dtor is mandatory for this!! // This class contain types that don't have trivial destructed objects -struct Connection::Request { +class Connection::Request { + public: using MonitorMessage = std::string; struct PipelineMsg { @@ -135,9 +136,6 @@ struct Connection::Request { StorageType storage; }; - static constexpr size_t kSizeOfPipelineMsg = sizeof(PipelineMsg); - - public: using MessagePayload = std::variant; // Overload to create the a new pipeline message @@ -153,7 +151,11 @@ struct Connection::Request { MessagePayload payload; + size_t StorageCapacity() const; + private: + static constexpr size_t kSizeOfPipelineMsg = sizeof(PipelineMsg); + Request(size_t nargs, size_t capacity) : payload(PipelineMsg{nargs, capacity}) { } @@ -167,6 +169,20 @@ struct Connection::Request { void SetArgs(const RespVec& args); }; +struct Connection::DispatchOperations { + DispatchOperations(SinkReplyBuilder* b, Connection* me) + : stats{me->service_->GetThreadLocalConnectionStats()}, builder{b}, self(me) { + } + + void operator()(const PubMsgRecord& msg); + void operator()(Request::PipelineMsg& msg); + void operator()(const Request::MonitorMessage& msg); + + ConnectionStats* stats = nullptr; + SinkReplyBuilder* builder = nullptr; + Connection* self = nullptr; +}; + Connection::RequestPtr Connection::Request::New(std::string msg) { void* ptr = mi_malloc(sizeof(Request)); Request* req = new (ptr) Request(std::move(msg)); @@ -230,6 +246,52 @@ void Connection::Request::PipelineMsg::Reset(size_t nargs, size_t capacity) { args.resize(nargs); } +template struct Overloaded : Ts... { using Ts::operator()...; }; +template Overloaded(Ts...) -> Overloaded; + +size_t Connection::Request::StorageCapacity() const { + return std::visit(Overloaded{[](const PubMsgRecord& msg) -> size_t { return 0; }, + [](const PipelineMsg& arg) -> size_t { + return arg.storage.capacity() + arg.args.capacity(); + }, + [](const MonitorMessage& arg) -> size_t { return arg.capacity(); }}, + payload); +} + +void Connection::DispatchOperations::operator()(const Request::MonitorMessage& msg) { + RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder; + rbuilder->SendSimpleString(msg); +} + +void Connection::DispatchOperations::operator()(const PubMsgRecord& msg) { + RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder; + ++stats->async_writes_cnt; + const PubMessage& pub_msg = msg.pub_msg; + string_view arr[4]; + if (pub_msg.pattern.empty()) { + arr[0] = "message"; + arr[1] = pub_msg.channel; + arr[2] = *pub_msg.message; + rbuilder->SendStringArr(absl::Span{arr, 3}); + } else { + arr[0] = "pmessage"; + arr[1] = pub_msg.pattern; + arr[2] = pub_msg.channel; + arr[3] = *pub_msg.message; + rbuilder->SendStringArr(absl::Span{arr, 4}); + } +} + +void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) { + ++stats->pipelined_cmd_cnt; + bool empty = self->dispatch_q_.empty(); + builder->SetBatchMode(!empty); + self->cc_->async_dispatch = true; + self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get()); + self->last_interaction_ = time(nullptr); + self->cc_->async_dispatch = false; +} + Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx, ServiceInterface* service) : io_buf_(kMinReadSize), http_listener_(http_listener), ctx_(ctx), service_(service) { @@ -449,11 +511,13 @@ io::Result Connection::CheckForHttpProto(FiberSocketBase* peer) { } void Connection::ConnectionFlow(FiberSocketBase* peer) { + stats_ = service_->GetThreadLocalConnectionStats(); + auto dispatch_fb = fibers::fiber(fibers::launch::dispatch, [&] { DispatchFiber(peer); }); - ConnectionStats* stats = service_->GetThreadLocalConnectionStats(); - ++stats->num_conns; - ++stats->conn_received_cnt; - stats->read_buf_capacity += io_buf_.Capacity(); + + ++stats_->num_conns; + ++stats_->conn_received_cnt; + stats_->read_buf_capacity += io_buf_.Capacity(); ParserStatus parse_status = OK; @@ -490,18 +554,18 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { VLOG(1) << "After dispatch_fb.join()"; service_->OnClose(cc_.get()); - stats->read_buf_capacity -= io_buf_.Capacity(); + stats_->read_buf_capacity -= io_buf_.Capacity(); // Update num_replicas if this was a replica connection. if (cc_->replica_conn) { - --stats->num_replicas; + --stats_->num_replicas; } // We wait for dispatch_fb to finish writing the previous replies before replying to the last // offending request. if (parse_status == ERROR) { VLOG(1) << "Error parser status " << parser_error_; - ++stats->parser_err_cnt; + ++stats_->parser_err_cnt; if (redis_parser_) { SendProtocolError(RedisParser::Result(parser_error_), peer); @@ -521,7 +585,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { LOG(WARNING) << "Socket error " << ec << " " << ec.message(); } - --stats->num_conns; + --stats_->num_conns; } auto Connection::ParseRedis() -> ParserStatus { @@ -649,12 +713,11 @@ void Connection::OnBreakCb(int32_t mask) { auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant { SinkReplyBuilder* builder = cc_->reply_builder(); - ConnectionStats* stats = service_->GetThreadLocalConnectionStats(); error_code ec; ParserStatus parse_status = OK; do { - FetchBuilderStats(stats, builder); + FetchBuilderStats(stats_, builder); io::MutableBytes append_buf = io_buf_.AppendBuffer(); SetPhase("readsock"); @@ -669,8 +732,8 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variantio_read_bytes += *recv_sz; - ++stats->io_read_cnt; + stats_->io_read_bytes += *recv_sz; + ++stats_->io_read_cnt; SetPhase("process"); if (redis_parser_) { @@ -698,7 +761,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variantread_buf_capacity += (io_buf_.Capacity() - capacity); + stats_->read_buf_capacity += (io_buf_.Capacity() - capacity); } } } else if (parse_status != OK) { @@ -707,7 +770,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variantGetError(); } while (peer->IsOpen() && !ec); - FetchBuilderStats(stats, builder); + FetchBuilderStats(stats_, builder); if (ec) return ec; @@ -715,54 +778,6 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variantservice_->GetThreadLocalConnectionStats()}, builder{b}, self(me) { - } - - void operator()(const PubMsgRecord& msg); - void operator()(Request::PipelineMsg& msg); - void operator()(const Request::MonitorMessage& msg); - - ConnectionStats* stats = nullptr; - SinkReplyBuilder* builder = nullptr; - Connection* self = nullptr; -}; - -void Connection::DispatchOperations::operator()(const Request::MonitorMessage& msg) { - RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder; - rbuilder->SendSimpleString(msg); -} - -void Connection::DispatchOperations::operator()(const PubMsgRecord& msg) { - RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder; - ++stats->async_writes_cnt; - const PubMessage& pub_msg = msg.pub_msg; - string_view arr[4]; - if (pub_msg.pattern.empty()) { - arr[0] = "message"; - arr[1] = pub_msg.channel; - arr[2] = *pub_msg.message; - rbuilder->SendStringArr(absl::Span{arr, 3}); - } else { - arr[0] = "pmessage"; - arr[1] = pub_msg.pattern; - arr[2] = pub_msg.channel; - arr[3] = *pub_msg.message; - rbuilder->SendStringArr(absl::Span{arr, 4}); - } -} - -void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) { - ++stats->pipelined_cmd_cnt; - bool empty = self->dispatch_q_.empty(); - builder->SetBatchMode(!empty); - self->cc_->async_dispatch = true; - self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get()); - self->last_interaction_ = time(nullptr); - self->cc_->async_dispatch = false; -} - // DispatchFiber handles commands coming from the InputLoop. // Thus, InputLoop can quickly read data from the input buffer, parse it and push // into the dispatch queue and DispatchFiber will run those commands asynchronously with @@ -784,8 +799,10 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) { std::visit(dispatch_op, req->payload); // Do not cache more than K items. - if (free_req_pool_.size() < 16) + if (free_req_pool_.size() < 16) { + stats_->pipeline_cache_capacity += req->StorageCapacity(); free_req_pool_.push_back(std::move(req)); + } } cc_->conn_closing = true; @@ -813,6 +830,8 @@ auto Connection::FromArgs(RespVec args, mi_heap_t* heap) -> RequestPtr { req = Request::New(heap, args, backed_sz); } else { req = move(free_req_pool_.back()); + stats_->pipeline_cache_capacity -= req->StorageCapacity(); + free_req_pool_.pop_back(); req->Emplace(move(args), backed_sz); } diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 418ab71e4..ae9e54011 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -63,10 +63,7 @@ class Connection : public util::Connection { // this function is overriden at test_utils TestConnection virtual void SendMsgVecAsync(const PubMessage& pub_msg); - // Please note, this accept the message by value, since we really want to - // create a new copy here, so that we would not need to "worry" about memory - // management, we are assuming that we would not have many copy for this, and that - // we would not need in this way to sync on the lifetime of the message + // Note that this is accepted by value because the message is processed asynchronously. void SendMonitorMsg(std::string monitor_msg); void SetName(std::string_view name) { @@ -129,7 +126,7 @@ class Connection : public util::Connection { std::unique_ptr cc_; - struct Request; + class Request; struct DispatchOperations; struct DispatchCleanup; struct RequestDeleter; @@ -149,6 +146,7 @@ class Connection : public util::Connection { unsigned parser_error_ = 0; uint32_t id_; uint32_t break_poll_id_ = UINT32_MAX; + ConnectionStats* stats_ = nullptr; Protocol protocol_; diff --git a/src/facade/facade.cc b/src/facade/facade.cc index 0e024c39f..9329ebaf8 100644 --- a/src/facade/facade.cc +++ b/src/facade/facade.cc @@ -21,9 +21,10 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats); ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { // To break this code deliberately if we add/remove a field to this struct. - static_assert(kSizeConnStats == 176); + static_assert(kSizeConnStats == 184); ADD(read_buf_capacity); + ADD(pipeline_cache_capacity); ADD(io_read_cnt); ADD(io_read_bytes); ADD(io_write_cnt); diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h index 63abf3479..dfc6475cc 100644 --- a/src/facade/facade_types.h +++ b/src/facade/facade_types.h @@ -32,13 +32,15 @@ struct ConnectionStats { absl::flat_hash_map cmd_count_map; size_t read_buf_capacity = 0; + size_t pipeline_cache_capacity = 0; + size_t io_read_cnt = 0; size_t io_read_bytes = 0; size_t io_write_cnt = 0; size_t io_write_bytes = 0; - size_t command_cnt = 0; - size_t pipelined_cmd_cnt = 0; - size_t parser_err_cnt = 0; + uint64_t command_cnt = 0; + uint64_t pipelined_cmd_cnt = 0; + uint64_t parser_err_cnt = 0; // Writes count that happened via DispatchOperations call. uint64_t async_writes_cnt = 0; diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index e18af0456..e9c34ed3c 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -4,16 +4,18 @@ cxx_link(dragonfly base dragonfly_lib epoll_fiber_lib) if (CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND CMAKE_BUILD_TYPE STREQUAL "Release") # Add core2 only to this file, thus avoiding instructions in this object file that # can cause SIGILL. - set_source_files_properties(dfly_main.cc PROPERTIES COMPILE_FLAGS -march=core2 COMPILE_DEFINITIONS SOURCE_PATH_FROM_BUILD_ENV=${CMAKE_SOURCE_DIR}) + set_source_files_properties(dfly_main.cc PROPERTIES COMPILE_FLAGS -march=core2 COMPILE_DEFINITIONS + SOURCE_PATH_FROM_BUILD_ENV=${CMAKE_SOURCE_DIR}) endif() -add_library(dfly_transaction db_slice.cc malloc_stats.cc engine_shard_set.cc blocking_controller.cc common.cc - io_mgr.cc journal/journal.cc journal/journal_slice.cc table.cc +add_library(dfly_transaction db_slice.cc malloc_stats.cc engine_shard_set.cc blocking_controller.cc + common.cc + io_mgr.cc journal/journal.cc journal/journal_slice.cc server_state.cc table.cc tiered_storage.cc transaction.cc) cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib) add_library(dragonfly_lib channel_slice.cc command_registry.cc - config_flags.cc conn_context.cc debugcmd.cc server_state.cc dflycmd.cc + config_flags.cc conn_context.cc debugcmd.cc dflycmd.cc generic_family.cc hset_family.cc json_family.cc list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc diff --git a/src/server/common.cc b/src/server/common.cc index e0591af13..ded7c2552 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -7,7 +7,6 @@ #include #include #include -#include #include @@ -15,10 +14,8 @@ extern "C" { #include "redis/object.h" #include "redis/rdb.h" #include "redis/util.h" -#include "redis/zmalloc.h" } -#include "base/flags.h" #include "base/logging.h" #include "core/compact_object.h" #include "server/engine_shard_set.h" @@ -27,46 +24,15 @@ extern "C" { #include "server/server_state.h" #include "server/transaction.h" -ABSL_FLAG(uint32_t, interpreter_per_thread, 10, "Lua interpreters per thread"); - namespace dfly { using namespace std; -thread_local ServerState ServerState::state_; - atomic_uint64_t used_mem_peak(0); atomic_uint64_t used_mem_current(0); unsigned kernel_version = 0; size_t max_memory_limit = 0; -ServerState::ServerState() : interpreter_mgr_{absl::GetFlag(FLAGS_interpreter_per_thread)} { - CHECK(mi_heap_get_backing() == mi_heap_get_default()); - - mi_heap_t* tlh = mi_heap_new(); - init_zmalloc_threadlocal(tlh); - data_heap_ = tlh; -} - -ServerState::~ServerState() { -} - -void ServerState::Init() { - gstate_ = GlobalState::ACTIVE; -} - -void ServerState::Shutdown() { - gstate_ = GlobalState::SHUTTING_DOWN; -} - -Interpreter* ServerState::BorrowInterpreter() { - return interpreter_mgr_.Get(); -} - -void ServerState::ReturnInterpreter(Interpreter* ir) { - interpreter_mgr_.Return(ir); -} - const char* GlobalStateName(GlobalState s) { switch (s) { case GlobalState::ACTIVE: diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index c3834012c..acc36c3c0 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -30,12 +30,6 @@ void StoredCmd::Invoke(ConnectionContext* ctx) { descr->Invoke(arg_list_, ctx); } -void ConnectionContext::SendMonitorMsg(std::string msg) { - CHECK(owner()); - - owner()->SendMonitorMsg(std::move(msg)); -} - void ConnectionContext::ChangeMonitor(bool start) { // This will either remove or register a new connection // at the "top level" thread --> ServerState context @@ -43,11 +37,11 @@ void ConnectionContext::ChangeMonitor(bool start) { // then notify all other threads that there is a change in the number of monitors auto& my_monitors = ServerState::tlocal()->Monitors(); if (start) { - my_monitors.Add(this); + my_monitors.Add(owner()); } else { VLOG(1) << "connection " << owner()->GetClientInfo() - << " no longer needs to be monitored - removing 0x" << std::hex << (const void*)this; - my_monitors.Remove(this); + << " no longer needs to be monitored - removing 0x" << std::hex << this; + my_monitors.Remove(owner()); } // Tell other threads that about the change in the number of connection that we monitor shard_set->pool()->Await( diff --git a/src/server/conn_context.h b/src/server/conn_context.h index 786e5079f..42fe622d0 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -135,12 +135,6 @@ class ConnectionContext : public facade::ConnectionContext { return conn_state.db_index; } - // Note that this is accepted by value for lifetime reasons - // we want to have our own copy since we are assuming that - // 1. there will be not to many connections that we in monitor state - // 2. we need to have for each of them each own copy for thread safe reasons - void SendMonitorMsg(std::string msg); - void ChangeSubscription(bool to_add, bool to_reply, CmdArgList args); void ChangePSub(bool to_add, bool to_reply, CmdArgList args); void UnsubscribeAll(bool to_reply); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index b82b6bbf8..2591e2fe2 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -167,6 +167,20 @@ std::string MakeMonitorMessage(const ConnectionState& conn_state, return message; } +void SendMonitor(const std::string& msg) { + const auto& monitor_repo = ServerState::tlocal()->Monitors(); + const auto& monitors = monitor_repo.monitors(); + if (!monitors.empty()) { + VLOG(1) << "thread " << util::ProactorBase::GetIndex() << " sending monitor message '" << msg + << "' for " << monitors.size(); + + for (auto monitor_conn : monitors) { + // never preempts, so we can iterate safely. + monitor_conn->SendMonitorMsg(msg); + } + } +} + void DispatchMonitorIfNeeded(bool admin_cmd, ConnectionContext* connection, CmdArgList args) { // We are not sending any admin command in the monitor, and we do not want to // do any processing if we don't have any waiting connections with monitor @@ -175,15 +189,11 @@ void DispatchMonitorIfNeeded(bool admin_cmd, ConnectionContext* connection, CmdA if (!(my_monitors.Empty() || admin_cmd)) { // We have connections waiting to get the info on the last command, send it to them auto monitor_msg = MakeMonitorMessage(connection->conn_state, connection->owner(), args); - // Note that this is accepted by value for lifetime reasons - // we want to have our own copy since we are assuming that - // 1. there will be not to many connections that we in monitor state - // 2. we need to have for each of them each own copy for thread safe reasons + VLOG(1) << "sending command '" << monitor_msg << "' to the clients that registered on it"; + shard_set->pool()->DispatchBrief( - [msg = std::move(monitor_msg)](unsigned idx, util::ProactorBase*) { - ServerState::tlocal()->Monitors().Send(msg); - }); + [msg = std::move(monitor_msg)](unsigned idx, util::ProactorBase*) { SendMonitor(msg); }); } } diff --git a/src/server/server_family.cc b/src/server/server_family.cc index b17334e89..224d8011c 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1319,6 +1319,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("listpack_blobs", total.listpack_blob_cnt); append("listpack_bytes", total.listpack_bytes); append("small_string_bytes", m.small_string_bytes); + append("pipeline_cache_bytes", m.conn_stats.pipeline_cache_capacity); append("maxmemory", max_memory_limit); append("maxmemory_human", HumanReadableNumBytes(max_memory_limit)); append("cache_mode", GetFlag(FLAGS_cache_mode) ? "cache" : "store"); diff --git a/src/server/server_state.cc b/src/server/server_state.cc index 49beda7fb..fd580a5dd 100644 --- a/src/server/server_state.cc +++ b/src/server/server_state.cc @@ -4,12 +4,23 @@ #include "server/server_state.h" +#include + +extern "C" { +#include "redis/zmalloc.h" +} + +#include "base/flags.h" #include "base/logging.h" -#include "server/conn_context.h" +#include "facade/conn_context.h" + +ABSL_FLAG(uint32_t, interpreter_per_thread, 10, "Lua interpreters per thread"); namespace dfly { -void MonitorsRepo::Add(ConnectionContext* connection) { +thread_local ServerState ServerState::state_; + +void MonitorsRepo::Add(facade::Connection* connection) { VLOG(1) << "register connection " << " at address 0x" << std::hex << (const void*)connection << " for thread " << util::ProactorBase::GetIndex(); @@ -17,25 +28,14 @@ void MonitorsRepo::Add(ConnectionContext* connection) { monitors_.push_back(connection); } -void MonitorsRepo::Send(const std::string& msg) { - if (!monitors_.empty()) { - VLOG(1) << "thread " << util::ProactorBase::GetIndex() << " sending monitor message '" << msg - << "' for " << monitors_.size(); - for (auto monitor_conn : monitors_) { - monitor_conn->SendMonitorMsg(msg); - } - } -} - -void MonitorsRepo::Remove(const ConnectionContext* conn) { +void MonitorsRepo::Remove(const facade::Connection* conn) { auto it = std::find_if(monitors_.begin(), monitors_.end(), [&conn](const auto& val) { return val == conn; }); if (it != monitors_.end()) { - VLOG(1) << "removing connection 0x" << std::hex << (const void*)conn << " releasing token"; + VLOG(1) << "removing connection 0x" << std::hex << conn << " releasing token"; monitors_.erase(it); } else { - VLOG(1) << "no connection 0x" << std::hex << (const void*)conn - << " found in the registered list here"; + VLOG(1) << "no connection 0x" << std::hex << conn << " found in the registered list here"; } } @@ -48,4 +48,31 @@ void MonitorsRepo::NotifyChangeCount(bool added) { } } +ServerState::ServerState() : interpreter_mgr_{absl::GetFlag(FLAGS_interpreter_per_thread)} { + CHECK(mi_heap_get_backing() == mi_heap_get_default()); + + mi_heap_t* tlh = mi_heap_new(); + init_zmalloc_threadlocal(tlh); + data_heap_ = tlh; +} + +ServerState::~ServerState() { +} + +void ServerState::Init() { + gstate_ = GlobalState::ACTIVE; +} + +void ServerState::Shutdown() { + gstate_ = GlobalState::SHUTTING_DOWN; +} + +Interpreter* ServerState::BorrowInterpreter() { + return interpreter_mgr_.Get(); +} + +void ServerState::ReturnInterpreter(Interpreter* ir) { + interpreter_mgr_.Return(ir); +} + } // end of namespace dfly diff --git a/src/server/server_state.h b/src/server/server_state.h index 777e4d0f4..bb482540e 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -14,9 +14,12 @@ typedef struct mi_heap_s mi_heap_t; +namespace facade { +class Connection; +} + namespace dfly { -class ConnectionContext; namespace journal { class Journal; } // namespace journal @@ -36,21 +39,17 @@ class Journal; // at which this would run. It also minimized the number of copied for this list. class MonitorsRepo { public: + using MonitorVec = std::vector; + // This function adds a new connection to be monitored. This function only add // new connection that belong to this thread! Must not be called outside of this // thread context - void Add(ConnectionContext* info); - - // Note that this is accepted by value for lifetime reasons - // we want to have our own copy since we are assuming that - // 1. there will be not to many connections that we in monitor state - // 2. we need to have for each of them each own copy for thread safe reasons - void Send(const std::string& msg); + void Add(facade::Connection* conn); // This function remove a connection what was monitored. This function only removes // a connection that belong to this thread! Must not be called outside of this // thread context - void Remove(const ConnectionContext* conn); + void Remove(const facade::Connection* conn); // We have for each thread the total number of monitors in the application. // So this call is thread safe since we hold a copy of this for each thread. @@ -68,8 +67,11 @@ class MonitorsRepo { return monitors_.size(); } + const MonitorVec& monitors() const { + return monitors_; + } + private: - using MonitorVec = std::vector; MonitorVec monitors_; // save connections belonging to this thread only! unsigned int global_count_ = 0; // by global its means that we count the monitor for all threads };