introduce --replicaof flag (#1583)

* introduce `--replicaof` flag

Closes #1381.

The behaviour of `--replicaof` is similar to the `REPLICAOF` command. On startup, the instance continuously attempts to connect to the master. Replication is stopped with the normal `REPLICAOF NO ONE` command.
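The retrying fiber (`MainReplicationFb`) is not shown in this diff; conceptually the startup path behaves like the self-contained sketch below, in which every name, the backoff interval, and the succeed-on-third-attempt stub are illustrative placeholders rather than the PR's actual code:

```cpp
// Hypothetical sketch of the "keep retrying the master" behaviour that
// EnableReplication() delegates to the replication fiber.
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

constexpr unsigned kEnabled = 1;
std::atomic<unsigned> state_mask{kEnabled};

// Stand-in for the real connect/handshake step; succeeds on the third
// try so this demo terminates.
bool TryConnectToMaster() {
  static int attempts = 0;
  return ++attempts >= 3;
}

void ReplicationLoop() {
  using namespace std::chrono_literals;
  while (state_mask.load() & kEnabled) {
    if (!TryConnectToMaster()) {
      std::this_thread::sleep_for(500ms);  // back off, then retry
      continue;
    }
    std::cout << "connected; starting full sync\n";
    break;  // the real fiber would now run full + stable-state sync
  }
}

int main() { ReplicationLoop(); }
```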

The flag expects the format `<IPv4/host>:<port>` or `[<IPv6>]:<port>`; for example `10.0.0.1:6380`, `redis-master:6380`, or `[2001:db8::1]:6380` (IPv6 addresses must be bracketed).

---------

Signed-off-by: talbii <ido@dragonflydb.io>
Signed-off-by: talbii <41526934+talbii@users.noreply.github.com>
5 changed files with 291 additions and 23 deletions

src/server/replica.cc

@@ -117,6 +117,15 @@ error_code Replica::Start(ConnectionContext* cntx) {
return {};
}
void Replica::EnableReplication(ConnectionContext* cntx) {
VLOG(1) << "Enabling replication";
state_mask_.store(R_ENABLED); // set replica state to enabled
sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this); // call replication fiber
(*cntx)->SendOk();
}
void Replica::Stop() {
VLOG(1) << "Stopping replication";
// Stops the loop in MainReplicationFb.
@@ -1010,9 +1019,7 @@ bad_header:
}
Replica::Info Replica::GetInfo() const {
CHECK(Sock());
return Proactor()->AwaitBrief([this] {
auto f = [this]() {
auto last_io_time = LastIoTime();
for (const auto& flow : shard_flows_) { // Get last io time from all sub flows.
last_io_time = std::max(last_io_time, flow->LastIoTime());
@@ -1026,7 +1033,22 @@ Replica::Info Replica::GetInfo() const {
res.full_sync_done = (state_mask_.load() & R_SYNC_OK);
res.master_last_io_sec = (ProactorBase::GetMonotonicTimeNs() - last_io_time) / 1000000000UL;
return res;
});
};
if (Sock())
return Proactor()->AwaitBrief(f);
else {
/**
* when this branch happens: there is a very short grace period
* where Sock() is not initialized, yet the server can
* receive ROLE/INFO commands. That period happens when launching
* an instance with '--replicaof' and then immediately
* sending a command.
*
* In that case, we have to run f() on the current fiber.
*/
return f();
}
}
std::vector<uint64_t> Replica::GetReplicaOffset() const {

src/server/replica.h

@@ -77,6 +77,11 @@ class Replica : ProtocolClient {
// false if it has failed.
std::error_code Start(ConnectionContext* cntx);
// Sets the server state to have replication enabled.
// It is like Start(), but does not attempt to establish
// a connection right away; instead it lets MainReplicationFb do the work.
void EnableReplication(ConnectionContext* cntx);
void Stop(); // thread-safe
void Pause(bool pause);

src/server/server_family.cc

@@ -25,6 +25,7 @@ extern "C" {
#include "base/logging.h"
#include "croncpp.h" // cron::cronexpr
#include "facade/dragonfly_connection.h"
#include "facade/reply_builder.h"
#include "io/file_util.h"
#include "io/proc_reader.h"
#include "server/command_registry.h"
@@ -54,6 +55,18 @@ extern "C" {
using namespace std;
struct ReplicaOfFlag {
string host;
string port;
bool has_value() const {
return !host.empty() && !port.empty();
}
};
static bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err);
static std::string AbslUnparseFlag(const ReplicaOfFlag& flag);
ABSL_FLAG(string, dir, "", "working directory");
ABSL_FLAG(string, dbfilename, "dump-{timestamp}", "the filename to save/load the DB");
ABSL_FLAG(string, requirepass, "",
@@ -68,6 +81,10 @@ ABSL_FLAG(bool, df_snapshot_format, true,
ABSL_FLAG(int, epoll_file_threads, 0,
"thread size for file workers when running in epoll mode, default is hardware concurrent "
"threads");
ABSL_FLAG(ReplicaOfFlag, replicaof, ReplicaOfFlag{},
"Specifies a host and port which point to a target master "
"to replicate. "
"Format should be <IPv4>:<PORT> or host:<PORT> or [<IPv6>]:<PORT>");
ABSL_DECLARE_FLAG(uint32_t, port);
ABSL_DECLARE_FLAG(bool, cache_mode);
@@ -76,6 +93,54 @@ ABSL_DECLARE_FLAG(bool, tls);
ABSL_DECLARE_FLAG(string, tls_ca_cert_file);
ABSL_DECLARE_FLAG(string, tls_ca_cert_dir);
bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err) {
#define RETURN_ON_ERROR(cond, m) \
do { \
if ((cond)) { \
*err = m; \
LOG(WARNING) << "Error in parsing arguments for --replicaof: " << m; \
return false; \
} \
} while (0)
if (in.empty()) { // on empty flag "parse" nothing. If we returned false then DF would exit.
*flag = ReplicaOfFlag{};
return true;
}
auto pos = in.find_last_of(':');
RETURN_ON_ERROR(pos == string::npos, "missing ':'.");
string_view ip = in.substr(0, pos);
flag->port = in.substr(pos + 1);
RETURN_ON_ERROR(ip.empty() || flag->port.empty(), "IP/host or port are empty.");
// For IPv6: ip.front() == '[' AND ip.back() == ']'
// For IPv4: ip.front() != '[' AND ip.back() != ']'
// Together: ip.front() == '[' iff ip.back() == ']', which can be implemented as XNOR (NOT XOR)
RETURN_ON_ERROR(((ip.front() == '[') ^ (ip.back() == ']')), "unclosed brackets.");
if (ip.front() == '[') {
// shortest possible IPv6 is '::1' (loopback)
RETURN_ON_ERROR(ip.length() <= 2, "IPv6 host name is too short");
flag->host = ip.substr(1, ip.length() - 2);
VLOG(1) << "received IP of type IPv6: " << flag->host;
} else {
flag->host = ip;
VLOG(1) << "received IP of type IPv4 (or a host): " << flag->host;
}
VLOG(1) << "--replicaof: Received " << flag->host << " : " << flag->port;
return true;
#undef RETURN_ON_ERROR
}
std::string AbslUnparseFlag(const ReplicaOfFlag& flag) {
return (flag.has_value()) ? absl::StrCat(flag.host, ":", flag.port) : "";
}
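For context, `ABSL_FLAG(ReplicaOfFlag, ...)` works because Abseil looks up `AbslParseFlag`/`AbslUnparseFlag` for the flag's value type via argument-dependent lookup. Below is a minimal standalone sketch of that protocol using a hypothetical `Endpoint` type; it mirrors the pattern above but is not code from this PR:

```cpp
// Minimal sketch of Abseil's custom flag-type protocol (hypothetical
// Endpoint type; not code from this PR).
#include <string>
#include <string_view>

#include "absl/flags/flag.h"
#include "absl/flags/parse.h"
#include "absl/strings/numbers.h"
#include "absl/strings/str_cat.h"

struct Endpoint {
  std::string host;
  int port = 0;
};

// Called by the flags library to parse the flag's textual value.
bool AbslParseFlag(std::string_view in, Endpoint* ep, std::string* err) {
  auto pos = in.find_last_of(':');
  if (pos == std::string_view::npos) {
    *err = "expected <host>:<port>";
    return false;
  }
  ep->host = std::string(in.substr(0, pos));
  if (!absl::SimpleAtoi(in.substr(pos + 1), &ep->port)) {
    *err = "port is not a number";
    return false;
  }
  return true;
}

// Called to render the current value back as flag text (e.g. for --help).
std::string AbslUnparseFlag(const Endpoint& ep) {
  return absl::StrCat(ep.host, ":", ep.port);
}

ABSL_FLAG(Endpoint, endpoint, Endpoint{}, "target as <host>:<port>");

int main(int argc, char* argv[]) {
  absl::ParseCommandLine(argc, argv);
  Endpoint ep = absl::GetFlag(FLAGS_endpoint);  // use the parsed value
  return ep.port == 0 ? 1 : 0;
}
```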
namespace dfly {
namespace fs = std::filesystem;
@@ -453,6 +518,10 @@ void ValidateServerTlsFlags() {
}
}
bool IsReplicatingNoOne(string_view host, string_view port) {
return absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port, "one");
}
} // namespace
std::optional<SnapshotSpec> ParseSaveSchedule(string_view time) {
@@ -594,6 +663,13 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
stats_caching_task_ =
pb_task_->AwaitBrief([&] { return pb_task_->AddPeriodic(period_ms, cache_cb); });
// check for '--replicaof' before loading anything
if (ReplicaOfFlag flag = GetFlag(FLAGS_replicaof); flag.has_value()) {
service_.proactor_pool().GetNextProactor()->Await(
[this, &flag]() { this->Replicate(flag.host, flag.port); });
return; // don't load any snapshots
}
string flag_dir = GetFlag(FLAGS_dir);
if (IsCloudPath(flag_dir)) {
aws_ = make_unique<cloud::AWS>("s3");
@@ -2000,12 +2076,10 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendBulkString((*ServerState::tlocal()).is_master ? "master" : "slave");
}
void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
std::string_view host = ArgS(args, 0);
std::string_view port_s = ArgS(args, 1);
void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, ConnectionContext* cntx,
ActionOnConnectionFail on_err) {
auto& pool = service_.proactor_pool();
LOG(INFO) << "Replicating " << host << ":" << port_s;
LOG(INFO) << "Replicating " << host << ":" << port_sv;
// We lock to protect global state changes that we perform during the replication setup:
// The replica_ pointer, GlobalState, and the DB itself (we do a flushall txn before syncing).
@@ -2020,7 +2094,7 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Acquire replica lock";
unique_lock lk(replicaof_mu_);
if (absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port_s, "one")) {
if (IsReplicatingNoOne(host, port_sv)) {
if (!ServerState::tlocal()->is_master) {
auto repl_ptr = replica_;
CHECK(repl_ptr);
@@ -2031,12 +2105,15 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
replica_.reset();
}
CHECK(service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE) == GlobalState::ACTIVE)
<< "Server is set to replica no one, yet state is not active!";
return (*cntx)->SendOk();
}
uint32_t port;
if (!absl::SimpleAtoi(port_s, &port) || port < 1 || port > 65535) {
if (!absl::SimpleAtoi(port_sv, &port) || port < 1 || port > 65535) {
(*cntx)->SendError(kInvalidIntErr);
return;
}
@@ -2058,20 +2135,20 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
return;
}
// Flushing all the data after we marked this instance as replica.
Transaction* transaction = cntx->transaction;
transaction->Schedule();
auto cb = [](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(DbSlice::kDbAll);
return OpStatus::OK;
};
transaction->Execute(std::move(cb), true);
// Replica sends response in either case. No need to send response in this function.
// It's a bit confusing but simpler.
lk.unlock();
error_code ec = new_replica->Start(cntx);
error_code ec{};
switch (on_err) {
case ActionOnConnectionFail::kReturnOnError:
ec = new_replica->Start(cntx);
break;
case ActionOnConnectionFail::kContinueReplication: // set DF to replicate, and forget about it
new_replica->EnableReplication(cntx);
break;
};
VLOG(1) << "Acquire replica lock";
lk.lock();
@@ -2091,6 +2168,27 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
}
}
void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
string_view host = ArgS(args, 0);
string_view port = ArgS(args, 1);
// don't flush if input is NO ONE
if (!IsReplicatingNoOne(host, port))
Drakarys(cntx->transaction, DbSlice::kDbAll);
ReplicaOfInternal(host, port, cntx, ActionOnConnectionFail::kReturnOnError);
}
void ServerFamily::Replicate(string_view host, string_view port) {
io::NullSink sink;
ConnectionContext ctxt{&sink, nullptr};
// we don't flush the database as the context is null
// (and also because there is nothing to flush)
ReplicaOfInternal(host, port, &ctxt, ActionOnConnectionFail::kContinueReplication);
}
void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Starting take over";
VLOG(1) << "Acquire replica lock";

src/server/server_family.h

@@ -163,6 +163,9 @@ class ServerFamily {
bool AwaitDispatches(absl::Duration timeout,
const std::function<bool(util::Connection*)>& filter);
// Sets the server to replicate another instance. Does not flush the database beforehand!
void Replicate(std::string_view host, std::string_view port);
private:
uint32_t shard_count() const {
return shard_set->size();
@@ -192,7 +195,17 @@ class ServerFamily {
void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx);
// Returns the number of loaded keys if successfull.
enum ActionOnConnectionFail {
kReturnOnError, // if we fail to connect to master, return an error
kContinueReplication, // continue attempting to connect to master, regardless of initial
// failure
};
// REPLICAOF implementation. See arguments above
void ReplicaOfInternal(std::string_view host, std::string_view port, ConnectionContext* cntx,
ActionOnConnectionFail on_error);
// Returns the number of loaded keys if successful.
io::Result<size_t> LoadRdb(const std::string& rdb_file);
void SnapshotScheduling();

tests/dragonfly/replication_test.py

@@ -1371,3 +1371,133 @@ async def test_tls_replication(
await c_replica.close()
await c_master.close()
# busy wait for 'replica' instance to have replication status 'status'
async def wait_for_replica_status(replica: aioredis.Redis, status: str, wait_for_seconds=0.01):
while True:
await asyncio.sleep(wait_for_seconds)
info = await replica.info("replication")
if info["master_link_status"] == status:
return
@pytest.mark.asyncio
async def test_replicaof_flag(df_local_factory):
# tests --replicaof works under normal conditions
master = df_local_factory.create(
port=BASE_PORT,
proactor_threads=2,
)
# set up master
master.start()
c_master = aioredis.Redis(port=master.port)
await c_master.set("KEY", b"VALUE")
db_size = await c_master.dbsize()
assert 1 == db_size
replica = df_local_factory.create(
port=BASE_PORT + 1,
proactor_threads=2,
replicaof=f"localhost:{BASE_PORT}", # start to replicate master
)
# set up replica. check that it is replicating
replica.start()
c_replica = aioredis.Redis(port=replica.port)
await wait_available_async(c_replica) # give it time to startup
await wait_for_replica_status(c_replica, status="up") # wait until we have a connection
dbsize = await c_replica.dbsize()
assert 1 == dbsize
val = await c_replica.get("KEY")
assert b"VALUE" == val
@pytest.mark.asyncio
async def test_replicaof_flag_replication_waits(df_local_factory):
# tests --replicaof works when we launch replication before the master
replica = df_local_factory.create(
port=BASE_PORT + 1,
proactor_threads=2,
replicaof=f"localhost:{BASE_PORT}", # start to replicate master
)
# set up replica first
replica.start()
c_replica = aioredis.Redis(port=replica.port)
await wait_for_replica_status(c_replica, status="down")
# check that it is in replica mode, yet status is down
info = await c_replica.info("replication")
assert info["role"] == "replica"
assert info["master_host"] == "localhost"
assert info["master_port"] == BASE_PORT
assert info["master_link_status"] == "down"
# set up master
master = df_local_factory.create(
port=BASE_PORT,
proactor_threads=2,
)
master.start()
c_master = aioredis.Redis(port=master.port)
await c_master.set("KEY", b"VALUE")
db_size = await c_master.dbsize()
assert 1 == db_size
# check that replication works now
await wait_for_replica_status(c_replica, status="up")
dbsize = await c_replica.dbsize()
assert 1 == dbsize
val = await c_replica.get("KEY")
assert b"VALUE" == val
@pytest.mark.asyncio
async def test_replicaof_flag_disconnect(df_local_factory):
# test stopping replication when started using --replicaof
master = df_local_factory.create(
port=BASE_PORT,
proactor_threads=2,
)
# set up master
master.start()
c_master = aioredis.Redis(port=master.port)
await wait_available_async(c_master)
await c_master.set("KEY", b"VALUE")
db_size = await c_master.dbsize()
assert 1 == db_size
replica = df_local_factory.create(
port=BASE_PORT + 1,
proactor_threads=2,
replicaof=f"localhost:{BASE_PORT}", # start to replicate master
)
# set up replica. check that it is replicating
replica.start()
c_replica = aioredis.Redis(port=replica.port)
await wait_available_async(c_replica)
await wait_for_replica_status(c_replica, status="up")
dbsize = await c_replica.dbsize()
assert 1 == dbsize
val = await c_replica.get("KEY")
assert b"VALUE" == val
await c_replica.replicaof("no", "one") # disconnect
role = await c_replica.role()
assert role[0] == b"master"