chore: minor clean ups before introducing ProvidedBuffers (#4709)

Making RedisParser::Buffer const, some minor changes in dragonfly_connection code.

No functionality is changed.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2025-03-06 09:51:39 +02:00 committed by GitHub
parent 9957e0412b
commit a39d777b82
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 41 additions and 35 deletions

View file

@ -1,16 +1,16 @@
{ {
"configurations": [ "configurations": [
{ {
"name": "Linux", "name": "Linux",
"includePath": [ "includePath": [
"${default}" "${default}"
], ],
"cStandard": "c17", "cStandard": "c17",
"cppStandard": "c++17", "cppStandard": "c++17",
"intelliSenseMode": "${default}", "intelliSenseMode": "${default}",
"compileCommands": "${workspaceFolder}/build-dbg/compile_commands.json", "compileCommands": "${workspaceFolder}/build-dbg/compile_commands.json",
"configurationProvider": "ms-vscode.cmake-tools" "configurationProvider": "ms-vscode.cmake-tools"
} }
], ],
"version": 4 "version": 4
} }

View file

@ -1150,8 +1150,11 @@ Connection::ParserStatus Connection::ParseRedis() {
return {FromArgs(std::move(parse_args), tlh)}; return {FromArgs(std::move(parse_args), tlh)};
}; };
RedisParser::Buffer input_buf = io_buf_.InputBuffer();
size_t available_to_parse = io_buf_.InputLen();
do { do {
result = redis_parser_->Parse(io_buf_.InputBuffer(), &consumed, &parse_args); result = redis_parser_->Parse(input_buf, &consumed, &parse_args);
request_consumed_bytes_ += consumed; request_consumed_bytes_ += consumed;
if (result == RedisParser::OK && !parse_args.empty()) { if (result == RedisParser::OK && !parse_args.empty()) {
if (RespExpr& first = parse_args.front(); first.type == RespExpr::STRING) if (RespExpr& first = parse_args.front(); first.type == RespExpr::STRING)
@ -1160,7 +1163,7 @@ Connection::ParserStatus Connection::ParseRedis() {
if (io_req_size_hist) if (io_req_size_hist)
io_req_size_hist->Add(request_consumed_bytes_); io_req_size_hist->Add(request_consumed_bytes_);
request_consumed_bytes_ = 0; request_consumed_bytes_ = 0;
bool has_more = consumed < io_buf_.InputLen(); bool has_more = consumed < available_to_parse;
if (tl_traffic_logger.log_file && IsMain() /* log only on the main interface */) { if (tl_traffic_logger.log_file && IsMain() /* log only on the main interface */) {
LogTraffic(id_, has_more, absl::MakeSpan(parse_args), service_->GetContextInfo(cc_.get())); LogTraffic(id_, has_more, absl::MakeSpan(parse_args), service_->GetContextInfo(cc_.get()));
@ -1168,9 +1171,11 @@ Connection::ParserStatus Connection::ParseRedis() {
DispatchSingle(has_more, dispatch_sync, dispatch_async); DispatchSingle(has_more, dispatch_sync, dispatch_async);
} }
io_buf_.ConsumeInput(consumed); available_to_parse -= consumed;
} while (RedisParser::OK == result && io_buf_.InputLen() > 0 && !reply_builder_->GetError()); input_buf.remove_prefix(consumed);
} while (RedisParser::OK == result && available_to_parse > 0 && !reply_builder_->GetError());
io_buf_.ConsumeInput(io_buf_.InputLen());
parser_error_ = result; parser_error_ = result;
if (result == RedisParser::OK) if (result == RedisParser::OK)
return OK; return OK;
@ -1178,6 +1183,8 @@ Connection::ParserStatus Connection::ParseRedis() {
if (result == RedisParser::INPUT_PENDING) if (result == RedisParser::INPUT_PENDING)
return NEED_MORE; return NEED_MORE;
VLOG(1) << "Parser error " << result;
return ERROR; return ERROR;
} }
@ -1305,12 +1312,11 @@ void Connection::HandleMigrateRequest() {
} }
error_code Connection::HandleRecvSocket() { error_code Connection::HandleRecvSocket() {
phase_ = READ_SOCKET;
io::MutableBytes append_buf = io_buf_.AppendBuffer(); io::MutableBytes append_buf = io_buf_.AppendBuffer();
DCHECK(!append_buf.empty()); DCHECK(!append_buf.empty());
phase_ = READ_SOCKET;
error_code ec;
::io::Result<size_t> recv_sz = socket_->Recv(append_buf); ::io::Result<size_t> recv_sz = socket_->Recv(append_buf);
last_interaction_ = time(nullptr); last_interaction_ = time(nullptr);
@ -1324,7 +1330,7 @@ error_code Connection::HandleRecvSocket() {
stats_->io_read_bytes += commit_sz; stats_->io_read_bytes += commit_sz;
++stats_->io_read_cnt; ++stats_->io_read_cnt;
return ec; return {};
} }
auto Connection::IoLoop() -> variant<error_code, ParserStatus> { auto Connection::IoLoop() -> variant<error_code, ParserStatus> {

View file

@ -34,7 +34,7 @@ bool RespMatcher::MatchAndExplain(RespExpr e, MatchResultListener* listener) con
if (type_ == RespExpr::STRING || type_ == RespExpr::ERROR) { if (type_ == RespExpr::STRING || type_ == RespExpr::ERROR) {
RespExpr::Buffer ebuf = e.GetBuf(); RespExpr::Buffer ebuf = e.GetBuf();
std::string_view actual{reinterpret_cast<char*>(ebuf.data()), ebuf.size()}; std::string_view actual{reinterpret_cast<const char*>(ebuf.data()), ebuf.size()};
if (type_ == RespExpr::ERROR && !absl::StrContains(actual, exp_str_)) { if (type_ == RespExpr::ERROR && !absl::StrContains(actual, exp_str_)) {
*listener << "Actual does not contain '" << exp_str_ << "'"; *listener << "Actual does not contain '" << exp_str_ << "'";

View file

@ -185,11 +185,11 @@ void RedisParser::StashState(RespExpr::Vec* res) {
auto RedisParser::ParseInline(Buffer str) -> ResultConsumed { auto RedisParser::ParseInline(Buffer str) -> ResultConsumed {
DCHECK(!str.empty()); DCHECK(!str.empty());
uint8_t* ptr = str.begin(); const uint8_t* ptr = str.begin();
uint8_t* end = str.end(); const uint8_t* end = str.end();
uint8_t* token_start = ptr; const uint8_t* token_start = ptr;
auto find_token_end = [](uint8_t* ptr, uint8_t* end) { auto find_token_end = [](const uint8_t* ptr, const uint8_t* end) {
while (ptr != end && *ptr > 32) while (ptr != end && *ptr > 32)
++ptr; ++ptr;
return ptr; return ptr;
@ -411,8 +411,8 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
return ConsumeArrayLen(str); return ConsumeArrayLen(str);
} }
char* s = reinterpret_cast<char*>(str.data()); const char* s = reinterpret_cast<const char*>(str.data());
char* eol = reinterpret_cast<char*>(memchr(s, '\n', str.size())); const char* eol = reinterpret_cast<const char*>(memchr(s, '\n', str.size()));
// TODO: in client mode we still may not consume everything (see INPUT_PENDING below). // TODO: in client mode we still may not consume everything (see INPUT_PENDING below).
// It's not a problem, because we need consume all the input only in server mode. // It's not a problem, because we need consume all the input only in server mode.
@ -428,7 +428,7 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
return {BAD_STRING, 0}; return {BAD_STRING, 0};
cached_expr_->emplace_back(arg_c_ == '+' ? RespExpr::STRING : RespExpr::ERROR); cached_expr_->emplace_back(arg_c_ == '+' ? RespExpr::STRING : RespExpr::ERROR);
cached_expr_->back().u = Buffer{reinterpret_cast<uint8_t*>(s), size_t((eol - 1) - s)}; cached_expr_->back().u = Buffer{reinterpret_cast<const uint8_t*>(s), size_t((eol - 1) - s)};
} else if (arg_c_ == ':') { } else if (arg_c_ == ':') {
DCHECK(!server_mode_); DCHECK(!server_mode_);
if (!eol) { if (!eol) {
@ -477,7 +477,7 @@ auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed {
// is_broken_token_ can be false, if we just parsed the bulk length but have // is_broken_token_ can be false, if we just parsed the bulk length but have
// not parsed the token itself. // not parsed the token itself.
if (is_broken_token_) { if (is_broken_token_) {
memcpy(bulk_str.end(), str.data(), bulk_len_); memcpy(const_cast<uint8_t*>(bulk_str.end()), str.data(), bulk_len_);
bulk_str = Buffer{bulk_str.data(), bulk_str.size() + bulk_len_}; bulk_str = Buffer{bulk_str.data(), bulk_str.size() + bulk_len_};
} else { } else {
bulk_str = str.subspan(0, bulk_len_); bulk_str = str.subspan(0, bulk_len_);
@ -506,7 +506,7 @@ auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed {
size_t len = std::min<size_t>(str.size(), bulk_len_); size_t len = std::min<size_t>(str.size(), bulk_len_);
if (is_broken_token_) { if (is_broken_token_) {
memcpy(bulk_str.end(), str.data(), len); memcpy(const_cast<uint8_t*>(bulk_str.end()), str.data(), len);
bulk_str = Buffer{bulk_str.data(), bulk_str.size() + len}; bulk_str = Buffer{bulk_str.data(), bulk_str.size() + len};
DVLOG(1) << "Extending bulk stash to size " << bulk_str.size(); DVLOG(1) << "Extending bulk stash to size " << bulk_str.size();
} else { } else {

View file

@ -18,7 +18,7 @@ namespace facade {
class RespExpr { class RespExpr {
public: public:
using Buffer = absl::Span<uint8_t>; using Buffer = absl::Span<const uint8_t>;
enum Type : uint8_t { STRING, ARRAY, INT64, DOUBLE, NIL, NIL_ARRAY, ERROR }; enum Type : uint8_t { STRING, ARRAY, INT64, DOUBLE, NIL, NIL_ARRAY, ERROR };
@ -70,7 +70,7 @@ using RespVec = RespExpr::Vec;
using RespSpan = absl::Span<const RespExpr>; using RespSpan = absl::Span<const RespExpr>;
inline std::string_view ToSV(RespExpr::Buffer buf) { inline std::string_view ToSV(RespExpr::Buffer buf) {
return std::string_view{reinterpret_cast<char*>(buf.data()), buf.size()}; return std::string_view{reinterpret_cast<const char*>(buf.data()), buf.size()};
} }
} // namespace facade } // namespace facade