From 872d5e2d7d9b4289390a4be4dcda2f1a67af2c4d Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Mon, 25 Nov 2024 09:15:29 +0200 Subject: [PATCH] chore: more parser improvements (#4177) The long-term goal is to make the parser to consume the whole input when it returns INPUT_PENDING. It requires several baby step PRs. This PR: 1. Adds more invariant checks 2. Avoids calling RedisParser::Parse with an empty buffer. 3. In bulk string parsing - remove redundant "optimization" of rejecting partial strings of less than 32 bytes, in other words consume small parts as well. The unit test adjusted accordingly. Signed-off-by: Roman Gershman --- src/facade/dragonfly_connection.cc | 2 +- src/facade/redis_parser.cc | 64 +++++++++++++++++++----------- src/facade/redis_parser_test.cc | 4 +- 3 files changed, 43 insertions(+), 27 deletions(-) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 560a0e564..e43c78f8c 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -1122,7 +1122,7 @@ Connection::ParserStatus Connection::ParseRedis() { DispatchSingle(has_more, dispatch_sync, dispatch_async); } io_buf_.ConsumeInput(consumed); - } while (RedisParser::OK == result && !reply_builder_->GetError()); + } while (RedisParser::OK == result && io_buf_.InputLen() > 0 && !reply_builder_->GetError()); parser_error_ = result; if (result == RedisParser::OK) diff --git a/src/facade/redis_parser.cc b/src/facade/redis_parser.cc index 6cbd85607..f7ebd2db4 100644 --- a/src/facade/redis_parser.cc +++ b/src/facade/redis_parser.cc @@ -13,6 +13,7 @@ namespace facade { using namespace std; auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> Result { + DCHECK(!str.empty()); *consumed = 0; res->clear(); @@ -212,8 +213,10 @@ auto RedisParser::ParseInline(Buffer str) -> ResultConsumed { auto RedisParser::ParseLen(Buffer str, int64_t* res) -> ResultConsumed { DCHECK(!str.empty()); - char* s = reinterpret_cast(str.data() + 1); - char* pos = reinterpret_cast(memchr(s, '\n', str.size() - 1)); + DCHECK(str[0] == '$' || str[0] == '*' || str[0] == '%' || str[0] == '~'); + + const char* s = reinterpret_cast(str.data()); + const char* pos = reinterpret_cast(memchr(s, '\n', str.size())); if (!pos) { Result r = str.size() < 32 ? INPUT_PENDING : BAD_ARRAYLEN; return {r, 0}; @@ -223,8 +226,11 @@ auto RedisParser::ParseLen(Buffer str, int64_t* res) -> ResultConsumed { return {BAD_ARRAYLEN, 0}; } - bool success = absl::SimpleAtoi(std::string_view{s, size_t(pos - s - 1)}, res); - return ResultConsumed{success ? OK : BAD_ARRAYLEN, (pos - s) + 2}; + // Skip the first character and 2 last ones (\r\n). + bool success = absl::SimpleAtoi(std::string_view{s + 1, size_t(pos - 1 - s)}, res); + unsigned consumed = pos - s + 1; + + return ResultConsumed{success ? OK : BAD_ARRAYLEN, consumed}; } auto RedisParser::ConsumeArrayLen(Buffer str) -> ResultConsumed { @@ -288,7 +294,13 @@ auto RedisParser::ConsumeArrayLen(Buffer str) -> ResultConsumed { auto RedisParser::ParseArg(Buffer str) -> ResultConsumed { DCHECK(!str.empty()); + char c = str[0]; + unsigned min_len = 3 + int(c != '_'); + + if (str.size() < min_len) { + return {INPUT_PENDING, 0}; + } if (c == '$') { int64_t len; @@ -321,12 +333,18 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed { } if (c == '_') { // Resp3 NIL - // TODO: Do we need to validate that str[1:2] == "\r\n"? + // '_','\r','\n' + DCHECK_GE(str.size(), 3u); + + unsigned consumed = 3; + if (str[1] != '\r' || str[2] != '\n') { + return {BAD_STRING, 0}; + } cached_expr_->emplace_back(RespExpr::NIL); cached_expr_->back().u = Buffer{}; HandleFinishArg(); - return {OK, 3}; // // '_','\r','\n' + return {OK, consumed}; } if (c == '*') { @@ -415,26 +433,24 @@ auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed { return {INPUT_PENDING, consumed}; } - if (str.size() >= 32) { - DCHECK(bulk_len_); - size_t len = std::min(str.size(), bulk_len_); + DCHECK(bulk_len_); + size_t len = std::min(str.size(), bulk_len_); - if (is_broken_token_) { - memcpy(bulk_str.end(), str.data(), len); - bulk_str = Buffer{bulk_str.data(), bulk_str.size() + len}; - DVLOG(1) << "Extending bulk stash to size " << bulk_str.size(); - } else { - DVLOG(1) << "New bulk stash size " << bulk_len_; - vector nb(bulk_len_); - memcpy(nb.data(), str.data(), len); - bulk_str = Buffer{nb.data(), len}; - buf_stash_.emplace_back(std::move(nb)); - is_broken_token_ = true; - cached_expr_->back().has_support = true; - } - consumed = len; - bulk_len_ -= len; + if (is_broken_token_) { + memcpy(bulk_str.end(), str.data(), len); + bulk_str = Buffer{bulk_str.data(), bulk_str.size() + len}; + DVLOG(1) << "Extending bulk stash to size " << bulk_str.size(); + } else { + DVLOG(1) << "New bulk stash size " << bulk_len_; + vector nb(bulk_len_); + memcpy(nb.data(), str.data(), len); + bulk_str = Buffer{nb.data(), len}; + buf_stash_.emplace_back(std::move(nb)); + is_broken_token_ = true; + cached_expr_->back().has_support = true; } + consumed = len; + bulk_len_ -= len; return {INPUT_PENDING, consumed}; } diff --git a/src/facade/redis_parser_test.cc b/src/facade/redis_parser_test.cc index 37d628694..29237ca06 100644 --- a/src/facade/redis_parser_test.cc +++ b/src/facade/redis_parser_test.cc @@ -125,9 +125,9 @@ TEST_F(RedisParserTest, Multi2) { TEST_F(RedisParserTest, Multi3) { const char kFirst[] = "*3\r\n$3\r\nSET\r\n$16\r\nkey:"; - const char kSecond[] = "key:000002273458\r\n$3\r\nVXK"; + const char kSecond[] = "000002273458\r\n$3\r\nVXK"; ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(kFirst)); - ASSERT_EQ(strlen(kFirst) - 4, consumed_); + ASSERT_EQ(strlen(kFirst), consumed_); ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(kSecond)); ASSERT_EQ(strlen(kSecond), consumed_); ASSERT_EQ(RedisParser::OK, Parse("\r\n*3\r\n$3\r\nSET"));