From 0f69d32b11396fc65588b73fc5edf0c37725dca2 Mon Sep 17 00:00:00 2001 From: Roy Jacobson Date: Wed, 5 Jul 2023 17:09:10 +0200 Subject: [PATCH] takeover: Cancel blocking commands (#1514) * fix: Cancel blocking commands when performing a takeover * Add some comments * Make CancelBlocking a method of ConnectionContext * add a small todo --- src/facade/dragonfly_connection.cc | 4 ++++ src/facade/dragonfly_connection.h | 2 ++ src/server/conn_context.cc | 7 +++++++ src/server/conn_context.h | 1 + src/server/dflycmd.cc | 10 ++++++++-- src/server/main_service.cc | 4 +--- src/server/server_family.cc | 13 +++++++++++++ src/server/server_family.h | 2 ++ src/server/transaction.cc | 2 +- src/server/transaction.h | 5 +++-- tests/dragonfly/replication_test.py | 11 +++++------ 11 files changed, 47 insertions(+), 14 deletions(-) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 606a63fac..eb288cd7a 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -915,4 +915,8 @@ std::string Connection::RemoteEndpointAddress() const { return re.address().to_string(); } +ConnectionContext* Connection::cntx() { + return cc_.get(); +} + } // namespace facade diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 3913aaec7..9bbd59d6f 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -163,6 +163,8 @@ class Connection : public util::Connection { return name_; } + ConnectionContext* cntx(); + protected: void OnShutdown() override; void OnPreMigrateThread() override; diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index 2e04cd7ba..51c124772 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -9,6 +9,7 @@ #include "server/engine_shard_set.h" #include "server/server_family.h" #include "server/server_state.h" +#include "server/transaction.h" #include "src/facade/dragonfly_connection.h" namespace dfly { @@ -93,6 +94,12 @@ void ConnectionContext::ChangeMonitor(bool start) { EnableMonitoring(start); } +void ConnectionContext::CancelBlocking() { + if (transaction) { + transaction->CancelBlocking(); + } +} + vector ChangeSubscriptions(bool pattern, CmdArgList args, bool to_add, bool to_reply, ConnectionContext* conn) { vector result(to_reply ? args.size() : 0, 0); diff --git a/src/server/conn_context.h b/src/server/conn_context.h index e768bd960..e2cabc401 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -175,6 +175,7 @@ class ConnectionContext : public facade::ConnectionContext { void UnsubscribeAll(bool to_reply); void PUnsubscribeAll(bool to_reply); void ChangeMonitor(bool start); // either start or stop monitor on a given connection + void CancelBlocking(); // Cancel an ongoing blocking transaction if there is one. // Whether this connection is a connection from a replica to its master. // This flag is true only on replica side, where we need to setup a special ConnectionContext diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index c7fe6d25f..15573e021 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -377,8 +377,9 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) { absl::Time start = absl::Now(); AggregateStatus status; - // TODO: We should cancel blocking commands before awaiting all - // dispatches to finish. + // We need to await for all dispatches to finish: Otherwise a transaction might be scheduled + // after this function exits but before the actual shutdown. + sf_->CancelBlockingCommands(); if (!sf_->AwaitDispatches(timeout_dur, [self = cntx->owner()](util::Connection* conn) { // The only command that is currently dispatching should be the takeover command - // so we wait until this is true. @@ -387,7 +388,12 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) { LOG(WARNING) << "Couldn't wait for commands to finish dispatching. " << timeout_dur; status = OpStatus::TIMED_OUT; } + VLOG(1) << "AwaitDispatches done"; + // We have this guard to disable expirations: We don't want any writes to the journal after + // we send the `PING`, and expirations could ruin that. + // TODO: Decouple disabling expirations from TransactionGuard because we don't + // really need TransactionGuard here. TransactionGuard tg{cntx->transaction, /*disable_expirations=*/true}; if (*status == OpStatus::OK) { diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 6ba768ea8..1d7195731 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1028,9 +1028,7 @@ facade::ConnectionContext* Service::CreateContext(util::FiberSocketBase* peer, // a bit of a hack. I set up breaker callback here for the owner. // Should work though it's confusing to have it here. owner->RegisterBreakHook([res, this](uint32_t) { - if (res->transaction) { - res->transaction->BreakOnShutdown(); - } + res->CancelBlocking(); this->server_family().BreakOnShutdown(); }); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index bff4ce65b..1c7859692 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1260,6 +1260,19 @@ void ServerFamily::BreakOnShutdown() { dfly_cmd_->BreakOnShutdown(); } +void ServerFamily::CancelBlockingCommands() { + auto cb = [](unsigned thread_index, util::Connection* conn) { + facade::ConnectionContext* fc = static_cast(conn)->cntx(); + if (fc) { + ConnectionContext* cntx = static_cast(fc); + cntx->CancelBlocking(); + } + }; + for (auto* listener : listeners_) { + listener->TraverseConnections(cb); + } +} + bool ServerFamily::AwaitDispatches(absl::Duration timeout, const std::function& filter) { auto start = absl::Now(); diff --git a/src/server/server_family.h b/src/server/server_family.h index a56148b64..5216848b5 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -156,6 +156,8 @@ class ServerFamily { void BreakOnShutdown(); + void CancelBlockingCommands(); + bool AwaitDispatches(absl::Duration timeout, const std::function& filter); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 3d6bd242c..6fbaec62b 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1389,7 +1389,7 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt, {}, false); } -void Transaction::BreakOnShutdown() { +void Transaction::CancelBlocking() { if (coordinator_state_ & COORD_BLOCKED) { coordinator_state_ |= COORD_CANCELLED; blocking_ec_.notify(); diff --git a/src/server/transaction.h b/src/server/transaction.h index c6c7efbba..6bb5eabe3 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -183,8 +183,9 @@ class Transaction { // blocking queue. bool NotifySuspended(TxId committed_ts, ShardId sid, std::string_view key); - // Cancel all blocking watches on shutdown. Set COORD_CANCELLED. - void BreakOnShutdown(); + // Cancel all blocking watches. Set COORD_CANCELLED. + // Must be called from coordinator thread. + void CancelBlocking(); // In some cases for non auto-journaling commands we want to enable the auto journal flow. void RenableAutoJournal() { diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 0730682d4..9992be187 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -1120,12 +1120,11 @@ async def test_take_over_counters(df_local_factory, master_threads, replica_thre async def block_during_takeover(): "Add a blocking command during takeover to make sure it doesn't block it." - # TODO: We need to make takeover interrupt blocking commands. - return - try: - await c_blocking.execute_command("BLPOP BLOCKING_KEY1 BLOCKING_KEY2 10") - except redis.exceptions.ConnectionError: - pass + start = time.time() + # The command should just be canceled + assert await c_blocking.execute_command("BLPOP BLOCKING_KEY1 BLOCKING_KEY2 100") is None + # And it should happen in reasonable amount of time. + assert time.time() - start < 10 async def delayed_takeover(): await asyncio.sleep(1)