mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
takeover: Cancel blocking commands (#1514)
* fix: Cancel blocking commands when performing a takeover
* Add some comments
* Make CancelBlocking a method of ConnectionContext
* add a small todo
This commit is contained in:
parent
646f5304a5
commit
0f69d32b11
11 changed files with 47 additions and 14 deletions
|
@ -915,4 +915,8 @@ std::string Connection::RemoteEndpointAddress() const {
|
||||||
return re.address().to_string();
|
return re.address().to_string();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ConnectionContext* Connection::cntx() {
|
||||||
|
return cc_.get();
|
||||||
|
}
|
||||||
|
|
||||||
} // namespace facade
|
} // namespace facade
|
||||||
|
|
|
@ -163,6 +163,8 @@ class Connection : public util::Connection {
|
||||||
return name_;
|
return name_;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
ConnectionContext* cntx();
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
void OnShutdown() override;
|
void OnShutdown() override;
|
||||||
void OnPreMigrateThread() override;
|
void OnPreMigrateThread() override;
|
||||||
|
|
|
@ -9,6 +9,7 @@
|
||||||
#include "server/engine_shard_set.h"
|
#include "server/engine_shard_set.h"
|
||||||
#include "server/server_family.h"
|
#include "server/server_family.h"
|
||||||
#include "server/server_state.h"
|
#include "server/server_state.h"
|
||||||
|
#include "server/transaction.h"
|
||||||
#include "src/facade/dragonfly_connection.h"
|
#include "src/facade/dragonfly_connection.h"
|
||||||
|
|
||||||
namespace dfly {
|
namespace dfly {
|
||||||
|
@ -93,6 +94,12 @@ void ConnectionContext::ChangeMonitor(bool start) {
|
||||||
EnableMonitoring(start);
|
EnableMonitoring(start);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ConnectionContext::CancelBlocking() {
|
||||||
|
if (transaction) {
|
||||||
|
transaction->CancelBlocking();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
vector<unsigned> ChangeSubscriptions(bool pattern, CmdArgList args, bool to_add, bool to_reply,
|
vector<unsigned> ChangeSubscriptions(bool pattern, CmdArgList args, bool to_add, bool to_reply,
|
||||||
ConnectionContext* conn) {
|
ConnectionContext* conn) {
|
||||||
vector<unsigned> result(to_reply ? args.size() : 0, 0);
|
vector<unsigned> result(to_reply ? args.size() : 0, 0);
|
||||||
|
|
|
@ -175,6 +175,7 @@ class ConnectionContext : public facade::ConnectionContext {
|
||||||
void UnsubscribeAll(bool to_reply);
|
void UnsubscribeAll(bool to_reply);
|
||||||
void PUnsubscribeAll(bool to_reply);
|
void PUnsubscribeAll(bool to_reply);
|
||||||
void ChangeMonitor(bool start); // either start or stop monitor on a given connection
|
void ChangeMonitor(bool start); // either start or stop monitor on a given connection
|
||||||
|
void CancelBlocking(); // Cancel an ongoing blocking transaction if there is one.
|
||||||
|
|
||||||
// Whether this connection is a connection from a replica to its master.
|
// Whether this connection is a connection from a replica to its master.
|
||||||
// This flag is true only on replica side, where we need to setup a special ConnectionContext
|
// This flag is true only on replica side, where we need to setup a special ConnectionContext
|
||||||
|
|
|
@ -377,8 +377,9 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) {
|
||||||
absl::Time start = absl::Now();
|
absl::Time start = absl::Now();
|
||||||
AggregateStatus status;
|
AggregateStatus status;
|
||||||
|
|
||||||
// TODO: We should cancel blocking commands before awaiting all
|
// We need to await for all dispatches to finish: Otherwise a transaction might be scheduled
|
||||||
// dispatches to finish.
|
// after this function exits but before the actual shutdown.
|
||||||
|
sf_->CancelBlockingCommands();
|
||||||
if (!sf_->AwaitDispatches(timeout_dur, [self = cntx->owner()](util::Connection* conn) {
|
if (!sf_->AwaitDispatches(timeout_dur, [self = cntx->owner()](util::Connection* conn) {
|
||||||
// The only command that is currently dispatching should be the takeover command -
|
// The only command that is currently dispatching should be the takeover command -
|
||||||
// so we wait until this is true.
|
// so we wait until this is true.
|
||||||
|
@ -387,7 +388,12 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) {
|
||||||
LOG(WARNING) << "Couldn't wait for commands to finish dispatching. " << timeout_dur;
|
LOG(WARNING) << "Couldn't wait for commands to finish dispatching. " << timeout_dur;
|
||||||
status = OpStatus::TIMED_OUT;
|
status = OpStatus::TIMED_OUT;
|
||||||
}
|
}
|
||||||
|
VLOG(1) << "AwaitDispatches done";
|
||||||
|
|
||||||
|
// We have this guard to disable expirations: We don't want any writes to the journal after
|
||||||
|
// we send the `PING`, and expirations could ruin that.
|
||||||
|
// TODO: Decouple disabling expirations from TransactionGuard because we don't
|
||||||
|
// really need TransactionGuard here.
|
||||||
TransactionGuard tg{cntx->transaction, /*disable_expirations=*/true};
|
TransactionGuard tg{cntx->transaction, /*disable_expirations=*/true};
|
||||||
|
|
||||||
if (*status == OpStatus::OK) {
|
if (*status == OpStatus::OK) {
|
||||||
|
|
|
@ -1028,9 +1028,7 @@ facade::ConnectionContext* Service::CreateContext(util::FiberSocketBase* peer,
|
||||||
// a bit of a hack. I set up breaker callback here for the owner.
|
// a bit of a hack. I set up breaker callback here for the owner.
|
||||||
// Should work though it's confusing to have it here.
|
// Should work though it's confusing to have it here.
|
||||||
owner->RegisterBreakHook([res, this](uint32_t) {
|
owner->RegisterBreakHook([res, this](uint32_t) {
|
||||||
if (res->transaction) {
|
res->CancelBlocking();
|
||||||
res->transaction->BreakOnShutdown();
|
|
||||||
}
|
|
||||||
this->server_family().BreakOnShutdown();
|
this->server_family().BreakOnShutdown();
|
||||||
});
|
});
|
||||||
|
|
||||||
|
|
|
@ -1260,6 +1260,19 @@ void ServerFamily::BreakOnShutdown() {
|
||||||
dfly_cmd_->BreakOnShutdown();
|
dfly_cmd_->BreakOnShutdown();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void ServerFamily::CancelBlockingCommands() {
|
||||||
|
auto cb = [](unsigned thread_index, util::Connection* conn) {
|
||||||
|
facade::ConnectionContext* fc = static_cast<facade::Connection*>(conn)->cntx();
|
||||||
|
if (fc) {
|
||||||
|
ConnectionContext* cntx = static_cast<ConnectionContext*>(fc);
|
||||||
|
cntx->CancelBlocking();
|
||||||
|
}
|
||||||
|
};
|
||||||
|
for (auto* listener : listeners_) {
|
||||||
|
listener->TraverseConnections(cb);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
bool ServerFamily::AwaitDispatches(absl::Duration timeout,
|
bool ServerFamily::AwaitDispatches(absl::Duration timeout,
|
||||||
const std::function<bool(util::Connection*)>& filter) {
|
const std::function<bool(util::Connection*)>& filter) {
|
||||||
auto start = absl::Now();
|
auto start = absl::Now();
|
||||||
|
|
|
@ -156,6 +156,8 @@ class ServerFamily {
|
||||||
|
|
||||||
void BreakOnShutdown();
|
void BreakOnShutdown();
|
||||||
|
|
||||||
|
void CancelBlockingCommands();
|
||||||
|
|
||||||
bool AwaitDispatches(absl::Duration timeout,
|
bool AwaitDispatches(absl::Duration timeout,
|
||||||
const std::function<bool(util::Connection*)>& filter);
|
const std::function<bool(util::Connection*)>& filter);
|
||||||
|
|
||||||
|
|
|
@ -1389,7 +1389,7 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt
|
||||||
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt, {}, false);
|
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt, {}, false);
|
||||||
}
|
}
|
||||||
|
|
||||||
void Transaction::BreakOnShutdown() {
|
void Transaction::CancelBlocking() {
|
||||||
if (coordinator_state_ & COORD_BLOCKED) {
|
if (coordinator_state_ & COORD_BLOCKED) {
|
||||||
coordinator_state_ |= COORD_CANCELLED;
|
coordinator_state_ |= COORD_CANCELLED;
|
||||||
blocking_ec_.notify();
|
blocking_ec_.notify();
|
||||||
|
|
|
@ -183,8 +183,9 @@ class Transaction {
|
||||||
// blocking queue.
|
// blocking queue.
|
||||||
bool NotifySuspended(TxId committed_ts, ShardId sid, std::string_view key);
|
bool NotifySuspended(TxId committed_ts, ShardId sid, std::string_view key);
|
||||||
|
|
||||||
// Cancel all blocking watches on shutdown. Set COORD_CANCELLED.
|
// Cancel all blocking watches. Set COORD_CANCELLED.
|
||||||
void BreakOnShutdown();
|
// Must be called from coordinator thread.
|
||||||
|
void CancelBlocking();
|
||||||
|
|
||||||
// In some cases for non auto-journaling commands we want to enable the auto journal flow.
|
// In some cases for non auto-journaling commands we want to enable the auto journal flow.
|
||||||
void RenableAutoJournal() {
|
void RenableAutoJournal() {
|
||||||
|
|
|
@ -1120,12 +1120,11 @@ async def test_take_over_counters(df_local_factory, master_threads, replica_thre
|
||||||
|
|
||||||
async def block_during_takeover():
|
async def block_during_takeover():
|
||||||
"Add a blocking command during takeover to make sure it doesn't block it."
|
"Add a blocking command during takeover to make sure it doesn't block it."
|
||||||
# TODO: We need to make takeover interrupt blocking commands.
|
start = time.time()
|
||||||
return
|
# The command should just be canceled
|
||||||
try:
|
assert await c_blocking.execute_command("BLPOP BLOCKING_KEY1 BLOCKING_KEY2 100") is None
|
||||||
await c_blocking.execute_command("BLPOP BLOCKING_KEY1 BLOCKING_KEY2 10")
|
# And it should happen in reasonable amount of time.
|
||||||
except redis.exceptions.ConnectionError:
|
assert time.time() - start < 10
|
||||||
pass
|
|
||||||
|
|
||||||
async def delayed_takeover():
|
async def delayed_takeover():
|
||||||
await asyncio.sleep(1)
|
await asyncio.sleep(1)
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue