fix: Batch array length before sending its contents (#1287)

fix: Coalesce array contents to optimize response traffic.

Also pull the latest helio.

Fixes #1285

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-05-29 13:31:12 +03:00 committed by GitHub
parent 6e21686406
commit 34c8939a3a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 65 additions and 9 deletions

2
helio

@ -1 +1 @@
Subproject commit 288d85e4fad988994ff236fca8969e7751d19443
Subproject commit 724eb6441516fe07517ec1841e96e6b68f6ff183

View file

@ -45,8 +45,9 @@ void SinkReplyBuilder::CloseConnection() {
void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
DCHECK(sink_);
constexpr uint32_t kMaxBatchCnt = 25;
if (should_batch_) {
if ((should_batch_ || should_aggregate_) && batch_cnt_ < kMaxBatchCnt) {
size_t total_size = batch_.size();
for (unsigned i = 0; i < len; ++i) {
total_size += v[i].iov_len;
@ -57,6 +58,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
std::string_view src((char*)v[i].iov_base, v[i].iov_len);
DVLOG(2) << "Appending to stream " << src;
batch_.append(src.data(), src.size());
++batch_cnt_;
}
return;
}
@ -64,10 +66,12 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
error_code ec;
++io_write_cnt_;
size_t bsize = 0;
for (unsigned i = 0; i < len; ++i) {
io_write_bytes_ += v[i].iov_len;
bsize += v[i].iov_len;
}
io_write_bytes_ += bsize;
DVLOG(2) << "Writing " << bsize << " bytes of len " << len;
if (batch_.empty()) {
ec = sink_->Write(v, len);
@ -75,6 +79,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
DVLOG(1) << "Sending batch to stream " << sink_ << "\n" << batch_;
io_write_bytes_ += batch_.size();
batch_cnt_ = 0;
iovec tmp[len + 1];
tmp[0].iov_base = batch_.data();
@ -105,6 +110,20 @@ void SinkReplyBuilder::SendRawVec(absl::Span<const std::string_view> msg_vec) {
Send(arr.data(), msg_vec.size());
}
void SinkReplyBuilder::StopAggregate() {
should_aggregate_ = false;
if (should_batch_ || batch_.empty())
return;
error_code ec = sink_->Write(io::Buffer(batch_));
batch_.clear();
batch_cnt_ = 0;
if (ec)
ec_ = ec;
}
MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink), noreply_(false) {
}
@ -287,6 +306,7 @@ void RedisReplyBuilder::SendLong(long num) {
void RedisReplyBuilder::SendScoredArray(const std::vector<std::pair<std::string, double>>& arr,
bool with_scores) {
ReplyAggregator agg(this);
if (!with_scores) {
StartArray(arr.size());
for (const auto& p : arr) {
@ -385,7 +405,13 @@ void RedisReplyBuilder::StartCollection(unsigned len, CollectionType type) {
type = ARRAY;
}
// We do not want to send multiple packets for small responses because these
// trigger TCP-related artifacts (e.g. Nagle's algorithm) that slow down the delivery of the whole
// response.
bool prev = should_aggregate_;
should_aggregate_ |= (len > 0);
SendRaw(absl::StrCat(START_SYMBOLS[type], len, kCRLF));
should_aggregate_ = prev;
}
// This implementation a bit complicated because it uses vectorized

View file

@ -86,12 +86,31 @@ class SinkReplyBuilder {
return err_count_;
}
struct ReplyAggregator {
explicit ReplyAggregator(SinkReplyBuilder* builder) : builder_(builder) {
builder_->StartAggregate();
}
~ReplyAggregator() {
builder_->StopAggregate();
}
private:
SinkReplyBuilder* builder_;
};
protected:
void SendRaw(std::string_view str); // Sends raw without any formatting.
void SendRawVec(absl::Span<const std::string_view> msg_vec);
void Send(const iovec* v, uint32_t len);
void StartAggregate() {
should_aggregate_ = true;
}
void StopAggregate();
std::string batch_;
::io::Sink* sink_;
std::error_code ec_;
@ -100,7 +119,11 @@ class SinkReplyBuilder {
size_t io_write_bytes_ = 0;
absl::flat_hash_map<std::string, uint64_t> err_count_;
bool should_batch_ = false;
bool should_batch_ : 1 = false;
// Similarly to batch mode but is controlled by at operation level.
bool should_aggregate_ : 2 = false;
uint32_t batch_cnt_ = 0;
};
class MCReplyBuilder : public SinkReplyBuilder {
@ -161,6 +184,7 @@ class RedisReplyBuilder : public SinkReplyBuilder {
bool with_scores);
void StartArray(unsigned len); // StartCollection(len, ARRAY)
virtual void StartCollection(unsigned len, CollectionType type);
static char* FormatDouble(double val, char* dest, unsigned dest_len);

View file

@ -185,12 +185,11 @@ TEST_F(RedisReplyBuilderTest, TestMessageSend) {
builder_->SendOk();
ASSERT_EQ(TakePayload(), kOKMessage);
builder_->StartArray(10);
ASSERT_EQ(TakePayload(), "*10\r\n");
sink_.Clear();
std::string_view hello_msg = "hello";
builder_->SendBulkString(hello_msg);
std::string expected_bulk_string =
absl::StrCat(kBulkStringStart, std::to_string(hello_msg.size()), kCRLF, hello_msg, kCRLF);
std::string expected_bulk_string = absl::StrCat(
"*10\r\n", kBulkStringStart, std::to_string(hello_msg.size()), kCRLF, hello_msg, kCRLF);
ASSERT_EQ(TakePayload(), expected_bulk_string);
}

View file

@ -775,6 +775,8 @@ void HSetFamily::HMGet(CmdArgList args, ConnectionContext* cntx) {
OpResult<vector<OptStr>> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
SinkReplyBuilder::ReplyAggregator agg(cntx->reply_builder());
(*cntx)->StartArray(result->size());
for (const auto& val : *result) {
if (val) {
@ -784,6 +786,8 @@ void HSetFamily::HMGet(CmdArgList args, ConnectionContext* cntx) {
}
}
} else if (result.status() == OpStatus::KEY_NOTFOUND) {
SinkReplyBuilder::ReplyAggregator agg(cntx->reply_builder());
(*cntx)->StartArray(args.size());
for (unsigned i = 0; i < args.size(); ++i) {
(*cntx)->SendNull();

View file

@ -954,6 +954,7 @@ void ListFamily::LPos(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendLong((*result)[0]);
}
} else {
SinkReplyBuilder::ReplyAggregator agg(cntx->reply_builder());
(*cntx)->StartArray(result->size());
const auto& array = result.value();
for (const auto& v : array) {

View file

@ -1524,6 +1524,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
Transaction* trans = cntx->transaction;
cntx->transaction = nullptr;
SinkReplyBuilder::ReplyAggregator agg(rb);
rb->StartArray(body.size());
for (auto& scmd : body) {
arg_vec.resize(scmd.NumArgs() + 1);
@ -1549,6 +1550,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
}
VLOG(1) << "StartExec " << exec_info.body.size();
SinkReplyBuilder::ReplyAggregator agg(rb);
rb->StartArray(exec_info.body.size());
if (!exec_info.body.empty()) {