From 0312b662440a10a990fbbb136814ab1c6faec0cb Mon Sep 17 00:00:00 2001 From: adiholden Date: Wed, 29 Mar 2023 12:11:56 +0300 Subject: [PATCH] bug(replication): fix deadlock in cancel replication flow (#1007) Signed-off-by: adi_holden --- src/server/dflycmd.cc | 16 ++-------------- src/server/io_utils.cc | 5 +---- src/server/journal/streamer.cc | 10 ++++++++-- tests/dragonfly/replication_test.py | 4 ++++ 4 files changed, 15 insertions(+), 20 deletions(-) diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index ca0b64b51..c0436cd5b 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -517,23 +517,11 @@ void DflyCmd::CancelReplication(uint32_t sync_id, shared_ptr replic replica_ptr->state.store(SyncState::CANCELLED, memory_order_release); replica_ptr->cntx.Cancel(); - // Run cleanup for shard threads. - shard_set->AwaitRunningOnShardQueue([replica_ptr](EngineShard* shard) { - FlowInfo* flow = &replica_ptr->flows[shard->shard_id()]; - if (flow->cleanup) { - flow->cleanup(); - } - }); - // Wait for tasks to finish. shard_set->pool()->AwaitFiberOnAll([replica_ptr](unsigned index, auto*) { FlowInfo* flow = &replica_ptr->flows[index]; - - // Cleanup hasn't been run for io-thread. - if (EngineShard::tlocal() == nullptr) { - if (flow->cleanup) { - flow->cleanup(); - } + if (flow->cleanup) { + flow->cleanup(); } if (flow->full_sync_fb.IsJoinable()) { diff --git a/src/server/io_utils.cc b/src/server/io_utils.cc index 0e652e463..5d936fb2e 100644 --- a/src/server/io_utils.cc +++ b/src/server/io_utils.cc @@ -5,7 +5,6 @@ #include "server/io_utils.h" #include "base/flags.h" -#include "base/logging.h" #include "server/error.h" using namespace std; @@ -41,8 +40,7 @@ error_code BufferedStreamerBase::ConsumeIntoSink(io::Sink* dest) { // Wait for more data or stop signal. waker_.await([this]() { return buffered_ > 0 || IsStopped(); }); // Break immediately on cancellation. - if (cll_->IsCancelled()) { - waker_.notifyAll(); // Wake consumer if it missed it. 
+ if (IsStopped()) { break; } @@ -62,7 +60,6 @@ error_code BufferedStreamerBase::ConsumeIntoSink(io::Sink* dest) { // TODO: shrink big stash. consumer_buf_.Clear(); } - return std::error_code{}; } diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 5b5ca5507..6e9df49d6 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -25,11 +25,17 @@ uint64_t JournalStreamer::GetRecordCount() const { } void JournalStreamer::Cancel() { + Finalize(); // Finalize must be called before UnregisterOnChange because we first need to stop + // writing to buffer and notify all the producers. + // Writing to journal holds mutex protecting change_cb_arr_, then the fiber can + // preempt when calling NotifyWritten and it will not run again till notified. + // UnregisterOnChange will try to lock the mutex therefore calling UnregisterOnChange + // before Finalize may cause deadlock. journal_->UnregisterOnChange(journal_cb_id_); - Finalize(); - if (write_fb_.IsJoinable()) + if (write_fb_.IsJoinable()) { write_fb_.Join(); + } } void JournalStreamer::WriterFb(io::Sink* dest) { diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index dd54c08a8..2b7f8e96a 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -189,6 +189,7 @@ async def test_disconnect_replica(df_local_factory: DflyInstanceFactory, df_seed await c_replica.execute_command("REPLICAOF localhost " + str(master.port)) if crash_type == 0: await asyncio.sleep(random.random()/100+0.01) + await c_replica.connection_pool.disconnect() replica.stop(kill=True) else: await wait_available_async(c_replica) @@ -208,6 +209,7 @@ async def test_disconnect_replica(df_local_factory: DflyInstanceFactory, df_seed # Run stable state crashes async def stable_sync(replica, c_replica, crash_type): await asyncio.sleep(random.random() / 100) + await c_replica.connection_pool.disconnect() replica.stop(kill=True) await 
asyncio.gather(*(stable_sync(*args) for args @@ -249,11 +251,13 @@ async def test_disconnect_replica(df_local_factory: DflyInstanceFactory, df_seed for replica, c_replica, _ in replicas_of_type(lambda t: t == 2): assert await c_replica.ping() assert await seeder.compare(capture, port=replica.port) + await c_replica.connection_pool.disconnect() # Check master survived all disconnects assert await c_master.ping() await c_master.close() + """ Test stopping master during different phases.