feat(facade): Capturing reply builder (#1001)

Implement capturing reply builder to allow storing command responses and sending them separately, to allow parallel command execution
This commit is contained in:
Vladislav 2023-04-01 19:07:53 +03:00 committed by GitHub
parent f953946eff
commit 1b4d3b42a3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
8 changed files with 411 additions and 34 deletions

View file

@ -1,5 +1,6 @@
add_library(dfly_facade dragonfly_listener.cc dragonfly_connection.cc facade.cc
memcache_parser.cc redis_parser.cc reply_builder.cc op_status.cc)
memcache_parser.cc redis_parser.cc reply_builder.cc op_status.cc
reply_capture.cc)
if (DF_USE_SSL)
set(TLS_LIB tls_lib)

View file

@ -124,11 +124,11 @@ void MCReplyBuilder::SendLong(long val) {
SendSimpleString(string_view(buf, next - buf));
}
void MCReplyBuilder::SendMGetResponse(const OptResp* resp, uint32_t count) {
void MCReplyBuilder::SendMGetResponse(absl::Span<const OptResp> arr) {
string header;
for (unsigned i = 0; i < count; ++i) {
if (resp[i]) {
const auto& src = *resp[i];
for (unsigned i = 0; i < arr.size(); ++i) {
if (arr[i]) {
const auto& src = *arr[i];
absl::StrAppend(&header, "VALUE ", src.key, " ", src.mc_flag, " ", src.value.size());
if (src.mc_ver) {
absl::StrAppend(&header, " ", src.mc_ver);
@ -323,12 +323,12 @@ void RedisReplyBuilder::SendDouble(double val) {
}
}
void RedisReplyBuilder::SendMGetResponse(const OptResp* resp, uint32_t count) {
string res = absl::StrCat("*", count, kCRLF);
for (size_t i = 0; i < count; ++i) {
if (resp[i]) {
StrAppend(&res, "$", resp[i]->value.size(), kCRLF);
res.append(resp[i]->value).append(kCRLF);
void RedisReplyBuilder::SendMGetResponse(absl::Span<const OptResp> arr) {
string res = absl::StrCat("*", arr.size(), kCRLF);
for (size_t i = 0; i < arr.size(); ++i) {
if (arr[i]) {
StrAppend(&res, "$", arr[i]->value.size(), kCRLF);
res.append(arr[i]->value).append(kCRLF);
} else {
res.append(NullString());
}
@ -337,11 +337,13 @@ void RedisReplyBuilder::SendMGetResponse(const OptResp* resp, uint32_t count) {
SendRaw(res);
}
void RedisReplyBuilder::SendSimpleStrArr(absl::Span<const std::string_view> arr) {
string res = absl::StrCat("*", arr.size(), kCRLF);
void RedisReplyBuilder::SendSimpleStrArr(StrSpan arr) {
WrappedStrSpan warr{arr};
for (auto sv : arr)
StrAppend(&res, "+", sv, kCRLF);
string res = absl::StrCat("*", warr.Size(), kCRLF);
for (unsigned i = 0; i < warr.Size(); i++)
StrAppend(&res, "+", warr[i], kCRLF);
SendRaw(res);
}

View file

@ -1,6 +1,8 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <absl/container/flat_hash_map.h>
#include <optional>
@ -14,7 +16,7 @@ namespace facade {
class SinkReplyBuilder {
public:
struct ResponseValue {
std::string_view key;
std::string key;
std::string value;
uint64_t mc_ver = 0; // 0 means we do not output it (i.e has not been requested).
uint32_t mc_flag = 0;
@ -36,7 +38,7 @@ class SinkReplyBuilder {
virtual void SendStored() = 0; // Reply for set commands.
virtual void SendSetSkipped() = 0;
virtual void SendMGetResponse(const OptResp* resp, uint32_t count) = 0;
virtual void SendMGetResponse(absl::Span<const OptResp>) = 0;
virtual void SendLong(long val) = 0;
virtual void SendSimpleString(std::string_view str) = 0;
@ -103,7 +105,7 @@ class MCReplyBuilder : public SinkReplyBuilder {
void SendError(std::string_view str, std::string_view type = std::string_view{}) final;
// void SendGetReply(std::string_view key, uint32_t flags, std::string_view value) final;
void SendMGetResponse(const OptResp* resp, uint32_t count) final;
void SendMGetResponse(absl::Span<const OptResp>) final;
void SendStored() final;
void SendLong(long val) final;
@ -125,7 +127,7 @@ class RedisReplyBuilder : public SinkReplyBuilder {
void SetResp3(bool is_resp3);
void SendError(std::string_view str, std::string_view type = {}) override;
void SendMGetResponse(const OptResp* resp, uint32_t count) override;
void SendMGetResponse(absl::Span<const OptResp>) override;
void SendStored() override;
void SendSetSkipped() override;
@ -133,7 +135,7 @@ class RedisReplyBuilder : public SinkReplyBuilder {
virtual void SendNullArray(); // Send *-1
virtual void SendEmptyArray(); // Send *0
virtual void SendSimpleStrArr(absl::Span<const std::string_view> arr);
virtual void SendSimpleStrArr(StrSpan arr);
virtual void SendStringArr(StrSpan arr, CollectionType type = ARRAY);
virtual void SendNull();

View file

@ -6,6 +6,7 @@
#include "facade/error.h"
#include "facade/facade_test.h"
#include "facade/redis_parser.h"
#include "facade/reply_capture.h"
// This will test the reply_builder RESP (Redis).
@ -741,15 +742,92 @@ TEST_F(RedisReplyBuilderTest, TestSendMGetResponse) {
v.value = "v3";
builder_->SetResp3(false);
builder_->SendMGetResponse(&mget_res[0], 3);
builder_->SendMGetResponse(mget_res);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_EQ(TakePayload(), "*3\r\n$2\r\nv3\r\n$-1\r\n$0\r\n\r\n")
<< "Resp2 SendMGetResponse failed.";
builder_->SetResp3(true);
builder_->SendMGetResponse(&mget_res[0], 3);
builder_->SendMGetResponse(mget_res);
ASSERT_TRUE(builder_->err_count().empty());
ASSERT_EQ(TakePayload(), "*3\r\n$2\r\nv3\r\n_\r\n$0\r\n\r\n") << "Resp3 SendMGetResponse failed.";
}
TEST_F(RedisReplyBuilderTest, TestBasicCapture) {
using namespace std;
string_view kTestSws[] = {"a1"sv, "a2"sv, "a3"sv, "a4"sv};
CapturingReplyBuilder crb{};
using RRB = RedisReplyBuilder;
auto big_arr_cb = [kTestSws](RRB* r) {
r->StartArray(4);
{
r->StartArray(2);
r->SendLong(1);
r->StartArray(2);
{
r->SendLong(2);
r->SendLong(3);
}
}
r->SendLong(4);
{
r->StartArray(2);
{
r->StartArray(2);
r->SendLong(5);
r->SendLong(6);
}
r->SendLong(7);
}
r->SendLong(8);
};
function<void(RRB*)> funcs[] = {
[](RRB* r) { r->SendNull(); },
[](RRB* r) { r->SendLong(1L); },
[](RRB* r) { r->SendDouble(6.7); },
[](RRB* r) { r->SendSimpleString("ok"); },
[](RRB* r) { r->SendEmptyArray(); },
[](RRB* r) { r->SendNullArray(); },
[](RRB* r) { r->SendError("e1", "e2"); },
[kTestSws](RRB* r) { r->SendSimpleStrArr(kTestSws); },
[kTestSws](RRB* r) { r->SendStringArr(kTestSws); },
[kTestSws](RRB* r) { r->SendStringArr(kTestSws, RRB::SET); },
[kTestSws](RRB* r) { r->SendStringArr(kTestSws, RRB::MAP); },
[kTestSws](RRB* r) {
r->StartArray(3);
r->SendLong(1L);
r->SendDouble(2.5);
r->SendSimpleStrArr(kTestSws);
},
[kTestSws](RRB* r) {
vector<RRB::OptResp> v = {
RRB::ResponseValue{"key-1", "value-1", 0, 0},
nullopt,
RRB::ResponseValue{"key-2", "value-2", 0, 0},
};
r->SendMGetResponse(v);
},
big_arr_cb,
};
crb.SetResp3(true);
builder_->SetResp3(true);
// Run generator functions on both a regular redis builder
// and the capturing builder with its capture applied.
for (auto& f : funcs) {
f(builder_.get());
auto expected = TakePayload();
f(&crb);
CapturingReplyBuilder::Apply(crb.Take(), builder_.get());
auto actual = TakePayload();
EXPECT_EQ(expected, actual);
}
builder_->SetResp3(false);
}
} // namespace facade

190
src/facade/reply_capture.cc Normal file
View file

@ -0,0 +1,190 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "facade/reply_capture.h"
#include "base/logging.h"
#include "reply_capture.h"
namespace facade {
using namespace std;
void CapturingReplyBuilder::SendError(std::string_view str, std::string_view type) {
Capture(Error{str, type});
}
void CapturingReplyBuilder::SendMGetResponse(absl::Span<const OptResp> arr) {
Capture(vector<OptResp>{arr.begin(), arr.end()});
}
void CapturingReplyBuilder::SendError(OpStatus status) {
Capture(status);
}
void CapturingReplyBuilder::SendNullArray() {
Capture(unique_ptr<CollectionPayload>{nullptr});
}
void CapturingReplyBuilder::SendEmptyArray() {
Capture(make_unique<CollectionPayload>(0, ARRAY));
}
void CapturingReplyBuilder::SendSimpleStrArr(StrSpan arr) {
DCHECK_EQ(current_.index(), 0u);
WrappedStrSpan warr{arr};
vector<string> sarr(warr.Size());
for (unsigned i = 0; i < warr.Size(); i++)
sarr[i] = warr[i];
Capture(StrArrPayload{true, ARRAY, move(sarr)});
}
void CapturingReplyBuilder::SendStringArr(StrSpan arr, CollectionType type) {
DCHECK_EQ(current_.index(), 0u);
// TODO: 1. Allocate all strings at once 2. Allow movable types
WrappedStrSpan warr{arr};
vector<string> sarr(warr.Size());
for (unsigned i = 0; i < warr.Size(); i++)
sarr[i] = warr[i];
Capture(StrArrPayload{false, type, move(sarr)});
}
void CapturingReplyBuilder::SendNull() {
Capture(nullptr_t{});
}
void CapturingReplyBuilder::SendLong(long val) {
Capture(val);
}
void CapturingReplyBuilder::SendDouble(double val) {
Capture(val);
}
void CapturingReplyBuilder::SendSimpleString(std::string_view str) {
Capture(SimpleString{string{str}});
}
void CapturingReplyBuilder::SendBulkString(std::string_view str) {
Capture(BulkString{string{str}});
}
void CapturingReplyBuilder::SendScoredArray(const std::vector<std::pair<std::string, double>>& arr,
bool with_scores) {
Capture(ScoredArray{arr, with_scores});
}
void CapturingReplyBuilder::StartCollection(unsigned len, CollectionType type) {
stack_.emplace(make_unique<CollectionPayload>(len, type), type == MAP ? len * 2 : len);
// If we added an empty collection, it must be collapsed immediately.
CollapseFilledCollections();
}
CapturingReplyBuilder::Payload CapturingReplyBuilder::Take() {
CHECK(stack_.empty());
Payload pl = move(current_);
current_ = monostate{};
return pl;
}
void CapturingReplyBuilder::Capture(Payload val) {
if (!stack_.empty()) {
stack_.top().first->arr.push_back(std::move(val));
stack_.top().second--;
} else {
DCHECK_EQ(current_.index(), 0u);
current_ = std::move(val);
}
// Check if we filled up a collection.
CollapseFilledCollections();
}
void CapturingReplyBuilder::CollapseFilledCollections() {
while (!stack_.empty() && stack_.top().second == 0) {
auto pl = move(stack_.top());
stack_.pop();
Capture(move(pl.first));
}
}
CapturingReplyBuilder::CollectionPayload::CollectionPayload(unsigned len, CollectionType type)
: len{len}, type{type}, arr{} {
arr.reserve(type == MAP ? len * 2 : len);
}
struct CaptureVisitor {
void operator()(monostate) {
}
void operator()(long v) {
rb->SendLong(v);
}
void operator()(double v) {
rb->SendDouble(v);
}
void operator()(const CapturingReplyBuilder::SimpleString& ss) {
rb->SendSimpleString(ss);
}
void operator()(const CapturingReplyBuilder::BulkString& bs) {
rb->SendBulkString(bs);
}
void operator()(CapturingReplyBuilder::Null) {
rb->SendNull();
}
void operator()(CapturingReplyBuilder::Error err) {
rb->SendError(err.first, err.second);
}
void operator()(OpStatus status) {
rb->SendError(status);
}
void operator()(const CapturingReplyBuilder::StrArrPayload& sa) {
if (sa.simple)
rb->SendSimpleStrArr(sa.arr);
else
rb->SendStringArr(sa.arr, sa.type);
}
void operator()(const unique_ptr<CapturingReplyBuilder::CollectionPayload>& cp) {
if (!cp) {
rb->SendNullArray();
return;
}
if (cp->len == 0 && cp->type == RedisReplyBuilder::ARRAY) {
rb->SendEmptyArray();
return;
}
rb->StartCollection(cp->len, cp->type);
for (auto& pl : cp->arr)
visit(*this, pl);
}
void operator()(const vector<RedisReplyBuilder::OptResp>& mget) {
rb->SendMGetResponse(mget);
}
void operator()(const CapturingReplyBuilder::ScoredArray& sarr) {
rb->SendScoredArray(sarr.arr, sarr.with_scores);
}
RedisReplyBuilder* rb;
};
void CapturingReplyBuilder::Apply(Payload&& pl, RedisReplyBuilder* rb) {
CaptureVisitor cv{rb};
visit(cv, pl);
}
} // namespace facade

103
src/facade/reply_capture.h Normal file
View file

@ -0,0 +1,103 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <memory>
#include <stack>
#include <variant>
#include "facade/reply_builder.h"
namespace facade {
struct CaptureVisitor;
// CapturingReplyBuilder allows capturing replies and retrieveing them with Take().
// Those replies can be stored standalone and sent with
// CapturingReplyBuilder::Apply() to another reply builder.
class CapturingReplyBuilder : public RedisReplyBuilder {
friend struct CaptureVisitor;
public:
void SendError(std::string_view str, std::string_view type = {}) override;
void SendMGetResponse(absl::Span<const OptResp>) override;
// SendStored -> SendSimpleString("OK")
// SendSetSkipped -> SendNull()
void SendError(OpStatus status) override;
void SendNullArray() override;
void SendEmptyArray() override;
void SendSimpleStrArr(StrSpan arr) override;
void SendStringArr(StrSpan arr, CollectionType type = ARRAY) override;
void SendNull() override;
void SendLong(long val) override;
void SendDouble(double val) override;
void SendSimpleString(std::string_view str) override;
void SendBulkString(std::string_view str) override;
void SendScoredArray(const std::vector<std::pair<std::string, double>>& arr,
bool with_scores) override;
void StartCollection(unsigned len, CollectionType type) override;
private:
using Error = std::pair<std::string, std::string>; // SendError
using Null = std::nullptr_t; // SendNull or SendNullArray
struct SimpleString : public std::string {}; // SendSimpleString
struct BulkString : public std::string {}; // SendBulkString
struct StrArrPayload {
bool simple;
CollectionType type;
std::vector<std::string> arr;
};
struct CollectionPayload;
struct ScoredArray {
std::vector<std::pair<std::string, double>> arr;
bool with_scores;
};
public:
CapturingReplyBuilder() : RedisReplyBuilder{nullptr}, stack_{}, current_{} {
}
using Payload = std::variant<std::monostate, Null, Error, OpStatus, long, double, SimpleString,
BulkString, StrArrPayload, std::unique_ptr<CollectionPayload>,
std::vector<OptResp>, ScoredArray>;
// Take payload and clear state.
Payload Take();
// Send payload to builder.
static void Apply(Payload&& pl, RedisReplyBuilder* builder);
private:
struct CollectionPayload {
CollectionPayload(unsigned len, CollectionType type);
unsigned len;
CollectionType type;
std::vector<Payload> arr;
};
private:
// Capture value and store eiter in current topmost collection or as a standalone value.
void Capture(Payload val);
// While topmost collection in stack is full, finalize it and add it as a regular value.
void CollapseFilledCollections();
// List of nested active collections that are being built.
std::stack<std::pair<std::unique_ptr<CollectionPayload>, int>> stack_;
// Root payload.
Payload current_;
};
} // namespace facade

View file

@ -209,8 +209,8 @@ class InterpreterReplier : public RedisReplyBuilder {
void SendStored() final;
void SendSimpleString(std::string_view str) final;
void SendMGetResponse(const OptResp* resp, uint32_t count) final;
void SendSimpleStrArr(absl::Span<const string_view> arr) final;
void SendMGetResponse(absl::Span<const OptResp>) final;
void SendSimpleStrArr(StrSpan arr) final;
void SendNullArray() final;
void SendStringArr(StrSpan arr, CollectionType type) final;
@ -315,13 +315,13 @@ void InterpreterReplier::SendSimpleString(string_view str) {
PostItem();
}
void InterpreterReplier::SendMGetResponse(const OptResp* resp, uint32_t count) {
void InterpreterReplier::SendMGetResponse(absl::Span<const OptResp> arr) {
DCHECK(array_len_.empty());
explr_->OnArrayStart(count);
for (uint32_t i = 0; i < count; ++i) {
if (resp[i].has_value()) {
explr_->OnString(resp[i]->value);
explr_->OnArrayStart(arr.size());
for (uint32_t i = 0; i < arr.size(); ++i) {
if (arr[i].has_value()) {
explr_->OnString(arr[i]->value);
} else {
explr_->OnNil();
}
@ -329,10 +329,11 @@ void InterpreterReplier::SendMGetResponse(const OptResp* resp, uint32_t count) {
explr_->OnArrayEnd();
}
void InterpreterReplier::SendSimpleStrArr(absl::Span<const string_view> arr) {
explr_->OnArrayStart(arr.size());
for (auto sv : arr)
explr_->OnString(sv);
void InterpreterReplier::SendSimpleStrArr(StrSpan arr) {
WrappedStrSpan warr{arr};
explr_->OnArrayStart(warr.Size());
for (unsigned i = 0; i < warr.Size(); i++)
explr_->OnString(warr[i]);
explr_->OnArrayEnd();
}

View file

@ -1114,7 +1114,7 @@ void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
}
}
return cntx->reply_builder()->SendMGetResponse(res.data(), res.size());
return cntx->reply_builder()->SendMGetResponse(res);
}
void StringFamily::MSet(CmdArgList args, ConnectionContext* cntx) {