mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
fix: make sure dfly_bench reliably connects (#3802)
1. Issue ping upon connect, add a comment why. 2. log error if dfly_bench disconnects before all the requests were processed. 3. Refactor memcache parsing code into ParseMC function. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
3945b7e4fa
commit
6a13329523
2 changed files with 71 additions and 47 deletions
|
@ -205,7 +205,8 @@ class Driver {
|
|||
private:
|
||||
void PopRequest();
|
||||
void ReceiveFb();
|
||||
void ParseRESP(facade::RedisParser* parser, io::IoBuf* io_buf);
|
||||
void ParseRESP();
|
||||
void ParseMC();
|
||||
|
||||
struct Req {
|
||||
uint64_t start;
|
||||
|
@ -219,6 +220,9 @@ class Driver {
|
|||
fb2::Fiber receive_fb_;
|
||||
queue<Req> reqs_;
|
||||
fb2::CondVarAny cnd_;
|
||||
|
||||
facade::RedisParser parser_{1 << 16, false};
|
||||
io::IoBuf io_buf_{512};
|
||||
};
|
||||
|
||||
// Per thread client.
|
||||
|
@ -333,6 +337,19 @@ void Driver::Connect(unsigned index, const tcp::endpoint& ep) {
|
|||
int yes = 1;
|
||||
CHECK_EQ(0, setsockopt(socket_->native_handle(), IPPROTO_TCP, TCP_NODELAY, &yes, sizeof(yes)));
|
||||
}
|
||||
|
||||
// TCP Connect does not ensure that the connection was indeed accepted by the server.
|
||||
// if server backlog is too short the connection will get stuck in the accept queue.
|
||||
// Therefore, we send a ping command to ensure that every connection got connected.
|
||||
ec = socket_->Write(io::Buffer("ping\r\n"));
|
||||
CHECK(!ec);
|
||||
|
||||
uint8_t buf[128];
|
||||
auto res_sz = socket_->Recv(io::MutableBytes(buf));
|
||||
CHECK(res_sz) << res_sz.error().message();
|
||||
string_view resp = io::View(io::Bytes(buf, *res_sz));
|
||||
CHECK(absl::EndsWith(resp, "\r\n")) << resp;
|
||||
|
||||
receive_fb_ = MakeFiber(fb2::Launch::dispatch, [this] { ReceiveFb(); });
|
||||
}
|
||||
|
||||
|
@ -385,7 +402,7 @@ void Driver::Run(uint64_t* cycle_ns, CommandGenerator* cmd_gen) {
|
|||
CHECK(!ec) << ec.message();
|
||||
}
|
||||
|
||||
const int finish = absl::GetCurrentTimeNanos();
|
||||
int64_t finish = absl::GetCurrentTimeNanos();
|
||||
VLOG(1) << "Done queuing " << num_reqs_ << " requests, which took "
|
||||
<< StrFormat("%.1fs", double(finish - start) / 1000000000)
|
||||
<< ". Waiting for server processing";
|
||||
|
@ -426,71 +443,40 @@ void Driver::PopRequest() {
|
|||
}
|
||||
|
||||
void Driver::ReceiveFb() {
|
||||
facade::RedisParser parser{1 << 16, false};
|
||||
io::IoBuf io_buf{512};
|
||||
|
||||
unsigned blob_len = 0;
|
||||
|
||||
while (true) {
|
||||
io_buf.EnsureCapacity(256);
|
||||
auto buf = io_buf.AppendBuffer();
|
||||
io_buf_.EnsureCapacity(256);
|
||||
auto buf = io_buf_.AppendBuffer();
|
||||
VLOG(2) << "Socket read: " << reqs_.size();
|
||||
|
||||
::io::Result<size_t> recv_sz = socket_->Recv(buf);
|
||||
if (!recv_sz && FiberSocketBase::IsConnClosed(recv_sz.error())) {
|
||||
LOG_IF(DFATAL, !reqs_.empty())
|
||||
<< "Broke with " << reqs_.size() << " requests, received: " << received_;
|
||||
// clear reqs - to prevent Driver::Run block on them indefinitely.
|
||||
decltype(reqs_)().swap(reqs_);
|
||||
break;
|
||||
}
|
||||
|
||||
CHECK(recv_sz) << recv_sz.error().message();
|
||||
io_buf.CommitWrite(*recv_sz);
|
||||
io_buf_.CommitWrite(*recv_sz);
|
||||
|
||||
if (protocol == RESP) {
|
||||
ParseRESP(&parser, &io_buf);
|
||||
ParseRESP();
|
||||
} else {
|
||||
// MC_TEXT
|
||||
while (true) {
|
||||
string_view line = FindLine(io_buf.InputBuffer());
|
||||
if (line.empty())
|
||||
break;
|
||||
CHECK_EQ(line.back(), '\n');
|
||||
if (line == "STORED\r\n" || line == "END\r\n") {
|
||||
PopRequest();
|
||||
blob_len = 0;
|
||||
} else if (absl::StartsWith(line, "VALUE")) {
|
||||
// last token is a blob length.
|
||||
auto it = line.rbegin();
|
||||
while (it != line.rend() && *it != ' ')
|
||||
++it;
|
||||
size_t len = it - line.rbegin() - 2;
|
||||
const char* start = &(*it) + 1;
|
||||
if (!absl::SimpleAtoi(string(start, len), &blob_len)) {
|
||||
LOG(ERROR) << "Invalid blob len " << line;
|
||||
return;
|
||||
}
|
||||
++stats_.hit_count;
|
||||
} else if (absl::StartsWith(line, "SERVER_ERROR")) {
|
||||
++stats_.num_errors;
|
||||
PopRequest();
|
||||
blob_len = 0;
|
||||
} else {
|
||||
auto handle = socket_->native_handle();
|
||||
CHECK_EQ(blob_len + 2, line.size()) << line;
|
||||
blob_len = 0;
|
||||
VLOG(2) << "Got line " << handle << ": " << line;
|
||||
}
|
||||
io_buf.ConsumeInput(line.size());
|
||||
}
|
||||
ParseMC();
|
||||
}
|
||||
}
|
||||
VLOG(1) << "ReceiveFb done";
|
||||
}
|
||||
|
||||
void Driver::ParseRESP(facade::RedisParser* parser, io::IoBuf* io_buf) {
|
||||
void Driver::ParseRESP() {
|
||||
uint32_t consumed = 0;
|
||||
RedisParser::Result result = RedisParser::OK;
|
||||
RespVec parse_args;
|
||||
|
||||
do {
|
||||
result = parser->Parse(io_buf->InputBuffer(), &consumed, &parse_args);
|
||||
result = parser_.Parse(io_buf_.InputBuffer(), &consumed, &parse_args);
|
||||
if (result == RedisParser::OK && !parse_args.empty()) {
|
||||
if (parse_args[0].type == facade::RespExpr::ERROR) {
|
||||
++stats_.num_errors;
|
||||
|
@ -500,10 +486,48 @@ void Driver::ParseRESP(facade::RedisParser* parser, io::IoBuf* io_buf) {
|
|||
parse_args.clear();
|
||||
PopRequest();
|
||||
}
|
||||
io_buf->ConsumeInput(consumed);
|
||||
io_buf_.ConsumeInput(consumed);
|
||||
} while (result == RedisParser::OK);
|
||||
}
|
||||
|
||||
void Driver::ParseMC() {
|
||||
unsigned blob_len = 0;
|
||||
|
||||
while (true) {
|
||||
string_view line = FindLine(io_buf_.InputBuffer());
|
||||
if (line.empty())
|
||||
break;
|
||||
|
||||
CHECK_EQ(line.back(), '\n');
|
||||
if (line == "STORED\r\n" || line == "END\r\n") {
|
||||
PopRequest();
|
||||
blob_len = 0;
|
||||
} else if (absl::StartsWith(line, "VALUE")) {
|
||||
// last token is a blob length.
|
||||
auto it = line.rbegin();
|
||||
while (it != line.rend() && *it != ' ')
|
||||
++it;
|
||||
size_t len = it - line.rbegin() - 2;
|
||||
const char* start = &(*it) + 1;
|
||||
if (!absl::SimpleAtoi(string(start, len), &blob_len)) {
|
||||
LOG(ERROR) << "Invalid blob len " << line;
|
||||
return;
|
||||
}
|
||||
++stats_.hit_count;
|
||||
} else if (absl::StartsWith(line, "SERVER_ERROR")) {
|
||||
++stats_.num_errors;
|
||||
PopRequest();
|
||||
blob_len = 0;
|
||||
} else {
|
||||
auto handle = socket_->native_handle();
|
||||
CHECK_EQ(blob_len + 2, line.size()) << line;
|
||||
blob_len = 0;
|
||||
VLOG(2) << "Got line " << handle << ": " << line;
|
||||
}
|
||||
io_buf_.ConsumeInput(line.size());
|
||||
}
|
||||
}
|
||||
|
||||
void TLocalClient::Connect(tcp::endpoint ep) {
|
||||
VLOG(2) << "Connecting client...";
|
||||
vector<fb2::Fiber> fbs(drivers_.size());
|
||||
|
|
|
@ -77,7 +77,7 @@ ABSL_FLAG(bool, force_epoll, false,
|
|||
ABSL_FLAG(bool, version_check, true,
|
||||
"If true, Will monitor for new releases on Dragonfly servers once a day.");
|
||||
|
||||
ABSL_FLAG(uint16_t, tcp_backlog, 128, "TCP listen(2) backlog parameter.");
|
||||
ABSL_FLAG(uint16_t, tcp_backlog, 256, "TCP listen(2) backlog parameter.");
|
||||
|
||||
using namespace util;
|
||||
using namespace facade;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue