refactor(cluster): Move CLUSTER commands to their own methods. (#1230)

* Migrate cluster related commands to cluster_family

* Move MutexGuardedObject to its own file.

* Re-add mistakenly removed test

* Remove very cool and useful MutexGuardedObject :)

* clang-format
This commit is contained in:
Chaka 2023-05-18 14:38:57 +03:00 committed by GitHub
parent 7e3a8c8b96
commit 378a0907b2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 375 additions and 239 deletions

View file

@ -22,7 +22,8 @@ add_library(dragonfly_lib channel_store.cc command_registry.cc
set_family.cc stream_family.cc string_family.cc
zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc
serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc
top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc)
top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc
cluster/cluster_family.cc)
cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib aws_lib strings_lib html_lib
@ -60,4 +61,5 @@ add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY)
add_dependencies(check_dfly dragonfly_test json_family_test list_family_test
generic_family_test memcache_parser_test rdb_test journal_test
redis_parser_test snapshot_test stream_family_test string_family_test
bitops_family_test set_family_test zset_family_test hll_family_test cluster_config_test)
bitops_family_test set_family_test zset_family_test hll_family_test
cluster_config_test)

View file

@ -0,0 +1,266 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/cluster/cluster_family.h"
#include <mutex>
#include <string>
#include "base/flags.h"
#include "base/logging.h"
#include "facade/dragonfly_connection.h"
#include "facade/error.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/dflycmd.h"
#include "server/error.h"
#include "server/replica.h"
#include "server/server_family.h"
#include "server/server_state.h"
ABSL_FLAG(std::string, cluster_mode, "",
"Cluster mode supported."
"default: \"\"");
ABSL_FLAG(std::string, cluster_announce_ip, "", "ip that cluster commands announce to the client");
ABSL_DECLARE_FLAG(uint32_t, port);
namespace dfly {
namespace {
using namespace std;
using CI = CommandId;
void BuildClusterSlotNetworkInfo(ConnectionContext* cntx, std::string_view host, uint32_t port,
std::string_view id) {
constexpr unsigned int kNetworkInfoSize = 3;
(*cntx)->StartArray(kNetworkInfoSize);
(*cntx)->SendBulkString(host);
(*cntx)->SendLong(port);
(*cntx)->SendBulkString(id);
}
} // namespace
ClusterFamily::ClusterFamily(ServerFamily* server_family) : server_family_(server_family) {
CHECK_NOTNULL(server_family_);
string cluster_mode = absl::GetFlag(FLAGS_cluster_mode);
if (cluster_mode == "emulated") {
is_emulated_cluster_ = true;
} else if (cluster_mode == "yes") {
cluster_config_ = std::make_unique<ClusterConfig>(server_family_->master_id());
} else if (!cluster_mode.empty()) {
LOG(ERROR) << "invalid cluster_mode. Exiting...";
exit(1);
}
}
bool ClusterFamily::IsEnabledOrEmulated() const {
return is_emulated_cluster_ || ClusterConfig::IsClusterEnabled();
}
string ClusterFamily::BuildClusterNodeReply(ConnectionContext* cntx) const {
ServerState& etl = *ServerState::tlocal();
auto epoch_master_time = std::time(nullptr) * 1000;
if (etl.is_master) {
std::string cluster_announce_ip = absl::GetFlag(FLAGS_cluster_announce_ip);
std::string preferred_endpoint =
cluster_announce_ip.empty() ? cntx->owner()->LocalBindAddress() : cluster_announce_ip;
auto vec = server_family_->GetDflyCmd()->GetReplicasRoleInfo();
auto my_port = absl::GetFlag(FLAGS_port);
const char* connect_state = vec.empty() ? "disconnected" : "connected";
std::string msg = absl::StrCat(server_family_->master_id(), " ", preferred_endpoint, ":",
my_port, "@", my_port, " myself,master - 0 ", epoch_master_time,
" 1 ", connect_state, " 0-16383\r\n");
if (!vec.empty()) { // info about the replica
const auto& info = vec[0];
absl::StrAppend(&msg, etl.remote_client_id_, " ", info.address, ":", info.listening_port, "@",
info.listening_port, " slave 0 ", server_family_->master_id(), " 1 ",
connect_state, "\r\n");
}
return msg;
} else {
Replica::Info info = server_family_->GetReplicaInfo();
auto my_ip = cntx->owner()->LocalBindAddress();
auto my_port = absl::GetFlag(FLAGS_port);
const char* connect_state = info.master_link_established ? "connected" : "disconnected";
std::string msg = absl::StrCat(server_family_->master_id(), " ", my_ip, ":", my_port, "@",
my_port, " myself,slave ", server_family_->master_id(), " 0 ",
epoch_master_time, " 1 ", connect_state, "\r\n");
absl::StrAppend(&msg, server_family_->GetReplicaMasterId(), " ", info.host, ":", info.port, "@",
info.port, " master - 0 ", epoch_master_time, " 1 ", connect_state,
" 0-16383\r\n");
return msg;
}
}
void ClusterFamily::ClusterHelp(ConnectionContext* cntx) {
string_view help_arr[] = {
"CLUSTER <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
"SLOTS",
" Return information about slots range mappings. Each range is made of:",
" start, end, master and replicas IP addresses, ports and ids.",
"NODES",
" Return cluster configuration seen by node. Output format:",
" <id> <ip:port> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ...",
"INFO",
" Return information about the cluster",
"HELP",
" Prints this help.",
};
return (*cntx)->SendSimpleStrArr(help_arr);
}
void ClusterFamily::ClusterSlots(ConnectionContext* cntx) {
// For more details https://redis.io/commands/cluster-slots/
constexpr unsigned int kClustersShardingCount = 1;
constexpr unsigned int kNoReplicaInfoSize = 3;
constexpr unsigned int kWithReplicaInfoSize = 4;
/* Format: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
* 2) master port
* 3) node ID
* 4) 1) replica IP (optional)
* 2) replica port
* 3) node ID
* ... note that in this case, only 1 slot
*/
ServerState& etl = *ServerState::tlocal();
// we have 3 cases here
// 1. This is a stand alone, in this case we only sending local information
// 2. We are the master, and we have replica, in this case send us as master
// 3. We are replica to a master, sends the information about us as replica
(*cntx)->StartArray(kClustersShardingCount);
if (etl.is_master) {
std::string cluster_announce_ip = absl::GetFlag(FLAGS_cluster_announce_ip);
std::string preferred_endpoint =
cluster_announce_ip.empty() ? cntx->owner()->LocalBindAddress() : cluster_announce_ip;
auto vec = server_family_->GetDflyCmd()->GetReplicasRoleInfo();
unsigned int info_len = vec.empty() ? kNoReplicaInfoSize : kWithReplicaInfoSize;
(*cntx)->StartArray(info_len);
(*cntx)->SendLong(0); // start sharding range
(*cntx)->SendLong(ClusterConfig::kMaxSlotNum); // end sharding range
BuildClusterSlotNetworkInfo(cntx, preferred_endpoint, absl::GetFlag(FLAGS_port),
server_family_->master_id());
if (!vec.empty()) { // info about the replica
const auto& info = vec[0];
BuildClusterSlotNetworkInfo(cntx, info.address, info.listening_port, etl.remote_client_id_);
}
} else {
Replica::Info info = server_family_->GetReplicaInfo();
(*cntx)->StartArray(kWithReplicaInfoSize);
(*cntx)->SendLong(0); // start sharding range
(*cntx)->SendLong(ClusterConfig::kMaxSlotNum); // end sharding range
BuildClusterSlotNetworkInfo(cntx, info.host, info.port, server_family_->GetReplicaMasterId());
BuildClusterSlotNetworkInfo(cntx, cntx->owner()->LocalBindAddress(), absl::GetFlag(FLAGS_port),
server_family_->master_id());
}
}
void ClusterFamily::ClusterNodes(ConnectionContext* cntx) {
// Support for NODES commands can help in case we are working in cluster mode
// In this case, we can save information about the cluster
// In case this is the master, it can save the information about the replica from this command
std::string msg = BuildClusterNodeReply(cntx);
(*cntx)->SendBulkString(msg);
}
void ClusterFamily::ClusterInfo(ConnectionContext* cntx) {
std::string msg;
auto append = [&msg](absl::AlphaNum a1, absl::AlphaNum a2) {
absl::StrAppend(&msg, a1, ":", a2, "\r\n");
};
// info command just return some stats about this instance
int known_nodes = 1;
long epoch = 1;
ServerState& etl = *ServerState::tlocal();
if (etl.is_master) {
auto vec = server_family_->GetDflyCmd()->GetReplicasRoleInfo();
if (!vec.empty()) {
known_nodes = 2;
}
} else {
if (server_family_->HasReplica()) {
known_nodes = 2;
epoch = server_family_->GetReplicaInfo().master_last_io_sec;
}
}
int cluster_size = known_nodes - 1;
append("cluster_state", "ok");
append("cluster_slots_assigned", ClusterConfig::kMaxSlotNum);
append("cluster_slots_ok", ClusterConfig::kMaxSlotNum);
append("cluster_slots_pfail", 0);
append("cluster_slots_fail", 0);
append("cluster_known_nodes", known_nodes);
append("cluster_size", cluster_size);
append("cluster_current_epoch", epoch);
append("cluster_my_epoch", 1);
append("cluster_stats_messages_ping_sent", 1);
append("cluster_stats_messages_pong_sent", 1);
append("cluster_stats_messages_sent", 1);
append("cluster_stats_messages_ping_received", 1);
append("cluster_stats_messages_pong_received", 1);
append("cluster_stats_messages_meet_received", 0);
append("cluster_stats_messages_received", 1);
(*cntx)->SendBulkString(msg);
}
void ClusterFamily::Cluster(CmdArgList args, ConnectionContext* cntx) {
// In emulated cluster mode, all slots are mapped to the same host, and number of cluster
// instances is thus 1.
ToUpper(&args[0]);
string_view sub_cmd = ArgS(args, 0);
if (!is_emulated_cluster_ && !ClusterConfig::IsClusterEnabled()) {
return (*cntx)->SendError(
"CLUSTER commands requires --cluster_mode=emulated or --cluster_mode=yes");
}
if (sub_cmd == "HELP") {
return ClusterHelp(cntx);
} else if (sub_cmd == "SLOTS") {
return ClusterSlots(cntx);
} else if (sub_cmd == "NODES") {
return ClusterNodes(cntx);
} else if (sub_cmd == "INFO") {
return ClusterInfo(cntx);
} else {
return (*cntx)->SendError(facade::UnknownSubCmd(sub_cmd, "CLUSTER"), facade::kSyntaxErrType);
}
}
void ClusterFamily::ReadOnly(CmdArgList args, ConnectionContext* cntx) {
if (!is_emulated_cluster_) {
return (*cntx)->SendError("READONLY command requires --cluster_mode=emulated");
}
(*cntx)->SendOk();
}
void ClusterFamily::ReadWrite(CmdArgList args, ConnectionContext* cntx) {
if (!is_emulated_cluster_) {
return (*cntx)->SendError("READWRITE command requires --cluster_mode=emulated");
}
(*cntx)->SendOk();
}
using EngineFunc = void (ClusterFamily::*)(CmdArgList args, ConnectionContext* cntx);
inline CommandId::Handler HandlerFunc(ClusterFamily* se, EngineFunc f) {
return [=](CmdArgList args, ConnectionContext* cntx) { return (se->*f)(args, cntx); };
}
#define HFUNC(x) SetHandler(HandlerFunc(this, &ClusterFamily::x))
void ClusterFamily::Register(CommandRegistry* registry) {
*registry << CI{"CLUSTER", CO::READONLY, 2, 0, 0, 0}.HFUNC(Cluster)
<< CI{"READONLY", CO::READONLY, 1, 0, 0, 0}.HFUNC(ReadOnly)
<< CI{"READWRITE", CO::READONLY, 1, 0, 0, 0}.HFUNC(ReadWrite);
}
} // namespace dfly

View file

@ -0,0 +1,50 @@
// Copyright 2023, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <memory>
#include <string>
#include "facade/conn_context.h"
#include "server/cluster/cluster_config.h"
#include "server/common.h"
namespace dfly {
class CommandRegistry;
class ConnectionContext;
class ServerFamily;
class DflyCmd;
class ClusterFamily {
public:
explicit ClusterFamily(ServerFamily* server_family);
void Register(CommandRegistry* registry);
bool IsEnabledOrEmulated() const;
ClusterConfig* cluster_config() {
return cluster_config_.get();
}
private:
void Cluster(CmdArgList args, ConnectionContext* cntx);
void ClusterHelp(ConnectionContext* cntx);
void ClusterSlots(ConnectionContext* cntx);
void ClusterNodes(ConnectionContext* cntx);
void ClusterInfo(ConnectionContext* cntx);
void ReadOnly(CmdArgList args, ConnectionContext* cntx);
void ReadWrite(CmdArgList args, ConnectionContext* cntx);
std::string BuildClusterNodeReply(ConnectionContext* cntx) const;
bool is_emulated_cluster_ = false;
ServerFamily* server_family_ = nullptr;
std::unique_ptr<ClusterConfig> cluster_config_;
};
} // namespace dfly

View file

@ -16,6 +16,7 @@
#include "core/json_object.h"
#include "facade/dragonfly_connection.h"
#include "server/cluster/cluster_config.h"
#include "server/cluster/cluster_family.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
@ -87,8 +88,9 @@ DflyCmd::ReplicaRoleInfo::ReplicaRoleInfo(std::string address, uint32_t listenin
}
}
DflyCmd::DflyCmd(util::ListenerInterface* listener, ServerFamily* server_family)
: sf_(server_family), listener_(listener) {
DflyCmd::DflyCmd(util::ListenerInterface* listener, ServerFamily* server_family,
ClusterFamily* cluster_family)
: sf_(server_family), listener_(listener), cluster_family_(cluster_family) {
}
void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
@ -390,7 +392,7 @@ void DflyCmd::ClusterConfig(CmdArgList args, ConnectionContext* cntx) {
return rb->SendError("Invalid JSON cluster config", kSyntaxErrType);
}
if (!sf_->cluster_config()->SetConfig(json.value())) {
if (!cluster_family_->cluster_config()->SetConfig(json.value())) {
return rb->SendError("Invalid cluster configuration.");
}
@ -401,7 +403,7 @@ void DflyCmd::ClusterManagmentCmd(CmdArgList args, ConnectionContext* cntx) {
if (!ClusterConfig::IsClusterEnabled()) {
return (*cntx)->SendError("DFLY CLUSTER commands requires --cluster_mode=yes");
}
CHECK_NE(sf_->cluster_config(), nullptr);
CHECK_NE(cluster_family_->cluster_config(), nullptr);
// TODO check admin port
ToUpper(&args[1]);

View file

@ -21,6 +21,7 @@ class ListenerInterface;
namespace dfly {
class ClusterFamily;
class EngineShardSet;
class ServerFamily;
class RdbSaver;
@ -118,7 +119,8 @@ class DflyCmd {
};
public:
DflyCmd(util::ListenerInterface* listener, ServerFamily* server_family);
DflyCmd(util::ListenerInterface* listener, ServerFamily* server_family,
ClusterFamily* cluster_family);
void Run(CmdArgList args, ConnectionContext* cntx);
@ -202,11 +204,13 @@ class DflyCmd {
facade::RedisReplyBuilder* rb);
private:
ServerFamily* sf_;
ServerFamily* sf_; // Not owned
util::ListenerInterface* listener_;
TxId journal_txid_ = 0;
ClusterFamily* cluster_family_; // Not owned
uint32_t next_sync_id_ = 1;
using ReplicaInfoMap = absl::btree_map<uint32_t, std::shared_ptr<ReplicaInfo>>;

View file

@ -23,6 +23,7 @@ extern "C" {
#include "facade/error.h"
#include "facade/reply_capture.h"
#include "server/bitops_family.h"
#include "server/cluster/cluster_family.h"
#include "server/conn_context.h"
#include "server/error.h"
#include "server/generic_family.h"
@ -518,7 +519,8 @@ ExecEvalState DetermineEvalPresense(const std::vector<StoredCmd>& body) {
} // namespace
Service::Service(ProactorPool* pp) : pp_(*pp), server_family_(this) {
Service::Service(ProactorPool* pp)
: pp_(*pp), server_family_(this), cluster_family_(&server_family_) {
CHECK(pp);
CHECK(shard_set == NULL);
@ -555,7 +557,7 @@ void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_i
request_latency_usec.Init(&pp_);
StringFamily::Init(&pp_);
GenericFamily::Init(&pp_);
server_family_.Init(acceptor, main_interface);
server_family_.Init(acceptor, main_interface, &cluster_family_);
ChannelStore* cs = new ChannelStore{};
pp_.Await(
@ -619,7 +621,7 @@ bool Service::CheckKeysOwnership(const CommandId* cid, CmdArgList args,
return false;
}
// Check keys slot is in my ownership
if (keys_slot && !server_family_.cluster_config()->IsMySlot(*keys_slot)) {
if (keys_slot && !cluster_family_.cluster_config()->IsMySlot(*keys_slot)) {
(*dfly_cntx)->SendError("MOVED"); // TODO add more info to moved error.
return false;
}
@ -1834,6 +1836,7 @@ void Service::RegisterCommands() {
SearchFamily::Register(&registry_);
server_family_.Register(&registry_);
cluster_family_.Register(&registry_);
if (VLOG_IS_ON(1)) {
LOG(INFO) << "Multi-key commands are: ";

View file

@ -7,6 +7,7 @@
#include "base/varz_value.h"
#include "core/interpreter.h"
#include "facade/service_interface.h"
#include "server/cluster/cluster_family.h"
#include "server/command_registry.h"
#include "server/engine_shard_set.h"
#include "server/server_family.h"
@ -139,6 +140,7 @@ class Service : public facade::ServiceInterface {
util::ProactorPool& pp_;
ServerFamily server_family_;
ClusterFamily cluster_family_;
CommandRegistry registry_;
absl::flat_hash_map<std::string, unsigned> unknown_cmds_;

View file

@ -38,7 +38,6 @@ extern "C" {
#include "server/memory_cmd.h"
#include "server/rdb_load.h"
#include "server/rdb_save.h"
#include "server/replica.h"
#include "server/script_mgr.h"
#include "server/server_state.h"
#include "server/tiered_storage.h"
@ -62,10 +61,6 @@ ABSL_FLAG(string, save_schedule, "",
"glob spec for the UTC time to save a snapshot which matches HH:MM 24h time");
ABSL_FLAG(bool, df_snapshot_format, true,
"if true, save in dragonfly-specific snapshotting format");
ABSL_FLAG(string, cluster_mode, "",
"Cluster mode supported."
"default: \"\"");
ABSL_FLAG(string, cluster_announce_ip, "", "ip that cluster commands announce to the client");
ABSL_DECLARE_FLAG(uint32_t, port);
ABSL_DECLARE_FLAG(bool, cache_mode);
@ -421,16 +416,6 @@ void SlowLog(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendError(UnknownSubCmd(sub_cmd, "SLOWLOG"), kSyntaxErrType);
}
void BuildClusterSlotNetworkInfo(ConnectionContext* cntx, std::string_view host, uint32_t port,
std::string_view id) {
constexpr unsigned int kNetworkInfoSize = 3;
(*cntx)->StartArray(kNetworkInfoSize);
(*cntx)->SendBulkString(host);
(*cntx)->SendLong(port);
(*cntx)->SendBulkString(id);
}
} // namespace
std::optional<SnapshotSpec> ParseSaveSchedule(string_view time) {
@ -495,17 +480,6 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) {
DCHECK_EQ(CONFIG_RUN_ID_SIZE, master_id_.size());
}
string cluster_mode = GetFlag(FLAGS_cluster_mode);
if (cluster_mode == "emulated") {
is_emulated_cluster_ = true;
} else if (cluster_mode == "yes") {
cluster_config_ = std::make_unique<ClusterConfig>(master_id_);
} else if (!cluster_mode.empty()) {
LOG(ERROR) << "invalid cluster_mode. Exiting...";
exit(1);
}
if (auto ec = ValidateFilename(GetFlag(FLAGS_dbfilename), GetFlag(FLAGS_df_snapshot_format));
ec) {
LOG(ERROR) << ec.Format();
@ -516,11 +490,13 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) {
ServerFamily::~ServerFamily() {
}
void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_listener) {
void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_listener,
ClusterFamily* cluster_family) {
CHECK(acceptor_ == nullptr);
acceptor_ = acceptor;
main_listener_ = main_listener;
dfly_cmd_.reset(new DflyCmd(main_listener, this));
dfly_cmd_ = make_unique<DflyCmd>(main_listener, this, cluster_family);
cluster_family_ = cluster_family;
pb_task_ = shard_set->pool()->GetNextProactor();
if (pb_task_->GetKind() == ProactorBase::EPOLL) {
@ -925,6 +901,21 @@ std::optional<ReplicaOffsetInfo> ServerFamily::GetReplicaOffsetInfo() {
return nullopt;
}
bool ServerFamily::HasReplica() const {
unique_lock lk(replicaof_mu_);
return replica_ != nullptr;
}
Replica::Info ServerFamily::GetReplicaInfo() const {
unique_lock lk(replicaof_mu_);
return replica_->GetInfo();
}
string ServerFamily::GetReplicaMasterId() const {
unique_lock lk(replicaof_mu_);
return string(replica_->MasterId());
}
void ServerFamily::OnClose(ConnectionContext* cntx) {
dfly_cmd_->OnClose(cntx);
}
@ -1337,144 +1328,6 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "CLIENT"), kSyntaxErrType);
}
void ServerFamily::Cluster(CmdArgList args, ConnectionContext* cntx) {
// This command supports 2 sub options:
// 1. HELP
// 2. SLOTS: the slots are a mapping between sharding and hosts in the cluster.
// Note that as of the beginning of 2023 DF don't have cluster mode (i.e sharding across
// multiple hosts), as a results all shards are map to the same host (i.e. range is between and
// kEndSlot) and number of cluster sharding is thus == 1 (kClustersShardingCount). For more
// details https://redis.io/commands/cluster-slots/
constexpr unsigned int kEndSlot = 16383; // see redis code (cluster.c CLUSTER_SLOTS).
constexpr unsigned int kStartSlot = 0;
constexpr unsigned int kClustersShardingCount = 1;
constexpr unsigned int kNoReplicaInfoSize = 3;
constexpr unsigned int kWithReplicaInfoSize = 4;
ToUpper(&args[0]);
string_view sub_cmd = ArgS(args, 0);
if (!is_emulated_cluster_ && !ClusterConfig::IsClusterEnabled()) {
return (*cntx)->SendError(
"CLUSTER commands requires --cluster_mode=emulated or --cluster_mode=yes");
}
if (sub_cmd == "HELP") {
string_view help_arr[] = {
"CLUSTER <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
"SLOTS",
" Return information about slots range mappings. Each range is made of:",
" start, end, master and replicas IP addresses, ports and ids.",
"NODES",
" Return cluster configuration seen by node. Output format:",
" <id> <ip:port> <flags> <master> <pings> <pongs> <epoch> <link> <slot> ...",
"INFO",
" Return information about the cluster",
"HELP",
" Prints this help.",
};
return (*cntx)->SendSimpleStrArr(help_arr);
}
if (sub_cmd == "SLOTS") {
/* Format: 1) 1) start slot
* 2) end slot
* 3) 1) master IP
* 2) master port
* 3) node ID
* 4) 1) replica IP (optional)
* 2) replica port
* 3) node ID
* ... note that in this case, only 1 slot
*/
ServerState& etl = *ServerState::tlocal();
// we have 3 cases here
// 1. This is a stand alone, in this case we only sending local information
// 2. We are the master, and we have replica, in this case send us as master
// 3. We are replica to a master, sends the information about us as replica
(*cntx)->StartArray(kClustersShardingCount);
if (etl.is_master) {
std::string cluster_announce_ip = GetFlag(FLAGS_cluster_announce_ip);
std::string preferred_endpoint =
cluster_announce_ip.empty() ? cntx->owner()->LocalBindAddress() : cluster_announce_ip;
auto vec = dfly_cmd_->GetReplicasRoleInfo();
unsigned int info_len = vec.empty() ? kNoReplicaInfoSize : kWithReplicaInfoSize;
(*cntx)->StartArray(info_len);
(*cntx)->SendLong(kStartSlot); // start sharding range
(*cntx)->SendLong(kEndSlot); // end sharding range
BuildClusterSlotNetworkInfo(cntx, preferred_endpoint, GetFlag(FLAGS_port), master_id());
if (!vec.empty()) { // info about the replica
const auto& info = vec[0];
BuildClusterSlotNetworkInfo(cntx, info.address, info.listening_port, etl.remote_client_id_);
}
} else {
unique_lock lk(replicaof_mu_); // make sure that this pointer stays alive!
auto replica_ptr = replica_;
CHECK(replica_ptr);
Replica::Info info = replica_ptr->GetInfo();
(*cntx)->StartArray(kWithReplicaInfoSize);
(*cntx)->SendLong(kStartSlot); // start sharding range
(*cntx)->SendLong(kEndSlot); // end sharding range
BuildClusterSlotNetworkInfo(cntx, info.host, info.port, replica_ptr->MasterId());
BuildClusterSlotNetworkInfo(cntx, cntx->owner()->LocalBindAddress(), GetFlag(FLAGS_port),
master_id());
}
return;
} else if (sub_cmd == "NODES") {
// Support for NODES commands can help in case we are working in cluster mode
// In this case, we can save information about the cluster
// In case this is the master, it can save the information about the replica from this command
std::string msg = BuildClusterNodeReply(cntx);
(*cntx)->SendBulkString(msg);
return;
} else if (sub_cmd == "INFO") {
std::string msg;
auto append = [&msg](absl::AlphaNum a1, absl::AlphaNum a2) {
absl::StrAppend(&msg, a1, ":", a2, "\r\n");
};
// info command just return some stats about this instance
int known_nodes = 1;
long epoch = 1;
ServerState& etl = *ServerState::tlocal();
if (etl.is_master) {
auto vec = dfly_cmd_->GetReplicasRoleInfo();
if (!vec.empty()) {
known_nodes = 2;
}
} else {
if (replica_) {
known_nodes = 2;
unique_lock lk(replicaof_mu_); // make sure that this pointer stays alive!
auto replica_ptr = replica_;
CHECK(replica_ptr);
epoch = replica_ptr->GetInfo().master_last_io_sec;
}
}
int cluster_size = known_nodes - 1;
append("cluster_state", "ok");
append("cluster_slots_assigned", kEndSlot);
append("cluster_slots_ok", kEndSlot);
append("cluster_slots_pfail", 0);
append("cluster_slots_fail", 0);
append("cluster_known_nodes", known_nodes);
append("cluster_size", cluster_size);
append("cluster_current_epoch", epoch);
append("cluster_my_epoch", 1);
append("cluster_stats_messages_ping_sent", 1);
append("cluster_stats_messages_pong_sent", 1);
append("cluster_stats_messages_sent", 1);
append("cluster_stats_messages_ping_received", 1);
append("cluster_stats_messages_pong_received", 1);
append("cluster_stats_messages_meet_received", 0);
append("cluster_stats_messages_received", 1);
(*cntx)->SendBulkString(msg);
return;
}
return (*cntx)->SendError(UnknownSubCmd(sub_cmd, "CLUSTER"), kSyntaxErrType);
}
void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
ToUpper(&args[0]);
string_view sub_cmd = ArgS(args, 0);
@ -1865,7 +1718,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
if (should_enter("CLUSTER")) {
ADD_HEADER("# Cluster");
append("cluster_enabled", is_emulated_cluster_ || ClusterConfig::IsClusterEnabled());
append("cluster_enabled", cluster_family_->IsEnabledOrEmulated());
}
(*cntx)->SendBulkString(info);
@ -1956,42 +1809,6 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendBulkString((*ServerState::tlocal()).is_master ? "master" : "slave");
}
std::string ServerFamily::BuildClusterNodeReply(ConnectionContext* cntx) const {
ServerState& etl = *ServerState::tlocal();
auto epoch_master_time = std::time(nullptr) * 1000;
if (etl.is_master) {
std::string cluster_announce_ip = GetFlag(FLAGS_cluster_announce_ip);
std::string preferred_endpoint =
cluster_announce_ip.empty() ? cntx->owner()->LocalBindAddress() : cluster_announce_ip;
auto vec = dfly_cmd_->GetReplicasRoleInfo();
auto my_port = GetFlag(FLAGS_port);
const char* connect_state = vec.empty() ? "disconnected" : "connected";
std::string msg = absl::StrCat(master_id(), " ", preferred_endpoint, ":", my_port, "@", my_port,
" myself,master - 0 ", epoch_master_time, " 1 ", connect_state,
" 0-16383\r\n");
if (!vec.empty()) { // info about the replica
const auto& info = vec[0];
absl::StrAppend(&msg, etl.remote_client_id_, " ", info.address, ":", info.listening_port, "@",
info.listening_port, " slave 0 ", master_id(), " 1 ", connect_state, "\r\n");
}
return msg;
} else {
unique_lock lk(replicaof_mu_); // make sure that this pointer stays alive!
auto replica_ptr = replica_;
Replica::Info info = replica_ptr->GetInfo();
auto my_ip = cntx->owner()->LocalBindAddress();
auto my_port = GetFlag(FLAGS_port);
const char* connect_state =
replica_ptr->GetInfo().master_link_established ? "connected" : "disconnected";
std::string msg =
absl::StrCat(master_id(), " ", my_ip, ":", my_port, "@", my_port, " myself,slave ",
master_id(), " 0 ", epoch_master_time, " 1 ", connect_state, "\r\n");
absl::StrAppend(&msg, replica_ptr->MasterId(), " ", info.host, ":", info.port, "@", info.port,
" master - 0 ", epoch_master_time, " 1 ", connect_state, " 0-16383\r\n");
return msg;
}
}
void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
std::string_view host = ArgS(args, 0);
std::string_view port_s = ArgS(args, 1);
@ -2181,13 +1998,6 @@ void ServerFamily::Psync(CmdArgList args, ConnectionContext* cntx) {
SyncGeneric("?", 0, cntx); // full sync, ignore the request.
}
void ServerFamily::ReadOnly(CmdArgList args, ConnectionContext* cntx) {
if (!is_emulated_cluster_) {
return (*cntx)->SendError("READONLY command requires --cluster_mode=emulated");
}
(*cntx)->SendOk();
}
void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) {
time_t save_time;
{
@ -2257,7 +2067,6 @@ void ServerFamily::Register(CommandRegistry* registry) {
*registry << CI{"AUTH", CO::NOSCRIPT | CO::FAST | CO::LOADING, -2, 0, 0, 0}.HFUNC(Auth)
<< CI{"BGSAVE", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Save)
<< CI{"CLIENT", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.HFUNC(Client)
<< CI{"CLUSTER", CO::READONLY, 2, 1, 1, 1}.HFUNC(Cluster)
<< CI{"CONFIG", CO::ADMIN, -2, 0, 0, 0}.HFUNC(Config)
<< CI{"DBSIZE", CO::READONLY | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(DbSize)
<< CI{"DEBUG", CO::ADMIN | CO::LOADING, -2, 0, 0, 0}.HFUNC(Debug)
@ -2271,7 +2080,6 @@ void ServerFamily::Register(CommandRegistry* registry) {
<< CI{"SAVE", CO::ADMIN | CO::GLOBAL_TRANS, -1, 0, 0, 0}.HFUNC(Save)
<< CI{"SHUTDOWN", CO::ADMIN | CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.HFUNC(_Shutdown)
<< CI{"SLAVEOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf)
<< CI{"READONLY", CO::READONLY, 1, 0, 0, 0}.HFUNC(ReadOnly)
<< CI{"REPLICAOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf)
<< CI{"REPLCONF", CO::ADMIN | CO::LOADING, -1, 0, 0, 0}.HFUNC(ReplConf)
<< CI{"ROLE", CO::LOADING | CO::FAST | CO::NOSCRIPT, 1, 0, 0, 0}.HFUNC(Role)

View file

@ -9,8 +9,8 @@
#include "facade/conn_context.h"
#include "facade/redis_parser.h"
#include "server/channel_store.h"
#include "server/cluster/cluster_config.h"
#include "server/engine_shard_set.h"
#include "server/replica.h"
namespace util {
class AcceptServer;
@ -31,11 +31,11 @@ namespace journal {
class Journal;
} // namespace journal
class ClusterFamily;
class ConnectionContext;
class CommandRegistry;
class DflyCmd;
class Service;
class Replica;
class ScriptMgr;
struct Metrics {
@ -75,10 +75,11 @@ struct ReplicaOffsetInfo {
class ServerFamily {
public:
ServerFamily(Service* service);
explicit ServerFamily(Service* service);
~ServerFamily();
void Init(util::AcceptServer* acceptor, util::ListenerInterface* main_listener);
void Init(util::AcceptServer* acceptor, util::ListenerInterface* main_listener,
ClusterFamily* cluster_family);
void Register(CommandRegistry* registry);
void Shutdown();
@ -92,10 +93,6 @@ class ServerFamily {
return script_mgr_.get();
}
ClusterConfig* cluster_config() {
return cluster_config_.get();
}
void StatsMC(std::string_view section, facade::ConnectionContext* cntx);
// if new_version is true, saves DF specific, non redis compatible snapshot.
@ -134,6 +131,14 @@ class ServerFamily {
return journal_.get();
}
DflyCmd* GetDflyCmd() const {
return dfly_cmd_.get();
}
bool HasReplica() const;
Replica::Info GetReplicaInfo() const;
std::string GetReplicaMasterId() const;
void OnClose(ConnectionContext* cntx);
void BreakOnShutdown();
@ -143,11 +148,8 @@ class ServerFamily {
return shard_set->size();
}
std::string BuildClusterNodeReply(ConnectionContext* cntx) const;
void Auth(CmdArgList args, ConnectionContext* cntx);
void Client(CmdArgList args, ConnectionContext* cntx);
void Cluster(CmdArgList args, ConnectionContext* cntx);
void Config(CmdArgList args, ConnectionContext* cntx);
void DbSize(CmdArgList args, ConnectionContext* cntx);
void Debug(CmdArgList args, ConnectionContext* cntx);
@ -160,7 +162,6 @@ class ServerFamily {
void LastSave(CmdArgList args, ConnectionContext* cntx);
void Latency(CmdArgList args, ConnectionContext* cntx);
void Psync(CmdArgList args, ConnectionContext* cntx);
void ReadOnly(CmdArgList args, ConnectionContext* cntx);
void ReplicaOf(CmdArgList args, ConnectionContext* cntx);
void ReplConf(CmdArgList args, ConnectionContext* cntx);
void Role(CmdArgList args, ConnectionContext* cntx);
@ -187,18 +188,16 @@ class ServerFamily {
util::ProactorBase* pb_task_ = nullptr;
mutable Mutex replicaof_mu_, save_mu_;
std::shared_ptr<Replica> replica_; // protected by replica_of_mu_
std::shared_ptr<Replica> replica_ ABSL_GUARDED_BY(replicaof_mu_);
std::unique_ptr<ScriptMgr> script_mgr_;
std::unique_ptr<journal::Journal> journal_;
std::unique_ptr<DflyCmd> dfly_cmd_;
ClusterFamily* cluster_family_ = nullptr; // Not owned
std::string master_id_;
time_t start_time_ = 0; // in seconds, epoch time.
bool is_emulated_cluster_ = false;
std::unique_ptr<ClusterConfig> cluster_config_;
std::shared_ptr<LastSaveInfo> last_save_info_; // protected by save_mu_;
std::atomic_bool is_saving_{false};