fix(dfly_bench): support dns resolution for cluster hosts (#4715)

fix(dfly_bench): support dns resolution for cluster hosts and multiple slot ranges.

Initial parsing of the MOVED response is implemented, but slot migration is not supported yet.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2025-03-06 13:46:58 +02:00 committed by GitHub
parent 5ffe939b3d
commit 94d9cf79ef
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -104,9 +104,10 @@ static string GetRandomHex(size_t len) {
return res;
}
using SlotRange = pair<uint16_t, uint16_t>;
struct ShardInfo {
uint16_t slot_start = 0;
uint16_t slot_end = 0;
vector<SlotRange> slots; // list of [start, end] pairs. inclusive.
tcp::endpoint endpoint;
};
@ -136,7 +137,7 @@ class CommandGenerator {
public:
CommandGenerator(KeyGenerator* keygen);
string Next(uint16_t slot_min, uint16_t slot_max);
string Next(SlotRange range);
bool might_hit() const {
return might_hit_;
@ -180,12 +181,12 @@ CommandGenerator::CommandGenerator(KeyGenerator* keygen) : keygen_(keygen) {
}
}
string CommandGenerator::Next(uint16_t slot_min, uint16_t slot_max) {
string CommandGenerator::Next(SlotRange range) {
cmd_.clear();
noreply_ = false;
uint16_t slot_id = 0;
if (keygen_->IsClusterEnabled()) {
slot_id = absl::Uniform(absl::IntervalClosedClosed, bit_gen, slot_min, slot_max);
slot_id = absl::Uniform(absl::IntervalClosedClosed, bit_gen, range.first, range.second);
}
if (command_.empty()) {
string key = (*keygen_)(slot_id);
@ -282,8 +283,7 @@ class Driver {
Driver(Driver&&) = delete;
Driver& operator=(Driver&&) = delete;
void Connect(unsigned index, const tcp::endpoint& ep,
optional<pair<uint16_t, uint16_t>> slot_range);
void Connect(unsigned index, const tcp::endpoint& ep, const vector<SlotRange>& slots);
void Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen);
float done() const {
@ -312,7 +312,7 @@ class Driver {
ClientStats& stats_;
unique_ptr<FiberSocketBase> socket_;
optional<pair<uint16_t, uint16_t>> slot_range_;
vector<SlotRange> slots_;
fb2::Fiber receive_fb_;
queue<Req> reqs_;
fb2::CondVarAny cnd_;
@ -443,8 +443,7 @@ void KeyGenerator::EnableClusterMode() {
}
}
void Driver::Connect(unsigned index, const tcp::endpoint& ep,
optional<pair<uint16_t, uint16_t>> slot_range) {
void Driver::Connect(unsigned index, const tcp::endpoint& ep, const vector<SlotRange>& slots) {
VLOG(2) << "Connecting " << index << " to " << ep;
error_code ec = socket_->Connect(ep);
CHECK(!ec) << "Could not connect to " << ep << " " << ec;
@ -466,7 +465,7 @@ void Driver::Connect(unsigned index, const tcp::endpoint& ep,
string_view resp = io::View(io::Bytes(buf, *res_sz));
CHECK(absl::EndsWith(resp, "\r\n")) << resp;
}
slot_range_ = slot_range;
slots_ = slots;
receive_fb_ = MakeFiber(fb2::Launch::dispatch, [this] { ReceiveFb(); });
}
@ -477,12 +476,9 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
stats_.num_clients++;
int64_t time_limit_ns =
time_limit_ > 0 ? int64_t(time_limit_) * 1'000'000'000 + start_ns_ : INT64_MAX;
uint16_t slot_min = 0;
uint16_t slot_max = kNumSlots - 1;
if (slot_range_) {
slot_min = slot_range_->first;
slot_max = slot_range_->second;
}
SlotRange slot_range{0, kNumSlots - 1};
for (unsigned i = 0; i < num_reqs_; ++i) {
int64_t now = absl::GetCurrentTimeNanos();
@ -512,7 +508,16 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
fb2::NoOpLock lk;
cnd_.wait(lk, [this, pipeline] { return reqs_.size() < pipeline; });
}
string cmd = cmd_gen->Next(slot_min, slot_max);
// TODO: this skews the distribution if the slot ranges are uneven.
// Ideally we would pick a single slot uniformly at random from all the ranges we have
// and pass it to cmd_gen->Next below.
if (!slots_.empty()) {
unsigned index = i % slots_.size();
slot_range = slots_[index];
}
string cmd = cmd_gen->Next(slot_range);
Req req;
req.start = absl::GetCurrentTimeNanos();
@ -576,7 +581,7 @@ void Driver::ReceiveFb() {
while (true) {
io_buf_.EnsureCapacity(256);
auto buf = io_buf_.AppendBuffer();
VLOG(2) << "Socket read: " << reqs_.size();
VLOG(3) << "Socket read: " << reqs_.size();
::io::Result<size_t> recv_sz = socket_->Recv(buf);
if (!recv_sz && FiberSocketBase::IsConnClosed(recv_sz.error())) {
@ -609,7 +614,21 @@ void Driver::ParseRESP() {
result = parser_.Parse(io_buf_.InputBuffer(), &consumed, &parse_args);
if (result == RedisParser::OK && !parse_args.empty()) {
if (parse_args[0].type == RespExpr::ERROR) {
VLOG(2) << "Error " << io::View(io_buf_.InputBuffer());
string_view error = io::View(io_buf_.InputBuffer());
VLOG(2) << "Error " << error;
if (absl::StartsWith(error, "-MOVED ")) {
error = error.substr(7);
vector<string_view> parts =
absl::StrSplit(absl::StripTrailingAsciiWhitespace(error), ' ', absl::SkipEmpty());
CHECK_EQ(parts.size(), 2u);
uint32_t slot_id;
CHECK(absl::SimpleAtoi(parts[0], &slot_id));
vector<string_view> host_port = absl::StrSplit(parts[1], ':');
CHECK_EQ(host_port.size(), 2u);
// TODO: support slot migration.
LOG_EVERY_T(INFO, 1) << "Moved: " << slot_id << " to " << parts[1];
}
++stats_.num_errors;
} else if (reqs_.front().might_hit && parse_args[0].type != RespExpr::NIL) {
++stats_.hit_count;
@ -676,16 +695,16 @@ void TLocalClient::Connect(tcp::endpoint ep, const ClusterSpec& cluster) {
vector<fb2::Fiber> fbs(drivers_.size());
for (size_t i = 0; i < fbs.size(); ++i) {
optional<pair<uint16_t, uint16_t>> slot_range;
vector<SlotRange> slots;
tcp::endpoint shard_ep = ep;
if (!cluster.empty()) {
size_t shard = i / conn_per_shard;
slot_range = {cluster[shard].slot_start, cluster[shard].slot_end};
slots = cluster[shard].slots;
shard_ep = cluster[shard].endpoint;
}
fbs[i] = MakeFiber([&, shard_ep, i, slot_range] {
fbs[i] = MakeFiber([&, shard_ep, i, slots = move(slots)] {
ThisFiber::SetName(StrCat("connect/", i));
drivers_[i]->Connect(i, shard_ep, slot_range);
drivers_[i]->Connect(i, shard_ep, slots);
});
}
@ -843,7 +862,11 @@ ClusterSpec FetchCluster(const tcp::endpoint& ep, ProactorBase* proactor) {
ShardInfo shard;
vector<string_view> addr_parts = absl::StrSplit(parts[1], ':');
CHECK_EQ(2u, addr_parts.size());
auto address = ::boost::asio::ip::make_address(addr_parts[0]);
string host(addr_parts[0]);
char ip_addr[INET6_ADDRSTRLEN];
std::error_code ec = fb2::DnsResolve(host, ip_addr);
CHECK(!ec) << "Could not resolve " << host << " " << ec;
auto address = ::boost::asio::ip::make_address(ip_addr);
uint32_t val;
vector<string_view> port_parts = absl::StrSplit(addr_parts[1], '@');
@ -860,17 +883,18 @@ ClusterSpec FetchCluster(const tcp::endpoint& ep, ProactorBase* proactor) {
continue;
}
vector<string_view> slots = absl::StrSplit(parts[8], '-');
if (!absl::SimpleAtoi(slots[0], &val) || val >= kNumSlots) {
LOG(ERROR) << "Invalid slot definition " << parts[8];
continue;
}
shard.slot_start = val;
if (slots.size() > 1) {
CHECK(absl::SimpleAtoi(slots[1], &val));
shard.slot_end = val;
} else {
shard.slot_end = shard.slot_start;
for (size_t i = 8; i < parts.size(); ++i) {
vector<string_view> slots = absl::StrSplit(parts[i], '-');
if (!absl::SimpleAtoi(slots[0], &val) || val >= kNumSlots) {
LOG(ERROR) << "Invalid slot definition " << parts[i];
continue;
}
SlotRange slot_range{uint16_t(val), uint16_t(val)};
if (slots.size() > 1) {
CHECK(absl::SimpleAtoi(slots[1], &val));
slot_range.second = val;
}
shard.slots.push_back(slot_range);
}
res.push_back(shard);
}
@ -884,6 +908,7 @@ int main(int argc, char* argv[]) {
unique_ptr<ProactorPool> pp;
pp.reset(fb2::Pool::IOUring(256));
pp->Run();
fb2::InitDnsResolver(2000);
string proto_str = GetFlag(FLAGS_P);
if (proto_str == "memcache_text") {