feat: support memcache meta responses (#4366)

Fixes #4348 and #3071

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-12-25 09:46:50 +02:00 committed by GitHub
parent c88c707341
commit 966a1a46fd
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 107 additions and 29 deletions

View file

@ -421,8 +421,8 @@ struct Connection::AsyncOperations {
} }
void operator()(const PubMessage& msg); void operator()(const PubMessage& msg);
void operator()(Connection::PipelineMessage& msg); void operator()(PipelineMessage& msg);
void operator()(const Connection::MCPipelineMessage& msg); void operator()(const MCPipelineMessage& msg);
void operator()(const MonitorMessage& msg); void operator()(const MonitorMessage& msg);
void operator()(const AclUpdateMessage& msg); void operator()(const AclUpdateMessage& msg);
void operator()(const MigrationRequestMessage& msg); void operator()(const MigrationRequestMessage& msg);
@ -479,7 +479,7 @@ void Connection::AsyncOperations::operator()(Connection::PipelineMessage& msg) {
self->skip_next_squashing_ = false; self->skip_next_squashing_ = false;
} }
void Connection::AsyncOperations::operator()(const Connection::MCPipelineMessage& msg) { void Connection::AsyncOperations::operator()(const MCPipelineMessage& msg) {
self->service_->DispatchMC(msg.cmd, msg.value, self->service_->DispatchMC(msg.cmd, msg.value,
static_cast<MCReplyBuilder*>(self->reply_builder_.get()), static_cast<MCReplyBuilder*>(self->reply_builder_.get()),
self->cc_.get()); self->cc_.get());

View file

@ -274,6 +274,9 @@ MP::Result ParseMeta(ArgSlice tokens, MP::Command* res) {
case 'h': case 'h':
res->return_hit = true; res->return_hit = true;
break; break;
case 'c':
res->return_version = true;
break;
default: default:
LOG(WARNING) << "unknown meta flag: " << token; // not yet implemented LOG(WARNING) << "unknown meta flag: " << token; // not yet implemented
return MP::PARSE_ERROR; return MP::PARSE_ERROR;
@ -291,7 +294,7 @@ auto MP::Parse(string_view str, uint32_t* consumed, Command* cmd) -> Result {
*consumed = 0; *consumed = 0;
if (pos == string_view::npos) { if (pos == string_view::npos) {
// We need more data to parse the command. For get/gets commands this line can be very long. // We need more data to parse the command. For get/gets commands this line can be very long.
// we limit maxmimum buffer capacity in the higher levels using max_client_iobuf_len. // we limit maximum buffer capacity in the higher levels using max_client_iobuf_len.
return INPUT_PENDING; return INPUT_PENDING;
} }

View file

@ -75,6 +75,8 @@ class MemcacheParser {
bool return_ttl = false; // t bool return_ttl = false; // t
bool return_access_time = false; // l bool return_access_time = false; // l
bool return_hit = false; // h bool return_hit = false; // h
bool return_version = false; // c
// Used internally by meta parsing. // Used internally by meta parsing.
std::string blob; std::string blob;
}; };

View file

@ -204,27 +204,40 @@ void SinkReplyBuilder::NextVec(std::string_view str) {
vecs_.push_back(iovec{const_cast<char*>(str.data()), str.size()}); vecs_.push_back(iovec{const_cast<char*>(str.data()), str.size()});
} }
MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink), noreply_(false) { MCReplyBuilder::MCReplyBuilder(::io::Sink* sink) : SinkReplyBuilder(sink), all_(0) {
} }
void MCReplyBuilder::SendValue(std::string_view key, std::string_view value, uint64_t mc_ver, void MCReplyBuilder::SendValue(std::string_view key, std::string_view value, uint64_t mc_ver,
uint32_t mc_flag) { uint32_t mc_flag) {
ReplyScope scope(this); ReplyScope scope(this);
WritePieces("VALUE ", key, " ", mc_flag, " ", value.size()); if (flag_.meta) {
if (mc_ver) string flags;
WritePieces(" ", mc_ver); if (flag_.return_mcflag)
absl::StrAppend(&flags, " f", mc_flag);
if (value.size() <= kMaxInlineSize) { if (flag_.return_version)
WritePieces(kCRLF, value, kCRLF); absl::StrAppend(&flags, " c", mc_ver);
if (flag_.return_value) {
WritePieces("VA ", value.size(), flags, kCRLF, value, kCRLF);
} else {
WritePieces("HD ", flags, kCRLF);
}
} else { } else {
WritePieces(kCRLF); WritePieces("VALUE ", key, " ", mc_flag, " ", value.size());
WriteRef(value); if (mc_ver)
WritePieces(kCRLF); WritePieces(" ", mc_ver);
if (value.size() <= kMaxInlineSize) {
WritePieces(kCRLF, value, kCRLF);
} else {
WritePieces(kCRLF);
WriteRef(value);
WritePieces(kCRLF);
}
} }
} }
void MCReplyBuilder::SendSimpleString(std::string_view str) { void MCReplyBuilder::SendSimpleString(std::string_view str) {
if (noreply_) if (flag_.noreply)
return; return;
ReplyScope scope(this); ReplyScope scope(this);
@ -232,7 +245,7 @@ void MCReplyBuilder::SendSimpleString(std::string_view str) {
} }
void MCReplyBuilder::SendStored() { void MCReplyBuilder::SendStored() {
SendSimpleString("STORED"); SendSimpleString(flag_.meta ? "HD" : "STORED");
} }
void MCReplyBuilder::SendLong(long val) { void MCReplyBuilder::SendLong(long val) {
@ -253,11 +266,21 @@ void MCReplyBuilder::SendClientError(string_view str) {
} }
void MCReplyBuilder::SendSetSkipped() { void MCReplyBuilder::SendSetSkipped() {
SendSimpleString("NOT_STORED"); SendSimpleString(flag_.meta ? "NS" : "NOT_STORED");
} }
void MCReplyBuilder::SendNotFound() { void MCReplyBuilder::SendNotFound() {
SendSimpleString("NOT_FOUND"); SendSimpleString(flag_.meta ? "NF" : "NOT_FOUND");
}
void MCReplyBuilder::SendGetEnd() {
if (!flag_.meta)
SendSimpleString("END");
}
void MCReplyBuilder::SendMiss() {
if (flag_.meta)
SendSimpleString("EN");
} }
void MCReplyBuilder::SendRaw(std::string_view str) { void MCReplyBuilder::SendRaw(std::string_view str) {

View file

@ -171,6 +171,8 @@ class MCReplyBuilder : public SinkReplyBuilder {
void SendClientError(std::string_view str); void SendClientError(std::string_view str);
void SendNotFound(); void SendNotFound();
void SendMiss();
void SendGetEnd();
void SendValue(std::string_view key, std::string_view value, uint64_t mc_ver, uint32_t mc_flag); void SendValue(std::string_view key, std::string_view value, uint64_t mc_ver, uint32_t mc_flag);
void SendSimpleString(std::string_view str) final; void SendSimpleString(std::string_view str) final;
@ -179,15 +181,45 @@ class MCReplyBuilder : public SinkReplyBuilder {
void SendRaw(std::string_view str); void SendRaw(std::string_view str);
void SetNoreply(bool noreply) { void SetNoreply(bool noreply) {
noreply_ = noreply; flag_.noreply = noreply;
} }
bool NoReply() const { bool NoReply() const {
return noreply_; return flag_.noreply;
}
void SetMeta(bool meta) {
flag_.meta = meta;
}
void SetBase64(bool base64) {
flag_.base64 = base64;
}
void SetReturnMCFlag(bool val) {
flag_.return_mcflag = val;
}
void SetReturnValue(bool val) {
flag_.return_value = val;
}
void SetReturnVersion(bool val) {
flag_.return_version = val;
} }
private: private:
bool noreply_ = false; union {
struct {
uint8_t noreply : 1;
uint8_t meta : 1;
uint8_t base64 : 1;
uint8_t return_value : 1;
uint8_t return_mcflag : 1;
uint8_t return_version : 1;
} flag_;
uint8_t all_;
};
}; };
// Redis reply builder interface for sending RESP data. // Redis reply builder interface for sending RESP data.

View file

@ -1490,6 +1490,13 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va
char ttl_op[] = "EXAT"; char ttl_op[] = "EXAT";
mc_builder->SetNoreply(cmd.no_reply); mc_builder->SetNoreply(cmd.no_reply);
mc_builder->SetMeta(cmd.meta);
if (cmd.meta) {
mc_builder->SetBase64(cmd.base64);
mc_builder->SetReturnMCFlag(cmd.return_flags);
mc_builder->SetReturnValue(cmd.return_value);
mc_builder->SetReturnVersion(cmd.return_version);
}
switch (cmd.type) { switch (cmd.type) {
case MemcacheParser::REPLACE: case MemcacheParser::REPLACE:
@ -1533,7 +1540,7 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va
server_family_.StatsMC(cmd.key, mc_builder); server_family_.StatsMC(cmd.key, mc_builder);
return; return;
case MemcacheParser::VERSION: case MemcacheParser::VERSION:
mc_builder->SendSimpleString("VERSION 1.5.0 DF"); mc_builder->SendSimpleString("VERSION 1.6.0 DF");
return; return;
default: default:
mc_builder->SendClientError("bad command line format"); mc_builder->SendClientError("bad command line format");

View file

@ -1360,11 +1360,13 @@ void StringFamily::MGet(CmdArgList args, const CommandContext& cmnd_cntx) {
auto* rb = static_cast<MCReplyBuilder*>(builder); auto* rb = static_cast<MCReplyBuilder*>(builder);
DCHECK(dynamic_cast<CapturingReplyBuilder*>(builder) == nullptr); DCHECK(dynamic_cast<CapturingReplyBuilder*>(builder) == nullptr);
for (const auto& entry : res) { for (const auto& entry : res) {
if (!entry) if (entry) {
continue; rb->SendValue(entry->key, entry->value, entry->mc_ver, entry->mc_flag);
rb->SendValue(entry->key, entry->value, entry->mc_ver, entry->mc_flag); } else {
rb->SendMiss();
}
} }
rb->SendSimpleString("END"); rb->SendGetEnd();
} else { } else {
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(res.size()); rb->StartArray(res.size());

View file

@ -6,6 +6,7 @@ from meta_memcache import (
CacheClient, CacheClient,
connection_pool_factory_builder, connection_pool_factory_builder,
) )
from meta_memcache.protocol import RequestFlags, Miss, Value, Success
DEFAULT_ARGS = {"memcached_port": 11211, "proactor_threads": 4} DEFAULT_ARGS = {"memcached_port": 11211, "proactor_threads": 4}
@ -16,6 +17,14 @@ def test_basic(df_server: DflyInstance):
servers=[ servers=[
ServerAddress(host="localhost", port=DEFAULT_ARGS.get("memcached_port")), ServerAddress(host="localhost", port=DEFAULT_ARGS.get("memcached_port")),
], ],
connection_pool_factory_fn=connection_pool_factory_builder(), connection_pool_factory_fn=connection_pool_factory_builder(recv_timeout=5),
) )
# TODO: to add integration tests
assert pool.set("key1", "value1", 100)
assert pool.set("key1", "value2", 0)
assert pool.get("key1") == "value2"
request_flags = RequestFlags(return_value=False)
response = pool.meta_get(Key("key1"), flags=request_flags)
assert isinstance(response, Success)
assert pool.get("key2") is None

View file

@ -137,7 +137,7 @@ def test_version(memcached_client: MCClient):
Our real version is being returned in the stats command. Our real version is being returned in the stats command.
Also verified manually that php client parses correctly the version string that ends with "DF". Also verified manually that php client parses correctly the version string that ends with "DF".
""" """
assert b"1.5.0 DF" == memcached_client.version() assert b"1.6.0 DF" == memcached_client.version()
stats = memcached_client.stats() stats = memcached_client.stats()
version = stats[b"version"].decode("utf-8") version = stats[b"version"].decode("utf-8")
assert version.startswith("v") or version == "dev" assert version.startswith("v") or version == "dev"