feat: add support for meta memcache commands (#4362)

This is a stripped down version of supporting the memcache meta requests.

a. Not all meta flags are supported; TTL, client flags, and arithmetic operations are.
b. Reply support for meta commands is not included.
c. New semantics that are not part of the older ASCII protocol are not included.

The parser interface has not changed significantly; the meta commands are emulated
using the old, high-level commands such as ADD, REPLACE, INCR, etc.

See https://raw.githubusercontent.com/memcached/memcached/refs/heads/master/doc/protocol.txt for more details
regarding the meta commands spec.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-12-24 15:33:24 +02:00 committed by GitHub
parent 6946820e56
commit a27cce81b1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 291 additions and 22 deletions

View file

@ -1137,8 +1137,14 @@ auto Connection::ParseMemcache() -> ParserStatus {
do {
string_view str = ToSV(io_buf_.InputBuffer());
if (str.empty()) {
return OK;
}
result = memcache_parser_->Parse(str, &consumed, &cmd);
DVLOG(2) << "mc_result " << result << " consumed: " << consumed << " type " << cmd.type;
if (result != MemcacheParser::OK) {
io_buf_.ConsumeInput(consumed);
break;

View file

@ -6,12 +6,14 @@
#include <absl/container/flat_hash_map.h>
#include <absl/container/inlined_vector.h>
#include <absl/strings/ascii.h>
#include <absl/strings/escaping.h>
#include <absl/strings/numbers.h>
#include <absl/strings/str_split.h>
#include <absl/types/span.h>
#include "base/logging.h"
#include "base/stl_util.h"
#include "facade/facade_types.h"
namespace facade {
using namespace std;
@ -29,16 +31,37 @@ MP::CmdType From(string_view token) {
{"quit", MP::QUIT}, {"version", MP::VERSION},
};
auto it = cmd_map.find(token);
if (it == cmd_map.end())
if (token.size() == 2) {
// META_COMMANDS
if (token[0] != 'm')
return MP::INVALID;
switch (token[1]) {
case 's':
return MP::META_SET;
case 'g':
return MP::META_GET;
case 'd':
return MP::META_DEL;
case 'a':
return MP::META_ARITHM;
case 'n':
return MP::META_NOOP;
case 'e':
return MP::META_DEBUG;
}
return MP::INVALID;
}
return it->second;
if (token.size() > 2) {
auto it = cmd_map.find(token);
if (it == cmd_map.end())
return MP::INVALID;
return it->second;
}
return MP::INVALID;
}
using TokensView = absl::Span<std::string_view>;
MP::Result ParseStore(TokensView tokens, MP::Command* res) {
MP::Result ParseStore(ArgSlice tokens, MP::Command* res) {
const size_t num_tokens = tokens.size();
unsigned opt_pos = 3;
if (res->type == MP::CAS) {
@ -70,7 +93,7 @@ MP::Result ParseStore(TokensView tokens, MP::Command* res) {
return MP::OK;
}
MP::Result ParseValueless(TokensView tokens, MP::Command* res) {
MP::Result ParseValueless(ArgSlice tokens, MP::Command* res) {
const size_t num_tokens = tokens.size();
size_t key_pos = 0;
if (res->type == MP::GAT || res->type == MP::GATS) {
@ -116,6 +139,150 @@ MP::Result ParseValueless(TokensView tokens, MP::Command* res) {
return MP::OK;
}
// Applies the mode character of an 'M' meta flag to an already translated command.
//
// For a SET command the mode selects the storage variant: 'E' -> ADD, 'A' -> APPEND,
// 'R' -> REPLACE, 'P' -> PREPEND, while 'S' keeps it a plain SET.
// For an INCR command 'I' or '+' keeps it an increment and 'D' or '-' turns it into DECR.
//
// Returns true if the mode is valid for the current command type. Returns false, leaving
// res->type untouched, for any other mode character or any other command type.
bool ParseMetaMode(char m, MP::Command* res) {
  if (res->type == MP::SET) {
    if (m == 'S')
      return true;  // explicit "set" mode, nothing to change.

    if (m == 'E')
      res->type = MP::ADD;
    else if (m == 'A')
      res->type = MP::APPEND;
    else if (m == 'R')
      res->type = MP::REPLACE;
    else if (m == 'P')
      res->type = MP::PREPEND;
    else
      return false;
    return true;
  }

  if (res->type != MP::INCR)
    return false;  // modes are only defined for store and arithmetic commands.

  const bool increment = (m == 'I' || m == '+');
  const bool decrement = (m == 'D' || m == '-');
  if (decrement)
    res->type = MP::DECR;
  return increment || decrement;
}
// Parses the arguments of a meta command, i.e. everything that follows the two-letter
// command token: <key> [<datalen> for ms] <flags>*.
//
// The meta command is emulated by translating it into a classic command type:
//   META_GET -> GET, META_DEL -> DELETE, META_SET -> SET (or ADD/APPEND/REPLACE/PREPEND
//   via the 'M' flag), META_ARITHM -> INCR (or DECR via the 'M' flag, default delta 1).
//
// Supported flags: T<ttl>, F<flags>, D<delta>, M<mode>, b (key is base64 encoded),
// q (no reply) and the return selectors f, v, t, l, h. Any other flag is rejected.
//
// `tokens` must be non-empty and tokens[0] is the key. On success `res` is filled and
// res->key references either the input token or res->blob (for base64 keys).
//
// Returns MP::OK on success, MP::BAD_INT if a numeric argument can not be parsed and
// MP::PARSE_ERROR for any other malformed or unsupported input.
//
// See https://raw.githubusercontent.com/memcached/memcached/refs/heads/master/doc/protocol.txt
MP::Result ParseMeta(ArgSlice tokens, MP::Command* res) {
  DCHECK(!tokens.empty());

  if (res->type == MP::META_DEBUG) {
    LOG(ERROR) << "meta debug not yet implemented";
    return MP::PARSE_ERROR;
  }

  // Key length limit, checked on the raw token (i.e. before base64 decoding).
  if (tokens[0].size() > 250)
    return MP::PARSE_ERROR;

  res->meta = true;
  res->key = tokens[0];
  res->bytes_len = 0;
  res->flags = 0;
  res->expire_ts = 0;

  // The command object may be reused between requests, so the per-request meta flags
  // must not leak from a previously parsed command.
  res->no_reply = false;
  res->base64 = false;
  res->return_flags = false;
  res->return_value = false;
  res->return_ttl = false;
  res->return_access_time = false;
  res->return_hit = false;

  tokens.remove_prefix(1);

  // We emulate the behavior by returning the high level commands.
  // TODO: we should reverse the interface in the future, so that a high level command
  // will be represented in MemcacheParser::Command by a meta command with flags.
  // high level commands should not be part of the interface in the future.
  switch (res->type) {
    case MP::META_GET:
      res->type = MP::GET;
      break;
    case MP::META_DEL:
      res->type = MP::DELETE;
      break;
    case MP::META_SET:
      // ms requires <datalen> right after the key.
      if (tokens.empty()) {
        return MP::PARSE_ERROR;
      }
      if (!absl::SimpleAtoi(tokens[0], &res->bytes_len))
        return MP::BAD_INT;

      res->type = MP::SET;
      tokens.remove_prefix(1);
      break;
    case MP::META_ARITHM:
      res->type = MP::INCR;
      res->delta = 1;
      break;
    default:
      return MP::PARSE_ERROR;
  }

  // Tracks whether the key was already decoded by a 'b' flag in this request.
  bool key_decoded = false;

  for (size_t i = 0; i < tokens.size(); ++i) {
    string_view token = tokens[i];

    switch (token[0]) {
      case 'T':
        if (!absl::SimpleAtoi(token.substr(1), &res->expire_ts))
          return MP::BAD_INT;
        break;
      case 'b':
        if (token.size() != 1)
          return MP::PARSE_ERROR;
        // A repeated 'b' flag is rejected: after the first one res->key references
        // res->blob, so decoding again would read from the buffer being overwritten.
        if (key_decoded)
          return MP::PARSE_ERROR;
        if (!absl::Base64Unescape(res->key, &res->blob))
          return MP::PARSE_ERROR;
        res->key = res->blob;
        res->base64 = true;
        key_decoded = true;
        break;
      case 'F':
        if (!absl::SimpleAtoi(token.substr(1), &res->flags))
          return MP::BAD_INT;
        break;
      case 'M':
        if (token.size() != 2 || !ParseMetaMode(token[1], res))
          return MP::PARSE_ERROR;
        break;
      case 'D':
        if (!absl::SimpleAtoi(token.substr(1), &res->delta))
          return MP::BAD_INT;
        break;
      case 'q':
        res->no_reply = true;
        break;
      case 'f':
        res->return_flags = true;
        break;
      case 'v':
        res->return_value = true;
        break;
      case 't':
        res->return_ttl = true;
        break;
      case 'l':
        res->return_access_time = true;
        break;
      case 'h':
        res->return_hit = true;
        break;
      default:
        LOG(WARNING) << "unknown meta flag: " << token;  // not yet implemented
        return MP::PARSE_ERROR;
    }
  }

  return MP::OK;
}
} // namespace
auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result {
@ -123,6 +290,8 @@ auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result {
auto pos = str.find("\r\n");
*consumed = 0;
if (pos == string_view::npos) {
// We need more data to parse the command. For get/gets commands this line can be very long.
// We limit the maximum buffer capacity in the higher levels using max_client_iobuf_len.
return INPUT_PENDING;
}
@ -131,16 +300,15 @@ auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result {
}
*consumed = pos + 2;
std::string_view tokens_expression = str.substr(0, pos);
string_view tokens_expression = str.substr(0, pos);
// cas <key> <flags> <exptime> <bytes> <cas unique> [noreply]\r\n
// get <key>*\r\n
absl::InlinedVector<std::string_view, 32> tokens =
// ms <key> <datalen> <flags>*\r\n
absl::InlinedVector<string_view, 32> tokens =
absl::StrSplit(tokens_expression, ' ', absl::SkipWhitespace());
const size_t num_tokens = tokens.size();
if (num_tokens == 0)
if (tokens.empty())
return PARSE_ERROR;
cmd->type = From(tokens[0]);
@ -148,25 +316,31 @@ auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result {
return UNKNOWN_CMD;
}
if (cmd->type <= CAS) { // Store command
if (num_tokens < 5 || tokens[1].size() > 250) { // key length limit
ArgSlice tokens_view{tokens};
tokens_view.remove_prefix(1);
if (cmd->type <= CAS) { // Store command
if (tokens_view.size() < 4 || tokens[0].size() > 250) { // key length limit
return MP::PARSE_ERROR;
}
cmd->key = string_view{tokens[1].data(), tokens[1].size()};
cmd->key = tokens_view[0];
TokensView tokens_view{tokens.begin() + 2, num_tokens - 2};
tokens_view.remove_prefix(1);
return ParseStore(tokens_view, cmd);
}
if (num_tokens == 1) {
if (base::_in(cmd->type, {MP::STATS, MP::FLUSHALL, MP::QUIT, MP::VERSION})) {
if (cmd->type >= META_SET) {
return tokens_view.empty() ? MP::PARSE_ERROR : ParseMeta(tokens_view, cmd);
}
if (tokens_view.empty()) {
if (base::_in(cmd->type, {MP::STATS, MP::FLUSHALL, MP::QUIT, MP::VERSION, MP::META_NOOP})) {
return MP::OK;
}
return MP::PARSE_ERROR;
}
TokensView tokens_view{tokens.begin() + 1, num_tokens - 1};
return ParseValueless(tokens_view, cmd);
};

View file

@ -5,6 +5,7 @@
#pragma once
#include <cstdint>
#include <string>
#include <string_view>
#include <vector>
@ -39,6 +40,14 @@ class MemcacheParser {
INCR = 32,
DECR = 33,
FLUSHALL = 34,
// META_COMMANDS
META_NOOP = 50,
META_SET = 51,
META_DEL = 52,
META_ARITHM = 53,
META_GET = 54,
META_DEBUG = 55,
};
// According to https://github.com/memcached/memcached/wiki/Commands#standard-protocol
@ -56,7 +65,18 @@ class MemcacheParser {
0; // relative (expire_ts <= month) or unix time (expire_ts > month) in seconds
uint32_t bytes_len = 0;
uint32_t flags = 0;
bool no_reply = false;
bool no_reply = false; // q
bool meta = false;
// meta flags
bool base64 = false; // b
bool return_flags = false; // f
bool return_value = false; // v
bool return_ttl = false; // t
bool return_access_time = false; // l
bool return_hit = false; // h
// Used internally by meta parsing.
std::string blob;
};
enum Result {
@ -64,7 +84,7 @@ class MemcacheParser {
INPUT_PENDING,
UNKNOWN_CMD,
BAD_INT,
PARSE_ERROR,
PARSE_ERROR, // request parse error, but can continue parsing within the same connection.
BAD_DELTA,
};

View file

@ -101,6 +101,54 @@ TEST_F(MCParserTest, NoreplyBasic) {
EXPECT_FALSE(cmd_.no_reply);
}
// Verifies that meta commands (ms/mg/ma) are parsed and translated into the classic
// command types, including TTL/flags arguments, base64 keys, modes and return selectors.
TEST_F(MCParserTest, Meta) {
  // Feeds a single request to the parser; the outcome lands in consumed_ and cmd_.
  auto parse = [this](string_view request) {
    return parser_.Parse(request, &consumed_, &cmd_);
  };

  // A line without the terminating CRLF is incomplete and consumes nothing.
  EXPECT_EQ(MemcacheParser::INPUT_PENDING, parse("ms key1 "));
  EXPECT_EQ(0, consumed_);

  // Plain meta set with TTL and client flags.
  EXPECT_EQ(MemcacheParser::OK, parse("ms key1 6 T1 F2\r\n"));
  EXPECT_EQ(17, consumed_);
  EXPECT_EQ(MemcacheParser::SET, cmd_.type);
  EXPECT_EQ("key1", cmd_.key);
  EXPECT_EQ(2, cmd_.flags);
  EXPECT_EQ(1, cmd_.expire_ts);

  // Base64 encoded key together with the 'E' mode, which maps to ADD.
  EXPECT_EQ(MemcacheParser::OK, parse("ms 16nXnNeV150= 5 b ME\r\n"));
  EXPECT_EQ(24, consumed_);
  EXPECT_EQ(MemcacheParser::ADD, cmd_.type);
  EXPECT_EQ("שלום", cmd_.key);
  EXPECT_EQ(5, cmd_.bytes_len);

  // Meta get with a base64 encoded key.
  EXPECT_EQ(MemcacheParser::OK, parse("mg 16nXnNeV150= b\r\n"));
  EXPECT_EQ(19, consumed_);
  EXPECT_EQ(MemcacheParser::GET, cmd_.type);
  EXPECT_EQ("שלום", cmd_.key);

  // Meta arithmetic defaults to INCR.
  EXPECT_EQ(MemcacheParser::OK, parse("ma val b\r\n"));
  EXPECT_EQ(10, consumed_);
  EXPECT_EQ(MemcacheParser::INCR, cmd_.type);

  // The '-' mode switches to DECR and 'D' provides the delta.
  EXPECT_EQ(MemcacheParser::OK, parse("ma val M- D10\r\n"));
  EXPECT_EQ(15, consumed_);
  EXPECT_EQ(MemcacheParser::DECR, cmd_.type);
  EXPECT_EQ(10, cmd_.delta);

  // Every supported return selector sets its own field.
  EXPECT_EQ(MemcacheParser::OK, parse("mg key f v t l h\r\n"));
  EXPECT_EQ(18, consumed_);
  EXPECT_EQ(MemcacheParser::GET, cmd_.type);
  EXPECT_EQ("key", cmd_.key);
  EXPECT_TRUE(cmd_.return_flags);
  EXPECT_TRUE(cmd_.return_value);
  EXPECT_TRUE(cmd_.return_ttl);
  EXPECT_TRUE(cmd_.return_access_time);
  EXPECT_TRUE(cmd_.return_hit);
}
class MCParserNoreplyTest : public MCParserTest {
protected:
void RunTest(string_view str, bool noreply) {

View file

@ -0,0 +1,21 @@
from .instance import DflyInstance
from . import dfly_args
from meta_memcache import (
Key,
ServerAddress,
CacheClient,
connection_pool_factory_builder,
)
# Server arguments shared by the tests in this module.
DEFAULT_ARGS = {"memcached_port": 11211, "proactor_threads": 4}


@dfly_args(DEFAULT_ARGS)
def test_basic(df_server: DflyInstance):
    """Build a meta_memcache client for the server's memcached port.

    Only the client construction is exercised for now; no commands are issued.
    """
    address = ServerAddress(host="localhost", port=DEFAULT_ARGS.get("memcached_port"))
    pool = CacheClient.cache_client_from_servers(
        servers=[address],
        connection_pool_factory_fn=connection_pool_factory_builder(),
    )
    # TODO: to add integration tests

View file

@ -5,7 +5,6 @@ from redis import Redis
import socket
import random
import time
import warnings
from . import dfly_args
from .instance import DflyInstance

View file

@ -13,6 +13,7 @@ wrapt==1.14.1
pytest-asyncio==0.20.1
pytest-repeat==0.9.3
pymemcache==4.0.0
meta_memcache==2
prometheus_client==0.17.0
aiohttp==3.10.2
numpy