mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
feat: Implement slowlog (#1956)
Implement slowlog Signed-off-by: Uku Loskit <ukuloskit@gmail.com>
This commit is contained in:
parent
32a0baa62c
commit
6a75c6ddc5
13 changed files with 510 additions and 23 deletions
|
@ -157,7 +157,7 @@ class Connection : public util::Connection {
|
|||
|
||||
std::string GetClientInfo(unsigned thread_id) const;
|
||||
std::string GetClientInfo() const;
|
||||
std::string RemoteEndpointStr() const;
|
||||
virtual std::string RemoteEndpointStr() const;
|
||||
std::string RemoteEndpointAddress() const;
|
||||
std::string LocalBindAddress() const;
|
||||
|
||||
|
|
|
@ -19,7 +19,7 @@ add_library(dfly_transaction db_slice.cc malloc_stats.cc blocking_controller.cc
|
|||
common.cc journal/journal.cc journal/types.cc journal/journal_slice.cc
|
||||
server_state.cc table.cc top_keys.cc transaction.cc
|
||||
serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc
|
||||
${TX_LINUX_SRCS} acl/acl_log.cc
|
||||
${TX_LINUX_SRCS} acl/acl_log.cc slowlog.cc
|
||||
)
|
||||
cxx_link(dfly_transaction dfly_core strings_lib TRDP::fast_float)
|
||||
|
||||
|
@ -64,6 +64,7 @@ cxx_test(multi_test dfly_test_lib LABELS DFLY)
|
|||
cxx_test(generic_family_test dfly_test_lib LABELS DFLY)
|
||||
cxx_test(hset_family_test dfly_test_lib LABELS DFLY)
|
||||
cxx_test(list_family_test dfly_test_lib LABELS DFLY)
|
||||
cxx_test(server_family_test dfly_test_lib LABELS DFLY)
|
||||
cxx_test(set_family_test dfly_test_lib LABELS DFLY)
|
||||
cxx_test(stream_family_test dfly_test_lib LABELS DFLY)
|
||||
cxx_test(string_family_test dfly_test_lib LABELS DFLY)
|
||||
|
|
|
@ -11,6 +11,7 @@
|
|||
#include "base/bits.h"
|
||||
#include "base/flags.h"
|
||||
#include "base/logging.h"
|
||||
#include "facade/dragonfly_connection.h"
|
||||
#include "facade/error.h"
|
||||
#include "server/acl/acl_commands_def.h"
|
||||
#include "server/conn_context.h"
|
||||
|
@ -54,12 +55,22 @@ bool CommandId::IsTransactional() const {
|
|||
}
|
||||
|
||||
void CommandId::Invoke(CmdArgList args, ConnectionContext* cntx) const {
|
||||
uint64_t before = absl::GetCurrentTimeNanos();
|
||||
handler_(std::move(args), cntx);
|
||||
uint64_t after = absl::GetCurrentTimeNanos();
|
||||
auto& ent = command_stats_[ServerState::tlocal()->thread_index()];
|
||||
ServerState* ss = ServerState::tlocal();
|
||||
int64_t before = absl::GetCurrentTimeNanos();
|
||||
handler_(args, cntx);
|
||||
int64_t after = absl::GetCurrentTimeNanos();
|
||||
int64_t execution_time_micro_s = (after - before) / 1000;
|
||||
const auto* conn = cntx->conn();
|
||||
auto& ent = command_stats_[ss->thread_index()];
|
||||
// TODO: we should probably discard more commands here,
|
||||
// not just the blocking ones
|
||||
if (!(opt_mask_ & CO::BLOCKING) && ss->log_slower_than_usec >= 0 &&
|
||||
execution_time_micro_s > ss->log_slower_than_usec && conn != nullptr) {
|
||||
ss->GetSlowLog().Add(name(), args, conn->GetName(), conn->RemoteEndpointStr(),
|
||||
execution_time_micro_s, after);
|
||||
}
|
||||
++ent.first;
|
||||
ent.second += (after - before) / 1000;
|
||||
ent.second += execution_time_micro_s;
|
||||
}
|
||||
|
||||
optional<facade::ErrorReply> CommandId::Validate(CmdArgList tail_args) const {
|
||||
|
|
|
@ -19,6 +19,7 @@
|
|||
#include <optional>
|
||||
|
||||
#include "facade/error.h"
|
||||
#include "slowlog.h"
|
||||
|
||||
extern "C" {
|
||||
#include "redis/redis_aux.h"
|
||||
|
@ -90,6 +91,10 @@ ABSL_FLAG(ReplicaOfFlag, replicaof, ReplicaOfFlag{},
|
|||
"Specifies a host and port which point to a target master "
|
||||
"to replicate. "
|
||||
"Format should be <IPv4>:<PORT> or host:<PORT> or [<IPv6>]:<PORT>");
|
||||
ABSL_FLAG(int32_t, slowlog_log_slower_than, 10000,
|
||||
"Add commands slower than this threshold to slow log. The value is expressed in "
|
||||
"microseconds and if it's negative - disables the slowlog.");
|
||||
ABSL_FLAG(uint32_t, slowlog_max_len, 20, "Slow log maximum length.");
|
||||
|
||||
ABSL_FLAG(string, s3_endpoint, "", "endpoint for s3 snapshots, default uses aws regional endpoint");
|
||||
// Disable EC2 metadata by default, or if a users credentials are invalid the
|
||||
|
@ -155,6 +160,81 @@ std::string AbslUnparseFlag(const ReplicaOfFlag& flag) {
|
|||
return (flag.has_value()) ? absl::StrCat(flag.host, ":", flag.port) : "";
|
||||
}
|
||||
|
||||
void SlowLogGet(dfly::CmdArgList args, dfly::ConnectionContext* cntx, dfly::Service& service,
|
||||
std::string_view sub_cmd) {
|
||||
size_t requested_slow_log_length = UINT32_MAX;
|
||||
size_t argc = args.size();
|
||||
if (argc >= 3) {
|
||||
(*cntx)->SendError(facade::UnknownSubCmd(sub_cmd, "SLOWLOG"), facade::kSyntaxErrType);
|
||||
return;
|
||||
} else if (argc == 2) {
|
||||
string_view length = facade::ArgS(args, 1);
|
||||
int64_t num;
|
||||
if ((!absl::SimpleAtoi(length, &num)) || (num < -1)) {
|
||||
(*cntx)->SendError("count should be greater than or equal to -1");
|
||||
return;
|
||||
}
|
||||
if (num >= 0) {
|
||||
requested_slow_log_length = num;
|
||||
}
|
||||
}
|
||||
|
||||
// gather all the individual slowlogs from all the fibers and sort them by their timestamp
|
||||
std::vector<boost::circular_buffer<dfly::SlowLogEntry>> entries(service.proactor_pool().size());
|
||||
service.proactor_pool().AwaitFiberOnAll([&](auto index, auto* context) {
|
||||
auto shard_entries = dfly::ServerState::tlocal()->GetSlowLog().Entries();
|
||||
entries[index] = shard_entries;
|
||||
});
|
||||
std::vector<std::pair<dfly::SlowLogEntry, unsigned>> merged_slow_log;
|
||||
for (size_t i = 0; i < entries.size(); ++i) {
|
||||
for (const auto& log_item : entries[i]) {
|
||||
merged_slow_log.emplace_back(log_item, i);
|
||||
}
|
||||
}
|
||||
std::sort(merged_slow_log.begin(), merged_slow_log.end(), [](const auto& e1, const auto& e2) {
|
||||
return e1.first.unix_timestamp > e2.first.unix_timestamp;
|
||||
});
|
||||
|
||||
requested_slow_log_length = std::min(merged_slow_log.size(), requested_slow_log_length);
|
||||
|
||||
(*cntx)->StartArray(requested_slow_log_length);
|
||||
for (size_t i = 0; i < requested_slow_log_length; ++i) {
|
||||
const auto& entry = merged_slow_log[i].first;
|
||||
const auto& args = entry.cmd_args;
|
||||
|
||||
(*cntx)->StartArray(6);
|
||||
|
||||
(*cntx)->SendLong(entry.entry_id * service.proactor_pool().size() + merged_slow_log[i].second);
|
||||
(*cntx)->SendLong(entry.unix_timestamp / 1000000000);
|
||||
(*cntx)->SendLong(entry.execution_time_micro);
|
||||
|
||||
// if we truncated the args, there is one pseudo-element containing the number of truncated
|
||||
// args that we must add, so the result length is increased by 1
|
||||
size_t len = args.size() + int(args.size() < entry.original_length);
|
||||
|
||||
(*cntx)->StartArray(len);
|
||||
|
||||
for (const auto& arg : args) {
|
||||
if (arg.second > 0) {
|
||||
auto suffix = absl::StrCat("... (", arg.second, " more bytes)");
|
||||
auto cmd_arg = arg.first.substr(0, dfly::kMaximumSlowlogArgLength - suffix.length());
|
||||
(*cntx)->SendBulkString(absl::StrCat(cmd_arg, suffix));
|
||||
} else {
|
||||
(*cntx)->SendBulkString(arg.first);
|
||||
}
|
||||
}
|
||||
// if we truncated arguments - add a special string to indicate that.
|
||||
if (args.size() < entry.original_length) {
|
||||
(*cntx)->SendBulkString(
|
||||
absl::StrCat("... (", entry.original_length - args.size(), " more arguments)"));
|
||||
}
|
||||
|
||||
(*cntx)->SendBulkString(entry.client_ip);
|
||||
(*cntx)->SendBulkString(entry.client_name);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
namespace dfly {
|
||||
|
||||
namespace fs = std::filesystem;
|
||||
|
@ -214,21 +294,6 @@ bool IsValidSaveScheduleNibble(string_view time, unsigned int max) {
|
|||
return min_match <= max;
|
||||
}
|
||||
|
||||
void SlowLog(CmdArgList args, ConnectionContext* cntx) {
|
||||
ToUpper(&args[0]);
|
||||
string_view sub_cmd = ArgS(args, 0);
|
||||
|
||||
if (sub_cmd == "LEN") {
|
||||
return (*cntx)->SendLong(0);
|
||||
}
|
||||
|
||||
if (sub_cmd == "GET") {
|
||||
return (*cntx)->SendEmptyArray();
|
||||
}
|
||||
|
||||
(*cntx)->SendError(UnknownSubCmd(sub_cmd, "SLOWLOG"), kSyntaxErrType);
|
||||
}
|
||||
|
||||
// Check that if TLS is used at least one form of client authentication is
|
||||
// enabled. That means either using a password or giving a root
|
||||
// certificate for authenticating client certificates which will
|
||||
|
@ -396,6 +461,16 @@ void SetMaxClients(std::vector<facade::Listener*>& listeners, uint32_t maxclient
|
|||
}
|
||||
}
|
||||
|
||||
void SetSlowLogMaxLen(util::ProactorPool& pool, uint32_t val) {
|
||||
pool.AwaitFiberOnAll(
|
||||
[&val](auto index, auto* context) { ServerState::tlocal()->GetSlowLog().ChangeLength(val); });
|
||||
}
|
||||
|
||||
void SetSlowLogThreshold(util::ProactorPool& pool, int32_t val) {
|
||||
pool.AwaitFiberOnAll(
|
||||
[&val](auto index, auto* context) { ServerState::tlocal()->log_slower_than_usec = val; });
|
||||
}
|
||||
|
||||
void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners) {
|
||||
CHECK(acceptor_ == nullptr);
|
||||
acceptor_ = acceptor;
|
||||
|
@ -410,6 +485,22 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
|
|||
return res.has_value();
|
||||
});
|
||||
|
||||
SetSlowLogThreshold(service_.proactor_pool(), absl::GetFlag(FLAGS_slowlog_log_slower_than));
|
||||
config_registry.RegisterMutable("slowlog_log_slower_than",
|
||||
[this](const absl::CommandLineFlag& flag) {
|
||||
auto res = flag.TryGet<int32_t>();
|
||||
if (res.has_value())
|
||||
SetSlowLogThreshold(service_.proactor_pool(), res.value());
|
||||
return res.has_value();
|
||||
});
|
||||
SetSlowLogMaxLen(service_.proactor_pool(), absl::GetFlag(FLAGS_slowlog_max_len));
|
||||
config_registry.RegisterMutable("slowlog_max_len", [this](const absl::CommandLineFlag& flag) {
|
||||
auto res = flag.TryGet<uint32_t>();
|
||||
if (res.has_value())
|
||||
SetSlowLogMaxLen(service_.proactor_pool(), res.value());
|
||||
return res.has_value();
|
||||
});
|
||||
|
||||
pb_task_ = shard_set->pool()->GetNextProactor();
|
||||
if (pb_task_->GetKind() == ProactorBase::EPOLL) {
|
||||
fq_threadpool_.reset(new FiberQueueThreadPool(absl::GetFlag(FLAGS_epoll_file_threads)));
|
||||
|
@ -1994,6 +2085,50 @@ void ServerFamily::Dfly(CmdArgList args, ConnectionContext* cntx) {
|
|||
dfly_cmd_->Run(args, cntx);
|
||||
}
|
||||
|
||||
void ServerFamily::SlowLog(CmdArgList args, ConnectionContext* cntx) {
|
||||
ToUpper(&args[0]);
|
||||
string_view sub_cmd = ArgS(args, 0);
|
||||
|
||||
if (sub_cmd == "HELP") {
|
||||
string_view help[] = {
|
||||
"SLOWLOG <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
|
||||
"GET [<count>]",
|
||||
" Return top <count> entries from the slowlog (default: 10, -1 mean all).",
|
||||
" Entries are made of:",
|
||||
" id, timestamp, time in microseconds, arguments array, client IP and port,",
|
||||
" client name",
|
||||
"LEN",
|
||||
" Return the length of the slowlog.",
|
||||
"RESET",
|
||||
" Reset the slowlog.",
|
||||
"HELP",
|
||||
" Prints this help.",
|
||||
};
|
||||
(*cntx)->SendSimpleStrArr(help);
|
||||
return;
|
||||
}
|
||||
|
||||
if (sub_cmd == "LEN") {
|
||||
vector<int> lengths(service_.proactor_pool().size());
|
||||
service_.proactor_pool().AwaitFiberOnAll([&lengths](auto index, auto* context) {
|
||||
lengths[index] = ServerState::tlocal()->GetSlowLog().Length();
|
||||
});
|
||||
int sum = std::accumulate(lengths.begin(), lengths.end(), 0);
|
||||
return (*cntx)->SendLong(sum);
|
||||
}
|
||||
|
||||
if (sub_cmd == "RESET") {
|
||||
service_.proactor_pool().AwaitFiberOnAll(
|
||||
[](auto index, auto* context) { ServerState::tlocal()->GetSlowLog().Reset(); });
|
||||
return (*cntx)->SendOk();
|
||||
}
|
||||
|
||||
if (sub_cmd == "GET") {
|
||||
return SlowLogGet(args, cntx, service_, sub_cmd);
|
||||
}
|
||||
(*cntx)->SendError(UnknownSubCmd(sub_cmd, "SLOWLOG"), kSyntaxErrType);
|
||||
}
|
||||
|
||||
#define HFUNC(x) SetHandler(HandlerFunc(this, &ServerFamily::x))
|
||||
|
||||
namespace acl {
|
||||
|
@ -2051,7 +2186,7 @@ void ServerFamily::Register(CommandRegistry* registry) {
|
|||
ReplTakeOver)
|
||||
<< CI{"REPLCONF", CO::ADMIN | CO::LOADING, -1, 0, 0, 0, acl::kReplConf}.HFUNC(ReplConf)
|
||||
<< CI{"ROLE", CO::LOADING | CO::FAST | CO::NOSCRIPT, 1, 0, 0, 0, acl::kRole}.HFUNC(Role)
|
||||
<< CI{"SLOWLOG", CO::ADMIN | CO::FAST, -2, 0, 0, 0, acl::kSlowLog}.SetHandler(SlowLog)
|
||||
<< CI{"SLOWLOG", CO::ADMIN | CO::FAST, -2, 0, 0, 0, acl::kSlowLog}.HFUNC(SlowLog)
|
||||
<< CI{"SCRIPT", CO::NOSCRIPT | CO::NO_KEY_TRANSACTIONAL, -2, 0, 0, 0, acl::kScript}.HFUNC(
|
||||
Script)
|
||||
<< CI{"DFLY", CO::ADMIN | CO::GLOBAL_TRANS | CO::HIDDEN, -2, 0, 0, 0, acl::kDfly}.HFUNC(Dfly);
|
||||
|
|
|
@ -15,6 +15,9 @@
|
|||
#include "server/replica.h"
|
||||
#include "server/server_state.h"
|
||||
|
||||
void SlowLogGet(dfly::CmdArgList args, dfly::ConnectionContext* cntx, dfly::Service& service,
|
||||
std::string_view sub_cmd);
|
||||
|
||||
namespace util {
|
||||
|
||||
class AcceptServer;
|
||||
|
@ -212,6 +215,7 @@ class ServerFamily {
|
|||
void Save(CmdArgList args, ConnectionContext* cntx);
|
||||
void Script(CmdArgList args, ConnectionContext* cntx);
|
||||
void Sync(CmdArgList args, ConnectionContext* cntx);
|
||||
void SlowLog(CmdArgList args, ConnectionContext* cntx);
|
||||
|
||||
void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx);
|
||||
|
||||
|
|
184
src/server/server_family_test.cc
Normal file
184
src/server/server_family_test.cc
Normal file
|
@ -0,0 +1,184 @@
|
|||
// Copyright 2022, DragonflyDB authors. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
|
||||
#include "server/server_family.h"
|
||||
|
||||
#include "base/gtest.h"
|
||||
#include "base/logging.h"
|
||||
#include "facade/facade_test.h"
|
||||
#include "server/test_utils.h"
|
||||
|
||||
using namespace testing;
|
||||
using namespace std;
|
||||
using namespace util;
|
||||
using namespace boost;
|
||||
|
||||
namespace dfly {
|
||||
|
||||
class ServerFamilyTest : public BaseFamilyTest {
|
||||
protected:
|
||||
};
|
||||
|
||||
TEST_F(ServerFamilyTest, SlowLogArgsCountTruncation) {
|
||||
auto resp = Run({"config", "set", "slowlog_max_len", "3"});
|
||||
EXPECT_THAT(resp.GetString(), "OK");
|
||||
resp = Run({"config", "set", "slowlog_log_slower_than", "0"});
|
||||
EXPECT_THAT(resp.GetString(), "OK");
|
||||
// allow exactly 32 arguments (lpush <key> + 30 arguments)
|
||||
// no truncation should happen
|
||||
std::vector<std::string> cmd_args = {"LPUSH", "mykey"};
|
||||
for (int i = 1; i <= 30; ++i) {
|
||||
cmd_args.push_back(std::to_string(i));
|
||||
}
|
||||
absl::Span<std::string> span(cmd_args);
|
||||
resp = Run(span);
|
||||
EXPECT_THAT(resp.GetInt(), 30);
|
||||
resp = Run({"slowlog", "get"});
|
||||
auto slowlog = resp.GetVec();
|
||||
auto commands = slowlog[0].GetVec()[3].GetVec();
|
||||
EXPECT_THAT(commands, ElementsAreArray(cmd_args));
|
||||
|
||||
// now add one more argument, and truncation SHOULD happen
|
||||
std::vector<std::string> cmd_args_one_too_many = {"LPUSH", "mykey"};
|
||||
for (int i = 1; i <= 31; ++i) {
|
||||
cmd_args_one_too_many.push_back(std::to_string(i));
|
||||
}
|
||||
absl::Span<std::string> span2(cmd_args_one_too_many);
|
||||
resp = Run(span2);
|
||||
EXPECT_THAT(resp.GetInt(), 30 + 31);
|
||||
|
||||
resp = Run({"slowlog", "get"});
|
||||
slowlog = resp.GetVec();
|
||||
auto expected_args = cmd_args;
|
||||
expected_args[31] = "... (2 more arguments)";
|
||||
|
||||
commands = slowlog[0].GetVec()[3].GetVec();
|
||||
EXPECT_THAT(commands, ElementsAreArray(expected_args));
|
||||
}
|
||||
|
||||
TEST_F(ServerFamilyTest, SlowLogArgsLengthTruncation) {
|
||||
auto resp = Run({"config", "set", "slowlog_max_len", "3"});
|
||||
EXPECT_THAT(resp.GetString(), "OK");
|
||||
resp = Run({"config", "set", "slowlog_log_slower_than", "0"});
|
||||
EXPECT_THAT(resp.GetString(), "OK");
|
||||
|
||||
std::string at_limit = std::string(128, 'A');
|
||||
resp = Run({"lpush", "mykey", at_limit});
|
||||
EXPECT_THAT(resp.GetInt(), 1);
|
||||
|
||||
resp = Run({"slowlog", "get"});
|
||||
auto slowlog = resp.GetVec();
|
||||
auto key_value = slowlog[0].GetVec()[3].GetVec()[2].GetString();
|
||||
EXPECT_THAT(key_value, at_limit);
|
||||
|
||||
std::string over_limit_by_one = std::string(129, 'A');
|
||||
std::string expected_value = std::string(110, 'A') + "... (1 more bytes)";
|
||||
resp = Run({"lpush", "mykey2", over_limit_by_one});
|
||||
EXPECT_THAT(resp.GetInt(), 1);
|
||||
resp = Run({"slowlog", "get"});
|
||||
slowlog = resp.GetVec();
|
||||
key_value = slowlog[0].GetVec()[3].GetVec()[2].GetString();
|
||||
EXPECT_THAT(key_value, expected_value);
|
||||
}
|
||||
|
||||
TEST_F(ServerFamilyTest, SlowLogHelp) {
|
||||
auto resp = Run({"slowlog", "help"});
|
||||
|
||||
EXPECT_THAT(
|
||||
resp.GetVec(),
|
||||
ElementsAre(
|
||||
"SLOWLOG <subcommand> [<arg> [value] [opt] ...]. Subcommands are:", "GET [<count>]",
|
||||
" Return top <count> entries from the slowlog (default: 10, -1 mean all).",
|
||||
" Entries are made of:",
|
||||
" id, timestamp, time in microseconds, arguments array, client IP and port,",
|
||||
" client name", "LEN", " Return the length of the slowlog.", "RESET",
|
||||
" Reset the slowlog.", "HELP", " Prints this help."));
|
||||
}
|
||||
|
||||
TEST_F(ServerFamilyTest, SlowLogMaxLengthZero) {
|
||||
auto resp = Run({"config", "set", "slowlog_max_len", "0"});
|
||||
EXPECT_THAT(resp.GetString(), "OK");
|
||||
resp = Run({"config", "set", "slowlog_log_slower_than", "0"});
|
||||
EXPECT_THAT(resp.GetString(), "OK");
|
||||
Run({"slowlog", "reset"});
|
||||
|
||||
// issue an arbitrary command
|
||||
resp = Run({"set", "foo", "bar"});
|
||||
EXPECT_THAT(resp.GetString(), "OK");
|
||||
resp = Run({"slowlog", "get"});
|
||||
|
||||
// slowlog should be empty since max_len is 0
|
||||
EXPECT_THAT(resp.GetVec().size(), 0);
|
||||
}
|
||||
|
||||
TEST_F(ServerFamilyTest, SlowLogGetZero) {
|
||||
auto resp = Run({"config", "set", "slowlog_max_len", "3"});
|
||||
EXPECT_THAT(resp.GetString(), "OK");
|
||||
resp = Run({"config", "set", "slowlog_log_slower_than", "0"});
|
||||
EXPECT_THAT(resp.GetString(), "OK");
|
||||
|
||||
resp = Run({"lpush", "mykey", "1"});
|
||||
EXPECT_THAT(resp.GetInt(), 1);
|
||||
resp = Run({"slowlog", "get", "0"});
|
||||
EXPECT_THAT(resp.GetVec().size(), 0);
|
||||
}
|
||||
|
||||
TEST_F(ServerFamilyTest, SlowLogGetMinusOne) {
|
||||
auto resp = Run({"config", "set", "slowlog_max_len", "3"});
|
||||
EXPECT_THAT(resp.GetString(), "OK");
|
||||
resp = Run({"config", "set", "slowlog_log_slower_than", "0"});
|
||||
EXPECT_THAT(resp.GetString(), "OK");
|
||||
|
||||
for (int i = 1; i < 4; ++i) {
|
||||
resp = Run({"lpush", "mykey", std::to_string(i)});
|
||||
EXPECT_THAT(resp.GetInt(), i);
|
||||
}
|
||||
|
||||
// -1 should return the whole slowlog
|
||||
resp = Run({"slowlog", "get", "-1"});
|
||||
EXPECT_THAT(resp.GetVec().size(), 3);
|
||||
}
|
||||
|
||||
TEST_F(ServerFamilyTest, SlowLogGetLessThanMinusOne) {
|
||||
auto resp = Run({"slowlog", "get", "-2"});
|
||||
EXPECT_THAT(resp.GetString(), "ERR count should be greater than or equal to -1");
|
||||
}
|
||||
|
||||
TEST_F(ServerFamilyTest, SlowLogLen) {
|
||||
auto resp = Run({"config", "set", "slowlog_max_len", "3"});
|
||||
EXPECT_THAT(resp.GetString(), "OK");
|
||||
resp = Run({"config", "set", "slowlog_log_slower_than", "0"});
|
||||
EXPECT_THAT(resp.GetString(), "OK");
|
||||
Run({"slowlog", "reset"});
|
||||
|
||||
for (int i = 1; i < 4; ++i) {
|
||||
resp = Run({"lpush", "mykey", std::to_string(i)});
|
||||
EXPECT_THAT(resp.GetInt(), i);
|
||||
}
|
||||
|
||||
resp = Run({"slowlog", "len"});
|
||||
EXPECT_THAT(resp.GetInt(), 3);
|
||||
}
|
||||
|
||||
TEST_F(ServerFamilyTest, SlowLogMinusOneDisabled) {
|
||||
auto resp = Run({"config", "set", "slowlog_max_len", "3"});
|
||||
EXPECT_THAT(resp.GetString(), "OK");
|
||||
resp = Run({"config", "set", "slowlog_log_slower_than", "-1"});
|
||||
EXPECT_THAT(resp.GetString(), "OK");
|
||||
Run({"slowlog", "reset"});
|
||||
|
||||
// issue some commands
|
||||
for (int i = 1; i < 4; ++i) {
|
||||
resp = Run({"lpush", "mykey", std::to_string(i)});
|
||||
EXPECT_THAT(resp.GetInt(), i);
|
||||
}
|
||||
|
||||
// slowlog is still empty
|
||||
resp = Run({"slowlog", "get"});
|
||||
EXPECT_THAT(resp.GetVec().size(), 0);
|
||||
resp = Run({"slowlog", "len"});
|
||||
EXPECT_THAT(resp.GetInt(), 0);
|
||||
}
|
||||
|
||||
} // namespace dfly
|
|
@ -13,6 +13,7 @@
|
|||
#include "server/acl/user_registry.h"
|
||||
#include "server/common.h"
|
||||
#include "server/script_mgr.h"
|
||||
#include "server/slowlog.h"
|
||||
#include "util/sliding_counter.h"
|
||||
|
||||
typedef struct mi_heap_s mi_heap_t;
|
||||
|
@ -211,6 +212,7 @@ class ServerState { // public struct - to allow initialization.
|
|||
|
||||
bool is_master = true;
|
||||
std::string remote_client_id_; // for cluster support
|
||||
int32_t log_slower_than_usec = 0;
|
||||
|
||||
facade::ConnectionStats connection_stats;
|
||||
|
||||
|
@ -218,8 +220,13 @@ class ServerState { // public struct - to allow initialization.
|
|||
|
||||
acl::AclLog acl_log;
|
||||
|
||||
SlowLogShard& GetSlowLog() {
|
||||
return slow_log_shard_;
|
||||
};
|
||||
|
||||
private:
|
||||
int64_t live_transactions_ = 0;
|
||||
SlowLogShard slow_log_shard_;
|
||||
mi_heap_t* data_heap_;
|
||||
journal::Journal* journal_ = nullptr;
|
||||
|
||||
|
|
56
src/server/slowlog.cc
Normal file
56
src/server/slowlog.cc
Normal file
|
@ -0,0 +1,56 @@
|
|||
// Copyright 2022, DragonflyDB authors. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
|
||||
#include "server/slowlog.h"
|
||||
|
||||
namespace dfly {
|
||||
|
||||
using namespace std;
|
||||
|
||||
size_t SlowLogShard::Length() const {
|
||||
return log_entries_.size();
|
||||
}
|
||||
|
||||
void SlowLogShard::ChangeLength(const size_t new_length) {
|
||||
log_entries_.set_capacity(new_length);
|
||||
}
|
||||
|
||||
void SlowLogShard::Reset() {
|
||||
log_entries_.clear();
|
||||
}
|
||||
|
||||
void SlowLogShard::Add(const string_view command_name, CmdArgList args,
|
||||
const string_view client_name, const string_view client_ip,
|
||||
uint64_t execution_time_usec, uint64_t unix_ts) {
|
||||
vector<pair<string, uint32_t>> slowlog_args;
|
||||
size_t slowlog_effective_length = args.size();
|
||||
if (args.size() > kMaximumSlowlogArgCount) {
|
||||
// we store one argument fewer because the last argument is "wasted"
|
||||
// for telling how many further arguments there are
|
||||
slowlog_effective_length = kMaximumSlowlogArgCount - 1;
|
||||
}
|
||||
slowlog_args.reserve(slowlog_effective_length);
|
||||
slowlog_args.emplace_back(command_name, 0);
|
||||
|
||||
for (size_t i = 0; i < slowlog_effective_length; ++i) {
|
||||
string_view arg = facade::ArgS(args, i);
|
||||
size_t extra_bytes = 0;
|
||||
// If any of the arguments is deemed too long, it will be truncated
|
||||
// and the truncated string will be suffixed by the number of truncated bytes in
|
||||
// this format: "... (n more bytes)"
|
||||
size_t extra_bytes_suffix_length = 0;
|
||||
if (arg.size() > kMaximumSlowlogArgLength) {
|
||||
extra_bytes = arg.size() - kMaximumSlowlogArgLength;
|
||||
}
|
||||
slowlog_args.emplace_back(arg.substr(0, kMaximumSlowlogArgLength - extra_bytes_suffix_length),
|
||||
extra_bytes);
|
||||
}
|
||||
|
||||
log_entries_.push_back(SlowLogEntry{slowlog_entry_id_++, unix_ts, execution_time_usec,
|
||||
/* +1 for the command */ args.size() + 1,
|
||||
std::move(slowlog_args), string(client_ip),
|
||||
string(client_name)});
|
||||
}
|
||||
|
||||
} // namespace dfly
|
50
src/server/slowlog.h
Normal file
50
src/server/slowlog.h
Normal file
|
@ -0,0 +1,50 @@
|
|||
// Copyright 2022, DragonflyDB authors. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
|
||||
#pragma once
|
||||
|
||||
#include <boost/circular_buffer.hpp>
|
||||
#include <string>
|
||||
#include <vector>
|
||||
|
||||
#include "base/integral_types.h"
|
||||
#include "server/common.h"
|
||||
namespace dfly {
|
||||
|
||||
constexpr size_t kMaximumSlowlogArgCount = 31; // 32 - 1 for the command name
|
||||
constexpr size_t kMaximumSlowlogArgLength = 128;
|
||||
|
||||
struct SlowLogEntry {
|
||||
uint32_t entry_id;
|
||||
uint64_t unix_timestamp;
|
||||
uint64_t execution_time_micro;
|
||||
size_t original_length;
|
||||
// a vector of pairs of argument and extra bytes if the argument was truncated
|
||||
std::vector<std::pair<std::string, uint32_t>> cmd_args;
|
||||
std::string client_ip;
|
||||
std::string client_name;
|
||||
};
|
||||
|
||||
class SlowLogShard {
|
||||
public:
|
||||
SlowLogShard(){};
|
||||
|
||||
boost::circular_buffer<SlowLogEntry>& Entries() {
|
||||
return log_entries_;
|
||||
}
|
||||
|
||||
void Add(const std::string_view command_name, CmdArgList args, const std::string_view client_name,
|
||||
const std::string_view client_ip, uint64_t execution_time, uint64_t unix_timestamp);
|
||||
void Reset();
|
||||
void ChangeLength(size_t new_length);
|
||||
size_t Length() const;
|
||||
|
||||
private:
|
||||
uint32_t slowlog_entry_id_ = 0;
|
||||
|
||||
// TODO: to replace with base::RingBuffer because circular_buffer does not seem to support
|
||||
// move semantics.
|
||||
boost::circular_buffer<SlowLogEntry> log_entries_;
|
||||
};
|
||||
} // namespace dfly
|
|
@ -70,6 +70,10 @@ void TestConnection::SendPubMessageAsync(PubMessage pmsg) {
|
|||
messages.push_back(move(pmsg));
|
||||
}
|
||||
|
||||
std::string TestConnection::RemoteEndpointStr() const {
|
||||
return "";
|
||||
}
|
||||
|
||||
void TransactionSuspension::Start() {
|
||||
CommandId cid{"TEST", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, 0, acl::NONE};
|
||||
|
||||
|
|
|
@ -21,6 +21,7 @@ using namespace facade;
|
|||
class TestConnection : public facade::Connection {
|
||||
public:
|
||||
TestConnection(Protocol protocol, io::StringSink* sink);
|
||||
std::string RemoteEndpointStr() const;
|
||||
|
||||
void SendPubMessageAsync(PubMessage pmsg) final;
|
||||
|
||||
|
|
BIN
tests/.DS_Store
vendored
Normal file
BIN
tests/.DS_Store
vendored
Normal file
Binary file not shown.
|
@ -1,5 +1,7 @@
|
|||
import pytest
|
||||
import redis
|
||||
|
||||
from . import dfly_args
|
||||
from .utility import *
|
||||
|
||||
|
||||
|
@ -101,3 +103,35 @@ async def test_scan(async_client: aioredis.Redis):
|
|||
assert cur == 0
|
||||
assert len(keys) == 1
|
||||
assert keys[0] == key
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@dfly_args({"slowlog_log_slower_than": 0, "slowlog_max_len": 3})
|
||||
async def test_slowlog_client_name_and_ip(df_local_factory, async_client: aioredis.Redis):
|
||||
df = df_local_factory.create()
|
||||
df.start()
|
||||
expected_clientname = "dragonfly"
|
||||
|
||||
await async_client.client_setname(expected_clientname)
|
||||
|
||||
client_list = await async_client.client_list()
|
||||
addr = client_list[0]["addr"]
|
||||
|
||||
slowlog = await async_client.slowlog_get(1)
|
||||
assert slowlog[0]["client_name"].decode() == expected_clientname
|
||||
assert slowlog[0]["client_address"].decode() == addr
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@dfly_args({"slowlog_log_slower_than": 0, "slowlog_max_len": 3})
|
||||
async def test_blocking_commands_should_not_show_up_in_slow_log(
|
||||
df_local_factory, async_client: aioredis.Redis
|
||||
):
|
||||
await async_client.slowlog_reset()
|
||||
df = df_local_factory.create()
|
||||
df.start()
|
||||
|
||||
await async_client.blpop("mykey", 0.5)
|
||||
|
||||
# blpop does not show up, only the previous reset
|
||||
assert (await async_client.slowlog_get())[0]["command"].decode() == "SLOWLOG RESET"
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue