mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
feat(server): Better logging for replication (#1041)
This commit is contained in:
parent
905593b47e
commit
d6e6bcf5a9
3 changed files with 28 additions and 8 deletions
|
@ -98,6 +98,10 @@ vector<vector<unsigned>> Partition(unsigned num_flows) {
|
|||
|
||||
} // namespace
|
||||
|
||||
std::string Replica::MasterContext::Description() const {
|
||||
return absl::StrCat(host, ":", port);
|
||||
}
|
||||
|
||||
Replica::Replica(string host, uint16_t port, Service* se, std::string_view id)
|
||||
: service_(*se), id_{id} {
|
||||
master_context_.host = std::move(host);
|
||||
|
@ -130,16 +134,19 @@ Replica::~Replica() {
|
|||
static const char kConnErr[] = "could not connect to master: ";
|
||||
|
||||
bool Replica::Start(ConnectionContext* cntx) {
|
||||
VLOG(1) << "Starting replication";
|
||||
ProactorBase* mythread = ProactorBase::me();
|
||||
CHECK(mythread);
|
||||
|
||||
// 1. Resolve dns.
|
||||
VLOG(1) << "Resolving master DNS";
|
||||
error_code ec = ResolveMasterDns();
|
||||
if (ec) {
|
||||
(*cntx)->SendError(StrCat("could not resolve master dns", ec.message()));
|
||||
return false;
|
||||
}
|
||||
// 2. Connect socket.
|
||||
VLOG(1) << "Connecting to master";
|
||||
ec = ConnectAndAuth();
|
||||
if (ec) {
|
||||
(*cntx)->SendError(StrCat(kConnErr, ec.message()));
|
||||
|
@ -147,6 +154,7 @@ bool Replica::Start(ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
// 3. Greet.
|
||||
VLOG(1) << "Greeting";
|
||||
state_mask_ = R_ENABLED | R_TCP_CONNECTED;
|
||||
last_io_time_ = mythread->GetMonotonicTimeNs();
|
||||
ec = Greet();
|
||||
|
@ -166,6 +174,7 @@ bool Replica::Start(ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
void Replica::Stop() {
|
||||
VLOG(1) << "Stopping replication";
|
||||
// Mark disabled, prevent from retrying.
|
||||
if (sock_) {
|
||||
sock_->proactor()->Await([this] {
|
||||
|
@ -181,10 +190,12 @@ void Replica::Stop() {
|
|||
}
|
||||
|
||||
void Replica::Pause(bool pause) {
|
||||
VLOG(1) << "Pausing replication";
|
||||
sock_->proactor()->Await([&] { is_paused_ = pause; });
|
||||
}
|
||||
|
||||
void Replica::MainReplicationFb() {
|
||||
VLOG(1) << "Main replication fiber started";
|
||||
// Switch shard states to replication.
|
||||
SetShardStates(true);
|
||||
|
||||
|
@ -200,13 +211,13 @@ void Replica::MainReplicationFb() {
|
|||
|
||||
ec = ResolveMasterDns();
|
||||
if (ec) {
|
||||
LOG(ERROR) << "Error resolving dns " << ec;
|
||||
LOG(ERROR) << "Error resolving dns to " << master_context_.host << " " << ec;
|
||||
continue;
|
||||
}
|
||||
|
||||
ec = ConnectAndAuth();
|
||||
if (ec) {
|
||||
LOG(ERROR) << "Error connecting " << ec;
|
||||
LOG(ERROR) << "Error connecting to " << master_context_.Description() << " " << ec;
|
||||
continue;
|
||||
}
|
||||
VLOG(1) << "Replica socket connected";
|
||||
|
@ -217,7 +228,8 @@ void Replica::MainReplicationFb() {
|
|||
if ((state_mask_ & R_GREETED) == 0) {
|
||||
ec = Greet();
|
||||
if (ec) {
|
||||
LOG(INFO) << "Error greeting " << ec << " " << ec.message();
|
||||
LOG(INFO) << "Error greeting " << master_context_.Description() << " " << ec << " "
|
||||
<< ec.message();
|
||||
state_mask_ &= R_ENABLED;
|
||||
continue;
|
||||
}
|
||||
|
@ -231,7 +243,8 @@ void Replica::MainReplicationFb() {
|
|||
ec = InitiatePSync();
|
||||
|
||||
if (ec) {
|
||||
LOG(WARNING) << "Error syncing " << ec << " " << ec.message();
|
||||
LOG(WARNING) << "Error syncing with " << master_context_.Description() << " " << ec << " "
|
||||
<< ec.message();
|
||||
state_mask_ &= R_ENABLED; // reset all flags besides R_ENABLED
|
||||
continue;
|
||||
}
|
||||
|
@ -246,7 +259,8 @@ void Replica::MainReplicationFb() {
|
|||
else
|
||||
ec = ConsumeRedisStream();
|
||||
|
||||
LOG(WARNING) << "Error full sync " << ec << " " << ec.message();
|
||||
LOG(WARNING) << "Error full sync with " << master_context_.Description() << " " << ec << " "
|
||||
<< ec.message();
|
||||
state_mask_ &= R_ENABLED;
|
||||
}
|
||||
|
||||
|
@ -560,7 +574,7 @@ error_code Replica::InitiateDflySync() {
|
|||
return cntx_.ReportError(ec);
|
||||
}
|
||||
|
||||
LOG(INFO) << "Started full sync";
|
||||
LOG(INFO) << absl::StrCat("Started full sync with ", master_context_.Description());
|
||||
|
||||
// Wait for all flows to receive full sync cut.
|
||||
// In case of an error, this is unblocked by the error handler.
|
||||
|
|
|
@ -36,6 +36,8 @@ class Replica {
|
|||
std::string master_repl_id;
|
||||
std::string dfly_session_id; // Sync session id for dfly sync.
|
||||
uint32_t dfly_flow_id = UINT32_MAX; // Flow id if replica acts as a dfly flow.
|
||||
|
||||
std::string Description() const;
|
||||
};
|
||||
|
||||
// The flow is : R_ENABLED -> R_TCP_CONNECTED -> (R_SYNCING) -> R_SYNC_OK.
|
||||
|
@ -50,10 +52,10 @@ class Replica {
|
|||
};
|
||||
|
||||
// This class holds the commands of transaction in single shard.
|
||||
// Once all commands recieved the command can be executed.
|
||||
// Once all commands were received, the command can be executed.
|
||||
struct TransactionData {
|
||||
// Update the data from ParsedEntry and return true if all shard transaction commands were
|
||||
// recieved.
|
||||
// received.
|
||||
bool AddEntry(journal::ParsedEntry&& entry);
|
||||
|
||||
bool IsGlobalCmd() const;
|
||||
|
|
|
@ -1768,8 +1768,11 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
|
|||
std::string_view port_s = ArgS(args, 2);
|
||||
auto& pool = service_.proactor_pool();
|
||||
|
||||
LOG(INFO) << "Replicating " << host << ":" << port_s;
|
||||
|
||||
if (absl::EqualsIgnoreCase(host, "no") && absl::EqualsIgnoreCase(port_s, "one")) {
|
||||
// use this lock as critical section to prevent concurrent replicaof commands running.
|
||||
VLOG(1) << "Acquire replica lock";
|
||||
unique_lock lk(replicaof_mu_);
|
||||
|
||||
if (!ServerState::tlocal()->is_master) {
|
||||
|
@ -1794,6 +1797,7 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
auto new_replica = make_shared<Replica>(string(host), port, &service_, master_id());
|
||||
|
||||
VLOG(1) << "Acquire replica lock";
|
||||
unique_lock lk(replicaof_mu_);
|
||||
if (replica_) {
|
||||
replica_->Stop(); // NOTE: consider introducing update API flow.
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue