fix(replica): fix replica reconnect handling (#2068)

* fix(replica): fix replica reconnect handling

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Vladislav 2023-10-27 11:12:55 +03:00 committed by GitHub
parent af622f6a52
commit 474ea5137a
4 changed files with 63 additions and 78 deletions


@@ -634,12 +634,11 @@ void DbSlice::FlushDb(DbIndex db_ind) {
     }
   }
 
-  fb2::Fiber("flush_all", [all_dbs = std::move(all_dbs)]() mutable {
-    for (auto& db : all_dbs) {
-      db.reset();
-    }
-    mi_heap_collect(ServerState::tlocal()->data_heap(), true);
-  }).Detach();
+  // Explicitly drop reference counted pointers in place.
+  // If snapshotting is currently in progress, they will keep alive until it finishes.
+  for (auto& db : all_dbs)
+    db.reset();
+  mi_heap_collect(ServerState::tlocal()->data_heap(), true);
 }
 
 void DbSlice::AddExpire(DbIndex db_ind, PrimeIterator main_it, uint64_t at) {


@@ -81,10 +81,7 @@ error_code Replica::Start(ConnectionContext* cntx) {
   ProactorBase* mythread = ProactorBase::me();
   CHECK(mythread);
 
-  RETURN_ON_ERR(
-      cntx_.SwitchErrorHandler([this](const GenericError& ge) { this->DefaultErrorHandler(ge); }));
-
-  auto check_connection_error = [this, &cntx](const error_code& ec, const char* msg) -> error_code {
+  auto check_connection_error = [this, &cntx](error_code ec, const char* msg) -> error_code {
     if (cntx_.IsCancelled()) {
       (*cntx)->SendError("replication cancelled");
       return std::make_error_code(errc::operation_canceled);
@@ -92,11 +89,15 @@ error_code Replica::Start(ConnectionContext* cntx) {
     if (ec) {
       (*cntx)->SendError(absl::StrCat(msg, ec.message()));
       cntx_.Cancel();
-      return ec;
     }
-    return {};
+    return ec;
   };
 
+  // 0. Set basic error handler that is reponsible for cleaning up on errors.
+  // Can return an error only if replication was cancelled immediately.
+  auto err = cntx_.SwitchErrorHandler([this](const auto& ge) { this->DefaultErrorHandler(ge); });
+  RETURN_ON_ERR(check_connection_error(err, "replication cancelled"));
+
   // 1. Resolve dns.
   VLOG(1) << "Resolving master DNS";
   error_code ec = ResolveMasterDns();


@@ -345,6 +345,11 @@ template <typename T> void UpdateMax(T* maxv, T current) {
   *maxv = std::max(*maxv, current);
 }
 
+void SetMasterFlagOnAllThreads(bool is_master) {
+  auto cb = [is_master](auto* pb) { ServerState::tlocal()->is_master = is_master; };
+  shard_set->pool()->DispatchBrief(cb);
+}
+
 }  // namespace
 
 std::optional<SnapshotSpec> ParseSaveSchedule(string_view time) {
@@ -1585,11 +1590,10 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
   } else {
     append("role", "replica");
 
-    // it's safe to access replica_ because replica_ is created before etl.is_master set to
-    // false and cleared after etl.is_master is set to true. And since the code here that checks
-    // for is_master and copies shared_ptr is atomic, it should be correct.
-    auto replica_ptr = replica_;
-    Replica::Info rinfo = replica_ptr->GetInfo();
+    // The replica pointer can still be mutated even while master=true,
+    // we don't want to drop the replica object in this fiber
+    unique_lock lk{replicaof_mu_};
+    Replica::Info rinfo = replica_->GetInfo();
 
     append("master_host", rinfo.host);
     append("master_port", rinfo.port);
@@ -1757,29 +1761,16 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
 
 void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, ConnectionContext* cntx,
                                      ActionOnConnectionFail on_err) {
-  auto& pool = service_.proactor_pool();
   LOG(INFO) << "Replicating " << host << ":" << port_sv;
 
-  // We lock to protect global state changes that we perform during the replication setup:
-  // The replica_ pointer, GlobalState, and the DB itself (we do a flushall txn before syncing).
-  // The lock is only released during replica_->Start because we want to allow cancellation during
-  // the connection. If another replication command is received during Start() of an old
-  // replication, it will acquire the lock, call Stop() on the old replica_ and wait for Stop() to
-  // complete. So Replica::Stop() must
-  // 1. Be very responsive, as it is called while holding the lock.
-  // 2. Leave the DB in a consistent state after it is done.
-  // We have a relatively involved state machine inside Replica itself which handels cancellation
-  // with those requirements.
-  VLOG(2) << "Acquire replica lock";
-  unique_lock lk(replicaof_mu_);
+  unique_lock lk(replicaof_mu_);  // Only one REPLICAOF command can run at a time
 
+  // If NO ONE was supplied, just stop the current replica (if it exists)
   if (IsReplicatingNoOne(host, port_sv)) {
     if (!ServerState::tlocal()->is_master) {
-      auto repl_ptr = replica_;
-      CHECK(repl_ptr);
-
-      pool.AwaitFiberOnAll(
-          [&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = true; });
+      CHECK(replica_);
+      SetMasterFlagOnAllThreads(true);  // Flip flag before clearing replica
       replica_->Stop();
       replica_.reset();
     }
@@ -1791,34 +1782,35 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn
   }
 
   uint32_t port;
   if (!absl::SimpleAtoi(port_sv, &port) || port < 1 || port > 65535) {
     (*cntx)->SendError(kInvalidIntErr);
     return;
   }
 
-  auto new_replica = make_shared<Replica>(string(host), port, &service_, master_id());
-
-  if (replica_) {
-    replica_->Stop();  // NOTE: consider introducing update API flow.
-  } else {
-    // TODO: to disconnect all the blocked clients (pubsub, blpop etc)
-    pool.AwaitFiberOnAll([&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = false; });
-  }
-
-  replica_ = new_replica;
-
-  GlobalState new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
-  if (new_state != GlobalState::LOADING) {
+  // First, switch into the loading state
+  if (auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
+      new_state != GlobalState::LOADING) {
     LOG(WARNING) << GlobalStateName(new_state) << " in progress, ignored";
+    (*cntx)->SendError("Invalid state");
     return;
   }
 
-  // Replica sends response in either case. No need to send response in this function.
-  // It's a bit confusing but simpler.
-  lk.unlock();
-  error_code ec{};
+  // If any replication is in progress, stop it, cancellation should kick in immediately
+  if (replica_)
+    replica_->Stop();
+
+  // Create a new replica and assing it
+  auto new_replica = make_shared<Replica>(string(host), port, &service_, master_id());
+  replica_ = new_replica;
+
+  // TODO: disconnect pending blocked clients (pubsub, blocking commands)
+  SetMasterFlagOnAllThreads(false);  // Flip flag after assiging replica
+
+  // We proceed connecting below without the lock to allow interrupting the replica immediately.
+  // From this point and onward, it should be highly responsive.
+  lk.unlock();
+
+  error_code ec{};
   switch (on_err) {
     case ActionOnConnectionFail::kReturnOnError:
       ec = new_replica->Start(cntx);
@@ -1828,22 +1820,13 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn
       break;
   };
 
-  VLOG(2) << "Acquire replica lock";
+  // If the replication attempt failed, clean up global state. The replica should have stopped
+  // internally.
   lk.lock();
-
-  // Since we released the replication lock during Start(..), we need to check if this still the
-  // last replicaof command we got. If it's not, then we were cancelled and just exit.
-  if (replica_ == new_replica) {
-    if (ec) {
-      service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
-      replica_->Stop();
-      replica_.reset();
-    }
-
-    bool is_master = !replica_;
-    pool.AwaitFiberOnAll(
-        [&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = is_master; });
-  } else {
-    new_replica->Stop();
+  if (ec && replica_ == new_replica) {
+    service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
+    SetMasterFlagOnAllThreads(true);
+    replica_.reset();
   }
 }
@@ -1851,7 +1834,6 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
   string_view host = ArgS(args, 0);
   string_view port = ArgS(args, 1);
 
-  // don't flush if input is NO ONE
   if (!IsReplicatingNoOne(host, port))
     Drakarys(cntx->transaction, DbSlice::kDbAll);
@@ -1865,7 +1847,6 @@ void ServerFamily::Replicate(string_view host, string_view port) {
 
   // we don't flush the database as the context is null
   // (and also because there is nothing to flush)
   ReplicaOfInternal(host, port, &ctxt, ActionOnConnectionFail::kContinueReplication);
-
 }
@@ -1998,9 +1979,8 @@ void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) {
     }
   } else {
-    auto replica_ptr = replica_;
-    CHECK(replica_ptr);
-    Replica::Info rinfo = replica_ptr->GetInfo();
+    unique_lock lk{replicaof_mu_};
+    Replica::Info rinfo = replica_->GetInfo();
     (*cntx)->StartArray(4);
     (*cntx)->SendBulkString("replica");
     (*cntx)->SendBulkString(rinfo.host);


@@ -418,7 +418,7 @@ async def test_cancel_replication_immediately(
     After we finish the 'fuzzing' part, replicate the first master and check that
     all the data is correct.
     """
-    COMMANDS_TO_ISSUE = 100
+    COMMANDS_TO_ISSUE = 200
 
     replica = df_local_factory.create()
     master = df_local_factory.create()
@@ -429,7 +429,10 @@ async def test_cancel_replication_immediately(
 
     await seeder.run(target_deviation=0.1)
 
-    replication_commands = []
+    async def ping_status():
+        while True:
+            await c_replica.info()
+            await asyncio.sleep(0.05)
 
     async def replicate():
         try:
@@ -439,12 +442,12 @@ async def test_cancel_replication_immediately(
             assert e.args[0] == "replication cancelled"
             return False
 
-    for i in range(COMMANDS_TO_ISSUE):
-        replication_commands.append(asyncio.create_task(replicate()))
+    ping_job = asyncio.create_task(ping_status())
+    replication_commands = [asyncio.create_task(replicate()) for _ in range(COMMANDS_TO_ISSUE)]
 
     num_successes = 0
     for result in asyncio.as_completed(replication_commands, timeout=30):
-        r = await result
-        num_successes += r
+        num_successes += await result
 
     logging.info(f"succeses: {num_successes}")
     assert COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled"
@@ -453,6 +456,8 @@ async def test_cancel_replication_immediately(
     capture = await seeder.capture()
     logging.info(f"number of items captured {len(capture)}")
     assert await seeder.compare(capture, replica.port)
 
+    ping_job.cancel()
+
     await c_replica.close()