mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
fix: Improve reply latency of HELLO (#2925)
For high connection rate cases it can be significant. In addition, refactor reply_builder so we could use SendStringArrInternal for more cases like SendScoredArray that also has high latency overhead for replies. Signed-off-by: Roman Gershman <roman@dragonflydb.io> Co-authored-by: romange <romange@users.noreply.github.com>
This commit is contained in:
parent
9b6ee18e90
commit
9289f12dc3
3 changed files with 24 additions and 20 deletions
|
@ -524,7 +524,9 @@ void RedisReplyBuilder::SendStringArr(StrSpan arr, CollectionType type) {
|
|||
return;
|
||||
}
|
||||
|
||||
SendStringArrInternal(warr, type);
|
||||
auto cb = [&](size_t i) { return warr[i]; };
|
||||
|
||||
SendStringArrInternal(warr.Size(), std::move(cb), type);
|
||||
}
|
||||
|
||||
void RedisReplyBuilder::StartArray(unsigned len) {
|
||||
|
@ -558,9 +560,8 @@ void RedisReplyBuilder::StartCollection(unsigned len, CollectionType type) {
|
|||
// to low numbers (around 1024). Therefore, to make it robust we send the array in batches.
|
||||
// We limit the vector length to 256 and when it fills up we flush it to the socket and continue
|
||||
// iterating.
|
||||
void RedisReplyBuilder::SendStringArrInternal(WrappedStrSpan arr, CollectionType type) {
|
||||
size_t size = arr.Size();
|
||||
|
||||
void RedisReplyBuilder::SendStringArrInternal(
|
||||
size_t size, absl::FunctionRef<std::string_view(unsigned)> producer, CollectionType type) {
|
||||
size_t header_len = size;
|
||||
string_view type_char = "*";
|
||||
if (is_resp3_) {
|
||||
|
@ -575,40 +576,41 @@ void RedisReplyBuilder::SendStringArrInternal(WrappedStrSpan arr, CollectionType
|
|||
}
|
||||
|
||||
// When vector length is too long, Send returns EMSGSIZE.
|
||||
size_t vec_len = std::min<size_t>(256u, size);
|
||||
size_t vec_len = std::min<size_t>(128u, size);
|
||||
|
||||
absl::FixedArray<iovec, 16> vec(vec_len * 2 + 2);
|
||||
absl::FixedArray<char, 64> meta((vec_len + 1) * 16);
|
||||
absl::FixedArray<char, 64> meta((vec_len + 1) * 16); // 16 bytes per length.
|
||||
char* next = meta.data();
|
||||
|
||||
*next++ = type_char[0];
|
||||
next = absl::numbers_internal::FastIntToBuffer(header_len, next);
|
||||
*next++ = '\r';
|
||||
*next++ = '\n';
|
||||
auto serialize_len = [&](char prefix, size_t len) {
|
||||
*next++ = prefix;
|
||||
next = absl::numbers_internal::FastIntToBuffer(len, next);
|
||||
*next++ = '\r';
|
||||
*next++ = '\n';
|
||||
};
|
||||
|
||||
serialize_len(type_char[0], header_len);
|
||||
vec[0] = IoVec(string_view{meta.data(), size_t(next - meta.data())});
|
||||
char* start = next;
|
||||
|
||||
unsigned vec_indx = 1;
|
||||
string_view src;
|
||||
for (unsigned i = 0; i < size; ++i) {
|
||||
src = arr[i];
|
||||
*next++ = '$';
|
||||
next = absl::numbers_internal::FastIntToBuffer(src.size(), next);
|
||||
*next++ = '\r';
|
||||
*next++ = '\n';
|
||||
vec[vec_indx] = IoVec(string_view{start, size_t(next - start)});
|
||||
src = producer(i);
|
||||
serialize_len('$', src.size());
|
||||
// add serialized len blob
|
||||
vec[vec_indx++] = IoVec(string_view{start, size_t(next - start)});
|
||||
DCHECK_GT(next - start, 0);
|
||||
|
||||
start = next;
|
||||
++vec_indx;
|
||||
|
||||
vec[vec_indx] = IoVec(src);
|
||||
vec[vec_indx++] = IoVec(src);
|
||||
|
||||
*next++ = '\r';
|
||||
*next++ = '\n';
|
||||
++vec_indx;
|
||||
|
||||
if (vec_indx + 1 >= vec.size()) {
|
||||
// Flush the iovec array.
|
||||
if (i < size - 1 || vec_indx == vec.size()) {
|
||||
Send(vec.data(), vec_indx);
|
||||
if (ec_)
|
||||
|
|
|
@ -247,7 +247,8 @@ class RedisReplyBuilder : public SinkReplyBuilder {
|
|||
};
|
||||
|
||||
private:
|
||||
void SendStringArrInternal(WrappedStrSpan arr, CollectionType type);
|
||||
void SendStringArrInternal(size_t size, absl::FunctionRef<std::string_view(unsigned)> producer,
|
||||
CollectionType type);
|
||||
|
||||
bool is_resp3_ = false;
|
||||
};
|
||||
|
|
|
@ -2327,6 +2327,7 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
|
|||
rb->SetResp3(false);
|
||||
}
|
||||
|
||||
SinkReplyBuilder::ReplyAggregator agg(rb);
|
||||
rb->StartCollection(7, RedisReplyBuilder::MAP);
|
||||
rb->SendBulkString("server");
|
||||
rb->SendBulkString("redis");
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue