chore: refactor slowlog code, no functionality changes (#2331)

Goal: to improve readability of the code.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-12-23 18:40:03 +02:00 committed by GitHub
parent bbe3d9303b
commit 4562fad737
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 103 additions and 97 deletions

View file

@ -16,6 +16,7 @@ endif()
add_library(dfly_transaction db_slice.cc malloc_stats.cc blocking_controller.cc
command_registry.cc
common.cc journal/journal.cc journal/types.cc journal/journal_slice.cc
server_state.cc table.cc top_keys.cc transaction.cc
serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc
@ -27,7 +28,7 @@ if (NOT APPLE)
cxx_test(search/search_family_test dfly_test_lib LABELS DFLY)
endif()
add_library(dragonfly_lib engine_shard_set.cc channel_store.cc command_registry.cc
add_library(dragonfly_lib engine_shard_set.cc channel_store.cc
config_registry.cc conn_context.cc debugcmd.cc dflycmd.cc
generic_family.cc hset_family.cc json_family.cc
${SEARCH_FILES}

View file

@ -16,7 +16,6 @@
#include "facade/dragonfly_connection.h"
#include "facade/error.h"
#include "server/acl/acl_commands_def.h"
#include "server/conn_context.h"
#include "server/server_state.h"
using namespace std;
@ -56,7 +55,7 @@ bool CommandId::IsTransactional() const {
return false;
}
void CommandId::Invoke(CmdArgList args, ConnectionContext* cntx) const {
uint64_t CommandId::Invoke(CmdArgList args, ConnectionContext* cntx) const {
int64_t before = absl::GetCurrentTimeNanos();
handler_(args, cntx);
int64_t after = absl::GetCurrentTimeNanos();
@ -64,17 +63,12 @@ void CommandId::Invoke(CmdArgList args, ConnectionContext* cntx) const {
ServerState* ss = ServerState::tlocal(); // Might have migrated thread, read after invocation
int64_t execution_time_usec = (after - before) / 1000;
const auto* conn = cntx->conn();
auto& ent = command_stats_[ss->thread_index()];
// TODO: we should probably discard more commands here,
// not just the blocking ones
if (!(opt_mask_ & CO::BLOCKING) && conn != nullptr && ss->GetSlowLog().Capacity() > 0 &&
execution_time_usec > ss->log_slower_than_usec) {
ss->GetSlowLog().Add(name(), args, conn->GetName(), conn->RemoteEndpointStr(),
execution_time_usec, after);
}
++ent.first;
ent.second += execution_time_usec;
return execution_time_usec;
}
optional<facade::ErrorReply> CommandId::Validate(CmdArgList tail_args) const {

View file

@ -85,7 +85,8 @@ class CommandId : public facade::CommandId {
using ArgValidator = fu2::function_base<true, true, fu2::capacity_default, false, false,
std::optional<facade::ErrorReply>(CmdArgList) const>;
void Invoke(CmdArgList args, ConnectionContext* cntx) const;
// Returns the invoke time in usec.
uint64_t Invoke(CmdArgList args, ConnectionContext* cntx) const;
// Returns error if validation failed, otherwise nullopt
std::optional<facade::ErrorReply> Validate(CmdArgList tail_args) const;

View file

@ -1225,14 +1225,23 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
// Verifies that we reply to the client when needed.
ReplyGuard reply_guard(cntx, cid->name());
#endif
uint64_t invoke_time_usec = 0;
try {
cid->Invoke(tail_args, cntx);
invoke_time_usec = cid->Invoke(tail_args, cntx);
} catch (std::exception& e) {
LOG(ERROR) << "Internal error, system probably unstable " << e.what();
return false;
}
// TODO: we should probably discard more commands here,
// not just the blocking ones
const auto* conn = cntx->conn();
if (!(cid->opt_mask() & CO::BLOCKING) && conn != nullptr && etl.GetSlowLog().IsEnabled() &&
invoke_time_usec > etl.log_slower_than_usec) {
etl.GetSlowLog().Add(cid->name(), tail_args, conn->GetName(), conn->RemoteEndpointStr(),
invoke_time_usec, absl::GetCurrentTimeNanos() / 1000);
}
if (cntx->transaction && !cntx->conn_state.exec_info.IsRunning() &&
cntx->conn_state.script_info == nullptr) {
bool is_ooo = cntx->transaction->IsOOO();

View file

@ -203,82 +203,6 @@ std::string AbslUnparseFlag(const CronExprFlag& flag) {
return "";
}
void SlowLogGet(dfly::CmdArgList args, dfly::ConnectionContext* cntx, dfly::Service& service,
std::string_view sub_cmd) {
size_t requested_slow_log_length = UINT32_MAX;
size_t argc = args.size();
if (argc >= 3) {
cntx->SendError(facade::UnknownSubCmd(sub_cmd, "SLOWLOG"), facade::kSyntaxErrType);
return;
} else if (argc == 2) {
string_view length = facade::ArgS(args, 1);
int64_t num;
if ((!absl::SimpleAtoi(length, &num)) || (num < -1)) {
cntx->SendError("count should be greater than or equal to -1");
return;
}
if (num >= 0) {
requested_slow_log_length = num;
}
}
// gather all the individual slowlogs from all the fibers and sort them by their timestamp
std::vector<boost::circular_buffer<dfly::SlowLogEntry>> entries(service.proactor_pool().size());
service.proactor_pool().AwaitFiberOnAll([&](auto index, auto* context) {
auto shard_entries = dfly::ServerState::tlocal()->GetSlowLog().Entries();
entries[index] = shard_entries;
});
std::vector<std::pair<dfly::SlowLogEntry, unsigned>> merged_slow_log;
for (size_t i = 0; i < entries.size(); ++i) {
for (const auto& log_item : entries[i]) {
merged_slow_log.emplace_back(log_item, i);
}
}
std::sort(merged_slow_log.begin(), merged_slow_log.end(), [](const auto& e1, const auto& e2) {
return e1.first.unix_timestamp > e2.first.unix_timestamp;
});
requested_slow_log_length = std::min(merged_slow_log.size(), requested_slow_log_length);
auto* rb = static_cast<facade::RedisReplyBuilder*>(cntx->reply_builder());
rb->StartArray(requested_slow_log_length);
for (size_t i = 0; i < requested_slow_log_length; ++i) {
const auto& entry = merged_slow_log[i].first;
const auto& args = entry.cmd_args;
rb->StartArray(6);
rb->SendLong(entry.entry_id * service.proactor_pool().size() + merged_slow_log[i].second);
rb->SendLong(entry.unix_timestamp / 1000000000);
rb->SendLong(entry.execution_time_micro);
// if we truncated the args, there is one pseudo-element containing the number of truncated
// args that we must add, so the result length is increased by 1
size_t len = args.size() + int(args.size() < entry.original_length);
rb->StartArray(len);
for (const auto& arg : args) {
if (arg.second > 0) {
auto suffix = absl::StrCat("... (", arg.second, " more bytes)");
auto cmd_arg = arg.first.substr(0, dfly::kMaximumSlowlogArgLength - suffix.length());
rb->SendBulkString(absl::StrCat(cmd_arg, suffix));
} else {
rb->SendBulkString(arg.first);
}
}
// if we truncated arguments - add a special string to indicate that.
if (args.size() < entry.original_length) {
rb->SendBulkString(
absl::StrCat("... (", entry.original_length - args.size(), " more arguments)"));
}
rb->SendBulkString(entry.client_ip);
rb->SendBulkString(entry.client_name);
}
return;
}
namespace dfly {
namespace fs = std::filesystem;
@ -370,6 +294,82 @@ std::optional<cron::cronexpr> InferSnapshotCronExpr() {
return std::nullopt;
}
void SlowLogGet(dfly::CmdArgList args, dfly::ConnectionContext* cntx, dfly::Service& service,
std::string_view sub_cmd) {
size_t requested_slow_log_length = UINT32_MAX;
size_t argc = args.size();
if (argc >= 3) {
cntx->SendError(facade::UnknownSubCmd(sub_cmd, "SLOWLOG"), facade::kSyntaxErrType);
return;
} else if (argc == 2) {
string_view length = facade::ArgS(args, 1);
int64_t num;
if ((!absl::SimpleAtoi(length, &num)) || (num < -1)) {
cntx->SendError("count should be greater than or equal to -1");
return;
}
if (num >= 0) {
requested_slow_log_length = num;
}
}
// gather all the individual slowlogs from all the fibers and sort them by their timestamp
std::vector<boost::circular_buffer<SlowLogEntry>> entries(service.proactor_pool().size());
service.proactor_pool().AwaitFiberOnAll([&](auto index, auto* context) {
auto shard_entries = ServerState::tlocal()->GetSlowLog().Entries();
entries[index] = shard_entries;
});
std::vector<std::pair<SlowLogEntry, unsigned>> merged_slow_log;
for (size_t i = 0; i < entries.size(); ++i) {
for (const auto& log_item : entries[i]) {
merged_slow_log.emplace_back(log_item, i);
}
}
std::sort(merged_slow_log.begin(), merged_slow_log.end(), [](const auto& e1, const auto& e2) {
return e1.first.unix_ts_usec > e2.first.unix_ts_usec;
});
requested_slow_log_length = std::min(merged_slow_log.size(), requested_slow_log_length);
auto* rb = static_cast<facade::RedisReplyBuilder*>(cntx->reply_builder());
rb->StartArray(requested_slow_log_length);
for (size_t i = 0; i < requested_slow_log_length; ++i) {
const auto& entry = merged_slow_log[i].first;
const auto& args = entry.cmd_args;
rb->StartArray(6);
rb->SendLong(entry.entry_id * service.proactor_pool().size() + merged_slow_log[i].second);
rb->SendLong(entry.unix_ts_usec / 1000000);
rb->SendLong(entry.exec_time_usec);
// if we truncated the args, there is one pseudo-element containing the number of truncated
// args that we must add, so the result length is increased by 1
size_t len = args.size() + int(args.size() < entry.original_length);
rb->StartArray(len);
for (const auto& arg : args) {
if (arg.second > 0) {
auto suffix = absl::StrCat("... (", arg.second, " more bytes)");
auto cmd_arg = arg.first.substr(0, kMaximumSlowlogArgLength - suffix.length());
rb->SendBulkString(absl::StrCat(cmd_arg, suffix));
} else {
rb->SendBulkString(arg.first);
}
}
// if we truncated arguments - add a special string to indicate that.
if (args.size() < entry.original_length) {
rb->SendBulkString(
absl::StrCat("... (", entry.original_length - args.size(), " more arguments)"));
}
rb->SendBulkString(entry.client_ip);
rb->SendBulkString(entry.client_name);
}
}
} // namespace
ServerFamily::ServerFamily(Service* service) : service_(*service) {

View file

@ -20,7 +20,7 @@ void SlowLogShard::Reset() {
void SlowLogShard::Add(const string_view command_name, CmdArgList args,
const string_view client_name, const string_view client_ip,
uint64_t execution_time_usec, uint64_t unix_ts) {
uint64_t exec_time_usec, uint64_t unix_ts_usec) {
DCHECK_GT(log_entries_.capacity(), 0u);
vector<pair<string, uint32_t>> slowlog_args;
@ -47,7 +47,7 @@ void SlowLogShard::Add(const string_view command_name, CmdArgList args,
extra_bytes);
}
log_entries_.push_back(SlowLogEntry{slowlog_entry_id_++, unix_ts, execution_time_usec,
log_entries_.push_back(SlowLogEntry{slowlog_entry_id_++, unix_ts_usec, exec_time_usec,
/* +1 for the command */ args.size() + 1,
std::move(slowlog_args), string(client_ip),
string(client_name)});

View file

@ -17,8 +17,8 @@ constexpr size_t kMaximumSlowlogArgLength = 128;
struct SlowLogEntry {
uint32_t entry_id;
uint64_t unix_timestamp;
uint64_t execution_time_micro;
uint64_t unix_ts_usec;
uint64_t exec_time_usec;
size_t original_length;
// a vector of pairs of argument and extra bytes if the argument was truncated
std::vector<std::pair<std::string, uint32_t>> cmd_args;
@ -33,15 +33,16 @@ class SlowLogShard {
}
void Add(const std::string_view command_name, CmdArgList args, const std::string_view client_name,
const std::string_view client_ip, uint64_t execution_time, uint64_t unix_timestamp);
const std::string_view client_ip, uint64_t exec_time_usec, uint64_t unix_ts_usec);
void Reset();
void ChangeLength(size_t new_length);
size_t Length() const {
return log_entries_.size();
}
size_t Capacity() const {
return log_entries_.capacity();
size_t IsEnabled() const {
return log_entries_.capacity() > 0;
}
private: