From 34c8939a3a2b4f1cc34b5ca3011a881e98a56c34 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Mon, 29 May 2023 13:31:12 +0300 Subject: [PATCH] fix: Batch array length before sending its contents (#1287) fix: Coalesce array contents to optimize response traffic. Also pull the latest helio. Fixes #1285 Signed-off-by: Roman Gershman --- helio | 2 +- src/facade/reply_builder.cc | 32 +++++++++++++++++++++++++++++--- src/facade/reply_builder.h | 26 +++++++++++++++++++++++++- src/facade/reply_builder_test.cc | 7 +++---- src/server/hset_family.cc | 4 ++++ src/server/list_family.cc | 1 + src/server/main_service.cc | 2 ++ 7 files changed, 65 insertions(+), 9 deletions(-) diff --git a/helio b/helio index 288d85e4f..724eb6441 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 288d85e4fad988994ff236fca8969e7751d19443 +Subproject commit 724eb6441516fe07517ec1841e96e6b68f6ff183 diff --git a/src/facade/reply_builder.cc b/src/facade/reply_builder.cc index a957644e2..9f68266ce 100644 --- a/src/facade/reply_builder.cc +++ b/src/facade/reply_builder.cc @@ -45,8 +45,9 @@ void SinkReplyBuilder::CloseConnection() { void SinkReplyBuilder::Send(const iovec* v, uint32_t len) { DCHECK(sink_); + constexpr uint32_t kMaxBatchCnt = 25; - if (should_batch_) { + if ((should_batch_ || should_aggregate_) && batch_cnt_ < kMaxBatchCnt) { size_t total_size = batch_.size(); for (unsigned i = 0; i < len; ++i) { total_size += v[i].iov_len; @@ -57,6 +58,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) { std::string_view src((char*)v[i].iov_base, v[i].iov_len); DVLOG(2) << "Appending to stream " << src; batch_.append(src.data(), src.size()); + ++batch_cnt_; } return; } @@ -64,10 +66,12 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) { error_code ec; ++io_write_cnt_; - + size_t bsize = 0; for (unsigned i = 0; i < len; ++i) { - io_write_bytes_ += v[i].iov_len; + bsize += v[i].iov_len; } + io_write_bytes_ += bsize; + DVLOG(2) << "Writing " << bsize << " bytes of len " << len; if (batch_.empty()) { ec = sink_->Write(v, len); @@ -75,6 +79,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) { DVLOG(1) << "Sending batch to stream " << sink_ << "\n" << batch_; io_write_bytes_ += batch_.size(); + batch_cnt_ = 0; iovec tmp[len + 1]; tmp[0].iov_base = batch_.data(); @@ -105,6 +110,20 @@ void SinkReplyBuilder::SendRawVec(absl::Span msg_vec) { Send(arr.data(), msg_vec.size()); } +void SinkReplyBuilder::StopAggregate() { + should_aggregate_ = false; + + if (should_batch_ || batch_.empty()) + return; + + error_code ec = sink_->Write(io::Buffer(batch_)); + batch_.clear(); + batch_cnt_ = 0; + + if (ec) + ec_ = ec; +} + MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink), noreply_(false) { } @@ -287,6 +306,7 @@ void RedisReplyBuilder::SendLong(long num) { void RedisReplyBuilder::SendScoredArray(const std::vector>& arr, bool with_scores) { + ReplyAggregator agg(this); if (!with_scores) { StartArray(arr.size()); for (const auto& p : arr) { @@ -385,7 +405,13 @@ void RedisReplyBuilder::StartCollection(unsigned len, CollectionType type) { type = ARRAY; } + // We do not want to send multiple packets for small responses because these + // trigger TCP-related artifacts (e.g. Nagle's algorithm) that slow down the delivery of the whole + // response. + bool prev = should_aggregate_; + should_aggregate_ |= (len > 0); SendRaw(absl::StrCat(START_SYMBOLS[type], len, kCRLF)); + should_aggregate_ = prev; } // This implementation a bit complicated because it uses vectorized diff --git a/src/facade/reply_builder.h b/src/facade/reply_builder.h index 48d4254b6..64cb463c6 100644 --- a/src/facade/reply_builder.h +++ b/src/facade/reply_builder.h @@ -86,12 +86,31 @@ class SinkReplyBuilder { return err_count_; } + struct ReplyAggregator { + explicit ReplyAggregator(SinkReplyBuilder* builder) : builder_(builder) { + builder_->StartAggregate(); + } + + ~ReplyAggregator() { + builder_->StopAggregate(); + } + + private: + SinkReplyBuilder* builder_; + }; + protected: void SendRaw(std::string_view str); // Sends raw without any formatting. void SendRawVec(absl::Span msg_vec); void Send(const iovec* v, uint32_t len); + void StartAggregate() { + should_aggregate_ = true; + } + + void StopAggregate(); + std::string batch_; ::io::Sink* sink_; std::error_code ec_; @@ -100,7 +119,11 @@ class SinkReplyBuilder { size_t io_write_bytes_ = 0; absl::flat_hash_map err_count_; - bool should_batch_ = false; + bool should_batch_ : 1 = false; + + // Similarly to batch mode but is controlled by at operation level. + bool should_aggregate_ : 2 = false; + uint32_t batch_cnt_ = 0; }; class MCReplyBuilder : public SinkReplyBuilder { @@ -161,6 +184,7 @@ class RedisReplyBuilder : public SinkReplyBuilder { bool with_scores); void StartArray(unsigned len); // StartCollection(len, ARRAY) + virtual void StartCollection(unsigned len, CollectionType type); static char* FormatDouble(double val, char* dest, unsigned dest_len); diff --git a/src/facade/reply_builder_test.cc b/src/facade/reply_builder_test.cc index d4425f082..e3c990647 100644 --- a/src/facade/reply_builder_test.cc +++ b/src/facade/reply_builder_test.cc @@ -185,12 +185,11 @@ TEST_F(RedisReplyBuilderTest, TestMessageSend) { builder_->SendOk(); ASSERT_EQ(TakePayload(), kOKMessage); builder_->StartArray(10); - ASSERT_EQ(TakePayload(), "*10\r\n"); - sink_.Clear(); + std::string_view hello_msg = "hello"; builder_->SendBulkString(hello_msg); - std::string expected_bulk_string = - absl::StrCat(kBulkStringStart, std::to_string(hello_msg.size()), kCRLF, hello_msg, kCRLF); + std::string expected_bulk_string = absl::StrCat( + "*10\r\n", kBulkStringStart, std::to_string(hello_msg.size()), kCRLF, hello_msg, kCRLF); ASSERT_EQ(TakePayload(), expected_bulk_string); } diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 02370d025..bdafd6f5a 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -775,6 +775,8 @@ void HSetFamily::HMGet(CmdArgList args, ConnectionContext* cntx) { OpResult> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); if (result) { + SinkReplyBuilder::ReplyAggregator agg(cntx->reply_builder()); + (*cntx)->StartArray(result->size()); for (const auto& val : *result) { if (val) { @@ -784,6 +786,8 @@ void HSetFamily::HMGet(CmdArgList args, ConnectionContext* cntx) { } } } else if (result.status() == OpStatus::KEY_NOTFOUND) { + SinkReplyBuilder::ReplyAggregator agg(cntx->reply_builder()); + (*cntx)->StartArray(args.size()); for (unsigned i = 0; i < args.size(); ++i) { (*cntx)->SendNull(); diff --git a/src/server/list_family.cc b/src/server/list_family.cc index cd1648e60..b5d6da647 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -954,6 +954,7 @@ void ListFamily::LPos(CmdArgList args, ConnectionContext* cntx) { (*cntx)->SendLong((*result)[0]); } } else { + SinkReplyBuilder::ReplyAggregator agg(cntx->reply_builder()); (*cntx)->StartArray(result->size()); const auto& array = result.value(); for (const auto& v : array) { diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 9a2c4e8c0..018f70b66 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1524,6 +1524,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { Transaction* trans = cntx->transaction; cntx->transaction = nullptr; + SinkReplyBuilder::ReplyAggregator agg(rb); rb->StartArray(body.size()); for (auto& scmd : body) { arg_vec.resize(scmd.NumArgs() + 1); @@ -1549,6 +1550,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { } VLOG(1) << "StartExec " << exec_info.body.size(); + SinkReplyBuilder::ReplyAggregator agg(rb); rb->StartArray(exec_info.body.size()); if (!exec_info.body.empty()) {