chore: optimize SendStringArrInternal even more (#3425)

Before - sending 200K items requires more than 12K send calls.
Now - requires less than 2K calls. Latency also went down though not by x6.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-08-02 14:53:20 +03:00 committed by GitHub
parent 8622c27ce1
commit f652f10743
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 59 additions and 48 deletions

View file

@ -103,12 +103,18 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
DVLOG(3) << "Sending batch to stream :" << absl::CHexEscape(batch_);
tl_facade_stats->reply_stats.io_write_bytes += batch_.size();
iovec tmp[len + 1];
tmp[0].iov_base = batch_.data();
tmp[0].iov_len = batch_.size();
copy(v, v + len, tmp + 1);
ec = sink_->Write(tmp, len + 1);
if (len == UIO_MAXIOV) {
ec = sink_->Write(io::Buffer(batch_));
if (!ec) {
ec = sink_->Write(v, len);
}
} else {
iovec tmp[len + 1];
tmp[0].iov_base = batch_.data();
tmp[0].iov_len = batch_.size();
copy(v, v + len, tmp + 1);
ec = sink_->Write(tmp, len + 1);
}
batch_.clear();
}
send_active_ = false;
@ -549,9 +555,9 @@ void RedisReplyBuilder::StartCollection(unsigned len, CollectionType type) {
}
// This implementation a bit complicated because it uses vectorized
// send to send an array. The problem with that is the OS limits vector length
// to low numbers (around 1024). Therefore, to make it robust we send the array in batches.
// We limit the vector length to 256 and when it fills up we flush it to the socket and continue
// send to send an array. The problem with that is the OS limits vector length to UIO_MAXIOV.
// Therefore, to make it robust we send the array in batches.
// We limit the vector length, and when it fills up we flush it to the socket and continue
// iterating.
void RedisReplyBuilder::SendStringArrInternal(
size_t size, absl::FunctionRef<std::string_view(unsigned)> producer, CollectionType type) {
@ -568,68 +574,73 @@ void RedisReplyBuilder::SendStringArrInternal(
return;
}
// When vector length is too long, Send returns EMSGSIZE.
size_t vec_len = std::min<size_t>(124u, size);
// We limit iovec capacity, vectorized length is limited upto UIO_MAXIOV (Send returns EMSGSIZE).
size_t vec_cap = std::min<size_t>(UIO_MAXIOV, size * 2);
absl::FixedArray<iovec, 16> vec(vec_cap);
absl::FixedArray<char, 128> meta(std::max<size_t>(vec_cap * 64, 128u));
absl::FixedArray<iovec, 16> vec(vec_len * 2 + 2);
absl::FixedArray<char, 128> meta(vec_len * 32 + 64); // 32 bytes per element + spare space
char* next = meta.data();
char* start = meta.data();
char* next = start;
// at most 35 chars.
auto serialize_len = [&](char prefix, size_t len) {
*next++ = prefix;
next = absl::numbers_internal::FastIntToBuffer(len, next);
next = absl::numbers_internal::FastIntToBuffer(len, next); // at most 32 chars
*next++ = '\r';
*next++ = '\n';
};
serialize_len(type_char[0], header_len);
vec[0] = IoVec(string_view{meta.data(), size_t(next - meta.data())});
char* start = next;
unsigned vec_indx = 1;
unsigned vec_indx = 0;
string_view src;
for (unsigned i = 0; i < size; ++i) {
src = producer(i);
serialize_len('$', src.size());
// add serialized len blob
vec[vec_indx++] = IoVec(string_view{start, size_t(next - start)});
DCHECK_GT(next - start, 0);
start = next;
// copy data either by referencing via an iovec or copying inline into meta buf.
if (src.size() >= 30) {
vec[vec_indx++] = IoVec(src);
} else if (src.size() > 0) {
memcpy(next, src.data(), src.size());
vec[vec_indx - 1].iov_len += src.size(); // extend the reference
next += src.size();
start = next;
}
*next++ = '\r';
*next++ = '\n';
// we keep at least 40 bytes to have enough place for a small string as well as its length.
if (vec_indx + 1 >= vec.size() || (meta.end() - next < 40)) {
// Flush the iovec array.
if (i < size - 1 || vec_indx == vec.size()) {
constexpr size_t kSSOLen = 32;
if (src.size() > kSSOLen) {
if (vec_indx + 1 >= vec_cap) {
Send(vec.data(), vec_indx);
if (ec_)
return;
vec_indx = 0;
start = meta.data();
next = start + 2;
start[0] = '\r';
start[1] = '\n';
next = meta.data();
}
// reference metadata blob before referencing another vector.
DCHECK_GT(next - start, 0);
vec[vec_indx++] = IoVec(string_view{start, size_t(next - start)});
start = next;
DCHECK_LT(vec_indx, vec.size());
vec[vec_indx++] = IoVec(src);
} else if (src.size() > 0) {
// NOTE!: this is not just optimization. producer may returns a string_piece that will
// be overriden for the next call, so we must do this for correctness.
memcpy(next, src.data(), src.size());
next += src.size();
}
constexpr ptrdiff_t kMargin = kSSOLen + 3 /*$\r\n*/ + 2 /*length*/ + 2 /* \r\n*/; // metadata
// Keep at least kMargin bytes for a small string as well as its length.
if (vec_indx >= vec.size() || ((meta.end() - next) <= kMargin)) {
// Flush the iovec array.
DVLOG(2) << "i=" << i << "meta size=" << next - meta.data();
Send(vec.data(), vec_indx);
if (ec_)
return;
vec_indx = 0;
start = meta.data();
next = start;
}
*next++ = '\r';
*next++ = '\n';
}
vec[vec_indx].iov_base = start;
vec[vec_indx].iov_len = 2;
vec[vec_indx].iov_len = next - start;
Send(vec.data(), vec_indx + 1);
}

View file

@ -187,7 +187,7 @@ std::vector<std::string_view> RedisReplyBuilderTest::TokenizeMessage() const {
<< "string/error message must have only one token got " << message_tokens.size();
break;
default:
CHECK(false) << "invalid start char [" << data[0] << "]";
LOG(FATAL) << "invalid start char [" << data[0] << "]";
break;
}
return message_tokens;