mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
fix: fix expiration processing for set command (#3607)
* fix: fix expiration processing for set command
This commit is contained in:
parent
de5ecc7447
commit
8e9b097b9d
6 changed files with 61 additions and 24 deletions
|
@ -52,7 +52,8 @@ class MemcacheParser {
|
|||
uint64_t delta; // for DECR/INCR commands.
|
||||
};
|
||||
|
||||
uint32_t expire_ts = 0; // relative time in seconds.
|
||||
uint32_t expire_ts =
|
||||
0; // relative (expire_ts <= month) or unix time (expire_ts > month) in seconds
|
||||
uint32_t bytes_len = 0;
|
||||
uint32_t flags = 0;
|
||||
bool no_reply = false;
|
||||
|
|
|
@ -691,23 +691,6 @@ OpResult<long> OpFieldTtl(Transaction* t, EngineShard* shard, string_view key, s
|
|||
return res <= 0 ? res : int32_t(res - MemberTimeSeconds(db_cntx.time_now_ms));
|
||||
}
|
||||
|
||||
OpResult<uint32_t> OpDel(const OpArgs& op_args, const ShardArgs& keys) {
|
||||
DVLOG(1) << "Del: " << keys.Front();
|
||||
auto& db_slice = op_args.GetDbSlice();
|
||||
|
||||
uint32_t res = 0;
|
||||
|
||||
for (string_view key : keys) {
|
||||
auto fres = db_slice.FindMutable(op_args.db_cntx, key);
|
||||
if (!IsValid(fres.it))
|
||||
continue;
|
||||
fres.post_updater.Run();
|
||||
res += int(db_slice.Del(op_args.db_cntx, fres.it));
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
OpResult<uint32_t> OpStick(const OpArgs& op_args, const ShardArgs& keys) {
|
||||
DVLOG(1) << "Stick: " << keys.Front();
|
||||
|
||||
|
@ -727,6 +710,23 @@ OpResult<uint32_t> OpStick(const OpArgs& op_args, const ShardArgs& keys) {
|
|||
|
||||
} // namespace
|
||||
|
||||
OpResult<uint32_t> GenericFamily::OpDel(const OpArgs& op_args, const ShardArgs& keys) {
|
||||
DVLOG(1) << "Del: " << keys.Front();
|
||||
auto& db_slice = op_args.GetDbSlice();
|
||||
|
||||
uint32_t res = 0;
|
||||
|
||||
for (string_view key : keys) {
|
||||
auto fres = db_slice.FindMutable(op_args.db_cntx, key);
|
||||
if (!IsValid(fres.it))
|
||||
continue;
|
||||
fres.post_updater.Run();
|
||||
res += int(db_slice.Del(op_args.db_cntx, fres.it));
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) {
|
||||
Transaction* transaction = cntx->transaction;
|
||||
VLOG(1) << "Del " << ArgS(args, 0);
|
||||
|
|
|
@ -39,6 +39,7 @@ class GenericFamily {
|
|||
|
||||
// Accessed by Service::Exec and Service::Watch as an utility.
|
||||
static OpResult<uint32_t> OpExists(const OpArgs& op_args, const ShardArgs& keys);
|
||||
static OpResult<uint32_t> OpDel(const OpArgs& op_args, const ShardArgs& keys);
|
||||
|
||||
private:
|
||||
static void Del(CmdArgList args, ConnectionContext* cntx);
|
||||
|
|
|
@ -1493,7 +1493,7 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va
|
|||
char cmd_name[16];
|
||||
char ttl[16];
|
||||
char store_opt[32] = {0};
|
||||
char ttl_op[] = "EX";
|
||||
char ttl_op[] = "EXAT";
|
||||
|
||||
MCReplyBuilder* mc_builder = static_cast<MCReplyBuilder*>(cntx->reply_builder());
|
||||
mc_builder->SetNoreply(cmd.no_reply);
|
||||
|
@ -1564,9 +1564,15 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va
|
|||
args.emplace_back(store_opt, strlen(store_opt));
|
||||
}
|
||||
|
||||
if (cmd.expire_ts && memcmp(cmd_name, "SET", 3) == 0) {
|
||||
char* next = absl::numbers_internal::FastIntToBuffer(cmd.expire_ts, ttl);
|
||||
args.emplace_back(ttl_op, 2);
|
||||
// if expire_ts is greater than month it's a unix timestamp
|
||||
// https://github.com/memcached/memcached/blob/master/doc/protocol.txt#L139
|
||||
constexpr uint32_t kExpireLimit = 60 * 60 * 24 * 30;
|
||||
const uint64_t expire_ts = cmd.expire_ts && cmd.expire_ts <= kExpireLimit
|
||||
? cmd.expire_ts + time(nullptr)
|
||||
: cmd.expire_ts;
|
||||
if (expire_ts && memcmp(cmd_name, "SET", 3) == 0) {
|
||||
char* next = absl::numbers_internal::FastIntToBuffer(expire_ts, ttl);
|
||||
args.emplace_back(ttl_op, 4);
|
||||
args.emplace_back(ttl, next - ttl);
|
||||
}
|
||||
dfly_cntx->conn_state.memcache_flag = cmd.flags;
|
||||
|
|
|
@ -28,6 +28,7 @@
|
|||
#include "server/conn_context.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/error.h"
|
||||
#include "server/generic_family.h"
|
||||
#include "server/journal/journal.h"
|
||||
#include "server/table.h"
|
||||
#include "server/tiered_storage.h"
|
||||
|
@ -787,9 +788,15 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (abs_ms < 0)
|
||||
return cntx->SendError(InvalidExpireTime("set"));
|
||||
|
||||
// Redis reports just OK in this case
|
||||
if (rel_ms < 0)
|
||||
// Remove existed key if the key is expired already
|
||||
if (rel_ms < 0) {
|
||||
cntx->transaction->ScheduleSingleHop([key](const Transaction* tx, EngineShard* es) {
|
||||
ShardArgs args = tx->GetShardArgs(es->shard_id());
|
||||
GenericFamily::OpDel(tx->GetOpArgs(es), args);
|
||||
return OpStatus::OK;
|
||||
});
|
||||
return builder->SendStored();
|
||||
}
|
||||
|
||||
tie(sparams.expire_after_ms, ignore) = expiry.Calculate(now_ms, true);
|
||||
} else if (parser.Check("_MCFLAGS").ExpectTail(1)) {
|
||||
|
|
|
@ -3,6 +3,7 @@ from pymemcache.client.base import Client as MCClient
|
|||
from redis import Redis
|
||||
import socket
|
||||
import random
|
||||
import time
|
||||
|
||||
from . import dfly_args
|
||||
from .instance import DflyInstance
|
||||
|
@ -141,3 +142,24 @@ def test_flags(memcached_client: MCClient):
|
|||
# workaround sometimes memcached_client.raw_command returns empty str
|
||||
if len(res) > 0:
|
||||
assert res[2].decode() == str(flags)
|
||||
|
||||
|
||||
@dfly_args(DEFAULT_ARGS)
|
||||
def test_expiration(memcached_client: MCClient):
|
||||
assert not memcached_client.default_noreply
|
||||
|
||||
assert memcached_client.set("key1", "value1", 2)
|
||||
assert memcached_client.set("key2", "value2", int(time.time()) + 2)
|
||||
assert memcached_client.set("key3", "value3", int(time.time()) + 200)
|
||||
assert memcached_client.get("key1") == b"value1"
|
||||
assert memcached_client.get("key2") == b"value2"
|
||||
assert memcached_client.get("key3") == b"value3"
|
||||
time.sleep(1)
|
||||
assert memcached_client.set("key3", "value3", int(time.time()) - 200)
|
||||
assert memcached_client.get("key1") == b"value1"
|
||||
assert memcached_client.get("key2") == b"value2"
|
||||
assert memcached_client.get("key3") == None
|
||||
time.sleep(1)
|
||||
assert memcached_client.get("key1") == None
|
||||
assert memcached_client.get("key2") == None
|
||||
assert memcached_client.get("key3") == None
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue