mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
Refactor replication code (#1507)
* refactor: Split redis I/O logic out of dfly::Replica * Split DFLY shard replication into a separate class. * Address comments from CR * Add comments * remove dead code * Add a virtual dtor * * Address comments by Shahar. * Fix the redis replication code. * And now fix the Dragonfly replication
This commit is contained in:
parent
a5d8c91188
commit
6d2fcba168
6 changed files with 806 additions and 631 deletions
|
@ -19,6 +19,7 @@ add_library(dragonfly_lib channel_store.cc command_registry.cc
|
|||
generic_family.cc hset_family.cc json_family.cc
|
||||
search/search_family.cc search/doc_index.cc search/doc_accessors.cc
|
||||
list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc
|
||||
protocol_client.cc
|
||||
snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc
|
||||
set_family.cc stream_family.cc string_family.cc
|
||||
zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc
|
||||
|
|
|
@ -21,7 +21,6 @@ namespace dfly {
|
|||
class Interpreter;
|
||||
class ObjectExplorer; // for Interpreter
|
||||
using facade::MemcacheParser;
|
||||
|
||||
class Service : public facade::ServiceInterface {
|
||||
public:
|
||||
using error_code = std::error_code;
|
||||
|
|
326
src/server/protocol_client.cc
Normal file
326
src/server/protocol_client.cc
Normal file
|
@ -0,0 +1,326 @@
|
|||
// Copyright 2022, DragonflyDB authors. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
#include "server/protocol_client.h"
|
||||
|
||||
extern "C" {
|
||||
#include "redis/rdb.h"
|
||||
}
|
||||
|
||||
#include <absl/cleanup/cleanup.h>
|
||||
#include <absl/flags/flag.h>
|
||||
#include <absl/functional/bind_front.h>
|
||||
#include <absl/strings/escaping.h>
|
||||
#include <absl/strings/str_cat.h>
|
||||
#include <absl/strings/strip.h>
|
||||
|
||||
#include <boost/asio/ip/tcp.hpp>
|
||||
|
||||
#include "base/logging.h"
|
||||
#include "facade/dragonfly_connection.h"
|
||||
#include "facade/redis_parser.h"
|
||||
#include "server/error.h"
|
||||
#include "server/journal/executor.h"
|
||||
#include "server/journal/serializer.h"
|
||||
#include "server/main_service.h"
|
||||
#include "server/rdb_load.h"
|
||||
#include "strings/human_readable.h"
|
||||
|
||||
ABSL_FLAG(std::string, masterauth, "", "password for authentication with master");
|
||||
|
||||
namespace dfly {
|
||||
|
||||
using namespace std;
|
||||
using namespace util;
|
||||
using namespace boost::asio;
|
||||
using namespace facade;
|
||||
using absl::GetFlag;
|
||||
using absl::StrCat;
|
||||
|
||||
namespace {
|
||||
|
||||
// Resolves `host` to a textual IP address, preferring an IPv4 result over IPv6.
//
// `dest` must point to a buffer of at least INET6_ADDRSTRLEN bytes; exactly that
// many bytes are copied into it on success.
// Returns 0 on success, otherwise the getaddrinfo() error code (printable with
// gai_strerror()). CHECK-fails if the resolver yields no IPv4/IPv6 address.
int ResolveDns(std::string_view host, char* dest) {
  struct addrinfo hints, *servinfo;

  memset(&hints, 0, sizeof(hints));
  hints.ai_family = AF_UNSPEC;
  hints.ai_socktype = SOCK_STREAM;
  hints.ai_protocol = IPPROTO_TCP;
  hints.ai_flags = AI_ALL;

  // getaddrinfo() requires a NUL-terminated node name, which string_view::data()
  // does not guarantee. Resolve through an owning, terminated copy.
  const std::string host_str(host);
  int res = getaddrinfo(host_str.c_str(), nullptr, &hints, &servinfo);
  if (res != 0)
    return res;

  static_assert(INET_ADDRSTRLEN < INET6_ADDRSTRLEN);

  // If possible, we want to use an IPv4 address.
  // Both buffers are INET6_ADDRSTRLEN long so that either can be copied to dest as-is.
  char ipv4_addr[INET6_ADDRSTRLEN];
  bool found_ipv4 = false;
  char ipv6_addr[INET6_ADDRSTRLEN];
  bool found_ipv6 = false;

  for (addrinfo* p = servinfo; p != nullptr; p = p->ai_next) {
    CHECK(p->ai_family == AF_INET || p->ai_family == AF_INET6);
    if (p->ai_family == AF_INET) {
      auto* ipv4 = reinterpret_cast<struct sockaddr_in*>(p->ai_addr);
      CHECK(nullptr != inet_ntop(p->ai_family, &ipv4->sin_addr, ipv4_addr, INET6_ADDRSTRLEN));
      found_ipv4 = true;
      break;  // The first IPv4 entry wins; no need to look further.
    }

    // Remember only the first IPv6 entry as a fallback and keep looking for IPv4.
    if (!found_ipv6) {
      auto* ipv6 = reinterpret_cast<struct sockaddr_in6*>(p->ai_addr);
      CHECK(nullptr != inet_ntop(p->ai_family, &ipv6->sin6_addr, ipv6_addr, INET6_ADDRSTRLEN));
      found_ipv6 = true;
    }
  }

  CHECK(found_ipv4 || found_ipv6);
  memcpy(dest, found_ipv4 ? ipv4_addr : ipv6_addr, INET6_ADDRSTRLEN);

  freeaddrinfo(servinfo);

  return 0;
}
|
||||
|
||||
// Reads once from `input` into the writable tail of `dest` and commits the
// number of bytes received. Returns the socket error if the read failed,
// otherwise a default (success) error_code.
error_code Recv(FiberSocketBase* input, base::IoBuf* dest) {
  auto free_space = dest->AppendBuffer();
  io::Result<size_t> received = input->Recv(free_space);
  if (received) {
    dest->CommitWrite(*received);
    return error_code{};
  }
  return received.error();
}
|
||||
|
||||
} // namespace
|
||||
|
||||
std::string ProtocolClient::ServerContext::Description() const {
|
||||
return absl::StrCat(host, ":", port);
|
||||
}
|
||||
|
||||
// Records the remote host and port; the endpoint itself is filled in later by
// ResolveMasterDns().
ProtocolClient::ProtocolClient(string host, uint16_t port) {
  ServerContext& ctx = server_context_;
  ctx.port = port;
  ctx.host = std::move(host);
}
|
||||
|
||||
// Closes the socket, if one was ever created, and logs a close failure.
ProtocolClient::~ProtocolClient() {
  if (!sock_)
    return;

  auto ec = sock_->Close();
  LOG_IF(ERROR, ec) << "Error closing socket " << ec;
}
|
||||
|
||||
// Resolves server_context_.host and stores the resulting address together with
// the configured port in server_context_.endpoint.
// Returns errc::host_unreachable if the DNS lookup fails.
error_code ProtocolClient::ResolveMasterDns() {
  char resolved_ip[INET6_ADDRSTRLEN];
  if (int gai_err = ResolveDns(server_context_.host, resolved_ip); gai_err != 0) {
    LOG(ERROR) << "Dns error " << gai_strerror(gai_err) << ", host: " << server_context_.host;
    return make_error_code(errc::host_unreachable);
  }

  LOG(INFO) << "Resetting endpoint! " << resolved_ip << ", " << server_context_.port;
  server_context_.endpoint = {ip::make_address(resolved_ip), server_context_.port};

  return error_code{};
}
|
||||
|
||||
// Creates a fresh socket on the calling proactor thread, connects it to
// server_context_.endpoint using `connect_timeout_ms` as a temporary socket
// timeout, and, if the --masterauth flag is non-empty, sends AUTH and expects
// a simple "OK" reply.
// Returns the context error if `cntx` was already cancelled, or the first
// connect/send/read/protocol error encountered.
error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms,
                                          Context* cntx) {
  ProactorBase* proactor = ProactorBase::me();
  CHECK(proactor);

  {
    unique_lock lk(sock_mu_);
    // The context closes sock_. So if the context error handler has already
    // run we must not create a new socket. sock_mu_ syncs between the two
    // functions.
    if (cntx->IsCancelled())
      return cntx->GetError();

    sock_.reset(proactor->CreateSocket());
    serializer_.reset(new ReqSerializer(sock_.get()));
  }

  // We set this timeout because this call blocks other REPLICAOF commands. We don't need it for the
  // rest of the sync.
  const uint32_t saved_timeout = sock_->timeout();
  sock_->set_timeout(connect_timeout_ms.count());
  RETURN_ON_ERR(sock_->Connect(server_context_.endpoint));
  sock_->set_timeout(saved_timeout);

  /* These may help but require additional field testing to learn.
   int yes = 1;
   CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
   CHECK_EQ(0, setsockopt(sock_->native_handle(), SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes)));

   int intv = 15;
   CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_KEEPIDLE, &intv, sizeof(intv)));

   intv /= 3;
   CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_KEEPINTVL, &intv, sizeof(intv)));

   intv = 3;
   CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_KEEPCNT, &intv, sizeof(intv)));
  */

  // Nothing more to do when no master password is configured.
  const string masterauth = GetFlag(FLAGS_masterauth);
  if (masterauth.empty())
    return error_code{};

  ResetParser(false);
  RETURN_ON_ERR(SendCommandAndReadResponse(StrCat("AUTH ", masterauth)));
  PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK"));

  return error_code{};
}
|
||||
|
||||
void ProtocolClient::CloseSocket() {
|
||||
unique_lock lk(sock_mu_);
|
||||
if (sock_) {
|
||||
sock_->proactor()->Await([this] {
|
||||
if (sock_->IsOpen()) {
|
||||
auto ec = sock_->Shutdown(SHUT_RDWR);
|
||||
LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
// Default reaction to a reported error: shut the socket down. The error value
// itself is not inspected.
void ProtocolClient::DefaultErrorHandler(const GenericError& /*err*/) {
  CloseSocket();
}
|
||||
|
||||
// Reads from sock_ and feeds parser_ until exactly one complete RESP reply has
// been parsed into resp_args_.
//
// buffer   - buffer to read into; when null, the internal resp_buf_ is cleared
//            and used instead.
// copy_msg - when true, every parsed chunk is appended to last_resp_ (which is
//            always reset at the start of the call).
//
// On success returns {total bytes parsed during this call, bytes parsed by the
// final Parse() call}. The final chunk is NOT consumed from the buffer here;
// earlier (incomplete) chunks are.
// On failure returns the socket error, or errc::bad_message if the parser
// reports anything other than OK / INPUT_PENDING.
io::Result<ProtocolClient::ReadRespRes> ProtocolClient::ReadRespReply(base::IoBuf* buffer,
                                                                      bool copy_msg) {
  DCHECK(parser_);

  if (!buffer) {
    buffer = &resp_buf_;
    buffer->Clear();
  }
  last_resp_ = "";

  uint32_t processed_bytes = 0;
  RedisParser::Result result = RedisParser::OK;

  // Every exit from this loop is an explicit return: success, socket error or
  // parse error.
  while (true) {
    // Hit the socket only when there is nothing buffered or the parser asked for more.
    if (buffer->InputLen() == 0 || result == RedisParser::INPUT_PENDING) {
      io::MutableBytes buf = buffer->AppendBuffer();
      io::Result<size_t> size_res = sock_->Recv(buf);
      if (!size_res) {
        LOG(ERROR) << "Socket error " << size_res.error();
        return nonstd::make_unexpected(size_res.error());
      }

      VLOG(2) << "Read master response of " << *size_res << " bytes";

      TouchIoTime();
      buffer->CommitWrite(*size_res);
    }

    uint32_t consumed = 0;
    result = parser_->Parse(buffer->InputBuffer(), &consumed, &resp_args_);
    processed_bytes += consumed;
    if (copy_msg)
      last_resp_ +=
          std::string_view(reinterpret_cast<char*>(buffer->InputBuffer().data()), consumed);

    if (result == RedisParser::OK) {
      // Leave the last chunk in the buffer; the caller consumes it when done.
      return ReadRespRes{processed_bytes, consumed};  // success path
    }

    buffer->ConsumeInput(consumed);

    if (result != RedisParser::INPUT_PENDING) {
      LOG(ERROR) << "Invalid parser status " << result << " for response " << last_resp_;
      return nonstd::make_unexpected(std::make_error_code(std::errc::bad_message));
    }
  }
}
|
||||
|
||||
// Extracts one "\r\n"-terminated line from `io_buf`, receiving from sock_ as
// needed. Leading whitespace is consumed from the buffer first.
//
// On success `*line` views the line without its "\r\n" terminator; it points
// into `io_buf`, and the line itself is not consumed from the buffer.
// Returns the socket error if a read fails, or errc::illegal_byte_sequence if
// the first '\n' found is not preceded by '\r'.
error_code ProtocolClient::ReadLine(base::IoBuf* io_buf, string_view* line) {
  std::string_view pending = ToSV(io_buf->InputBuffer());

  // consume whitespace, refilling from the socket whenever the buffer runs dry.
  for (;;) {
    auto first_non_ws = find_if_not(pending.begin(), pending.end(), absl::ascii_isspace);
    size_t ws_len = first_non_ws - pending.begin();
    io_buf->ConsumeInput(ws_len);
    pending = ToSV(io_buf->InputBuffer());
    if (!pending.empty())
      break;

    RETURN_ON_ERR(Recv(sock_.get(), io_buf));
    pending = ToSV(io_buf->InputBuffer());
  }

  // find eol, receiving more data until a '\n' shows up.
  size_t eol_pos = pending.find('\n');
  while (eol_pos == std::string_view::npos) {
    RETURN_ON_ERR(Recv(sock_.get(), io_buf));
    pending = ToSV(io_buf->InputBuffer());
    eol_pos = pending.find('\n');
  }

  DCHECK_GT(eol_pos, 0u);  // can not be 0 because then would be consumed as a whitespace.
  if (pending[eol_pos - 1] == '\r') {
    *line = pending.substr(0, eol_pos - 1);
    return error_code{};
  }

  LOG(ERROR) << "Bad replication header: " << pending;
  return std::make_error_code(std::errc::illegal_byte_sequence);
}
|
||||
|
||||
// True iff the last parsed response consists of exactly one STRING element
// whose contents equal `reply`.
bool ProtocolClient::CheckRespIsSimpleReply(string_view reply) const {
  if (resp_args_.size() != 1)
    return false;

  const auto& only = resp_args_.front();
  if (only.type != RespExpr::STRING)
    return false;

  return ToSV(only.GetBuf()) == reply;
}
|
||||
|
||||
// True iff the last parsed response has at least types.size() elements and
// its leading elements have exactly the given types, in order.
bool ProtocolClient::CheckRespFirstTypes(initializer_list<RespExpr::Type> types) const {
  // Not enough elements to match every requested type.
  if (types.size() > resp_args_.size())
    return false;

  size_t idx = 0;
  for (auto it = types.begin(); it != types.end(); ++it, ++idx) {
    if (resp_args_[idx].type != *it)
      return false;
  }
  return true;
}
|
||||
|
||||
// Sends `command` through the serializer and returns the serializer's error
// state. The last-I/O timestamp is refreshed only when the send succeeded.
error_code ProtocolClient::SendCommand(string_view command) {
  serializer_->SendCommand(command);

  error_code ec = serializer_->ec();
  if (ec)
    return ec;

  TouchIoTime();
  return ec;
}
|
||||
|
||||
// Remembers `command` in last_cmd_, sends it, and reads a single reply into
// resp_args_ using the internal buffer. Returns the send error or the read
// error, whichever happens first.
error_code ProtocolClient::SendCommandAndReadResponse(string_view command) {
  last_cmd_ = command;

  error_code send_ec = SendCommand(command);
  if (send_ec)
    return send_ec;

  auto reply = ReadRespReply();
  if (reply.has_value())
    return error_code{};
  return reply.error();
}
|
||||
|
||||
void ProtocolClient::ResetParser(bool server_mode) {
|
||||
parser_.reset(new RedisParser(server_mode));
|
||||
}
|
||||
|
||||
uint64_t ProtocolClient::LastIoTime() const {
|
||||
return last_io_time_;
|
||||
}
|
||||
|
||||
void ProtocolClient::TouchIoTime() {
|
||||
last_io_time_ = Proactor()->GetMonotonicTimeNs();
|
||||
}
|
||||
|
||||
} // namespace dfly
|
139
src/server/protocol_client.h
Normal file
139
src/server/protocol_client.h
Normal file
|
@ -0,0 +1,139 @@
|
|||
// Copyright 2022, DragonflyDB authors. All rights reserved.
|
||||
// See LICENSE for licensing terms.
|
||||
//
|
||||
#pragma once
|
||||
|
||||
#include <absl/container/inlined_vector.h>
|
||||
|
||||
#include <boost/fiber/barrier.hpp>
|
||||
#include <queue>
|
||||
#include <variant>
|
||||
|
||||
#include "base/io_buf.h"
|
||||
#include "facade/facade_types.h"
|
||||
#include "facade/redis_parser.h"
|
||||
#include "server/common.h"
|
||||
#include "server/journal/types.h"
|
||||
#include "server/version.h"
|
||||
#include "util/fiber_socket_base.h"
|
||||
|
||||
namespace facade {
|
||||
class ReqSerializer;
|
||||
}; // namespace facade
|
||||
|
||||
namespace dfly {
|
||||
|
||||
class Service;
|
||||
class ConnectionContext;
|
||||
class JournalExecutor;
|
||||
struct JournalReader;
|
||||
|
||||
// A helper class for implementing a Redis client that talks to a redis server.
|
||||
// This class should be inherited from.
|
||||
class ProtocolClient {
|
||||
public:
|
||||
ProtocolClient(std::string master_host, uint16_t port);
|
||||
virtual ~ProtocolClient();
|
||||
|
||||
void CloseSocket(); // Close replica sockets.
|
||||
|
||||
uint64_t LastIoTime() const;
|
||||
void TouchIoTime();
|
||||
|
||||
protected:
|
||||
struct ServerContext {
|
||||
std::string host;
|
||||
uint16_t port;
|
||||
boost::asio::ip::tcp::endpoint endpoint;
|
||||
|
||||
std::string Description() const;
|
||||
};
|
||||
|
||||
// Constructing using a fully initialized ServerContext allows to skip
|
||||
// the DNS resolution step.
|
||||
explicit ProtocolClient(ServerContext context) : server_context_(std::move(context)) {
|
||||
}
|
||||
|
||||
std::error_code ResolveMasterDns(); // Resolve master dns
|
||||
// Connect to master and authenticate if needed.
|
||||
std::error_code ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms, Context* cntx);
|
||||
|
||||
void DefaultErrorHandler(const GenericError& err);
|
||||
|
||||
struct ReadRespRes {
|
||||
uint32_t total_read;
|
||||
uint32_t left_in_buffer;
|
||||
};
|
||||
|
||||
// This function uses parser_ and cmd_args_ in order to consume a single response
|
||||
// from the sock_. The output will reside in resp_args_.
|
||||
// For error reporting purposes, the parsed command would be in last_resp_ if copy_msg is true.
|
||||
// If io_buf is not given, a internal temporary buffer will be used.
|
||||
// It is the responsibility of the caller to call buffer->ConsumeInput(rv.left_in_buffer) when it
|
||||
// is done with the result of the call; Calling ConsumeInput may invalidate the data in the result
|
||||
// if the buffer relocates.
|
||||
io::Result<ReadRespRes> ReadRespReply(base::IoBuf* buffer = nullptr, bool copy_msg = true);
|
||||
|
||||
std::error_code ReadLine(base::IoBuf* io_buf, std::string_view* line);
|
||||
|
||||
// Check if reps_args contains a simple reply.
|
||||
bool CheckRespIsSimpleReply(std::string_view reply) const;
|
||||
|
||||
// Check resp_args contains the following types at front.
|
||||
bool CheckRespFirstTypes(std::initializer_list<facade::RespExpr::Type> types) const;
|
||||
|
||||
// Send command, update last_io_time, return error.
|
||||
std::error_code SendCommand(std::string_view command);
|
||||
// Send command, read response into resp_args_.
|
||||
std::error_code SendCommandAndReadResponse(std::string_view command);
|
||||
|
||||
const ServerContext& server() const {
|
||||
return server_context_;
|
||||
}
|
||||
|
||||
void ResetParser(bool server_mode);
|
||||
|
||||
auto& LastResponseArgs() {
|
||||
return resp_args_;
|
||||
}
|
||||
|
||||
auto* Proactor() const {
|
||||
return sock_->proactor();
|
||||
}
|
||||
|
||||
util::LinuxSocketBase* Sock() const {
|
||||
return sock_.get();
|
||||
}
|
||||
|
||||
private:
|
||||
ServerContext server_context_;
|
||||
|
||||
std::unique_ptr<facade::ReqSerializer> serializer_;
|
||||
std::unique_ptr<facade::RedisParser> parser_;
|
||||
facade::RespVec resp_args_;
|
||||
base::IoBuf resp_buf_;
|
||||
|
||||
std::unique_ptr<util::LinuxSocketBase> sock_;
|
||||
Mutex sock_mu_;
|
||||
|
||||
protected:
|
||||
Context cntx_; // context for tasks in replica.
|
||||
|
||||
std::string last_cmd_;
|
||||
std::string last_resp_;
|
||||
|
||||
uint64_t last_io_time_ = 0; // in ns, monotonic clock.
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
||||
/**
 * A convenience macro to use with ProtocolClient instances for protocol input validation.
 *
 * If `x` evaluates to false, logs the last command together with the C-escaped last
 * response (both quoted) and returns std::errc::bad_message from the enclosing function.
 * Must be expanded inside a member function with access to last_cmd_ and last_resp_
 * whose return type is constructible from std::error_code.
 */
#define PC_RETURN_ON_BAD_RESPONSE(x)                                                           \
  do {                                                                                         \
    if (!(x)) {                                                                                \
      LOG(ERROR) << "Bad response to \"" << last_cmd_ << "\": \"" << absl::CEscape(last_resp_) \
                 << "\"";                                                                      \
      return std::make_error_code(std::errc::bad_message);                                     \
    }                                                                                          \
  } while (false)
|
File diff suppressed because it is too large
Load diff
|
@ -14,6 +14,7 @@
|
|||
#include "facade/redis_parser.h"
|
||||
#include "server/common.h"
|
||||
#include "server/journal/types.h"
|
||||
#include "server/protocol_client.h"
|
||||
#include "server/version.h"
|
||||
#include "util/fiber_socket_base.h"
|
||||
|
||||
|
@ -27,24 +28,35 @@ class Service;
|
|||
class ConnectionContext;
|
||||
class JournalExecutor;
|
||||
struct JournalReader;
|
||||
class DflyShardReplica;
|
||||
|
||||
class Replica {
|
||||
private:
|
||||
// The attributes of the master we are connecting to.
|
||||
struct MasterContext {
|
||||
std::string host;
|
||||
uint16_t port;
|
||||
boost::asio::ip::tcp::endpoint endpoint;
|
||||
// Coordinator for multi shard execution.
|
||||
struct MultiShardExecution {
|
||||
Mutex map_mu;
|
||||
|
||||
std::string master_repl_id;
|
||||
std::string dfly_session_id; // Sync session id for dfly sync.
|
||||
uint32_t dfly_flow_id = UINT32_MAX; // Flow id if replica acts as a dfly flow.
|
||||
struct TxExecutionSync {
|
||||
Barrier barrier;
|
||||
std::atomic_uint32_t counter;
|
||||
BlockingCounter block;
|
||||
|
||||
DflyVersion version = DflyVersion::VER0;
|
||||
|
||||
std::string Description() const;
|
||||
explicit TxExecutionSync(uint32_t counter)
|
||||
: barrier(counter), counter(counter), block(counter) {
|
||||
}
|
||||
};
|
||||
|
||||
std::unordered_map<TxId, TxExecutionSync> tx_sync_execution;
|
||||
};
|
||||
|
||||
// The attributes of the master we are connecting to.
|
||||
struct MasterContext {
|
||||
std::string master_repl_id;
|
||||
std::string dfly_session_id; // Sync session id for dfly sync.
|
||||
DflyVersion version = DflyVersion::VER0;
|
||||
};
|
||||
|
||||
// This class manages replication from both Dragonfly and Redis masters.
|
||||
class Replica : ProtocolClient {
|
||||
private:
|
||||
// The flow is : R_ENABLED -> R_TCP_CONNECTED -> (R_SYNCING) -> R_SYNC_OK.
|
||||
// SYNCING means that the initial ack succeeded. It may be optional if we can still load from
|
||||
// the journal offset.
|
||||
|
@ -56,8 +68,122 @@ class Replica {
|
|||
R_SYNC_OK = 0x10,
|
||||
};
|
||||
|
||||
public:
|
||||
Replica(std::string master_host, uint16_t port, Service* se, std::string_view id);
|
||||
~Replica();
|
||||
|
||||
// Spawns a fiber that runs until link with master is broken or the replication is stopped.
|
||||
// Returns true if initial link with master has been established or
|
||||
// false if it has failed.
|
||||
std::error_code Start(ConnectionContext* cntx);
|
||||
|
||||
void Stop(); // thread-safe
|
||||
|
||||
void Pause(bool pause);
|
||||
|
||||
std::error_code TakeOver(std::string_view timeout);
|
||||
|
||||
std::string_view MasterId() const {
|
||||
return master_context_.master_repl_id;
|
||||
}
|
||||
|
||||
private: /* Main standalone mode functions */
|
||||
// Coordinate state transitions. Spawned by start.
|
||||
void MainReplicationFb();
|
||||
|
||||
std::error_code Greet(); // Send PING and REPLCONF.
|
||||
|
||||
std::error_code HandleCapaDflyResp();
|
||||
std::error_code ConfigureDflyMaster();
|
||||
|
||||
std::error_code InitiatePSync(); // Redis full sync.
|
||||
std::error_code InitiateDflySync(); // Dragonfly full sync.
|
||||
|
||||
std::error_code ConsumeRedisStream(); // Redis stable state.
|
||||
std::error_code ConsumeDflyStream(); // Dragonfly stable state.
|
||||
|
||||
void RedisStreamAcksFb();
|
||||
|
||||
void JoinAllFlows(); // Join all flows if possible.
|
||||
void SetShardStates(bool replica); // Call SetReplica(replica) on all shards.
|
||||
|
||||
// Send DFLY ${kind} to the master instance.
|
||||
std::error_code SendNextPhaseRequest(std::string_view kind);
|
||||
|
||||
void DefaultErrorHandler(const GenericError& err);
|
||||
|
||||
private: /* Utility */
|
||||
struct PSyncResponse {
|
||||
// string - end of sync token (diskless)
|
||||
// size_t - size of the full sync blob (disk-based).
|
||||
// if fullsync is 0, it means that master can continue with partial replication.
|
||||
std::variant<std::string, size_t> fullsync;
|
||||
};
|
||||
|
||||
std::error_code ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* dest);
|
||||
|
||||
public: /* Utility */
|
||||
struct Info {
|
||||
std::string host;
|
||||
uint16_t port;
|
||||
bool master_link_established;
|
||||
bool full_sync_in_progress;
|
||||
bool full_sync_done;
|
||||
time_t master_last_io_sec; // monotonic clock.
|
||||
};
|
||||
|
||||
Info GetInfo() const; // thread-safe, blocks fiber
|
||||
|
||||
bool HasDflyMaster() const {
|
||||
return !master_context_.dfly_session_id.empty();
|
||||
}
|
||||
|
||||
const std::string& MasterHost() const {
|
||||
return server().host;
|
||||
}
|
||||
|
||||
uint16_t Port() const {
|
||||
return server().port;
|
||||
}
|
||||
|
||||
std::vector<uint64_t> GetReplicaOffset() const;
|
||||
std::string GetSyncId() const;
|
||||
|
||||
private:
|
||||
Service& service_;
|
||||
MasterContext master_context_;
|
||||
|
||||
// In redis replication mode.
|
||||
Fiber sync_fb_;
|
||||
Fiber acks_fb_;
|
||||
EventCount waker_;
|
||||
|
||||
std::vector<std::unique_ptr<DflyShardReplica>> shard_flows_;
|
||||
std::shared_ptr<MultiShardExecution> multi_shard_exe_;
|
||||
|
||||
// Guard operations where flows might be in a mixed state (transition/setup)
|
||||
Mutex flows_op_mu_;
|
||||
|
||||
// repl_offs - till what offset we've already read from the master.
|
||||
// ack_offs_ last acknowledged offset.
|
||||
size_t repl_offs_ = 0, ack_offs_ = 0;
|
||||
std::atomic<unsigned> state_mask_ = 0;
|
||||
unsigned num_df_flows_ = 0;
|
||||
|
||||
bool is_paused_ = false;
|
||||
std::string id_;
|
||||
};
|
||||
|
||||
// This class implements a single shard replication flow from a Dragonfly master instance.
|
||||
// Multiple DflyShardReplica objects are managed by a Replica object.
|
||||
class DflyShardReplica : public ProtocolClient {
|
||||
public:
|
||||
DflyShardReplica(ServerContext server_context, MasterContext master_context, uint32_t flow_id,
|
||||
Service* service, std::shared_ptr<MultiShardExecution> multi_shard_exe);
|
||||
~DflyShardReplica();
|
||||
|
||||
// This class holds the commands of transaction in single shard.
|
||||
// Once all commands were received, the command can be executed.
|
||||
// Once all commands were received, the transaction can be executed.
|
||||
struct TransactionData {
|
||||
// Update the data from ParsedEntry and return true if all shard transaction commands were
|
||||
// received.
|
||||
|
@ -86,72 +212,8 @@ class Replica {
|
|||
absl::flat_hash_map<TxId, TransactionData> current_;
|
||||
};
|
||||
|
||||
// Coorindator for multi shard execution.
|
||||
struct MultiShardExecution {
|
||||
Mutex map_mu;
|
||||
|
||||
struct TxExecutionSync {
|
||||
Barrier barrier;
|
||||
std::atomic_uint32_t counter;
|
||||
BlockingCounter block;
|
||||
|
||||
TxExecutionSync(uint32_t counter) : barrier(counter), counter(counter), block(counter) {
|
||||
}
|
||||
};
|
||||
|
||||
std::unordered_map<TxId, TxExecutionSync> tx_sync_execution;
|
||||
};
|
||||
|
||||
public:
|
||||
Replica(std::string master_host, uint16_t port, Service* se, std::string_view id);
|
||||
~Replica();
|
||||
|
||||
// Spawns a fiber that runs until link with master is broken or the replication is stopped.
|
||||
// Returns true if initial link with master has been established or
|
||||
// false if it has failed.
|
||||
std::error_code Start(ConnectionContext* cntx);
|
||||
|
||||
void Stop(); // thread-safe
|
||||
|
||||
void Pause(bool pause);
|
||||
|
||||
std::error_code TakeOver(std::string_view timeout);
|
||||
|
||||
std::string_view MasterId() const {
|
||||
return master_context_.master_repl_id;
|
||||
}
|
||||
|
||||
private: /* Main standalone mode functions */
|
||||
// Coordinate state transitions. Spawned by start.
|
||||
void MainReplicationFb();
|
||||
|
||||
std::error_code ResolveMasterDns(); // Resolve master dns
|
||||
// Connect to master and authenticate if needed.
|
||||
std::error_code ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms);
|
||||
std::error_code Greet(); // Send PING and REPLCONF.
|
||||
|
||||
std::error_code HandleCapaDflyResp();
|
||||
std::error_code ConfigureDflyMaster();
|
||||
|
||||
std::error_code InitiatePSync(); // Redis full sync.
|
||||
std::error_code InitiateDflySync(); // Dragonfly full sync.
|
||||
|
||||
std::error_code ConsumeRedisStream(); // Redis stable state.
|
||||
std::error_code ConsumeDflyStream(); // Dragonfly stable state.
|
||||
|
||||
void CloseSocket(); // Close replica sockets.
|
||||
void JoinAllFlows(); // Join all flows if possible.
|
||||
void SetShardStates(bool replica); // Call SetReplica(replica) on all shards.
|
||||
|
||||
// Send DFLY ${kind} to the master instance.
|
||||
std::error_code SendNextPhaseRequest(std::string_view kind);
|
||||
|
||||
void DefaultErrorHandler(const GenericError& err);
|
||||
|
||||
private: /* Main dlfly flow mode functions */
|
||||
// Initialize as single dfly flow.
|
||||
Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* service,
|
||||
std::shared_ptr<MultiShardExecution> shared_exe_data);
|
||||
void Cancel();
|
||||
void JoinFlow();
|
||||
|
||||
// Start replica initialized as dfly flow.
|
||||
std::error_code StartFullSyncFlow(BlockingCounter block, Context* cntx);
|
||||
|
@ -160,7 +222,7 @@ class Replica {
|
|||
std::error_code StartStableSyncFlow(Context* cntx);
|
||||
|
||||
// Single flow full sync fiber spawned by StartFullSyncFlow.
|
||||
void FullSyncDflyFb(std::string eof_token, BlockingCounter block, Context* cntx);
|
||||
void FullSyncDflyFb(const std::string& eof_token, BlockingCounter block, Context* cntx);
|
||||
|
||||
// Single flow stable state sync fiber spawned by StartStableSyncFlow.
|
||||
void StableSyncDflyReadFb(Context* cntx);
|
||||
|
@ -169,81 +231,20 @@ class Replica {
|
|||
|
||||
void StableSyncDflyExecFb(Context* cntx);
|
||||
|
||||
private: /* Utility */
|
||||
struct PSyncResponse {
|
||||
// string - end of sync token (diskless)
|
||||
// size_t - size of the full sync blob (disk-based).
|
||||
// if fullsync is 0, it means that master can continue with partial replication.
|
||||
std::variant<std::string, size_t> fullsync;
|
||||
};
|
||||
|
||||
// This function uses parser_ and cmd_args_ in order to consume a single response
|
||||
// from the sock_. The output will reside in cmd_str_args_.
|
||||
// For error reporting purposes, the parsed command would be in last_resp_.
|
||||
// If io_buf is not given, a temporary buffer will be used.
|
||||
std::error_code ReadRespReply(base::IoBuf* buffer = nullptr);
|
||||
|
||||
std::error_code ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* header);
|
||||
std::error_code ReadLine(base::IoBuf* io_buf, std::string_view* line);
|
||||
|
||||
std::error_code ParseAndExecute(base::IoBuf* io_buf, ConnectionContext* cntx);
|
||||
|
||||
// Check if reps_args contains a simple reply.
|
||||
bool CheckRespIsSimpleReply(std::string_view reply) const;
|
||||
|
||||
// Check resp_args contains the following types at front.
|
||||
bool CheckRespFirstTypes(std::initializer_list<facade::RespExpr::Type> types) const;
|
||||
|
||||
// Send command, update last_io_time, return error.
|
||||
std::error_code SendCommand(std::string_view command, facade::ReqSerializer* serializer);
|
||||
// Send command, read response into resp_args_.
|
||||
std::error_code SendCommandAndReadResponse(std::string_view command,
|
||||
facade::ReqSerializer* serializer,
|
||||
base::IoBuf* buffer = nullptr);
|
||||
|
||||
void ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx);
|
||||
void InsertTxDataToShardResource(TransactionData&& tx_data);
|
||||
void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx);
|
||||
bool InsertTxToSharedMap(const TransactionData& tx_data);
|
||||
|
||||
public: /* Utility */
|
||||
struct Info {
|
||||
std::string host;
|
||||
uint16_t port;
|
||||
bool master_link_established;
|
||||
bool full_sync_in_progress;
|
||||
bool full_sync_done;
|
||||
time_t master_last_io_sec; // monotonic clock.
|
||||
};
|
||||
uint32_t FlowId() const;
|
||||
|
||||
Info GetInfo() const; // thread-safe, blocks fiber
|
||||
|
||||
bool HasDflyMaster() const {
|
||||
return !master_context_.dfly_session_id.empty();
|
||||
}
|
||||
|
||||
bool IsDflyFlow() const {
|
||||
return master_context_.dfly_flow_id != UINT32_MAX;
|
||||
}
|
||||
|
||||
const std::string& MasterHost() const {
|
||||
return master_context_.host;
|
||||
}
|
||||
|
||||
uint16_t Port() const {
|
||||
return master_context_.port;
|
||||
}
|
||||
|
||||
std::vector<uint64_t> GetReplicaOffset() const;
|
||||
std::string GetSyncId() const;
|
||||
uint64_t JournalExecutedCount() const;
|
||||
|
||||
private:
|
||||
Service& service_;
|
||||
MasterContext master_context_;
|
||||
std::unique_ptr<util::LinuxSocketBase> sock_;
|
||||
Mutex sock_mu_;
|
||||
|
||||
std::shared_ptr<MultiShardExecution> multi_shard_exe_;
|
||||
std::optional<base::IoBuf> leftover_buf_;
|
||||
|
||||
std::queue<std::pair<TransactionData, bool>> trans_data_queue_;
|
||||
static constexpr size_t kYieldAfterItemsInQueue = 50;
|
||||
|
@ -261,36 +262,16 @@ class Replica {
|
|||
// run out-of-order on the master instance.
|
||||
std::atomic_uint64_t journal_rec_executed_ = 0;
|
||||
|
||||
// MainReplicationFb in standalone mode, FullSyncDflyFb in flow mode.
|
||||
Fiber sync_fb_;
|
||||
|
||||
Fiber acks_fb_;
|
||||
size_t ack_offs_ = 0;
|
||||
|
||||
bool force_ping_ = false;
|
||||
Fiber execution_fb_;
|
||||
|
||||
std::vector<std::unique_ptr<Replica>> shard_flows_;
|
||||
|
||||
// Guard operations where flows might be in a mixed state (transition/setup)
|
||||
Mutex flows_op_mu_;
|
||||
|
||||
std::optional<base::IoBuf> leftover_buf_;
|
||||
std::unique_ptr<facade::RedisParser> parser_;
|
||||
facade::RespVec resp_args_;
|
||||
base::IoBuf resp_buf_;
|
||||
std::string last_cmd_;
|
||||
std::string last_resp_;
|
||||
facade::CmdArgVec cmd_str_args_;
|
||||
|
||||
Context cntx_; // context for tasks in replica.
|
||||
|
||||
// repl_offs - till what offset we've already read from the master.
|
||||
// ack_offs_ last acknowledged offset.
|
||||
size_t repl_offs_ = 0, ack_offs_ = 0;
|
||||
uint64_t last_io_time_ = 0; // in ns, monotonic clock.
|
||||
std::atomic<unsigned> state_mask_ = 0;
|
||||
unsigned num_df_flows_ = 0;
|
||||
|
||||
bool is_paused_ = false;
|
||||
std::string id_;
|
||||
std::shared_ptr<MultiShardExecution> multi_shard_exe_;
|
||||
uint32_t flow_id_ = UINT32_MAX; // Flow id if replica acts as a dfly flow.
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue