diff --git a/src/facade/CMakeLists.txt b/src/facade/CMakeLists.txt index a0b338c29..408d17884 100644 --- a/src/facade/CMakeLists.txt +++ b/src/facade/CMakeLists.txt @@ -1,4 +1,4 @@ -add_library(dfly_facade dragonfly_listener.cc dragonfly_connection.cc facade.cc +add_library(dfly_facade conn_context.cc dragonfly_listener.cc dragonfly_connection.cc facade.cc memcache_parser.cc redis_parser.cc reply_builder.cc op_status.cc reply_capture.cc resp_expr.cc cmd_arg_parser.cc tls_error.cc) diff --git a/src/facade/conn_context.cc b/src/facade/conn_context.cc new file mode 100644 index 000000000..3b23ca3dd --- /dev/null +++ b/src/facade/conn_context.cc @@ -0,0 +1,44 @@ +// Copyright 2024, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "facade/conn_context.h" + +#include "facade/dragonfly_connection.h" + +namespace facade { + +ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : owner_(owner) { + if (owner) { + protocol_ = owner->protocol(); + } + + if (stream) { + switch (protocol_) { + case Protocol::REDIS: + rbuilder_.reset(new RedisReplyBuilder(stream)); + break; + case Protocol::MEMCACHE: + rbuilder_.reset(new MCReplyBuilder(stream)); + break; + } + } + + conn_closing = false; + req_auth = false; + replica_conn = false; + authenticated = false; + async_dispatch = false; + sync_dispatch = false; + journal_emulated = false; + paused = false; + blocked = false; + + subscriptions = 0; +} + +size_t ConnectionContext::UsedMemory() const { + return dfly::HeapSize(rbuilder_) + dfly::HeapSize(authed_username) + dfly::HeapSize(acl_commands); +} + +} // namespace facade diff --git a/src/facade/conn_context.h b/src/facade/conn_context.h index e369dc236..9ba641108 100644 --- a/src/facade/conn_context.h +++ b/src/facade/conn_context.h @@ -87,10 +87,7 @@ class ConnectionContext { rbuilder_->SendProtocolError(str); } - virtual size_t UsedMemory() const { - return dfly::HeapSize(rbuilder_) + dfly::HeapSize(authed_username) + - dfly::HeapSize(acl_commands); - } + virtual size_t UsedMemory() const; // connection state / properties. bool conn_closing : 1; @@ -101,7 +98,9 @@ class ConnectionContext { bool sync_dispatch : 1; // whether this connection is amid a sync dispatch bool journal_emulated : 1; // whether it is used to dispatch journal commands bool paused : 1; // whether this connection is paused due to CLIENT PAUSE - bool blocked; // whether it's blocked on blocking commands like BLPOP, needs to be addressable + + // whether it's blocked on blocking commands like BLPOP, needs to be addressable + bool blocked; // How many async subscription sources are active: monitor and/or pubsub - at most 2. uint8_t subscriptions; diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 8ec10cd1c..1a212af7d 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -82,16 +82,6 @@ void SendProtocolError(RedisParser::Result pres, SinkReplyBuilder* builder) { } } -void FetchBuilderStats(ConnectionStats* stats, SinkReplyBuilder* builder) { - stats->io_write_cnt += builder->io_write_cnt(); - stats->io_write_bytes += builder->io_write_bytes(); - - for (const auto& k_v : builder->err_count()) { - stats->err_count_map[k_v.first] += k_v.second; - } - builder->reset_io_stats(); -} - // TODO: to implement correct matcher according to HTTP spec // https://www.w3.org/Protocols/rfc2616/rfc2616-sec5.html // One place to find a good implementation would be https://github.com/h2o/picohttpparser @@ -114,6 +104,9 @@ constexpr size_t kMinReadSize = 256; thread_local uint32_t free_req_release_weight = 0; +const char* kPhaseName[Connection::NUM_PHASES] = {"SETUP", "READ", "PROCESS", "SHUTTING_DOWN", + "PRECLOSE"}; + } // namespace thread_local vector Connection::pipeline_req_pool_; @@ -513,13 +506,15 @@ std::pair Connection::GetClientInfoBeforeAfterTid() co string after; absl::StrAppend(&after, " irqmatch=", int(cpu == my_cpu_id)); absl::StrAppend(&after, " age=", now - creation_time_, " idle=", now - last_interaction_); - absl::StrAppend(&after, " phase=", PHASE_NAMES[phase_]); + string_view phase_name = PHASE_NAMES[phase_]; if (cc_) { string cc_info = service_->GetContextInfo(cc_.get()); + if (cc_->reply_builder()->IsSendActive()) + phase_name = "send"; absl::StrAppend(&after, " ", cc_info); } - + absl::StrAppend(&after, " phase=", phase_name); return {std::move(before), std::move(after)}; } @@ -684,14 +679,12 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { } } } - - FetchBuilderStats(stats_, orig_builder); } if (ec && !FiberSocketBase::IsConnClosed(ec)) { string conn_info = service_->GetContextInfo(cc_.get()); - LOG(WARNING) << "Socket error for connection " << conn_info << " " << GetName() << ": " << ec - << " " << ec.message(); + LOG(WARNING) << "Socket error for connection " << conn_info << " " << GetName() + << " during phase " << kPhaseName[phase_] << " : " << ec << " " << ec.message(); } } @@ -899,8 +892,6 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil size_t max_iobfuf_len = absl::GetFlag(FLAGS_max_client_iobuf_len); do { - FetchBuilderStats(stats_, orig_builder); - HandleMigrateRequest(); io::MutableBytes append_buf = io_buf_.AppendBuffer(); @@ -975,8 +966,6 @@ auto Connection::IoLoop(util::FiberSocketBase* peer, SinkReplyBuilder* orig_buil ec = orig_builder->GetError(); } while (peer->IsOpen() && !ec); - FetchBuilderStats(stats_, orig_builder); - if (ec) return ec; diff --git a/src/facade/facade.cc b/src/facade/facade.cc index 0999439e8..dafa7185c 100644 --- a/src/facade/facade.cc +++ b/src/facade/facade.cc @@ -7,27 +7,20 @@ #include "base/logging.h" #include "facade/command_id.h" -#include "facade/conn_context.h" -#include "facade/dragonfly_connection.h" #include "facade/error.h" +#include "facade/resp_expr.h" namespace facade { using namespace std; #define ADD(x) (x) += o.x -#define ADD_M(m) \ - do { \ - for (const auto& k_v : o.m) { \ - m[k_v.first] += k_v.second; \ - } \ - } while (0) constexpr size_t kSizeConnStats = sizeof(ConnectionStats); ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { // To break this code deliberately if we add/remove a field to this struct. - static_assert(kSizeConnStats == 144u); + static_assert(kSizeConnStats == 96u); ADD(read_buf_capacity); ADD(dispatch_queue_entries); @@ -36,8 +29,6 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { ADD(pipeline_cmd_cache_bytes); ADD(io_read_cnt); ADD(io_read_bytes); - ADD(io_write_cnt); - ADD(io_write_bytes); ADD(command_cnt); ADD(pipelined_cmd_cnt); ADD(conn_received_cnt); @@ -45,8 +36,6 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { ADD(num_replicas); ADD(num_blocked_clients); - ADD_M(err_count_map); - return *this; } @@ -110,35 +99,6 @@ const char* RespExpr::TypeName(Type t) { ABSL_UNREACHABLE(); } -ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : owner_(owner) { - if (owner) { - protocol_ = owner->protocol(); - } - - if (stream) { - switch (protocol_) { - case Protocol::REDIS: - rbuilder_.reset(new RedisReplyBuilder(stream)); - break; - case Protocol::MEMCACHE: - rbuilder_.reset(new MCReplyBuilder(stream)); - break; - } - } - - conn_closing = false; - req_auth = false; - replica_conn = false; - authenticated = false; - async_dispatch = false; - sync_dispatch = false; - journal_emulated = false; - paused = false; - blocked = false; - - subscriptions = 0; -} - CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first_key, int8_t last_key, uint32_t acl_categories) : name_(name), diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h index 79775a5f8..4151c90d1 100644 --- a/src/facade/facade_types.h +++ b/src/facade/facade_types.h @@ -37,8 +37,6 @@ struct CmdArgListFormatter { }; struct ConnectionStats { - absl::flat_hash_map err_count_map; - size_t read_buf_capacity = 0; // total capacity of input buffers size_t dispatch_queue_entries = 0; // total number of dispatch queue entries size_t dispatch_queue_bytes = 0; // total size of all dispatch queue entries @@ -48,8 +46,6 @@ struct ConnectionStats { size_t io_read_cnt = 0; size_t io_read_bytes = 0; - size_t io_write_cnt = 0; - size_t io_write_bytes = 0; uint64_t command_cnt = 0; uint64_t pipelined_cmd_cnt = 0; diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc old mode 100755 new mode 100644 index 56be517fd..b4c6b0083 --- a/src/facade/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -40,10 +40,25 @@ const char* NullString(bool resp3) { return resp3 ? "_\r\n" : "$-1\r\n"; } -static thread_local SinkReplyBuilder::StatsType tl_stats; +static thread_local SinkReplyBuilder::ReplyStats tl_stats; } // namespace +SinkReplyBuilder::ReplyStats& SinkReplyBuilder::ReplyStats::operator+=(const ReplyStats& o) { + io_write_cnt += o.io_write_cnt; + io_write_bytes += o.io_write_bytes; + + for (const auto& k_v : o.err_count) { + err_count[k_v.first] += k_v.second; + } + + for (unsigned i = 0; i < kNumTypes; ++i) { + send_stats[i] += o.send_stats[i]; + } + + return *this; +} + SinkReplyBuilder::MGetResponse::~MGetResponse() { while (storage_list) { auto* next = storage_list->next; @@ -53,7 +68,11 @@ SinkReplyBuilder::MGetResponse::~MGetResponse() { } SinkReplyBuilder::SinkReplyBuilder(::io::Sink* sink) - : sink_(sink), should_batch_(false), should_aggregate_(false), has_replied_(true) { + : sink_(sink), + should_batch_(false), + should_aggregate_(false), + has_replied_(true), + send_active_(false) { } void SinkReplyBuilder::CloseConnection() { @@ -61,18 +80,22 @@ void SinkReplyBuilder::CloseConnection() { ec_ = std::make_error_code(std::errc::connection_aborted); } -SinkReplyBuilder::StatsType SinkReplyBuilder::GetThreadLocalStats() { +const SinkReplyBuilder::ReplyStats& SinkReplyBuilder::GetThreadLocalStats() { return tl_stats; } +void SinkReplyBuilder::ResetThreadLocalStats() { + tl_stats = {}; +} + void SinkReplyBuilder::Send(const iovec* v, uint32_t len) { int64_t before = absl::GetCurrentTimeNanos(); SendStatsType stats_type = SendStatsType::kRegular; auto cleanup = absl::MakeCleanup([&]() { int64_t after = absl::GetCurrentTimeNanos(); - tl_stats[stats_type].count++; - tl_stats[stats_type].total_duration += (after - before) / 1'000; + tl_stats.send_stats[stats_type].count++; + tl_stats.send_stats[stats_type].total_duration += (after - before) / 1'000; }); has_replied_ = true; @@ -99,8 +122,9 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) { } error_code ec; - ++io_write_cnt_; - io_write_bytes_ += bsize; + send_active_ = true; + tl_stats.io_write_cnt++; + tl_stats.io_write_bytes += bsize; DVLOG(2) << "Writing " << bsize << " bytes of len " << len; if (batch_.empty()) { @@ -108,7 +132,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) { } else { DVLOG(3) << "Sending batch to stream :" << absl::CHexEscape(batch_); - io_write_bytes_ += batch_.size(); + tl_stats.io_write_bytes += batch_.size(); iovec tmp[len + 1]; tmp[0].iov_base = batch_.data(); @@ -117,7 +141,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) { ec = sink_->Write(tmp, len + 1); batch_.clear(); } - + send_active_ = false; if (ec) { DVLOG(1) << "Error writing to stream: " << ec.message(); ec_ = ec; @@ -197,7 +221,7 @@ void SinkReplyBuilder::FlushBatch() { } size_t SinkReplyBuilder::UsedMemory() const { - return dfly::HeapSize(batch_) + dfly::HeapSize(err_count_); + return dfly::HeapSize(batch_); } MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink), noreply_(false) { @@ -300,7 +324,7 @@ void RedisReplyBuilder::SendError(string_view str, string_view err_type) { err_type = kSyntaxErrType; } - err_count_[err_type]++; + tl_stats.err_count[err_type]++; if (str[0] == '-') { iovec v[] = {IoVec(str), IoVec(kCRLF)}; diff --git a/src/facade/reply_builder.h b/src/facade/reply_builder.h old mode 100755 new mode 100644 index 097b44427..54c09a2bc --- a/src/facade/reply_builder.h +++ b/src/facade/reply_builder.h @@ -67,7 +67,7 @@ class SinkReplyBuilder { SinkReplyBuilder(const SinkReplyBuilder&) = delete; void operator=(const SinkReplyBuilder&) = delete; - SinkReplyBuilder(::io::Sink* sink); + explicit SinkReplyBuilder(::io::Sink* sink); virtual ~SinkReplyBuilder() { } @@ -110,22 +110,8 @@ class SinkReplyBuilder { return ec_; } - size_t io_write_cnt() const { - return io_write_cnt_; - } - - size_t io_write_bytes() const { - return io_write_bytes_; - } - - void reset_io_stats() { - io_write_cnt_ = 0; - io_write_bytes_ = 0; - err_count_.clear(); - } - - const absl::flat_hash_map& err_count() const { - return err_count_; + bool IsSendActive() const { + return send_active_; } struct ReplyAggregator { @@ -172,9 +158,18 @@ class SinkReplyBuilder { } }; - using StatsType = std::array; + struct ReplyStats { + SendStats send_stats[SendStatsType::kNumTypes]; - static StatsType GetThreadLocalStats(); + size_t io_write_cnt = 0; + size_t io_write_bytes = 0; + absl::flat_hash_map err_count; + + ReplyStats& operator+=(const ReplyStats& other); + }; + + static const ReplyStats& GetThreadLocalStats(); + static void ResetThreadLocalStats(); protected: void SendRaw(std::string_view str); // Sends raw without any formatting. @@ -189,15 +184,12 @@ class SinkReplyBuilder { ::io::Sink* sink_; std::error_code ec_; - size_t io_write_cnt_ = 0; - size_t io_write_bytes_ = 0; - absl::flat_hash_map err_count_; - bool should_batch_ : 1; // Similarly to batch mode but is controlled by at operation level. bool should_aggregate_ : 1; bool has_replied_ : 1; + bool send_active_ : 1; }; class MCReplyBuilder : public SinkReplyBuilder { diff --git a/src/facade/reply_builder_test.cc b/src/facade/reply_builder_test.cc index 0f930e2cb..f3b1fa0b9 100644 --- a/src/facade/reply_builder_test.cc +++ b/src/facade/reply_builder_test.cc @@ -13,7 +13,6 @@ #include "facade/facade_test.h" #include "facade/redis_parser.h" #include "facade/reply_capture.h" - // This will test the reply_builder RESP (Redis). using namespace testing; @@ -104,6 +103,7 @@ class RedisReplyBuilderTest : public testing::Test { void SetUp() { sink_.Clear(); builder_.reset(new RedisReplyBuilder(&sink_)); + SinkReplyBuilder::ResetThreadLocalStats(); } protected: @@ -126,6 +126,20 @@ class RedisReplyBuilderTest : public testing::Test { return str().size(); } + unsigned GetError(string_view err) const { + const auto& map = SinkReplyBuilder::GetThreadLocalStats().err_count; + auto it = map.find(err); + return it == map.end() ? 0 : it->second; + } + + static bool NoErrors() { + return SinkReplyBuilder::GetThreadLocalStats().err_count.empty(); + } + + static const SinkReplyBuilder::ReplyStats& GetReplyStats() { + return SinkReplyBuilder::GetThreadLocalStats(); + } + // Breaks the string we have in sink into tokens. // In RESP each token is build up from series of bytes follow by "\r\n" // This function don't try to parse the message, only to break the strings based @@ -228,7 +242,7 @@ TEST_F(RedisReplyBuilderTest, SimpleError) { // ASSERT_EQ(sink_.str().at(0), kErrorStartChar); ASSERT_TRUE(absl::StartsWith(str(), kErrorStart)); ASSERT_TRUE(absl::EndsWith(str(), kCRLF)); - ASSERT_EQ(builder_->err_count().at(error), 1); + ASSERT_EQ(GetError(error), 1); ASSERT_EQ(str(), BuildExpectedErrorString(error)) << " error different from expected - '" << str() << "'"; auto parsing = Parse(); @@ -242,7 +256,7 @@ TEST_F(RedisReplyBuilderTest, SimpleError) { ASSERT_EQ(str(), kOKMessage); ASSERT_TRUE(absl::EndsWith(str(), kCRLF)); - ASSERT_EQ(builder_->err_count().at(error), 1); + ASSERT_EQ(GetError(error), 1); parsing = Parse(); ASSERT_TRUE(parsing.Verify(SinkSize())); @@ -263,8 +277,7 @@ TEST_F(RedisReplyBuilderTest, ErrorBuiltInMessage) { builder_->SendError(err); ASSERT_TRUE(absl::StartsWith(str(), kErrorStart)) << " invalid start char for " << err; ASSERT_TRUE(absl::EndsWith(str(), kCRLF)) << " failed to find correct termination at " << err; - ASSERT_EQ(builder_->err_count().at(error_type), 1) - << " number of error count is invalid for " << err; + ASSERT_EQ(GetError(error_type), 1) << " number of error count is invalid for " << err; ASSERT_EQ(str(), BuildExpectedErrorString(error_name)) << " error different from expected - '" << str() << "'"; @@ -280,7 +293,7 @@ TEST_F(RedisReplyBuilderTest, ErrorReplyBuiltInMessage) { builder_->SendError(err); ASSERT_TRUE(absl::StartsWith(str(), kErrorStart)); ASSERT_TRUE(absl::EndsWith(str(), kCRLF)); - ASSERT_EQ(builder_->err_count().at(kIndexOutOfRange), 1); + ASSERT_EQ(GetError(kIndexOutOfRange), 1); ASSERT_EQ(str(), BuildExpectedErrorString(kIndexOutOfRange)); auto parsing_output = Parse(); @@ -292,7 +305,7 @@ TEST_F(RedisReplyBuilderTest, ErrorReplyBuiltInMessage) { builder_->SendError(err); ASSERT_TRUE(absl::StartsWith(str(), kErrorStart)); ASSERT_TRUE(absl::EndsWith(str(), kCRLF)); - ASSERT_EQ(builder_->err_count().at("e2"), 1); + ASSERT_EQ(GetError("e2"), 1); ASSERT_EQ(str(), BuildExpectedErrorString("e1")); parsing_output = Parse(); @@ -314,7 +327,7 @@ TEST_F(RedisReplyBuilderTest, ErrorNoneBuiltInMessage) { builder_->SendError(err); ASSERT_TRUE(absl::StartsWith(str(), kErrorStart)) << " invalid start char for " << err; ASSERT_TRUE(absl::EndsWith(str(), kCRLF)); - auto current_error_count = builder_->err_count().at(error_type); + auto current_error_count = GetError(error_type); error_count++; ASSERT_EQ(current_error_count, error_count) << " number of error count is invalid for " << err; auto parsing_output = Parse(); @@ -374,7 +387,7 @@ TEST_F(RedisReplyBuilderTest, StrArray) { for (auto s : string_vector) { builder_->SendSimpleString(s); expected_size += s.size() + kCRLF.size() + 1; - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); } ASSERT_EQ(SinkSize(), expected_size); // ASSERT_EQ(kArrayStart, str().at(0)); @@ -401,7 +414,7 @@ TEST_F(RedisReplyBuilderTest, SendSimpleStrArr) { "+++", "---", "$$$", "~~~~", "@@@", "^^^", "1234", "foo"}; const std::size_t kArrayLen = sizeof(kArrayMessage) / sizeof(kArrayMessage[0]); builder_->SendSimpleStrArr(kArrayMessage); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); // Tokenize the message and verify content std::vector message_tokens = TokenizeMessage(); ASSERT_THAT(message_tokens, ElementsAre(absl::StrCat(kArrayStartString, kArrayLen), @@ -426,7 +439,7 @@ TEST_F(RedisReplyBuilderTest, SendStringViewArr) { // random values "(((", "}}}", "&&&&", "####", "___", "+++", "0.1234", "bar"}; builder_->SendStringArr(kArrayMessage); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); // verify content std::vector message_tokens = TokenizeMessage(); // the form of this is *\r\n$\r\n..$SendStringArr(kArrayMessage); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); std::vector message_tokens = TokenizeMessage(); // the form of this is *\r\n$\r\n..$\r\n\r\n @@ -479,7 +492,7 @@ TEST_F(RedisReplyBuilderTest, SendBulkStringArr) { TEST_F(RedisReplyBuilderTest, NullBulkString) { // null bulk string == "$-1\r\n" i.e. '$' + -1 + \r + \n builder_->SendNull(); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(str(), kNullBulkString); auto parsing_output = Parse(); ASSERT_TRUE(parsing_output.Verify(SinkSize())); @@ -491,7 +504,7 @@ TEST_F(RedisReplyBuilderTest, EmptyBulkString) { // empty bulk string is in the form of "$0\r\n\r\n", i.e. length 0 after $ follow by \r\n*2 const std::string_view kEmptyBulkString = "$0\r\n\r\n"; builder_->SendBulkString(std::string_view{}); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(str(), kEmptyBulkString); auto parsing_output = Parse(); ASSERT_TRUE(parsing_output.Verify(SinkSize())); @@ -505,7 +518,7 @@ TEST_F(RedisReplyBuilderTest, NoAsciiBulkString) { std::size_t data_size = sizeof(random_bytes) / sizeof(random_bytes[0]); std::string_view none_ascii_payload{random_bytes, data_size}; builder_->SendBulkString(none_ascii_payload); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); const std::string expected_payload = absl::StrCat(kBulkStringStart, data_size, kCRLF, none_ascii_payload, kCRLF); ASSERT_EQ(str(), expected_payload); @@ -522,7 +535,7 @@ TEST_F(RedisReplyBuilderTest, BulkStringWithCRLF) { // Verify bulk string that contains the \r\n as payload std::string_view crlf_chars{"\r\n"}; builder_->SendBulkString(crlf_chars); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); // the expected message in this case is $2\r\n\r\n\r\n std::string expected_message = absl::StrCat(kBulkStringStart, crlf_chars.size(), kCRLF, crlf_chars, kCRLF); @@ -538,7 +551,7 @@ TEST_F(RedisReplyBuilderTest, BulkStringWithStartBulkString) { std::string expected_message = absl::StrCat(kBulkStringStart, message.size(), kCRLF, message, kCRLF); builder_->SendBulkString(message); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(str(), expected_message); auto parsing_output = Parse(); @@ -562,7 +575,7 @@ TEST_F(RedisReplyBuilderTest, BulkStringWithErrorString) { std::string expected_message = absl::StrCat(kBulkStringStart, message.size(), kCRLF, message, kCRLF); builder_->SendBulkString(message); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(str(), expected_message); auto parsing_output = Parse(); ASSERT_TRUE(parsing_output.IsString()); @@ -594,7 +607,7 @@ TEST_F(RedisReplyBuilderTest, Double) { const std::string expected_payload = absl::StrCat(kBulkStringStart, kPayloadStr.size(), kCRLF, kPayloadStr, kCRLF); builder_->SendDouble(double_value); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); std::vector message_tokens = TokenizeMessage(); ASSERT_EQ(str(), expected_payload); ASSERT_THAT(message_tokens, @@ -638,7 +651,7 @@ TEST_F(RedisReplyBuilderTest, MixedTypeArray) { builder_->SendDouble(double_value); const std::string_view output_msg = str(); ASSERT_FALSE(output_msg.empty()); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); std::vector message_tokens = TokenizeMessage(); ASSERT_THAT( message_tokens, @@ -673,8 +686,8 @@ TEST_F(RedisReplyBuilderTest, BatchMode) { for (const auto& val : kInputArray) { builder_->SendBulkString(val); ASSERT_EQ(SinkSize(), 0) << " sink is not empty at iteration number " << count; - ASSERT_EQ(builder_->io_write_bytes(), 0); - ASSERT_EQ(builder_->io_write_cnt(), 0); + ASSERT_EQ(GetReplyStats().io_write_bytes, 0); + ASSERT_EQ(GetReplyStats().io_write_cnt, 0); total_bytes += val.size(); ++count; } @@ -682,11 +695,11 @@ TEST_F(RedisReplyBuilderTest, BatchMode) { // write something builder_->SetBatchMode(false); builder_->SendBulkString(std::string_view{}); - ASSERT_EQ(builder_->io_write_cnt(), 1); + ASSERT_EQ(GetReplyStats().io_write_cnt, 1); // We expecting to have more than the total bytes we count, // since we are not counting the \r\n and the type char as well // as length entries - ASSERT_GT(builder_->io_write_bytes(), total_bytes); + ASSERT_GT(GetReplyStats().io_write_bytes, total_bytes); std::vector array_members = TokenizeMessage(); ASSERT_THAT(array_members, ElementsAre(absl::StrCat(kArrayStartString, kInputArray.size()), @@ -702,14 +715,14 @@ TEST_F(RedisReplyBuilderTest, BatchMode) { TEST_F(RedisReplyBuilderTest, Resp3Double) { builder_->SetResp3(true); builder_->SendDouble(5.5); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(str(), ",5.5\r\n"); } TEST_F(RedisReplyBuilderTest, Resp3NullString) { builder_->SetResp3(true); builder_->SendNull(); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "_\r\n"); } @@ -718,13 +731,13 @@ TEST_F(RedisReplyBuilderTest, SendStringArrayAsMap) { builder_->SetResp3(false); builder_->SendStringArr(map_array, builder_->MAP); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "*4\r\n$2\r\nk1\r\n$2\r\nv1\r\n$2\r\nk2\r\n$2\r\nv2\r\n") << "SendStringArrayAsMap Resp2 Failed."; builder_->SetResp3(true); builder_->SendStringArr(map_array, builder_->MAP); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "%2\r\n$2\r\nk1\r\n$2\r\nv1\r\n$2\r\nk2\r\n$2\r\nv2\r\n") << "SendStringArrayAsMap Resp3 Failed."; } @@ -734,13 +747,13 @@ TEST_F(RedisReplyBuilderTest, SendStringArrayAsSet) { builder_->SetResp3(false); builder_->SendStringArr(set_array, builder_->SET); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "*3\r\n$2\r\ne1\r\n$2\r\ne2\r\n$2\r\ne3\r\n") << "SendStringArrayAsSet Resp2 Failed."; builder_->SetResp3(true); builder_->SendStringArr(set_array, builder_->SET); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "~3\r\n$2\r\ne1\r\n$2\r\ne2\r\n$2\r\ne3\r\n") << "SendStringArrayAsSet Resp3 Failed."; } @@ -751,26 +764,26 @@ TEST_F(RedisReplyBuilderTest, SendScoredArray) { builder_->SetResp3(false); builder_->SendScoredArray(scored_array, false); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "*3\r\n$2\r\ne1\r\n$2\r\ne2\r\n$2\r\ne3\r\n") << "Resp2 WITHOUT scores failed."; builder_->SetResp3(true); builder_->SendScoredArray(scored_array, false); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "*3\r\n$2\r\ne1\r\n$2\r\ne2\r\n$2\r\ne3\r\n") << "Resp3 WITHOUT scores failed."; builder_->SetResp3(false); builder_->SendScoredArray(scored_array, true); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "*6\r\n$2\r\ne1\r\n$3\r\n1.1\r\n$2\r\ne2\r\n$3\r\n2.2\r\n$2\r\ne3\r\n$3\r\n3.3\r\n") << "Resp3 WITHSCORES failed."; builder_->SetResp3(true); builder_->SendScoredArray(scored_array, true); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "*3\r\n*2\r\n$2\r\ne1\r\n,1.1\r\n*2\r\n$2\r\ne2\r\n,2.2\r\n*2\r\n$2\r\ne3\r\n,3.3\r\n") << "Resp3 WITHSCORES failed."; @@ -781,14 +794,14 @@ TEST_F(RedisReplyBuilderTest, SendMGetResponse) { builder_->SetResp3(false); builder_->SendMGetResponse(std::move(resp)); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "*3\r\n$2\r\nv1\r\n$-1\r\n$2\r\nv3\r\n") << "Resp2 SendMGetResponse failed."; resp = MakeMGetResponse({"v1", nullopt, "v3"}); builder_->SetResp3(true); builder_->SendMGetResponse(std::move(resp)); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "*3\r\n$2\r\nv1\r\n_\r\n$2\r\nv3\r\n") << "Resp3 SendMGetResponse failed."; } @@ -919,17 +932,17 @@ TEST_F(RedisReplyBuilderTest, VerbatimString) { builder_->SetResp3(true); builder_->SendVerbatimString(str, RedisReplyBuilder::VerbatimFormat::TXT); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "=20\r\ntxt:A simple string!\r\n") << "Resp3 VerbatimString TXT failed."; builder_->SetResp3(true); builder_->SendVerbatimString(str, RedisReplyBuilder::VerbatimFormat::MARKDOWN); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "=20\r\nmkd:A simple string!\r\n") << "Resp3 VerbatimString TXT failed."; builder_->SetResp3(false); builder_->SendVerbatimString(str); - ASSERT_TRUE(builder_->err_count().empty()); + ASSERT_TRUE(NoErrors()); ASSERT_EQ(TakePayload(), "$16\r\nA simple string!\r\n") << "Resp3 VerbatimString TXT failed."; } diff --git a/src/server/server_family.cc b/src/server/server_family.cc index f4e7e26ae..82077e9c5 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -827,7 +827,7 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) { // Net metrics AppendMetricWithoutLabels("net_input_bytes_total", "", conn_stats.io_read_bytes, MetricType::COUNTER, &resp->body()); - AppendMetricWithoutLabels("net_output_bytes_total", "", conn_stats.io_write_bytes, + AppendMetricWithoutLabels("net_output_bytes_total", "", m.reply_stats.io_write_bytes, MetricType::COUNTER, &resp->body()); { string send_latency_metrics; @@ -841,7 +841,7 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) { &send_count_metrics); for (unsigned i = 0; i < SendStatsType::kNumTypes; ++i) { - auto& stats = m.reply_stats[i]; + auto& stats = m.reply_stats.send_stats[i]; string_view type; switch (SendStatsType(i)) { case SendStatsType::kRegular: @@ -1049,7 +1049,7 @@ void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext* ADD_LINE(total_connections, -1); ADD_LINE(rejected_connections, -1); ADD_LINE(bytes_read, m.conn_stats.io_read_bytes); - ADD_LINE(bytes_written, m.conn_stats.io_write_bytes); + ADD_LINE(bytes_written, m.reply_stats.io_write_bytes); ADD_LINE(limit_maxbytes, -1); absl::StrAppend(&info, "END\r\n"); @@ -1450,7 +1450,7 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) { registry->ResetCallStats(index); auto& sstate = *ServerState::tlocal(); auto& stats = sstate.connection_stats; - stats.err_count_map.clear(); + SinkReplyBuilder::ResetThreadLocalStats(); stats.command_cnt = 0; stats.pipelined_cmd_cnt = 0; }); @@ -1550,10 +1550,7 @@ Metrics ServerFamily::GetMetrics() const { result.uptime = time(NULL) - this->start_time_; result.qps += uint64_t(ss->MovingSum6()); result.conn_stats += ss->connection_stats; - - for (size_t i = 0; i < reply_stats.size(); ++i) { - result.reply_stats[i] += reply_stats[i]; - } + result.reply_stats += reply_stats; if (shard) { result.heap_used_bytes += shard->UsedMemory(); @@ -1724,7 +1721,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("instantaneous_ops_per_sec", m.qps); append("total_pipelined_commands", m.conn_stats.pipelined_cmd_cnt); append("total_net_input_bytes", m.conn_stats.io_read_bytes); - append("total_net_output_bytes", m.conn_stats.io_write_bytes); + append("total_net_output_bytes", m.reply_stats.io_write_bytes); append("instantaneous_input_kbps", -1); append("instantaneous_output_kbps", -1); append("rejected_connections", -1); @@ -1742,14 +1739,15 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("keyspace_misses", m.events.misses); append("keyspace_mutations", m.events.mutations); append("total_reads_processed", m.conn_stats.io_read_cnt); - append("total_writes_processed", m.conn_stats.io_write_cnt); + append("total_writes_processed", m.reply_stats.io_write_cnt); append("defrag_attempt_total", m.shard_stats.defrag_attempt_total); append("defrag_realloc_total", m.shard_stats.defrag_realloc_total); append("defrag_task_invocation_total", m.shard_stats.defrag_task_invocation_total); - append("reply_count", m.reply_stats[SendStatsType::kRegular].count); - append("reply_latency_usec", m.reply_stats[SendStatsType::kRegular].total_duration); - append("reply_batch_count", m.reply_stats[SendStatsType::kBatch].count); - append("reply_batch_latency_usec", m.reply_stats[SendStatsType::kBatch].total_duration); + append("reply_count", m.reply_stats.send_stats[SendStatsType::kRegular].count); + append("reply_latency_usec", m.reply_stats.send_stats[SendStatsType::kRegular].total_duration); + append("reply_batch_count", m.reply_stats.send_stats[SendStatsType::kBatch].count); + append("reply_batch_latency_usec", + m.reply_stats.send_stats[SendStatsType::kBatch].total_duration); } if (should_enter("TIERED", true)) { @@ -1882,7 +1880,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { } if (should_enter("ERRORSTATS", true)) { - for (const auto& k_v : m.conn_stats.err_count_map) { + for (const auto& k_v : m.reply_stats.err_count) { append(k_v.first, k_v.second); } } diff --git a/src/server/server_family.h b/src/server/server_family.h index 04c8cc189..df915d212 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -78,7 +78,7 @@ struct Metrics { SearchStats search_stats; ServerState::Stats coordinator_stats; // stats on transaction running - facade::SinkReplyBuilder::StatsType reply_stats{}; // Stats for Send*() ops + facade::SinkReplyBuilder::ReplyStats reply_stats; // Stats for Send*() ops PeakStats peak_stats;