chore: add "send" state to client list (#2357)

chore: add send state to client list

This can be done only via a differrent fiber by inspecting SinkReplyBuilder state.
Also, get rid of awkward reply builder statistics and use instead thread local stats for that.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-01-01 17:13:27 +02:00 committed by GitHub
parent 069625f23f
commit 4c2d37f3c6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
11 changed files with 176 additions and 161 deletions

View file

@ -1,4 +1,4 @@
add_library(dfly_facade dragonfly_listener.cc dragonfly_connection.cc facade.cc
add_library(dfly_facade conn_context.cc dragonfly_listener.cc dragonfly_connection.cc facade.cc
memcache_parser.cc redis_parser.cc reply_builder.cc op_status.cc
reply_capture.cc resp_expr.cc cmd_arg_parser.cc tls_error.cc)

View file

@ -0,0 +1,44 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "facade/conn_context.h"
#include "facade/dragonfly_connection.h"
namespace facade {
ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : owner_(owner) {
if (owner) {
protocol_ = owner->protocol();
}
if (stream) {
switch (protocol_) {
case Protocol::REDIS:
rbuilder_.reset(new RedisReplyBuilder(stream));
break;
case Protocol::MEMCACHE:
rbuilder_.reset(new MCReplyBuilder(stream));
break;
}
}
conn_closing = false;
req_auth = false;
replica_conn = false;
authenticated = false;
async_dispatch = false;
sync_dispatch = false;
journal_emulated = false;
paused = false;
blocked = false;
subscriptions = 0;
}
size_t ConnectionContext::UsedMemory() const {
return dfly::HeapSize(rbuilder_) + dfly::HeapSize(authed_username) + dfly::HeapSize(acl_commands);
}
} // namespace facade

View file

@ -87,10 +87,7 @@ class ConnectionContext {
rbuilder_->SendProtocolError(str);
}
virtual size_t UsedMemory() const {
return dfly::HeapSize(rbuilder_) + dfly::HeapSize(authed_username) +
dfly::HeapSize(acl_commands);
}
virtual size_t UsedMemory() const;
// connection state / properties.
bool conn_closing : 1;
@ -101,7 +98,9 @@ class ConnectionContext {
bool sync_dispatch : 1; // whether this connection is amid a sync dispatch
bool journal_emulated : 1; // whether it is used to dispatch journal commands
bool paused : 1; // whether this connection is paused due to CLIENT PAUSE
bool blocked; // whether it's blocked on blocking commands like BLPOP, needs to be addressable
// whether it's blocked on blocking commands like BLPOP, needs to be addressable
bool blocked;
// How many async subscription sources are active: monitor and/or pubsub - at most 2.
uint8_t subscriptions;

View file

@ -82,16 +82,6 @@ void SendProtocolError(RedisParser::Result pres, SinkReplyBuilder* builder) {
}
}
void FetchBuilderStats(ConnectionStats* stats, SinkReplyBuilder* builder) {
stats->io_write_cnt += builder->io_write_cnt();
stats->io_write_bytes += builder->io_write_bytes();
for (const auto& k_v : builder->err_count()) {
stats->err_count_map[k_v.first] += k_v.second;
}
builder->reset_io_stats();
}
// TODO: to implement correct matcher according to HTTP spec
// https://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html
// One place to find a good implementation would be https://github.com/h2o/picohttpparser
@ -114,6 +104,9 @@ constexpr size_t kMinReadSize = 256;
thread_local uint32_t free_req_release_weight = 0;
const char* kPhaseName[Connection::NUM_PHASES] = {"SETUP", "READ", "PROCESS", "SHUTTING_DOWN",
"PRECLOSE"};
} // namespace
thread_local vector<Connection::PipelineMessagePtr> Connection::pipeline_req_pool_;
@ -513,13 +506,15 @@ std::pair<std::string, std::string> Connection::GetClientInfoBeforeAfterTid() co
string after;
absl::StrAppend(&after, " irqmatch=", int(cpu == my_cpu_id));
absl::StrAppend(&after, " age=", now - creation_time_, " idle=", now - last_interaction_);
absl::StrAppend(&after, " phase=", PHASE_NAMES[phase_]);
string_view phase_name = PHASE_NAMES[phase_];
if (cc_) {
string cc_info = service_->GetContextInfo(cc_.get());
if (cc_->reply_builder()->IsSendActive())
phase_name = "send";
absl::StrAppend(&after, " ", cc_info);
}
absl::StrAppend(&after, " phase=", phase_name);
return {std::move(before), std::move(after)};
}
@ -684,14 +679,12 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
}
}
}
FetchBuilderStats(stats_, orig_builder);
}
if (ec && !FiberSocketBase::IsConnClosed(ec)) {
string conn_info = service_->GetContextInfo(cc_.get());
LOG(WARNING) << "Socket error for connection " << conn_info << " " << GetName() << ": " << ec
<< " " << ec.message();
LOG(WARNING) << "Socket error for connection " << conn_info << " " << GetName()
<< " during phase " << kPhaseName[phase_] << " : " << ec << " " << ec.message();
}
}
@ -899,8 +892,6 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
size_t max_iobfuf_len = absl::GetFlag(FLAGS_max_client_iobuf_len);
do {
FetchBuilderStats(stats_, orig_builder);
HandleMigrateRequest();
io::MutableBytes append_buf = io_buf_.AppendBuffer();
@ -975,8 +966,6 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil
ec = orig_builder->GetError();
} while (peer->IsOpen() && !ec);
FetchBuilderStats(stats_, orig_builder);
if (ec)
return ec;

View file

@ -7,27 +7,20 @@
#include "base/logging.h"
#include "facade/command_id.h"
#include "facade/conn_context.h"
#include "facade/dragonfly_connection.h"
#include "facade/error.h"
#include "facade/resp_expr.h"
namespace facade {
using namespace std;
#define ADD(x) (x) += o.x
#define ADD_M(m) \
do { \
for (const auto& k_v : o.m) { \
m[k_v.first] += k_v.second; \
} \
} while (0)
constexpr size_t kSizeConnStats = sizeof(ConnectionStats);
ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
// To break this code deliberately if we add/remove a field to this struct.
static_assert(kSizeConnStats == 144u);
static_assert(kSizeConnStats == 96u);
ADD(read_buf_capacity);
ADD(dispatch_queue_entries);
@ -36,8 +29,6 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
ADD(pipeline_cmd_cache_bytes);
ADD(io_read_cnt);
ADD(io_read_bytes);
ADD(io_write_cnt);
ADD(io_write_bytes);
ADD(command_cnt);
ADD(pipelined_cmd_cnt);
ADD(conn_received_cnt);
@ -45,8 +36,6 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
ADD(num_replicas);
ADD(num_blocked_clients);
ADD_M(err_count_map);
return *this;
}
@ -110,35 +99,6 @@ const char* RespExpr::TypeName(Type t) {
ABSL_UNREACHABLE();
}
ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : owner_(owner) {
if (owner) {
protocol_ = owner->protocol();
}
if (stream) {
switch (protocol_) {
case Protocol::REDIS:
rbuilder_.reset(new RedisReplyBuilder(stream));
break;
case Protocol::MEMCACHE:
rbuilder_.reset(new MCReplyBuilder(stream));
break;
}
}
conn_closing = false;
req_auth = false;
replica_conn = false;
authenticated = false;
async_dispatch = false;
sync_dispatch = false;
journal_emulated = false;
paused = false;
blocked = false;
subscriptions = 0;
}
CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first_key,
int8_t last_key, uint32_t acl_categories)
: name_(name),

View file

@ -37,8 +37,6 @@ struct CmdArgListFormatter {
};
struct ConnectionStats {
absl::flat_hash_map<std::string, uint64_t> err_count_map;
size_t read_buf_capacity = 0; // total capacity of input buffers
size_t dispatch_queue_entries = 0; // total number of dispatch queue entries
size_t dispatch_queue_bytes = 0; // total size of all dispatch queue entries
@ -48,8 +46,6 @@ struct ConnectionStats {
size_t io_read_cnt = 0;
size_t io_read_bytes = 0;
size_t io_write_cnt = 0;
size_t io_write_bytes = 0;
uint64_t command_cnt = 0;
uint64_t pipelined_cmd_cnt = 0;

46
src/facade/reply_builder.cc Executable file → Normal file
View file

@ -40,10 +40,25 @@ const char* NullString(bool resp3) {
return resp3 ? "_\r\n" : "$-1\r\n";
}
static thread_local SinkReplyBuilder::StatsType tl_stats;
static thread_local SinkReplyBuilder::ReplyStats tl_stats;
} // namespace
SinkReplyBuilder::ReplyStats& SinkReplyBuilder::ReplyStats::operator+=(const ReplyStats& o) {
io_write_cnt += o.io_write_cnt;
io_write_bytes += o.io_write_bytes;
for (const auto& k_v : o.err_count) {
err_count[k_v.first] += k_v.second;
}
for (unsigned i = 0; i < kNumTypes; ++i) {
send_stats[i] += o.send_stats[i];
}
return *this;
}
SinkReplyBuilder::MGetResponse::~MGetResponse() {
while (storage_list) {
auto* next = storage_list->next;
@ -53,7 +68,11 @@ SinkReplyBuilder::MGetResponse::~MGetResponse() {
}
SinkReplyBuilder::SinkReplyBuilder(::io::Sink* sink)
: sink_(sink), should_batch_(false), should_aggregate_(false), has_replied_(true) {
: sink_(sink),
should_batch_(false),
should_aggregate_(false),
has_replied_(true),
send_active_(false) {
}
void SinkReplyBuilder::CloseConnection() {
@ -61,18 +80,22 @@ void SinkReplyBuilder::CloseConnection() {
ec_ = std::make_error_code(std::errc::connection_aborted);
}
SinkReplyBuilder::StatsType SinkReplyBuilder::GetThreadLocalStats() {
const SinkReplyBuilder::ReplyStats& SinkReplyBuilder::GetThreadLocalStats() {
return tl_stats;
}
void SinkReplyBuilder::ResetThreadLocalStats() {
tl_stats = {};
}
void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
int64_t before = absl::GetCurrentTimeNanos();
SendStatsType stats_type = SendStatsType::kRegular;
auto cleanup = absl::MakeCleanup([&]() {
int64_t after = absl::GetCurrentTimeNanos();
tl_stats[stats_type].count++;
tl_stats[stats_type].total_duration += (after - before) / 1'000;
tl_stats.send_stats[stats_type].count++;
tl_stats.send_stats[stats_type].total_duration += (after - before) / 1'000;
});
has_replied_ = true;
@ -99,8 +122,9 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
}
error_code ec;
++io_write_cnt_;
io_write_bytes_ += bsize;
send_active_ = true;
tl_stats.io_write_cnt++;
tl_stats.io_write_bytes += bsize;
DVLOG(2) << "Writing " << bsize << " bytes of len " << len;
if (batch_.empty()) {
@ -108,7 +132,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
} else {
DVLOG(3) << "Sending batch to stream :" << absl::CHexEscape(batch_);
io_write_bytes_ += batch_.size();
tl_stats.io_write_bytes += batch_.size();
iovec tmp[len + 1];
tmp[0].iov_base = batch_.data();
@ -117,7 +141,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
ec = sink_->Write(tmp, len + 1);
batch_.clear();
}
send_active_ = false;
if (ec) {
DVLOG(1) << "Error writing to stream: " << ec.message();
ec_ = ec;
@ -197,7 +221,7 @@ void SinkReplyBuilder::FlushBatch() {
}
size_t SinkReplyBuilder::UsedMemory() const {
return dfly::HeapSize(batch_) + dfly::HeapSize(err_count_);
return dfly::HeapSize(batch_);
}
MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink), noreply_(false) {
@ -300,7 +324,7 @@ void RedisReplyBuilder::SendError(string_view str, string_view err_type) {
err_type = kSyntaxErrType;
}
err_count_[err_type]++;
tl_stats.err_count[err_type]++;
if (str[0] == '-') {
iovec v[] = {IoVec(str), IoVec(kCRLF)};

38
src/facade/reply_builder.h Executable file → Normal file
View file

@ -67,7 +67,7 @@ class SinkReplyBuilder {
SinkReplyBuilder(const SinkReplyBuilder&) = delete;
void operator=(const SinkReplyBuilder&) = delete;
SinkReplyBuilder(::io::Sink* sink);
explicit SinkReplyBuilder(::io::Sink* sink);
virtual ~SinkReplyBuilder() {
}
@ -110,22 +110,8 @@ class SinkReplyBuilder {
return ec_;
}
size_t io_write_cnt() const {
return io_write_cnt_;
}
size_t io_write_bytes() const {
return io_write_bytes_;
}
void reset_io_stats() {
io_write_cnt_ = 0;
io_write_bytes_ = 0;
err_count_.clear();
}
const absl::flat_hash_map<std::string, uint64_t>& err_count() const {
return err_count_;
bool IsSendActive() const {
return send_active_;
}
struct ReplyAggregator {
@ -172,9 +158,18 @@ class SinkReplyBuilder {
}
};
using StatsType = std::array<SendStats, SendStatsType::kNumTypes>;
struct ReplyStats {
SendStats send_stats[SendStatsType::kNumTypes];
static StatsType GetThreadLocalStats();
size_t io_write_cnt = 0;
size_t io_write_bytes = 0;
absl::flat_hash_map<std::string, uint64_t> err_count;
ReplyStats& operator+=(const ReplyStats& other);
};
static const ReplyStats& GetThreadLocalStats();
static void ResetThreadLocalStats();
protected:
void SendRaw(std::string_view str); // Sends raw without any formatting.
@ -189,15 +184,12 @@ class SinkReplyBuilder {
::io::Sink* sink_;
std::error_code ec_;
size_t io_write_cnt_ = 0;
size_t io_write_bytes_ = 0;
absl::flat_hash_map<std::string, uint64_t> err_count_;
bool should_batch_ : 1;
// Similarly to batch mode but is controlled by at operation level.
bool should_aggregate_ : 1;
bool has_replied_ : 1;
bool send_active_ : 1;
};
class MCReplyBuilder : public SinkReplyBuilder {

View file

@ -13,7 +13,6 @@
#include "facade/facade_test.h"
#include "facade/redis_parser.h"
#include "facade/reply_capture.h"
// This will test the reply_builder RESP (Redis).
using namespace testing;
@ -104,6 +103,7 @@ class RedisReplyBuilderTest : public testing::Test {
void SetUp() {
sink_.Clear();
builder_.reset(new RedisReplyBuilder(&sink_));
SinkReplyBuilder::ResetThreadLocalStats();
}
protected:
@ -126,6 +126,20 @@ class RedisReplyBuilderTest : public testing::Test {
return str().size();
}
unsigned GetError(string_view err) const {
const auto& map = SinkReplyBuilder::GetThreadLocalStats().err_count;
auto it = map.find(err);
return it == map.end() ? 0 : it->second;
}
static bool NoErrors() {
return SinkReplyBuilder::GetThreadLocalStats().err_count.empty();
}
static const SinkReplyBuilder::ReplyStats& GetReplyStats() {
return SinkReplyBuilder::GetThreadLocalStats();
}
// Breaks the string we have in sink into tokens.
// In RESP each token is build up from series of bytes follow by "\r\n"
// This function don't try to parse the message, only to break the strings based
@ -228,7 +242,7 @@ TEST_F(RedisReplyBuilderTest, SimpleError) {
// ASSERT_EQ(sink_.str().at(0), kErrorStartChar);
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart));
ASSERT_TRUE(absl::EndsWith(str(), kCRLF));
ASSERT_EQ(builder_->err_count().at(error), 1);
ASSERT_EQ(GetError(error), 1);
ASSERT_EQ(str(), BuildExpectedErrorString(error))
<< " error different from expected - '" << str() << "'";
auto parsing = Parse();
@ -242,7 +256,7 @@ TEST_F(RedisReplyBuilderTest, SimpleError) {
ASSERT_EQ(str(), kOKMessage);
ASSERT_TRUE(absl::EndsWith(str(), kCRLF));
ASSERT_EQ(builder_->err_count().at(error), 1);
ASSERT_EQ(GetError(error), 1);
parsing = Parse();
ASSERT_TRUE(parsing.Verify(SinkSize()));
@ -263,8 +277,7 @@ TEST_F(RedisReplyBuilderTest, ErrorBuiltInMessage) {
builder_->SendError(err);
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart)) << " invalid start char for " << err;
ASSERT_TRUE(absl::EndsWith(str(), kCRLF)) << " failed to find correct termination at " << err;
ASSERT_EQ(builder_->err_count().at(error_type), 1)
<< " number of error count is invalid for " << err;
ASSERT_EQ(GetError(error_type), 1) << " number of error count is invalid for " << err;
ASSERT_EQ(str(), BuildExpectedErrorString(error_name))
<< " error different from expected - '" << str() << "'";
@ -280,7 +293,7 @@ TEST_F(RedisReplyBuilderTest, ErrorReplyBuiltInMessage) {
builder_->SendError(err);
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart));
ASSERT_TRUE(absl::EndsWith(str(), kCRLF));
ASSERT_EQ(builder_->err_count().at(kIndexOutOfRange), 1);
ASSERT_EQ(GetError(kIndexOutOfRange), 1);
ASSERT_EQ(str(), BuildExpectedErrorString(kIndexOutOfRange));
auto parsing_output = Parse();
@ -292,7 +305,7 @@ TEST_F(RedisReplyBuilderTest, ErrorReplyBuiltInMessage) {
builder_->SendError(err);
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart));
ASSERT_TRUE(absl::EndsWith(str(), kCRLF));
ASSERT_EQ(builder_->err_count().at("e2"), 1);
ASSERT_EQ(GetError("e2"), 1);
ASSERT_EQ(str(), BuildExpectedErrorString("e1"));
parsing_output = Parse();
@ -314,7 +327,7 @@ TEST_F(RedisReplyBuilderTest, ErrorNoneBuiltInMessage) {
builder_->SendError(err);
ASSERT_TRUE(absl::StartsWith(str(), kErrorStart)) << " invalid start char for " << err;
ASSERT_TRUE(absl::EndsWith(str(), kCRLF));
auto current_error_count = builder_->err_count().at(error_type);
auto current_error_count = GetError(error_type);
error_count++;
ASSERT_EQ(current_error_count, error_count) << " number of error count is invalid for " << err;
auto parsing_output = Parse();
@ -374,7 +387,7 @@ TEST_F(RedisReplyBuilderTest, StrArray) {
for (auto s : string_vector) {
builder_->SendSimpleString(s);
expected_size += s.size() + kCRLF.size() + 1;
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
}
ASSERT_EQ(SinkSize(), expected_size);
// ASSERT_EQ(kArrayStart, str().at(0));
@ -401,7 +414,7 @@ TEST_F(RedisReplyBuilderTest, SendSimpleStrArr) {
"+++", "---", "$$$", "~~~~", "@@@", "^^^", "1234", "foo"};
const std::size_t kArrayLen = sizeof(kArrayMessage) / sizeof(kArrayMessage[0]);
builder_->SendSimpleStrArr(kArrayMessage);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
// Tokenize the message and verify content
std::vector<std::string_view> message_tokens = TokenizeMessage();
ASSERT_THAT(message_tokens, ElementsAre(absl::StrCat(kArrayStartString, kArrayLen),
@ -426,7 +439,7 @@ TEST_F(RedisReplyBuilderTest, SendStringViewArr) {
// random values
"(((", "}}}", "&&&&", "####", "___", "+++", "0.1234", "bar"};
builder_->SendStringArr(kArrayMessage);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
// verify content
std::vector<std::string_view> message_tokens = TokenizeMessage();
// the form of this is *<array size>\r\n$<string1 size>\r\n<string1>..$<stringN
@ -457,7 +470,7 @@ TEST_F(RedisReplyBuilderTest, SendBulkStringArr) {
// Test this one with large values
std::string(1024, '.'), std::string(2048, ','), std::string(4096, ' ')};
builder_->SendStringArr(kArrayMessage);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
std::vector<std::string_view> message_tokens = TokenizeMessage();
// the form of this is *<array size>\r\n$<string1 size>\r\n<string1>..$<stringN
// size>\r\n<stringN>\r\n
@ -479,7 +492,7 @@ TEST_F(RedisReplyBuilderTest, SendBulkStringArr) {
TEST_F(RedisReplyBuilderTest, NullBulkString) {
// null bulk string == "$-1\r\n" i.e. '$' + -1 + \r + \n
builder_->SendNull();
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(str(), kNullBulkString);
auto parsing_output = Parse();
ASSERT_TRUE(parsing_output.Verify(SinkSize()));
@ -491,7 +504,7 @@ TEST_F(RedisReplyBuilderTest, EmptyBulkString) {
// empty bulk string is in the form of "$0\r\n\r\n", i.e. length 0 after $ follow by \r\n*2
const std::string_view kEmptyBulkString = "$0\r\n\r\n";
builder_->SendBulkString(std::string_view{});
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(str(), kEmptyBulkString);
auto parsing_output = Parse();
ASSERT_TRUE(parsing_output.Verify(SinkSize()));
@ -505,7 +518,7 @@ TEST_F(RedisReplyBuilderTest, NoAsciiBulkString) {
std::size_t data_size = sizeof(random_bytes) / sizeof(random_bytes[0]);
std::string_view none_ascii_payload{random_bytes, data_size};
builder_->SendBulkString(none_ascii_payload);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
const std::string expected_payload =
absl::StrCat(kBulkStringStart, data_size, kCRLF, none_ascii_payload, kCRLF);
ASSERT_EQ(str(), expected_payload);
@ -522,7 +535,7 @@ TEST_F(RedisReplyBuilderTest, BulkStringWithCRLF) {
// Verify bulk string that contains the \r\n as payload
std::string_view crlf_chars{"\r\n"};
builder_->SendBulkString(crlf_chars);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
// the expected message in this case is $2\r\n\r\n\r\n
std::string expected_message =
absl::StrCat(kBulkStringStart, crlf_chars.size(), kCRLF, crlf_chars, kCRLF);
@ -538,7 +551,7 @@ TEST_F(RedisReplyBuilderTest, BulkStringWithStartBulkString) {
std::string expected_message =
absl::StrCat(kBulkStringStart, message.size(), kCRLF, message, kCRLF);
builder_->SendBulkString(message);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(str(), expected_message);
auto parsing_output = Parse();
@ -562,7 +575,7 @@ TEST_F(RedisReplyBuilderTest, BulkStringWithErrorString) {
std::string expected_message =
absl::StrCat(kBulkStringStart, message.size(), kCRLF, message, kCRLF);
builder_->SendBulkString(message);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(str(), expected_message);
auto parsing_output = Parse();
ASSERT_TRUE(parsing_output.IsString());
@ -594,7 +607,7 @@ TEST_F(RedisReplyBuilderTest, Double) {
const std::string expected_payload =
absl::StrCat(kBulkStringStart, kPayloadStr.size(), kCRLF, kPayloadStr, kCRLF);
builder_->SendDouble(double_value);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
std::vector<std::string_view> message_tokens = TokenizeMessage();
ASSERT_EQ(str(), expected_payload);
ASSERT_THAT(message_tokens,
@ -638,7 +651,7 @@ TEST_F(RedisReplyBuilderTest, MixedTypeArray) {
builder_->SendDouble(double_value);
const std::string_view output_msg = str();
ASSERT_FALSE(output_msg.empty());
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
std::vector<std::string_view> message_tokens = TokenizeMessage();
ASSERT_THAT(
message_tokens,
@ -673,8 +686,8 @@ TEST_F(RedisReplyBuilderTest, BatchMode) {
for (const auto& val : kInputArray) {
builder_->SendBulkString(val);
ASSERT_EQ(SinkSize(), 0) << " sink is not empty at iteration number " << count;
ASSERT_EQ(builder_->io_write_bytes(), 0);
ASSERT_EQ(builder_->io_write_cnt(), 0);
ASSERT_EQ(GetReplyStats().io_write_bytes, 0);
ASSERT_EQ(GetReplyStats().io_write_cnt, 0);
total_bytes += val.size();
++count;
}
@ -682,11 +695,11 @@ TEST_F(RedisReplyBuilderTest, BatchMode) {
// write something
builder_->SetBatchMode(false);
builder_->SendBulkString(std::string_view{});
ASSERT_EQ(builder_->io_write_cnt(), 1);
ASSERT_EQ(GetReplyStats().io_write_cnt, 1);
// We expecting to have more than the total bytes we count,
// since we are not counting the \r\n and the type char as well
// as length entries
ASSERT_GT(builder_->io_write_bytes(), total_bytes);
ASSERT_GT(GetReplyStats().io_write_bytes, total_bytes);
std::vector<std::string_view> array_members = TokenizeMessage();
ASSERT_THAT(array_members,
ElementsAre(absl::StrCat(kArrayStartString, kInputArray.size()),
@ -702,14 +715,14 @@ TEST_F(RedisReplyBuilderTest, BatchMode) {
TEST_F(RedisReplyBuilderTest, Resp3Double) {
builder_->SetResp3(true);
builder_->SendDouble(5.5);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(str(), ",5.5\r\n");
}
TEST_F(RedisReplyBuilderTest, Resp3NullString) {
builder_->SetResp3(true);
builder_->SendNull();
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(TakePayload(), "_\r\n");
}
@ -718,13 +731,13 @@ TEST_F(RedisReplyBuilderTest, SendStringArrayAsMap) {
builder_->SetResp3(false);
builder_->SendStringArr(map_array, builder_->MAP);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(TakePayload(), "*4\r\n$2\r\nk1\r\n$2\r\nv1\r\n$2\r\nk2\r\n$2\r\nv2\r\n")
<< "SendStringArrayAsMap Resp2 Failed.";
builder_->SetResp3(true);
builder_->SendStringArr(map_array, builder_->MAP);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(TakePayload(), "%2\r\n$2\r\nk1\r\n$2\r\nv1\r\n$2\r\nk2\r\n$2\r\nv2\r\n")
<< "SendStringArrayAsMap Resp3 Failed.";
}
@ -734,13 +747,13 @@ TEST_F(RedisReplyBuilderTest, SendStringArrayAsSet) {
builder_->SetResp3(false);
builder_->SendStringArr(set_array, builder_->SET);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(TakePayload(), "*3\r\n$2\r\ne1\r\n$2\r\ne2\r\n$2\r\ne3\r\n")
<< "SendStringArrayAsSet Resp2 Failed.";
builder_->SetResp3(true);
builder_->SendStringArr(set_array, builder_->SET);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(TakePayload(), "~3\r\n$2\r\ne1\r\n$2\r\ne2\r\n$2\r\ne3\r\n")
<< "SendStringArrayAsSet Resp3 Failed.";
}
@ -751,26 +764,26 @@ TEST_F(RedisReplyBuilderTest, SendScoredArray) {
builder_->SetResp3(false);
builder_->SendScoredArray(scored_array, false);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(TakePayload(), "*3\r\n$2\r\ne1\r\n$2\r\ne2\r\n$2\r\ne3\r\n")
<< "Resp2 WITHOUT scores failed.";
builder_->SetResp3(true);
builder_->SendScoredArray(scored_array, false);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(TakePayload(), "*3\r\n$2\r\ne1\r\n$2\r\ne2\r\n$2\r\ne3\r\n")
<< "Resp3 WITHOUT scores failed.";
builder_->SetResp3(false);
builder_->SendScoredArray(scored_array, true);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(TakePayload(),
"*6\r\n$2\r\ne1\r\n$3\r\n1.1\r\n$2\r\ne2\r\n$3\r\n2.2\r\n$2\r\ne3\r\n$3\r\n3.3\r\n")
<< "Resp3 WITHSCORES failed.";
builder_->SetResp3(true);
builder_->SendScoredArray(scored_array, true);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(TakePayload(),
"*3\r\n*2\r\n$2\r\ne1\r\n,1.1\r\n*2\r\n$2\r\ne2\r\n,2.2\r\n*2\r\n$2\r\ne3\r\n,3.3\r\n")
<< "Resp3 WITHSCORES failed.";
@ -781,14 +794,14 @@ TEST_F(RedisReplyBuilderTest, SendMGetResponse) {
builder_->SetResp3(false);
builder_->SendMGetResponse(std::move(resp));
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(TakePayload(), "*3\r\n$2\r\nv1\r\n$-1\r\n$2\r\nv3\r\n")
<< "Resp2 SendMGetResponse failed.";
resp = MakeMGetResponse({"v1", nullopt, "v3"});
builder_->SetResp3(true);
builder_->SendMGetResponse(std::move(resp));
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(TakePayload(), "*3\r\n$2\r\nv1\r\n_\r\n$2\r\nv3\r\n")
<< "Resp3 SendMGetResponse failed.";
}
@ -919,17 +932,17 @@ TEST_F(RedisReplyBuilderTest, VerbatimString) {
builder_->SetResp3(true);
builder_->SendVerbatimString(str, RedisReplyBuilder::VerbatimFormat::TXT);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(TakePayload(), "=20\r\ntxt:A simple string!\r\n") << "Resp3 VerbatimString TXT failed.";
builder_->SetResp3(true);
builder_->SendVerbatimString(str, RedisReplyBuilder::VerbatimFormat::MARKDOWN);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(TakePayload(), "=20\r\nmkd:A simple string!\r\n") << "Resp3 VerbatimString TXT failed.";
builder_->SetResp3(false);
builder_->SendVerbatimString(str);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_TRUE(NoErrors());
ASSERT_EQ(TakePayload(), "$16\r\nA simple string!\r\n") << "Resp3 VerbatimString TXT failed.";
}

View file

@ -827,7 +827,7 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) {
// Net metrics
AppendMetricWithoutLabels("net_input_bytes_total", "", conn_stats.io_read_bytes,
MetricType::COUNTER, &resp->body());
AppendMetricWithoutLabels("net_output_bytes_total", "", conn_stats.io_write_bytes,
AppendMetricWithoutLabels("net_output_bytes_total", "", m.reply_stats.io_write_bytes,
MetricType::COUNTER, &resp->body());
{
string send_latency_metrics;
@ -841,7 +841,7 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) {
&send_count_metrics);
for (unsigned i = 0; i < SendStatsType::kNumTypes; ++i) {
auto& stats = m.reply_stats[i];
auto& stats = m.reply_stats.send_stats[i];
string_view type;
switch (SendStatsType(i)) {
case SendStatsType::kRegular:
@ -1049,7 +1049,7 @@ void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext*
ADD_LINE(total_connections, -1);
ADD_LINE(rejected_connections, -1);
ADD_LINE(bytes_read, m.conn_stats.io_read_bytes);
ADD_LINE(bytes_written, m.conn_stats.io_write_bytes);
ADD_LINE(bytes_written, m.reply_stats.io_write_bytes);
ADD_LINE(limit_maxbytes, -1);
absl::StrAppend(&info, "END\r\n");
@ -1450,7 +1450,7 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
registry->ResetCallStats(index);
auto& sstate = *ServerState::tlocal();
auto& stats = sstate.connection_stats;
stats.err_count_map.clear();
SinkReplyBuilder::ResetThreadLocalStats();
stats.command_cnt = 0;
stats.pipelined_cmd_cnt = 0;
});
@ -1550,10 +1550,7 @@ Metrics ServerFamily::GetMetrics() const {
result.uptime = time(NULL) - this->start_time_;
result.qps += uint64_t(ss->MovingSum6());
result.conn_stats += ss->connection_stats;
for (size_t i = 0; i < reply_stats.size(); ++i) {
result.reply_stats[i] += reply_stats[i];
}
result.reply_stats += reply_stats;
if (shard) {
result.heap_used_bytes += shard->UsedMemory();
@ -1724,7 +1721,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("instantaneous_ops_per_sec", m.qps);
append("total_pipelined_commands", m.conn_stats.pipelined_cmd_cnt);
append("total_net_input_bytes", m.conn_stats.io_read_bytes);
append("total_net_output_bytes", m.conn_stats.io_write_bytes);
append("total_net_output_bytes", m.reply_stats.io_write_bytes);
append("instantaneous_input_kbps", -1);
append("instantaneous_output_kbps", -1);
append("rejected_connections", -1);
@ -1742,14 +1739,15 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("keyspace_misses", m.events.misses);
append("keyspace_mutations", m.events.mutations);
append("total_reads_processed", m.conn_stats.io_read_cnt);
append("total_writes_processed", m.conn_stats.io_write_cnt);
append("total_writes_processed", m.reply_stats.io_write_cnt);
append("defrag_attempt_total", m.shard_stats.defrag_attempt_total);
append("defrag_realloc_total", m.shard_stats.defrag_realloc_total);
append("defrag_task_invocation_total", m.shard_stats.defrag_task_invocation_total);
append("reply_count", m.reply_stats[SendStatsType::kRegular].count);
append("reply_latency_usec", m.reply_stats[SendStatsType::kRegular].total_duration);
append("reply_batch_count", m.reply_stats[SendStatsType::kBatch].count);
append("reply_batch_latency_usec", m.reply_stats[SendStatsType::kBatch].total_duration);
append("reply_count", m.reply_stats.send_stats[SendStatsType::kRegular].count);
append("reply_latency_usec", m.reply_stats.send_stats[SendStatsType::kRegular].total_duration);
append("reply_batch_count", m.reply_stats.send_stats[SendStatsType::kBatch].count);
append("reply_batch_latency_usec",
m.reply_stats.send_stats[SendStatsType::kBatch].total_duration);
}
if (should_enter("TIERED", true)) {
@ -1882,7 +1880,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
}
if (should_enter("ERRORSTATS", true)) {
for (const auto& k_v : m.conn_stats.err_count_map) {
for (const auto& k_v : m.reply_stats.err_count) {
append(k_v.first, k_v.second);
}
}

View file

@ -78,7 +78,7 @@ struct Metrics {
SearchStats search_stats;
ServerState::Stats coordinator_stats; // stats on transaction running
facade::SinkReplyBuilder::StatsType reply_stats{}; // Stats for Send*() ops
facade::SinkReplyBuilder::ReplyStats reply_stats; // Stats for Send*() ops
PeakStats peak_stats;