diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index b8ee3984c..577dca757 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -19,6 +19,7 @@ add_library(dragonfly_lib channel_store.cc command_registry.cc generic_family.cc hset_family.cc json_family.cc search/search_family.cc search/doc_index.cc search/doc_accessors.cc list_family.cc main_service.cc memory_cmd.cc rdb_load.cc rdb_save.cc replica.cc + protocol_client.cc snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc set_family.cc stream_family.cc string_family.cc zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc diff --git a/src/server/main_service.h b/src/server/main_service.h index 3e4c0f962..99dd8b066 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -21,7 +21,6 @@ namespace dfly { class Interpreter; class ObjectExplorer; // for Interpreter using facade::MemcacheParser; - class Service : public facade::ServiceInterface { public: using error_code = std::error_code; diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc new file mode 100644 index 000000000..3470ddba5 --- /dev/null +++ b/src/server/protocol_client.cc @@ -0,0 +1,326 @@ +// Copyright 2022, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// +#include "server/protocol_client.h" + +extern "C" { +#include "redis/rdb.h" +} + +#include +#include +#include +#include +#include +#include + +#include + +#include "base/logging.h" +#include "facade/dragonfly_connection.h" +#include "facade/redis_parser.h" +#include "server/error.h" +#include "server/journal/executor.h" +#include "server/journal/serializer.h" +#include "server/main_service.h" +#include "server/rdb_load.h" +#include "strings/human_readable.h" + +ABSL_FLAG(std::string, masterauth, "", "password for authentication with master"); + +namespace dfly { + +using namespace std; +using namespace util; +using namespace boost::asio; +using namespace facade; +using absl::GetFlag; +using absl::StrCat; + +namespace { + +int ResolveDns(std::string_view host, char* dest) { + struct addrinfo hints, *servinfo; + + memset(&hints, 0, sizeof(hints)); + hints.ai_family = AF_UNSPEC; + hints.ai_socktype = SOCK_STREAM; + hints.ai_protocol = IPPROTO_TCP; + hints.ai_flags = AI_ALL; + + int res = getaddrinfo(host.data(), NULL, &hints, &servinfo); + if (res != 0) + return res; + + static_assert(INET_ADDRSTRLEN < INET6_ADDRSTRLEN); + + // If possible, we want to use an IPv4 address. + char ipv4_addr[INET6_ADDRSTRLEN]; + bool found_ipv4 = false; + char ipv6_addr[INET6_ADDRSTRLEN]; + bool found_ipv6 = false; + + for (addrinfo* p = servinfo; p != NULL; p = p->ai_next) { + CHECK(p->ai_family == AF_INET || p->ai_family == AF_INET6); + if (p->ai_family == AF_INET && !found_ipv4) { + struct sockaddr_in* ipv4 = (struct sockaddr_in*)p->ai_addr; + CHECK(nullptr != + inet_ntop(p->ai_family, (void*)&ipv4->sin_addr, ipv4_addr, INET6_ADDRSTRLEN)); + found_ipv4 = true; + break; + } else if (!found_ipv6) { + struct sockaddr_in6* ipv6 = (struct sockaddr_in6*)p->ai_addr; + CHECK(nullptr != + inet_ntop(p->ai_family, (void*)&ipv6->sin6_addr, ipv6_addr, INET6_ADDRSTRLEN)); + found_ipv6 = true; + } + } + + CHECK(found_ipv4 || found_ipv6); + memcpy(dest, found_ipv4 ? ipv4_addr : ipv6_addr, INET6_ADDRSTRLEN); + + freeaddrinfo(servinfo); + + return 0; +} + +error_code Recv(FiberSocketBase* input, base::IoBuf* dest) { + auto buf = dest->AppendBuffer(); + io::Result exp_size = input->Recv(buf); + if (!exp_size) + return exp_size.error(); + + dest->CommitWrite(*exp_size); + + return error_code{}; +} + +} // namespace + +std::string ProtocolClient::ServerContext::Description() const { + return absl::StrCat(host, ":", port); +} + +ProtocolClient::ProtocolClient(string host, uint16_t port) { + server_context_.host = std::move(host); + server_context_.port = port; +} + +ProtocolClient::~ProtocolClient() { + if (sock_) { + auto ec = sock_->Close(); + LOG_IF(ERROR, ec) << "Error closing socket " << ec; + } +} + +error_code ProtocolClient::ResolveMasterDns() { + char ip_addr[INET6_ADDRSTRLEN]; + int resolve_res = ResolveDns(server_context_.host, ip_addr); + if (resolve_res != 0) { + LOG(ERROR) << "Dns error " << gai_strerror(resolve_res) << ", host: " << server_context_.host; + return make_error_code(errc::host_unreachable); + } + LOG(INFO) << "Resetting endpoint! " << ip_addr << ", " << server_context_.port; + server_context_.endpoint = {ip::make_address(ip_addr), server_context_.port}; + + return error_code{}; +} + +error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms, + Context* cntx) { + ProactorBase* mythread = ProactorBase::me(); + CHECK(mythread); + { + unique_lock lk(sock_mu_); + // The context closes sock_. So if the context error handler has already + // run we must not create a new socket. sock_mu_ syncs between the two + // functions. + if (!cntx->IsCancelled()) { + sock_.reset(mythread->CreateSocket()); + serializer_.reset(new ReqSerializer(sock_.get())); + } else { + return cntx->GetError(); + } + } + + // We set this timeout because this call blocks other REPLICAOF commands. We don't need it for the + // rest of the sync. + { + uint32_t timeout = sock_->timeout(); + sock_->set_timeout(connect_timeout_ms.count()); + RETURN_ON_ERR(sock_->Connect(server_context_.endpoint)); + sock_->set_timeout(timeout); + } + + /* These may help but require additional field testing to learn. + int yes = 1; + CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes))); + CHECK_EQ(0, setsockopt(sock_->native_handle(), SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes))); + + int intv = 15; + CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_KEEPIDLE, &intv, sizeof(intv))); + + intv /= 3; + CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_KEEPINTVL, &intv, sizeof(intv))); + + intv = 3; + CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_KEEPCNT, &intv, sizeof(intv))); + */ + auto masterauth = absl::GetFlag(FLAGS_masterauth); + if (!masterauth.empty()) { + ResetParser(false); + RETURN_ON_ERR(SendCommandAndReadResponse(StrCat("AUTH ", masterauth))); + PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK")); + } + return error_code{}; +} + +void ProtocolClient::CloseSocket() { + unique_lock lk(sock_mu_); + if (sock_) { + sock_->proactor()->Await([this] { + if (sock_->IsOpen()) { + auto ec = sock_->Shutdown(SHUT_RDWR); + LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec; + } + }); + } +} + +void ProtocolClient::DefaultErrorHandler(const GenericError& err) { + CloseSocket(); +} + +io::Result ProtocolClient::ReadRespReply(base::IoBuf* buffer, + bool copy_msg) { + DCHECK(parser_); + + error_code ec; + if (!buffer) { + buffer = &resp_buf_; + buffer->Clear(); + } + last_resp_ = ""; + + uint32_t processed_bytes = 0; + + RedisParser::Result result = RedisParser::OK; + while (!ec) { + uint32_t consumed; + if (buffer->InputLen() == 0 || result == RedisParser::INPUT_PENDING) { + io::MutableBytes buf = buffer->AppendBuffer(); + io::Result size_res = sock_->Recv(buf); + if (!size_res) { + LOG(ERROR) << "Socket error " << size_res.error(); + return nonstd::make_unexpected(size_res.error()); + } + + VLOG(2) << "Read master response of " << *size_res << " bytes"; + + TouchIoTime(); + buffer->CommitWrite(*size_res); + } + + result = parser_->Parse(buffer->InputBuffer(), &consumed, &resp_args_); + processed_bytes += consumed; + if (copy_msg) + last_resp_ += + std::string_view(reinterpret_cast(buffer->InputBuffer().data()), consumed); + + if (result == RedisParser::OK) { + return ReadRespRes{processed_bytes, consumed}; // success path + } + + buffer->ConsumeInput(consumed); + + if (result != RedisParser::INPUT_PENDING) { + LOG(ERROR) << "Invalid parser status " << result << " for response " << last_resp_; + return nonstd::make_unexpected(std::make_error_code(std::errc::bad_message)); + } + } + + return nonstd::make_unexpected(ec); +} + +error_code ProtocolClient::ReadLine(base::IoBuf* io_buf, string_view* line) { + size_t eol_pos; + std::string_view input_str = ToSV(io_buf->InputBuffer()); + + // consume whitespace. + while (true) { + auto it = find_if_not(input_str.begin(), input_str.end(), absl::ascii_isspace); + size_t ws_len = it - input_str.begin(); + io_buf->ConsumeInput(ws_len); + input_str = ToSV(io_buf->InputBuffer()); + if (!input_str.empty()) + break; + RETURN_ON_ERR(Recv(sock_.get(), io_buf)); + input_str = ToSV(io_buf->InputBuffer()); + }; + + // find eol. + while (true) { + eol_pos = input_str.find('\n'); + + if (eol_pos != std::string_view::npos) { + DCHECK_GT(eol_pos, 0u); // can not be 0 because then would be consumed as a whitespace. + if (input_str[eol_pos - 1] != '\r') { + break; + } + *line = input_str.substr(0, eol_pos - 1); + return error_code{}; + } + + RETURN_ON_ERR(Recv(sock_.get(), io_buf)); + input_str = ToSV(io_buf->InputBuffer()); + } + + LOG(ERROR) << "Bad replication header: " << input_str; + return std::make_error_code(std::errc::illegal_byte_sequence); +} + +bool ProtocolClient::CheckRespIsSimpleReply(string_view reply) const { + return resp_args_.size() == 1 && resp_args_.front().type == RespExpr::STRING && + ToSV(resp_args_.front().GetBuf()) == reply; +} + +bool ProtocolClient::CheckRespFirstTypes(initializer_list types) const { + unsigned i = 0; + for (RespExpr::Type type : types) { + if (i >= resp_args_.size() || resp_args_[i].type != type) + return false; + ++i; + } + return true; +} + +error_code ProtocolClient::SendCommand(string_view command) { + serializer_->SendCommand(command); + error_code ec = serializer_->ec(); + if (!ec) { + TouchIoTime(); + } + return ec; +} + +error_code ProtocolClient::SendCommandAndReadResponse(string_view command) { + last_cmd_ = command; + if (auto ec = SendCommand(command); ec) + return ec; + auto response_res = ReadRespReply(); + return response_res.has_value() ? error_code{} : response_res.error(); +} + +void ProtocolClient::ResetParser(bool server_mode) { + parser_.reset(new RedisParser(server_mode)); +} + +uint64_t ProtocolClient::LastIoTime() const { + return last_io_time_; +} + +void ProtocolClient::TouchIoTime() { + last_io_time_ = Proactor()->GetMonotonicTimeNs(); +} + +} // namespace dfly diff --git a/src/server/protocol_client.h b/src/server/protocol_client.h new file mode 100644 index 000000000..9fb5e319a --- /dev/null +++ b/src/server/protocol_client.h @@ -0,0 +1,139 @@ +// Copyright 2022, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// +#pragma once + +#include + +#include +#include +#include + +#include "base/io_buf.h" +#include "facade/facade_types.h" +#include "facade/redis_parser.h" +#include "server/common.h" +#include "server/journal/types.h" +#include "server/version.h" +#include "util/fiber_socket_base.h" + +namespace facade { +class ReqSerializer; +}; // namespace facade + +namespace dfly { + +class Service; +class ConnectionContext; +class JournalExecutor; +struct JournalReader; + +// A helper class for implementing a Redis client that talks to a redis server. +// This class should be inherited from. +class ProtocolClient { + public: + ProtocolClient(std::string master_host, uint16_t port); + virtual ~ProtocolClient(); + + void CloseSocket(); // Close replica sockets. + + uint64_t LastIoTime() const; + void TouchIoTime(); + + protected: + struct ServerContext { + std::string host; + uint16_t port; + boost::asio::ip::tcp::endpoint endpoint; + + std::string Description() const; + }; + + // Constructing using a fully initialized ServerContext allows to skip + // the DNS resolution step. + explicit ProtocolClient(ServerContext context) : server_context_(std::move(context)) { + } + + std::error_code ResolveMasterDns(); // Resolve master dns + // Connect to master and authenticate if needed. + std::error_code ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms, Context* cntx); + + void DefaultErrorHandler(const GenericError& err); + + struct ReadRespRes { + uint32_t total_read; + uint32_t left_in_buffer; + }; + + // This function uses parser_ and cmd_args_ in order to consume a single response + // from the sock_. The output will reside in resp_args_. + // For error reporting purposes, the parsed command would be in last_resp_ if copy_msg is true. + // If io_buf is not given, a internal temporary buffer will be used. + // It is the responsibility of the caller to call buffer->ConsumeInput(rv.left_in_buffer) when it + // is done with the result of the call; Calling ConsumeInput may invalidate the data in the result + // if the buffer relocates. + io::Result ReadRespReply(base::IoBuf* buffer = nullptr, bool copy_msg = true); + + std::error_code ReadLine(base::IoBuf* io_buf, std::string_view* line); + + // Check if reps_args contains a simple reply. + bool CheckRespIsSimpleReply(std::string_view reply) const; + + // Check resp_args contains the following types at front. + bool CheckRespFirstTypes(std::initializer_list types) const; + + // Send command, update last_io_time, return error. + std::error_code SendCommand(std::string_view command); + // Send command, read response into resp_args_. + std::error_code SendCommandAndReadResponse(std::string_view command); + + const ServerContext& server() const { + return server_context_; + } + + void ResetParser(bool server_mode); + + auto& LastResponseArgs() { + return resp_args_; + } + + auto* Proactor() const { + return sock_->proactor(); + } + + util::LinuxSocketBase* Sock() const { + return sock_.get(); + } + + private: + ServerContext server_context_; + + std::unique_ptr serializer_; + std::unique_ptr parser_; + facade::RespVec resp_args_; + base::IoBuf resp_buf_; + + std::unique_ptr sock_; + Mutex sock_mu_; + + protected: + Context cntx_; // context for tasks in replica. + + std::string last_cmd_; + std::string last_resp_; + + uint64_t last_io_time_ = 0; // in ns, monotonic clock. +}; + +} // namespace dfly + +/** + * A convenience macro to use with ProtocolClient instances for protocol input validation. + */ +#define PC_RETURN_ON_BAD_RESPONSE(x) \ + do { \ + if (!(x)) { \ + LOG(ERROR) << "Bad response to \"" << last_cmd_ << "\": \"" << absl::CEscape(last_resp_); \ + return std::make_error_code(errc::bad_message); \ + } \ + } while (false) diff --git a/src/server/replica.cc b/src/server/replica.cc index 8ca2ddb78..348251830 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -17,6 +17,8 @@ extern "C" { #include #include +#include +#include #include "base/logging.h" #include "facade/dragonfly_connection.h" @@ -31,21 +33,12 @@ extern "C" { ABSL_FLAG(int, replication_acks_interval, 3000, "Interval between acks in milliseconds."); ABSL_FLAG(bool, enable_multi_shard_sync, false, "Execute multi shards commands on replica syncrhonized"); -ABSL_FLAG(std::string, masterauth, "", "password for authentication with master"); ABSL_FLAG(int, master_connect_timeout_ms, 20000, "Timeout for establishing connection to a replication master"); ABSL_FLAG(int, master_reconnect_timeout_ms, 1000, "Timeout for re-establishing connection to a replication master"); ABSL_DECLARE_FLAG(uint32_t, port); -#define RETURN_ON_BAD_RESPONSE(x) \ - do { \ - if (!(x)) { \ - LOG(ERROR) << "Bad response to \"" << last_cmd_ << "\": \"" << absl::CEscape(last_resp_); \ - return std::make_error_code(errc::bad_message); \ - } \ - } while (false) - namespace dfly { using namespace std; @@ -57,62 +50,6 @@ using absl::StrCat; namespace { -int ResolveDns(std::string_view host, char* dest) { - struct addrinfo hints, *servinfo; - - memset(&hints, 0, sizeof(hints)); - hints.ai_family = AF_UNSPEC; - hints.ai_socktype = SOCK_STREAM; - hints.ai_protocol = IPPROTO_TCP; - hints.ai_flags = AI_ALL; - - int res = getaddrinfo(host.data(), NULL, &hints, &servinfo); - if (res != 0) - return res; - - static_assert(INET_ADDRSTRLEN < INET6_ADDRSTRLEN, ""); - - // If possible, we want to use an IPv4 address. - char ipv4_addr[INET6_ADDRSTRLEN]; - bool found_ipv4 = false; - char ipv6_addr[INET6_ADDRSTRLEN]; - bool found_ipv6 = false; - - for (addrinfo* p = servinfo; p != NULL; p = p->ai_next) { - CHECK(p->ai_family == AF_INET || p->ai_family == AF_INET6); - if (p->ai_family == AF_INET && !found_ipv4) { - struct sockaddr_in* ipv4 = (struct sockaddr_in*)p->ai_addr; - CHECK(nullptr != - inet_ntop(p->ai_family, (void*)&ipv4->sin_addr, ipv4_addr, INET6_ADDRSTRLEN)); - found_ipv4 = true; - break; - } else if (!found_ipv6) { - struct sockaddr_in6* ipv6 = (struct sockaddr_in6*)p->ai_addr; - CHECK(nullptr != - inet_ntop(p->ai_family, (void*)&ipv6->sin6_addr, ipv6_addr, INET6_ADDRSTRLEN)); - found_ipv6 = true; - } - } - - CHECK(found_ipv4 || found_ipv6); - memcpy(dest, found_ipv4 ? ipv4_addr : ipv6_addr, INET6_ADDRSTRLEN); - - freeaddrinfo(servinfo); - - return 0; -} - -error_code Recv(FiberSocketBase* input, base::IoBuf* dest) { - auto buf = dest->AppendBuffer(); - io::Result exp_size = input->Recv(buf); - if (!exp_size) - return exp_size.error(); - - dest->CommitWrite(*exp_size); - - return error_code{}; -} - constexpr unsigned kRdbEofMarkSize = 40; // Distribute flow indices over all available threads (shard_set pool size). @@ -126,40 +63,13 @@ vector> Partition(unsigned num_flows) { } // namespace -std::string Replica::MasterContext::Description() const { - return absl::StrCat(host, ":", port); -} - Replica::Replica(string host, uint16_t port, Service* se, std::string_view id) - : service_(*se), id_{id} { - master_context_.host = std::move(host); - master_context_.port = port; -} - -Replica::Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* service, - std::shared_ptr shared_exe_data) - : service_(*service), master_context_(context) { - master_context_.dfly_flow_id = dfly_flow_id; - multi_shard_exe_ = shared_exe_data; - use_multi_shard_exe_sync_ = GetFlag(FLAGS_enable_multi_shard_sync); - executor_.reset(new JournalExecutor(service)); + : ProtocolClient(std::move(host), port), service_(*se), id_{id} { } Replica::~Replica() { - if (sync_fb_.IsJoinable()) { - sync_fb_.Join(); - } - if (acks_fb_.IsJoinable()) { - acks_fb_.Join(); - } - if (execution_fb_.IsJoinable()) { - execution_fb_.Join(); - } - - if (sock_) { - auto ec = sock_->Close(); - LOG_IF(ERROR, ec) << "Error closing replica socket " << ec; - } + sync_fb_.JoinIfNeeded(); + acks_fb_.JoinIfNeeded(); } static const char kConnErr[] = "could not connect to master: "; @@ -191,13 +101,12 @@ error_code Replica::Start(ConnectionContext* cntx) { // 2. Connect socket. VLOG(1) << "Connecting to master"; - ec = ConnectAndAuth(absl::GetFlag(FLAGS_master_connect_timeout_ms) * 1ms); + ec = ConnectAndAuth(absl::GetFlag(FLAGS_master_connect_timeout_ms) * 1ms, &cntx_); RETURN_ON_ERR(check_connection_error(ec, kConnErr)); // 3. Greet. VLOG(1) << "Greeting"; state_mask_.store(R_ENABLED | R_TCP_CONNECTED); - last_io_time_ = mythread->GetMonotonicTimeNs(); ec = Greet(); RETURN_ON_ERR(check_connection_error(ec, "could not greet master ")); @@ -214,25 +123,24 @@ void Replica::Stop() { state_mask_.store(0); // Specifically ~R_ENABLED. cntx_.Cancel(); // Context is fully resposible for cleanup. + waker_.notifyAll(); + // Make sure the replica fully stopped and did all cleanup, // so we can freely release resources (connections). - waker_.notifyAll(); - if (sync_fb_.IsJoinable()) - sync_fb_.Join(); - if (acks_fb_.IsJoinable()) - acks_fb_.Join(); + sync_fb_.JoinIfNeeded(); + acks_fb_.JoinIfNeeded(); } void Replica::Pause(bool pause) { VLOG(1) << "Pausing replication"; - sock_->proactor()->Await([&] { is_paused_ = pause; }); + Proactor()->Await([&] { is_paused_ = pause; }); } std::error_code Replica::TakeOver(std::string_view timeout) { VLOG(1) << "Taking over"; std::error_code ec; - sock_->proactor()->Await( + Proactor()->Await( [this, &ec, timeout] { ec = SendNextPhaseRequest(absl::StrCat("TAKEOVER ", timeout)); }); // If we successfully taken over, return and let server_family stop the replication. @@ -256,14 +164,14 @@ void Replica::MainReplicationFb() { ec = ResolveMasterDns(); if (ec) { - LOG(ERROR) << "Error resolving dns to " << master_context_.host << " " << ec; + LOG(ERROR) << "Error resolving dns to " << server().host << " " << ec; continue; } // Give a lower timeout for connect, because we're - ec = ConnectAndAuth(absl::GetFlag(FLAGS_master_reconnect_timeout_ms) * 1ms); + ec = ConnectAndAuth(absl::GetFlag(FLAGS_master_reconnect_timeout_ms) * 1ms, &cntx_); if (ec) { - LOG(ERROR) << "Error connecting to " << master_context_.Description() << " " << ec; + LOG(ERROR) << "Error connecting to " << server().Description() << " " << ec; continue; } VLOG(1) << "Replica socket connected"; @@ -275,7 +183,7 @@ void Replica::MainReplicationFb() { if ((state_mask_.load() & R_GREETED) == 0) { ec = Greet(); if (ec) { - LOG(INFO) << "Error greeting " << master_context_.Description() << " " << ec << " " + LOG(INFO) << "Error greeting " << server().Description() << " " << ec << " " << ec.message(); state_mask_.fetch_and(R_ENABLED); continue; @@ -292,7 +200,7 @@ void Replica::MainReplicationFb() { ec = InitiatePSync(); if (ec) { - LOG(WARNING) << "Error syncing with " << master_context_.Description() << " " << ec << " " + LOG(WARNING) << "Error syncing with " << server().Description() << " " << ec << " " << ec.message(); state_mask_.fetch_and(R_ENABLED); // reset all flags besides R_ENABLED continue; @@ -309,7 +217,7 @@ void Replica::MainReplicationFb() { else ec = ConsumeRedisStream(); - LOG(WARNING) << "Error stable sync with " << master_context_.Description() << " " << ec << " " + LOG(WARNING) << "Error stable sync with " << server().Description() << " " << ec << " " << ec.message(); state_mask_.fetch_and(R_ENABLED); } @@ -323,96 +231,35 @@ void Replica::MainReplicationFb() { VLOG(1) << "Main replication fiber finished"; } -error_code Replica::ResolveMasterDns() { - char ip_addr[INET6_ADDRSTRLEN]; - int resolve_res = ResolveDns(master_context_.host, ip_addr); - if (resolve_res != 0) { - LOG(ERROR) << "Dns error " << gai_strerror(resolve_res) << ", host: " << master_context_.host; - return make_error_code(errc::host_unreachable); - } - - master_context_.endpoint = {ip::make_address(ip_addr), master_context_.port}; - - return error_code{}; -} - -error_code Replica::ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms) { - ProactorBase* mythread = ProactorBase::me(); - CHECK(mythread); - { - unique_lock lk(sock_mu_); - // The context closes sock_. So if the context error handler has already - // run we must not create a new socket. sock_mu_ syncs between the two - // functions. - if (!cntx_.IsCancelled()) - sock_.reset(mythread->CreateSocket()); - else - return cntx_.GetError(); - } - - // We set this timeout because this call blocks other REPLICAOF commands. We don't need it for the - // rest of the sync. - { - uint32_t timeout = sock_->timeout(); - sock_->set_timeout(connect_timeout_ms.count()); - RETURN_ON_ERR(sock_->Connect(master_context_.endpoint)); - sock_->set_timeout(timeout); - } - - /* These may help but require additional field testing to learn. - int yes = 1; - CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes))); - CHECK_EQ(0, setsockopt(sock_->native_handle(), SOL_SOCKET, SO_KEEPALIVE, &yes, sizeof(yes))); - - int intv = 15; - CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_KEEPIDLE, &intv, sizeof(intv))); - - intv /= 3; - CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_KEEPINTVL, &intv, sizeof(intv))); - - intv = 3; - CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_KEEPCNT, &intv, sizeof(intv))); - */ - auto masterauth = absl::GetFlag(FLAGS_masterauth); - if (!masterauth.empty()) { - ReqSerializer serializer{sock_.get()}; - parser_.reset(new RedisParser{false}); - RETURN_ON_ERR(SendCommandAndReadResponse(StrCat("AUTH ", masterauth), &serializer)); - RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK")); - } - return error_code{}; -} - error_code Replica::Greet() { - parser_.reset(new RedisParser{false}); - ReqSerializer serializer{sock_.get()}; + ResetParser(false); VLOG(1) << "greeting message handling"; // Corresponds to server.repl_state == REPL_STATE_CONNECTING state in redis - RETURN_ON_ERR(SendCommandAndReadResponse("PING", &serializer)); // optional. - RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("PONG")); + RETURN_ON_ERR(SendCommandAndReadResponse("PING")); // optional. + PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("PONG")); // Corresponds to server.repl_state == REPL_STATE_SEND_HANDSHAKE condition in replication.c auto port = absl::GetFlag(FLAGS_port); - RETURN_ON_ERR(SendCommandAndReadResponse(StrCat("REPLCONF listening-port ", port), &serializer)); - RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK")); + RETURN_ON_ERR(SendCommandAndReadResponse(StrCat("REPLCONF listening-port ", port))); + PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK")); // Corresponds to server.repl_state == REPL_STATE_SEND_CAPA - RETURN_ON_ERR(SendCommandAndReadResponse("REPLCONF capa eof capa psync2", &serializer)); - RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK")); + RETURN_ON_ERR(SendCommandAndReadResponse("REPLCONF capa eof capa psync2")); + PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK")); // Announce that we are the dragonfly client. // Note that we currently do not support dragonfly->redis replication. - RETURN_ON_ERR(SendCommandAndReadResponse("REPLCONF capa dragonfly", &serializer)); - RETURN_ON_BAD_RESPONSE(CheckRespFirstTypes({RespExpr::STRING})); + RETURN_ON_ERR(SendCommandAndReadResponse("REPLCONF capa dragonfly")); + PC_RETURN_ON_BAD_RESPONSE(CheckRespFirstTypes({RespExpr::STRING})); - if (resp_args_.size() == 1) { // Redis - RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK")); - } else if (resp_args_.size() >= 3) { // it's dragonfly master. - RETURN_ON_BAD_RESPONSE(!HandleCapaDflyResp()); + if (LastResponseArgs().size() == 1) { // Redis + PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK")); + } else if (LastResponseArgs().size() >= 3) { // it's dragonfly master. + PC_RETURN_ON_BAD_RESPONSE(!HandleCapaDflyResp()); if (auto ec = ConfigureDflyMaster(); ec) return ec; } else { - RETURN_ON_BAD_RESPONSE(false); + PC_RETURN_ON_BAD_RESPONSE(false); } state_mask_.fetch_or(R_GREETED); @@ -422,10 +269,10 @@ error_code Replica::Greet() { std::error_code Replica::HandleCapaDflyResp() { // Response is: if (!CheckRespFirstTypes({RespExpr::STRING, RespExpr::STRING, RespExpr::INT64}) || - resp_args_[0].GetBuf().size() != CONFIG_RUN_ID_SIZE) + LastResponseArgs()[0].GetBuf().size() != CONFIG_RUN_ID_SIZE) return make_error_code(errc::bad_message); - int64 param_num_flows = get(resp_args_[2].u); + int64 param_num_flows = get(LastResponseArgs()[2].u); if (param_num_flows <= 0 || param_num_flows > 1024) { // sanity check, we support upto 1024 shards. // It's not that we can not support more but it's probably highly unlikely that someone @@ -434,13 +281,13 @@ std::error_code Replica::HandleCapaDflyResp() { return make_error_code(errc::bad_message); } - master_context_.master_repl_id = ToSV(resp_args_[0].GetBuf()); - master_context_.dfly_session_id = ToSV(resp_args_[1].GetBuf()); + master_context_.master_repl_id = ToSV(LastResponseArgs()[0].GetBuf()); + master_context_.dfly_session_id = ToSV(LastResponseArgs()[1].GetBuf()); num_df_flows_ = param_num_flows; - if (resp_args_.size() >= 4) { - RETURN_ON_BAD_RESPONSE(resp_args_[3].type == RespExpr::INT64); - master_context_.version = DflyVersion(get(resp_args_[3].u)); + if (LastResponseArgs().size() >= 4) { + PC_RETURN_ON_BAD_RESPONSE(LastResponseArgs()[3].type == RespExpr::INT64); + master_context_.version = DflyVersion(get(LastResponseArgs()[3].u)); } VLOG(1) << "Master id: " << master_context_.master_repl_id << ", sync id: " << master_context_.dfly_session_id << ", num journals: " << num_df_flows_ @@ -450,21 +297,19 @@ std::error_code Replica::HandleCapaDflyResp() { } std::error_code Replica::ConfigureDflyMaster() { - ReqSerializer serializer{sock_.get()}; - // We need to send this because we may require to use this for cluster commands. // this reason to send this here is that in other context we can get an error reply // since we are budy with the replication - RETURN_ON_ERR(SendCommandAndReadResponse(StrCat("REPLCONF CLIENT-ID ", id_), &serializer)); + RETURN_ON_ERR(SendCommandAndReadResponse(StrCat("REPLCONF CLIENT-ID ", id_))); if (!CheckRespIsSimpleReply("OK")) { LOG(WARNING) << "Bad REPLCONF CLIENT-ID response"; } // Tell the master our version if it supports REPLCONF CLIENT-VERSION if (master_context_.version > DflyVersion::VER0) { - RETURN_ON_ERR(SendCommandAndReadResponse( - StrCat("REPLCONF CLIENT-VERSION ", DflyVersion::CURRENT_VER), &serializer)); - RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK")); + RETURN_ON_ERR( + SendCommandAndReadResponse(StrCat("REPLCONF CLIENT-VERSION ", DflyVersion::CURRENT_VER))); + PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK")); } return error_code{}; @@ -473,8 +318,6 @@ std::error_code Replica::ConfigureDflyMaster() { error_code Replica::InitiatePSync() { base::IoBuf io_buf{128}; - ReqSerializer serializer{sock_.get()}; - // Corresponds to server.repl_state == REPL_STATE_SEND_PSYNC string id("?"); // corresponds to null master id and null offset int64_t offs = -1; @@ -483,7 +326,7 @@ error_code Replica::InitiatePSync() { offs = repl_offs_; // to try incremental sync. } - RETURN_ON_ERR(SendCommand(StrCat("PSYNC ", id, " ", offs), &serializer)); + RETURN_ON_ERR(SendCommand(StrCat("PSYNC ", id, " ", offs))); LOG(INFO) << "Starting full sync"; @@ -492,13 +335,12 @@ error_code Replica::InitiatePSync() { RETURN_ON_ERR(ParseReplicationHeader(&io_buf, &repl_header)); - ProactorBase* sock_thread = sock_->proactor(); string* token = absl::get_if(&repl_header.fullsync); size_t snapshot_size = SIZE_MAX; if (!token) { snapshot_size = absl::get(repl_header.fullsync); } - last_io_time_ = sock_thread->GetMonotonicTimeNs(); + TouchIoTime(); // we get token for diskless redis replication. For disk based replication // we get the snapshot size. @@ -506,7 +348,7 @@ error_code Replica::InitiatePSync() { // Start full sync state_mask_.fetch_or(R_SYNCING); - io::PrefixSource ps{io_buf.InputBuffer(), sock_.get()}; + io::PrefixSource ps{io_buf.InputBuffer(), Sock()}; // Set LOADING state. CHECK(service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING) == GlobalState::LOADING); @@ -542,7 +384,7 @@ error_code Replica::InitiatePSync() { CHECK(ps.UnusedPrefix().empty()); io_buf.ConsumeInput(io_buf.InputLen()); - last_io_time_ = sock_thread->GetMonotonicTimeNs(); + TouchIoTime(); } state_mask_.fetch_and(~R_SYNCING); @@ -573,7 +415,8 @@ error_code Replica::InitiateDflySync() { // Initialize shard flows. shard_flows_.resize(num_df_flows_); for (unsigned i = 0; i < num_df_flows_; ++i) { - shard_flows_[i].reset(new Replica(master_context_, i, &service_, multi_shard_exe_)); + shard_flows_[i].reset( + new DflyShardReplica(server(), master_context_, i, &service_, multi_shard_exe_)); } // Blocked on until all flows got full sync cut. @@ -590,7 +433,7 @@ error_code Replica::InitiateDflySync() { // Unblock all sockets. DefaultErrorHandler(ge); for (auto& flow : shard_flows_) - flow->CloseSocket(); + flow->Cancel(); }; RETURN_ON_ERR(cntx_.SwitchErrorHandler(std::move(err_handler))); @@ -625,7 +468,7 @@ error_code Replica::InitiateDflySync() { return cntx_.ReportError(ec); } - LOG(INFO) << absl::StrCat("Started full sync with ", master_context_.Description()); + LOG(INFO) << absl::StrCat("Started full sync with ", server().Description()); // Wait for all flows to receive full sync cut. // In case of an error, this is unblocked by the error handler. @@ -653,50 +496,50 @@ error_code Replica::ConsumeRedisStream() { io::NullSink null_sink; // we never reply back on the commands. ConnectionContext conn_context{&null_sink, nullptr}; conn_context.is_replicating = true; - parser_.reset(new RedisParser); - - ReqSerializer serializer{sock_.get()}; + ResetParser(true); // Master waits for this command in order to start sending replication stream. - RETURN_ON_ERR(SendCommand("REPLCONF ACK 0", &serializer)); + RETURN_ON_ERR(SendCommand("REPLCONF ACK 0")); VLOG(1) << "Before reading repl-log"; - // Redis sends eiher pings every "repl_ping_slave_period" time inside replicationCron(). + // Redis sends either pings every "repl_ping_slave_period" time inside replicationCron(). // or, alternatively, write commands stream coming from propagate() function. // Replica connection must send "REPLCONF ACK xxx" in order to make sure that master replication - // buffer gets disposed of already processed commands. + // buffer gets disposed of already processed commands, this is done in a separate fiber. error_code ec; - time_t last_ack = time(nullptr); - string ack_cmd; - LOG(INFO) << "Transitioned into stable sync"; + facade::CmdArgVec args_vector; - // basically reflection of dragonfly_connection IoLoop function. - while (!ec) { - io::MutableBytes buf = io_buf.AppendBuffer(); - io::Result size_res = sock_->Recv(buf); - if (!size_res) - return size_res.error(); + acks_fb_ = MakeFiber(&Replica::RedisStreamAcksFb, this); - VLOG(1) << "Read replication stream of " << *size_res << " bytes"; - last_io_time_ = sock_->proactor()->GetMonotonicTimeNs(); - - io_buf.CommitWrite(*size_res); - repl_offs_ += *size_res; - - // Send repl ack back to master. - if (repl_offs_ > ack_offs_ + 1024 || time(nullptr) > last_ack + 5) { - ack_cmd.clear(); - absl::StrAppend(&ack_cmd, "REPLCONF ACK ", repl_offs_); - RETURN_ON_ERR(SendCommand(ack_cmd, &serializer)); + while (true) { + auto response = ReadRespReply(&io_buf, /*copy_msg=*/false); + if (!response.has_value()) { + VLOG(1) << "ConsumeRedisStream finished"; + acks_fb_.JoinIfNeeded(); + return response.error(); } - ec = ParseAndExecute(&io_buf, &conn_context); - } + if (!LastResponseArgs().empty()) { + VLOG(2) << "Got command " << absl::CHexEscape(ToSV(LastResponseArgs()[0].GetBuf())) + << "\n consumed: " << response->total_read; - VLOG(1) << "ConsumeRedisStream finished"; - return ec; + if (LastResponseArgs()[0].GetBuf()[0] == '\r') { + for (const auto& arg : LastResponseArgs()) { + LOG(INFO) << absl::CHexEscape(ToSV(arg.GetBuf())); + } + } + + facade::RespToArgList(LastResponseArgs(), &args_vector); + CmdArgList arg_list{args_vector.data(), args_vector.size()}; + service_.DispatchCommand(arg_list, &conn_context); + } + + io_buf.ConsumeInput(response->left_in_buffer); + repl_offs_ += response->total_read; + waker_.notify(); // Notify to trigger ACKs. + } } error_code Replica::ConsumeDflyStream() { @@ -706,8 +549,7 @@ error_code Replica::ConsumeDflyStream() { lock_guard lk{flows_op_mu_}; DefaultErrorHandler(ge); for (auto& flow : shard_flows_) { - flow->CloseSocket(); - flow->waker_.notifyAll(); + flow->Cancel(); } // Iterate over map and cancel all blocking entities @@ -747,29 +589,9 @@ error_code Replica::ConsumeDflyStream() { return cntx_.GetError(); } -void Replica::CloseSocket() { - unique_lock lk(sock_mu_); - if (sock_) { - sock_->proactor()->Await([this] { - if (sock_->IsOpen()) { - auto ec = sock_->Shutdown(SHUT_RDWR); - LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec; - } - }); - } -} - void Replica::JoinAllFlows() { for (auto& flow : shard_flows_) { - if (flow->sync_fb_.IsJoinable()) { - flow->sync_fb_.Join(); - } - if (flow->acks_fb_.IsJoinable()) { - flow->acks_fb_.Join(); - } - if (flow->execution_fb_.IsJoinable()) { - flow->execution_fb_.Join(); - } + flow->JoinFlow(); } } @@ -782,70 +604,69 @@ void Replica::DefaultErrorHandler(const GenericError& err) { } error_code Replica::SendNextPhaseRequest(string_view kind) { - ReqSerializer serializer{sock_.get()}; - // Ask master to start sending replication stream string request = StrCat("DFLY ", kind, " ", master_context_.dfly_session_id); VLOG(1) << "Sending: " << request; - RETURN_ON_ERR(SendCommandAndReadResponse(request, &serializer)); + RETURN_ON_ERR(SendCommandAndReadResponse(request)); - RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK")); + PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK")); return std::error_code{}; } -error_code Replica::StartFullSyncFlow(BlockingCounter sb, Context* cntx) { +error_code DflyShardReplica::StartFullSyncFlow(BlockingCounter sb, Context* cntx) { DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty()); - RETURN_ON_ERR(ConnectAndAuth(absl::GetFlag(FLAGS_master_connect_timeout_ms) * 1ms)); + RETURN_ON_ERR(ConnectAndAuth(absl::GetFlag(FLAGS_master_connect_timeout_ms) * 1ms, &cntx_)); VLOG(1) << "Sending on flow " << master_context_.master_repl_id << " " - << master_context_.dfly_session_id << " " << master_context_.dfly_flow_id; + << master_context_.dfly_session_id << " " << flow_id_; - ReqSerializer serializer{sock_.get()}; auto cmd = StrCat("DFLY FLOW ", master_context_.master_repl_id, " ", - master_context_.dfly_session_id, " ", master_context_.dfly_flow_id); + master_context_.dfly_session_id, " ", flow_id_); - parser_.reset(new RedisParser{false}); // client mode + ResetParser(/*server_mode=*/false); leftover_buf_.emplace(128); - RETURN_ON_ERR(SendCommandAndReadResponse(cmd, &serializer, &*leftover_buf_)); + RETURN_ON_ERR(SendCommand(cmd)); + auto read_resp = ReadRespReply(&*leftover_buf_); + if (!read_resp.has_value()) { + return read_resp.error(); + } - RETURN_ON_BAD_RESPONSE(CheckRespFirstTypes({RespExpr::STRING, RespExpr::STRING})); + PC_RETURN_ON_BAD_RESPONSE(CheckRespFirstTypes({RespExpr::STRING, RespExpr::STRING})); - string_view flow_directive = ToSV(resp_args_[0].GetBuf()); + string_view flow_directive = ToSV(LastResponseArgs()[0].GetBuf()); string eof_token; - RETURN_ON_BAD_RESPONSE(flow_directive == "FULL"); - eof_token = ToSV(resp_args_[1].GetBuf()); + PC_RETURN_ON_BAD_RESPONSE(flow_directive == "FULL"); + eof_token = ToSV(LastResponseArgs()[1].GetBuf()); - state_mask_.fetch_or(R_TCP_CONNECTED); + leftover_buf_->ConsumeInput(read_resp->left_in_buffer); // We can not discard io_buf because it may contain data // besides the response we parsed. Therefore we pass it further to ReplicateDFFb. - sync_fb_ = MakeFiber(&Replica::FullSyncDflyFb, this, move(eof_token), sb, cntx); + sync_fb_ = MakeFiber(&DflyShardReplica::FullSyncDflyFb, this, std::move(eof_token), sb, cntx); return error_code{}; } -error_code Replica::StartStableSyncFlow(Context* cntx) { +error_code DflyShardReplica::StartStableSyncFlow(Context* cntx) { DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty()); ProactorBase* mythread = ProactorBase::me(); CHECK(mythread); - CHECK(sock_->IsOpen()); - // sock_.reset(mythread->CreateSocket()); - // RETURN_ON_ERR(sock_->Connect(master_context_.master_ep)); - sync_fb_ = MakeFiber(&Replica::StableSyncDflyReadFb, this, cntx); + CHECK(Sock()->IsOpen()); + sync_fb_ = MakeFiber(&DflyShardReplica::StableSyncDflyReadFb, this, cntx); if (use_multi_shard_exe_sync_) { - execution_fb_ = MakeFiber(&Replica::StableSyncDflyExecFb, this, cntx); + execution_fb_ = MakeFiber(&DflyShardReplica::StableSyncDflyExecFb, this, cntx); } return std::error_code{}; } -void Replica::FullSyncDflyFb(string eof_token, BlockingCounter bc, Context* cntx) { +void DflyShardReplica::FullSyncDflyFb(const string& eof_token, BlockingCounter bc, Context* cntx) { DCHECK(leftover_buf_); - io::PrefixSource ps{leftover_buf_->InputBuffer(), sock_.get()}; + io::PrefixSource ps{leftover_buf_->InputBuffer(), Sock()}; RdbLoader loader(&service_); loader.SetFullSyncCutCb([bc, ran = false]() mutable { @@ -889,20 +710,20 @@ void Replica::FullSyncDflyFb(string eof_token, BlockingCounter bc, Context* cntx VLOG(1) << "FullSyncDflyFb finished after reading " << loader.bytes_read() << " bytes"; } -void Replica::StableSyncDflyReadFb(Context* cntx) { +void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) { // Check leftover from full sync. io::Bytes prefix{}; if (leftover_buf_ && leftover_buf_->InputLen() > 0) { prefix = leftover_buf_->InputBuffer(); } - io::PrefixSource ps{prefix, sock_.get()}; + io::PrefixSource ps{prefix, Sock()}; JournalReader reader{&ps, 0}; TransactionReader tx_reader{}; if (master_context_.version > DflyVersion::VER0) { - acks_fb_ = MakeFiber(&Replica::StableSyncDflyAcksFb, this, cntx); + acks_fb_ = MakeFiber(&DflyShardReplica::StableSyncDflyAcksFb, this, cntx); } while (!cntx->IsCancelled()) { @@ -916,7 +737,7 @@ void Replica::StableSyncDflyReadFb(Context* cntx) { if (!tx_data) break; - last_io_time_ = sock_->proactor()->GetMonotonicTimeNs(); + last_io_time_ = Proactor()->GetMonotonicTimeNs(); if (!tx_data->is_ping) { if (use_multi_shard_exe_sync_) { @@ -933,24 +754,46 @@ void Replica::StableSyncDflyReadFb(Context* cntx) { } } -void Replica::StableSyncDflyAcksFb(Context* cntx) { +void Replica::RedisStreamAcksFb() { constexpr size_t kAckRecordMaxInterval = 1024; std::chrono::duration ack_time_max_interval = 1ms * absl::GetFlag(FLAGS_replication_acks_interval); std::string ack_cmd; - ReqSerializer serializer{sock_.get()}; - auto next_ack_tp = std::chrono::steady_clock::now(); + while (!cntx_.IsCancelled()) { + VLOG(1) << "Sending an ACK with offset=" << repl_offs_; + ack_cmd = absl::StrCat("REPLCONF ACK ", repl_offs_); + next_ack_tp = std::chrono::steady_clock::now() + ack_time_max_interval; + if (auto ec = SendCommand(ack_cmd); ec) { + cntx_.ReportError(ec); + break; + } + ack_offs_ = repl_offs_; + + waker_.await_until( + [&]() { return repl_offs_ > ack_offs_ + kAckRecordMaxInterval || cntx_.IsCancelled(); }, + next_ack_tp); + } +} + +void DflyShardReplica::StableSyncDflyAcksFb(Context* cntx) { + constexpr size_t kAckRecordMaxInterval = 1024; + std::chrono::duration ack_time_max_interval = + 1ms * absl::GetFlag(FLAGS_replication_acks_interval); + std::string ack_cmd; + auto next_ack_tp = std::chrono::steady_clock::now(); + + uint64_t current_offset; while (!cntx->IsCancelled()) { // Handle ACKs with the master. PING opcodes from the master mean we should immediately // answer. - uint64_t current_offset = journal_rec_executed_.load(std::memory_order_relaxed); + current_offset = journal_rec_executed_.load(std::memory_order_relaxed); VLOG(1) << "Sending an ACK with offset=" << current_offset << " forced=" << force_ping_; ack_cmd = absl::StrCat("REPLCONF ACK ", current_offset); force_ping_ = false; next_ack_tp = std::chrono::steady_clock::now() + ack_time_max_interval; - if (auto ec = SendCommand(ack_cmd, &serializer); ec) { + if (auto ec = SendCommand(ack_cmd); ec) { cntx->ReportError(ec); break; } @@ -966,7 +809,20 @@ void Replica::StableSyncDflyAcksFb(Context* cntx) { } } -void Replica::ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx) { +DflyShardReplica::DflyShardReplica(ServerContext server_context, MasterContext master_context, + uint32_t flow_id, Service* service, + std::shared_ptr multi_shard_exe) + : ProtocolClient(server_context), service_(*service), master_context_(master_context), + multi_shard_exe_(multi_shard_exe), flow_id_(flow_id) { + use_multi_shard_exe_sync_ = GetFlag(FLAGS_enable_multi_shard_sync); + executor_ = std::make_unique(service); +} + +DflyShardReplica::~DflyShardReplica() { + JoinFlow(); +} + +void DflyShardReplica::ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx) { if (cntx->IsCancelled()) { return; } @@ -979,7 +835,7 @@ void Replica::ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx) ExecuteTx(std::move(tx_data), was_insert, cntx); } -bool Replica::InsertTxToSharedMap(const TransactionData& tx_data) { +bool DflyShardReplica::InsertTxToSharedMap(const TransactionData& tx_data) { std::lock_guard lk{multi_shard_exe_->map_mu}; auto [it, was_insert] = @@ -991,17 +847,17 @@ bool Replica::InsertTxToSharedMap(const TransactionData& tx_data) { return was_insert; } -void Replica::InsertTxDataToShardResource(TransactionData&& tx_data) { +void DflyShardReplica::InsertTxDataToShardResource(TransactionData&& tx_data) { bool was_insert = false; if (tx_data.shard_cnt > 1) { was_insert = InsertTxToSharedMap(tx_data); } VLOG(2) << "txid: " << tx_data.txid << " pushed to queue"; - trans_data_queue_.push(std::make_pair(std::move(tx_data), was_insert)); + trans_data_queue_.emplace(std::move(tx_data), was_insert); } -void Replica::StableSyncDflyExecFb(Context* cntx) { +void DflyShardReplica::StableSyncDflyExecFb(Context* cntx) { while (!cntx->IsCancelled()) { waker_.await([&]() { return (!trans_data_queue_.empty() || cntx->IsCancelled()); }); if (cntx->IsCancelled()) { @@ -1015,7 +871,7 @@ void Replica::StableSyncDflyExecFb(Context* cntx) { } } -void Replica::ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx) { +void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx) { if (cntx->IsCancelled()) { return; } @@ -1078,47 +934,6 @@ void Replica::ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* } } -error_code Replica::ReadRespReply(base::IoBuf* buffer) { - DCHECK(parser_); - - error_code ec; - if (!buffer) { - buffer = &resp_buf_; - buffer->Clear(); - } - last_resp_ = ""; - - // basically reflection of dragonfly_connection IoLoop function. - while (!ec) { - uint32_t consumed; - io::MutableBytes buf = buffer->AppendBuffer(); - io::Result size_res = sock_->Recv(buf); - if (!size_res) - return size_res.error(); - - VLOG(2) << "Read master response of " << *size_res << " bytes"; - - last_io_time_ = sock_->proactor()->GetMonotonicTimeNs(); - - buffer->CommitWrite(*size_res); - - RedisParser::Result result = parser_->Parse(buffer->InputBuffer(), &consumed, &resp_args_); - last_resp_ += std::string_view((char*)buffer->InputBuffer().data(), consumed); - buffer->ConsumeInput(consumed); - - if (result == RedisParser::OK && !resp_args_.empty()) { - return error_code{}; // success path - } - - if (result != RedisParser::INPUT_PENDING) { - LOG(ERROR) << "Invalid parser status " << result << " for response " << last_resp_; - return std::make_error_code(std::errc::bad_message); - } - } - - return ec; -} - error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* dest) { std::string_view str; @@ -1191,92 +1006,18 @@ bad_header: return std::make_error_code(std::errc::illegal_byte_sequence); } -error_code Replica::ReadLine(base::IoBuf* io_buf, string_view* line) { - size_t eol_pos; - std::string_view input_str = ToSV(io_buf->InputBuffer()); - - // consume whitespace. - while (true) { - auto it = find_if_not(input_str.begin(), input_str.end(), absl::ascii_isspace); - size_t ws_len = it - input_str.begin(); - io_buf->ConsumeInput(ws_len); - input_str = ToSV(io_buf->InputBuffer()); - if (!input_str.empty()) - break; - RETURN_ON_ERR(Recv(sock_.get(), io_buf)); - input_str = ToSV(io_buf->InputBuffer()); - }; - - // find eol. - while (true) { - eol_pos = input_str.find('\n'); - - if (eol_pos != std::string_view::npos) { - DCHECK_GT(eol_pos, 0u); // can not be 0 because then would be consumed as a whitespace. - if (input_str[eol_pos - 1] != '\r') { - break; - } - *line = input_str.substr(0, eol_pos - 1); - return error_code{}; - } - - RETURN_ON_ERR(Recv(sock_.get(), io_buf)); - input_str = ToSV(io_buf->InputBuffer()); - } - - LOG(ERROR) << "Bad replication header: " << input_str; - return std::make_error_code(std::errc::illegal_byte_sequence); -} - -error_code Replica::ParseAndExecute(base::IoBuf* io_buf, ConnectionContext* cntx) { - VLOG(1) << "ParseAndExecute: input len " << io_buf->InputLen(); - if (parser_->stash_size() > 0) { - DVLOG(1) << "Stash " << *parser_->stash()[0]; - } - - uint32_t consumed = 0; - RedisParser::Result result = RedisParser::OK; - - do { - result = parser_->Parse(io_buf->InputBuffer(), &consumed, &resp_args_); - - switch (result) { - case RedisParser::OK: - if (!resp_args_.empty()) { - VLOG(2) << "Got command " << ToSV(resp_args_[0].GetBuf()) << "\n consumed: " << consumed; - - facade::RespToArgList(resp_args_, &cmd_str_args_); - CmdArgList arg_list{cmd_str_args_.data(), cmd_str_args_.size()}; - service_.DispatchCommand(arg_list, cntx); - } - io_buf->ConsumeInput(consumed); - break; - case RedisParser::INPUT_PENDING: - io_buf->ConsumeInput(consumed); - break; - default: - LOG(ERROR) << "Invalid parser status " << result << " for buffer of size " - << io_buf->InputLen(); - return std::make_error_code(std::errc::bad_message); - } - } while (io_buf->InputLen() > 0 && result == RedisParser::OK); - VLOG(1) << "ParseAndExecute: " << io_buf->InputLen() << " " << ToSV(io_buf->InputBuffer()); - - return error_code{}; -} - Replica::Info Replica::GetInfo() const { - CHECK(sock_); + CHECK(Sock()); - return sock_->proactor()->AwaitBrief([this] { - auto last_io_time = last_io_time_; + return Proactor()->AwaitBrief([this] { + auto last_io_time = LastIoTime(); for (const auto& flow : shard_flows_) { // Get last io time from all sub flows. - last_io_time = std::max(last_io_time, flow->last_io_time_); + last_io_time = std::max(last_io_time, flow->LastIoTime()); } Info res; - res.host = master_context_.host; - res.port = master_context_.port; + res.host = server().host; + res.port = server().port; res.master_link_established = (state_mask_.load() & R_TCP_CONNECTED); res.full_sync_in_progress = (state_mask_.load() & R_SYNCING); res.full_sync_done = (state_mask_.load() & R_SYNC_OK); @@ -1289,8 +1030,8 @@ std::vector Replica::GetReplicaOffset() const { std::vector flow_rec_count; flow_rec_count.resize(shard_flows_.size()); for (const auto& flow : shard_flows_) { - uint32_t flow_id = flow->master_context_.dfly_flow_id; - uint64_t rec_count = flow->journal_rec_executed_.load(std::memory_order_relaxed); + uint32_t flow_id = flow->FlowId(); + uint64_t rec_count = flow->JournalExecutedCount(); DCHECK_LT(flow_id, shard_flows_.size()); flow_rec_count[flow_id] = rec_count; } @@ -1301,39 +1042,7 @@ std::string Replica::GetSyncId() const { return master_context_.dfly_session_id; } -bool Replica::CheckRespIsSimpleReply(string_view reply) const { - return resp_args_.size() == 1 && resp_args_.front().type == RespExpr::STRING && - ToSV(resp_args_.front().GetBuf()) == reply; -} - -bool Replica::CheckRespFirstTypes(initializer_list types) const { - unsigned i = 0; - for (RespExpr::Type type : types) { - if (i >= resp_args_.size() || resp_args_[i].type != type) - return false; - ++i; - } - return true; -} - -error_code Replica::SendCommand(string_view command, ReqSerializer* serializer) { - serializer->SendCommand(command); - error_code ec = serializer->ec(); - if (!ec) { - last_io_time_ = sock_->proactor()->GetMonotonicTimeNs(); - } - return ec; -} - -error_code Replica::SendCommandAndReadResponse(string_view command, ReqSerializer* serializer, - base::IoBuf* buffer) { - last_cmd_ = command; - if (auto ec = SendCommand(command, serializer); ec) - return ec; - return ReadRespReply(buffer); -} - -bool Replica::TransactionData::AddEntry(journal::ParsedEntry&& entry) { +bool DflyShardReplica::TransactionData::AddEntry(journal::ParsedEntry&& entry) { ++journal_rec_count; switch (entry.opcode) { @@ -1359,7 +1068,7 @@ bool Replica::TransactionData::AddEntry(journal::ParsedEntry&& entry) { return false; } -bool Replica::TransactionData::IsGlobalCmd() const { +bool DflyShardReplica::TransactionData::IsGlobalCmd() const { if (commands.size() > 1) { return false; } @@ -1380,14 +1089,15 @@ bool Replica::TransactionData::IsGlobalCmd() const { return false; } -Replica::TransactionData Replica::TransactionData::FromSingle(journal::ParsedEntry&& entry) { +DflyShardReplica::TransactionData DflyShardReplica::TransactionData::FromSingle( + journal::ParsedEntry&& entry) { TransactionData data; bool res = data.AddEntry(std::move(entry)); DCHECK(res); return data; } -auto Replica::TransactionReader::NextTxData(JournalReader* reader, Context* cntx) +auto DflyShardReplica::TransactionReader::NextTxData(JournalReader* reader, Context* cntx) -> optional { io::Result res; while (true) { @@ -1418,4 +1128,23 @@ auto Replica::TransactionReader::NextTxData(JournalReader* reader, Context* cntx return std::nullopt; } +uint32_t DflyShardReplica::FlowId() const { + return flow_id_; +} + +uint64_t DflyShardReplica::JournalExecutedCount() const { + return journal_rec_executed_.load(std::memory_order_relaxed); +} + +void DflyShardReplica::JoinFlow() { + sync_fb_.JoinIfNeeded(); + acks_fb_.JoinIfNeeded(); + execution_fb_.JoinIfNeeded(); +} + +void DflyShardReplica::Cancel() { + CloseSocket(); + waker_.notifyAll(); +} + } // namespace dfly diff --git a/src/server/replica.h b/src/server/replica.h index a1c2ed5e7..4ef14fb04 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -14,6 +14,7 @@ #include "facade/redis_parser.h" #include "server/common.h" #include "server/journal/types.h" +#include "server/protocol_client.h" #include "server/version.h" #include "util/fiber_socket_base.h" @@ -27,24 +28,35 @@ class Service; class ConnectionContext; class JournalExecutor; struct JournalReader; +class DflyShardReplica; -class Replica { - private: - // The attributes of the master we are connecting to. - struct MasterContext { - std::string host; - uint16_t port; - boost::asio::ip::tcp::endpoint endpoint; +// Coordinator for multi shard execution. +struct MultiShardExecution { + Mutex map_mu; - std::string master_repl_id; - std::string dfly_session_id; // Sync session id for dfly sync. - uint32_t dfly_flow_id = UINT32_MAX; // Flow id if replica acts as a dfly flow. + struct TxExecutionSync { + Barrier barrier; + std::atomic_uint32_t counter; + BlockingCounter block; - DflyVersion version = DflyVersion::VER0; - - std::string Description() const; + explicit TxExecutionSync(uint32_t counter) + : barrier(counter), counter(counter), block(counter) { + } }; + std::unordered_map tx_sync_execution; +}; + +// The attributes of the master we are connecting to. +struct MasterContext { + std::string master_repl_id; + std::string dfly_session_id; // Sync session id for dfly sync. + DflyVersion version = DflyVersion::VER0; +}; + +// This class manages replication from both Dragonfly and Redis masters. +class Replica : ProtocolClient { + private: // The flow is : R_ENABLED -> R_TCP_CONNECTED -> (R_SYNCING) -> R_SYNC_OK. // SYNCING means that the initial ack succeeded. It may be optional if we can still load from // the journal offset. @@ -56,8 +68,122 @@ class Replica { R_SYNC_OK = 0x10, }; + public: + Replica(std::string master_host, uint16_t port, Service* se, std::string_view id); + ~Replica(); + + // Spawns a fiber that runs until link with master is broken or the replication is stopped. + // Returns true if initial link with master has been established or + // false if it has failed. + std::error_code Start(ConnectionContext* cntx); + + void Stop(); // thread-safe + + void Pause(bool pause); + + std::error_code TakeOver(std::string_view timeout); + + std::string_view MasterId() const { + return master_context_.master_repl_id; + } + + private: /* Main standalone mode functions */ + // Coordinate state transitions. Spawned by start. + void MainReplicationFb(); + + std::error_code Greet(); // Send PING and REPLCONF. + + std::error_code HandleCapaDflyResp(); + std::error_code ConfigureDflyMaster(); + + std::error_code InitiatePSync(); // Redis full sync. + std::error_code InitiateDflySync(); // Dragonfly full sync. + + std::error_code ConsumeRedisStream(); // Redis stable state. + std::error_code ConsumeDflyStream(); // Dragonfly stable state. + + void RedisStreamAcksFb(); + + void JoinAllFlows(); // Join all flows if possible. + void SetShardStates(bool replica); // Call SetReplica(replica) on all shards. + + // Send DFLY ${kind} to the master instance. + std::error_code SendNextPhaseRequest(std::string_view kind); + + void DefaultErrorHandler(const GenericError& err); + + private: /* Utility */ + struct PSyncResponse { + // string - end of sync token (diskless) + // size_t - size of the full sync blob (disk-based). + // if fullsync is 0, it means that master can continue with partial replication. + std::variant fullsync; + }; + + std::error_code ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* dest); + + public: /* Utility */ + struct Info { + std::string host; + uint16_t port; + bool master_link_established; + bool full_sync_in_progress; + bool full_sync_done; + time_t master_last_io_sec; // monotonic clock. + }; + + Info GetInfo() const; // thread-safe, blocks fiber + + bool HasDflyMaster() const { + return !master_context_.dfly_session_id.empty(); + } + + const std::string& MasterHost() const { + return server().host; + } + + uint16_t Port() const { + return server().port; + } + + std::vector GetReplicaOffset() const; + std::string GetSyncId() const; + + private: + Service& service_; + MasterContext master_context_; + + // In redis replication mode. + Fiber sync_fb_; + Fiber acks_fb_; + EventCount waker_; + + std::vector> shard_flows_; + std::shared_ptr multi_shard_exe_; + + // Guard operations where flows might be in a mixed state (transition/setup) + Mutex flows_op_mu_; + + // repl_offs - till what offset we've already read from the master. + // ack_offs_ last acknowledged offset. + size_t repl_offs_ = 0, ack_offs_ = 0; + std::atomic state_mask_ = 0; + unsigned num_df_flows_ = 0; + + bool is_paused_ = false; + std::string id_; +}; + +// This class implements a single shard replication flow from a Dragonfly master instance. +// Multiple DflyShardReplica objects are managed by a Replica object. +class DflyShardReplica : public ProtocolClient { + public: + DflyShardReplica(ServerContext server_context, MasterContext master_context, uint32_t flow_id, + Service* service, std::shared_ptr multi_shard_exe); + ~DflyShardReplica(); + // This class holds the commands of transaction in single shard. - // Once all commands were received, the command can be executed. + // Once all commands were received, the transaction can be executed. struct TransactionData { // Update the data from ParsedEntry and return true if all shard transaction commands were // received. @@ -86,72 +212,8 @@ class Replica { absl::flat_hash_map current_; }; - // Coorindator for multi shard execution. - struct MultiShardExecution { - Mutex map_mu; - - struct TxExecutionSync { - Barrier barrier; - std::atomic_uint32_t counter; - BlockingCounter block; - - TxExecutionSync(uint32_t counter) : barrier(counter), counter(counter), block(counter) { - } - }; - - std::unordered_map tx_sync_execution; - }; - - public: - Replica(std::string master_host, uint16_t port, Service* se, std::string_view id); - ~Replica(); - - // Spawns a fiber that runs until link with master is broken or the replication is stopped. - // Returns true if initial link with master has been established or - // false if it has failed. - std::error_code Start(ConnectionContext* cntx); - - void Stop(); // thread-safe - - void Pause(bool pause); - - std::error_code TakeOver(std::string_view timeout); - - std::string_view MasterId() const { - return master_context_.master_repl_id; - } - - private: /* Main standalone mode functions */ - // Coordinate state transitions. Spawned by start. - void MainReplicationFb(); - - std::error_code ResolveMasterDns(); // Resolve master dns - // Connect to master and authenticate if needed. - std::error_code ConnectAndAuth(std::chrono::milliseconds connect_timeout_ms); - std::error_code Greet(); // Send PING and REPLCONF. - - std::error_code HandleCapaDflyResp(); - std::error_code ConfigureDflyMaster(); - - std::error_code InitiatePSync(); // Redis full sync. - std::error_code InitiateDflySync(); // Dragonfly full sync. - - std::error_code ConsumeRedisStream(); // Redis stable state. - std::error_code ConsumeDflyStream(); // Dragonfly stable state. - - void CloseSocket(); // Close replica sockets. - void JoinAllFlows(); // Join all flows if possible. - void SetShardStates(bool replica); // Call SetReplica(replica) on all shards. - - // Send DFLY ${kind} to the master instance. - std::error_code SendNextPhaseRequest(std::string_view kind); - - void DefaultErrorHandler(const GenericError& err); - - private: /* Main dlfly flow mode functions */ - // Initialize as single dfly flow. - Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* service, - std::shared_ptr shared_exe_data); + void Cancel(); + void JoinFlow(); // Start replica initialized as dfly flow. std::error_code StartFullSyncFlow(BlockingCounter block, Context* cntx); @@ -160,7 +222,7 @@ class Replica { std::error_code StartStableSyncFlow(Context* cntx); // Single flow full sync fiber spawned by StartFullSyncFlow. - void FullSyncDflyFb(std::string eof_token, BlockingCounter block, Context* cntx); + void FullSyncDflyFb(const std::string& eof_token, BlockingCounter block, Context* cntx); // Single flow stable state sync fiber spawned by StartStableSyncFlow. void StableSyncDflyReadFb(Context* cntx); @@ -169,81 +231,20 @@ class Replica { void StableSyncDflyExecFb(Context* cntx); - private: /* Utility */ - struct PSyncResponse { - // string - end of sync token (diskless) - // size_t - size of the full sync blob (disk-based). - // if fullsync is 0, it means that master can continue with partial replication. - std::variant fullsync; - }; - - // This function uses parser_ and cmd_args_ in order to consume a single response - // from the sock_. The output will reside in cmd_str_args_. - // For error reporting purposes, the parsed command would be in last_resp_. - // If io_buf is not given, a temporary buffer will be used. - std::error_code ReadRespReply(base::IoBuf* buffer = nullptr); - - std::error_code ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* header); - std::error_code ReadLine(base::IoBuf* io_buf, std::string_view* line); - - std::error_code ParseAndExecute(base::IoBuf* io_buf, ConnectionContext* cntx); - - // Check if reps_args contains a simple reply. - bool CheckRespIsSimpleReply(std::string_view reply) const; - - // Check resp_args contains the following types at front. - bool CheckRespFirstTypes(std::initializer_list types) const; - - // Send command, update last_io_time, return error. - std::error_code SendCommand(std::string_view command, facade::ReqSerializer* serializer); - // Send command, read response into resp_args_. - std::error_code SendCommandAndReadResponse(std::string_view command, - facade::ReqSerializer* serializer, - base::IoBuf* buffer = nullptr); - void ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx); void InsertTxDataToShardResource(TransactionData&& tx_data); void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx); bool InsertTxToSharedMap(const TransactionData& tx_data); - public: /* Utility */ - struct Info { - std::string host; - uint16_t port; - bool master_link_established; - bool full_sync_in_progress; - bool full_sync_done; - time_t master_last_io_sec; // monotonic clock. - }; + uint32_t FlowId() const; - Info GetInfo() const; // thread-safe, blocks fiber - - bool HasDflyMaster() const { - return !master_context_.dfly_session_id.empty(); - } - - bool IsDflyFlow() const { - return master_context_.dfly_flow_id != UINT32_MAX; - } - - const std::string& MasterHost() const { - return master_context_.host; - } - - uint16_t Port() const { - return master_context_.port; - } - - std::vector GetReplicaOffset() const; - std::string GetSyncId() const; + uint64_t JournalExecutedCount() const; private: Service& service_; MasterContext master_context_; - std::unique_ptr sock_; - Mutex sock_mu_; - std::shared_ptr multi_shard_exe_; + std::optional leftover_buf_; std::queue> trans_data_queue_; static constexpr size_t kYieldAfterItemsInQueue = 50; @@ -261,36 +262,16 @@ class Replica { // run out-of-order on the master instance. std::atomic_uint64_t journal_rec_executed_ = 0; - // MainReplicationFb in standalone mode, FullSyncDflyFb in flow mode. Fiber sync_fb_; + Fiber acks_fb_; + size_t ack_offs_ = 0; + bool force_ping_ = false; Fiber execution_fb_; - std::vector> shard_flows_; - - // Guard operations where flows might be in a mixed state (transition/setup) - Mutex flows_op_mu_; - - std::optional leftover_buf_; - std::unique_ptr parser_; - facade::RespVec resp_args_; - base::IoBuf resp_buf_; - std::string last_cmd_; - std::string last_resp_; - facade::CmdArgVec cmd_str_args_; - - Context cntx_; // context for tasks in replica. - - // repl_offs - till what offset we've already read from the master. - // ack_offs_ last acknowledged offset. - size_t repl_offs_ = 0, ack_offs_ = 0; - uint64_t last_io_time_ = 0; // in ns, monotonic clock. - std::atomic state_mask_ = 0; - unsigned num_df_flows_ = 0; - - bool is_paused_ = false; - std::string id_; + std::shared_ptr multi_shard_exe_; + uint32_t flow_id_ = UINT32_MAX; // Flow id if replica acts as a dfly flow. }; } // namespace dfly