mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
parent
c952251381
commit
c65073eca9
7 changed files with 215 additions and 188 deletions
|
@ -78,13 +78,6 @@ bool MatchHttp11Line(string_view line) {
|
|||
constexpr size_t kMinReadSize = 256;
|
||||
constexpr size_t kMaxReadSize = 32_KB;
|
||||
|
||||
struct PubMsgRecord {
|
||||
Connection::PubMessage pub_msg;
|
||||
|
||||
PubMsgRecord(Connection::PubMessage pmsg) : pub_msg(move(pmsg)) {
|
||||
}
|
||||
};
|
||||
|
||||
#ifdef ABSL_HAVE_ADDRESS_SANITIZER
|
||||
constexpr size_t kReqStorageSize = 88;
|
||||
#else
|
||||
|
@ -95,7 +88,7 @@ thread_local uint32_t free_req_release_weight = 0;
|
|||
|
||||
} // namespace
|
||||
|
||||
thread_local vector<Connection::RequestPtr> Connection::free_req_pool_;
|
||||
thread_local vector<Connection::RequestPtr> Connection::pipeline_req_pool_;
|
||||
|
||||
struct Connection::Shutdown {
|
||||
absl::flat_hash_map<ShutdownHandle, ShutdownCb> map;
|
||||
|
@ -116,19 +109,19 @@ struct Connection::RequestDeleter {
|
|||
void operator()(Request* req) const;
|
||||
};
|
||||
|
||||
using PubMessage = Connection::PubMessage;
|
||||
using MonitorMessage = std::string;
|
||||
|
||||
// Please note: The call to the Dtor is mandatory for this!!
|
||||
// This class contain types that don't have trivial destructed objects
|
||||
class Connection::Request {
|
||||
public:
|
||||
using MonitorMessage = std::string;
|
||||
|
||||
struct PipelineMsg {
|
||||
// I do not use mi_heap_t explicitly but mi_stl_allocator at the end does the same job
|
||||
// of using the thread's heap.
|
||||
struct PipelineMessage {
|
||||
// mi_stl_allocator uses mi heap internally.
|
||||
// The capacity is chosen so that we allocate a fully utilized (256 bytes) block.
|
||||
using StorageType = absl::InlinedVector<char, kReqStorageSize, mi_stl_allocator<char>>;
|
||||
|
||||
PipelineMsg(size_t nargs, size_t capacity) : args(nargs), storage(capacity) {
|
||||
PipelineMessage(size_t nargs, size_t capacity) : args(nargs), storage(capacity) {
|
||||
}
|
||||
|
||||
void Reset(size_t nargs, size_t capacity);
|
||||
|
@ -137,7 +130,7 @@ class Connection::Request {
|
|||
StorageType storage;
|
||||
};
|
||||
|
||||
using MessagePayload = std::variant<PipelineMsg, PubMsgRecord, MonitorMessage>;
|
||||
using MessagePayload = std::variant<PipelineMessage, PubMessage, MonitorMessage>;
|
||||
|
||||
// Overload to create the a new pipeline message
|
||||
static RequestPtr New(mi_heap_t* heap, const RespVec& args, size_t capacity);
|
||||
|
@ -150,26 +143,29 @@ class Connection::Request {
|
|||
|
||||
void Emplace(const RespVec& args, size_t capacity);
|
||||
|
||||
MessagePayload payload;
|
||||
|
||||
size_t StorageCapacity() const;
|
||||
|
||||
bool IsPipelineMsg() const;
|
||||
|
||||
private:
|
||||
static constexpr size_t kSizeOfPipelineMsg = sizeof(PipelineMsg);
|
||||
static constexpr size_t kSizeOfPipelineMsg = sizeof(PipelineMessage);
|
||||
|
||||
Request(size_t nargs, size_t capacity) : payload(PipelineMsg{nargs, capacity}) {
|
||||
Request(size_t nargs, size_t capacity) : payload(PipelineMessage{nargs, capacity}) {
|
||||
}
|
||||
|
||||
Request(PubMsgRecord msg) : payload(std::move(msg)) {
|
||||
Request(PubMessage msg) : payload(move(msg)) {
|
||||
}
|
||||
|
||||
Request(MonitorMessage msg) : payload(std::move(msg)) {
|
||||
Request(MonitorMessage msg) : payload(move(msg)) {
|
||||
}
|
||||
|
||||
Request(const Request&) = delete;
|
||||
|
||||
// Store arguments for pipeline message.
|
||||
void SetArgs(const RespVec& args);
|
||||
|
||||
public:
|
||||
MessagePayload payload;
|
||||
};
|
||||
|
||||
Connection::PubMessage::PubMessage(string pattern, shared_ptr<string> channel,
|
||||
|
@ -186,18 +182,18 @@ struct Connection::DispatchOperations {
|
|||
: stats{me->service_->GetThreadLocalConnectionStats()}, builder{b}, self(me) {
|
||||
}
|
||||
|
||||
void operator()(const PubMsgRecord& msg);
|
||||
void operator()(Request::PipelineMsg& msg);
|
||||
void operator()(const Request::MonitorMessage& msg);
|
||||
void operator()(const PubMessage& msg);
|
||||
void operator()(Request::PipelineMessage& msg);
|
||||
void operator()(const MonitorMessage& msg);
|
||||
|
||||
ConnectionStats* stats = nullptr;
|
||||
SinkReplyBuilder* builder = nullptr;
|
||||
Connection* self = nullptr;
|
||||
};
|
||||
|
||||
Connection::RequestPtr Connection::Request::New(std::string msg) {
|
||||
Connection::RequestPtr Connection::Request::New(MonitorMessage msg) {
|
||||
void* ptr = mi_malloc(sizeof(Request));
|
||||
Request* req = new (ptr) Request(std::move(msg));
|
||||
Request* req = new (ptr) Request(move(msg));
|
||||
return Connection::RequestPtr{req, Connection::RequestDeleter{}};
|
||||
}
|
||||
|
||||
|
@ -213,9 +209,20 @@ Connection::RequestPtr Connection::Request::New(mi_heap_t* heap, const RespVec&
|
|||
return Connection::RequestPtr{req, Connection::RequestDeleter{}};
|
||||
}
|
||||
|
||||
Connection::RequestPtr Connection::Request::New(PubMessage pub_msg) {
|
||||
// This will generate a new request for pubsub message
|
||||
// Please note that unlike the above case, we don't need to "protect", the internals here
|
||||
// since we are currently using a borrow token for it - i.e. the BlockingCounter will
|
||||
// ensure that the message is not deleted until we are finish sending it at the other
|
||||
// side of the queue
|
||||
void* ptr = mi_malloc(sizeof(Request));
|
||||
Request* req = new (ptr) Request(move(pub_msg));
|
||||
return Connection::RequestPtr{req, Connection::RequestDeleter{}};
|
||||
}
|
||||
|
||||
void Connection::Request::SetArgs(const RespVec& args) {
|
||||
// At this point we know that we have PipelineMsg in Request so next op is safe.
|
||||
PipelineMsg& pipeline_msg = std::get<PipelineMsg>(payload);
|
||||
// At this point we know that we have PipelineMessage in Request so next op is safe.
|
||||
PipelineMessage& pipeline_msg = std::get<PipelineMessage>(payload);
|
||||
auto* next = pipeline_msg.storage.data();
|
||||
for (size_t i = 0; i < args.size(); ++i) {
|
||||
auto buf = args[i].GetBuf();
|
||||
|
@ -226,34 +233,22 @@ void Connection::Request::SetArgs(const RespVec& args) {
|
|||
}
|
||||
}
|
||||
|
||||
Connection::RequestPtr Connection::Request::New(PubMessage pub_msg) {
|
||||
// This will generate a new request for pubsub message
|
||||
// Please note that unlike the above case, we don't need to "protect", the internals here
|
||||
// since we are currently using a borrow token for it - i.e. the BlockingCounter will
|
||||
// ensure that the message is not deleted until we are finish sending it at the other
|
||||
// side of the queue
|
||||
PubMsgRecord new_msg{move(pub_msg)};
|
||||
void* ptr = mi_malloc(sizeof(Request));
|
||||
Request* req = new (ptr) Request(std::move(new_msg));
|
||||
return Connection::RequestPtr{req, Connection::RequestDeleter{}};
|
||||
}
|
||||
|
||||
void Connection::RequestDeleter::operator()(Request* req) const {
|
||||
req->~Request();
|
||||
mi_free(req);
|
||||
}
|
||||
|
||||
void Connection::Request::Emplace(const RespVec& args, size_t capacity) {
|
||||
PipelineMsg* msg = get_if<PipelineMsg>(&payload);
|
||||
PipelineMessage* msg = get_if<PipelineMessage>(&payload);
|
||||
if (msg) {
|
||||
msg->Reset(args.size(), capacity);
|
||||
} else {
|
||||
payload = PipelineMsg{args.size(), capacity};
|
||||
payload = PipelineMessage{args.size(), capacity};
|
||||
}
|
||||
SetArgs(args);
|
||||
}
|
||||
|
||||
void Connection::Request::PipelineMsg::Reset(size_t nargs, size_t capacity) {
|
||||
void Connection::Request::PipelineMessage::Reset(size_t nargs, size_t capacity) {
|
||||
storage.resize(capacity);
|
||||
args.resize(nargs);
|
||||
}
|
||||
|
@ -262,8 +257,8 @@ template <class... Ts> struct Overloaded : Ts... { using Ts::operator()...; };
|
|||
template <class... Ts> Overloaded(Ts...) -> Overloaded<Ts...>;
|
||||
|
||||
size_t Connection::Request::StorageCapacity() const {
|
||||
return std::visit(Overloaded{[](const PubMsgRecord& msg) -> size_t { return 0; },
|
||||
[](const PipelineMsg& arg) -> size_t {
|
||||
return std::visit(Overloaded{[](const PubMessage& msg) -> size_t { return 0; },
|
||||
[](const PipelineMessage& arg) -> size_t {
|
||||
return arg.storage.capacity() + arg.args.capacity();
|
||||
},
|
||||
[](const MonitorMessage& arg) -> size_t { return arg.capacity(); }},
|
||||
|
@ -271,18 +266,17 @@ size_t Connection::Request::StorageCapacity() const {
|
|||
}
|
||||
|
||||
bool Connection::Request::IsPipelineMsg() const {
|
||||
return std::get_if<PipelineMsg>(&payload) != nullptr;
|
||||
return std::get_if<PipelineMessage>(&payload) != nullptr;
|
||||
}
|
||||
|
||||
void Connection::DispatchOperations::operator()(const Request::MonitorMessage& msg) {
|
||||
void Connection::DispatchOperations::operator()(const MonitorMessage& msg) {
|
||||
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
|
||||
rbuilder->SendSimpleString(msg);
|
||||
}
|
||||
|
||||
void Connection::DispatchOperations::operator()(const PubMsgRecord& msg) {
|
||||
void Connection::DispatchOperations::operator()(const PubMessage& pub_msg) {
|
||||
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
|
||||
++stats->async_writes_cnt;
|
||||
const PubMessage& pub_msg = msg.pub_msg;
|
||||
string_view arr[4];
|
||||
if (pub_msg.type == PubMessage::kPublish) {
|
||||
if (pub_msg.pattern.empty()) {
|
||||
|
@ -309,7 +303,7 @@ void Connection::DispatchOperations::operator()(const PubMsgRecord& msg) {
|
|||
}
|
||||
}
|
||||
|
||||
void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) {
|
||||
void Connection::DispatchOperations::operator()(Request::PipelineMessage& msg) {
|
||||
++stats->pipelined_cmd_cnt;
|
||||
self->pipeline_msg_cnt_--;
|
||||
|
||||
|
@ -325,7 +319,7 @@ void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) {
|
|||
|
||||
Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
|
||||
ServiceInterface* service)
|
||||
: io_buf_(kMinReadSize), http_listener_(http_listener), ctx_(ctx), service_(service) {
|
||||
: io_buf_(kMinReadSize), http_listener_(http_listener), ctx_(ctx), service_(service), name_{} {
|
||||
static atomic_uint32_t next_id{1};
|
||||
|
||||
protocol_ = protocol;
|
||||
|
@ -344,8 +338,6 @@ Connection::Connection(Protocol protocol, util::HttpListenerBase* http_listener,
|
|||
|
||||
creation_time_ = time(nullptr);
|
||||
last_interaction_ = creation_time_;
|
||||
memset(name_, 0, sizeof(name_));
|
||||
memset(phase_, 0, sizeof(phase_));
|
||||
id_ = next_id.fetch_add(1, memory_order_relaxed);
|
||||
}
|
||||
|
||||
|
@ -356,8 +348,8 @@ Connection::~Connection() {
|
|||
void Connection::OnShutdown() {
|
||||
VLOG(1) << "Connection::OnShutdown";
|
||||
|
||||
if (shutdown_) {
|
||||
for (const auto& k_v : shutdown_->map) {
|
||||
if (shutdown_cb_) {
|
||||
for (const auto& k_v : shutdown_cb_->map) {
|
||||
k_v.second();
|
||||
}
|
||||
}
|
||||
|
@ -384,17 +376,17 @@ void Connection::OnPostMigrateThread() {
|
|||
}
|
||||
|
||||
auto Connection::RegisterShutdownHook(ShutdownCb cb) -> ShutdownHandle {
|
||||
if (!shutdown_) {
|
||||
shutdown_ = make_unique<Shutdown>();
|
||||
if (!shutdown_cb_) {
|
||||
shutdown_cb_ = make_unique<Shutdown>();
|
||||
}
|
||||
return shutdown_->Add(std::move(cb));
|
||||
return shutdown_cb_->Add(std::move(cb));
|
||||
}
|
||||
|
||||
void Connection::UnregisterShutdownHook(ShutdownHandle id) {
|
||||
if (shutdown_) {
|
||||
shutdown_->Remove(id);
|
||||
if (shutdown_->map.empty())
|
||||
shutdown_.reset();
|
||||
if (shutdown_cb_) {
|
||||
shutdown_cb_->Remove(id);
|
||||
if (shutdown_cb_->map.empty())
|
||||
shutdown_cb_.reset();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -465,11 +457,11 @@ void Connection::HandleRequests() {
|
|||
VLOG(1) << "Closed connection for peer " << remote_ep;
|
||||
}
|
||||
|
||||
void Connection::RegisterOnBreak(BreakerCb breaker_cb) {
|
||||
void Connection::RegisterBreakHook(BreakerCb breaker_cb) {
|
||||
breaker_cb_ = breaker_cb;
|
||||
}
|
||||
|
||||
void Connection::SendMsgVecAsync(PubMessage pub_msg) {
|
||||
void Connection::SendPubMessageAsync(PubMessage pub_msg) {
|
||||
DCHECK(cc_);
|
||||
|
||||
if (cc_->conn_closing) {
|
||||
|
@ -502,12 +494,15 @@ string Connection::GetClientInfo(unsigned thread_id) const {
|
|||
getsockopt(lsb->native_handle(), SOL_SOCKET, SO_INCOMING_CPU, &cpu, &len);
|
||||
int my_cpu_id = sched_getcpu();
|
||||
|
||||
static constexpr string_view PHASE_NAMES[] = {"readsock", "process"};
|
||||
static_assert(PHASE_NAMES[PROCESS] == "process");
|
||||
|
||||
absl::StrAppend(&res, "id=", id_, " addr=", re.address().to_string(), ":", re.port());
|
||||
absl::StrAppend(&res, " laddr=", le.address().to_string(), ":", le.port());
|
||||
absl::StrAppend(&res, " fd=", lsb->native_handle(), " name=", name_);
|
||||
absl::StrAppend(&res, " tid=", thread_id, " irqmatch=", int(cpu == my_cpu_id));
|
||||
absl::StrAppend(&res, " age=", now - creation_time_, " idle=", now - last_interaction_);
|
||||
absl::StrAppend(&res, " phase=", phase_);
|
||||
absl::StrAppend(&res, " phase=", PHASE_NAMES[phase_]);
|
||||
|
||||
if (cc_) {
|
||||
string cc_info = service_->GetContextInfo(cc_.get());
|
||||
|
@ -575,7 +570,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
|
|||
// At the start we read from the socket to determine the HTTP/Memstore protocol.
|
||||
// Therefore we may already have some data in the buffer.
|
||||
if (io_buf_.InputLen() > 0) {
|
||||
SetPhase("process");
|
||||
phase_ = PROCESS;
|
||||
if (redis_parser_) {
|
||||
parse_status = ParseRedis();
|
||||
} else {
|
||||
|
@ -647,10 +642,10 @@ auto Connection::ParseRedis() -> ParserStatus {
|
|||
mi_heap_t* tlh = mi_heap_get_backing();
|
||||
|
||||
do {
|
||||
result = redis_parser_->Parse(io_buf_.InputBuffer(), &consumed, &parse_args_);
|
||||
result = redis_parser_->Parse(io_buf_.InputBuffer(), &consumed, &tmp_parse_args_);
|
||||
|
||||
if (result == RedisParser::OK && !parse_args_.empty()) {
|
||||
RespExpr& first = parse_args_.front();
|
||||
if (result == RedisParser::OK && !tmp_parse_args_.empty()) {
|
||||
RespExpr& first = tmp_parse_args_.front();
|
||||
if (first.type == RespExpr::STRING) {
|
||||
DVLOG(2) << "Got Args with first token " << ToSV(first.GetBuf());
|
||||
}
|
||||
|
@ -662,28 +657,18 @@ auto Connection::ParseRedis() -> ParserStatus {
|
|||
bool is_sync_dispatch = !cc_->async_dispatch && !cc_->force_dispatch;
|
||||
if (dispatch_q_.empty() && is_sync_dispatch && consumed >= io_buf_.InputLen()) {
|
||||
// Gradually release the request pool.
|
||||
// The request pool is shared by all the connections in the thread so we do not want
|
||||
// to release it aggressively just because some connection is running in
|
||||
// non-pipelined mode. So we wait at least N times,
|
||||
// where N is the number of connections in the thread.
|
||||
if (!free_req_pool_.empty()) {
|
||||
++free_req_release_weight;
|
||||
if (free_req_release_weight > stats_->num_conns) {
|
||||
free_req_release_weight = 0;
|
||||
stats_->pipeline_cache_capacity -= free_req_pool_.back()->StorageCapacity();
|
||||
free_req_pool_.pop_back();
|
||||
}
|
||||
}
|
||||
RespToArgList(parse_args_, &cmd_vec_);
|
||||
ShrinkPipelinePool();
|
||||
|
||||
DVLOG(2) << "Sync dispatch " << ToSV(cmd_vec_.front());
|
||||
RespToArgList(tmp_parse_args_, &tmp_cmd_vec_);
|
||||
|
||||
CmdArgList cmd_list{cmd_vec_.data(), cmd_vec_.size()};
|
||||
DVLOG(2) << "Sync dispatch " << ToSV(tmp_cmd_vec_.front());
|
||||
|
||||
CmdArgList cmd_list{tmp_cmd_vec_.data(), tmp_cmd_vec_.size()};
|
||||
service_->DispatchCommand(cmd_list, cc_.get());
|
||||
last_interaction_ = time(nullptr);
|
||||
} else {
|
||||
// Dispatch via queue to speedup input reading.
|
||||
RequestPtr req = FromArgs(std::move(parse_args_), tlh);
|
||||
RequestPtr req = FromArgs(std::move(tmp_parse_args_), tlh);
|
||||
++pipeline_msg_cnt_;
|
||||
dispatch_q_.push_back(std::move(req));
|
||||
if (dispatch_q_.size() == 1) {
|
||||
|
@ -783,7 +768,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, Pars
|
|||
FetchBuilderStats(stats_, builder);
|
||||
|
||||
io::MutableBytes append_buf = io_buf_.AppendBuffer();
|
||||
SetPhase("readsock");
|
||||
phase_ = READ_SOCKET;
|
||||
|
||||
::io::Result<size_t> recv_sz = peer->Recv(append_buf);
|
||||
last_interaction_ = time(nullptr);
|
||||
|
@ -797,7 +782,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, Pars
|
|||
io_buf_.CommitWrite(*recv_sz);
|
||||
stats_->io_read_bytes += *recv_sz;
|
||||
++stats_->io_read_cnt;
|
||||
SetPhase("process");
|
||||
phase_ = PROCESS;
|
||||
|
||||
if (redis_parser_) {
|
||||
parse_status = ParseRedis();
|
||||
|
@ -864,7 +849,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) {
|
|||
|
||||
if (req->IsPipelineMsg() && stats_->pipeline_cache_capacity < request_cache_limit) {
|
||||
stats_->pipeline_cache_capacity += req->StorageCapacity();
|
||||
free_req_pool_.push_back(std::move(req));
|
||||
pipeline_req_pool_.push_back(std::move(req));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -889,25 +874,48 @@ auto Connection::FromArgs(RespVec args, mi_heap_t* heap) -> RequestPtr {
|
|||
|
||||
RequestPtr req;
|
||||
|
||||
if (free_req_pool_.empty()) {
|
||||
req = Request::New(heap, args, backed_sz);
|
||||
} else {
|
||||
free_req_release_weight = 0; // Reset the release weight.
|
||||
req = move(free_req_pool_.back());
|
||||
stats_->pipeline_cache_capacity -= req->StorageCapacity();
|
||||
|
||||
free_req_pool_.pop_back();
|
||||
if (req = GetFromPipelinePool(); req) {
|
||||
req->Emplace(move(args), backed_sz);
|
||||
} else {
|
||||
req = Request::New(heap, args, backed_sz);
|
||||
}
|
||||
return req;
|
||||
}
|
||||
|
||||
void Connection::ShrinkPipelinePool() {
|
||||
if (pipeline_req_pool_.empty())
|
||||
return;
|
||||
|
||||
// The request pool is shared by all the connections in the thread so we do not want
|
||||
// to release it aggressively just because some connection is running in
|
||||
// non-pipelined mode. So by using free_req_release_weight we wait at least N times,
|
||||
// where N is the number of connections in the thread.
|
||||
++free_req_release_weight;
|
||||
|
||||
if (free_req_release_weight > stats_->num_conns) {
|
||||
free_req_release_weight = 0;
|
||||
stats_->pipeline_cache_capacity -= pipeline_req_pool_.back()->StorageCapacity();
|
||||
pipeline_req_pool_.pop_back();
|
||||
}
|
||||
}
|
||||
|
||||
Connection::RequestPtr Connection::GetFromPipelinePool() {
|
||||
if (pipeline_req_pool_.empty())
|
||||
return {};
|
||||
|
||||
free_req_release_weight = 0; // Reset the release weight.
|
||||
RequestPtr req = move(pipeline_req_pool_.back());
|
||||
stats_->pipeline_cache_capacity -= req->StorageCapacity();
|
||||
pipeline_req_pool_.pop_back();
|
||||
return req;
|
||||
}
|
||||
|
||||
void Connection::ShutdownSelf() {
|
||||
util::Connection::Shutdown();
|
||||
}
|
||||
|
||||
void Connection::ShutdownThreadLocal() {
|
||||
free_req_pool_.clear();
|
||||
pipeline_req_pool_.clear();
|
||||
}
|
||||
|
||||
void RespToArgList(const RespVec& src, CmdArgVec* dest) {
|
||||
|
@ -919,7 +927,7 @@ void RespToArgList(const RespVec& src, CmdArgVec* dest) {
|
|||
}
|
||||
}
|
||||
|
||||
void Connection::SendMonitorMsg(std::string monitor_msg) {
|
||||
void Connection::SendMonitorMessageAsync(std::string monitor_msg) {
|
||||
DCHECK(cc_);
|
||||
|
||||
if (!cc_->conn_closing) {
|
||||
|
|
|
@ -38,40 +38,30 @@ class RedisParser;
|
|||
class ServiceInterface;
|
||||
class MemcacheParser;
|
||||
|
||||
// Connection represents an active connection for a client.
|
||||
//
|
||||
// It directly dispatches regular commands from the io-loop.
|
||||
// For pipelined requests, monitor and pubsub messages it uses
|
||||
// a separate dispatch queue that is processed on a separate fiber.
|
||||
class Connection : public util::Connection {
|
||||
public:
|
||||
Connection(Protocol protocol, util::HttpListenerBase* http_listener, SSL_CTX* ctx,
|
||||
ServiceInterface* service);
|
||||
~Connection();
|
||||
|
||||
using error_code = std::error_code;
|
||||
using BreakerCb = std::function<void(uint32_t)>;
|
||||
using ShutdownCb = std::function<void()>;
|
||||
using ShutdownHandle = unsigned;
|
||||
|
||||
ShutdownHandle RegisterShutdownHook(ShutdownCb cb);
|
||||
void UnregisterShutdownHook(ShutdownHandle id);
|
||||
|
||||
Protocol protocol() const {
|
||||
return protocol_;
|
||||
}
|
||||
|
||||
using BreakerCb = std::function<void(uint32_t)>;
|
||||
void RegisterOnBreak(BreakerCb breaker_cb);
|
||||
|
||||
// This interface is used to pass a published message directly to the socket without
|
||||
// copying strings.
|
||||
// Once the msg is sent "bc" will be decreased so that caller could release the underlying
|
||||
// storage for the message.
|
||||
// virtual - to allow the testing code to override it.
|
||||
|
||||
// PubSub message, either incoming message for active subscription or reply for new subscription.
|
||||
struct PubMessage {
|
||||
enum Type { kSubscribe, kUnsubscribe, kPublish } type;
|
||||
|
||||
std::string pattern{}; // if empty - means its a regular message, otherwise it's pmessage.
|
||||
std::string pattern{}; // non-empty for pattern subscriber
|
||||
std::shared_ptr<std::string> channel{};
|
||||
std::shared_ptr<std::string> message{}; // ensure that this message would out live passing
|
||||
// between different threads/fibers
|
||||
uint32_t channel_cnt = 0; // relevant only for kSubscribe and kUnsubscribe
|
||||
std::shared_ptr<std::string> message{};
|
||||
|
||||
uint32_t channel_cnt = 0;
|
||||
|
||||
PubMessage(bool add, std::shared_ptr<std::string> channel, uint32_t channel_cnt);
|
||||
PubMessage(std::string pattern, std::shared_ptr<std::string> channel,
|
||||
|
@ -82,23 +72,28 @@ class Connection : public util::Connection {
|
|||
PubMessage(PubMessage&&) = default;
|
||||
};
|
||||
|
||||
// this function is overriden at test_utils TestConnection
|
||||
virtual void SendMsgVecAsync(PubMessage pub_msg);
|
||||
enum Phase { READ_SOCKET, PROCESS };
|
||||
|
||||
// Note that this is accepted by value because the message is processed asynchronously.
|
||||
void SendMonitorMsg(std::string monitor_msg);
|
||||
public:
|
||||
// Add PubMessage to dispatch queue.
|
||||
// Virtual because behaviour is overwritten in test_utils.
|
||||
virtual void SendPubMessageAsync(PubMessage pub_msg);
|
||||
|
||||
void SetName(std::string_view name) {
|
||||
CopyCharBuf(name, sizeof(name_), name_);
|
||||
}
|
||||
// Add monitor message to dispatch queue.
|
||||
void SendMonitorMessageAsync(std::string monitor_msg);
|
||||
|
||||
const char* GetName() const {
|
||||
return name_;
|
||||
}
|
||||
// Register hook that is executed on connection shutdown.
|
||||
ShutdownHandle RegisterShutdownHook(ShutdownCb cb);
|
||||
|
||||
void SetPhase(std::string_view phase) {
|
||||
CopyCharBuf(phase, sizeof(phase_), phase_);
|
||||
}
|
||||
void UnregisterShutdownHook(ShutdownHandle id);
|
||||
|
||||
// Register hook that is executen when the connection breaks.
|
||||
void RegisterBreakHook(BreakerCb breaker_cb);
|
||||
|
||||
// Manually shutdown self.
|
||||
void ShutdownSelf();
|
||||
|
||||
static void ShutdownThreadLocal();
|
||||
|
||||
std::string GetClientInfo(unsigned thread_id) const;
|
||||
std::string RemoteEndpointStr() const;
|
||||
|
@ -106,9 +101,17 @@ class Connection : public util::Connection {
|
|||
std::string LocalBindAddress() const;
|
||||
uint32_t GetClientId() const;
|
||||
|
||||
void ShutdownSelf();
|
||||
Protocol protocol() const {
|
||||
return protocol_;
|
||||
}
|
||||
|
||||
static void ShutdownThreadLocal();
|
||||
void SetName(std::string name) {
|
||||
name_ = name;
|
||||
}
|
||||
|
||||
std::string_view GetName() const {
|
||||
return name_;
|
||||
}
|
||||
|
||||
protected:
|
||||
void OnShutdown() override;
|
||||
|
@ -118,68 +121,84 @@ class Connection : public util::Connection {
|
|||
private:
|
||||
enum ParserStatus { OK, NEED_MORE, ERROR };
|
||||
|
||||
void HandleRequests() final;
|
||||
|
||||
static void CopyCharBuf(std::string_view src, unsigned dest_len, char* dest) {
|
||||
src = src.substr(0, dest_len - 1);
|
||||
if (!src.empty())
|
||||
memcpy(dest, src.data(), src.size());
|
||||
dest[src.size()] = '\0';
|
||||
}
|
||||
|
||||
//
|
||||
io::Result<bool> CheckForHttpProto(util::FiberSocketBase* peer);
|
||||
|
||||
void ConnectionFlow(util::FiberSocketBase* peer);
|
||||
std::variant<std::error_code, ParserStatus> IoLoop(util::FiberSocketBase* peer);
|
||||
|
||||
void DispatchFiber(util::FiberSocketBase* peer);
|
||||
|
||||
ParserStatus ParseRedis();
|
||||
ParserStatus ParseMemcache();
|
||||
void OnBreakCb(int32_t mask);
|
||||
|
||||
base::IoBuf io_buf_;
|
||||
std::unique_ptr<RedisParser> redis_parser_;
|
||||
std::unique_ptr<MemcacheParser> memcache_parser_;
|
||||
util::HttpListenerBase* http_listener_;
|
||||
SSL_CTX* ctx_;
|
||||
ServiceInterface* service_;
|
||||
time_t creation_time_, last_interaction_;
|
||||
char name_[16];
|
||||
char phase_[16];
|
||||
|
||||
std::unique_ptr<ConnectionContext> cc_;
|
||||
|
||||
class Request;
|
||||
struct DispatchOperations;
|
||||
struct DispatchCleanup;
|
||||
struct RequestDeleter;
|
||||
struct Shutdown;
|
||||
|
||||
// Requests are allocated on the mimalloc heap and thus require a custom deleter.
|
||||
using RequestPtr = std::unique_ptr<Request, RequestDeleter>;
|
||||
|
||||
// args are passed deliberately by value - to pass the ownership.
|
||||
private:
|
||||
// Check protocol and handle connection.
|
||||
void HandleRequests() final;
|
||||
|
||||
// Start dispatch fiber and run IoLoop.
|
||||
void ConnectionFlow(util::FiberSocketBase* peer);
|
||||
|
||||
// Main loop reading client messages and passing requests to dispatch queue.
|
||||
std::variant<std::error_code, ParserStatus> IoLoop(util::FiberSocketBase* peer);
|
||||
|
||||
// Returns true if HTTP header is detected.
|
||||
io::Result<bool> CheckForHttpProto(util::FiberSocketBase* peer);
|
||||
|
||||
// Handles events from dispatch queue.
|
||||
void DispatchFiber(util::FiberSocketBase* peer);
|
||||
|
||||
// Create new pipeline request, re-use from pool when possible.
|
||||
RequestPtr FromArgs(RespVec args, mi_heap_t* heap);
|
||||
|
||||
std::deque<RequestPtr> dispatch_q_; // coordinated via evc_.
|
||||
uint32_t pipeline_msg_cnt_ = 0;
|
||||
ParserStatus ParseRedis();
|
||||
ParserStatus ParseMemcache();
|
||||
|
||||
static thread_local std::vector<RequestPtr> free_req_pool_;
|
||||
dfly::EventCount evc_;
|
||||
void OnBreakCb(int32_t mask);
|
||||
|
||||
RespVec parse_args_;
|
||||
CmdArgVec cmd_vec_;
|
||||
// Shrink pipeline pool by a little while handling regular commands.
|
||||
void ShrinkPipelinePool();
|
||||
|
||||
// Returns non-null request ptr if pool has vacant entries.
|
||||
RequestPtr GetFromPipelinePool();
|
||||
|
||||
private:
|
||||
std::deque<RequestPtr> dispatch_q_; // dispatch queue
|
||||
dfly::EventCount evc_; // dispatch queue waker
|
||||
|
||||
base::IoBuf io_buf_; // used in io loop and parsers
|
||||
std::unique_ptr<RedisParser> redis_parser_;
|
||||
std::unique_ptr<MemcacheParser> memcache_parser_;
|
||||
|
||||
unsigned parser_error_ = 0;
|
||||
uint32_t id_;
|
||||
uint32_t break_poll_id_ = UINT32_MAX;
|
||||
Protocol protocol_;
|
||||
ConnectionStats* stats_ = nullptr;
|
||||
|
||||
Protocol protocol_;
|
||||
util::HttpListenerBase* http_listener_;
|
||||
SSL_CTX* ctx_;
|
||||
|
||||
ServiceInterface* service_;
|
||||
|
||||
time_t creation_time_, last_interaction_;
|
||||
|
||||
Phase phase_;
|
||||
std::string name_;
|
||||
|
||||
std::unique_ptr<ConnectionContext> cc_;
|
||||
|
||||
unsigned parser_error_ = 0;
|
||||
uint32_t pipeline_msg_cnt_ = 0;
|
||||
|
||||
uint32_t break_poll_id_ = UINT32_MAX;
|
||||
|
||||
struct Shutdown;
|
||||
std::unique_ptr<Shutdown> shutdown_;
|
||||
BreakerCb breaker_cb_;
|
||||
std::unique_ptr<Shutdown> shutdown_cb_;
|
||||
|
||||
RespVec tmp_parse_args_;
|
||||
CmdArgVec tmp_cmd_vec_;
|
||||
|
||||
// Pooled pipieline messages per-thread.
|
||||
// Aggregated while handling pipelines,
|
||||
// graudally released while handling regular commands.
|
||||
static thread_local std::vector<RequestPtr> pipeline_req_pool_;
|
||||
};
|
||||
|
||||
void RespToArgList(const RespVec& src, CmdArgVec* dest);
|
||||
|
|
|
@ -127,7 +127,7 @@ void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgLis
|
|||
|
||||
if (to_reply) {
|
||||
for (size_t i = 0; i < result.size(); ++i) {
|
||||
owner()->SendMsgVecAsync({to_add, make_shared<string>(ArgS(args, i)), result[i]});
|
||||
owner()->SendPubMessageAsync({to_add, make_shared<string>(ArgS(args, i)), result[i]});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -183,7 +183,7 @@ void SendMonitor(const std::string& msg) {
|
|||
|
||||
for (auto monitor_conn : monitors) {
|
||||
// never preempts, so we can iterate safely.
|
||||
monitor_conn->SendMonitorMsg(msg);
|
||||
monitor_conn->SendMonitorMessageAsync(msg);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -921,7 +921,7 @@ facade::ConnectionContext* Service::CreateContext(util::FiberSocketBase* peer,
|
|||
|
||||
// a bit of a hack. I set up breaker callback here for the owner.
|
||||
// Should work though it's confusing to have it here.
|
||||
owner->RegisterOnBreak([res, this](uint32_t) {
|
||||
owner->RegisterBreakHook([res, this](uint32_t) {
|
||||
if (res->transaction) {
|
||||
res->transaction->BreakOnShutdown();
|
||||
}
|
||||
|
@ -1463,7 +1463,7 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
|
|||
while (it != subscribers_ptr->end() && it->thread_id == idx) {
|
||||
facade::Connection* conn = it->conn_cntx->owner();
|
||||
DCHECK(conn);
|
||||
conn->SendMsgVecAsync({move(it->pattern), move(channel_ptr), move(msg_ptr)});
|
||||
conn->SendPubMessageAsync({move(it->pattern), move(channel_ptr), move(msg_ptr)});
|
||||
it->borrow_token.Dec();
|
||||
it++;
|
||||
}
|
||||
|
|
|
@ -1176,13 +1176,13 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
|
|||
string_view sub_cmd = ArgS(args, 0);
|
||||
|
||||
if (sub_cmd == "SETNAME" && args.size() == 2) {
|
||||
cntx->owner()->SetName(ArgS(args, 1));
|
||||
cntx->owner()->SetName(string{ArgS(args, 1)});
|
||||
return (*cntx)->SendOk();
|
||||
}
|
||||
|
||||
if (sub_cmd == "GETNAME") {
|
||||
const char* name = cntx->owner()->GetName();
|
||||
if (*name != 0) {
|
||||
auto name = cntx->owner()->GetName();
|
||||
if (!name.empty()) {
|
||||
return (*cntx)->SendBulkString(name);
|
||||
} else {
|
||||
return (*cntx)->SendNull();
|
||||
|
|
|
@ -61,7 +61,7 @@ TestConnection::TestConnection(Protocol protocol, io::StringSink* sink)
|
|||
: facade::Connection(protocol, nullptr, nullptr, nullptr), sink_(sink) {
|
||||
}
|
||||
|
||||
void TestConnection::SendMsgVecAsync(PubMessage pmsg) {
|
||||
void TestConnection::SendPubMessageAsync(PubMessage pmsg) {
|
||||
if (pmsg.type == PubMessage::kPublish) {
|
||||
messages.push_back(move(pmsg));
|
||||
} else {
|
||||
|
|
|
@ -21,7 +21,7 @@ class TestConnection : public facade::Connection {
|
|||
public:
|
||||
TestConnection(Protocol protocol, io::StringSink* sink);
|
||||
|
||||
void SendMsgVecAsync(PubMessage pmsg) final;
|
||||
void SendPubMessageAsync(PubMessage pmsg) final;
|
||||
|
||||
std::vector<PubMessage> messages;
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue