diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 39ef714e5..7102d49e1 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -1137,8 +1137,14 @@ auto Connection::ParseMemcache() -> ParserStatus { do { string_view str = ToSV(io_buf_.InputBuffer()); + + if (str.empty()) { + return OK; + } + result = memcache_parser_->Parse(str, &consumed, &cmd); + DVLOG(2) << "mc_result " << result << " consumed: " << consumed << " type " << cmd.type; if (result != MemcacheParser::OK) { io_buf_.ConsumeInput(consumed); break; diff --git a/src/facade/memcache_parser.cc b/src/facade/memcache_parser.cc index 763599ee5..4720b0f67 100644 --- a/src/facade/memcache_parser.cc +++ b/src/facade/memcache_parser.cc @@ -6,12 +6,14 @@ #include #include #include +#include #include #include #include #include "base/logging.h" #include "base/stl_util.h" +#include "facade/facade_types.h" namespace facade { using namespace std; @@ -29,16 +31,37 @@ MP::CmdType From(string_view token) { {"quit", MP::QUIT}, {"version", MP::VERSION}, }; - auto it = cmd_map.find(token); - if (it == cmd_map.end()) + if (token.size() == 2) { + // META_COMMANDS + if (token[0] != 'm') + return MP::INVALID; + switch (token[1]) { + case 's': + return MP::META_SET; + case 'g': + return MP::META_GET; + case 'd': + return MP::META_DEL; + case 'a': + return MP::META_ARITHM; + case 'n': + return MP::META_NOOP; + case 'e': + return MP::META_DEBUG; + } return MP::INVALID; + } - return it->second; + if (token.size() > 2) { + auto it = cmd_map.find(token); + if (it == cmd_map.end()) + return MP::INVALID; + return it->second; + } + return MP::INVALID; } -using TokensView = absl::Span; - -MP::Result ParseStore(TokensView tokens, MP::Command* res) { +MP::Result ParseStore(ArgSlice tokens, MP::Command* res) { const size_t num_tokens = tokens.size(); unsigned opt_pos = 3; if (res->type == MP::CAS) { @@ -70,7 +93,7 @@ MP::Result ParseStore(TokensView tokens, 
MP::Command* res) { return MP::OK; } -MP::Result ParseValueless(TokensView tokens, MP::Command* res) { +MP::Result ParseValueless(ArgSlice tokens, MP::Command* res) { const size_t num_tokens = tokens.size(); size_t key_pos = 0; if (res->type == MP::GAT || res->type == MP::GATS) { @@ -116,6 +139,150 @@ MP::Result ParseValueless(TokensView tokens, MP::Command* res) { return MP::OK; } +bool ParseMetaMode(char m, MP::Command* res) { + if (res->type == MP::SET) { + switch (m) { + case 'E': + res->type = MP::ADD; + break; + case 'A': + res->type = MP::APPEND; + break; + case 'R': + res->type = MP::REPLACE; + break; + case 'P': + res->type = MP::PREPEND; + break; + case 'S': + break; + default: + return false; + } + return true; + } + + if (res->type == MP::INCR) { + switch (m) { + case 'I': + case '+': + break; + case 'D': + case '-': + res->type = MP::DECR; + break; + default: + return false; + } + return true; + } + return false; +} + +// See https://raw.githubusercontent.com/memcached/memcached/refs/heads/master/doc/protocol.txt +MP::Result ParseMeta(ArgSlice tokens, MP::Command* res) { + DCHECK(!tokens.empty()); + + if (res->type == MP::META_DEBUG) { + LOG(ERROR) << "meta debug not yet implemented"; + return MP::PARSE_ERROR; + } + + if (tokens[0].size() > 250) + return MP::PARSE_ERROR; + + res->meta = true; + res->key = tokens[0]; + res->bytes_len = 0; + res->flags = 0; + res->expire_ts = 0; + + tokens.remove_prefix(1); + + // We emulate the behavior by returning the high level commands. + // TODO: we should reverse the interface in the future, so that a high level command + // will be represented in MemcacheParser::Command by a meta command with flags. + // high level commands should not be part of the interface in the future. 
+ switch (res->type) { + case MP::META_GET: + res->type = MP::GET; + break; + case MP::META_DEL: + res->type = MP::DELETE; + break; + case MP::META_SET: + if (tokens.empty()) { + return MP::PARSE_ERROR; + } + if (!absl::SimpleAtoi(tokens[0], &res->bytes_len)) + return MP::BAD_INT; + + res->type = MP::SET; + tokens.remove_prefix(1); + break; + case MP::META_ARITHM: + res->type = MP::INCR; + res->delta = 1; + break; + default: + return MP::PARSE_ERROR; + } + + for (size_t i = 0; i < tokens.size(); ++i) { + string_view token = tokens[i]; + + switch (token[0]) { + case 'T': + if (!absl::SimpleAtoi(token.substr(1), &res->expire_ts)) + return MP::BAD_INT; + break; + case 'b': + if (token.size() != 1) + return MP::PARSE_ERROR; + if (!absl::Base64Unescape(res->key, &res->blob)) + return MP::PARSE_ERROR; + res->key = res->blob; + res->base64 = true; + break; + case 'F': + if (!absl::SimpleAtoi(token.substr(1), &res->flags)) + return MP::BAD_INT; + break; + case 'M': + if (token.size() != 2 || !ParseMetaMode(token[1], res)) + return MP::PARSE_ERROR; + break; + case 'D': + if (!absl::SimpleAtoi(token.substr(1), &res->delta)) + return MP::BAD_INT; + break; + case 'q': + res->no_reply = true; + break; + case 'f': + res->return_flags = true; + break; + case 'v': + res->return_value = true; + break; + case 't': + res->return_ttl = true; + break; + case 'l': + res->return_access_time = true; + break; + case 'h': + res->return_hit = true; + break; + default: + LOG(WARNING) << "unknown meta flag: " << token; // not yet implemented + return MP::PARSE_ERROR; + } + } + + return MP::OK; +} + } // namespace auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result { @@ -123,6 +290,8 @@ auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result { auto pos = str.find("\r\n"); *consumed = 0; if (pos == string_view::npos) { + // We need more data to parse the command. For get/gets commands this line can be very long. 
+ // we limit maximum buffer capacity in the higher levels using max_client_iobuf_len. return INPUT_PENDING; } @@ -131,16 +300,15 @@ auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result { } *consumed = pos + 2; - std::string_view tokens_expression = str.substr(0, pos); + string_view tokens_expression = str.substr(0, pos); // cas [noreply]\r\n // get *\r\n - absl::InlinedVector tokens = + // ms *\r\n + absl::InlinedVector tokens = absl::StrSplit(tokens_expression, ' ', absl::SkipWhitespace()); - const size_t num_tokens = tokens.size(); - - if (num_tokens == 0) + if (tokens.empty()) return PARSE_ERROR; cmd->type = From(tokens[0]); @@ -148,25 +316,31 @@ auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result { return UNKNOWN_CMD; } - if (cmd->type <= CAS) { // Store command - if (num_tokens < 5 || tokens[1].size() > 250) { // key length limit + ArgSlice tokens_view{tokens}; + tokens_view.remove_prefix(1); + + if (cmd->type <= CAS) { // Store command + if (tokens_view.size() < 4 || tokens_view[0].size() > 250) { // key length limit return MP::PARSE_ERROR; } - cmd->key = string_view{tokens[1].data(), tokens[1].size()}; + cmd->key = tokens_view[0]; - TokensView tokens_view{tokens.begin() + 2, num_tokens - 2}; + tokens_view.remove_prefix(1); return ParseStore(tokens_view, cmd); } - if (num_tokens == 1) { - if (base::_in(cmd->type, {MP::STATS, MP::FLUSHALL, MP::QUIT, MP::VERSION})) { + if (cmd->type >= META_SET) { + return tokens_view.empty() ? 
MP::PARSE_ERROR : ParseMeta(tokens_view, cmd); + } + + if (tokens_view.empty()) { + if (base::_in(cmd->type, {MP::STATS, MP::FLUSHALL, MP::QUIT, MP::VERSION, MP::META_NOOP})) { return MP::OK; } return MP::PARSE_ERROR; } - TokensView tokens_view{tokens.begin() + 1, num_tokens - 1}; return ParseValueless(tokens_view, cmd); }; diff --git a/src/facade/memcache_parser.h b/src/facade/memcache_parser.h index 1906c3ded..65db0f747 100644 --- a/src/facade/memcache_parser.h +++ b/src/facade/memcache_parser.h @@ -5,6 +5,7 @@ #pragma once #include +#include #include #include @@ -39,6 +40,14 @@ class MemcacheParser { INCR = 32, DECR = 33, FLUSHALL = 34, + + // META_COMMANDS + META_NOOP = 50, + META_SET = 51, + META_DEL = 52, + META_ARITHM = 53, + META_GET = 54, + META_DEBUG = 55, }; // According to https://github.com/memcached/memcached/wiki/Commands#standard-protocol @@ -56,7 +65,18 @@ class MemcacheParser { 0; // relative (expire_ts <= month) or unix time (expire_ts > month) in seconds uint32_t bytes_len = 0; uint32_t flags = 0; - bool no_reply = false; + bool no_reply = false; // q + bool meta = false; + + // meta flags + bool base64 = false; // b + bool return_flags = false; // f + bool return_value = false; // v + bool return_ttl = false; // t + bool return_access_time = false; // l + bool return_hit = false; // h + // Used internally by meta parsing. + std::string blob; }; enum Result { @@ -64,7 +84,7 @@ class MemcacheParser { INPUT_PENDING, UNKNOWN_CMD, BAD_INT, - PARSE_ERROR, + PARSE_ERROR, // request parse error, but can continue parsing within the same connection. 
BAD_DELTA, }; diff --git a/src/facade/memcache_parser_test.cc b/src/facade/memcache_parser_test.cc index 9e20d4049..a3c145587 100644 --- a/src/facade/memcache_parser_test.cc +++ b/src/facade/memcache_parser_test.cc @@ -101,6 +101,54 @@ TEST_F(MCParserTest, NoreplyBasic) { EXPECT_FALSE(cmd_.no_reply); } +TEST_F(MCParserTest, Meta) { + MemcacheParser::Result st = parser_.Parse("ms key1 ", &consumed_, &cmd_); + EXPECT_EQ(MemcacheParser::INPUT_PENDING, st); + EXPECT_EQ(0, consumed_); + st = parser_.Parse("ms key1 6 T1 F2\r\n", &consumed_, &cmd_); + EXPECT_EQ(MemcacheParser::OK, st); + EXPECT_EQ(17, consumed_); + EXPECT_EQ(MemcacheParser::SET, cmd_.type); + EXPECT_EQ("key1", cmd_.key); + EXPECT_EQ(2, cmd_.flags); + EXPECT_EQ(1, cmd_.expire_ts); + + st = parser_.Parse("ms 16nXnNeV150= 5 b ME\r\n", &consumed_, &cmd_); + EXPECT_EQ(MemcacheParser::OK, st); + EXPECT_EQ(24, consumed_); + EXPECT_EQ(MemcacheParser::ADD, cmd_.type); + EXPECT_EQ("שלום", cmd_.key); + EXPECT_EQ(5, cmd_.bytes_len); + + st = parser_.Parse("mg 16nXnNeV150= b\r\n", &consumed_, &cmd_); + EXPECT_EQ(MemcacheParser::OK, st); + EXPECT_EQ(19, consumed_); + EXPECT_EQ(MemcacheParser::GET, cmd_.type); + EXPECT_EQ("שלום", cmd_.key); + + st = parser_.Parse("ma val b\r\n", &consumed_, &cmd_); + EXPECT_EQ(MemcacheParser::OK, st); + EXPECT_EQ(10, consumed_); + EXPECT_EQ(MemcacheParser::INCR, cmd_.type); + + st = parser_.Parse("ma val M- D10\r\n", &consumed_, &cmd_); + EXPECT_EQ(MemcacheParser::OK, st); + EXPECT_EQ(15, consumed_); + EXPECT_EQ(MemcacheParser::DECR, cmd_.type); + EXPECT_EQ(10, cmd_.delta); + + st = parser_.Parse("mg key f v t l h\r\n", &consumed_, &cmd_); + EXPECT_EQ(MemcacheParser::OK, st); + EXPECT_EQ(18, consumed_); + EXPECT_EQ(MemcacheParser::GET, cmd_.type); + EXPECT_EQ("key", cmd_.key); + EXPECT_TRUE(cmd_.return_flags); + EXPECT_TRUE(cmd_.return_value); + EXPECT_TRUE(cmd_.return_ttl); + EXPECT_TRUE(cmd_.return_access_time); + EXPECT_TRUE(cmd_.return_hit); +} + class MCParserNoreplyTest : public 
MCParserTest { protected: void RunTest(string_view str, bool noreply) { diff --git a/tests/dragonfly/memcache_meta.py b/tests/dragonfly/memcache_meta.py new file mode 100644 index 000000000..3b4cf0b4c --- /dev/null +++ b/tests/dragonfly/memcache_meta.py @@ -0,0 +1,21 @@ +from .instance import DflyInstance +from . import dfly_args +from meta_memcache import ( + Key, + ServerAddress, + CacheClient, + connection_pool_factory_builder, +) + +DEFAULT_ARGS = {"memcached_port": 11211, "proactor_threads": 4} + + +@dfly_args(DEFAULT_ARGS) +def test_basic(df_server: DflyInstance): + pool = CacheClient.cache_client_from_servers( + servers=[ + ServerAddress(host="localhost", port=DEFAULT_ARGS.get("memcached_port")), + ], + connection_pool_factory_fn=connection_pool_factory_builder(), + ) + # TODO: to add integration tests diff --git a/tests/dragonfly/pymemcached_test.py b/tests/dragonfly/pymemcached_test.py index 3d5842341..a8b405555 100644 --- a/tests/dragonfly/pymemcached_test.py +++ b/tests/dragonfly/pymemcached_test.py @@ -5,7 +5,6 @@ from redis import Redis import socket import random import time -import warnings from . import dfly_args from .instance import DflyInstance diff --git a/tests/dragonfly/requirements.txt b/tests/dragonfly/requirements.txt index f6be33893..cfbbd8262 100644 --- a/tests/dragonfly/requirements.txt +++ b/tests/dragonfly/requirements.txt @@ -13,6 +13,7 @@ wrapt==1.14.1 pytest-asyncio==0.20.1 pytest-repeat==0.9.3 pymemcache==4.0.0 +meta_memcache==2 prometheus_client==0.17.0 aiohttp==3.10.2 numpy