Mirror of https://github.com/dragonflydb/dragonfly.git
feat(replication): Extend replication handshake protocol to support DF->DF replication.
1. Introduce REPLCONF command support.
2. Introduce the DF-specific DFLY SYNC command to allow multi-flow replication.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Parent: 042800c2f4
Commit: 14e61f532f
11 changed files with 420 additions and 133 deletions
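Taken together, the hunks below extend the replication handshake roughly as follows (a reconstruction from this diff; the wire formatting of the replies is abbreviated):

    # One control connection per replica:
    PING                              -> +PONG
    REPLCONF capa eof capa psync2     -> +OK
    REPLCONF capa dragonfly           -> *3 reply: <master_repl_id> SYNC<id> <num_shards>
                                         (a non-Dragonfly master answers +OK instead)

    # Then one additional connection per master shard/flow:
    DFLY SYNC <master_repl_id> SYNC<id> <shard_id>   -> +OK
                                                        (connection migrated to that shard's thread)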
helio
@@ -1 +1 @@
-Subproject commit 17fdc10f97c8c28eb9a0544ca65fb7e60cfc575a
+Subproject commit 6bc0a8d49e534c8f7492ecbabc622c54b5ab7e72
@@ -679,6 +679,8 @@ auto Connection::FromArgs(RespVec args, mi_heap_t* heap) -> Request* {

void RespToArgList(const RespVec& src, CmdArgVec* dest) {
  dest->resize(src.size());
  for (size_t i = 0; i < src.size(); ++i) {
    DCHECK(src[i].type == RespExpr::STRING);

    (*dest)[i] = ToMSS(src[i].GetBuf());
  }
}
@@ -8,13 +8,14 @@ cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib)

add_library(dragonfly_lib channel_slice.cc command_registry.cc
            config_flags.cc conn_context.cc debugcmd.cc dflycmd.cc
            generic_family.cc hset_family.cc json_family.cc
            list_family.cc main_service.cc rdb_load.cc rdb_save.cc replica.cc
            snapshot.cc script_mgr.cc server_family.cc
            set_family.cc stream_family.cc string_family.cc
            zset_family.cc version.cc)

-cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib TRDP::jsoncons)
+cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib
+         absl::random_random TRDP::jsoncons)

add_library(dfly_test_lib test_utils.cc)
cxx_link(dfly_test_lib dragonfly_lib facade_test gtest_main_ext)
@@ -39,6 +39,9 @@ struct ConnectionState {
  // For get op - we use it as a mask of MCGetMask values.
  uint32_t memcache_flag = 0;

+ // If it's a replication client - then it holds positive sync session id.
+ uint32_t sync_session_id = 0;
+
  // Lua-script related data.
  struct Script {
    bool is_write = true;
@@ -9,10 +9,10 @@

#include "base/flags.h"
#include "base/logging.h"
#include "facade/dragonfly_connection.h"

#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
#include "server/server_family.h"
#include "server/server_state.h"
#include "server/transaction.h"
@@ -26,7 +26,8 @@ using namespace facade;
using namespace std;
using util::ProactorBase;

-DflyCmd::DflyCmd(util::ListenerInterface* listener, journal::Journal* journal) : listener_(listener), journal_(journal) {
+DflyCmd::DflyCmd(util::ListenerInterface* listener, ServerFamily* server_family)
+    : listener_(listener), sf_(server_family) {
}

void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {

@@ -35,7 +36,7 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
  ToUpper(&args[1]);
  RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());

- std::string_view sub_cmd = ArgS(args, 1);
+ string_view sub_cmd = ArgS(args, 1);
  if (sub_cmd == "JOURNAL") {
    if (args.size() < 3) {
      return rb->SendError(WrongNumArgsError("DFLY JOURNAL"));

@@ -44,9 +45,8 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
      return;
    }

+ util::ProactorPool* pool = shard_set->pool();
  if (sub_cmd == "THREAD") {
-   util::ProactorPool* pool = shard_set->pool();
-
    if (args.size() == 2) {  // DFLY THREAD : returns connection thread index and number of threads.
      rb->StartArray(2);
      rb->SendLong(ProactorBase::GetIndex());

@@ -55,7 +55,7 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
    }

    // DFLY THREAD to_thread : migrates current connection to a different thread.
-   std::string_view arg = ArgS(args, 2);
+   string_view arg = ArgS(args, 2);
    unsigned num_thread;
    if (!absl::SimpleAtoi(arg, &num_thread)) {
      return rb->SendError(kSyntaxErr);

@@ -73,10 +73,46 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
      return;
    }

+  if (sub_cmd == "SYNC") {
+    // SYNC <masterid> <syncid> <shardid>
+
+    if (args.size() == 5) {
+      string_view masterid = ArgS(args, 2);
+      string_view syncid_str = ArgS(args, 3);
+      string_view shard_id_str = ArgS(args, 4);
+
+      unsigned shard_id, sync_id;
+      VLOG(1) << "Got DFLY SYNC " << masterid << " " << syncid_str << " " << shard_id_str;
+
+      if (masterid != sf_->master_id()) {
+        return rb->SendError("Bad master id");
+      }
+
+      if (!absl::SimpleAtoi(shard_id_str, &shard_id) || !absl::StartsWith(syncid_str, "SYNC")) {
+        return rb->SendError(kSyntaxErr);
+      }
+
+      syncid_str.remove_prefix(4);
+      if (!absl::SimpleAtoi(syncid_str, &sync_id) || shard_id >= shard_set->size()) {
+        return rb->SendError("Bad id");
+      }
+
+      auto it = sync_info_.find(sync_id);
+      if (it == sync_info_.end()) {
+        return rb->SendError("syncid not found");
+      }
+
+      // assuming here that shard id and thread id is the same thing.
+      if (int(shard_id) != ProactorBase::GetIndex()) {
+        listener_->Migrate(cntx->owner(), pool->at(shard_id));
+      }
+      return rb->SendOk();
+    }
+  }

  rb->SendError(kSyntaxErr);
}

void DflyCmd::HandleJournal(CmdArgList args, ConnectionContext* cntx) {
  DCHECK_GE(args.size(), 3u);
  ToUpper(&args[2]);
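As an illustration of the checks above (all values hypothetical), a flow connection would send

    DFLY SYNC <40-character master id> SYNC1 6

and expect +OK. The master id must equal sf_->master_id(), the SYNC<n> token must name a session previously created by AllocateSyncSession(), the shard id must be smaller than shard_set->size(), and the connection is migrated to the proactor thread of that shard before the reply.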
@@ -91,7 +127,7 @@ void DflyCmd::HandleJournal(CmdArgList args, ConnectionContext* cntx) {
    journal::Journal* journal = ServerState::tlocal()->journal();
    if (!journal) {
      string dir = absl::GetFlag(FLAGS_dir);
-     journal_->StartLogging(dir);
+     sf_->journal()->StartLogging(dir);
      trans->Schedule();
      auto barrier_cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
      trans->Execute(barrier_cb, true);

@@ -105,11 +141,11 @@ void DflyCmd::HandleJournal(CmdArgList args, ConnectionContext* cntx) {

  if (sub_cmd == "STOP") {
    unique_lock lk(mu_);
-   if (journal_->EnterLameDuck()) {
+   if (sf_->journal()->EnterLameDuck()) {
      auto barrier_cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
      trans->ScheduleSingleHop(std::move(barrier_cb));

-     auto ec = journal_->Close();
+     auto ec = sf_->journal()->Close();
      LOG_IF(ERROR, ec) << "Error closing journal " << ec;
      journal_txid_ = trans->txid();
    }

@@ -121,4 +157,12 @@ void DflyCmd::HandleJournal(CmdArgList args, ConnectionContext* cntx) {
  return rb->SendError(reply, kSyntaxErrType);
}

+uint32_t DflyCmd::AllocateSyncSession() {
+  unique_lock lk(mu_);
+  auto [it, inserted] = sync_info_.emplace(next_sync_id_, SyncInfo{});
+  CHECK(inserted);
+  ++next_sync_id_;
+  return it->first;
+}
+
} // namespace dfly
@@ -15,6 +15,7 @@ class ListenerInterface;
namespace dfly {

class EngineShardSet;
+class ServerFamily;

namespace journal {
class Journal;

@@ -22,17 +23,26 @@ class Journal;

class DflyCmd {
 public:
- DflyCmd(util::ListenerInterface* listener, journal::Journal* journal);
+ DflyCmd(util::ListenerInterface* listener, ServerFamily* server_family);

  void Run(CmdArgList args, ConnectionContext* cntx);

+ uint32_t AllocateSyncSession();
+
 private:
  void HandleJournal(CmdArgList args, ConnectionContext* cntx);

  util::ListenerInterface* listener_;
- journal::Journal* journal_;
+ ServerFamily* sf_;
  ::boost::fibers::mutex mu_;
  TxId journal_txid_ = 0;

+ struct SyncInfo {
+   int64_t tx_id = 0;
+ };
+
+ absl::btree_map<uint32_t, SyncInfo> sync_info_;
+ uint32_t next_sync_id_ = 1;
};

} // namespace dfly
@@ -521,8 +521,9 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
      }
      dfly_cntx->transaction->SetExecCmd(cid);
      OpStatus st = dfly_cntx->transaction->InitByArgs(dfly_cntx->conn_state.db_index, args);
-     if (st != OpStatus::OK)
+     if (st != OpStatus::OK) {
        return (*cntx)->SendError(st);
+     }
    }
  } else {
    DCHECK(dfly_cntx->transaction == nullptr);

@@ -542,7 +543,14 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)

  dfly_cntx->cid = cid;

- cid->Invoke(args, dfly_cntx);
+ try {
+   cid->Invoke(args, dfly_cntx);
+ } catch(std::exception& e) {
+   LOG(ERROR) << "Internal error, system probably unstable " << e.what();
+   dfly_cntx->reply_builder()->SendError("Internal Error");
+   dfly_cntx->reply_builder()->CloseConnection();
+ }

  end_usec = ProactorBase::GetMonotonicTimeNs();

  request_latency_usec.IncBy(cmd_str, (end_usec - start_usec) / 1000);
@@ -81,8 +81,14 @@ constexpr unsigned kRdbEofMarkSize = 40;

} // namespace

-Replica::Replica(string host, uint16_t port, Service* se)
-    : service_(*se), host_(std::move(host)), port_(port) {
+Replica::Replica(string host, uint16_t port, Service* se) : service_(*se) {
+  master_context_.host = std::move(host);
+  master_context_.port = port;
}

+Replica::Replica(const MasterContext& context, uint32_t flow_id, Service* service)
+    : service_(*service), master_context_(context) {
+  master_context_.flow_id = flow_id;
+}
+
Replica::~Replica() {

@@ -98,10 +104,10 @@ Replica::~Replica() {
static const char kConnErr[] = "could not connect to master: ";

bool Replica::Run(ConnectionContext* cntx) {
- CHECK(!sock_ && !sock_thread_);
+ CHECK(!sock_);

- sock_thread_ = ProactorBase::me();
- CHECK(sock_thread_);
+ ProactorBase* mythread = ProactorBase::me();
+ CHECK(mythread);

  error_code ec = ConnectSocket();
  if (ec) {

@@ -110,7 +116,7 @@ bool Replica::Run(ConnectionContext* cntx) {
  }

  state_mask_ = R_ENABLED | R_TCP_CONNECTED;
- last_io_time_ = sock_thread_->GetMonotonicTimeNs();
+ last_io_time_ = mythread->GetMonotonicTimeNs();
  ec = Greet();
  if (ec) {
    (*cntx)->SendError(StrCat("could not greet master ", ec.message()));
@@ -123,17 +129,53 @@ bool Replica::Run(ConnectionContext* cntx) {
  return true;
}

-std::error_code Replica::ConnectSocket() {
-  sock_.reset(sock_thread_->CreateSocket());
+error_code Replica::ReadRespReply(base::IoBuf* io_buf, uint32_t* consumed) {
+  DCHECK(parser_);
+
+  error_code ec;
+
+  // basically reflection of dragonfly_connection IoLoop function.
+  while (!ec) {
+    io::MutableBytes buf = io_buf->AppendBuffer();
+    io::Result<size_t> size_res = sock_->Recv(buf);
+    if (!size_res)
+      return size_res.error();
+
+    VLOG(2) << "Read master response of " << *size_res << " bytes";
+
+    last_io_time_ = sock_->proactor()->GetMonotonicTimeNs();
+
+    io_buf->CommitWrite(*size_res);
+
+    RedisParser::Result result = parser_->Parse(io_buf->InputBuffer(), consumed, &resp_args_);
+
+    if (result == RedisParser::OK && !resp_args_.empty()) {
+      return error_code{};  // success path
+    }
+
+    if (result != RedisParser::INPUT_PENDING) {
+      LOG(ERROR) << "Invalid parser status " << result << " for buffer of size "
+                 << io_buf->InputLen();
+      return std::make_error_code(std::errc::bad_message);
+    }
+    io_buf->ConsumeInput(*consumed);
+  }
+
+  return ec;
+}
+
+error_code Replica::ConnectSocket() {
+  sock_.reset(ProactorBase::me()->CreateSocket());

  char ip_addr[INET6_ADDRSTRLEN];
- int resolve_res = ResolveDns(host_, ip_addr);
+ int resolve_res = ResolveDns(master_context_.host, ip_addr);
  if (resolve_res != 0) {
-   LOG(ERROR) << "Dns error " << gai_strerror(resolve_res);
+   LOG(ERROR) << "Dns error " << gai_strerror(resolve_res) << ", host: " << master_context_.host;
    return make_error_code(errc::host_unreachable);
  }

  auto address = ip::make_address(ip_addr);
- ip::tcp::endpoint ep{address, port_};
+ master_context_.master_ep = ip::tcp::endpoint{address, master_context_.port};

  /* These may help but require additional field testing to learn.

@@ -152,12 +194,12 @@ std::error_code Replica::ConnectSocket() {
  CHECK_EQ(0, setsockopt(sock_->native_handle(), IPPROTO_TCP, TCP_KEEPCNT, &intv, sizeof(intv)));
  */

- return sock_->Connect(ep);
+ return sock_->Connect(master_context_.master_ep);
}

void Replica::Stop() {
- if (sock_thread_) {
-   sock_thread_->Await([this] {
+ if (sock_) {
+   sock_->proactor()->Await([this] {
      state_mask_ = 0;  // Specifically ~R_ENABLED.
      auto ec = sock_->Shutdown(SHUT_RDWR);
      LOG_IF(ERROR, ec) << "Could not shutdown socket " << ec;
@@ -195,23 +237,34 @@ void Replica::ReplicateFb() {
    }

    if ((state_mask_ & R_SYNC_OK) == 0) {  // has not synced
-     ec = InitiatePSync();
+     if (master_context_.dfly_session_id.empty()) {
+       ec = InitiatePSync();  // redis -> df
+
+       // There is a data race condition in Redis-master code, where "ACK 0" handler may be
+       // triggerred
+       // before Redis is ready to transition to the streaming state and it silenty ignores "ACK
+       // 0". We reduce the chance it happens with this delay.
+       this_fiber::sleep_for(50ms);
+     } else {
+       ec = InitiateDflySync();
+     }

      if (ec) {
        LOG(WARNING) << "Error syncing " << ec << " " << ec.message();
-       state_mask_ &= R_ENABLED;  // reset
+       state_mask_ &= R_ENABLED;  // reset all flags besides R_ENABLED
        continue;
      }
      VLOG(1) << "Replica greet ok";
    }

-   // There is a data race condition in Redis-master code, where "ACK 0" handler may be
-   // triggerred
-   // before Redis is ready to transition to the streaming state and it silenty ignores "ACK
-   // 0". We reduce the chance it happens with this delay.
-   this_fiber::sleep_for(50ms);
+   DCHECK(state_mask_ & R_SYNC_OK);

    // Start consuming the replication stream.
-   ec = ConsumeRedisStream();
+   if (master_context_.dfly_session_id.empty())
+     ec = ConsumeRedisStream();
+   else
+     ec = ConsumeDflyStream();

    LOG_IF(ERROR, !FiberSocketBase::IsConnClosed(ec)) << "Replica socket error " << ec;
    state_mask_ &= ~R_SYNC_OK;  //
@@ -225,48 +278,24 @@ error_code Replica::Greet() {

  ReqSerializer serializer{sock_.get()};

- RedisParser parser{false};  // client mode
- RespVec args;
+ parser_.reset(new RedisParser{false});
+ ProactorBase* sock_thread = sock_->proactor();
+ uint32_t consumed = 0;

  // Corresponds to server.repl_state == REPL_STATE_CONNECTING state in redis
  serializer.SendCommand("PING");  // optional.
  RETURN_ON_ERR(serializer.ec());
- RETURN_ON_ERR(Recv(sock_.get(), &io_buf));
- last_io_time_ = sock_thread_->GetMonotonicTimeNs();
-
- uint32_t consumed = 0;
- RedisParser::Result result = parser.Parse(io_buf.InputBuffer(), &consumed, &args);
+ RETURN_ON_ERR(ReadRespReply(&io_buf, &consumed));

- auto check_respok = [&] {
-   return result == RedisParser::OK && !args.empty() && args.front().type == RespExpr::STRING;
- };
+ last_io_time_ = sock_thread->GetMonotonicTimeNs();

- auto reply_err = [&]() -> string {
-   if (result != RedisParser::OK)
-     return StrCat("parse_err: ", result);
-   if (args.empty())
-     return "none";
-
-   if (args.front().type == RespExpr::ERROR) {
-     return StrCat("error(", ToSV(args.front().GetBuf()), ")");
-   }
-   return RespExpr::TypeName(args.front().type);
- };
-
- if (!check_respok()) {
-   LOG(WARNING) << "Bad reply from the server " << reply_err();
+ if (resp_args_.size() != 1 || resp_args_.front().type != RespExpr::STRING ||
+     ToSV(resp_args_.front().GetBuf()) != "PONG") {
+   LOG(ERROR) << "Bad pong response " << ToSV(io_buf.InputBuffer());
    return make_error_code(errc::bad_message);
  }

- string_view pong = ToSV(args.front().GetBuf());
- VLOG(1) << "Master ping reply " << pong;
-
- // TODO: to check nauth, permission denied etc responses.
- if (pong != "PONG") {
-   LOG(ERROR) << "Unsupported reply " << pong;
-   return make_error_code(errc::operation_not_permitted);
- }
-
  io_buf.ConsumeInput(consumed);

  // TODO: we may also send REPLCONF listening-port, ip-address

@@ -275,19 +304,14 @@ error_code Replica::Greet() {
  // Corresponds to server.repl_state == REPL_STATE_SEND_CAPA
  serializer.SendCommand("REPLCONF capa eof capa psync2");
  RETURN_ON_ERR(serializer.ec());
- RETURN_ON_ERR(Recv(sock_.get(), &io_buf));
+ RETURN_ON_ERR(ReadRespReply(&io_buf, &consumed));

- result = parser.Parse(io_buf.InputBuffer(), &consumed, &args);
- if (!check_respok()) {
-   LOG(WARNING) << "Bad reply from the server " << reply_err();
+ if (resp_args_.size() != 1 || resp_args_.front().type != RespExpr::STRING ||
+     ToSV(resp_args_.front().GetBuf()) != "OK") {
+   LOG(ERROR) << "Bad REPLCONF response " << ToSV(io_buf.InputBuffer());
    return make_error_code(errc::bad_message);
  }

- pong = ToSV(args.front().GetBuf());
-
- VLOG(1) << "Master REPLCONF reply " << pong;
- // TODO: to check replconf reply.
-
  io_buf.ConsumeInput(consumed);

  // Announce that we are the dragonfly client.
@@ -295,27 +319,54 @@ error_code Replica::Greet() {
  //
  serializer.SendCommand("REPLCONF capa dragonfly");
  RETURN_ON_ERR(serializer.ec());
- RETURN_ON_ERR(Recv(sock_.get(), &io_buf));
+ RETURN_ON_ERR(ReadRespReply(&io_buf, &consumed));

- result = parser.Parse(io_buf.InputBuffer(), &consumed, &args);
- if (!check_respok()) {
-   LOG(ERROR) << "Bad response from the server: " << reply_err();
-   return make_error_code(errc::bad_message);
- }
-
- last_io_time_ = sock_thread_->GetMonotonicTimeNs();
- string_view cmd = ToSV(args[0].GetBuf());
- if (args.size() == 1) {  // Redis
-   if (cmd != "OK") {
-     LOG(ERROR) << "Unexpected command " << cmd;
-     return make_error_code(errc::bad_message);
-   }
- } else {
-   // TODO: dragonfly
-   LOG(FATAL) << "Bad response " << args;
- }
+ last_io_time_ = sock_thread->GetMonotonicTimeNs();
+
+ if (resp_args_.front().type != RespExpr::STRING) {
+   return make_error_code(errc::bad_message);
+ }
+ string_view cmd = ToSV(resp_args_[0].GetBuf());
+ if (resp_args_.size() == 1) {  // Redis
+   if (cmd != "OK") {
+     LOG(ERROR) << "Unexpected response " << cmd;
+     return make_error_code(errc::bad_message);
+   }
+ } else if (resp_args_.size() == 3) {  // it's dragonfly master.
+   // Reponse is: <master_repl_id, syncid, num_shards>
+
+   if (resp_args_[0].type != RespExpr::STRING || resp_args_[1].type != RespExpr::STRING ||
+       resp_args_[2].type != RespExpr::INT64 ||
+       resp_args_[0].GetBuf().size() != CONFIG_RUN_ID_SIZE) {
+     LOG(ERROR) << "Unexpected response " << ToSV(io_buf.InputBuffer());
+     return make_error_code(errc::bad_message);
+   }
+
+   string_view param0 = ToSV(resp_args_[0].GetBuf());
+   string_view param1 = ToSV(resp_args_[1].GetBuf());
+   int64 param2 = get<int64_t>(resp_args_[2].u);
+
+   if (param2 <= 0 || param2 > 1024) {
+     // sanity check, we support upto 1024 shards.
+     // It's not that we can not support more but it's probably highly unlikely that someone
+     // will run dragonfly with more than 1024 cores.
+     LOG(ERROR) << "Invalid flow count " << param2;
+     return make_error_code(errc::bad_message);
+   }
+
+   master_context_.master_repl_id = param0;
+   master_context_.dfly_session_id = param1;
+   num_df_flows_ = param2;
+
+   VLOG(1) << "Master id: " << param0 << ", sync id: " << param1
+           << ", num journals " << num_df_flows_;
+ } else {
+   LOG(ERROR) << "Bad response " << ToSV(io_buf.InputBuffer());
+
+   return make_error_code(errc::bad_message);
+ }

  io_buf.ConsumeInput(consumed);
  state_mask_ |= R_GREETED;

  return error_code{};
@@ -326,15 +377,12 @@ error_code Replica::InitiatePSync() {

  ReqSerializer serializer{sock_.get()};

- // Start full sync
- state_mask_ |= R_SYNCING;
-
  // Corresponds to server.repl_state == REPL_STATE_SEND_PSYNC
  string id("?");  // corresponds to null master id and null offset
  int64_t offs = -1;
- if (!master_repl_id_.empty()) {  // in case we synced before
-   id = master_repl_id_;          // provide the replication offset and master id
-   offs = repl_offs_;             // to try incremental sync.
+ if (!master_context_.master_repl_id.empty()) {  // in case we synced before
+   id = master_context_.master_repl_id;          // provide the replication offset and master id
+   offs = repl_offs_;                            // to try incremental sync.
  }
  serializer.SendCommand(StrCat("PSYNC ", id, " ", offs));
  RETURN_ON_ERR(serializer.ec());

@@ -344,16 +392,20 @@ error_code Replica::InitiatePSync() {

  RETURN_ON_ERR(ParseReplicationHeader(&io_buf, &repl_header));

+ ProactorBase* sock_thread = sock_->proactor();
  string* token = absl::get_if<string>(&repl_header.fullsync);
  size_t snapshot_size = SIZE_MAX;
  if (!token) {
    snapshot_size = absl::get<size_t>(repl_header.fullsync);
  }
- last_io_time_ = sock_thread_->GetMonotonicTimeNs();
+ last_io_time_ = sock_thread->GetMonotonicTimeNs();

  // we get token for diskless redis replication. For disk based replication
  // we get the snapshot size.
  if (snapshot_size || token != nullptr) {  // full sync
+   // Start full sync
+   state_mask_ |= R_SYNCING;
+
    SocketSource ss{sock_.get()};
    io::PrefixSource ps{io_buf.InputBuffer(), &ss};

@@ -384,7 +436,7 @@ error_code Replica::InitiatePSync() {

    CHECK(ps.unused_prefix().empty());
    io_buf.ConsumeInput(io_buf.InputLen());
-   last_io_time_ = sock_thread_->GetMonotonicTimeNs();
+   last_io_time_ = sock_thread->GetMonotonicTimeNs();
  }

  state_mask_ &= ~R_SYNCING;
@@ -393,6 +445,40 @@ error_code Replica::InitiatePSync() {
  return error_code{};
}

+error_code Replica::InitiateDflySync() {
+  DCHECK_GT(num_df_flows_, 0u);
+  unsigned num_threads = shard_set->pool()->size();
+  vector<vector<unsigned>> partition(num_threads);
+
+  shard_flows_.resize(num_df_flows_);
+  for (unsigned i = 0; i < num_df_flows_; ++i) {
+    partition[i % num_threads].push_back(i);
+    shard_flows_[i].reset(new Replica(master_context_, i, &service_));
+  }
+
+  boost::fibers::mutex mu;
+  error_code ec;
+
+  shard_set->pool()->AwaitFiberOnAll([&](unsigned index, auto*) {
+    const auto& local_ids = partition[index];
+    for (auto id : local_ids) {
+      error_code local_ec = shard_flows_[id]->StartFlow();
+      if (local_ec) {
+        lock_guard lk(mu);
+        ec = local_ec;
+        break;
+      }
+    }
+  });
+
+  if (ec)
+    return ec;
+
+  state_mask_ |= R_SYNC_OK;
+
+  return error_code{};
+}
+
error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* dest) {
  std::string_view str;
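As a worked example of the round-robin partition in InitiateDflySync above (numbers illustrative only): with num_df_flows_ = 6 master shards and num_threads = 4 replica proactor threads, i % num_threads assigns flows {0, 4} to thread 0, {1, 5} to thread 1, {2} to thread 2 and {3} to thread 3. Each flow is a private Replica instance built with the new (MasterContext, flow_id, Service*) constructor and opens its own connection to the master via StartFlow().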
@@ -416,9 +502,9 @@ error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* d
      size_t pos = header.find(' ');
      if (pos != std::string_view::npos) {
        if (absl::SimpleAtoi(header.substr(pos + 1), &repl_offs_)) {
-         master_repl_id_ = string(header.substr(0, pos));
+         master_context_.master_repl_id = string(header.substr(0, pos));
          valid = true;
-         VLOG(1) << "master repl_id " << master_repl_id_ << " / " << repl_offs_;
+         VLOG(1) << "master repl_id " << master_context_.master_repl_id << " / " << repl_offs_;
        }
      }

@@ -530,7 +616,7 @@ error_code Replica::ConsumeRedisStream() {
      return size_res.error();

    VLOG(1) << "Read replication stream of " << *size_res << " bytes";
-   last_io_time_ = sock_thread_->GetMonotonicTimeNs();
+   last_io_time_ = sock_->proactor()->GetMonotonicTimeNs();

    io_buf.CommitWrite(*size_res);
    repl_offs_ += *size_res;
@@ -550,13 +636,28 @@ error_code Replica::ConsumeRedisStream() {
  return ec;
}

+error_code Replica::ConsumeDflyStream() {
+  ReqSerializer serializer{sock_.get()};
+  // TBD
+  serializer.SendCommand("QUIT");
+  state_mask_ &= ~R_ENABLED;  // disable further - TODO: not finished.
+  RETURN_ON_ERR(serializer.ec());
+
+  base::IoBuf io_buf{128};
+
+  RETURN_ON_ERR(Recv(sock_.get(), &io_buf));
+
+  return error_code{};
+}
+
// Threadsafe, fiber blocking.
auto Replica::GetInfo() const -> Info {
- CHECK(sock_thread_);
- return sock_thread_->AwaitBrief([this] {
+ CHECK(sock_);
+
+ return sock_->proactor()->AwaitBrief([this] {
    Info res;
-   res.host = host_;
-   res.port = port_;
+   res.host = master_context_.host;
+   res.port = master_context_.port;
    res.master_link_established = (state_mask_ & R_TCP_CONNECTED);
    res.sync_in_progress = (state_mask_ & R_SYNCING);
    res.master_last_io_sec = (ProactorBase::GetMonotonicTimeNs() - last_io_time_) / 1000000000UL;

@@ -565,7 +666,7 @@ auto Replica::GetInfo() const -> Info {
}

void Replica::Pause(bool pause) {
- sock_thread_->Await([&] { is_paused_ = pause; });
+ sock_->proactor()->Await([&] { is_paused_ = pause; });
}

error_code Replica::ParseAndExecute(base::IoBuf* io_buf) {

@@ -582,14 +683,14 @@ error_code Replica::ParseAndExecute(base::IoBuf* io_buf) {
  conn_context.is_replicating = true;

  do {
-   result = parser_->Parse(io_buf->InputBuffer(), &consumed, &cmd_args_);
+   result = parser_->Parse(io_buf->InputBuffer(), &consumed, &resp_args_);

    switch (result) {
      case RedisParser::OK:
-       if (!cmd_args_.empty()) {
-         VLOG(2) << "Got command " << ToSV(cmd_args_[0].GetBuf()) << ToSV(cmd_args_[1].GetBuf())
+       if (!resp_args_.empty()) {
+         VLOG(2) << "Got command " << ToSV(resp_args_[0].GetBuf()) << ToSV(resp_args_[1].GetBuf())
                  << "\n consumed: " << consumed;
-         facade::RespToArgList(cmd_args_, &cmd_str_args_);
+         facade::RespToArgList(resp_args_, &cmd_str_args_);
          CmdArgList arg_list{cmd_str_args_.data(), cmd_str_args_.size()};
          service_.DispatchCommand(arg_list, &conn_context);
        }
@@ -609,4 +710,33 @@ error_code Replica::ParseAndExecute(base::IoBuf* io_buf) {
  return error_code{};
}

+error_code Replica::StartFlow() {
+  CHECK(!sock_);
+  DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty());
+
+  ProactorBase* mythread = ProactorBase::me();
+  CHECK(mythread);
+
+  sock_.reset(mythread->CreateSocket());
+  RETURN_ON_ERR(sock_->Connect(master_context_.master_ep));
+
+  ReqSerializer serializer{sock_.get()};
+  serializer.SendCommand(StrCat("DFLY SYNC ", master_context_.master_repl_id, " ",
+                                master_context_.dfly_session_id, " ", master_context_.flow_id));
+  RETURN_ON_ERR(serializer.ec());
+
+  parser_.reset(new RedisParser{false});  // client mode
+  base::IoBuf io_buf{128};
+  unsigned consumed = 0;
+  RETURN_ON_ERR(ReadRespReply(&io_buf, &consumed));
+
+  if (resp_args_.size() != 1 || resp_args_.front().type != RespExpr::STRING ||
+      ToSV(resp_args_.front().GetBuf()) != "OK") {
+    LOG(ERROR) << "Bad SYNC response " << ToSV(io_buf.InputBuffer());
+    return make_error_code(errc::bad_message);
+  }
+
+  return error_code{};
+}
+
} // namespace dfly
@@ -17,6 +17,18 @@ class Service;
class ConnectionContext;

class Replica {
+ // The attributes of the master we are connecting to.
+ struct MasterContext {
+   std::string host;
+   std::string master_repl_id;
+   std::string dfly_session_id;  // for dragonfly replication
+   boost::asio::ip::tcp::endpoint master_ep;
+
+   uint16_t port;
+   uint32_t flow_id = UINT32_MAX;  // Relevant if this replica is used to transfer a flow.
+ };
+
 public:
  Replica(std::string master_host, uint16_t port, Service* se);
  ~Replica();

@@ -30,11 +42,11 @@ class Replica {
  void Stop();

  const std::string& master_host() const {
-   return host_;
+   return master_context_.host;
  }

  uint16_t port() {
-   return port_;
+   return master_context_.port;
  }

  struct Info {

@@ -50,6 +62,9 @@ class Replica {
  void Pause(bool pause);

 private:
+ // Used to initialize df replication flow.
+ Replica(const MasterContext& context, uint32_t flow_id, Service* service);
+
  // The flow is : R_ENABLED -> R_TCP_CONNECTED -> (R_SYNCING) -> R_SYNC_OK.
  // SYNCING means that the initial ack succeeded. It may be optional if we can still load from
  // the journal offset.

@@ -70,27 +85,33 @@ class Replica {
    std::variant<std::string, size_t> fullsync;
  };

+ // This function uses parser_ and cmd_args_ in order to consume a single response
+ // from the sock_. The output will reside in cmd_str_args_.
+ std::error_code ReadRespReply(base::IoBuf* io_buf, uint32_t* consumed);
+
  std::error_code ConnectSocket();
  std::error_code Greet();
  std::error_code InitiatePSync();
+ std::error_code InitiateDflySync();

  std::error_code ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* header);
  std::error_code ReadLine(base::IoBuf* io_buf, std::string_view* line);
  std::error_code ConsumeRedisStream();
+ std::error_code ConsumeDflyStream();
  std::error_code ParseAndExecute(base::IoBuf* io_buf);

+ std::error_code StartFlow();
+
  Service& service_;
- std::string host_;
- std::string master_repl_id_;
- uint16_t port_;

  ::boost::fibers::fiber sync_fb_;
  std::unique_ptr<util::LinuxSocketBase> sock_;
+ MasterContext master_context_;

  // Where the sock_ is handled.
- util::ProactorBase* sock_thread_ = nullptr;
+ // util::ProactorBase* sock_thread_ = nullptr;
  std::unique_ptr<facade::RedisParser> parser_;
- facade::RespVec cmd_args_;
+ facade::RespVec resp_args_;
  facade::CmdArgVec cmd_str_args_;

  // repl_offs - till what offset we've already read from the master.

@@ -98,7 +119,11 @@ class Replica {
  size_t repl_offs_ = 0, ack_offs_ = 0;
  uint64_t last_io_time_ = 0;  // in ns, monotonic clock.
  unsigned state_mask_ = 0;
+ unsigned num_df_flows_ = 0;

  bool is_paused_ = false;

+ std::vector<std::unique_ptr<Replica>> shard_flows_;
};

} // namespace dfly
@@ -278,10 +278,26 @@ bool DoesTimeMatchSpecifier(const SnapshotSpec& spec, time_t now) {

ServerFamily::ServerFamily(Service* service) : service_(*service) {
  start_time_ = time(NULL);
- lsinfo_ = make_shared<LastSaveInfo>();
- lsinfo_->save_time = start_time_;
+ last_save_info_ = make_shared<LastSaveInfo>();
+ last_save_info_->save_time = start_time_;
  script_mgr_.reset(new ScriptMgr());
  journal_.reset(new journal::Journal);
+
+ {
+   // TODO: if we start using random generator in more places, we should probably
+   // refactor this code.
+
+   absl::InsecureBitGen eng;
+   absl::uniform_int_distribution<uint32_t> ud;
+
+   absl::AlphaNum a1(absl::Hex(eng(), absl::kZeroPad16));
+   absl::AlphaNum a2(absl::Hex(eng(), absl::kZeroPad16));
+   absl::AlphaNum a3(absl::Hex(ud(eng), absl::kZeroPad8));
+   absl::StrAppend(&master_id_, a1, a2, a3);
+
+   size_t constexpr kConfigRunIdSize = CONFIG_RUN_ID_SIZE;
+   DCHECK_EQ(kConfigRunIdSize, master_id_.size());
+ }
}

ServerFamily::~ServerFamily() {
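The three hex pieces appended in the constructor above are 16 + 16 + 8 characters, i.e. a 40-character run id matching CONFIG_RUN_ID_SIZE. A hypothetical value, for illustration only:

    master_id_ = "a3f09c4be17d25608c44f1d2a9b37e50c1d2e3f4"  // 40 hex chars

This is the id a Dragonfly replica later echoes back in DFLY SYNC and the master validates against sf_->master_id().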
@@ -291,7 +307,7 @@ void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* m
  CHECK(acceptor_ == nullptr);
  acceptor_ = acceptor;
  main_listener_ = main_listener;
- dfly_cmd_.reset(new DflyCmd(main_listener, journal_.get()));
+ dfly_cmd_.reset(new DflyCmd(main_listener, this));

  pb_task_ = shard_set->pool()->GetNextProactor();

@@ -431,7 +447,7 @@ void ServerFamily::SnapshotScheduling(const SnapshotSpec&& spec) {
    time_t last_save;
    {
      lock_guard lk(save_mu_);
-     last_save = lsinfo_->save_time;
+     last_save = last_save_info_->save_time;
    }

    if ((last_save / 60) == (now / 60)) {

@@ -724,6 +740,7 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er
  if (new_version) {
    snapshots.resize(shard_set->size());

+   // In the new version we open a file per shard
    auto cb = [&](Transaction* t, EngineShard* shard) {
      fs::path shard_file = filename, abs_path = path;
      ShardId sid = shard->shard_id();

@@ -812,7 +829,7 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er

    lock_guard lk(save_mu_);
    // swap - to deallocate the old version outstide of the lock.
-   lsinfo_.swap(save_info);
+   last_save_info_.swap(save_info);
  }
  return ec;
}

@@ -834,7 +851,7 @@ error_code ServerFamily::DoFlush(Transaction* transaction, DbIndex db_ind) {

shared_ptr<const LastSaveInfo> ServerFamily::GetLastSaveInfo() const {
  lock_guard lk(save_mu_);
- return lsinfo_;
+ return last_save_info_;
}

void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) {

@@ -1168,10 +1185,10 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {

  if (should_enter("PERSISTENCE", true)) {
    ADD_HEADER("# PERSISTENCE");
-   decltype(lsinfo_) save_info;
+   decltype(last_save_info_) save_info;
    {
      lock_guard lk(save_mu_);
-     save_info = lsinfo_;
+     save_info = last_save_info_;
    }
    append("last_save", save_info->save_time);
    append("last_save_file", save_info->file_name);

@@ -1188,6 +1205,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
    if (etl.is_master) {
      append("role", "master");
      append("connected_slaves", m.conn_stats.num_replicas);
+     append("master_replid", master_id_);
    } else {
      append("role", "slave");
@@ -1351,6 +1369,41 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
      [&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = is_master; });
}

+void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
+  if (args.size() % 2 == 0)
+    goto err;
+
+  for (unsigned i = 1; i < args.size(); i += 2) {
+    DCHECK_LT(i + 1, args.size());
+    ToUpper(&args[i]);
+
+    std::string_view cmd = ArgS(args, i);
+    std::string_view arg = ArgS(args, i + 1);
+    if (cmd == "CAPA") {
+      if (arg == "dragonfly" && args.size() == 3 && i == 1) {
+        uint32_t sid = dfly_cmd_->AllocateSyncSession();
+        string sync_id = absl::StrCat("SYNC", sid);
+        cntx->conn_state.sync_session_id = sid;
+
+        // The response for 'capa dragonfly' is: <masterid> <syncid> <numthreads>
+        (*cntx)->StartArray(3);
+        (*cntx)->SendSimpleString(master_id_);
+        (*cntx)->SendSimpleString(sync_id);
+        (*cntx)->SendLong(shard_set->size());
+        return;
+      }
+    } else {
+      VLOG(1) << cmd << " " << arg;
+    }
+  }
+
+  (*cntx)->SendOk();
+  return;
+
+err:
+  (*cntx)->SendError(kSyntaxErr);
+}
+
void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) {
  (*cntx)->SendRaw("*3\r\n$6\r\nmaster\r\n:0\r\n*0\r\n");
}
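For illustration, the wire-level reply a replica sees for REPLCONF capa dragonfly from the handler above would be a three-element RESP array (values hypothetical, assuming sync session 1 and an 8-shard master):

    *3
    +<40-character master_id_>
    +SYNC1
    :8

The replica's Greet() stores the first two fields in master_context_ and uses the integer as num_df_flows_.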
@@ -1374,7 +1427,7 @@ void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) {
  time_t save_time;
  {
    lock_guard lk(save_mu_);
-   save_time = lsinfo_->save_time;
+   save_time = last_save_info_->save_time;
  }
  (*cntx)->SendLong(save_time);
}

@@ -1437,6 +1490,7 @@ void ServerFamily::Register(CommandRegistry* registry) {
      << CI{"SHUTDOWN", CO::ADMIN | CO::NOSCRIPT | CO::LOADING, 1, 0, 0, 0}.HFUNC(_Shutdown)
      << CI{"SLAVEOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf)
      << CI{"REPLICAOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf)
+     << CI{"REPLCONF", CO::ADMIN | CO::LOADING, -1, 0, 0, 0}.HFUNC(ReplConf)
      << CI{"ROLE", CO::LOADING | CO::FAST | CO::NOSCRIPT, 1, 0, 0, 0}.HFUNC(Role)
      << CI{"SYNC", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Sync)
      << CI{"PSYNC", CO::ADMIN | CO::GLOBAL_TRANS, 3, 0, 0, 0}.HFUNC(Psync)
@@ -94,6 +94,14 @@ class ServerFamily {

  void PauseReplication(bool pause);

+ const std::string& master_id() const {
+   return master_id_;
+ }
+
+ journal::Journal* journal() {
+   return journal_.get();
+ }
+
 private:
  uint32_t shard_count() const {
    return shard_set->size();

@@ -114,6 +122,7 @@ class ServerFamily {
  void Latency(CmdArgList args, ConnectionContext* cntx);
  void Psync(CmdArgList args, ConnectionContext* cntx);
  void ReplicaOf(CmdArgList args, ConnectionContext* cntx);
+ void ReplConf(CmdArgList args, ConnectionContext* cntx);
  void Role(CmdArgList args, ConnectionContext* cntx);
  void Save(CmdArgList args, ConnectionContext* cntx);
  void Script(CmdArgList args, ConnectionContext* cntx);

@@ -142,10 +151,11 @@ class ServerFamily {
  std::unique_ptr<ScriptMgr> script_mgr_;
  std::unique_ptr<journal::Journal> journal_;
  std::unique_ptr<DflyCmd> dfly_cmd_;
+ std::string master_id_;

  time_t start_time_ = 0;  // in seconds, epoch time.

- std::shared_ptr<LastSaveInfo> lsinfo_;  // protected by save_mu_;
+ std::shared_ptr<LastSaveInfo> last_save_info_;  // protected by save_mu_;
  std::atomic_bool is_saving_{false};

  util::fibers_ext::Done is_snapshot_done_;