feat(dfly_bench): Handle moved slots in cluster (#4761)

* feat(dfly_bench): Handle MOVED slots

Slot ranges for each shard are managed by a ShardSlots object that is
passed to all threads and connections. Each shard endpoint has its own
interval set containing valid slot ranges. When a MOVED error is received
on one connection, that slot is removed from the source node's slot ranges
and added to the destination node's slot ranges.
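
For context, a MOVED reply carries the slot id and the new owner's address,
e.g. "MOVED 3999 127.0.0.1:7001". A minimal sketch of extracting those
fields (ParseMoved is a hypothetical helper; the actual code splits the
reply with absl::StrSplit inside Driver::ParseRESP):

    #include <cstdint>
    #include <string>

    // Hypothetical helper: parse "MOVED <slot> <host>:<port>".
    // Returns false if the reply does not have the expected shape.
    bool ParseMoved(const std::string& err, uint16_t* slot,
                    std::string* host, uint16_t* port) {
      if (err.rfind("MOVED ", 0) != 0)
        return false;
      size_t sp = err.find(' ', 6);
      size_t colon = err.rfind(':');
      if (sp == std::string::npos || colon == std::string::npos || colon < sp)
        return false;
      *slot = static_cast<uint16_t>(std::stoul(err.substr(6, sp - 6)));
      *host = err.substr(sp + 1, colon - sp - 1);
      *port = static_cast<uint16_t>(std::stoul(err.substr(colon + 1)));
      return true;
    }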

The Boost ICL interval_set container is used to store the intervals. A
unique lock is taken during a slot move, while a shared lock is used when
fetching a slot range.
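
A minimal, self-contained sketch of that scheme (SlotMap and its members
are illustrative names only; keys are plain shard ids here, whereas the
actual code keys by tcp::endpoint, and std::shared_mutex stands in for the
fiber-aware fb2::SharedMutex):

    #include <boost/icl/interval_set.hpp>
    #include <cstdint>
    #include <map>
    #include <mutex>
    #include <shared_mutex>

    class SlotMap {
      using IntervalSet = boost::icl::interval_set<uint16_t>;
      using Interval = boost::icl::interval<uint16_t>;

     public:
      // Register the initial slot range owned by a shard, e.g. [0, 5460].
      void AddRange(int shard, uint16_t from, uint16_t to) {
        std::unique_lock lock(mu_);
        slots_[shard].insert(Interval::closed(from, to));
      }

      // On MOVED: drop the slot from the source shard and add it to the
      // destination. Exclusive lock, since several connections may race here.
      void MoveSlot(int src, int dst, uint16_t slot) {
        std::unique_lock lock(mu_);
        slots_[src].subtract(slot);  // splits the covering interval if needed
        slots_[dst].insert(slot);    // coalesces with adjacent intervals
      }

      // Readers (picking a slot for the next request) take a shared lock.
      bool Owns(int shard, uint16_t slot) const {
        std::shared_lock lock(mu_);
        auto it = slots_.find(shard);
        return it != slots_.end() && boost::icl::contains(it->second, slot);
      }

     private:
      mutable std::shared_mutex mu_;
      std::map<int, IntervalSet> slots_;
    };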

Signed-off-by: mkaruza <mario@dragonflydb.io>
Author: mkaruza, 2025-03-24 16:51:15 +01:00 (committed by GitHub)
Commit: a9ecee2ba5, parent: 5c675dad1a

@@ -13,6 +13,7 @@ extern "C" {
#include <absl/strings/str_format.h>
#include <absl/strings/str_split.h>
#include <boost/icl/interval_set.hpp>
#include <queue>
#include <tuple>
@@ -125,7 +126,83 @@ struct ShardInfo {
tcp::endpoint endpoint;
};
using ClusterSpec = vector<ShardInfo>;
using ClusterShards = vector<ShardInfo>;
class ShardSlots {
private:
using IntervalSet = boost::icl::interval_set<uint16_t>;
using Interval = boost::icl::interval<uint16_t>;
public:
void SetClusterSlotRanges(const ClusterShards& cluster_shards) {
for (auto shard : cluster_shards) {
IntervalSet shard_slots_;
for (auto& slot : shard.slots) {
shard_slots_.insert(Interval::closed(slot.first, slot.second));
}
shards_slots_.emplace(shard.endpoint, shard_slots_);
}
}
SlotRange NextSlotRange(const tcp::endpoint& ep, size_t i) {
shared_lock<fb2::SharedMutex> lock(mu_);
const auto& shard_slot_interval = shards_slots_[ep];
unsigned index = i % shard_slot_interval.iterative_size();
const auto& interval = next(shard_slot_interval.begin(), index);
return SlotRange{boost::icl::first(*interval), boost::icl::last(*interval)};
}
bool Empty() const {
return shards_slots_.empty();
}
size_t Size() const {
return shards_slots_.size();
}
vector<tcp::endpoint> Endpoints() const {
vector<tcp::endpoint> endpoints;
for (const auto& shard : shards_slots_) {
endpoints.push_back(shard.first);
}
return endpoints;
}
void MoveSlot(const tcp::endpoint& src_ep, const tcp::endpoint& dst_ep, uint16_t slot_id) {
unique_lock<fb2::SharedMutex> lock(mu_);
// Remove slot from source ep
auto& src_shard_slots = shards_slots_[src_ep];
// If slot id doesn't exist on source ep we have moved this slot before
if (src_shard_slots.find(slot_id) == src_shard_slots.end()) {
return;
}
src_shard_slots.subtract(slot_id);
// Add slot to dest ep
auto& dst_shard_slots = shards_slots_[dst_ep];
dst_shard_slots.insert(slot_id);
}
private:
struct Hasher {
using is_transparent = void;
size_t operator()(const tcp::endpoint& ep) const {
std::size_t hash1 = std::hash<string>()(ep.address().to_string());
std::size_t hash2 = std::hash<unsigned short>()(ep.port());
return hash1 ^ (hash2 + 0x9e3779b9 + (hash1 << 6) + (hash1 >> 2));
}
};
struct Eq {
using is_transparent = void;
bool operator()(const tcp::endpoint& left, const tcp::endpoint& right) const {
return left == right;
}
};
private:
fb2::SharedMutex mu_;
absl::flat_hash_map<tcp::endpoint, IntervalSet, Hasher, Eq> shards_slots_;
};
class KeyGenerator {
public:
@@ -305,8 +382,9 @@ struct ClientStats {
// Per connection driver.
class Driver {
public:
explicit Driver(uint32_t num_reqs, uint32_t time_limit, ClientStats* stats, ProactorBase* p)
: num_reqs_(num_reqs), time_limit_(time_limit), stats_(*stats) {
explicit Driver(uint32_t num_reqs, uint32_t time_limit, ClientStats* stats, ProactorBase* p,
ShardSlots* ss)
: num_reqs_(num_reqs), time_limit_(time_limit), shard_slots_(*ss), stats_(*stats) {
socket_.reset(p->CreateSocket());
if (time_limit_ > 0)
num_reqs_ = UINT32_MAX;
@@ -316,7 +394,7 @@ class Driver {
Driver(Driver&&) = delete;
Driver& operator=(Driver&&) = delete;
void Connect(unsigned index, const tcp::endpoint& ep, const vector<SlotRange>& slots);
void Connect(unsigned index, const tcp::endpoint& ep);
void Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen);
float done() const {
@@ -343,9 +421,10 @@ class Driver {
uint32_t num_reqs_, time_limit_, received_ = 0;
int64_t start_ns_ = 0;
tcp::endpoint ep_;
ShardSlots& shard_slots_;
ClientStats& stats_;
unique_ptr<FiberSocketBase> socket_;
vector<SlotRange> slots_;
fb2::Fiber receive_fb_;
queue<Req> reqs_;
fb2::CondVarAny cnd_;
@@ -357,12 +436,12 @@ class Driver {
// Per thread client.
class TLocalClient {
public:
explicit TLocalClient(ProactorBase* p) : p_(p) {
explicit TLocalClient(ProactorBase* p, ShardSlots* ss) : p_(p), shard_slots_(ss) {
}
TLocalClient(const TLocalClient&) = delete;
void Connect(tcp::endpoint ep, const ClusterSpec& cluster);
void Connect(tcp::endpoint ep, const vector<tcp::endpoint>& shard_endpoints);
void Start(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns);
void Join();
@@ -398,12 +477,12 @@ class TLocalClient {
private:
ProactorBase* p_;
ShardSlots* shard_slots_;
vector<unique_ptr<Driver>> drivers_;
optional<KeyGenerator> key_gen_;
optional<CommandGenerator> cmd_gen_;
vector<fb2::Fiber> driver_fbs_;
ClusterSpec cluster_spec_;
uint64_t cur_cycle_ns_;
uint64_t target_cycle_;
int64_t start_time_;
@@ -497,7 +576,7 @@ void KeyGenerator::EnableClusterMode() {
}
}
void Driver::Connect(unsigned index, const tcp::endpoint& ep, const vector<SlotRange>& slots) {
void Driver::Connect(unsigned index, const tcp::endpoint& ep) {
VLOG(2) << "Connecting " << index << " to " << ep;
error_code ec = socket_->Connect(ep);
CHECK(!ec) << "Could not connect to " << ep << " " << ec;
@@ -519,7 +598,7 @@ void Driver::Connect(unsigned index, const tcp::endpoint& ep, const vector<SlotR
string_view resp = io::View(io::Bytes(buf, *res_sz));
CHECK(absl::EndsWith(resp, "\r\n")) << resp;
}
slots_ = slots;
ep_ = ep;
receive_fb_ = MakeFiber(fb2::Launch::dispatch, [this] { ReceiveFb(); });
}
@@ -566,9 +645,8 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
// TODO: this skews the distribution if slot ranges are uneven.
// Ideally we would like to pick randomly a single slot from all the ranges we have
// and pass it to cmd_gen->Next below.
if (!slots_.empty()) {
unsigned index = i % slots_.size();
slot_range = slots_[index];
if (!shard_slots_.Empty()) {
slot_range = shard_slots_.NextSlotRange(ep_, i);
}
string cmd = cmd_gen->Next(slot_range);
@@ -677,11 +755,17 @@ void Driver::ParseRESP() {
CHECK_EQ(parts.size(), 2u);
uint32_t slot_id;
CHECK(absl::SimpleAtoi(parts[0], &slot_id));
vector<string_view> host_port = absl::StrSplit(parts[1], ':');
CHECK_EQ(host_port.size(), 2u);
// TODO: to support slot migration.
LOG_EVERY_T(INFO, 1) << "Moved: " << slot_id << " to " << parts[1];
vector<string_view> addr_parts = absl::StrSplit(parts[1], ':');
CHECK_EQ(2u, addr_parts.size());
auto host = ::boost::asio::ip::make_address(addr_parts[0]);
uint32_t port;
CHECK(absl::SimpleAtoi(addr_parts[1], &port));
CHECK_LT(port, 65536u);
shard_slots_.MoveSlot(ep_, tcp::endpoint(host, port), slot_id);
}
++stats_.num_errors;
} else if (reqs_.front().might_hit && parse_args[0].type != RespExpr::NIL) {
@@ -732,33 +816,31 @@ void Driver::ParseMC() {
}
}
void TLocalClient::Connect(tcp::endpoint ep, const ClusterSpec& cluster) {
void TLocalClient::Connect(tcp::endpoint ep, const vector<tcp::endpoint>& endpoints) {
VLOG(2) << "Connecting client...";
cluster_spec_ = cluster;
unsigned conn_per_shard = GetFlag(FLAGS_c);
if (cluster.empty()) {
if (shard_slots_->Empty()) {
drivers_.resize(conn_per_shard);
} else {
drivers_.resize(cluster.size() * conn_per_shard);
drivers_.resize(shard_slots_->Size() * conn_per_shard);
}
for (auto& driver : drivers_) {
driver.reset(new Driver{GetFlag(FLAGS_n), GetFlag(FLAGS_test_time), &stats, p_});
driver.reset(new Driver{GetFlag(FLAGS_n), GetFlag(FLAGS_test_time), &stats, p_, shard_slots_});
}
vector<fb2::Fiber> fbs(drivers_.size());
for (size_t i = 0; i < fbs.size(); ++i) {
vector<SlotRange> slots;
tcp::endpoint shard_ep = ep;
if (!cluster.empty()) {
if (!shard_slots_->Empty()) {
size_t shard = i / conn_per_shard;
slots = cluster[shard].slots;
shard_ep = cluster[shard].endpoint;
shard_ep = endpoints[shard];
}
fbs[i] = MakeFiber([&, shard_ep, i, slots = move(slots)] {
fbs[i] = MakeFiber([&, shard_ep, i] {
ThisFiber::SetName(StrCat("connect/", i));
drivers_[i]->Connect(i, shard_ep, slots);
drivers_[i]->Connect(i, shard_ep);
});
}
@@ -771,7 +853,7 @@ void TLocalClient::Start(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns)
cmd_gen_.emplace(&key_gen_.value());
driver_fbs_.resize(drivers_.size());
if (!cluster_spec_.empty()) {
if (!shard_slots_->Empty()) {
key_gen_->EnableClusterMode();
}
cur_cycle_ns_ = cycle_ns;
@@ -875,7 +957,7 @@ void WatchFiber(size_t num_shards, atomic_bool* finish_signal, ProactorPool* pp)
}
}
ClusterSpec FetchCluster(const tcp::endpoint& ep, ProactorBase* proactor) {
ClusterShards FetchClusterInfo(const tcp::endpoint& ep, ProactorBase* proactor) {
unique_ptr<FiberSocketBase> socket(proactor->CreateSocket());
error_code ec = socket->Connect(ep);
CHECK(!ec) << "Could not connect to " << ep << " " << ec;
@@ -904,7 +986,7 @@ ClusterSpec FetchCluster(const tcp::endpoint& ep, ProactorBase* proactor) {
string cluster_spec = resp_vec.front().GetString();
LOG(INFO) << "Cluster spec: " << cluster_spec;
vector<string_view> lines = absl::StrSplit(cluster_spec, '\n', absl::SkipEmpty());
ClusterSpec res;
ClusterShards res;
for (string_view line : lines) {
vector<string_view> parts = absl::StrSplit(line, ' ');
// <id> <ip:port@cport[,hostname]> <flags> <master> <ping-sent> <pong-recv>
@@ -996,9 +1078,9 @@ int main(int argc, char* argv[]) {
auto address = ::boost::asio::ip::make_address(ip_addr);
tcp::endpoint ep{address, GetFlag(FLAGS_p)};
ClusterSpec shards;
ClusterShards shards;
if (protocol == RESP) {
shards = proactor->Await([&] { return FetchCluster(ep, proactor); });
shards = proactor->Await([&] { return FetchClusterInfo(ep, proactor); });
}
LOG(INFO) << "Connecting threads to "
<< (shards.empty() ? string("single node ")
@@ -1010,13 +1092,17 @@ int main(int argc, char* argv[]) {
absl::SetFlag(&FLAGS_cluster_skip_tags, false);
}
ShardSlots shard_slots;
shard_slots.SetClusterSlotRanges(shards);
std::vector<tcp::endpoint> shard_endpoints = shard_slots.Endpoints();
pp->AwaitFiberOnAll([&](unsigned index, auto* p) {
base::SplitMix64 seed_mix(GetFlag(FLAGS_seed) + index * 0x6a45554a264d72bULL);
auto seed = seed_mix();
VLOG(1) << "Seeding bitgen with seed " << seed;
bit_gen.seed(seed);
client = make_unique<TLocalClient>(p);
client->Connect(ep, shards);
client = make_unique<TLocalClient>(p, &shard_slots);
client->Connect(ep, shard_endpoints);
});
const uint32_t key_minimum = GetFlag(FLAGS_key_minimum);