Support Cluster configuration for dfly_bench (#4664)

Support Cluster configuration for dfly_bench.

The load tester client opens a connection to each shard, in total
`num_shards * FLAGS_c * FLAGS_proactor_threads` connections.

There is no support for MOVED responses at the moment so only static clusters are supported.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2025-02-26 19:45:54 +02:00 committed by GitHub
parent 8512f726f4
commit fff49e0e1f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 190 additions and 29 deletions

View file

@ -18,7 +18,7 @@ if ("${CMAKE_SYSTEM_NAME}" STREQUAL "Linux")
tiering/external_alloc.cc)
add_executable(dfly_bench dfly_bench.cc)
cxx_link(dfly_bench dfly_parser_lib fibers2 absl::random_random)
cxx_link(dfly_bench dfly_parser_lib fibers2 absl::random_random redis_lib)
cxx_test(tiering/disk_storage_test dfly_test_lib LABELS DFLY)
cxx_test(tiering/op_manager_test dfly_test_lib LABELS DFLY)
cxx_test(tiering/small_bins_test dfly_test_lib LABELS DFLY)

View file

@ -2,6 +2,11 @@
// See LICENSE for licensing terms.
//
extern "C" {
#include "redis/crc16.h"
}
#include <absl/container/flat_hash_set.h>
#include <absl/random/random.h>
#include <absl/strings/match.h>
#include <absl/strings/str_cat.h>
@ -9,6 +14,7 @@
#include <absl/strings/str_split.h>
#include <queue>
#include <tuple>
#include "absl/time/clock.h"
#include "absl/time/time.h"
@ -17,9 +23,11 @@
#include "base/random.h"
#include "base/zipf_gen.h"
#include "facade/redis_parser.h"
#include "io/io.h"
#include "io/io_buf.h"
#include "util/fibers/dns_resolve.h"
#include "util/fibers/pool.h"
#include "util/fibers/proactor_base.h"
#include "util/fibers/uring_socket.h"
// A load-test for DragonflyDB that fixes coordinated omission problem.
@ -59,6 +67,7 @@ using namespace util;
using absl::GetFlag;
using absl::StrFormat;
using facade::RedisParser;
using facade::RespExpr;
using facade::RespVec;
using tcp = ::boost::asio::ip::tcp;
using absl::StrCat;
@ -71,6 +80,7 @@ thread_local base::Xoroshiro128p bit_gen;
enum Protocol { RESP, MC_TEXT } protocol;
enum DistType { UNIFORM, NORMAL, ZIPFIAN, SEQUENTIAL } dist_type{UNIFORM};
constexpr uint16_t kNumSlots = 16384;
string_view kTmplPatterns[] = {"__key__"sv, "__data__"sv, "__score__"sv};
@ -94,25 +104,39 @@ static string GetRandomHex(size_t len) {
return res;
}
struct ShardInfo {
uint16_t slot_start = 0;
uint16_t slot_end = 0;
tcp::endpoint endpoint;
};
using ClusterSpec = vector<ShardInfo>;
class KeyGenerator {
public:
KeyGenerator(uint32_t min, uint32_t max);
string operator()();
string operator()(uint16_t slot_id) const;
void EnableClusterMode();
bool IsClusterEnabled() const {
return !hash_slots_.empty();
}
private:
string prefix_;
uint64_t min_, max_, range_;
uint64_t seq_cursor_;
mutable uint64_t seq_cursor_;
double stddev_ = 1.0 / 6;
optional<base::ZipfianGenerator> zipf_;
mutable optional<base::ZipfianGenerator> zipf_;
vector<string> hash_slots_;
};
class CommandGenerator {
public:
CommandGenerator(KeyGenerator* keygen);
string Next();
string Next(uint16_t slot_min, uint16_t slot_max);
bool might_hit() const {
return might_hit_;
@ -156,11 +180,15 @@ CommandGenerator::CommandGenerator(KeyGenerator* keygen) : keygen_(keygen) {
}
}
string CommandGenerator::Next() {
string CommandGenerator::Next(uint16_t slot_min, uint16_t slot_max) {
cmd_.clear();
noreply_ = false;
uint16_t slot_id = 0;
if (keygen_->IsClusterEnabled()) {
slot_id = absl::Uniform(absl::IntervalClosedClosed, bit_gen, slot_min, slot_max);
}
if (command_.empty()) {
string key = (*keygen_)();
string key = (*keygen_)(slot_id);
if (absl::Uniform(bit_gen, 0U, ratio_get_ + ratio_set_) < ratio_set_) {
FillSet(key);
@ -178,7 +206,7 @@ string CommandGenerator::Next() {
for (auto [pos, type] : key_indices_) {
switch (type) {
case KEY:
str = (*keygen_)();
str = (*keygen_)(slot_id);
break;
case VALUE:
str = GetRandomHex(value_.size());
@ -254,7 +282,8 @@ class Driver {
Driver(Driver&&) = delete;
Driver& operator=(Driver&&) = delete;
void Connect(unsigned index, const tcp::endpoint& ep);
void Connect(unsigned index, const tcp::endpoint& ep,
optional<pair<uint16_t, uint16_t>> slot_range);
void Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen);
float done() const {
@ -283,6 +312,7 @@ class Driver {
ClientStats& stats_;
unique_ptr<FiberSocketBase> socket_;
optional<pair<uint16_t, uint16_t>> slot_range_;
fb2::Fiber receive_fb_;
queue<Req> reqs_;
fb2::CondVarAny cnd_;
@ -295,15 +325,11 @@ class Driver {
class TLocalClient {
public:
explicit TLocalClient(ProactorBase* p) : p_(p) {
drivers_.resize(GetFlag(FLAGS_c));
for (auto& driver : drivers_) {
driver.reset(new Driver{GetFlag(FLAGS_n), GetFlag(FLAGS_test_time), &stats, p_});
}
}
TLocalClient(const TLocalClient&) = delete;
void Connect(tcp::endpoint ep);
void Connect(tcp::endpoint ep, const ClusterSpec& cluster);
void Start(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns);
void Join();
@ -344,7 +370,7 @@ class TLocalClient {
optional<CommandGenerator> cmd_gen_;
vector<fb2::Fiber> driver_fbs_;
ClusterSpec cluster_spec_;
uint64_t cur_cycle_ns_;
uint64_t target_cycle_;
int64_t start_time_;
@ -371,7 +397,7 @@ KeyGenerator::KeyGenerator(uint32_t min, uint32_t max)
}
}
string KeyGenerator::operator()() {
string KeyGenerator::operator()(uint16_t slot_id) const {
uint64_t key_suffix{0};
switch (dist_type) {
case UNIFORM:
@ -391,12 +417,35 @@ string KeyGenerator::operator()() {
seq_cursor_ = min_;
break;
}
return StrCat(prefix_, key_suffix);
string res = prefix_;
if (IsClusterEnabled()) {
absl::StrAppend(&res, "{", hash_slots_[slot_id], "}");
}
absl::StrAppend(&res, key_suffix);
return res;
}
void Driver::Connect(unsigned index, const tcp::endpoint& ep) {
VLOG(2) << "Connecting " << index;
void KeyGenerator::EnableClusterMode() {
hash_slots_.resize(kNumSlots);
uint32_t i = 0;
uint32_t num_slots_filled = 0;
// Precompute the hash slots for each of the slot ids so given the slot id
// we could generate a key that belongs to that slot.
while (num_slots_filled < kNumSlots) {
string slot = absl::StrCat(i);
uint16_t id = crc16(slot.data(), slot.length()) % kNumSlots;
if (hash_slots_[id].empty()) {
hash_slots_[id] = slot;
num_slots_filled++;
}
++i;
}
}
void Driver::Connect(unsigned index, const tcp::endpoint& ep,
optional<pair<uint16_t, uint16_t>> slot_range) {
VLOG(2) << "Connecting " << index << " to " << ep;
error_code ec = socket_->Connect(ep);
CHECK(!ec) << "Could not connect to " << ep << " " << ec;
if (GetFlag(FLAGS_tcp_nodelay)) {
@ -417,6 +466,7 @@ void Driver::Connect(unsigned index, const tcp::endpoint& ep) {
string_view resp = io::View(io::Bytes(buf, *res_sz));
CHECK(absl::EndsWith(resp, "\r\n")) << resp;
}
slot_range_ = slot_range;
receive_fb_ = MakeFiber(fb2::Launch::dispatch, [this] { ReceiveFb(); });
}
@ -427,6 +477,12 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
stats_.num_clients++;
int64_t time_limit_ns =
time_limit_ > 0 ? int64_t(time_limit_) * 1'000'000'000 + start_ns_ : INT64_MAX;
uint16_t slot_min = 0;
uint16_t slot_max = kNumSlots - 1;
if (slot_range_) {
slot_min = slot_range_->first;
slot_max = slot_range_->second;
}
for (unsigned i = 0; i < num_reqs_; ++i) {
int64_t now = absl::GetCurrentTimeNanos();
@ -456,7 +512,7 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
fb2::NoOpLock lk;
cnd_.wait(lk, [this, pipeline] { return reqs_.size() < pipeline; });
}
string cmd = cmd_gen->Next();
string cmd = cmd_gen->Next(slot_min, slot_max);
Req req;
req.start = absl::GetCurrentTimeNanos();
@ -552,9 +608,10 @@ void Driver::ParseRESP() {
do {
result = parser_.Parse(io_buf_.InputBuffer(), &consumed, &parse_args);
if (result == RedisParser::OK && !parse_args.empty()) {
if (parse_args[0].type == facade::RespExpr::ERROR) {
if (parse_args[0].type == RespExpr::ERROR) {
VLOG(2) << "Error " << io::View(io_buf_.InputBuffer());
++stats_.num_errors;
} else if (reqs_.front().might_hit && parse_args[0].type != facade::RespExpr::NIL) {
} else if (reqs_.front().might_hit && parse_args[0].type != RespExpr::NIL) {
++stats_.hit_count;
}
parse_args.clear();
@ -602,14 +659,33 @@ void Driver::ParseMC() {
}
}
void TLocalClient::Connect(tcp::endpoint ep) {
void TLocalClient::Connect(tcp::endpoint ep, const ClusterSpec& cluster) {
VLOG(2) << "Connecting client...";
cluster_spec_ = cluster;
unsigned conn_per_shard = GetFlag(FLAGS_c);
if (cluster.empty()) {
drivers_.resize(conn_per_shard);
} else {
drivers_.resize(cluster.size() * conn_per_shard);
}
for (auto& driver : drivers_) {
driver.reset(new Driver{GetFlag(FLAGS_n), GetFlag(FLAGS_test_time), &stats, p_});
}
vector<fb2::Fiber> fbs(drivers_.size());
for (size_t i = 0; i < fbs.size(); ++i) {
fbs[i] = MakeFiber([&, i] {
optional<pair<uint16_t, uint16_t>> slot_range;
tcp::endpoint shard_ep = ep;
if (!cluster.empty()) {
size_t shard = i / conn_per_shard;
slot_range = {cluster[shard].slot_start, cluster[shard].slot_end};
shard_ep = cluster[shard].endpoint;
}
fbs[i] = MakeFiber([&, shard_ep, i, slot_range] {
ThisFiber::SetName(StrCat("connect/", i));
drivers_[i]->Connect(i, ep);
drivers_[i]->Connect(i, shard_ep, slot_range);
});
}
@ -622,7 +698,9 @@ void TLocalClient::Start(uint32_t key_min, uint32_t key_max, uint64_t cycle_ns)
cmd_gen_.emplace(&key_gen_.value());
driver_fbs_.resize(drivers_.size());
if (!cluster_spec_.empty()) {
key_gen_->EnableClusterMode();
}
cur_cycle_ns_ = cycle_ns;
target_cycle_ = cycle_ns;
start_time_ = absl::GetCurrentTimeNanos();
@ -724,6 +802,82 @@ void WatchFiber(atomic_bool* finish_signal, ProactorPool* pp) {
}
}
ClusterSpec FetchCluster(const tcp::endpoint& ep, ProactorBase* proactor) {
unique_ptr<FiberSocketBase> socket(proactor->CreateSocket());
error_code ec = socket->Connect(ep);
CHECK(!ec) << "Could not connect to " << ep << " " << ec;
ec = socket->Write(io::Buffer("cluster nodes\r\n"));
CHECK(!ec);
facade::RedisParser parser{RedisParser::CLIENT, 1024};
uint8_t buf[1024];
RespVec resp_vec;
while (true) {
io::Result<size_t> res = socket->Recv(buf);
CHECK(res) << res.error().message();
RespExpr::Buffer bytes(buf, *res);
uint32_t consumed = 0;
facade::RedisParser::Result result = parser.Parse(bytes, &consumed, &resp_vec);
if (result == facade::RedisParser::OK) {
break;
}
CHECK_EQ(result, facade::RedisParser::INPUT_PENDING);
}
CHECK_EQ(1u, resp_vec.size());
std::ignore = socket->Close();
if (resp_vec.front().type == RespExpr::ERROR) {
LOG(INFO) << "Cluster command failed " << resp_vec.front().GetString();
return {};
}
string cluster_spec = resp_vec.front().GetString();
LOG(INFO) << "Cluster spec: " << cluster_spec;
vector<string_view> lines = absl::StrSplit(cluster_spec, '\n', absl::SkipEmpty());
ClusterSpec res;
for (string_view line : lines) {
vector<string_view> parts = absl::StrSplit(line, ' ');
// <id> <ip:port@cport[,hostname]> <flags> <master> <ping-sent> <pong-recv>
// <config-epoch> <link-state> <slot> <slot> ... <slot>
if (parts.size() < 9) {
LOG(WARNING) << "Skipping line: " << line;
continue;
}
ShardInfo shard;
vector<string_view> addr_parts = absl::StrSplit(parts[1], ':');
CHECK_EQ(2u, addr_parts.size());
auto address = ::boost::asio::ip::make_address(addr_parts[0]);
uint32_t val;
vector<string_view> port_parts = absl::StrSplit(addr_parts[1], '@');
CHECK_EQ(2u, port_parts.size());
CHECK(absl::SimpleAtoi(port_parts[0], &val));
CHECK_LT(val, 65536u);
shard.endpoint = tcp::endpoint(address, val);
string_view flags = parts[2];
absl::flat_hash_set<string_view> flags_set(absl::StrSplit(flags, ','));
if (!flags_set.contains("master")) {
LOG(INFO) << "Skipping non-master node " << shard.endpoint << " " << flags;
continue;
}
vector<string_view> slots = absl::StrSplit(parts[8], '-');
if (!absl::SimpleAtoi(slots[0], &val) || val >= kNumSlots) {
LOG(ERROR) << "Invalid slot definition " << parts[8];
continue;
}
shard.slot_start = val;
if (slots.size() > 1) {
CHECK(absl::SimpleAtoi(slots[1], &val));
shard.slot_end = val;
} else {
shard.slot_end = shard.slot_start;
}
res.push_back(shard);
}
return res;
}
int main(int argc, char* argv[]) {
MainInitGuard guard(&argc, &argv);
@ -763,14 +917,21 @@ int main(int argc, char* argv[]) {
auto address = ::boost::asio::ip::make_address(ip_addr);
tcp::endpoint ep{address, GetFlag(FLAGS_p)};
LOG(INFO) << "Connecting threads";
ClusterSpec shards;
if (protocol == RESP) {
shards = proactor->Await([&] { return FetchCluster(ep, proactor); });
}
LOG(INFO) << "Connecting threads to "
<< (shards.empty() ? string("single node ")
: absl::StrCat(shards.size(), " shard cluster"));
pp->AwaitFiberOnAll([&](unsigned index, auto* p) {
base::SplitMix64 seed_mix(GetFlag(FLAGS_seed) + index * 0x6a45554a264d72bULL);
auto seed = seed_mix();
VLOG(1) << "Seeding bitgen with seed " << seed;
bit_gen.seed(seed);
client = make_unique<TLocalClient>(p);
client->Connect(ep);
client->Connect(ep, shards);
});
const uint32_t key_minimum = GetFlag(FLAGS_key_minimum);