From 5dcb50dbaa19073416a1eb47f29ec7977acdcaae Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Tue, 8 Mar 2022 19:11:52 +0200 Subject: [PATCH] INFO: Add command and error stats --- src/facade/dragonfly_connection.cc | 23 ++++++++------ src/facade/error.h | 3 ++ src/facade/facade.cc | 15 ++++++++- src/facade/facade_types.h | 4 +++ src/facade/reply_builder.cc | 6 ++-- src/facade/reply_builder.h | 18 ++++++++--- src/server/main_service.cc | 28 ++++++++++++----- src/server/main_service.h | 5 +++ src/server/server_family.cc | 50 ++++++++++++++++++++++-------- src/server/server_family.h | 3 +- 10 files changed, 115 insertions(+), 40 deletions(-) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 43b05e368..b55b69560 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -20,7 +20,7 @@ #include "util/tls/tls_socket.h" #include "util/uring/uring_socket.h" -DEFINE_bool(tcp_nodelay, true, "Configures dragonfly connections with socket option TCP_NODELAY"); +DEFINE_bool(tcp_nodelay, false, "Configures dragonfly connections with socket option TCP_NODELAY"); using namespace util; using namespace std; @@ -53,6 +53,16 @@ void RespToArgList(const RespVec& src, CmdArgVec* dest) { } } +void FetchBuilderStats(ConnectionStats* stats, SinkReplyBuilder* builder) { + stats->io_write_cnt += builder->io_write_cnt(); + stats->io_write_bytes += builder->io_write_bytes(); + + for (const auto& k_v : builder->err_count()) { + stats->err_count[k_v.first] += k_v.second; + } + builder->reset_io_stats(); +} + // TODO: to implement correct matcher according to HTTP spec // https://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html // One place to find a good implementation would be https://github.com/h2o/picohttpparser @@ -413,15 +423,8 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variantio_write_cnt += builder->io_write_cnt(); - stats->io_write_bytes += builder->io_write_bytes(); - - builder->reset_io_stats(); - }; - do { - fetch_builder_stats(); + FetchBuilderStats(stats, builder); io::MutableBytes append_buf = io_buf_.AppendBuffer(); ::io::Result recv_sz = peer->Recv(append_buf); @@ -470,7 +473,7 @@ auto Connection::IoLoop(util::FiberSocketBase* peer) -> variantGetError(); } while (peer->IsOpen() && !ec); - fetch_builder_stats(); + FetchBuilderStats(stats, builder); if (ec) return ec; diff --git a/src/facade/error.h b/src/facade/error.h index b6ad25b49..ec989a939 100644 --- a/src/facade/error.h +++ b/src/facade/error.h @@ -23,4 +23,7 @@ extern const char kScriptNotFound[]; extern const char kAuthRejected[]; extern const char kExpiryOutOfRange[]; +extern const char kSyntaxErrType[]; +extern const char kScriptErrType[]; + } // namespace dfly diff --git a/src/facade/facade.cc b/src/facade/facade.cc index eb7058f8b..2d39b05db 100644 --- a/src/facade/facade.cc +++ b/src/facade/facade.cc @@ -17,9 +17,11 @@ using namespace std; #define ADD(x) (x) += o.x +constexpr size_t kSizeConnStats = sizeof(ConnectionStats); + ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { // To break this code deliberately if we add/remove a field to this struct. - static_assert(sizeof(ConnectionStats) == 64); + static_assert(kSizeConnStats == 144); ADD(num_conns); ADD(num_replicas); @@ -31,6 +33,14 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { ADD(pipelined_cmd_cnt); ADD(command_cnt); + for (const auto& k_v : o.err_count) { + err_count[k_v.first] += k_v.second; + } + + for (const auto& k_v : o.cmd_count) { + cmd_count[k_v.first] += k_v.second; + } + return *this; } @@ -52,6 +62,9 @@ const char kScriptNotFound[] = "-NOSCRIPT No matching script. Please use EVAL."; const char kAuthRejected[] = "-WRONGPASS invalid username-password pair or user is disabled."; const char kExpiryOutOfRange[] = "expiry is out of range"; +const char kSyntaxErrType[] = "syntax_error"; +const char kScriptErrType[] = "script_error"; + const char* RespExpr::TypeName(Type t) { switch (t) { case STRING: diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h index 88cbc28fd..5f2171db9 100644 --- a/src/facade/facade_types.h +++ b/src/facade/facade_types.h @@ -5,6 +5,7 @@ #pragma once #include +#include namespace facade { @@ -19,6 +20,9 @@ using CmdArgVec = std::vector; struct ConnectionStats { + absl::flat_hash_map err_count; + absl::flat_hash_map cmd_count; + uint32_t num_conns = 0; uint32_t num_replicas = 0; size_t read_buf_capacity = 0; diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc index c070dacf7..b23ff9a9c 100644 --- a/src/facade/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -112,7 +112,7 @@ void MCReplyBuilder::SendMGetResponse(const OptResp* resp, uint32_t count) { SendDirect("END\r\n"); } -void MCReplyBuilder::SendError(string_view str) { +void MCReplyBuilder::SendError(string_view str, std::string_view type) { SendDirect("ERROR\r\n"); } @@ -132,7 +132,9 @@ void MCReplyBuilder::SendNotFound() { RedisReplyBuilder::RedisReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink) { } -void RedisReplyBuilder::SendError(string_view str) { +void RedisReplyBuilder::SendError(string_view str, std::string_view type) { + err_count_[type.empty() ? str : type]++; + if (str[0] == '-') { iovec v[] = {IoVec(str), IoVec(kCRLF)}; Send(v, ABSL_ARRAYSIZE(v)); diff --git a/src/facade/reply_builder.h b/src/facade/reply_builder.h index fca878097..f91a25641 100644 --- a/src/facade/reply_builder.h +++ b/src/facade/reply_builder.h @@ -1,6 +1,8 @@ // Copyright 2022, Roman Gershman. All rights reserved. // See LICENSE for licensing terms. // +#include + #include #include @@ -18,7 +20,7 @@ class ReplyBuilderInterface { virtual void SendStored() = 0; // Common for both MC and Redis. - virtual void SendError(std::string_view str) = 0; + virtual void SendError(std::string_view str, std::string_view type = std::string_view{}) = 0; virtual std::error_code GetError() const = 0; @@ -69,6 +71,11 @@ class SinkReplyBuilder : public ReplyBuilderInterface { void reset_io_stats() { io_write_cnt_ = 0; io_write_bytes_ = 0; + err_count_.clear(); + } + + const absl::flat_hash_map& err_count() const { + return err_count_; } //! Sends a string as is without any formatting. raw should be encoded according to the protocol. @@ -80,8 +87,10 @@ class SinkReplyBuilder : public ReplyBuilderInterface { std::string batch_; ::io::Sink* sink_; std::error_code ec_; + size_t io_write_cnt_ = 0; size_t io_write_bytes_ = 0; + absl::flat_hash_map err_count_; bool should_batch_ = false; }; @@ -90,7 +99,8 @@ class MCReplyBuilder : public SinkReplyBuilder { public: MCReplyBuilder(::io::Sink* stream); - void SendError(std::string_view str) final; + void SendError(std::string_view str, std::string_view type = std::string_view{}) final; + // void SendGetReply(std::string_view key, uint32_t flags, std::string_view value) final; void SendMGetResponse(const OptResp* resp, uint32_t count) final; @@ -110,7 +120,7 @@ class RedisReplyBuilder : public SinkReplyBuilder { SendSimpleString("OK"); } - void SendError(std::string_view str) override; + void SendError(std::string_view str, std::string_view type = std::string_view{}) override; void SendMGetResponse(const OptResp* resp, uint32_t count) override; void SendStored() override; @@ -151,4 +161,4 @@ class ReqSerializer { std::error_code ec_; }; -} // namespace dfly +} // namespace facade diff --git a/src/server/main_service.cc b/src/server/main_service.cc index d92a040ef..e56c6a9d4 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -62,7 +62,7 @@ class InterpreterReplier : public RedisReplyBuilder { InterpreterReplier(ObjectExplorer* explr) : RedisReplyBuilder(nullptr), explr_(explr) { } - void SendError(std::string_view str) override; + void SendError(std::string_view str, std::string_view type = std::string_view{}) override; void SendStored() override; void SendSimpleString(std::string_view str) final; @@ -155,7 +155,7 @@ void InterpreterReplier::PostItem() { } } -void InterpreterReplier::SendError(string_view str) { +void InterpreterReplier::SendError(string_view str, std::string_view type) { DCHECK(array_len_.empty()); explr_->OnError(str); } @@ -268,7 +268,7 @@ bool EvalValidator(CmdArgList args, ConnectionContext* cntx) { } if (unsigned(num_keys) > args.size() - 3) { - (*cntx)->SendError("Number of keys can't be greater than number of args"); + (*cntx)->SendError("Number of keys can't be greater than number of args", kSyntaxErr); return false; } @@ -356,7 +356,12 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) }; if (cid == nullptr) { - return (*cntx)->SendError(absl::StrCat("unknown command `", cmd_str, "`")); + (*cntx)->SendError(absl::StrCat("unknown command `", cmd_str, "`"), "unknown_cmd"); + + lock_guard lk(stats_mu_); + if (unknown_cmds_.size() < 1024) + unknown_cmds_[cmd_str]++; + return; } if (etl.gstate() == GlobalState::LOADING || etl.gstate() == GlobalState::SHUTTING_DOWN) { @@ -391,11 +396,11 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) if ((cid->arity() > 0 && args.size() != size_t(cid->arity())) || (cid->arity() < 0 && args.size() < size_t(-cid->arity()))) { - return (*cntx)->SendError(facade::WrongNumArgsError(cmd_str)); + return (*cntx)->SendError(facade::WrongNumArgsError(cmd_str), kSyntaxErr); } if (cid->key_arg_step() == 2 && (args.size() % 2) == 0) { - return (*cntx)->SendError(facade::WrongNumArgsError(cmd_str)); + return (*cntx)->SendError(facade::WrongNumArgsError(cmd_str), kSyntaxErr); } // Validate more complicated cases with custom validators. @@ -417,6 +422,8 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) std::move(multi_error).Cancel(); + etl.connection_stats.cmd_count[cmd_name]++; + if (dfly_cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE && !is_trans_cmd) { // TODO: protect against aggregating huge transactions. StoredCmd stored_cmd{cid}; @@ -618,6 +625,11 @@ bool Service::IsPassProtected() const { return !FLAGS_requirepass.empty(); } +absl::flat_hash_map Service::UknownCmdMap() const { + lock_guard lk(stats_mu_); + return unknown_cmds_; +} + void Service::Quit(CmdArgList args, ConnectionContext* cntx) { if (cntx->protocol() == facade::Protocol::REDIS) (*cntx)->SendOk(); @@ -664,7 +676,7 @@ void Service::Eval(CmdArgList args, ConnectionContext* cntx) { string result; Interpreter::AddResult add_result = script.AddFunction(body, &result); if (add_result == Interpreter::COMPILE_ERR) { - return (*cntx)->SendError(result); + return (*cntx)->SendError(result, facade::kScriptErrType); } if (add_result == Interpreter::ADD_OK) { @@ -765,7 +777,7 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter, if (result == Interpreter::RUN_ERR) { string resp = absl::StrCat("Error running script (call to ", eval_args.sha, "): ", error); - return (*cntx)->SendError(resp); + return (*cntx)->SendError(resp, facade::kScriptErrType); } CHECK(result == Interpreter::RUN_OK); diff --git a/src/server/main_service.h b/src/server/main_service.h index 8aa032c0b..1e512f55b 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -65,6 +65,8 @@ class Service : public facade::ServiceInterface { bool IsPassProtected() const; + absl::flat_hash_map UknownCmdMap() const; + private: static void Quit(CmdArgList args, ConnectionContext* cntx); static void Multi(CmdArgList args, ConnectionContext* cntx); @@ -90,6 +92,9 @@ class Service : public facade::ServiceInterface { EngineShardSet shard_set_; ServerFamily server_family_; CommandRegistry registry_; + + mutable ::boost::fibers::mutex stats_mu_; + absl::flat_hash_map unknown_cmds_; }; } // namespace dfly diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 001593544..74ae69461 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -45,8 +45,8 @@ using namespace std; using namespace util; namespace fibers = ::boost::fibers; namespace fs = std::filesystem; -using strings::HumanReadableNumBytes; using facade::MCReplyBuilder; +using strings::HumanReadableNumBytes; namespace { @@ -72,11 +72,10 @@ error_code CreateDirs(fs::path dir_path) { } // namespace -ServerFamily::ServerFamily(Service* engine) - : engine_(*engine), pp_(engine->proactor_pool()), ess_(engine->shard_set()) { +ServerFamily::ServerFamily(Service* service) : service_(*service), ess_(service->shard_set()) { start_time_ = time(NULL); last_save_.store(start_time_, memory_order_release); - script_mgr_.reset(new ScriptMgr(&engine->shard_set())); + script_mgr_.reset(new ScriptMgr(&service->shard_set())); } ServerFamily::~ServerFamily() { @@ -90,7 +89,7 @@ void ServerFamily::Init(util::AcceptServer* acceptor) { void ServerFamily::Shutdown() { VLOG(1) << "ServerFamily::Shutdown"; - pp_.GetNextProactor()->Await([this] { + service_.proactor_pool().GetNextProactor()->Await([this] { unique_lock lk(replica_of_mu_); if (replica_) { replica_->Stop(); @@ -256,7 +255,8 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) { return; } - pp_.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::SAVING); }); + auto& pool = service_.proactor_pool(); + pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::SAVING); }); unique_ptr<::io::WriteFile> wf(*res); auto start = absl::Now(); @@ -280,7 +280,7 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) { return; } - pp_.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::IDLE); }); + pool.Await([](auto*) { ServerState::tlocal()->set_gstate(GlobalState::IDLE); }); CHECK_EQ(GlobalState::SAVING, global_state_.Clear()); absl::Duration dur = absl::Now() - start; @@ -326,7 +326,7 @@ Metrics ServerFamily::GetMetrics() const { } }; - pp_.AwaitFiberOnAll(std::move(cb)); + service_.proactor_pool().AwaitFiberOnAll(std::move(cb)); result.qps /= 6; return result; @@ -354,8 +354,8 @@ arch_bits:64 multiplexing_api:iouring tcp_port:)"; - auto should_enter = [&](string_view name) { - bool res = section.empty() || section == name; + auto should_enter = [&](string_view name, bool hidden = false) { + bool res = (!hidden && section.empty()) || section == "ALL" || section == name; if (res && !info.empty()) info.push_back('\n'); @@ -404,6 +404,7 @@ tcp_port:)"; // known we approximate their allocations by taking 16 bytes per member. absl::StrAppend(&info, "blob_used_memory:", m.db.obj_memory_usage, "\n"); absl::StrAppend(&info, "table_used_memory:", m.db.table_mem_usage, "\n"); + absl::StrAppend(&info, "num_buckets:", m.db.bucket_count, "\n"); absl::StrAppend(&info, "num_entries:", m.db.key_count, "\n"); absl::StrAppend(&info, "inline_keys:", m.db.inline_keys, "\n"); absl::StrAppend(&info, "small_string_bytes:", m.db.small_string_bytes, "\n"); @@ -453,6 +454,26 @@ tcp_port:)"; } } + if (should_enter("COMMANDSTATS", true)) { + absl::StrAppend(&info, "# Commandstats\n"); + auto unknown_cmd = service_.UknownCmdMap(); + + for (const auto& k_v : unknown_cmd) { + absl::StrAppend(&info, "unknown_", k_v.first, ":", k_v.second, "\n"); + } + + for (const auto& k_v : m.conn_stats.cmd_count) { + absl::StrAppend(&info, "cmd_", k_v.first, ":", k_v.second, "\n"); + } + } + + if (should_enter("ERRORSTATS", true)) { + absl::StrAppend(&info, "# Errorstats\n"); + for (const auto& k_v : m.conn_stats.err_count) { + absl::StrAppend(&info, k_v.first, ":", k_v.second, "\n"); + } + } + if (should_enter("KEYSPACE")) { absl::StrAppend(&info, "# Keyspace\n"); absl::StrAppend(&info, "db0:keys=xxx,expires=yyy,avg_ttl=zzz\n"); // TODO @@ -464,6 +485,7 @@ tcp_port:)"; void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) { std::string_view host = ArgS(args, 1); std::string_view port_s = ArgS(args, 2); + auto& pool = service_.proactor_pool(); if (absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port_s, "one")) { // use this lock as critical section to prevent concurrent replicaof commands running. @@ -474,7 +496,8 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) { auto repl_ptr = replica_; CHECK(repl_ptr); - pp_.AwaitFiberOnAll([&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = true; }); + pool.AwaitFiberOnAll( + [&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = true; }); replica_->Stop(); replica_.reset(); } @@ -489,7 +512,7 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) { return; } - auto new_replica = make_shared(string(host), port, &engine_); + auto new_replica = make_shared(string(host), port, &service_); unique_lock lk(replica_of_mu_); if (replica_) { @@ -513,8 +536,9 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) { if (!replica_->Run(cntx)) { replica_.reset(); } + bool is_master = !replica_; - pp_.AwaitFiberOnAll( + pool.AwaitFiberOnAll( [&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = is_master; }); } diff --git a/src/server/server_family.h b/src/server/server_family.h index 4b8d4a9fd..293dbbb5a 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -77,8 +77,7 @@ class ServerFamily { void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx); - Service& engine_; - util::ProactorPool& pp_; + Service& service_; EngineShardSet& ess_; util::AcceptServer* acceptor_ = nullptr;