mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
chore: Introduce small buffer in redis parser (#4076)
* chore: Introduce small buffer in redis parser. This is needed in order to eliminate cases where we return INPUT_PENDING but do not consume the whole string, leaving just several bytes unconsumed. This should simplify buffer management for the caller: if they pass a string that does not result in a complete parsed request, at least the whole string is consumed and can be discarded. Signed-off-by: Roman Gershman <roman@dragonflydb.io> * chore: comments fixes Signed-off-by: Roman Gershman <roman@dragonflydb.io> --------- Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
6d03afaa76
commit
a4d243b96f
3 changed files with 87 additions and 27 deletions
|
@ -3,6 +3,7 @@
|
||||||
//
|
//
|
||||||
#include "facade/redis_parser.h"
|
#include "facade/redis_parser.h"
|
||||||
|
|
||||||
|
#include <absl/strings/escaping.h>
|
||||||
#include <absl/strings/numbers.h>
|
#include <absl/strings/numbers.h>
|
||||||
|
|
||||||
#include "base/logging.h"
|
#include "base/logging.h"
|
||||||
|
@ -18,6 +19,9 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
|
||||||
*consumed = 0;
|
*consumed = 0;
|
||||||
res->clear();
|
res->clear();
|
||||||
|
|
||||||
|
DVLOG(2) << "Parsing: "
|
||||||
|
<< absl::CHexEscape(string_view{reinterpret_cast<const char*>(str.data()), str.size()});
|
||||||
|
|
||||||
if (state_ == CMD_COMPLETE_S) {
|
if (state_ == CMD_COMPLETE_S) {
|
||||||
if (InitStart(str[0], res)) {
|
if (InitStart(str[0], res)) {
|
||||||
// We recognized a non-INLINE state, starting with a special char.
|
// We recognized a non-INLINE state, starting with a special char.
|
||||||
|
@ -62,6 +66,18 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
|
||||||
case BULK_STR_S:
|
case BULK_STR_S:
|
||||||
resultc = ConsumeBulk(str);
|
resultc = ConsumeBulk(str);
|
||||||
break;
|
break;
|
||||||
|
case SLASH_N_S:
|
||||||
|
if (str[0] != '\n') {
|
||||||
|
resultc.first = BAD_STRING;
|
||||||
|
} else {
|
||||||
|
resultc = {OK, 1};
|
||||||
|
if (arg_c_ == '_') {
|
||||||
|
cached_expr_->emplace_back(RespExpr::NIL);
|
||||||
|
cached_expr_->back().u = Buffer{};
|
||||||
|
}
|
||||||
|
HandleFinishArg();
|
||||||
|
}
|
||||||
|
break;
|
||||||
default:
|
default:
|
||||||
LOG(FATAL) << "Unexpected state " << int(state_);
|
LOG(FATAL) << "Unexpected state " << int(state_);
|
||||||
}
|
}
|
||||||
|
@ -76,6 +92,7 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
|
||||||
}
|
}
|
||||||
|
|
||||||
if (resultc.first == INPUT_PENDING) {
|
if (resultc.first == INPUT_PENDING) {
|
||||||
|
DCHECK(str.empty());
|
||||||
StashState(res);
|
StashState(res);
|
||||||
}
|
}
|
||||||
return resultc.first;
|
return resultc.first;
|
||||||
|
@ -83,6 +100,8 @@ auto RedisParser::Parse(Buffer str, uint32_t* consumed, RespExpr::Vec* res) -> R
|
||||||
|
|
||||||
if (resultc.first == OK) {
|
if (resultc.first == OK) {
|
||||||
DCHECK(cached_expr_);
|
DCHECK(cached_expr_);
|
||||||
|
DCHECK_EQ(0, small_len_);
|
||||||
|
|
||||||
if (res != cached_expr_) {
|
if (res != cached_expr_) {
|
||||||
DCHECK(!stash_.empty());
|
DCHECK(!stash_.empty());
|
||||||
|
|
||||||
|
@ -233,15 +252,27 @@ auto RedisParser::ParseLen(Buffer str, int64_t* res) -> ResultConsumed {
|
||||||
const char* s = reinterpret_cast<const char*>(str.data());
|
const char* s = reinterpret_cast<const char*>(str.data());
|
||||||
const char* pos = reinterpret_cast<const char*>(memchr(s, '\n', str.size()));
|
const char* pos = reinterpret_cast<const char*>(memchr(s, '\n', str.size()));
|
||||||
if (!pos) {
|
if (!pos) {
|
||||||
Result r = INPUT_PENDING;
|
if (str.size() + small_len_ < small_buf_.size()) {
|
||||||
if (str.size() >= 32) {
|
memcpy(&small_buf_[small_len_], str.data(), str.size());
|
||||||
LOG(WARNING) << "Unexpected format " << string_view{s, str.size()};
|
small_len_ += str.size();
|
||||||
r = BAD_ARRAYLEN;
|
return {INPUT_PENDING, str.size()};
|
||||||
}
|
}
|
||||||
return {r, 0};
|
LOG(WARNING) << "Unexpected format " << string_view{s, str.size()};
|
||||||
|
return ResultConsumed{BAD_ARRAYLEN, 0};
|
||||||
}
|
}
|
||||||
|
|
||||||
unsigned consumed = pos - s + 1;
|
unsigned consumed = pos - s + 1;
|
||||||
|
if (small_len_ > 0) {
|
||||||
|
if (small_len_ + consumed >= small_buf_.size()) {
|
||||||
|
return ResultConsumed{BAD_ARRAYLEN, consumed};
|
||||||
|
}
|
||||||
|
memcpy(&small_buf_[small_len_], str.data(), consumed);
|
||||||
|
small_len_ += consumed;
|
||||||
|
s = small_buf_.data();
|
||||||
|
pos = s + small_len_ - 1;
|
||||||
|
small_len_ = 0;
|
||||||
|
}
|
||||||
|
|
||||||
if (pos[-1] != '\r') {
|
if (pos[-1] != '\r') {
|
||||||
return {BAD_ARRAYLEN, consumed};
|
return {BAD_ARRAYLEN, consumed};
|
||||||
}
|
}
|
||||||
|
@ -320,12 +351,6 @@ auto RedisParser::ConsumeArrayLen(Buffer str) -> ResultConsumed {
|
||||||
auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
|
auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
|
||||||
DCHECK(!str.empty());
|
DCHECK(!str.empty());
|
||||||
|
|
||||||
unsigned min_len = 2 + int(arg_c_ != '_');
|
|
||||||
|
|
||||||
if (str.size() < min_len) {
|
|
||||||
return {INPUT_PENDING, 0};
|
|
||||||
}
|
|
||||||
|
|
||||||
if (arg_c_ == '$') {
|
if (arg_c_ == '$') {
|
||||||
int64_t len;
|
int64_t len;
|
||||||
|
|
||||||
|
@ -352,12 +377,21 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
|
||||||
}
|
}
|
||||||
|
|
||||||
DCHECK(!server_mode_);
|
DCHECK(!server_mode_);
|
||||||
|
|
||||||
if (arg_c_ == '_') { // Resp3 NIL
|
if (arg_c_ == '_') { // Resp3 NIL
|
||||||
// '\r','\n'
|
// "_\r\n", with '_' consumed into arg_c_.
|
||||||
if (str[0] != '\r' || str[1] != '\n') {
|
DCHECK_LT(small_len_, 2u); // must be because we never fill here with more than 2 bytes.
|
||||||
|
DCHECK_GE(str.size(), 1u);
|
||||||
|
|
||||||
|
if (str[0] != '\r' || (str.size() > 1 && str[1] != '\n')) {
|
||||||
return {BAD_STRING, 0};
|
return {BAD_STRING, 0};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (str.size() == 1) {
|
||||||
|
state_ = SLASH_N_S;
|
||||||
|
return {INPUT_PENDING, 1};
|
||||||
|
}
|
||||||
|
|
||||||
cached_expr_->emplace_back(RespExpr::NIL);
|
cached_expr_->emplace_back(RespExpr::NIL);
|
||||||
cached_expr_->back().u = Buffer{};
|
cached_expr_->back().u = Buffer{};
|
||||||
HandleFinishArg();
|
HandleFinishArg();
|
||||||
|
@ -371,6 +405,9 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
|
||||||
char* s = reinterpret_cast<char*>(str.data());
|
char* s = reinterpret_cast<char*>(str.data());
|
||||||
char* eol = reinterpret_cast<char*>(memchr(s, '\n', str.size()));
|
char* eol = reinterpret_cast<char*>(memchr(s, '\n', str.size()));
|
||||||
|
|
||||||
|
// TODO: in client mode we still may not consume everything (see INPUT_PENDING below).
|
||||||
|
// It's not a problem, because we need to consume all the input only in server mode.
|
||||||
|
|
||||||
if (arg_c_ == '+' || arg_c_ == '-') { // Simple string or error.
|
if (arg_c_ == '+' || arg_c_ == '-') { // Simple string or error.
|
||||||
DCHECK(!server_mode_);
|
DCHECK(!server_mode_);
|
||||||
if (!eol) {
|
if (!eol) {
|
||||||
|
@ -421,9 +458,9 @@ auto RedisParser::ParseArg(Buffer str) -> ResultConsumed {
|
||||||
}
|
}
|
||||||
|
|
||||||
auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed {
|
auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed {
|
||||||
auto& bulk_str = get<Buffer>(cached_expr_->back().u);
|
DCHECK_EQ(small_len_, 0);
|
||||||
|
|
||||||
uint32_t consumed = 0;
|
uint32_t consumed = 0;
|
||||||
|
auto& bulk_str = get<Buffer>(cached_expr_->back().u);
|
||||||
|
|
||||||
if (str.size() >= bulk_len_) {
|
if (str.size() >= bulk_len_) {
|
||||||
consumed = bulk_len_;
|
consumed = bulk_len_;
|
||||||
|
@ -446,6 +483,12 @@ auto RedisParser::ConsumeBulk(Buffer str) -> ResultConsumed {
|
||||||
}
|
}
|
||||||
HandleFinishArg();
|
HandleFinishArg();
|
||||||
return {OK, consumed + 2};
|
return {OK, consumed + 2};
|
||||||
|
} else if (str.size() == 1) {
|
||||||
|
if (str[0] != '\r') {
|
||||||
|
return {BAD_STRING, consumed};
|
||||||
|
}
|
||||||
|
state_ = SLASH_N_S;
|
||||||
|
consumed++;
|
||||||
}
|
}
|
||||||
return {INPUT_PENDING, consumed};
|
return {INPUT_PENDING, consumed};
|
||||||
}
|
}
|
||||||
|
@ -490,6 +533,7 @@ void RedisParser::HandleFinishArg() {
|
||||||
}
|
}
|
||||||
cached_expr_ = parse_stack_.back().second;
|
cached_expr_ = parse_stack_.back().second;
|
||||||
}
|
}
|
||||||
|
small_len_ = 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
void RedisParser::ExtendLastString(Buffer str) {
|
void RedisParser::ExtendLastString(Buffer str) {
|
||||||
|
|
|
@ -45,8 +45,6 @@ class RedisParser {
|
||||||
* part of str because parser caches the intermediate state internally according to 'consumed'
|
* part of str because parser caches the intermediate state internally according to 'consumed'
|
||||||
* result.
|
* result.
|
||||||
*
|
*
|
||||||
* Note: A parser does not always guarantee progress, i.e. if a small buffer was passed it may
|
|
||||||
* returns INPUT_PENDING with consumed == 0.
|
|
||||||
*
|
*
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
@ -93,13 +91,16 @@ class RedisParser {
|
||||||
PARSE_ARG_TYPE, // Parse [$:+-]
|
PARSE_ARG_TYPE, // Parse [$:+-]
|
||||||
PARSE_ARG_S, // Parse string\r\n
|
PARSE_ARG_S, // Parse string\r\n
|
||||||
BULK_STR_S,
|
BULK_STR_S,
|
||||||
|
SLASH_N_S,
|
||||||
CMD_COMPLETE_S,
|
CMD_COMPLETE_S,
|
||||||
};
|
};
|
||||||
|
|
||||||
State state_ = CMD_COMPLETE_S;
|
State state_ = CMD_COMPLETE_S;
|
||||||
bool is_broken_token_ = false; // true, if a token (inline or bulk) is broken during the parsing.
|
bool is_broken_token_ = false; // true, if a token (inline or bulk) is broken during the parsing.
|
||||||
bool server_mode_ = true;
|
bool server_mode_ = true;
|
||||||
|
uint8_t small_len_ = 0;
|
||||||
char arg_c_ = 0;
|
char arg_c_ = 0;
|
||||||
|
|
||||||
uint32_t bulk_len_ = 0;
|
uint32_t bulk_len_ = 0;
|
||||||
uint32_t last_stashed_level_ = 0, last_stashed_index_ = 0;
|
uint32_t last_stashed_level_ = 0, last_stashed_index_ = 0;
|
||||||
uint32_t max_arr_len_;
|
uint32_t max_arr_len_;
|
||||||
|
@ -114,6 +115,7 @@ class RedisParser {
|
||||||
|
|
||||||
using Blob = std::vector<uint8_t>;
|
using Blob = std::vector<uint8_t>;
|
||||||
std::vector<Blob> buf_stash_;
|
std::vector<Blob> buf_stash_;
|
||||||
|
std::array<char, 32> small_buf_;
|
||||||
};
|
};
|
||||||
|
|
||||||
} // namespace facade
|
} // namespace facade
|
||||||
|
|
|
@ -146,6 +146,19 @@ TEST_F(RedisParserTest, ClientMode) {
|
||||||
|
|
||||||
ASSERT_EQ(RedisParser::OK, Parse("-ERR foo bar\r\n"));
|
ASSERT_EQ(RedisParser::OK, Parse("-ERR foo bar\r\n"));
|
||||||
EXPECT_THAT(args_, ElementsAre(ErrArg("ERR foo")));
|
EXPECT_THAT(args_, ElementsAre(ErrArg("ERR foo")));
|
||||||
|
|
||||||
|
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("_"));
|
||||||
|
EXPECT_EQ(1, consumed_);
|
||||||
|
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\r"));
|
||||||
|
EXPECT_EQ(1, consumed_);
|
||||||
|
ASSERT_EQ(RedisParser::OK, Parse("\n"));
|
||||||
|
EXPECT_EQ(1, consumed_);
|
||||||
|
EXPECT_THAT(args_, ElementsAre(ArgType(RespExpr::NIL)));
|
||||||
|
ASSERT_EQ(RedisParser::OK, Parse("*2\r\n_\r\n_\r\n"));
|
||||||
|
ASSERT_EQ(10, consumed_);
|
||||||
|
|
||||||
|
ASSERT_EQ(RedisParser::OK, Parse("*3\r\n+OK\r\n$1\r\n1\r\n*2\r\n$1\r\n1\r\n$-1\r\n"));
|
||||||
|
ASSERT_THAT(args_, ElementsAre("OK", "1", ArrLen(2)));
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(RedisParserTest, Hierarchy) {
|
TEST_F(RedisParserTest, Hierarchy) {
|
||||||
|
@ -183,9 +196,9 @@ TEST_F(RedisParserTest, LargeBulk) {
|
||||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(half));
|
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(half));
|
||||||
ASSERT_EQ(512, consumed_);
|
ASSERT_EQ(512, consumed_);
|
||||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\r"));
|
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\r"));
|
||||||
ASSERT_EQ(0, consumed_);
|
ASSERT_EQ(1, consumed_);
|
||||||
ASSERT_EQ(RedisParser::OK, Parse("\r\n"));
|
ASSERT_EQ(RedisParser::OK, Parse("\n"));
|
||||||
ASSERT_EQ(2, consumed_);
|
EXPECT_EQ(1, consumed_);
|
||||||
|
|
||||||
string part1 = absl::StrCat(prefix, half);
|
string part1 = absl::StrCat(prefix, half);
|
||||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(part1));
|
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse(part1));
|
||||||
|
@ -208,7 +221,8 @@ TEST_F(RedisParserTest, LargeBulk) {
|
||||||
TEST_F(RedisParserTest, NILs) {
|
TEST_F(RedisParserTest, NILs) {
|
||||||
ASSERT_EQ(RedisParser::BAD_ARRAYLEN, Parse("_\r\n"));
|
ASSERT_EQ(RedisParser::BAD_ARRAYLEN, Parse("_\r\n"));
|
||||||
parser_.SetClientMode();
|
parser_.SetClientMode();
|
||||||
ASSERT_EQ(RedisParser::OK, Parse("_\r\n"));
|
ASSERT_EQ(RedisParser::OK, Parse("_\r\nfooobar"));
|
||||||
|
EXPECT_EQ(3, consumed_);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(RedisParserTest, NestedArray) {
|
TEST_F(RedisParserTest, NestedArray) {
|
||||||
|
@ -245,15 +259,15 @@ TEST_F(RedisParserTest, UsedMemory) {
|
||||||
|
|
||||||
TEST_F(RedisParserTest, Eol) {
|
TEST_F(RedisParserTest, Eol) {
|
||||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*1\r"));
|
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*1\r"));
|
||||||
EXPECT_EQ(1, consumed_);
|
EXPECT_EQ(3, consumed_);
|
||||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("1\r\n$5\r\n"));
|
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("\n$5\r\n"));
|
||||||
EXPECT_EQ(7, consumed_);
|
EXPECT_EQ(5, consumed_);
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(RedisParserTest, BulkSplit) {
|
TEST_F(RedisParserTest, BulkSplit) {
|
||||||
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*1\r\n$4\r\nSADD"));
|
ASSERT_EQ(RedisParser::INPUT_PENDING, Parse("*1\r\n$4\r\nSADD\r"));
|
||||||
ASSERT_EQ(12, consumed_);
|
ASSERT_EQ(13, consumed_);
|
||||||
ASSERT_EQ(RedisParser::OK, Parse("\r\n"));
|
ASSERT_EQ(RedisParser::OK, Parse("\n"));
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(RedisParserTest, InlineSplit) {
|
TEST_F(RedisParserTest, InlineSplit) {
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue