fix(server): Do more aggressive batching. (#1375)

This removes the "25" limit for batched messages.

Turns out the aggregation in #1287 was not aggressive enough, because
it's quite possible to reach the specified max capacity of  io vectors.
For example, each "QUEUED" is actually "+", "QUEUED", "\r\n" so we can
reach the limit with about 8 batched commands and then finish\
aggregating prematurely.

Closes #1285
This commit is contained in:
Roy Jacobson 2023-06-08 22:12:17 +02:00 committed by GitHub
parent ff338bebe2
commit 1011720a25
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 23 deletions

View file

@ -46,31 +46,25 @@ void SinkReplyBuilder::CloseConnection() {
void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
DCHECK(sink_);
constexpr uint32_t kMaxBatchCnt = 25;
constexpr size_t kMaxBatchSize = 8192;
if ((should_batch_ || should_aggregate_) && batch_cnt_ < kMaxBatchCnt) {
size_t total_size = batch_.size();
for (unsigned i = 0; i < len; ++i) {
total_size += v[i].iov_len;
}
if (total_size < 8192) { // Allow batching with up to 8K of data.
for (unsigned i = 0; i < len; ++i) {
std::string_view src((char*)v[i].iov_base, v[i].iov_len);
DVLOG(2) << "Appending to stream " << src;
batch_.append(src.data(), src.size());
++batch_cnt_;
}
return;
}
}
error_code ec;
++io_write_cnt_;
size_t bsize = 0;
for (unsigned i = 0; i < len; ++i) {
bsize += v[i].iov_len;
}
// Allow batching with up to 8K of data.
if ((should_batch_ || should_aggregate_) && batch_.size() + bsize < kMaxBatchSize) {
for (unsigned i = 0; i < len; ++i) {
std::string_view src((char*)v[i].iov_base, v[i].iov_len);
DVLOG(2) << "Appending to stream " << src;
batch_.append(src.data(), src.size());
}
return;
}
error_code ec;
++io_write_cnt_;
io_write_bytes_ += bsize;
DVLOG(2) << "Writing " << bsize << " bytes of len " << len;
@ -80,7 +74,6 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
DVLOG(1) << "Sending batch to stream " << sink_ << "\n" << batch_;
io_write_bytes_ += batch_.size();
batch_cnt_ = 0;
iovec tmp[len + 1];
tmp[0].iov_base = batch_.data();
@ -119,7 +112,6 @@ void SinkReplyBuilder::StopAggregate() {
error_code ec = sink_->Write(io::Buffer(batch_));
batch_.clear();
batch_cnt_ = 0;
if (ec)
ec_ = ec;

View file

@ -123,7 +123,6 @@ class SinkReplyBuilder {
// Similarly to batch mode but is controlled by at operation level.
bool should_aggregate_ : 1;
uint32_t batch_cnt_ = 0;
};
class MCReplyBuilder : public SinkReplyBuilder {