feat(cluster): Cancel blocking commands on cluster update (#2255)

Handle blocking commands during cluster config update

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2023-12-17 15:32:35 +03:00 committed by GitHub
parent e4da54fdeb
commit aaf01d4244
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
25 changed files with 173 additions and 77 deletions

View file

@ -101,6 +101,7 @@ class ConnectionContext {
bool sync_dispatch : 1; // whether this connection is amid a sync dispatch bool sync_dispatch : 1; // whether this connection is amid a sync dispatch
bool journal_emulated : 1; // whether it is used to dispatch journal commands bool journal_emulated : 1; // whether it is used to dispatch journal commands
bool paused : 1; // whether this connection is paused due to CLIENT PAUSE bool paused : 1; // whether this connection is paused due to CLIENT PAUSE
bool blocked; // whether it's blocked on blocking commands like BLPOP, needs to be addressable
// How many async subscription sources are active: monitor and/or pubsub - at most 2. // How many async subscription sources are active: monitor and/or pubsub - at most 2.
uint8_t subscriptions; uint8_t subscriptions;

View file

@ -1048,6 +1048,12 @@ std::string Connection::DebugInfo() const {
absl::StrAppend(&info, "dispatch_queue:pipelined=", pending_pipeline_cmd_cnt_, ", "); absl::StrAppend(&info, "dispatch_queue:pipelined=", pending_pipeline_cmd_cnt_, ", ");
absl::StrAppend(&info, "dispatch_queue:intrusive=", intrusive_front, ", "); absl::StrAppend(&info, "dispatch_queue:intrusive=", intrusive_front, ", ");
absl::StrAppend(&info, "state=");
if (cc_->paused)
absl::StrAppend(&info, "p");
if (cc_->blocked)
absl::StrAppend(&info, "b");
absl::StrAppend(&info, "}"); absl::StrAppend(&info, "}");
return info; return info;
} }
@ -1225,13 +1231,16 @@ void Connection::SendAclUpdateAsync(AclUpdateMessage msg) {
SendAsync({make_unique<AclUpdateMessage>(std::move(msg))}); SendAsync({make_unique<AclUpdateMessage>(std::move(msg))});
} }
void Connection::SendCheckpoint(fb2::BlockingCounter bc, bool ignore_paused) { void Connection::SendCheckpoint(fb2::BlockingCounter bc, bool ignore_paused, bool ignore_blocked) {
if (!IsCurrentlyDispatching()) if (!IsCurrentlyDispatching())
return; return;
if (cc_->paused && ignore_paused) if (cc_->paused && ignore_paused)
return; return;
if (cc_->blocked && ignore_blocked)
return;
VLOG(1) << "Sent checkpoint to " << DebugInfo(); VLOG(1) << "Sent checkpoint to " << DebugInfo();
bc.Add(1); bc.Add(1);

View file

@ -195,8 +195,9 @@ class Connection : public util::Connection {
void SendAclUpdateAsync(AclUpdateMessage msg); void SendAclUpdateAsync(AclUpdateMessage msg);
// If any dispatch is currently in progress, increment counter and send checkpoint message to // If any dispatch is currently in progress, increment counter and send checkpoint message to
// decrement it once finished. It ignore_paused is true, paused dispatches are ignored. // decrement it once finished.
void SendCheckpoint(util::fb2::BlockingCounter bc, bool ignore_paused = false); void SendCheckpoint(util::fb2::BlockingCounter bc, bool ignore_paused = false,
bool ignore_blocked = false);
// Must be called before sending pubsub messages to ensure the threads pipeline queue limit is not // Must be called before sending pubsub messages to ensure the threads pipeline queue limit is not
// reached. Blocks until free space is available. Controlled with `pipeline_queue_limit` flag. // reached. Blocks until free space is available. Controlled with `pipeline_queue_limit` flag.

View file

@ -374,10 +374,12 @@ ProactorBase* Listener::PickConnectionProactor(util::FiberSocketBase* sock) {
} }
DispatchTracker::DispatchTracker(absl::Span<facade::Listener* const> listeners, DispatchTracker::DispatchTracker(absl::Span<facade::Listener* const> listeners,
facade::Connection* issuer, bool ignore_paused) facade::Connection* issuer, bool ignore_paused,
bool ignore_blocked)
: listeners_{listeners.begin(), listeners.end()}, : listeners_{listeners.begin(), listeners.end()},
issuer_{issuer}, issuer_{issuer},
ignore_paused_{ignore_paused} { ignore_paused_{ignore_paused},
ignore_blocked_{ignore_blocked} {
} }
void DispatchTracker::TrackOnThread() { void DispatchTracker::TrackOnThread() {
@ -396,7 +398,7 @@ void DispatchTracker::TrackAll() {
void DispatchTracker::Handle(unsigned thread_index, util::Connection* conn) { void DispatchTracker::Handle(unsigned thread_index, util::Connection* conn) {
if (auto* fconn = static_cast<facade::Connection*>(conn); fconn != issuer_) if (auto* fconn = static_cast<facade::Connection*>(conn); fconn != issuer_)
fconn->SendCheckpoint(bc_, ignore_paused_); fconn->SendCheckpoint(bc_, ignore_paused_, ignore_blocked_);
} }
} // namespace facade } // namespace facade

View file

@ -89,7 +89,7 @@ class Listener : public util::ListenerInterface {
class DispatchTracker { class DispatchTracker {
public: public:
DispatchTracker(absl::Span<facade::Listener* const>, facade::Connection* issuer = nullptr, DispatchTracker(absl::Span<facade::Listener* const>, facade::Connection* issuer = nullptr,
bool ignore_paused = false); bool ignore_paused = false, bool ignore_blocked = false);
void TrackAll(); // Track busy connection on all threads void TrackAll(); // Track busy connection on all threads
void TrackOnThread(); // Track busy connections on current thread void TrackOnThread(); // Track busy connections on current thread
@ -105,6 +105,7 @@ class DispatchTracker {
facade::Connection* issuer_; facade::Connection* issuer_;
util::fb2::BlockingCounter bc_{0}; util::fb2::BlockingCounter bc_{0};
bool ignore_paused_; bool ignore_paused_;
bool ignore_blocked_;
}; };
} // namespace facade } // namespace facade

View file

@ -134,6 +134,7 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : ow
sync_dispatch = false; sync_dispatch = false;
journal_emulated = false; journal_emulated = false;
paused = false; paused = false;
blocked = false;
subscriptions = 0; subscriptions = 0;
} }

View file

@ -13,6 +13,7 @@ enum class OpStatus : uint16_t {
OK, OK,
KEY_EXISTS, KEY_EXISTS,
KEY_NOTFOUND, KEY_NOTFOUND,
KEY_MOVED,
SKIPPED, SKIPPED,
INVALID_VALUE, INVALID_VALUE,
OUT_OF_RANGE, OUT_OF_RANGE,

View file

@ -88,9 +88,9 @@ TEST_F(BlockingControllerTest, Timeout) {
trans_->Schedule(); trans_->Schedule();
auto cb = [&](Transaction* t, EngineShard* shard) { return trans_->GetShardArgs(0); }; auto cb = [&](Transaction* t, EngineShard* shard) { return trans_->GetShardArgs(0); };
bool res = trans_->WaitOnWatch(tp, cb); facade::OpStatus status = trans_->WaitOnWatch(tp, cb);
EXPECT_FALSE(res); EXPECT_EQ(status, facade::OpStatus::TIMED_OUT);
unsigned num_watched = shard_set->Await( unsigned num_watched = shard_set->Await(
0, [&] { return EngineShard::tlocal()->blocking_controller()->NumWatched(0); }); 0, [&] { return EngineShard::tlocal()->blocking_controller()->NumWatched(0); });

View file

@ -311,6 +311,10 @@ bool ClusterConfig::IsMySlot(SlotId id) const {
return my_slots_.test(id); return my_slots_.test(id);
} }
// Key-based convenience overload: maps `key` to its slot with KeySlot() and
// reports whether that slot is owned by this node.
bool ClusterConfig::IsMySlot(std::string_view key) const {
  const auto slot = KeySlot(key);
  return IsMySlot(slot);
}
ClusterConfig::Node ClusterConfig::GetMasterNodeForSlot(SlotId id) const { ClusterConfig::Node ClusterConfig::GetMasterNodeForSlot(SlotId id) const {
CHECK_LT(id, my_slots_.size()) << "Requesting a non-existing slot id " << id; CHECK_LT(id, my_slots_.size()) << "Requesting a non-existing slot id " << id;

View file

@ -65,6 +65,7 @@ class ClusterConfig {
// If key is in my slots ownership return true // If key is in my slots ownership return true
bool IsMySlot(SlotId id) const; bool IsMySlot(SlotId id) const;
bool IsMySlot(std::string_view key) const;
// Returns the master configured for `id`. // Returns the master configured for `id`.
Node GetMasterNodeForSlot(SlotId id) const; Node GetMasterNodeForSlot(SlotId id) const;

View file

@ -514,11 +514,21 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
before = tl_cluster_config->GetOwnedSlots(); before = tl_cluster_config->GetOwnedSlots();
} }
DispatchTracker tracker{server_family_->GetListeners(), cntx->conn()}; // Ignore blocked commands because we filter them with CancelBlockingOnThread
auto cb = [&tracker, &new_config](util::ProactorBase* pb) { DispatchTracker tracker{server_family_->GetListeners(), cntx->conn(), false /* ignore paused */,
true /* ignore blocked */};
auto blocking_filter = [&new_config](ArgSlice keys) {
bool moved = any_of(keys.begin(), keys.end(), [&](auto k) { return !new_config->IsMySlot(k); });
return moved ? OpStatus::KEY_MOVED : OpStatus::OK;
};
auto cb = [this, &tracker, &new_config, blocking_filter](util::ProactorBase* pb) {
server_family_->CancelBlockingOnThread(blocking_filter);
tl_cluster_config = new_config; tl_cluster_config = new_config;
tracker.TrackOnThread(); tracker.TrackOnThread();
}; };
server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb)); server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb));
DCHECK(tl_cluster_config != nullptr); DCHECK(tl_cluster_config != nullptr);

View file

@ -117,12 +117,6 @@ void ConnectionContext::ChangeMonitor(bool start) {
EnableMonitoring(start); EnableMonitoring(start);
} }
void ConnectionContext::CancelBlocking() {
if (transaction) {
transaction->CancelBlocking();
}
}
vector<unsigned> ChangeSubscriptions(bool pattern, CmdArgList args, bool to_add, bool to_reply, vector<unsigned> ChangeSubscriptions(bool pattern, CmdArgList args, bool to_add, bool to_reply,
ConnectionContext* conn) { ConnectionContext* conn) {
vector<unsigned> result(to_reply ? args.size() : 0, 0); vector<unsigned> result(to_reply ? args.size() : 0, 0);

View file

@ -155,8 +155,6 @@ struct ConnectionState {
// For get op - we use it as a mask of MCGetMask values. // For get op - we use it as a mask of MCGetMask values.
uint32_t memcache_flag = 0; uint32_t memcache_flag = 0;
bool is_blocking = false; // whether this connection is blocking on a command
ExecInfo exec_info; ExecInfo exec_info;
ReplicationInfo replication_info; ReplicationInfo replication_info;
@ -194,7 +192,6 @@ class ConnectionContext : public facade::ConnectionContext {
void UnsubscribeAll(bool to_reply); void UnsubscribeAll(bool to_reply);
void PUnsubscribeAll(bool to_reply); void PUnsubscribeAll(bool to_reply);
void ChangeMonitor(bool start); // either start or stop monitor on a given connection void ChangeMonitor(bool start); // either start or stop monitor on a given connection
void CancelBlocking(); // Cancel an ongoing blocking transaction if there is one.
size_t UsedMemory() const override; size_t UsedMemory() const override;

View file

@ -238,7 +238,8 @@ OpResult<ShardFFResult> FindFirstNonEmptyKey(Transaction* trans, int req_obj_typ
} }
OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type, OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type,
BlockingResultCb func, unsigned limit_ms) { BlockingResultCb func, unsigned limit_ms,
bool* block_flag) {
trans->Schedule(); trans->Schedule();
string result_key; string result_key;
@ -281,9 +282,12 @@ OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty
auto wcb = [](Transaction* t, EngineShard* shard) { return t->GetShardArgs(shard->shard_id()); }; auto wcb = [](Transaction* t, EngineShard* shard) { return t->GetShardArgs(shard->shard_id()); };
bool wait_succeeded = trans->WaitOnWatch(limit_tp, std::move(wcb)); *block_flag = true;
if (!wait_succeeded) auto status = trans->WaitOnWatch(limit_tp, std::move(wcb));
return OpStatus::TIMED_OUT; *block_flag = false;
if (status != OpStatus::OK)
return status;
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
if (auto wake_key = t->GetWakeKey(shard->shard_id()); wake_key) { if (auto wake_key = t->GetWakeKey(shard->shard_id()); wake_key) {

View file

@ -95,8 +95,10 @@ using BlockingResultCb =
// Block until any key of the transaction becomes non-empty, then execute the callback. // Block until any key of the transaction becomes non-empty, then execute the callback.
// If multiple keys are non-empty when this function is called, the callback is executed // If multiple keys are non-empty when this function is called, the callback is executed
// immediately with the first key listed in the tx arguments. // immediately with the first key listed in the tx arguments.
// The block flag is set to true while the transaction is blocking.
OpResult<std::string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type, OpResult<std::string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type,
BlockingResultCb cb, unsigned limit_ms); BlockingResultCb cb, unsigned limit_ms,
bool* block_flag);
}; // namespace container_utils }; // namespace container_utils

View file

@ -421,12 +421,13 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) {
absl::Time start = absl::Now(); absl::Time start = absl::Now();
AggregateStatus status; AggregateStatus status;
sf_->CancelBlockingCommands();
// We need to await for all dispatches to finish: Otherwise a transaction might be scheduled // We need to await for all dispatches to finish: Otherwise a transaction might be scheduled
// after this function exits but before the actual shutdown. // after this function exits but before the actual shutdown.
facade::DispatchTracker tracker{sf_->GetListeners(), cntx->conn()}; facade::DispatchTracker tracker{sf_->GetListeners(), cntx->conn()};
tracker.TrackAll(); shard_set->pool()->Await([&](unsigned index, auto* pb) {
sf_->CancelBlockingOnThread();
tracker.TrackOnThread();
});
if (!tracker.Wait(timeout_dur)) { if (!tracker.Wait(timeout_dur)) {
LOG(WARNING) << "Couldn't wait for commands to finish dispatching. " << timeout_dur; LOG(WARNING) << "Couldn't wait for commands to finish dispatching. " << timeout_dur;

View file

@ -889,9 +889,8 @@ OpResult<string> BPopPusher::RunSingle(Transaction* t, time_point tp) {
auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; }; auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; };
// Block // Block
bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb)); if (auto status = t->WaitOnWatch(tp, std::move(wcb)); status != OpStatus::OK)
if (!wait_succeeded) return status;
return OpStatus::TIMED_OUT;
t->Execute(cb_move, true); t->Execute(cb_move, true);
return op_res; return op_res;
@ -914,9 +913,8 @@ OpResult<string> BPopPusher::RunPair(Transaction* t, time_point tp) {
// This allows us to run Transaction::Execute on watched transactions in both shards. // This allows us to run Transaction::Execute on watched transactions in both shards.
auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; }; auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; };
bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb)); if (auto status = t->WaitOnWatch(tp, std::move(wcb)); status != OpStatus::OK)
if (!wait_succeeded) return status;
return OpStatus::TIMED_OUT;
return MoveTwoShards(t, pop_key_, push_key_, popdir_, pushdir_, true); return MoveTwoShards(t, pop_key_, push_key_, popdir_, pushdir_, true);
} }
@ -1206,10 +1204,9 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn
popped_value = OpBPop(t, shard, key, dir); popped_value = OpBPop(t, shard, key, dir);
}; };
cntx->conn_state.is_blocking = true;
OpResult<string> popped_key = container_utils::RunCbOnFirstNonEmptyBlocking( OpResult<string> popped_key = container_utils::RunCbOnFirstNonEmptyBlocking(
transaction, OBJ_LIST, std::move(cb), unsigned(timeout * 1000)); transaction, OBJ_LIST, std::move(cb), unsigned(timeout * 1000), &cntx->blocked);
cntx->conn_state.is_blocking = false;
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
if (popped_key) { if (popped_key) {
DVLOG(1) << "BPop " << transaction->DebugId() << " popped from key " << popped_key; // key. DVLOG(1) << "BPop " << transaction->DebugId() << " popped from key " << popped_key; // key.
@ -1222,8 +1219,12 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn
switch (popped_key.status()) { switch (popped_key.status()) {
case OpStatus::WRONG_TYPE: case OpStatus::WRONG_TYPE:
return rb->SendError(kWrongTypeErr); return rb->SendError(kWrongTypeErr);
case OpStatus::CANCELLED:
case OpStatus::TIMED_OUT: case OpStatus::TIMED_OUT:
return rb->SendNullArray(); return rb->SendNullArray();
case OpStatus::KEY_MOVED:
// TODO: proper error for moved
return rb->SendError("-MOVED");
default: default:
LOG(ERROR) << "Unexpected error " << popped_key.status(); LOG(ERROR) << "Unexpected error " << popped_key.status();
} }

View file

@ -1447,7 +1447,8 @@ facade::ConnectionContext* Service::CreateContext(util::FiberSocketBase* peer,
// a bit of a hack. I set up breaker callback here for the owner. // a bit of a hack. I set up breaker callback here for the owner.
// Should work though it's confusing to have it here. // Should work though it's confusing to have it here.
owner->RegisterBreakHook([res, this](uint32_t) { owner->RegisterBreakHook([res, this](uint32_t) {
res->CancelBlocking(); if (res->transaction)
res->transaction->CancelBlocking(nullptr);
this->server_family().BreakOnShutdown(); this->server_family().BreakOnShutdown();
}); });
@ -2425,7 +2426,7 @@ string Service::GetContextInfo(facade::ConnectionContext* cntx) {
if (server_cntx->conn_state.subscribe_info) if (server_cntx->conn_state.subscribe_info)
buf[index++] = 'P'; buf[index++] = 'P';
if (server_cntx->conn_state.is_blocking) if (server_cntx->blocked)
buf[index++] = 'b'; buf[index++] = 'b';
if (index) { if (index) {

View file

@ -1142,17 +1142,18 @@ void ServerFamily::BreakOnShutdown() {
dfly_cmd_->BreakOnShutdown(); dfly_cmd_->BreakOnShutdown();
} }
void ServerFamily::CancelBlockingCommands() { void ServerFamily::CancelBlockingOnThread(std::function<OpStatus(ArgSlice)> status_cb) {
auto cb = [](unsigned thread_index, util::Connection* conn) { auto cb = [status_cb](unsigned thread_index, util::Connection* conn) {
facade::ConnectionContext* fc = static_cast<facade::Connection*>(conn)->cntx(); if (auto fcntx = static_cast<facade::Connection*>(conn)->cntx(); fcntx) {
if (fc) { auto* cntx = static_cast<ConnectionContext*>(fcntx);
ConnectionContext* cntx = static_cast<ConnectionContext*>(fc); if (cntx->transaction && cntx->blocked) {
cntx->CancelBlocking(); cntx->transaction->CancelBlocking(status_cb);
}
} }
}; };
for (auto* listener : listeners_) {
listener->TraverseConnections(cb); for (auto* listener : listeners_)
} listener->TraverseConnectionsOnThread(cb);
} }
string GetPassword() { string GetPassword() {

View file

@ -196,7 +196,7 @@ class ServerFamily {
void BreakOnShutdown(); void BreakOnShutdown();
void CancelBlockingCommands(); void CancelBlockingOnThread(std::function<facade::OpStatus(ArgSlice)> = {});
// Sets the server to replicate another instance. Does not flush the database beforehand! // Sets the server to replicate another instance. Does not flush the database beforehand!
void Replicate(std::string_view host, std::string_view port); void Replicate(std::string_view host, std::string_view port);

View file

@ -2823,10 +2823,8 @@ void XReadBlock(ReadOpts opts, ConnectionContext* cntx) {
auto tp = (opts.timeout) ? chrono::steady_clock::now() + chrono::milliseconds(opts.timeout) auto tp = (opts.timeout) ? chrono::steady_clock::now() + chrono::milliseconds(opts.timeout)
: Transaction::time_point::max(); : Transaction::time_point::max();
bool wait_succeeded = cntx->transaction->WaitOnWatch(tp, std::move(wcb)); if (auto status = cntx->transaction->WaitOnWatch(tp, std::move(wcb)); status != OpStatus::OK)
if (!wait_succeeded) {
return rb->SendNullArray(); return rb->SendNullArray();
}
// Resolve the entry in the woken key. Note this must not use OpRead since // Resolve the entry in the woken key. Note this must not use OpRead since
// only the shard that contains the woken key blocks for the awoken // only the shard that contains the woken key blocks for the awoken

View file

@ -1144,7 +1144,7 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
return reverse_index_[sd.arg_start + arg_index]; return reverse_index_[sd.arg_start + arg_index];
} }
bool Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider) { OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider) {
DVLOG(2) << "WaitOnWatch " << DebugId(); DVLOG(2) << "WaitOnWatch " << DebugId();
using namespace chrono; using namespace chrono;
@ -1165,29 +1165,36 @@ bool Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provi
auto* stats = ServerState::tl_connection_stats(); auto* stats = ServerState::tl_connection_stats();
++stats->num_blocked_clients; ++stats->num_blocked_clients;
if (DCHECK_IS_ON()) {
int64_t ms = -1;
if (tp != time_point::max())
ms = duration_cast<milliseconds>(tp - time_point::clock::now()).count();
DVLOG(1) << "WaitOnWatch TimeWait for " << ms << " ms " << DebugId();
}
cv_status status = cv_status::no_timeout; cv_status status = cv_status::no_timeout;
if (tp == time_point::max()) { if (tp == time_point::max()) {
DVLOG(1) << "WaitOnWatch foreva " << DebugId();
blocking_ec_.await(std::move(wake_cb)); blocking_ec_.await(std::move(wake_cb));
DVLOG(1) << "WaitOnWatch AfterWait";
} else { } else {
DVLOG(1) << "WaitOnWatch TimeWait for "
<< duration_cast<milliseconds>(tp - time_point::clock::now()).count() << " ms "
<< DebugId();
status = blocking_ec_.await_until(std::move(wake_cb), tp); status = blocking_ec_.await_until(std::move(wake_cb), tp);
DVLOG(1) << "WaitOnWatch await_until " << int(status);
} }
DVLOG(1) << "WaitOnWatch done " << int(status) << " " << DebugId();
--stats->num_blocked_clients; --stats->num_blocked_clients;
bool is_expired = (coordinator_state_ & COORD_CANCELLED) || status == cv_status::timeout; OpStatus result = OpStatus::OK;
if (is_expired) if (status == cv_status::timeout) {
result = OpStatus::TIMED_OUT;
} else if (coordinator_state_ & COORD_CANCELLED) {
result = local_result_;
}
if (result != OpStatus::OK)
ExpireBlocking(wkeys_provider); ExpireBlocking(wkeys_provider);
coordinator_state_ &= ~COORD_BLOCKED; coordinator_state_ &= ~COORD_BLOCKED;
return !is_expired; return result;
} }
// Runs only in the shard thread. // Runs only in the shard thread.
@ -1430,11 +1437,26 @@ void Transaction::RunOnceAsCommand(const CommandId* cid, RunnableType cb) {
}); });
} }
void Transaction::CancelBlocking() { void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {
if (coordinator_state_ & COORD_BLOCKED) { if ((coordinator_state_ & COORD_BLOCKED) == 0)
coordinator_state_ |= COORD_CANCELLED; return;
blocking_ec_.notify();
OpStatus status = OpStatus::CANCELLED;
if (status_cb) {
vector<string_view> all_keys;
IterateActiveShards([this, &all_keys](PerShardData&, auto i) {
auto shard_keys = GetShardArgs(i);
all_keys.insert(all_keys.end(), shard_keys.begin(), shard_keys.end());
});
status = status_cb(absl::MakeSpan(all_keys));
} }
if (status == OpStatus::OK)
return;
coordinator_state_ |= COORD_CANCELLED;
local_result_ = status;
blocking_ec_.notify();
} }
OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) { OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {

View file

@ -186,7 +186,7 @@ class Transaction {
// or b) tp is reached. If tp is time_point::max() then waits indefinitely. // or b) tp is reached. If tp is time_point::max() then waits indefinitely.
// Expects that the transaction had been scheduled before, and uses Execute(.., true) to register. // Expects that the transaction had been scheduled before, and uses Execute(.., true) to register.
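// Returns OpStatus::OK if notified by one of the keys, OpStatus::TIMED_OUT if tp was reached, or the status set by CancelBlocking if the wait was cancelled. // Returns OpStatus::OK if notified by one of the keys, OpStatus::TIMED_OUT if tp was reached, or the status set by CancelBlocking if the wait was cancelled.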
bool WaitOnWatch(const time_point& tp, WaitKeysProvider cb); facade::OpStatus WaitOnWatch(const time_point& tp, WaitKeysProvider cb);
// Returns true if transaction is awaked, false if it's timed-out and can be removed from the // Returns true if transaction is awaked, false if it's timed-out and can be removed from the
// blocking queue. // blocking queue.
@ -194,7 +194,7 @@ class Transaction {
// Cancel all blocking watches. Set COORD_CANCELLED. // Cancel all blocking watches. Set COORD_CANCELLED.
// Must be called from coordinator thread. // Must be called from coordinator thread.
void CancelBlocking(); void CancelBlocking(std::function<OpStatus(ArgSlice)>);
// In some cases for non auto-journaling commands we want to enable the auto journal flow. // In some cases for non auto-journaling commands we want to enable the auto journal flow.
void RenableAutoJournal() { void RenableAutoJournal() {

View file

@ -1321,15 +1321,14 @@ void BZPopMinMax(CmdArgList args, ConnectionContext* cntx, bool is_max) {
VLOG(1) << "BZPop timeout(" << timeout << ")"; VLOG(1) << "BZPop timeout(" << timeout << ")";
Transaction* transaction = cntx->transaction; Transaction* transaction = cntx->transaction;
OpResult<ScoredArray> popped_array; OpResult<ScoredArray> popped_array;
cntx->conn_state.is_blocking = true; auto cb = [is_max, &popped_array](Transaction* t, EngineShard* shard, std::string_view key) {
popped_array = OpBZPop(t, shard, key, is_max);
};
OpResult<string> popped_key = container_utils::RunCbOnFirstNonEmptyBlocking( OpResult<string> popped_key = container_utils::RunCbOnFirstNonEmptyBlocking(
transaction, OBJ_ZSET, transaction, OBJ_ZSET, std::move(cb), unsigned(timeout * 1000), &cntx->blocked);
[is_max, &popped_array](Transaction* t, EngineShard* shard, std::string_view key) {
popped_array = OpBZPop(t, shard, key, is_max);
},
unsigned(timeout * 1000));
cntx->conn_state.is_blocking = false;
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
if (popped_key) { if (popped_key) {

View file

@ -1,5 +1,6 @@
import pytest import pytest
import re import re
import json
import redis import redis
from redis import asyncio as aioredis from redis import asyncio as aioredis
import asyncio import asyncio
@ -515,6 +516,50 @@ async def test_cluster_flush_slots_after_config_change(df_local_factory: DflyIns
assert await c_replica.execute_command("dbsize") == (100_000 - slot_0_size) assert await c_replica.execute_command("dbsize") == (100_000 - slot_0_size)
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "admin_port": 30001})
async def test_cluster_blocking_command(df_server):
    """Blocking commands must react to a cluster config update.

    Two BLPOPs are started on keys in slots 3479 and 6103, both owned by this
    node under the initial config (slots 0-8000). The owned range is then
    shrunk to 0-5000. The BLPOP on the still-owned key must keep working,
    while the one whose slot moved away must fail with a MOVED error.
    """
    c_master = df_server.client()
    c_master_admin = df_server.admin_client()

    local_node_id = await get_node_id(c_master_admin)
    local_node = {
        "slot_ranges": [{"start": 0, "end": 8000}],
        "master": {"id": local_node_id, "ip": "10.0.0.1", "port": 7000},
        "replicas": [],
    }
    remote_node = {
        "slot_ranges": [{"start": 8001, "end": 16383}],
        "master": {"id": "other", "ip": "10.0.0.2", "port": 7000},
        "replicas": [],
    }
    config = [local_node, remote_node]

    reply = await c_master_admin.execute_command("DFLYCLUSTER", "CONFIG", json.dumps(config))
    assert reply == "OK"

    # Pin the slots of both keys so the range change below affects only one of them.
    keep_slot = await c_master.execute_command("CLUSTER", "KEYSLOT", "keep-local")
    assert keep_slot == 3479
    moved_slot = await c_master.execute_command("CLUSTER", "KEYSLOT", "remove-key-4")
    assert moved_slot == 6103

    v1 = asyncio.create_task(c_master.blpop("keep-local", 2))
    v2 = asyncio.create_task(c_master.blpop("remove-key-4", 2))

    # Give both BLPOPs time to reach the server before the config changes.
    await asyncio.sleep(0.1)

    # Move slots 5001-8000 (including 6103) over to the other node.
    local_node["slot_ranges"][0]["end"] = 5000
    remote_node["slot_ranges"][0]["start"] = 5001

    reply = await c_master_admin.execute_command("DFLYCLUSTER", "CONFIG", json.dumps(config))
    assert reply == "OK"

    await c_master.lpush("keep-local", "WORKS")

    kept_result = await v1
    assert kept_result == ("keep-local", "WORKS")

    with pytest.raises(aioredis.ResponseError) as e_info:
        await v2
    assert "MOVED" in str(e_info.value)
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_native_client(df_local_factory: DflyInstanceFactory): async def test_cluster_native_client(df_local_factory: DflyInstanceFactory):
# Start and configure cluster with 3 masters and 3 replicas # Start and configure cluster with 3 masters and 3 replicas