From 474ea5137a40d20da41fc3bdc81412b5a044aaa2 Mon Sep 17 00:00:00 2001
From: Vladislav
Date: Fri, 27 Oct 2023 11:12:55 +0300
Subject: [PATCH] fix(replica): fix replica reconnect handling (#2068)

* fix(replica): fix replica reconnect handling

Signed-off-by: Vladislav Oleshko

---------

Signed-off-by: Vladislav Oleshko
---
 src/server/db_slice.cc              |  11 ++-
 src/server/replica.cc               |  13 ++--
 src/server/server_family.cc         | 100 +++++++++++-----------------
 tests/dragonfly/replication_test.py |  17 +++--
 4 files changed, 63 insertions(+), 78 deletions(-)

diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc
index ac22888e9..05523a46b 100644
--- a/src/server/db_slice.cc
+++ b/src/server/db_slice.cc
@@ -634,12 +634,11 @@ void DbSlice::FlushDb(DbIndex db_ind) {
     }
   }
 
-  fb2::Fiber("flush_all", [all_dbs = std::move(all_dbs)]() mutable {
-    for (auto& db : all_dbs) {
-      db.reset();
-    }
-    mi_heap_collect(ServerState::tlocal()->data_heap(), true);
-  }).Detach();
+  // Explicitly drop reference-counted pointers in place.
+  // If snapshotting is currently in progress, they will be kept alive until it finishes.
+  for (auto& db : all_dbs)
+    db.reset();
+  mi_heap_collect(ServerState::tlocal()->data_heap(), true);
 }
 
 void DbSlice::AddExpire(DbIndex db_ind, PrimeIterator main_it, uint64_t at) {
diff --git a/src/server/replica.cc b/src/server/replica.cc
index 07b230377..4059dbd6c 100644
--- a/src/server/replica.cc
+++ b/src/server/replica.cc
@@ -81,10 +81,7 @@ error_code Replica::Start(ConnectionContext* cntx) {
   ProactorBase* mythread = ProactorBase::me();
   CHECK(mythread);
 
-  RETURN_ON_ERR(
-      cntx_.SwitchErrorHandler([this](const GenericError& ge) { this->DefaultErrorHandler(ge); }));
-
-  auto check_connection_error = [this, &cntx](const error_code& ec, const char* msg) -> error_code {
+  auto check_connection_error = [this, &cntx](error_code ec, const char* msg) -> error_code {
     if (cntx_.IsCancelled()) {
       (*cntx)->SendError("replication cancelled");
       return std::make_error_code(errc::operation_canceled);
@@ -92,11 +89,15 @@ error_code Replica::Start(ConnectionContext* cntx) {
     if (ec) {
       (*cntx)->SendError(absl::StrCat(msg, ec.message()));
       cntx_.Cancel();
-      return ec;
     }
-    return {};
+    return ec;
   };
 
+  // 0. Set a basic error handler that is responsible for cleaning up on errors.
+  // Can return an error only if replication was cancelled immediately.
+  auto err = cntx_.SwitchErrorHandler([this](const auto& ge) { this->DefaultErrorHandler(ge); });
+  RETURN_ON_ERR(check_connection_error(err, "replication cancelled"));
+
   // 1. Resolve dns.
   VLOG(1) << "Resolving master DNS";
   error_code ec = ResolveMasterDns();
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index 876544043..38df11e71 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -345,6 +345,11 @@ template <typename T> void UpdateMax(T* maxv, T current) {
   *maxv = std::max(*maxv, current);
 }
 
+void SetMasterFlagOnAllThreads(bool is_master) {
+  auto cb = [is_master](auto* pb) { ServerState::tlocal()->is_master = is_master; };
+  shard_set->pool()->DispatchBrief(cb);
+}
+
 }  // namespace
 
 std::optional ParseSaveSchedule(string_view time) {
@@ -1585,11 +1590,10 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
   } else {
     append("role", "replica");
 
-    // it's safe to access replica_ because replica_ is created before etl.is_master set to
-    // false and cleared after etl.is_master is set to true. And since the code here that checks
-    // for is_master and copies shared_ptr is atomic, it should be correct.
-    auto replica_ptr = replica_;
-    Replica::Info rinfo = replica_ptr->GetInfo();
+    // The replica pointer can still be mutated even while master=true, so we
+    // don't want to drop the replica object in this fiber.
+    unique_lock lk{replicaof_mu_};
+    Replica::Info rinfo = replica_->GetInfo();
 
     append("master_host", rinfo.host);
     append("master_port", rinfo.port);
@@ -1757,29 +1761,16 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
 
 void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, ConnectionContext* cntx,
                                      ActionOnConnectionFail on_err) {
-  auto& pool = service_.proactor_pool();
   LOG(INFO) << "Replicating " << host << ":" << port_sv;
 
-  // We lock to protect global state changes that we perform during the replication setup:
-  // The replica_ pointer, GlobalState, and the DB itself (we do a flushall txn before syncing).
-  // The lock is only released during replica_->Start because we want to allow cancellation during
-  // the connection. If another replication command is received during Start() of an old
-  // replication, it will acquire the lock, call Stop() on the old replica_ and wait for Stop() to
-  // complete. So Replica::Stop() must
-  // 1. Be very responsive, as it is called while holding the lock.
-  // 2. Leave the DB in a consistent state after it is done.
-  // We have a relatively involved state machine inside Replica itself which handels cancellation
-  // with those requirements.
-  VLOG(2) << "Acquire replica lock";
-  unique_lock lk(replicaof_mu_);
+  unique_lock lk(replicaof_mu_);  // Only one REPLICAOF command can run at a time
 
+  // If NO ONE was supplied, just stop the current replica (if it exists)
   if (IsReplicatingNoOne(host, port_sv)) {
     if (!ServerState::tlocal()->is_master) {
-      auto repl_ptr = replica_;
-      CHECK(repl_ptr);
+      CHECK(replica_);
 
-      pool.AwaitFiberOnAll(
-          [&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = true; });
+      SetMasterFlagOnAllThreads(true);  // Flip flag before clearing replica
       replica_->Stop();
       replica_.reset();
     }
@@ -1791,34 +1782,35 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn
   }
 
   uint32_t port;
-
   if (!absl::SimpleAtoi(port_sv, &port) || port < 1 || port > 65535) {
     (*cntx)->SendError(kInvalidIntErr);
     return;
   }
 
-  auto new_replica = make_shared<Replica>(string(host), port, &service_, master_id());
-
-  if (replica_) {
-    replica_->Stop();  // NOTE: consider introducing update API flow.
-  } else {
-    // TODO: to disconnect all the blocked clients (pubsub, blpop etc)
-
-    pool.AwaitFiberOnAll([&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = false; });
-  }
-  replica_ = new_replica;
-
-  GlobalState new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
-  if (new_state != GlobalState::LOADING) {
+  // First, switch into the loading state
+  if (auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
+      new_state != GlobalState::LOADING) {
     LOG(WARNING) << GlobalStateName(new_state) << " in progress, ignored";
+    (*cntx)->SendError("Invalid state");
     return;
   }
 
-  // Replica sends response in either case. No need to send response in this function.
-  // It's a bit confusing but simpler.
-  lk.unlock();
-  error_code ec{};
+  // If any replication is in progress, stop it; cancellation should kick in immediately.
+  if (replica_)
+    replica_->Stop();
 
+  // Create a new replica and assign it
+  auto new_replica = make_shared<Replica>(string(host), port, &service_, master_id());
+  replica_ = new_replica;
+
+  // TODO: disconnect pending blocked clients (pubsub, blocking commands)
+  SetMasterFlagOnAllThreads(false);  // Flip flag after assigning replica
+
+  // We proceed with connecting below without the lock to allow interrupting the replica
+  // immediately. From this point onward, it should be highly responsive.
+  lk.unlock();
+
+  error_code ec{};
   switch (on_err) {
     case ActionOnConnectionFail::kReturnOnError:
       ec = new_replica->Start(cntx);
@@ -1828,22 +1820,13 @@ void ServerFamily::ReplicaOfInternal(string_view host, string_view port_sv, Conn
       break;
   };
 
-  VLOG(2) << "Acquire replica lock";
+  // If the replication attempt failed, clean up global state. The replica should have stopped
+  // internally.
   lk.lock();
-
-  // Since we released the replication lock during Start(..), we need to check if this still the
-  // last replicaof command we got. If it's not, then we were cancelled and just exit.
-  if (replica_ == new_replica) {
-    if (ec) {
-      service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
-      replica_->Stop();
-      replica_.reset();
-    }
-    bool is_master = !replica_;
-    pool.AwaitFiberOnAll(
-        [&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = is_master; });
-  } else {
-    new_replica->Stop();
+  if (ec && replica_ == new_replica) {
+    service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
+    SetMasterFlagOnAllThreads(true);
+    replica_.reset();
   }
 }
 
@@ -1851,7 +1834,6 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
   string_view host = ArgS(args, 0);
   string_view port = ArgS(args, 1);
 
-  // don't flush if input is NO ONE
   if (!IsReplicatingNoOne(host, port))
     Drakarys(cntx->transaction, DbSlice::kDbAll);
 
@@ -1865,7 +1847,6 @@ void ServerFamily::Replicate(string_view host, string_view port) {
 
   // we don't flush the database as the context is null
   // (and also because there is nothing to flush)
-
   ReplicaOfInternal(host, port, &ctxt, ActionOnConnectionFail::kContinueReplication);
 }
 
@@ -1998,9 +1979,8 @@ void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) {
     }
 
   } else {
-    auto replica_ptr = replica_;
-    CHECK(replica_ptr);
-    Replica::Info rinfo = replica_ptr->GetInfo();
+    unique_lock lk{replicaof_mu_};
+    Replica::Info rinfo = replica_->GetInfo();
     (*cntx)->StartArray(4);
     (*cntx)->SendBulkString("replica");
     (*cntx)->SendBulkString(rinfo.host);
diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py
index c8ec82d51..37f1deb7a 100644
--- a/tests/dragonfly/replication_test.py
+++ b/tests/dragonfly/replication_test.py
@@ -418,7 +418,7 @@ async def test_cancel_replication_immediately(
     After we finish the 'fuzzing' part, replicate the first master and check that
     all the data is correct.
""" - COMMANDS_TO_ISSUE = 100 + COMMANDS_TO_ISSUE = 200 replica = df_local_factory.create() master = df_local_factory.create() @@ -429,7 +429,10 @@ async def test_cancel_replication_immediately( await seeder.run(target_deviation=0.1) - replication_commands = [] + async def ping_status(): + while True: + await c_replica.info() + await asyncio.sleep(0.05) async def replicate(): try: @@ -439,12 +442,12 @@ async def test_cancel_replication_immediately( assert e.args[0] == "replication cancelled" return False - for i in range(COMMANDS_TO_ISSUE): - replication_commands.append(asyncio.create_task(replicate())) + ping_job = asyncio.create_task(ping_status()) + replication_commands = [asyncio.create_task(replicate()) for _ in range(COMMANDS_TO_ISSUE)] + num_successes = 0 for result in asyncio.as_completed(replication_commands, timeout=30): - r = await result - num_successes += r + num_successes += await result logging.info(f"succeses: {num_successes}") assert COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled" @@ -453,6 +456,8 @@ async def test_cancel_replication_immediately( capture = await seeder.capture() logging.info(f"number of items captured {len(capture)}") assert await seeder.compare(capture, replica.port) + + ping_job.cancel() await c_replica.close()