diff --git a/src/facade/conn_context.h b/src/facade/conn_context.h index 61788551d..e369dc236 100644 --- a/src/facade/conn_context.h +++ b/src/facade/conn_context.h @@ -101,6 +101,7 @@ class ConnectionContext { bool sync_dispatch : 1; // whether this connection is amid a sync dispatch bool journal_emulated : 1; // whether it is used to dispatch journal commands bool paused : 1; // whether this connection is paused due to CLIENT PAUSE + bool blocked; // whether it's blocked on blocking commands like BLPOP, needs to be addressable // How many async subscription sources are active: monitor and/or pubsub - at most 2. uint8_t subscriptions; diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index c2df33088..9ead4e8cd 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -1048,6 +1048,12 @@ std::string Connection::DebugInfo() const { absl::StrAppend(&info, "dispatch_queue:pipelined=", pending_pipeline_cmd_cnt_, ", "); absl::StrAppend(&info, "dispatch_queue:intrusive=", intrusive_front, ", "); + absl::StrAppend(&info, "state="); + if (cc_->paused) + absl::StrAppend(&info, "p"); + if (cc_->blocked) + absl::StrAppend(&info, "b"); + absl::StrAppend(&info, "}"); return info; } @@ -1225,13 +1231,16 @@ void Connection::SendAclUpdateAsync(AclUpdateMessage msg) { SendAsync({make_unique(std::move(msg))}); } -void Connection::SendCheckpoint(fb2::BlockingCounter bc, bool ignore_paused) { +void Connection::SendCheckpoint(fb2::BlockingCounter bc, bool ignore_paused, bool ignore_blocked) { if (!IsCurrentlyDispatching()) return; if (cc_->paused && ignore_paused) return; + if (cc_->blocked && ignore_blocked) + return; + VLOG(1) << "Sent checkpoint to " << DebugInfo(); bc.Add(1); diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 4d502e57e..75233ddec 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -195,8 +195,9 @@ class Connection : public 
util::Connection { void SendAclUpdateAsync(AclUpdateMessage msg); // If any dispatch is currently in progress, increment counter and send checkpoint message to - // decrement it once finished. It ignore_paused is true, paused dispatches are ignored. - void SendCheckpoint(util::fb2::BlockingCounter bc, bool ignore_paused = false); + // decrement it once finished. Paused or blocked dispatches are skipped per the ignore_* flags. + void SendCheckpoint(util::fb2::BlockingCounter bc, bool ignore_paused = false, + bool ignore_blocked = false); // Must be called before sending pubsub messages to ensure the threads pipeline queue limit is not // reached. Blocks until free space is available. Controlled with `pipeline_queue_limit` flag. diff --git a/src/facade/dragonfly_listener.cc b/src/facade/dragonfly_listener.cc index f51cfc23b..d239ad536 100644 --- a/src/facade/dragonfly_listener.cc +++ b/src/facade/dragonfly_listener.cc @@ -374,10 +374,12 @@ ProactorBase* Listener::PickConnectionProactor(util::FiberSocketBase* sock) { } DispatchTracker::DispatchTracker(absl::Span listeners, - facade::Connection* issuer, bool ignore_paused) + facade::Connection* issuer, bool ignore_paused, + bool ignore_blocked) : listeners_{listeners.begin(), listeners.end()}, issuer_{issuer}, - ignore_paused_{ignore_paused} { + ignore_paused_{ignore_paused}, + ignore_blocked_{ignore_blocked} { } void DispatchTracker::TrackOnThread() { @@ -396,7 +398,7 @@ void DispatchTracker::TrackAll() { void DispatchTracker::Handle(unsigned thread_index, util::Connection* conn) { if (auto* fconn = static_cast(conn); fconn != issuer_) - fconn->SendCheckpoint(bc_, ignore_paused_); + fconn->SendCheckpoint(bc_, ignore_paused_, ignore_blocked_); } } // namespace facade diff --git a/src/facade/dragonfly_listener.h b/src/facade/dragonfly_listener.h index 163fa435f..e6cc751d1 100644 --- a/src/facade/dragonfly_listener.h +++ b/src/facade/dragonfly_listener.h @@ -89,7 +89,7 @@ class Listener : public util::ListenerInterface { class DispatchTracker { public: DispatchTracker(absl::Span, 
facade::Connection* issuer = nullptr, - bool ignore_paused = false); + bool ignore_paused = false, bool ignore_blocked = false); void TrackAll(); // Track busy connection on all threads void TrackOnThread(); // Track busy connections on current thread @@ -105,6 +105,7 @@ class DispatchTracker { facade::Connection* issuer_; util::fb2::BlockingCounter bc_{0}; bool ignore_paused_; + bool ignore_blocked_; }; } // namespace facade diff --git a/src/facade/facade.cc b/src/facade/facade.cc index b4c98e547..0999439e8 100644 --- a/src/facade/facade.cc +++ b/src/facade/facade.cc @@ -134,6 +134,7 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : ow sync_dispatch = false; journal_emulated = false; paused = false; + blocked = false; subscriptions = 0; } diff --git a/src/facade/op_status.h b/src/facade/op_status.h index f96fdba7c..183b7da54 100644 --- a/src/facade/op_status.h +++ b/src/facade/op_status.h @@ -13,6 +13,7 @@ enum class OpStatus : uint16_t { OK, KEY_EXISTS, KEY_NOTFOUND, + KEY_MOVED, SKIPPED, INVALID_VALUE, OUT_OF_RANGE, diff --git a/src/server/blocking_controller_test.cc b/src/server/blocking_controller_test.cc index 3faa31f09..e5839e52b 100644 --- a/src/server/blocking_controller_test.cc +++ b/src/server/blocking_controller_test.cc @@ -88,9 +88,9 @@ TEST_F(BlockingControllerTest, Timeout) { trans_->Schedule(); auto cb = [&](Transaction* t, EngineShard* shard) { return trans_->GetShardArgs(0); }; - bool res = trans_->WaitOnWatch(tp, cb); + facade::OpStatus status = trans_->WaitOnWatch(tp, cb); - EXPECT_FALSE(res); + EXPECT_EQ(status, facade::OpStatus::TIMED_OUT); unsigned num_watched = shard_set->Await( 0, [&] { return EngineShard::tlocal()->blocking_controller()->NumWatched(0); }); diff --git a/src/server/cluster/cluster_config.cc b/src/server/cluster/cluster_config.cc index fd897bdde..50dfd0f91 100644 --- a/src/server/cluster/cluster_config.cc +++ b/src/server/cluster/cluster_config.cc @@ -311,6 +311,10 @@ bool 
ClusterConfig::IsMySlot(SlotId id) const { return my_slots_.test(id); } +bool ClusterConfig::IsMySlot(std::string_view key) const { + return IsMySlot(KeySlot(key)); +} + ClusterConfig::Node ClusterConfig::GetMasterNodeForSlot(SlotId id) const { CHECK_LT(id, my_slots_.size()) << "Requesting a non-existing slot id " << id; diff --git a/src/server/cluster/cluster_config.h b/src/server/cluster/cluster_config.h index 542ac18bf..87b55286c 100644 --- a/src/server/cluster/cluster_config.h +++ b/src/server/cluster/cluster_config.h @@ -65,6 +65,7 @@ class ClusterConfig { // If key is in my slots ownership return true bool IsMySlot(SlotId id) const; + bool IsMySlot(std::string_view key) const; // Returns the master configured for `id`. Node GetMasterNodeForSlot(SlotId id) const; diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 5ebb119ef..db6b357f1 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -514,11 +514,21 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) before = tl_cluster_config->GetOwnedSlots(); } - DispatchTracker tracker{server_family_->GetListeners(), cntx->conn()}; - auto cb = [&tracker, &new_config](util::ProactorBase* pb) { + // Ignore blocked commands because we filter them with CancelBlockingOnThread + DispatchTracker tracker{server_family_->GetListeners(), cntx->conn(), false /* ignore paused */, + true /* ignore blocked */}; + + auto blocking_filter = [&new_config](ArgSlice keys) { + bool moved = any_of(keys.begin(), keys.end(), [&](auto k) { return !new_config->IsMySlot(k); }); + return moved ? 
OpStatus::KEY_MOVED : OpStatus::OK; + }; + + auto cb = [this, &tracker, &new_config, blocking_filter](util::ProactorBase* pb) { + server_family_->CancelBlockingOnThread(blocking_filter); tl_cluster_config = new_config; tracker.TrackOnThread(); }; + server_family_->service().proactor_pool().AwaitFiberOnAll(std::move(cb)); DCHECK(tl_cluster_config != nullptr); diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index 7fe81e443..9fc161b89 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -117,12 +117,6 @@ void ConnectionContext::ChangeMonitor(bool start) { EnableMonitoring(start); } -void ConnectionContext::CancelBlocking() { - if (transaction) { - transaction->CancelBlocking(); - } -} - vector ChangeSubscriptions(bool pattern, CmdArgList args, bool to_add, bool to_reply, ConnectionContext* conn) { vector result(to_reply ? args.size() : 0, 0); diff --git a/src/server/conn_context.h b/src/server/conn_context.h index f0880c35b..4b5773a3f 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -155,8 +155,6 @@ struct ConnectionState { // For get op - we use it as a mask of MCGetMask values. uint32_t memcache_flag = 0; - bool is_blocking = false; // whether this connection is blocking on a command - ExecInfo exec_info; ReplicationInfo replication_info; @@ -194,7 +192,6 @@ class ConnectionContext : public facade::ConnectionContext { void UnsubscribeAll(bool to_reply); void PUnsubscribeAll(bool to_reply); void ChangeMonitor(bool start); // either start or stop monitor on a given connection - void CancelBlocking(); // Cancel an ongoing blocking transaction if there is one. 
size_t UsedMemory() const override; diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc index 463258009..aa3901c42 100644 --- a/src/server/container_utils.cc +++ b/src/server/container_utils.cc @@ -238,7 +238,8 @@ OpResult FindFirstNonEmptyKey(Transaction* trans, int req_obj_typ } OpResult RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type, - BlockingResultCb func, unsigned limit_ms) { + BlockingResultCb func, unsigned limit_ms, + bool* block_flag) { trans->Schedule(); string result_key; @@ -281,9 +282,12 @@ OpResult RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty auto wcb = [](Transaction* t, EngineShard* shard) { return t->GetShardArgs(shard->shard_id()); }; - bool wait_succeeded = trans->WaitOnWatch(limit_tp, std::move(wcb)); - if (!wait_succeeded) - return OpStatus::TIMED_OUT; + *block_flag = true; + auto status = trans->WaitOnWatch(limit_tp, std::move(wcb)); + *block_flag = false; + + if (status != OpStatus::OK) + return status; auto cb = [&](Transaction* t, EngineShard* shard) { if (auto wake_key = t->GetWakeKey(shard->shard_id()); wake_key) { diff --git a/src/server/container_utils.h b/src/server/container_utils.h index d359895d6..dfce2b166 100644 --- a/src/server/container_utils.h +++ b/src/server/container_utils.h @@ -95,8 +95,10 @@ using BlockingResultCb = // Block until a any key of the transaction becomes non-empty and executes the callback. // If multiple keys are non-empty when this function is called, the callback is executed // immediately with the first key listed in the tx arguments. +// The block flag is set to true while the transaction is blocking. 
OpResult RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type, - BlockingResultCb cb, unsigned limit_ms); + BlockingResultCb cb, unsigned limit_ms, + bool* block_flag); }; // namespace container_utils diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 77eeabb07..e5af8137f 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -421,12 +421,13 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) { absl::Time start = absl::Now(); AggregateStatus status; - sf_->CancelBlockingCommands(); - // We need to await for all dispatches to finish: Otherwise a transaction might be scheduled // after this function exits but before the actual shutdown. facade::DispatchTracker tracker{sf_->GetListeners(), cntx->conn()}; - tracker.TrackAll(); + shard_set->pool()->Await([&](unsigned index, auto* pb) { + sf_->CancelBlockingOnThread(); + tracker.TrackOnThread(); + }); if (!tracker.Wait(timeout_dur)) { LOG(WARNING) << "Couldn't wait for commands to finish dispatching. " << timeout_dur; diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 2255fd5fc..dd38bb816 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -889,9 +889,8 @@ OpResult BPopPusher::RunSingle(Transaction* t, time_point tp) { auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; }; // Block - bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb)); - if (!wait_succeeded) - return OpStatus::TIMED_OUT; + if (auto status = t->WaitOnWatch(tp, std::move(wcb)); status != OpStatus::OK) + return status; t->Execute(cb_move, true); return op_res; @@ -914,9 +913,8 @@ OpResult BPopPusher::RunPair(Transaction* t, time_point tp) { // This allows us to run Transaction::Execute on watched transactions in both shards. 
auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice{&this->pop_key_, 1}; }; - bool wait_succeeded = t->WaitOnWatch(tp, std::move(wcb)); - if (!wait_succeeded) - return OpStatus::TIMED_OUT; + if (auto status = t->WaitOnWatch(tp, std::move(wcb)); status != OpStatus::OK) + return status; return MoveTwoShards(t, pop_key_, push_key_, popdir_, pushdir_, true); } @@ -1206,10 +1204,9 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn popped_value = OpBPop(t, shard, key, dir); }; - cntx->conn_state.is_blocking = true; OpResult popped_key = container_utils::RunCbOnFirstNonEmptyBlocking( - transaction, OBJ_LIST, std::move(cb), unsigned(timeout * 1000)); - cntx->conn_state.is_blocking = false; + transaction, OBJ_LIST, std::move(cb), unsigned(timeout * 1000), &cntx->blocked); + auto* rb = static_cast(cntx->reply_builder()); if (popped_key) { DVLOG(1) << "BPop " << transaction->DebugId() << " popped from key " << popped_key; // key. @@ -1222,8 +1219,12 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn switch (popped_key.status()) { case OpStatus::WRONG_TYPE: return rb->SendError(kWrongTypeErr); + case OpStatus::CANCELLED: case OpStatus::TIMED_OUT: return rb->SendNullArray(); + case OpStatus::KEY_MOVED: + // TODO: proper error for moved + return rb->SendError("-MOVED"); default: LOG(ERROR) << "Unexpected error " << popped_key.status(); } diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 20e566049..93ade7adb 100755 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1447,7 +1447,8 @@ facade::ConnectionContext* Service::CreateContext(util::FiberSocketBase* peer, // a bit of a hack. I set up breaker callback here for the owner. // Should work though it's confusing to have it here. 
owner->RegisterBreakHook([res, this](uint32_t) { - res->CancelBlocking(); + if (res->transaction) + res->transaction->CancelBlocking(nullptr); this->server_family().BreakOnShutdown(); }); @@ -2425,7 +2426,7 @@ string Service::GetContextInfo(facade::ConnectionContext* cntx) { if (server_cntx->conn_state.subscribe_info) buf[index++] = 'P'; - if (server_cntx->conn_state.is_blocking) + if (server_cntx->blocked) buf[index++] = 'b'; if (index) { diff --git a/src/server/server_family.cc b/src/server/server_family.cc index afc85f3cd..b85e2a841 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1142,17 +1142,22 @@ void ServerFamily::BreakOnShutdown() { dfly_cmd_->BreakOnShutdown(); } -void ServerFamily::CancelBlockingCommands() { - auto cb = [](unsigned thread_index, util::Connection* conn) { - facade::ConnectionContext* fc = static_cast(conn)->cntx(); - if (fc) { - ConnectionContext* cntx = static_cast(fc); - cntx->CancelBlocking(); +// Forwards status_cb to Transaction::CancelBlocking for every connection with a transaction that +// the listeners visit via TraverseConnectionsOnThread. +void ServerFamily::CancelBlockingOnThread(std::function status_cb) { + auto cb = [status_cb](unsigned thread_index, util::Connection* conn) { + if (auto fcntx = static_cast(conn)->cntx(); fcntx) { + auto* cntx = static_cast(fcntx); + // Not gated on cntx->blocked: BPopPusher and XReadBlock wait without setting that flag, + // and CancelBlocking() returns early unless the transaction is COORD_BLOCKED. + if (cntx->transaction) { + cntx->transaction->CancelBlocking(status_cb); + } } }; - for (auto* listener : listeners_) { - listener->TraverseConnections(cb); - } + + for (auto* listener : listeners_) + listener->TraverseConnectionsOnThread(cb); } string GetPassword() { diff --git a/src/server/server_family.h b/src/server/server_family.h index 192869b04..211f456ec 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -196,7 +196,7 @@ class ServerFamily { void BreakOnShutdown(); - void CancelBlockingCommands(); + void CancelBlockingOnThread(std::function = {}); // Sets the server to replicate another instance. Does not flush the database beforehand! 
void Replicate(std::string_view host, std::string_view port); diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index ccfc29386..95b0bbe54 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -2823,10 +2823,8 @@ void XReadBlock(ReadOpts opts, ConnectionContext* cntx) { auto tp = (opts.timeout) ? chrono::steady_clock::now() + chrono::milliseconds(opts.timeout) : Transaction::time_point::max(); - bool wait_succeeded = cntx->transaction->WaitOnWatch(tp, std::move(wcb)); - if (!wait_succeeded) { + if (auto status = cntx->transaction->WaitOnWatch(tp, std::move(wcb)); status != OpStatus::OK) return rb->SendNullArray(); - } // Resolve the entry in the woken key. Note this must not use OpRead since // only the shard that contains the woken key blocks for the awoken diff --git a/src/server/transaction.cc b/src/server/transaction.cc index bc12987ef..8cd30b069 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1144,7 +1144,7 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const { return reverse_index_[sd.arg_start + arg_index]; } -bool Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider) { +OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider) { DVLOG(2) << "WaitOnWatch " << DebugId(); using namespace chrono; @@ -1165,29 +1165,36 @@ bool Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provi auto* stats = ServerState::tl_connection_stats(); ++stats->num_blocked_clients; + if (DCHECK_IS_ON()) { + int64_t ms = -1; + if (tp != time_point::max()) + ms = duration_cast(tp - time_point::clock::now()).count(); + DVLOG(1) << "WaitOnWatch TimeWait for " << ms << " ms " << DebugId(); + } + cv_status status = cv_status::no_timeout; if (tp == time_point::max()) { - DVLOG(1) << "WaitOnWatch foreva " << DebugId(); blocking_ec_.await(std::move(wake_cb)); - DVLOG(1) << "WaitOnWatch AfterWait"; } else { - DVLOG(1) << 
"WaitOnWatch TimeWait for " - << duration_cast(tp - time_point::clock::now()).count() << " ms " - << DebugId(); - status = blocking_ec_.await_until(std::move(wake_cb), tp); - - DVLOG(1) << "WaitOnWatch await_until " << int(status); } + DVLOG(1) << "WaitOnWatch done " << int(status) << " " << DebugId(); + --stats->num_blocked_clients; - bool is_expired = (coordinator_state_ & COORD_CANCELLED) || status == cv_status::timeout; - if (is_expired) + OpStatus result = OpStatus::OK; + if (status == cv_status::timeout) { + result = OpStatus::TIMED_OUT; + } else if (coordinator_state_ & COORD_CANCELLED) { + result = local_result_; + } + + if (result != OpStatus::OK) ExpireBlocking(wkeys_provider); coordinator_state_ &= ~COORD_BLOCKED; - return !is_expired; + return result; } // Runs only in the shard thread. @@ -1430,11 +1437,26 @@ void Transaction::RunOnceAsCommand(const CommandId* cid, RunnableType cb) { }); } -void Transaction::CancelBlocking() { - if (coordinator_state_ & COORD_BLOCKED) { - coordinator_state_ |= COORD_CANCELLED; - blocking_ec_.notify(); +void Transaction::CancelBlocking(std::function status_cb) { + if ((coordinator_state_ & COORD_BLOCKED) == 0) + return; + + OpStatus status = OpStatus::CANCELLED; + if (status_cb) { + vector all_keys; + IterateActiveShards([this, &all_keys](PerShardData&, auto i) { + auto shard_keys = GetShardArgs(i); + all_keys.insert(all_keys.end(), shard_keys.begin(), shard_keys.end()); + }); + status = status_cb(absl::MakeSpan(all_keys)); } + + if (status == OpStatus::OK) + return; + + coordinator_state_ |= COORD_CANCELLED; + local_result_ = status; + blocking_ec_.notify(); } OpResult DetermineKeys(const CommandId* cid, CmdArgList args) { diff --git a/src/server/transaction.h b/src/server/transaction.h index 64d175e28..6ff5f75c4 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -186,7 +186,7 @@ class Transaction { // or b) tp is reached. If tp is time_point::max() then waits indefinitely. 
// Expects that the transaction had been scheduled before, and uses Execute(.., true) to register. - // Returns false if timeout occurred, true if was notified by one of the keys. - bool WaitOnWatch(const time_point& tp, WaitKeysProvider cb); + // Returns OK if notified by one of the keys, TIMED_OUT on timeout, or the CancelBlocking status. + facade::OpStatus WaitOnWatch(const time_point& tp, WaitKeysProvider cb); // Returns true if transaction is awaked, false if it's timed-out and can be removed from the // blocking queue. @@ -194,7 +194,9 @@ class Transaction { // Cancel all blocking watches. Set COORD_CANCELLED. // Must be called from coordinator thread. - void CancelBlocking(); + // If a callback is given, it is called with the transaction's keys: OpStatus::OK keeps the + // transaction blocked, any other status is reported by WaitOnWatch. Default is CANCELLED. + void CancelBlocking(std::function); // In some cases for non auto-journaling commands we want to enable the auto journal flow. void RenableAutoJournal() { diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 8c29e2b7d..71f34f7b9 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -1321,15 +1321,14 @@ void BZPopMinMax(CmdArgList args, ConnectionContext* cntx, bool is_max) { VLOG(1) << "BZPop timeout(" << timeout << ")"; Transaction* transaction = cntx->transaction; + OpResult popped_array; - cntx->conn_state.is_blocking = true; + auto cb = [is_max, &popped_array](Transaction* t, EngineShard* shard, std::string_view key) { + popped_array = OpBZPop(t, shard, key, is_max); + }; + OpResult popped_key = container_utils::RunCbOnFirstNonEmptyBlocking( - transaction, OBJ_ZSET, - [is_max, &popped_array](Transaction* t, EngineShard* shard, std::string_view key) { - popped_array = OpBZPop(t, shard, key, is_max); - }, - unsigned(timeout * 1000)); - cntx->conn_state.is_blocking = false; + transaction, OBJ_ZSET, std::move(cb), unsigned(timeout * 1000), &cntx->blocked); auto* rb = static_cast(cntx->reply_builder()); if (popped_key) { diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py index 9692c0554..d36bdfb95 100644 --- a/tests/dragonfly/cluster_test.py +++ b/tests/dragonfly/cluster_test.py @@ -1,5 +1,6 @@ import pytest import re 
+import json import redis from redis import asyncio as aioredis import asyncio @@ -515,6 +516,50 @@ async def test_cluster_flush_slots_after_config_change(df_local_factory: DflyIns assert await c_replica.execute_command("dbsize") == (100_000 - slot_0_size) +@dfly_args({"proactor_threads": 4, "cluster_mode": "yes", "admin_port": 30001}) +async def test_cluster_blocking_command(df_server): + c_master = df_server.client() + c_master_admin = df_server.admin_client() + + config = [ + { + "slot_ranges": [{"start": 0, "end": 8000}], + "master": {"id": await get_node_id(c_master_admin), "ip": "10.0.0.1", "port": 7000}, + "replicas": [], + }, + { + "slot_ranges": [{"start": 8001, "end": 16383}], + "master": {"id": "other", "ip": "10.0.0.2", "port": 7000}, + "replicas": [], + }, + ] + + assert ( + await c_master_admin.execute_command("DFLYCLUSTER", "CONFIG", json.dumps(config)) + ) == "OK" + + assert (await c_master.execute_command("CLUSTER", "KEYSLOT", "keep-local")) == 3479 + assert (await c_master.execute_command("CLUSTER", "KEYSLOT", "remove-key-4")) == 6103 + + v1 = asyncio.create_task(c_master.blpop("keep-local", 2)) + v2 = asyncio.create_task(c_master.blpop("remove-key-4", 2)) + + await asyncio.sleep(0.1) + + config[0]["slot_ranges"][0]["end"] = 5000 + config[1]["slot_ranges"][0]["start"] = 5001 + assert ( + await c_master_admin.execute_command("DFLYCLUSTER", "CONFIG", json.dumps(config)) + ) == "OK" + + await c_master.lpush("keep-local", "WORKS") + + assert (await v1) == ("keep-local", "WORKS") + with pytest.raises(aioredis.ResponseError) as e_info: + await v2 + assert "MOVED" in str(e_info.value) + + @dfly_args({"proactor_threads": 4, "cluster_mode": "yes"}) async def test_cluster_native_client(df_local_factory: DflyInstanceFactory): # Start and configure cluster with 3 masters and 3 replicas