feat(server): support cluster replication (#2748)

* feat(server): support cluster replication

Signed-off-by: adi_holden <adi@dragonflydb.io>
adiholden 2024-03-26 15:26:19 +02:00 committed by GitHub
parent 3abee8a361
commit 2ad7439128
20 changed files with 626 additions and 138 deletions
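The user-facing surface of this change: REPLICAOF now takes an optional slot range, and the new ADDREPLICAOF command attaches additional masters to a server that is already a replica, so a single emulated-cluster Dragonfly node can replicate every shard of a cluster. A minimal usage sketch in Python; the hostnames, ports and slot cutoffs are illustrative values borrowed from the tests added further down in this PR:

    from redis import asyncio as aioredis

    async def replicate_whole_cluster():
        # Replica started with --cluster_mode=emulated (port 6399 is an assumed value).
        replica = aioredis.Redis(port=6399, decode_responses=True)

        # Replicate the first cluster master, restricted to slots 0-5259.
        await replica.execute_command("REPLICAOF localhost 6379 0 5259")
        # Attach the second master for the remaining slots; only valid while already a replica.
        await replica.execute_command("ADDREPLICAOF localhost 6380 5260 16383")

        # ...once sync is stable, promote the replica to a standalone master.
        await replica.execute_command("REPLICAOF NO ONE")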


@ -427,8 +427,8 @@ namespace {
// Guards set configuration, so that we won't handle 2 in parallel.
util::fb2::Mutex set_config_mu;
void DeleteSlots(const SlotSet& slots) {
if (slots.Empty()) {
void DeleteSlots(const SlotRanges& slots_ranges) {
if (slots_ranges.empty()) {
return;
}
@ -437,23 +437,23 @@ void DeleteSlots(const SlotSet& slots) {
if (shard == nullptr)
return;
shard->db_slice().FlushSlots(slots);
shard->db_slice().FlushSlots(slots_ranges);
};
shard_set->pool()->AwaitFiberOnAll(std::move(cb));
}
void WriteFlushSlotsToJournal(const SlotSet& slots) {
if (slots.Empty()) {
void WriteFlushSlotsToJournal(const SlotRanges& slot_ranges) {
if (slot_ranges.empty()) {
return;
}
// Build args
vector<string> args;
args.reserve(slots.Count() + 1);
args.reserve(slot_ranges.size() + 1);
args.push_back("FLUSHSLOTS");
for (SlotId slot = 0; slot <= SlotSet::kMaxSlot; ++slot) {
if (slots.Contains(slot))
args.push_back(absl::StrCat(slot));
for (SlotRange range : slot_ranges) {
args.push_back(absl::StrCat(range.start));
args.push_back(absl::StrCat(range.end));
}
// Build view
@ -535,7 +535,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
SlotSet after = tl_cluster_config->GetOwnedSlots();
if (ServerState::tlocal()->is_master) {
auto deleted_slots = before.GetRemovedSlots(after);
auto deleted_slots = (before.GetRemovedSlots(after)).ToSlotRanges();
DeleteSlots(deleted_slots);
WriteFlushSlotsToJournal(deleted_slots);
}
@ -591,16 +591,18 @@ void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* c
}
void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx) {
SlotSet slots;
for (size_t i = 0; i < args.size(); ++i) {
unsigned slot;
if (!absl::SimpleAtoi(ArgS(args, i), &slot) || (slot > ClusterConfig::kMaxSlotNum)) {
return cntx->SendError(kSyntaxErrType);
}
slots.Set(static_cast<SlotId>(slot), true);
}
SlotRanges slot_ranges;
DeleteSlots(slots);
CmdArgParser parser(args);
do {
auto [slot_start, slot_end] = parser.Next<SlotId, SlotId>();
slot_ranges.emplace_back(SlotRange{slot_start, slot_end});
} while (parser.HasNext());
if (auto err = parser.Error(); err)
return cntx->SendError(err->MakeReply());
DeleteSlots(slot_ranges);
return cntx->SendOk();
}
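DFLYCLUSTER FLUSHSLOTS is now range based: instead of a list of individual slot ids, arguments come in start/end pairs parsed by the CmdArgParser loop above, which is why the test below now passes "0", "0" to flush slot 0. A hedged sketch of the new call shape (the admin client and the ranges are illustrative):

    # Flush slots 0-100 and 200-300; every range is given as a start/end pair.
    await admin_client.execute_command("DFLYCLUSTER", "FLUSHSLOTS", "0", "100", "200", "300")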


@ -719,7 +719,7 @@ TEST_F(ClusterFamilyTest, FlushSlots) {
"total_writes", _, "memory_bytes", _)))));
ExpectConditionWithinTimeout([&]() {
return RunPrivileged({"dflycluster", "flushslots", "0"}) == "OK";
return RunPrivileged({"dflycluster", "flushslots", "0", "0"}) == "OK";
});
EXPECT_THAT(RunPrivileged({"dflycluster", "getslotinfo", "slots", "0", "1"}),


@ -13,20 +13,23 @@ namespace dfly {
using SlotId = uint16_t;
struct SlotRange {
static constexpr SlotId kMaxSlotId = 0x3FFF;
SlotId start = 0;
SlotId end = 0;
bool operator==(const SlotRange& r) const {
return start == r.start && end == r.end;
}
bool IsValid() {
return start <= end && start <= kMaxSlotId && end <= kMaxSlotId;
}
};
using SlotRanges = std::vector<SlotRange>;
class SlotSet {
public:
static constexpr SlotId kMaxSlot = 0x3FFF;
static constexpr SlotId kSlotsNumber = kMaxSlot + 1;
static constexpr SlotId kSlotsNumber = SlotRange::kMaxSlotId + 1;
SlotSet(bool full_house = false) : slots_(std::make_unique<BitsetType>()) {
if (full_house)


@ -395,4 +395,17 @@ std::string AbslUnparseFlag(const dfly::MemoryBytesFlag& flag) {
return strings::HumanReadableNumBytes(flag.value);
}
std::ostream& operator<<(std::ostream& os, const GlobalState& state) {
return os << GlobalStateName(state);
}
std::ostream& operator<<(std::ostream& os, ArgSlice list) {
os << "[";
if (!list.empty()) {
std::for_each(list.begin(), list.end() - 1, [&os](const auto& val) { os << val << ", "; });
os << (*(list.end() - 1));
}
return os << "]";
}
} // namespace dfly


@ -160,6 +160,10 @@ enum class GlobalState : uint8_t {
TAKEN_OVER,
};
std::ostream& operator<<(std::ostream& os, const GlobalState& state);
std::ostream& operator<<(std::ostream& os, ArgSlice list);
enum class TimeUnit : uint8_t { SEC, MSEC };
inline void ToUpper(const MutableSlice* val) {


@ -703,7 +703,6 @@ void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) {
// We want to flush all the data of a slot that was added up until the time FlushSlotsFb was
// called. Therefore we delete slot entries with version < next_version
uint64_t next_version = NextVersion();
std::string tmp;
auto del_entry_cb = [&](PrimeTable::iterator it) {
std::string_view key = it->first.GetSlice(&tmp);
@ -730,10 +729,11 @@ void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) {
etl.DecommitMemory(ServerState::kDataHeap);
}
void DbSlice::FlushSlots(SlotSet slot_ids) {
InvalidateSlotWatches(slot_ids);
fb2::Fiber("flush_slots", [this, slot_ids = std::move(slot_ids)]() mutable {
FlushSlotsFb(slot_ids);
void DbSlice::FlushSlots(SlotRanges slot_ranges) {
SlotSet slot_set(slot_ranges);
InvalidateSlotWatches(slot_set);
fb2::Fiber("flush_slots", [this, slot_set = std::move(slot_set)]() mutable {
FlushSlotsFb(slot_set);
}).Detach();
}


@ -278,8 +278,8 @@ class DbSlice {
*/
void FlushDb(DbIndex db_ind);
// Flushes the data of given slot ids.
void FlushSlots(SlotSet slot_ids);
// Flushes the data of given slot ranges.
void FlushSlots(SlotRanges slot_ranges);
EngineShard* shard_owner() const {
return owner_;


@ -536,8 +536,9 @@ void DebugCmd::Load(string_view filename) {
}
auto new_state = sf_.service().SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
if (new_state.first != GlobalState::LOADING) {
LOG(WARNING) << GlobalStateName(new_state.first) << " in progress, ignored";
if (new_state != GlobalState::LOADING) {
LOG(WARNING) << new_state << " in progress, ignored";
return cntx_->SendError("Could not load file");
}


@ -115,9 +115,6 @@ struct SaveStagesController : public SaveStagesInputs {
// Build full path: get dir, try creating dirs, get filename with placeholder
GenericError BuildFullPath();
// Switch to saving state if in active state
GenericError SwitchState();
void SaveCb(unsigned index);
void CloseCb(unsigned index);


@ -67,6 +67,11 @@ void JournalExecutor::FlushAll() {
Execute(cmd);
}
void JournalExecutor::FlushSlots(const SlotRange& slot_range) {
auto cmd = BuildFromParts("DFLYCLUSTER", "FLUSHSLOTS", slot_range.start, slot_range.end);
Execute(cmd);
}
void JournalExecutor::Execute(journal::ParsedEntry::CmdData& cmd) {
auto span = CmdArgList{cmd.cmd_args.data(), cmd.cmd_args.size()};
service_->DispatchCommand(span, &conn_context_);


@ -7,6 +7,7 @@
#include <absl/types/span.h>
#include "facade/reply_capture.h"
#include "server/cluster/slot_set.h"
#include "server/journal/types.h"
namespace dfly {
@ -25,6 +26,7 @@ class JournalExecutor {
void Execute(DbIndex dbid, journal::ParsedEntry::CmdData& cmd);
void FlushAll(); // Execute FLUSHALL.
void FlushSlots(const SlotRange& slot_range);
ConnectionContext* connection_context() {
return &conn_context_;


@ -1028,8 +1028,7 @@ std::optional<ErrorReply> Service::VerifyCommandState(const CommandId* cid, CmdA
}
if (!allowed_by_state) {
VLOG(1) << "Command " << cid->name() << " not executed because global state is "
<< GlobalStateName(gstate);
VLOG(1) << "Command " << cid->name() << " not executed because global state is " << gstate;
if (gstate == GlobalState::LOADING) {
return ErrorReply(kLoadingErr);
@ -2407,17 +2406,38 @@ VarzValue::Map Service::GetVarzStats() {
return res;
}
std::pair<GlobalState, bool> Service::SwitchState(GlobalState from, GlobalState to) {
GlobalState Service::SwitchState(GlobalState from, GlobalState to) {
lock_guard lk(mu_);
if (global_state_ != from)
return {global_state_, false};
VLOG(1) << "Switching state from " << GlobalStateName(from) << " to " << GlobalStateName(to);
if (global_state_ != from) {
return global_state_;
}
VLOG(1) << "Switching state from " << from << " to " << to;
global_state_ = to;
pp_.Await([&](ProactorBase*) { ServerState::tlocal()->set_gstate(to); });
return {to, true};
return to;
}
void Service::RequestLoadingState() {
unique_lock lk(mu_);
++loading_state_counter_;
if (global_state_ != GlobalState::LOADING) {
DCHECK_EQ(global_state_, GlobalState::ACTIVE);
lk.unlock();
SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
}
}
void Service::RemoveLoadingState() {
unique_lock lk(mu_);
DCHECK_EQ(global_state_, GlobalState::LOADING);
DCHECK_GT(loading_state_counter_, 0u);
--loading_state_counter_;
if (loading_state_counter_ == 0) {
lk.unlock();
SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
}
}
GlobalState Service::GetGlobalState() const {


@ -84,9 +84,11 @@ class Service : public facade::ServiceInterface {
// Returns: the new state.
// If from equals the old state then the switch is performed and "to" is returned.
// Otherwise, does not switch and returns the current state in the system.
// true if the operation succeeded
// Upon switch, updates cached global state in threadlocal ServerState struct.
std::pair<GlobalState, bool> SwitchState(GlobalState from, GlobalState to);
GlobalState SwitchState(GlobalState from, GlobalState to);
void RequestLoadingState();
void RemoveLoadingState();
GlobalState GetGlobalState() const;
@ -186,7 +188,8 @@ class Service : public facade::ServiceInterface {
const CommandId* exec_cid_; // command id of EXEC command for pipeline squashing
mutable util::fb2::Mutex mu_;
GlobalState global_state_ = GlobalState::ACTIVE; // protected by mu_;
GlobalState global_state_ ABSL_GUARDED_BY(mu_) = GlobalState::ACTIVE;
uint32_t loading_state_counter_ ABSL_GUARDED_BY(mu_) = 0;
};
uint64_t GetMaxMemoryFlag();


@ -69,8 +69,9 @@ vector<vector<unsigned>> Partition(unsigned num_flows) {
} // namespace
Replica::Replica(string host, uint16_t port, Service* se, std::string_view id)
: ProtocolClient(std::move(host), port), service_(*se), id_{id} {
Replica::Replica(string host, uint16_t port, Service* se, std::string_view id,
std::optional<SlotRange> slot_range)
: ProtocolClient(std::move(host), port), service_(*se), id_{id}, slot_range_(slot_range) {
proactor_ = ProactorBase::me();
}
@ -385,13 +386,15 @@ error_code Replica::InitiatePSync() {
io::PrefixSource ps{io_buf.InputBuffer(), Sock()};
// Set LOADING state.
CHECK(service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING).first ==
GlobalState::LOADING);
absl::Cleanup cleanup = [this]() {
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
};
service_.RequestLoadingState();
absl::Cleanup cleanup = [this]() { service_.RemoveLoadingState(); };
if (slot_range_.has_value()) {
JournalExecutor{&service_}.FlushSlots(slot_range_.value());
} else {
JournalExecutor{&service_}.FlushAll();
}
JournalExecutor{&service_}.FlushAll();
RdbLoader loader(NULL);
loader.set_source_limit(snapshot_size);
// TODO: to allow registering callbacks within loader to send '\n' pings back to master.
@ -439,14 +442,6 @@ error_code Replica::InitiatePSync() {
error_code Replica::InitiateDflySync() {
auto start_time = absl::Now();
absl::Cleanup cleanup = [this]() {
// We do the following operations regardless of outcome.
JoinDflyFlows();
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
state_mask_.fetch_and(~R_SYNCING);
last_journal_LSNs_.reset();
};
// Initialize MultiShardExecution.
multi_shard_exe_.reset(new MultiShardExecution());
@ -476,11 +471,19 @@ error_code Replica::InitiateDflySync() {
RETURN_ON_ERR(cntx_.SwitchErrorHandler(std::move(err_handler)));
// Make sure we're in LOADING state.
CHECK(service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING).first ==
GlobalState::LOADING);
service_.RequestLoadingState();
// Start full sync flows.
state_mask_.fetch_or(R_SYNCING);
absl::Cleanup cleanup = [this]() {
// We do the following operations regardless of outcome.
JoinDflyFlows();
service_.RemoveLoadingState();
state_mask_.fetch_and(~R_SYNCING);
last_journal_LSNs_.reset();
};
std::string_view sync_type = "full";
{
// Going out of the way to avoid using std::vector<bool>...
@ -508,7 +511,11 @@ error_code Replica::InitiateDflySync() {
std::accumulate(is_full_sync.get(), is_full_sync.get() + num_df_flows_, 0);
if (num_full_flows == num_df_flows_) {
JournalExecutor{&service_}.FlushAll();
if (slot_range_.has_value()) {
JournalExecutor{&service_}.FlushSlots(slot_range_.value());
} else {
JournalExecutor{&service_}.FlushAll();
}
RdbLoader::PerformPreLoad(&service_);
} else if (num_full_flows == 0) {
sync_type = "partial";
@ -855,7 +862,7 @@ void Replica::RedisStreamAcksFb() {
auto next_ack_tp = std::chrono::steady_clock::now();
while (!cntx_.IsCancelled()) {
VLOG(1) << "Sending an ACK with offset=" << repl_offs_;
VLOG(2) << "Sending an ACK with offset=" << repl_offs_;
ack_cmd = absl::StrCat("REPLCONF ACK ", repl_offs_);
next_ack_tp = std::chrono::steady_clock::now() + ack_time_max_interval;
if (auto ec = SendCommand(ack_cmd); ec) {


@ -12,6 +12,7 @@
#include "base/io_buf.h"
#include "facade/facade_types.h"
#include "facade/redis_parser.h"
#include "server/cluster/slot_set.h"
#include "server/common.h"
#include "server/journal/tx_executor.h"
#include "server/journal/types.h"
@ -53,7 +54,8 @@ class Replica : ProtocolClient {
};
public:
Replica(std::string master_host, uint16_t port, Service* se, std::string_view id);
Replica(std::string master_host, uint16_t port, Service* se, std::string_view id,
std::optional<SlotRange> slot_range);
~Replica();
// Spawns a fiber that runs until link with master is broken or the replication is stopped.
@ -170,6 +172,8 @@ class Replica : ProtocolClient {
bool is_paused_ = false;
std::string id_;
std::optional<SlotRange> slot_range_;
};
// This class implements a single shard replication flow from a Dragonfly master instance.


@ -269,10 +269,6 @@ bool ValidateServerTlsFlags() {
return true;
}
bool IsReplicatingNoOne(string_view host, string_view port) {
return absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port, "one");
}
template <typename T> void UpdateMax(T* maxv, T current) {
*maxv = std::max(*maxv, current);
}
@ -554,6 +550,58 @@ string_view GetRedisMode() {
return ClusterConfig::IsEnabledOrEmulated() ? "cluster"sv : "standalone"sv;
}
struct ReplicaOfArgs {
string host;
uint16_t port;
std::optional<SlotRange> slot_range;
static optional<ReplicaOfArgs> FromCmdArgs(CmdArgList args, ConnectionContext* cntx);
bool IsReplicaOfNoOne() const {
return port == 0;
}
friend std::ostream& operator<<(std::ostream& os, const ReplicaOfArgs& args) {
if (args.IsReplicaOfNoOne()) {
return os << "NO ONE";
}
os << args.host << ":" << args.port;
if (args.slot_range.has_value()) {
os << " SLOTS [" << args.slot_range.value().start << "-" << args.slot_range.value().end
<< "]";
}
return os;
}
};
optional<ReplicaOfArgs> ReplicaOfArgs::FromCmdArgs(CmdArgList args, ConnectionContext* cntx) {
ReplicaOfArgs replicaof_args;
CmdArgParser parser(args);
if (parser.Check("NO").IgnoreCase().ExpectTail(1)) {
parser.ExpectTag("ONE");
replicaof_args.port = 0;
} else {
replicaof_args.host = parser.Next<string>();
replicaof_args.port = parser.Next<uint16_t>();
if (auto err = parser.Error(); err || replicaof_args.port < 1) {
cntx->SendError("port is out of range");
return nullopt;
}
if (parser.HasNext()) {
auto [slot_start, slot_end] = parser.Next<SlotId, SlotId>();
replicaof_args.slot_range = SlotRange{slot_start, slot_end};
if (auto err = parser.Error(); err || !replicaof_args.slot_range->IsValid()) {
cntx->SendError("Invalid slot range");
return nullopt;
}
}
}
if (auto err = parser.Error(); err) {
cntx->SendError(err->MakeReply());
return nullopt;
}
return replicaof_args;
}
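In summary, FromCmdArgs above accepts either NO ONE or host port with an optional slot range, rejecting out-of-range ports and ranges where start > end or either bound exceeds 16383. Roughly, from a client (a sketch; r is an async Redis client and the values are illustrative, error strings are the ones sent by the code above):

    await r.execute_command("REPLICAOF localhost 6379")             # replicate everything
    await r.execute_command("REPLICAOF localhost 6379 0 5259")      # replicate only slots 0-5259
    await r.execute_command("REPLICAOF NO ONE")                     # stop replication
    # "REPLICAOF localhost 6379 5259 0" is rejected with "Invalid slot range".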
} // namespace
std::optional<fb2::Fiber> Pause(absl::Span<facade::Listener* const> listeners,
@ -799,6 +847,7 @@ void ServerFamily::Shutdown() {
if (replica_) {
replica_->Stop();
}
StopAllClusterReplicas();
dfly_cmd_->Shutdown();
DebugCmd::Shutdown();
@ -828,8 +877,8 @@ fb2::Future<GenericError> ServerFamily::Load(const std::string& load_path) {
LOG(INFO) << "Loading " << load_path;
auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
if (new_state.first != GlobalState::LOADING) {
LOG(WARNING) << GlobalStateName(new_state.first) << " in progress, ignored";
if (new_state != GlobalState::LOADING) {
LOG(WARNING) << new_state << " in progress, ignored";
return {};
}
@ -1239,11 +1288,6 @@ optional<Replica::Info> ServerFamily::GetReplicaInfo() const {
}
}
string ServerFamily::GetReplicaMasterId() const {
unique_lock lk(replicaof_mu_);
return string(replica_->MasterId());
}
void ServerFamily::OnClose(ConnectionContext* cntx) {
dfly_cmd_->OnClose(cntx);
}
@ -2108,15 +2152,21 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
// The replica pointer can still be mutated even while master=true,
// we don't want to drop the replica object in this fiber
unique_lock lk{replicaof_mu_};
Replica::Info rinfo = replica_->GetInfo();
append("master_host", rinfo.host);
append("master_port", rinfo.port);
const char* link = rinfo.master_link_established ? "up" : "down";
append("master_link_status", link);
append("master_last_io_seconds_ago", rinfo.master_last_io_sec);
append("master_sync_in_progress", rinfo.full_sync_in_progress);
append("master_replid", rinfo.master_id);
auto replication_info_cb = [&](Replica::Info rinfo) {
append("master_host", rinfo.host);
append("master_port", rinfo.port);
const char* link = rinfo.master_link_established ? "up" : "down";
append("master_link_status", link);
append("master_last_io_seconds_ago", rinfo.master_last_io_sec);
append("master_sync_in_progress", rinfo.full_sync_in_progress);
append("master_replid", rinfo.master_id);
};
replication_info_cb(replica_->GetInfo());
for (const auto& replica : cluster_replicas_) {
replication_info_cb(replica->GetInfo());
}
}
}
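Since replication_info_cb now runs for the primary replica_ and for every entry in cluster_replicas_, INFO REPLICATION on a replica attached to several masters emits one master_host/master_port/master_link_status group per source; the disconnect test further down counts exactly these lines. A small parsing sketch (field name as emitted above, client assumed to use decode_responses=True):

    import re

    info = await replica.execute_command("INFO REPLICATION")
    # One link status per replicated master, in the order the sources were added.
    statuses = re.findall(r"master_link_status:(up|down)\r\n", info)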
@ -2285,9 +2335,33 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
rb->SendBulkString((*ServerState::tlocal()).is_master ? "master" : "slave");
}
void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, ConnectionContext* cntx,
void ServerFamily::AddReplicaOf(CmdArgList args, ConnectionContext* cntx) {
unique_lock lk(replicaof_mu_);
if (ServerState::tlocal()->is_master) {
cntx->SendError("Calling ADDREPLICAOFF allowed only after server is already a replica");
return;
}
CHECK(replica_);
auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, cntx);
if (!replicaof_args.has_value()) {
return;
}
if (replicaof_args->IsReplicaOfNoOne()) {
return cntx->SendError("ADDREPLICAOF does not support no one");
}
LOG(INFO) << "Add Replica " << *replicaof_args;
auto add_replica = make_unique<Replica>(replicaof_args->host, replicaof_args->port, &service_,
master_replid(), replicaof_args->slot_range);
error_code ec = add_replica->Start(cntx);
if (!ec) {
cluster_replicas_.push_back(std::move(add_replica));
}
}
void ServerFamily::ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx,
ActionOnConnectionFail on_err) {
LOG(INFO) << "Replicating " << host << ":" << port_sv;
unique_lock lk(replicaof_mu_); // Only one REPLICAOF command can run at a time
// We should not execute replica of command while loading from snapshot.
@ -2296,40 +2370,43 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn
return;
}
auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, cntx);
if (!replicaof_args.has_value()) {
return;
}
LOG(INFO) << "Replicating " << *replicaof_args;
// If NO ONE was supplied, just stop the current replica (if it exists)
if (IsReplicatingNoOne(host, port_sv)) {
if (replicaof_args->IsReplicaOfNoOne()) {
if (!ServerState::tlocal()->is_master) {
CHECK(replica_);
SetMasterFlagOnAllThreads(true); // Flip flag before clearing replica
replica_->Stop();
replica_.reset();
StopAllClusterReplicas();
}
CHECK(service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE).first ==
GlobalState::ACTIVE)
CHECK_EQ(service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE), GlobalState::ACTIVE)
<< "Server is set to replica no one, yet state is not active!";
return cntx->SendOk();
}
uint32_t port;
if (!absl::SimpleAtoi(port_sv, &port) || port < 1 || port > 65535) {
cntx->SendError(kInvalidIntErr);
return;
}
// First, switch into the loading state
if (auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
new_state.first != GlobalState::LOADING) {
LOG(WARNING) << GlobalStateName(new_state.first) << " in progress, ignored";
cntx->SendError("Invalid state");
return;
}
// If any replication is in progress, stop it; cancellation should kick in immediately
if (replica_)
replica_->Stop();
StopAllClusterReplicas();
// First, switch into the loading state
if (auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
new_state != GlobalState::LOADING) {
LOG(WARNING) << new_state << " in progress, ignored";
cntx->SendError("Invalid state");
return;
}
// If we are called by "Replicate", cntx->transaction will be null but we do not need
// to flush anything.
@ -2338,7 +2415,9 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn
}
// Create a new replica and assign it
auto new_replica = make_shared<Replica>(string(host), port, &service_, master_replid());
auto new_replica = make_shared<Replica>(replicaof_args->host, replicaof_args->port, &service_,
master_replid(), replicaof_args->slot_range);
replica_ = new_replica;
// TODO: disconnect pending blocked clients (pubsub, blocking commands)
@ -2368,11 +2447,17 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn
}
}
void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
string_view host = ArgS(args, 0);
string_view port = ArgS(args, 1);
void ServerFamily::StopAllClusterReplicas() {
// Stop all cluster replication.
for (auto& replica : cluster_replicas_) {
replica->Stop();
replica.reset();
}
cluster_replicas_.clear();
}
ReplicaOfInternal(host, port, cntx, ActionOnConnectionFail::kReturnOnError);
void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
ReplicaOfInternal(args, cntx, ActionOnConnectionFail::kReturnOnError);
}
void ServerFamily::Replicate(string_view host, string_view port) {
@ -2380,7 +2465,14 @@ void ServerFamily::Replicate(string_view host, string_view port) {
ConnectionContext ctxt{&sink, nullptr};
ctxt.skip_acl_validation = true;
ReplicaOfInternal(host, port, &ctxt, ActionOnConnectionFail::kContinueReplication);
StringVec replicaof_params{string(host), string(port)};
CmdArgVec args_vec;
for (auto& s : replicaof_params) {
args_vec.emplace_back(MutableSlice{s.data(), s.size()});
}
CmdArgList args_list = absl::MakeSpan(args_vec);
ReplicaOfInternal(args_list, &ctxt, ActionOnConnectionFail::kContinueReplication);
}
void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) {
@ -2526,19 +2618,25 @@ void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) {
} else {
unique_lock lk{replicaof_mu_};
Replica::Info rinfo = replica_->GetInfo();
rb->StartArray(4);
rb->StartArray(4 + cluster_replicas_.size() * 3);
rb->SendBulkString("replica");
rb->SendBulkString(rinfo.host);
rb->SendBulkString(absl::StrCat(rinfo.port));
if (rinfo.full_sync_done) {
rb->SendBulkString("stable_sync");
} else if (rinfo.full_sync_in_progress) {
rb->SendBulkString("full_sync");
} else if (rinfo.master_link_established) {
rb->SendBulkString("preparation");
} else {
rb->SendBulkString("connecting");
auto send_replica_info = [rb](Replica::Info rinfo) {
rb->SendBulkString(rinfo.host);
rb->SendBulkString(absl::StrCat(rinfo.port));
if (rinfo.full_sync_done) {
rb->SendBulkString("stable_sync");
} else if (rinfo.full_sync_in_progress) {
rb->SendBulkString("full_sync");
} else if (rinfo.master_link_established) {
rb->SendBulkString("preparation");
} else {
rb->SendBulkString("connecting");
}
};
send_replica_info(replica_->GetInfo());
for (const auto& replica : cluster_replicas_) {
send_replica_info(replica->GetInfo());
}
}
}
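With send_replica_info applied to the primary link and to each cluster replica, ROLE on such a replica returns a flat array: "replica", then a host/port/state triple per master, which is why the array is sized 4 + 3 * cluster_replicas_.size(). Roughly (hosts and ports illustrative):

    # e.g. ['replica', 'localhost', '6379', 'stable_sync', 'localhost', '6380', 'stable_sync']
    role = await replica.execute_command("ROLE")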
@ -2720,7 +2818,8 @@ void ServerFamily::Register(CommandRegistry* registry) {
<< CI{"SHUTDOWN", CO::ADMIN | CO::NOSCRIPT | CO::LOADING, -1, 0, 0, acl::kShutDown}.HFUNC(
ShutdownCmd)
<< CI{"SLAVEOF", kReplicaOpts, 3, 0, 0, acl::kSlaveOf}.HFUNC(ReplicaOf)
<< CI{"REPLICAOF", kReplicaOpts, 3, 0, 0, acl::kReplicaOf}.HFUNC(ReplicaOf)
<< CI{"REPLICAOF", kReplicaOpts, -3, 0, 0, acl::kReplicaOf}.HFUNC(ReplicaOf)
<< CI{"ADDREPLICAOF", kReplicaOpts, 5, 0, 0, acl::kReplicaOf}.HFUNC(AddReplicaOf)
<< CI{"REPLTAKEOVER", CO::ADMIN | CO::GLOBAL_TRANS, -2, 0, 0, acl::kReplTakeOver}.HFUNC(
ReplTakeOver)
<< CI{"REPLCONF", CO::ADMIN | CO::LOADING, -1, 0, 0, acl::kReplConf}.HFUNC(ReplConf)


@ -210,7 +210,6 @@ class ServerFamily {
bool HasReplica() const;
std::optional<Replica::Info> GetReplicaInfo() const;
std::string GetReplicaMasterId() const;
void OnClose(ConnectionContext* cntx);
@ -243,6 +242,7 @@ class ServerFamily {
void LastSave(CmdArgList args, ConnectionContext* cntx);
void Latency(CmdArgList args, ConnectionContext* cntx);
void ReplicaOf(CmdArgList args, ConnectionContext* cntx);
void AddReplicaOf(CmdArgList args, ConnectionContext* cntx);
void ReplTakeOver(CmdArgList args, ConnectionContext* cntx);
void ReplConf(CmdArgList args, ConnectionContext* cntx);
void Role(CmdArgList args, ConnectionContext* cntx);
@ -261,8 +261,7 @@ class ServerFamily {
};
// REPLICAOF implementation. See arguments above
void ReplicaOfInternal(std::string_view host, std::string_view port, ConnectionContext* cntx,
ActionOnConnectionFail on_error);
void ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx, ActionOnConnectionFail on_error);
// Returns the number of loaded keys if successful.
io::Result<size_t> LoadRdb(const std::string& rdb_file);
@ -282,6 +281,7 @@ class ServerFamily {
bool ignore_state = false);
GenericError WaitUntilSaveFinished(Transaction* trans, bool ignore_state = false);
void StopAllClusterReplicas();
util::fb2::Fiber snapshot_schedule_fb_;
util::fb2::Future<GenericError> load_result_;
@ -295,6 +295,8 @@ class ServerFamily {
mutable util::fb2::Mutex replicaof_mu_, save_mu_;
std::shared_ptr<Replica> replica_ ABSL_GUARDED_BY(replicaof_mu_);
std::vector<std::unique_ptr<Replica>> cluster_replicas_
ABSL_GUARDED_BY(replicaof_mu_); // used for replicating multiple cluster nodes into a single dragonfly
std::unique_ptr<ScriptMgr> script_mgr_;
std::unique_ptr<journal::Journal> journal_;


@ -31,15 +31,6 @@ ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to
namespace dfly {
std::ostream& operator<<(std::ostream& os, ArgSlice list) {
os << "[";
if (!list.empty()) {
std::for_each(list.begin(), list.end() - 1, [&os](const auto& val) { os << val << ", "; });
os << (*(list.end() - 1));
}
return os << "]";
}
std::ostream& operator<<(std::ostream& os, const DbStats& stats) {
os << "keycount: " << stats.key_count << ", tiered_size: " << stats.tiered_size
<< ", tiered_entries: " << stats.tiered_entries << "\n";


@ -823,7 +823,7 @@ double GetKeyWeight(Transaction* t, ShardId shard_id, const vector<double>& weig
OpResult<ScoredMap> OpUnion(EngineShard* shard, Transaction* t, string_view dest, AggType agg_type,
const vector<double>& weights, bool store) {
ArgSlice keys = t->GetShardArgs(shard->shard_id());
DVLOG(1) << "shard:" << shard->shard_id() << ", keys " << vector(keys.begin(), keys.end());
DVLOG(1) << "shard:" << shard->shard_id() << ", keys " << keys;
DCHECK(!keys.empty());
unsigned cmdargs_keys_offset = 1; // after {numkeys} for ZUNION
@ -872,7 +872,7 @@ ScoredMap ZSetFromSet(const PrimeValue& pv, double weight) {
OpResult<ScoredMap> OpInter(EngineShard* shard, Transaction* t, string_view dest, AggType agg_type,
const vector<double>& weights, bool store) {
ArgSlice keys = t->GetShardArgs(shard->shard_id());
DVLOG(1) << "shard:" << shard->shard_id() << ", keys " << vector(keys.begin(), keys.end());
DVLOG(1) << "shard:" << shard->shard_id() << ", keys " << keys;
DCHECK(!keys.empty());
unsigned removed_keys = 0;
@ -1345,7 +1345,7 @@ void BZPopMinMax(CmdArgList args, ConnectionContext* cntx, bool is_max) {
vector<ScoredMap> OpFetch(EngineShard* shard, Transaction* t) {
ArgSlice keys = t->GetShardArgs(shard->shard_id());
DVLOG(1) << "shard:" << shard->shard_id() << ", keys " << vector(keys.begin(), keys.end());
DVLOG(1) << "shard:" << shard->shard_id() << ", keys " << keys;
DCHECK(!keys.empty());
vector<ScoredMap> results;


@ -9,12 +9,65 @@ from dataclasses import dataclass
from .instance import DflyInstanceFactory, DflyInstance
from .utility import *
from .replication_test import check_all_replicas_finished
from redis.cluster import RedisCluster
from redis.cluster import ClusterNode
from .proxy import Proxy
from . import dfly_args
BASE_PORT = 30001
class RedisClusterNode:
def __init__(self, port):
self.port = port
self.proc = None
def start(self):
self.proc = subprocess.Popen(
[
"redis-server-6.2.11",
f"--port {self.port}",
"--save ''",
"--cluster-enabled yes",
f"--cluster-config-file nodes_{self.port}.conf",
"--cluster-node-timeout 5000",
"--appendonly no",
"--protected-mode no",
"--repl-diskless-sync yes",
"--repl-diskless-sync-delay 0",
]
)
logging.debug(self.proc.args)
def stop(self):
self.proc.terminate()
try:
self.proc.wait(timeout=10)
except Exception as e:
pass
@pytest.fixture(scope="function")
def redis_cluster(port_picker):
# create a redis cluster of 3 nodes with the default slot configuration
# node1 slots 0-5460
# node2 slots 5461-10922
# node3 slots 10923-16383
ports = [port_picker.get_available_port() for i in range(3)]
nodes = [RedisClusterNode(port) for port in ports]
for node in nodes:
node.start()
time.sleep(1)
create_command = f'echo "yes" |redis-cli --cluster create {" ".join([f"127.0.0.1:{port}" for port in ports])}'
subprocess.run(create_command, shell=True)
time.sleep(4)
yield nodes
for node in nodes:
node.stop()
async def push_config(config, admin_connections):
logging.debug("Pushing config %s", config)
res = await asyncio.gather(
@ -1227,3 +1280,285 @@ async def test_cluster_fuzzymigration(
await close_clients(
cluster_client, *[node.admin_client for node in nodes], *[node.client for node in nodes]
)
def parse_lag(replication_info: str):
lags = re.findall("lag=([0-9]+)\r\n", replication_info)
assert len(lags) == 1
return int(lags[0])
async def await_no_lag(client: aioredis.Redis, timeout=10):
start = time.time()
while (time.time() - start) < timeout:
lag = parse_lag(await client.execute_command("info replication"))
print("current lag =", lag)
if lag == 0:
return
await asyncio.sleep(0.05)
raise RuntimeError("Lag did not reduced to 0!")
@dfly_args({"proactor_threads": 4})
async def test_replicate_cluster(df_local_factory: DflyInstanceFactory, df_seeder_factory):
"""
Create dragonfly cluster of 2 nodes.
Create additional dragonfly server in emulated mode.
Replicate the dragonfly cluster into a single dragonfly node.
Send traffic before replication start and while replicating.
Promote the replica to master and check data consistency between cluster and single node.
"""
replica = df_local_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
cluster_nodes = [
df_local_factory.create(admin_port=BASE_PORT + i + 1, cluster_mode="yes") for i in range(2)
]
# Start instances and connect clients
df_local_factory.start_all(cluster_nodes + [replica])
c_nodes = [node.client() for node in cluster_nodes]
c_replica = replica.client()
node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes))
config = f"""
[
{{
"slot_ranges": [ {{ "start": 0, "end": LAST_SLOT_CUTOFF }} ],
"master": {{ "id": "{node_ids[0]}", "ip": "localhost", "port": {cluster_nodes[0].port} }},
"replicas": []
}},
{{
"slot_ranges": [ {{ "start": NEXT_SLOT_CUTOFF, "end": 16383 }} ],
"master": {{ "id": "{node_ids[1]}", "ip": "localhost", "port": {cluster_nodes[1].port} }},
"replicas": []
}}
]
"""
await push_config(
config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
c_nodes,
)
# Fill instances with some data
seeder = df_seeder_factory.create(keys=2000, port=cluster_nodes[0].port, cluster_mode=True)
await seeder.run(target_deviation=0.1)
fill_task = asyncio.create_task(seeder.run())
# Start replication
await c_replica.execute_command("REPLICAOF localhost " + str(cluster_nodes[0].port) + " 0 5259")
await c_replica.execute_command(
"ADDREPLICAOF localhost " + str(cluster_nodes[1].port) + " 5260 16383"
)
# give seeder time to run.
await asyncio.sleep(1.0)
# Stop seeder
seeder.stop()
await fill_task
# wait for replication to finish
await asyncio.gather(*(asyncio.create_task(await_no_lag(c)) for c in c_nodes))
# promote replica to master and compare data
await c_replica.execute_command("REPLICAOF NO ONE")
capture = await seeder.capture()
assert await seeder.compare(capture, replica.port)
await disconnect_clients(*c_nodes, c_replica)
async def await_stable_sync(m_client: aioredis.Redis, replica_port, timeout=10):
start = time.time()
async def is_stable():
role = await m_client.execute_command("role")
return role == [
"master",
[["127.0.0.1", str(replica_port), "stable_sync"]],
]
while (time.time() - start) < timeout:
if await is_stable():
return
await asyncio.sleep(0.05)
raise RuntimeError("Failed to reach stable sync")
@dfly_args({"proactor_threads": 4})
async def test_replicate_disconnect_cluster(
df_local_factory: DflyInstanceFactory, df_seeder_factory
):
"""
Create dragonfly cluster of 2 nodes and additional dragonfly server in emulated mode.
Populate the cluster with data
Replicate the dragonfly cluster into a single dragonfly node and wait for stable sync
Break connection between cluster node 0 and replica and reconnect
Promote replica to master
Compare cluster data and replica data
"""
replica = df_local_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
cluster_nodes = [
df_local_factory.create(admin_port=BASE_PORT + i + 1, cluster_mode="yes") for i in range(2)
]
# Start instances and connect clients
df_local_factory.start_all(cluster_nodes + [replica])
c_nodes = [node.client() for node in cluster_nodes]
c_replica = replica.client()
node_ids = await asyncio.gather(*(get_node_id(c) for c in c_nodes))
config = f"""
[
{{
"slot_ranges": [ {{ "start": 0, "end": LAST_SLOT_CUTOFF }} ],
"master": {{ "id": "{node_ids[0]}", "ip": "localhost", "port": {cluster_nodes[0].port} }},
"replicas": []
}},
{{
"slot_ranges": [ {{ "start": NEXT_SLOT_CUTOFF, "end": 16383 }} ],
"master": {{ "id": "{node_ids[1]}", "ip": "localhost", "port": {cluster_nodes[1].port} }},
"replicas": []
}}
]
"""
await push_config(
config.replace("LAST_SLOT_CUTOFF", "5259").replace("NEXT_SLOT_CUTOFF", "5260"),
c_nodes,
)
# Fill instances with some data
seeder = df_seeder_factory.create(keys=2000, port=cluster_nodes[0].port, cluster_mode=True)
await seeder.run(target_deviation=0.1)
fill_task = asyncio.create_task(seeder.run())
proxy = Proxy("127.0.0.1", 1114, "127.0.0.1", cluster_nodes[0].port)
await proxy.start()
proxy_task = asyncio.create_task(proxy.serve())
# Start replication
await c_replica.execute_command("REPLICAOF localhost " + str(proxy.port) + " 0 5259")
await c_replica.execute_command(
"ADDREPLICAOF localhost " + str(cluster_nodes[1].port) + " 5260 16383"
)
# wait for replication to reach stable state on all nodes
await asyncio.gather(
*(asyncio.create_task(await_stable_sync(c, replica.port)) for c in c_nodes)
)
# break connection between first node and replica
await proxy.close(proxy_task)
await asyncio.sleep(3)
async def is_first_master_conn_down(conn):
info = await conn.execute_command("INFO REPLICATION")
print(info)
statuses = re.findall("master_link_status:(down|up)\r\n", info)
assert len(statuses) == 2
assert statuses[0] == "down"
assert statuses[1] == "up"
await is_first_master_conn_down(c_replica)
# start connection again
await proxy.start()
proxy_task = asyncio.create_task(proxy.serve())
seeder.stop()
await fill_task
# wait for stable sync on first master
await await_stable_sync(c_nodes[0], replica.port)
# wait for no lag on all cluster nodes
await asyncio.gather(*(asyncio.create_task(await_no_lag(c)) for c in c_nodes))
# promote replica to master and compare data
await c_replica.execute_command("REPLICAOF NO ONE")
capture = await seeder.capture()
assert await seeder.compare(capture, replica.port)
await disconnect_clients(*c_nodes, c_replica)
await proxy.close(proxy_task)
def is_offset_eq_master_repl_offset(replication_info: str):
offset = re.findall("offset=([0-9]+),", replication_info)
assert len(offset) == 1
print("current offset =", offset)
master_repl_offset = re.findall("master_repl_offset:([0-9]+)\r\n", replication_info)
assert len(master_repl_offset) == 1
print("current master_repl_offset =", master_repl_offset)
return int(offset[0]) == int(master_repl_offset[0])
async def await_eq_offset(client: aioredis.Redis, timeout=20):
start = time.time()
while (time.time() - start) < timeout:
if is_offset_eq_master_repl_offset(await client.execute_command("info replication")):
return
await asyncio.sleep(0.05)
raise RuntimeError("offset not equal!")
@dfly_args({"proactor_threads": 4})
async def test_replicate_redis_cluster(redis_cluster, df_local_factory, df_seeder_factory):
"""
Create redis cluster of 3 nodes.
Create dragonfly server in emulated mode.
Replicate the redis cluster into a single dragonfly node.
Send traffic before replication start and while replicating.
Promote the replica to master and check data consistency between cluster and single dragonfly node.
"""
replica = df_local_factory.create(admin_port=BASE_PORT, cluster_mode="emulated")
# Start instances and connect clients
df_local_factory.start_all([replica])
redis_cluster_nodes = redis_cluster
node_clients = [
aioredis.Redis(decode_responses=True, host="localhost", port=node.port)
for node in redis_cluster_nodes
]
c_replica = replica.client()
seeder = df_seeder_factory.create(
keys=2000, port=redis_cluster_nodes[0].port, cluster_mode=True
)
await seeder.run(target_deviation=0.1)
fill_task = asyncio.create_task(seeder.run())
# Start replication
await c_replica.execute_command(
"REPLICAOF localhost " + str(redis_cluster_nodes[0].port) + " 0 5460"
)
await c_replica.execute_command(
"ADDREPLICAOF localhost " + str(redis_cluster_nodes[1].port) + " 5461 10922"
)
await c_replica.execute_command(
"ADDREPLICAOF localhost " + str(redis_cluster_nodes[2].port) + " 10923 16383"
)
# give seeder time to run.
await asyncio.sleep(0.5)
# Stop seeder
seeder.stop()
await fill_task
# wait for replication to finish
await asyncio.gather(*(asyncio.create_task(await_eq_offset(client)) for client in node_clients))
await c_replica.execute_command("REPLICAOF NO ONE")
capture = await seeder.capture()
assert await seeder.compare(capture, replica.port)
await disconnect_clients(c_replica, *node_clients)