Fix MOVED error for blocking commands (#3334)

* fix: return a proper MOVED error from BLPOP and BZPOPMIN/BZPOPMAX
Borys, 2024-07-18 20:38:13 +03:00 (committed by GitHub)
parent d648e3ddd1
commit cad62679a4
8 changed files with 78 additions and 17 deletions
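
For context: a MOVED redirection (see https://redis.io/docs/reference/cluster-spec/#moved-redirection) tells a cluster client which master owns the key's hash slot, and its payload has the form -MOVED <slot> <ip>:<port>. The sketch below is illustrative only and not part of this patch, which centralizes the logic in cluster::SlotOwnershipErrorStr(); it just shows how such a reply is assembled once the slot and the owning master are known (BuildMovedError is a hypothetical name):

#include <cstdint>
#include <string>

#include "absl/strings/str_cat.h"

// Illustrative helper (hypothetical name): format the MOVED redirection
// payload for a slot that is owned by another master node.
std::string BuildMovedError(uint16_t slot, const std::string& ip, uint16_t port) {
  return absl::StrCat("-MOVED ", slot, " ", ip, ":", port);
}

A cluster-aware client reacts to this reply by re-issuing the command against <ip>:<port>.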


@@ -9,9 +9,13 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "cluster_defs.h"
#include "facade/error.h"
#include "slot_set.h"
#include "src/server/common.h"
// TODO: remove this include once tl_cluster_config is moved out of cluster_family
#include "server/cluster/cluster_family.h"
using namespace std;
ABSL_FLAG(string, cluster_mode, "",
@@ -92,4 +96,16 @@ bool IsClusterShardedByTag() {
  return IsClusterEnabledOrEmulated() || LockTagOptions::instance().enabled;
}

std::optional<std::string> SlotOwnershipErrorStr(SlotId slot_id) {
  const cluster::ClusterConfig* cluster_config = ClusterFamily::cluster_config();
  if (!cluster_config)
    return facade::kClusterNotConfigured;

  if (!cluster_config->IsMySlot(slot_id)) {
    // See more details here: https://redis.io/docs/reference/cluster-spec/#moved-redirection
    cluster::ClusterNodeInfo master = cluster_config->GetMasterNodeForSlot(slot_id);
    return absl::StrCat("-MOVED ", slot_id, " ", master.ip, ":", master.port);
  }
  return std::nullopt;
}
} // namespace dfly::cluster


@@ -5,6 +5,7 @@
#pragma once
#include <cstdint>
#include <optional>
#include <string>
#include <string_view>
#include <vector>
@@ -122,6 +123,9 @@ enum class MigrationState : uint8_t {
SlotId KeySlot(std::string_view key);
// Returns an error message if the slot doesn't belong to this node.
std::optional<std::string> SlotOwnershipErrorStr(SlotId slot_id);
void InitializeCluster();
bool IsClusterEnabled();
bool IsClusterEmulated();
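
As a rough sketch of what the KeySlot() declared above computes under the Redis cluster spec (this is not the patch's implementation; crc16_xmodem() is an assumed stand-in for the project's real CRC16 routine):

#include <cstdint>
#include <string_view>

uint16_t crc16_xmodem(std::string_view data);  // assumed to exist elsewhere

uint16_t KeySlotSketch(std::string_view key) {
  // Per the cluster spec, if the key contains a non-empty {...} hash tag,
  // only the tag is hashed so that related keys land in the same slot.
  auto open = key.find('{');
  if (open != std::string_view::npos) {
    auto close = key.find('}', open + 1);
    if (close != std::string_view::npos && close > open + 1)
      key = key.substr(open + 1, close - open - 1);
  }
  return crc16_xmodem(key) % 16384;  // 16384 hash slots per cluster
}

This mapping is what yields the concrete slot ids asserted in the new test below (e.g. 3037 for set1 and 7141 for list1).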


@@ -29,7 +29,7 @@ class ClusterFamily {
  void Register(CommandRegistry* registry);

  // Returns a thread-local pointer.
  ClusterConfig* cluster_config();
  static ClusterConfig* cluster_config();

  void ApplyMigrationSlotRangeToConfig(std::string_view node_id, const SlotRanges& slots,
                                       bool is_outgoing);


@@ -18,7 +18,6 @@ extern "C" {
#include "base/flags.h"
#include "base/logging.h"
#include "core/compact_object.h"
#include "server/cluster/cluster_defs.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"


@@ -1225,9 +1225,11 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx
    case OpStatus::CANCELLED:
    case OpStatus::TIMED_OUT:
      return rb->SendNullArray();
    case OpStatus::KEY_MOVED:
      // TODO: proper error for moved
      return cntx->SendError("-MOVED");
    case OpStatus::KEY_MOVED: {
      auto error = cluster::SlotOwnershipErrorStr(*transaction->GetUniqueSlotId());
      CHECK(error.has_value());
      return cntx->SendError(std::move(*error));
    }
    default:
      LOG(ERROR) << "Unexpected error " << popped_key.status();


@@ -629,10 +629,10 @@ void ClusterHtmlPage(const http::QueryArgs& args, HttpContext* send,
  }

  if (cluster::IsClusterEnabled()) {
    if (cluster_family->cluster_config() == nullptr) {
    if (cluster::ClusterFamily::cluster_config() == nullptr) {
      resp.body() += "<h2>Not yet configured.</h2>\n";
    } else {
      auto config = cluster_family->cluster_config()->GetConfig();
      auto config = cluster::ClusterFamily::cluster_config()->GetConfig();
      for (const auto& shard : config) {
        resp.body() += "<div class='master'>\n";
        resp.body() += "<h3>Master</h3>\n";
@@ -957,16 +957,10 @@ optional<ErrorReply> Service::CheckKeysOwnership(const CommandId* cid, CmdArgList
    return ErrorReply{"-CROSSSLOT Keys in request don't hash to the same slot"};
  }

  // Check keys slot is in my ownership
  const cluster::ClusterConfig* cluster_config = cluster_family_.cluster_config();
  if (cluster_config == nullptr) {
    return ErrorReply{kClusterNotConfigured};
  }

  if (keys_slot.has_value() && !cluster_config->IsMySlot(*keys_slot)) {
    // See more details here: https://redis.io/docs/reference/cluster-spec/#moved-redirection
    cluster::ClusterNodeInfo master = cluster_config->GetMasterNodeForSlot(*keys_slot);
    return ErrorReply{absl::StrCat("-MOVED ", *keys_slot, " ", master.ip, ":", master.port)};
  if (keys_slot.has_value()) {
    if (auto error_str = cluster::SlotOwnershipErrorStr(*keys_slot); error_str) {
      return ErrorReply{std::move(*error_str)};
    }
  }

  return nullopt;


@@ -1364,6 +1364,11 @@ void BZPopMinMax(CmdArgList args, ConnectionContext* cntx, bool is_max) {
    case OpStatus::CANCELLED:
    case OpStatus::TIMED_OUT:
      return rb->SendNullArray();
    case OpStatus::KEY_MOVED: {
      auto error = cluster::SlotOwnershipErrorStr(*transaction->GetUniqueSlotId());
      CHECK(error.has_value());
      return cntx->SendError(std::move(*error));
    }
    default:
      LOG(ERROR) << "Unexpected error " << popped_key.status();


@@ -787,6 +787,47 @@ async def test_cluster_blocking_command(df_server):
    await close_clients(c_master, c_master_admin)


@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_blocking_commands_cancel(df_factory, df_seeder_factory):
    instances = [
        df_factory.create(port=BASE_PORT + i, admin_port=BASE_PORT + i + 1000) for i in range(2)
    ]

    df_factory.start_all(instances)

    nodes = [(await create_node_info(instance)) for instance in instances]
    nodes[0].slots = [(0, 16383)]
    nodes[1].slots = []

    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

    set_task = asyncio.create_task(nodes[0].client.execute_command("BZPOPMIN set1 0"))
    list_task = asyncio.create_task(nodes[0].client.execute_command("BLPOP list1 0"))

    nodes[0].migrations.append(
        MigrationInfo("127.0.0.1", nodes[1].instance.port, [(0, 16383)], nodes[1].id)
    )
    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

    await wait_for_status(nodes[0].admin_client, nodes[1].id, "FINISHED")

    nodes[0].migrations = []
    nodes[0].slots = []
    nodes[1].slots = [(0, 16383)]

    logging.debug("remove finished migrations")
    await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])

    with pytest.raises(aioredis.ResponseError) as set_e_info:
        await set_task
    assert "MOVED 3037 127.0.0.1:30002" == str(set_e_info.value)

    with pytest.raises(aioredis.ResponseError) as list_e_info:
        await list_task
    assert "MOVED 7141 127.0.0.1:30002" == str(list_e_info.value)

    await close_clients(*[node.client for node in nodes], *[node.admin_client for node in nodes])


@pytest.mark.parametrize("set_cluster_node_id", [True, False])
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_native_client(