mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
Monitor command (#427)
feat(server): support monitor command - allowing user to debug commands from all connections by using a connection as monitors for the this (#344) Signed-off-by: Boaz Sade <boaz@dragonflydb.io>
This commit is contained in:
parent
f8f3eac960
commit
c9f7cbe0e9
12 changed files with 346 additions and 24 deletions
|
@ -104,7 +104,7 @@ with respect to Memcached and Redis APIs.
|
||||||
- [X] ZSCORE
|
- [X] ZSCORE
|
||||||
- [ ] Other
|
- [ ] Other
|
||||||
- [ ] BGREWRITEAOF
|
- [ ] BGREWRITEAOF
|
||||||
- [ ] MONITOR
|
- [x] MONITOR
|
||||||
- [ ] RANDOMKEY
|
- [ ] RANDOMKEY
|
||||||
|
|
||||||
### API 2
|
### API 2
|
||||||
|
|
|
@ -19,7 +19,8 @@ class ConnectionContext {
|
||||||
|
|
||||||
// We won't have any virtual methods, probably. However, since we allocate a derived class,
|
// We won't have any virtual methods, probably. However, since we allocate a derived class,
|
||||||
// we need to declare a virtual d-tor, so we could properly delete it from Connection code.
|
// we need to declare a virtual d-tor, so we could properly delete it from Connection code.
|
||||||
virtual ~ConnectionContext() {}
|
virtual ~ConnectionContext() {
|
||||||
|
}
|
||||||
|
|
||||||
Connection* owner() {
|
Connection* owner() {
|
||||||
return owner_;
|
return owner_;
|
||||||
|
|
|
@ -109,6 +109,8 @@ struct Connection::RequestDeleter {
|
||||||
// Please note: The call to the Dtor is mandatory for this!!
|
// Please note: The call to the Dtor is mandatory for this!!
|
||||||
// This class contain types that don't have trivial destructed objects
|
// This class contain types that don't have trivial destructed objects
|
||||||
struct Connection::Request {
|
struct Connection::Request {
|
||||||
|
using MonitorMessage = std::string;
|
||||||
|
|
||||||
struct PipelineMsg {
|
struct PipelineMsg {
|
||||||
absl::FixedArray<MutableSlice, 6> args;
|
absl::FixedArray<MutableSlice, 6> args;
|
||||||
|
|
||||||
|
@ -122,7 +124,7 @@ struct Connection::Request {
|
||||||
};
|
};
|
||||||
|
|
||||||
private:
|
private:
|
||||||
using MessagePayload = std::variant<PipelineMsg, PubMsgRecord>;
|
using MessagePayload = std::variant<PipelineMsg, PubMsgRecord, MonitorMessage>;
|
||||||
|
|
||||||
Request(size_t nargs, size_t capacity) : payload(PipelineMsg{nargs, capacity}) {
|
Request(size_t nargs, size_t capacity) : payload(PipelineMsg{nargs, capacity}) {
|
||||||
}
|
}
|
||||||
|
@ -130,18 +132,30 @@ struct Connection::Request {
|
||||||
Request(PubMsgRecord msg) : payload(std::move(msg)) {
|
Request(PubMsgRecord msg) : payload(std::move(msg)) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Request(MonitorMessage msg) : payload(std::move(msg)) {
|
||||||
|
}
|
||||||
|
|
||||||
Request(const Request&) = delete;
|
Request(const Request&) = delete;
|
||||||
|
|
||||||
public:
|
public:
|
||||||
// Overload to create the a new pipeline message
|
// Overload to create the a new pipeline message
|
||||||
static RequestPtr New(mi_heap_t* heap, RespVec args, size_t capacity);
|
static RequestPtr New(mi_heap_t* heap, RespVec args, size_t capacity);
|
||||||
|
|
||||||
// overload to create a new pubsub message
|
// Overload to create a new pubsub message
|
||||||
static RequestPtr New(const PubMessage& pub_msg, fibers_ext::BlockingCounter bc);
|
static RequestPtr New(const PubMessage& pub_msg, fibers_ext::BlockingCounter bc);
|
||||||
|
|
||||||
|
// Overload to create a new the monitor message
|
||||||
|
static RequestPtr New(MonitorMessage msg);
|
||||||
|
|
||||||
MessagePayload payload;
|
MessagePayload payload;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Connection::RequestPtr Connection::Request::New(std::string msg) {
|
||||||
|
void* ptr = mi_malloc(sizeof(Request));
|
||||||
|
Request* req = new (ptr) Request(std::move(msg));
|
||||||
|
return Connection::RequestPtr{req, Connection::RequestDeleter{}};
|
||||||
|
}
|
||||||
|
|
||||||
Connection::RequestPtr Connection::Request::New(mi_heap_t* heap, RespVec args, size_t capacity) {
|
Connection::RequestPtr Connection::Request::New(mi_heap_t* heap, RespVec args, size_t capacity) {
|
||||||
constexpr auto kReqSz = sizeof(Request);
|
constexpr auto kReqSz = sizeof(Request);
|
||||||
void* ptr = mi_heap_malloc_small(heap, kReqSz);
|
void* ptr = mi_heap_malloc_small(heap, kReqSz);
|
||||||
|
@ -159,6 +173,7 @@ Connection::RequestPtr Connection::Request::New(mi_heap_t* heap, RespVec args, s
|
||||||
pipeline_msg.args[i] = MutableSlice(next, s);
|
pipeline_msg.args[i] = MutableSlice(next, s);
|
||||||
next += s;
|
next += s;
|
||||||
}
|
}
|
||||||
|
|
||||||
return Connection::RequestPtr{req, Connection::RequestDeleter{}};
|
return Connection::RequestPtr{req, Connection::RequestDeleter{}};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -491,8 +506,6 @@ auto Connection::ParseRedis() -> ParserStatus {
|
||||||
service_->DispatchCommand(cmd_list, cc_.get());
|
service_->DispatchCommand(cmd_list, cc_.get());
|
||||||
last_interaction_ = time(nullptr);
|
last_interaction_ = time(nullptr);
|
||||||
} else {
|
} else {
|
||||||
VLOG(2) << "Dispatch async";
|
|
||||||
|
|
||||||
// Dispatch via queue to speedup input reading.
|
// Dispatch via queue to speedup input reading.
|
||||||
RequestPtr req = FromArgs(std::move(parse_args_), tlh);
|
RequestPtr req = FromArgs(std::move(parse_args_), tlh);
|
||||||
|
|
||||||
|
@ -655,19 +668,23 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variant<error_code, Pars
|
||||||
|
|
||||||
struct Connection::DispatchOperations {
|
struct Connection::DispatchOperations {
|
||||||
DispatchOperations(SinkReplyBuilder* b, Connection* me)
|
DispatchOperations(SinkReplyBuilder* b, Connection* me)
|
||||||
: stats{me->service_->GetThreadLocalConnectionStats()}, builder{b},
|
: stats{me->service_->GetThreadLocalConnectionStats()}, builder{b}, self(me) {
|
||||||
empty{me->dispatch_q_.empty()}, self(me) {
|
|
||||||
}
|
}
|
||||||
|
|
||||||
void operator()(PubMsgRecord& msg);
|
void operator()(PubMsgRecord& msg);
|
||||||
void operator()(Request::PipelineMsg& msg);
|
void operator()(Request::PipelineMsg& msg);
|
||||||
|
void operator()(const Request::MonitorMessage& msg);
|
||||||
|
|
||||||
ConnectionStats* stats = nullptr;
|
ConnectionStats* stats = nullptr;
|
||||||
SinkReplyBuilder* builder = nullptr;
|
SinkReplyBuilder* builder = nullptr;
|
||||||
bool empty = false;
|
|
||||||
Connection* self = nullptr;
|
Connection* self = nullptr;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
void Connection::DispatchOperations::operator()(const Request::MonitorMessage& msg) {
|
||||||
|
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
|
||||||
|
rbuilder->SendSimpleString(msg);
|
||||||
|
}
|
||||||
|
|
||||||
void Connection::DispatchOperations::operator()(PubMsgRecord& msg) {
|
void Connection::DispatchOperations::operator()(PubMsgRecord& msg) {
|
||||||
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
|
RedisReplyBuilder* rbuilder = (RedisReplyBuilder*)builder;
|
||||||
++stats->async_writes_cnt;
|
++stats->async_writes_cnt;
|
||||||
|
@ -690,7 +707,7 @@ void Connection::DispatchOperations::operator()(PubMsgRecord& msg) {
|
||||||
|
|
||||||
void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) {
|
void Connection::DispatchOperations::operator()(Request::PipelineMsg& msg) {
|
||||||
++stats->pipelined_cmd_cnt;
|
++stats->pipelined_cmd_cnt;
|
||||||
|
bool empty = self->dispatch_q_.empty();
|
||||||
builder->SetBatchMode(!empty);
|
builder->SetBatchMode(!empty);
|
||||||
self->cc_->async_dispatch = true;
|
self->cc_->async_dispatch = true;
|
||||||
self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
|
self->service_->DispatchCommand(CmdArgList{msg.args.data(), msg.args.size()}, self->cc_.get());
|
||||||
|
@ -703,6 +720,9 @@ struct Connection::DispatchCleanup {
|
||||||
msg.bc.Dec();
|
msg.bc.Dec();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void operator()(const Connection::Request::MonitorMessage&) const {
|
||||||
|
}
|
||||||
|
|
||||||
void operator()(const Connection::Request::PipelineMsg&) const {
|
void operator()(const Connection::Request::PipelineMsg&) const {
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
@ -769,4 +789,26 @@ void RespToArgList(const RespVec& src, CmdArgVec* dest) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Connection::SendMonitorMsg(std::string monitor_msg) {
|
||||||
|
DCHECK(cc_);
|
||||||
|
|
||||||
|
if (!cc_->conn_closing) {
|
||||||
|
RequestPtr req = Request::New(std::move(monitor_msg));
|
||||||
|
dispatch_q_.push_back(std::move(req));
|
||||||
|
if (dispatch_q_.size() == 1) {
|
||||||
|
evc_.notify();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string Connection::RemoteEndpointStr() const {
|
||||||
|
LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());
|
||||||
|
bool unix_socket = lsb->IsUDS();
|
||||||
|
std::string connection_str = unix_socket ? "unix:" : std::string{};
|
||||||
|
|
||||||
|
auto re = lsb->RemoteEndpoint();
|
||||||
|
absl::StrAppend(&connection_str, re.address().to_string(), ":", re.port());
|
||||||
|
return connection_str;
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace facade
|
} // namespace facade
|
||||||
|
|
|
@ -59,8 +59,15 @@ class Connection : public util::Connection {
|
||||||
std::string_view message;
|
std::string_view message;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// this function is overriden at test_utils TestConnection
|
||||||
virtual void SendMsgVecAsync(const PubMessage& pub_msg, util::fibers_ext::BlockingCounter bc);
|
virtual void SendMsgVecAsync(const PubMessage& pub_msg, util::fibers_ext::BlockingCounter bc);
|
||||||
|
|
||||||
|
// Please note, this accept the message by value, since we really want to
|
||||||
|
// create a new copy here, so that we would not need to "worry" about memory
|
||||||
|
// management, we are assuming that we would not have many copy for this, and that
|
||||||
|
// we would not need in this way to sync on the lifetime of the message
|
||||||
|
void SendMonitorMsg(std::string monitor_msg);
|
||||||
|
|
||||||
void SetName(std::string_view name) {
|
void SetName(std::string_view name) {
|
||||||
CopyCharBuf(name, sizeof(name_), name_);
|
CopyCharBuf(name, sizeof(name_), name_);
|
||||||
}
|
}
|
||||||
|
@ -70,6 +77,7 @@ class Connection : public util::Connection {
|
||||||
}
|
}
|
||||||
|
|
||||||
std::string GetClientInfo() const;
|
std::string GetClientInfo() const;
|
||||||
|
std::string RemoteEndpointStr() const;
|
||||||
uint32 GetClientId() const;
|
uint32 GetClientId() const;
|
||||||
|
|
||||||
void ShutdownSelf();
|
void ShutdownSelf();
|
||||||
|
|
|
@ -63,7 +63,6 @@ string UnknownSubCmd(string_view subcmd, string_view cmd) {
|
||||||
cmd, " HELP.");
|
cmd, " HELP.");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
const char kSyntaxErr[] = "syntax error";
|
const char kSyntaxErr[] = "syntax error";
|
||||||
const char kWrongTypeErr[] = "-WRONGTYPE Operation against a key holding the wrong kind of value";
|
const char kWrongTypeErr[] = "-WRONGTYPE Operation against a key holding the wrong kind of value";
|
||||||
const char kKeyNotFoundErr[] = "no such key";
|
const char kKeyNotFoundErr[] = "no such key";
|
||||||
|
|
|
@ -13,7 +13,7 @@ add_library(dfly_transaction db_slice.cc engine_shard_set.cc blocking_controller
|
||||||
cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib)
|
cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib)
|
||||||
|
|
||||||
add_library(dragonfly_lib channel_slice.cc command_registry.cc
|
add_library(dragonfly_lib channel_slice.cc command_registry.cc
|
||||||
config_flags.cc conn_context.cc debugcmd.cc dflycmd.cc
|
config_flags.cc conn_context.cc debugcmd.cc server_state.cc dflycmd.cc
|
||||||
generic_family.cc hset_family.cc json_family.cc
|
generic_family.cc hset_family.cc json_family.cc
|
||||||
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
|
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
|
||||||
snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc
|
snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc
|
||||||
|
|
|
@ -6,12 +6,40 @@
|
||||||
|
|
||||||
#include "base/logging.h"
|
#include "base/logging.h"
|
||||||
#include "server/engine_shard_set.h"
|
#include "server/engine_shard_set.h"
|
||||||
|
#include "server/server_family.h"
|
||||||
|
#include "server/server_state.h"
|
||||||
|
#include "src/facade/dragonfly_connection.h"
|
||||||
#include "util/proactor_base.h"
|
#include "util/proactor_base.h"
|
||||||
|
|
||||||
namespace dfly {
|
namespace dfly {
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
|
void ConnectionContext::SendMonitorMsg(std::string msg) {
|
||||||
|
CHECK(owner());
|
||||||
|
|
||||||
|
owner()->SendMonitorMsg(std::move(msg));
|
||||||
|
}
|
||||||
|
|
||||||
|
void ConnectionContext::ChangeMonitor(bool start) {
|
||||||
|
// This will either remove or register a new connection
|
||||||
|
// at the "top level" thread --> ServerState context
|
||||||
|
// note that we are registering/removing this connection to the thread at which at run
|
||||||
|
// then notify all other threads that there is a change in the number of monitors
|
||||||
|
auto& my_monitors = ServerState::tlocal()->Monitors();
|
||||||
|
if (start) {
|
||||||
|
my_monitors.Add(this);
|
||||||
|
} else {
|
||||||
|
VLOG(1) << "connection " << owner()->GetClientInfo()
|
||||||
|
<< " no longer needs to be monitored - removing 0x" << std::hex << (const void*)this;
|
||||||
|
my_monitors.Remove(this);
|
||||||
|
}
|
||||||
|
// Tell other threads that about the change in the number of connection that we monitor
|
||||||
|
shard_set->pool()->Await(
|
||||||
|
[start](auto*) { ServerState::tlocal()->Monitors().NotifyChangeCount(start); });
|
||||||
|
EnableMonitoring(start);
|
||||||
|
}
|
||||||
|
|
||||||
void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgList args) {
|
void ConnectionContext::ChangeSubscription(bool to_add, bool to_reply, CmdArgList args) {
|
||||||
vector<unsigned> result(to_reply ? args.size() : 0, 0);
|
vector<unsigned> result(to_reply ? args.size() : 0, 0);
|
||||||
|
|
||||||
|
|
|
@ -32,7 +32,9 @@ struct ConnectionState {
|
||||||
ExecInfo(ExecInfo&&) = delete;
|
ExecInfo(ExecInfo&&) = delete;
|
||||||
|
|
||||||
// Return true if ExecInfo is active (after MULTI)
|
// Return true if ExecInfo is active (after MULTI)
|
||||||
bool IsActive() { return state != EXEC_INACTIVE; }
|
bool IsActive() {
|
||||||
|
return state != EXEC_INACTIVE;
|
||||||
|
}
|
||||||
|
|
||||||
// Resets to blank state after EXEC or DISCARD
|
// Resets to blank state after EXEC or DISCARD
|
||||||
void Clear();
|
void Clear();
|
||||||
|
@ -117,17 +119,29 @@ class ConnectionContext : public facade::ConnectionContext {
|
||||||
return conn_state.db_index;
|
return conn_state.db_index;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Note that this is accepted by value for lifetime reasons
|
||||||
|
// we want to have our own copy since we are assuming that
|
||||||
|
// 1. there will be not to many connections that we in monitor state
|
||||||
|
// 2. we need to have for each of them each own copy for thread safe reasons
|
||||||
|
void SendMonitorMsg(std::string msg);
|
||||||
|
|
||||||
void ChangeSubscription(bool to_add, bool to_reply, CmdArgList args);
|
void ChangeSubscription(bool to_add, bool to_reply, CmdArgList args);
|
||||||
void ChangePSub(bool to_add, bool to_reply, CmdArgList args);
|
void ChangePSub(bool to_add, bool to_reply, CmdArgList args);
|
||||||
void UnsubscribeAll(bool to_reply);
|
void UnsubscribeAll(bool to_reply);
|
||||||
void PUnsubscribeAll(bool to_reply);
|
void PUnsubscribeAll(bool to_reply);
|
||||||
|
void ChangeMonitor(bool start); // either start or stop monitor on a given connection
|
||||||
|
|
||||||
bool is_replicating = false;
|
bool is_replicating = false;
|
||||||
|
bool monitor = false; // when a monitor command is sent over a given connection, we need to aware
|
||||||
|
// of it as a state for the connection
|
||||||
|
|
||||||
private:
|
private:
|
||||||
|
void EnableMonitoring(bool enable) {
|
||||||
|
force_dispatch = enable; // required to support the monitoring
|
||||||
|
monitor = enable;
|
||||||
|
}
|
||||||
void SendSubscriptionChangedResponse(std::string_view action,
|
void SendSubscriptionChangedResponse(std::string_view action,
|
||||||
std::optional<std::string_view> topic,
|
std::optional<std::string_view> topic, unsigned count);
|
||||||
unsigned count);
|
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace dfly
|
} // namespace dfly
|
||||||
|
|
|
@ -68,6 +68,105 @@ std::optional<VarzFunction> engine_varz;
|
||||||
|
|
||||||
constexpr size_t kMaxThreadSize = 1024;
|
constexpr size_t kMaxThreadSize = 1024;
|
||||||
|
|
||||||
|
void DeactivateMonitoring(ConnectionContext* server_ctx) {
|
||||||
|
if (server_ctx->monitor) {
|
||||||
|
// remove monitor on this connection
|
||||||
|
server_ctx->ChangeMonitor(false /*start*/);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// The format of the message that are sending is
|
||||||
|
// +"time of day" [db-number <lua|unix:path|connection info] "command" "arg1" .. "argM"
|
||||||
|
std::string CreateMonitorTimestamp() {
|
||||||
|
timeval tv;
|
||||||
|
|
||||||
|
gettimeofday(&tv, nullptr);
|
||||||
|
return absl::StrCat(tv.tv_sec, ".", tv.tv_usec, absl::kZeroPad6);
|
||||||
|
}
|
||||||
|
|
||||||
|
auto CmdEntryToMonitorFormat(std::string_view str) -> std::string {
|
||||||
|
// This code is based on Redis impl for it at sdscatrepr@sds.c
|
||||||
|
std::string result = absl::StrCat("\"");
|
||||||
|
|
||||||
|
for (auto c : str) {
|
||||||
|
switch (c) {
|
||||||
|
case '\\':
|
||||||
|
absl::StrAppend(&result, "\\\\");
|
||||||
|
break;
|
||||||
|
case '"':
|
||||||
|
absl::StrAppend(&result, "\\\"");
|
||||||
|
break;
|
||||||
|
case '\n':
|
||||||
|
absl::StrAppend(&result, "\\n");
|
||||||
|
break;
|
||||||
|
case '\r':
|
||||||
|
absl::StrAppend(&result, "\\r");
|
||||||
|
break;
|
||||||
|
case '\t':
|
||||||
|
absl::StrAppend(&result, "\\t");
|
||||||
|
break;
|
||||||
|
case '\a':
|
||||||
|
absl::StrAppend(&result, "\\a");
|
||||||
|
break;
|
||||||
|
case '\b':
|
||||||
|
absl::StrAppend(&result, "\\b");
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
if (isprint(c)) {
|
||||||
|
result += c;
|
||||||
|
} else {
|
||||||
|
absl::StrAppend(&result, "\\x", absl::Hex((unsigned char)c, absl::kZeroPad2));
|
||||||
|
}
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
absl::StrAppend(&result, "\"");
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
std::string MakeMonitorMessage(const ConnectionState& conn_state,
|
||||||
|
const facade::Connection* connection, CmdArgList args) {
|
||||||
|
std::string message = CreateMonitorTimestamp();
|
||||||
|
|
||||||
|
if (conn_state.script_info.has_value()) {
|
||||||
|
absl::StrAppend(&message, "lua] ");
|
||||||
|
} else {
|
||||||
|
absl::StrAppend(&message, connection->RemoteEndpointStr());
|
||||||
|
}
|
||||||
|
if (args.empty()) {
|
||||||
|
absl::StrAppend(&message, "error - empty cmd list!");
|
||||||
|
} else if (auto cmd_name = std::string_view(args[0].data(), args[0].size());
|
||||||
|
cmd_name == "AUTH") { // we cannot just send auth details in this case
|
||||||
|
absl::StrAppend(&message, "\"", cmd_name, "\"");
|
||||||
|
} else {
|
||||||
|
message = std::accumulate(args.begin(), args.end(), message, [](auto str, const auto& cmd) {
|
||||||
|
absl::StrAppend(&str, " ", CmdEntryToMonitorFormat(std::string_view(cmd.data(), cmd.size())));
|
||||||
|
return str;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
return message;
|
||||||
|
}
|
||||||
|
|
||||||
|
void DispatchMonitorIfNeeded(bool admin_cmd, ConnectionContext* connection, CmdArgList args) {
|
||||||
|
// We are not sending any admin command in the monitor, and we do not want to
|
||||||
|
// do any processing if we don't have any waiting connections with monitor
|
||||||
|
// enabled on them - see https://redis.io/commands/monitor/
|
||||||
|
const auto& my_monitors = ServerState::tlocal()->Monitors();
|
||||||
|
if (!(my_monitors.Empty() || admin_cmd)) {
|
||||||
|
// We have connections waiting to get the info on the last command, send it to them
|
||||||
|
auto monitor_msg = MakeMonitorMessage(connection->conn_state, connection->owner(), args);
|
||||||
|
// Note that this is accepted by value for lifetime reasons
|
||||||
|
// we want to have our own copy since we are assuming that
|
||||||
|
// 1. there will be not to many connections that we in monitor state
|
||||||
|
// 2. we need to have for each of them each own copy for thread safe reasons
|
||||||
|
VLOG(1) << "sending command '" << monitor_msg << "' to the clients that registered on it";
|
||||||
|
shard_set->pool()->DispatchBrief(
|
||||||
|
[msg = std::move(monitor_msg)](unsigned idx, util::ProactorBase*) {
|
||||||
|
ServerState::tlocal()->Monitors().Send(msg);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
class InterpreterReplier : public RedisReplyBuilder {
|
class InterpreterReplier : public RedisReplyBuilder {
|
||||||
public:
|
public:
|
||||||
InterpreterReplier(ObjectExplorer* explr) : RedisReplyBuilder(nullptr), explr_(explr) {
|
InterpreterReplier(ObjectExplorer* explr) : RedisReplyBuilder(nullptr), explr_(explr) {
|
||||||
|
@ -376,7 +475,7 @@ void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_i
|
||||||
void Service::Shutdown() {
|
void Service::Shutdown() {
|
||||||
VLOG(1) << "Service::Shutdown";
|
VLOG(1) << "Service::Shutdown";
|
||||||
|
|
||||||
// We mark that we are shuttind down. After this incoming requests will be
|
// We mark that we are shutting down. After this incoming requests will be
|
||||||
// rejected
|
// rejected
|
||||||
pp_.AwaitFiberOnAll([](ProactorBase* pb) { ServerState::tlocal()->Shutdown(); });
|
pp_.AwaitFiberOnAll([](ProactorBase* pb) { ServerState::tlocal()->Shutdown(); });
|
||||||
|
|
||||||
|
@ -442,6 +541,11 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// only reset and quit are allow if this connection is used for monitoring
|
||||||
|
if (dfly_cntx->monitor && (cmd_name != "RESET" && cmd_name != "QUIT")) {
|
||||||
|
return (*cntx)->SendError("Replica can't interact with the keyspace");
|
||||||
|
}
|
||||||
|
|
||||||
bool under_script = dfly_cntx->conn_state.script_info.has_value();
|
bool under_script = dfly_cntx->conn_state.script_info.has_value();
|
||||||
|
|
||||||
if (under_script && (cid->opt_mask() & CO::NOSCRIPT)) {
|
if (under_script && (cid->opt_mask() & CO::NOSCRIPT)) {
|
||||||
|
@ -555,6 +659,8 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
|
||||||
dfly_cntx->reply_builder()->CloseConnection();
|
dfly_cntx->reply_builder()->CloseConnection();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DispatchMonitorIfNeeded(cid->opt_mask() & CO::ADMIN, dfly_cntx, args);
|
||||||
|
|
||||||
end_usec = ProactorBase::GetMonotonicTimeNs();
|
end_usec = ProactorBase::GetMonotonicTimeNs();
|
||||||
|
|
||||||
request_latency_usec.IncBy(cmd_str, (end_usec - start_usec) / 1000);
|
request_latency_usec.IncBy(cmd_str, (end_usec - start_usec) / 1000);
|
||||||
|
@ -726,6 +832,8 @@ void Service::Quit(CmdArgList args, ConnectionContext* cntx) {
|
||||||
|
|
||||||
SinkReplyBuilder* builder = cntx->reply_builder();
|
SinkReplyBuilder* builder = cntx->reply_builder();
|
||||||
builder->CloseConnection();
|
builder->CloseConnection();
|
||||||
|
|
||||||
|
DeactivateMonitoring(static_cast<ConnectionContext*>(cntx));
|
||||||
cntx->owner()->ShutdownSelf();
|
cntx->owner()->ShutdownSelf();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -973,7 +1081,8 @@ bool CheckWatchedKeyExpiry(ConnectionContext* cntx, const CommandRegistry& regis
|
||||||
|
|
||||||
// The comparison can still be true even if a key expired due to another one being created.
|
// The comparison can still be true even if a key expired due to another one being created.
|
||||||
// So we have to check the watched_dirty flag, which is set if a key expired.
|
// So we have to check the watched_dirty flag, which is set if a key expired.
|
||||||
return watch_exist_count.load() == exec_info.watched_existed && !exec_info.watched_dirty.load(memory_order_relaxed);
|
return watch_exist_count.load() == exec_info.watched_existed &&
|
||||||
|
!exec_info.watched_dirty.load(memory_order_relaxed);
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check if exec_info watches keys on dbs other than db_indx.
|
// Check if exec_info watches keys on dbs other than db_indx.
|
||||||
|
@ -1180,6 +1289,13 @@ void Service::PubsubPatterns(ConnectionContext* cntx) {
|
||||||
(*cntx)->SendLong(pattern_count);
|
(*cntx)->SendLong(pattern_count);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void Service::Monitor(CmdArgList args, ConnectionContext* cntx) {
|
||||||
|
VLOG(1) << "starting monitor on this connection: " << cntx->owner()->GetClientInfo();
|
||||||
|
// we are registering the current connection for all threads so they will be aware of
|
||||||
|
// this connection, to send to it any command
|
||||||
|
cntx->ChangeMonitor(true /* start */);
|
||||||
|
}
|
||||||
|
|
||||||
void Service::Pubsub(CmdArgList args, ConnectionContext* cntx) {
|
void Service::Pubsub(CmdArgList args, ConnectionContext* cntx) {
|
||||||
if (args.size() < 2) {
|
if (args.size() < 2) {
|
||||||
(*cntx)->SendError(WrongNumArgsError(ArgS(args, 0)));
|
(*cntx)->SendError(WrongNumArgsError(ArgS(args, 0)));
|
||||||
|
@ -1273,6 +1389,8 @@ void Service::OnClose(facade::ConnectionContext* cntx) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
DeactivateMonitoring(server_cntx);
|
||||||
|
|
||||||
server_family_.OnClose(server_cntx);
|
server_family_.OnClose(server_cntx);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1316,6 +1434,7 @@ void Service::RegisterCommands() {
|
||||||
<< CI{"PSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(PSubscribe)
|
<< CI{"PSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(PSubscribe)
|
||||||
<< CI{"PUNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(PUnsubscribe)
|
<< CI{"PUNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(PUnsubscribe)
|
||||||
<< CI{"FUNCTION", CO::NOSCRIPT, 2, 0, 0, 0}.MFUNC(Function)
|
<< CI{"FUNCTION", CO::NOSCRIPT, 2, 0, 0, 0}.MFUNC(Function)
|
||||||
|
<< CI{"MONITOR", CO::ADMIN, 1, 0, 0, 0}.MFUNC(Monitor)
|
||||||
<< CI{"PUBSUB", CO::LOADING | CO::FAST, -1, 0, 0, 0}.MFUNC(Pubsub);
|
<< CI{"PUBSUB", CO::LOADING | CO::FAST, -1, 0, 0, 0}.MFUNC(Pubsub);
|
||||||
|
|
||||||
StreamFamily::Register(®istry_);
|
StreamFamily::Register(®istry_);
|
||||||
|
|
|
@ -102,7 +102,7 @@ class Service : public facade::ServiceInterface {
|
||||||
void PSubscribe(CmdArgList args, ConnectionContext* cntx);
|
void PSubscribe(CmdArgList args, ConnectionContext* cntx);
|
||||||
void PUnsubscribe(CmdArgList args, ConnectionContext* cntx);
|
void PUnsubscribe(CmdArgList args, ConnectionContext* cntx);
|
||||||
void Function(CmdArgList args, ConnectionContext* cntx);
|
void Function(CmdArgList args, ConnectionContext* cntx);
|
||||||
|
void Monitor(CmdArgList args, ConnectionContext* cntx);
|
||||||
void Pubsub(CmdArgList args, ConnectionContext* cntx);
|
void Pubsub(CmdArgList args, ConnectionContext* cntx);
|
||||||
void PubsubChannels(std::string_view pattern, ConnectionContext* cntx);
|
void PubsubChannels(std::string_view pattern, ConnectionContext* cntx);
|
||||||
void PubsubPatterns(ConnectionContext* cntx);
|
void PubsubPatterns(ConnectionContext* cntx);
|
||||||
|
|
51
src/server/server_state.cc
Normal file
51
src/server/server_state.cc
Normal file
|
@ -0,0 +1,51 @@
|
||||||
|
// Copyright 2022, DragonflyDB authors. All rights reserved.
|
||||||
|
// See LICENSE for licensing terms.
|
||||||
|
//
|
||||||
|
|
||||||
|
#include "server/server_state.h"
|
||||||
|
|
||||||
|
#include "base/logging.h"
|
||||||
|
#include "server/conn_context.h"
|
||||||
|
|
||||||
|
namespace dfly {
|
||||||
|
|
||||||
|
void MonitorsRepo::Add(ConnectionContext* connection) {
|
||||||
|
VLOG(1) << "register connection "
|
||||||
|
<< " at address 0x" << std::hex << (const void*)connection << " for thread "
|
||||||
|
<< util::ProactorBase::GetIndex();
|
||||||
|
|
||||||
|
monitors_.push_back(connection);
|
||||||
|
}
|
||||||
|
|
||||||
|
void MonitorsRepo::Send(const std::string& msg) {
|
||||||
|
if (!monitors_.empty()) {
|
||||||
|
VLOG(1) << "thread " << util::ProactorBase::GetIndex() << " sending monitor message '" << msg
|
||||||
|
<< "' for " << monitors_.size();
|
||||||
|
for (auto monitor_conn : monitors_) {
|
||||||
|
monitor_conn->SendMonitorMsg(msg);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void MonitorsRepo::Remove(const ConnectionContext* conn) {
|
||||||
|
auto it = std::find_if(monitors_.begin(), monitors_.end(),
|
||||||
|
[&conn](const auto& val) { return val == conn; });
|
||||||
|
if (it != monitors_.end()) {
|
||||||
|
VLOG(1) << "removing connection 0x" << std::hex << (const void*)conn << " releasing token";
|
||||||
|
monitors_.erase(it);
|
||||||
|
} else {
|
||||||
|
VLOG(1) << "no connection 0x" << std::hex << (const void*)conn
|
||||||
|
<< " found in the registered list here";
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
void MonitorsRepo::NotifyChangeCount(bool added) {
|
||||||
|
if (added) {
|
||||||
|
++global_count_;
|
||||||
|
} else {
|
||||||
|
DCHECK(global_count_ > 0);
|
||||||
|
--global_count_;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
} // end of namespace dfly
|
|
@ -15,10 +15,64 @@ typedef struct mi_heap_s mi_heap_t;
|
||||||
|
|
||||||
namespace dfly {
|
namespace dfly {
|
||||||
|
|
||||||
|
class ConnectionContext;
|
||||||
namespace journal {
|
namespace journal {
|
||||||
class Journal;
|
class Journal;
|
||||||
} // namespace journal
|
} // namespace journal
|
||||||
|
|
||||||
|
// This would be used as a thread local storage of sending
|
||||||
|
// monitor messages.
|
||||||
|
// Each thread will have its own list of all the connections that are
|
||||||
|
// used for monitoring. When a connection is set to monitor it would register
|
||||||
|
// itself to this list on all i/o threads. When a new command is dispatched,
|
||||||
|
// and this list is not empty, it would send in the same thread context as then
|
||||||
|
// thread that registered here the command.
|
||||||
|
// Note about performance: we are assuming that we would not have many connections
|
||||||
|
// that are registered here. This is not pub sub where it must be high performance
|
||||||
|
// and may support many to many with tens or more of connections. It is assumed that
|
||||||
|
// since monitoring is for debugging only, we would have less than 1 in most cases.
|
||||||
|
// Also note that we holding this list on the thread level since this is the context
|
||||||
|
// at which this would run. It also minimized the number of copied for this list.
|
||||||
|
class MonitorsRepo {
|
||||||
|
public:
|
||||||
|
// This function adds a new connection to be monitored. This function only add
|
||||||
|
// new connection that belong to this thread! Must not be called outside of this
|
||||||
|
// thread context
|
||||||
|
void Add(ConnectionContext* info);
|
||||||
|
|
||||||
|
// Note that this is accepted by value for lifetime reasons
|
||||||
|
// we want to have our own copy since we are assuming that
|
||||||
|
// 1. there will be not to many connections that we in monitor state
|
||||||
|
// 2. we need to have for each of them each own copy for thread safe reasons
|
||||||
|
void Send(const std::string& msg);
|
||||||
|
|
||||||
|
// This function remove a connection what was monitored. This function only removes
|
||||||
|
// a connection that belong to this thread! Must not be called outside of this
|
||||||
|
// thread context
|
||||||
|
void Remove(const ConnectionContext* conn);
|
||||||
|
|
||||||
|
// We have for each thread the total number of monitors in the application.
|
||||||
|
// So this call is thread safe since we hold a copy of this for each thread.
|
||||||
|
// If this return true, then we don't need to run the monitor operation at all.
|
||||||
|
bool Empty() const {
|
||||||
|
return global_count_ == 0u;
|
||||||
|
}
|
||||||
|
|
||||||
|
// This function is run on all threads to either increment or decrement the "shared" counter
|
||||||
|
// of the monitors - it must be called as part of removing a monitor (for example
|
||||||
|
// when a connection is closed).
|
||||||
|
void NotifyChangeCount(bool added);
|
||||||
|
|
||||||
|
std::size_t Size() const {
|
||||||
|
return monitors_.size();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
using MonitorVec = std::vector<ConnectionContext*>;
|
||||||
|
MonitorVec monitors_; // save connections belonging to this thread only!
|
||||||
|
unsigned int global_count_ = 0; // by global its means that we count the monitor for all threads
|
||||||
|
};
|
||||||
|
|
||||||
// Present in every server thread. This class differs from EngineShard. The latter manages
|
// Present in every server thread. This class differs from EngineShard. The latter manages
|
||||||
// state around engine shards while the former represents coordinator/connection state.
|
// state around engine shards while the former represents coordinator/connection state.
|
||||||
// There may be threads that handle engine shards but not IO, there may be threads that handle IO
|
// There may be threads that handle engine shards but not IO, there may be threads that handle IO
|
||||||
|
@ -94,6 +148,10 @@ class ServerState { // public struct - to allow initialization.
|
||||||
journal_ = j;
|
journal_ = j;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
constexpr MonitorsRepo& Monitors() {
|
||||||
|
return monitors_;
|
||||||
|
}
|
||||||
|
|
||||||
private:
|
private:
|
||||||
int64_t live_transactions_ = 0;
|
int64_t live_transactions_ = 0;
|
||||||
mi_heap_t* data_heap_;
|
mi_heap_t* data_heap_;
|
||||||
|
@ -105,6 +163,8 @@ class ServerState { // public struct - to allow initialization.
|
||||||
using Counter = util::SlidingCounter<7>;
|
using Counter = util::SlidingCounter<7>;
|
||||||
Counter qps_;
|
Counter qps_;
|
||||||
|
|
||||||
|
MonitorsRepo monitors_;
|
||||||
|
|
||||||
static thread_local ServerState state_;
|
static thread_local ServerState state_;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue