feat(server): Journal rewrite p2 + tests (#699)

This commit is contained in:
Vladislav 2023-01-19 13:55:50 +03:00 committed by GitHub
parent ae017dbfdb
commit 45162b5c0a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 183 additions and 42 deletions

View file

@ -589,6 +589,8 @@ PrimeIterator DbSlice::AddNew(const Context& cntx, string_view key, PrimeValue o
}
pair<int64_t, int64_t> DbSlice::ExpireParams::Calculate(int64_t now_ms) const {
if (persist)
return {0, 0};
int64_t msec = (unit == TimeUnit::SEC) ? value * 1000 : value;
int64_t now_msec = now_ms;
int64_t rel_msec = absolute ? msec - now_msec : msec;
@ -608,7 +610,7 @@ OpResult<int64_t> DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime
if (rel_msec <= 0 && !params.persist) {
CHECK(Del(cntx.db_index, prime_it));
return -1;
} else if (IsValid(expire_it)) {
} else if (IsValid(expire_it) && !params.persist) {
expire_it->second = FromAbsoluteTime(abs_msec);
return abs_msec;
} else {

View file

@ -65,7 +65,7 @@ void DoPopulateBatch(std::string_view prefix, size_t val_size, bool random_value
const SetCmd::SetParams& params, const PopulateBatch& batch) {
DbContext db_cntx{batch.dbid, 0};
OpArgs op_args(EngineShard::tlocal(), 0, db_cntx);
SetCmd sg(op_args);
SetCmd sg(op_args, false);
absl::InsecureBitGen gen;
for (unsigned i = 0; i < batch.sz; ++i) {

View file

@ -421,7 +421,7 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) {
}
if (dest_it->second.HasExpire()) {
auto time = absl::StrCat(src_res_.expire_ts);
RecordJournal(op_args, "PEXPIREAT"sv, ArgSlice{time}, 2, true);
RecordJournal(op_args, "PEXPIREAT"sv, ArgSlice{dest_key, time}, 2, true);
}
RecordJournalFinish(op_args, 2);
}
@ -604,8 +604,9 @@ uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys
OpStatus OpExpire(const OpArgs& op_args, string_view key, const DbSlice::ExpireParams& params) {
auto& db_slice = op_args.shard->db_slice();
auto [it, expire_it] = db_slice.FindExt(op_args.db_cntx, key);
if (!IsValid(it))
if (!IsValid(it)) {
return OpStatus::KEY_NOTFOUND;
}
auto res = db_slice.UpdateExpire(op_args.db_cntx, it, expire_it, params);
@ -617,7 +618,7 @@ OpStatus OpExpire(const OpArgs& op_args, string_view key, const DbSlice::ExpireP
} else {
auto time = absl::StrCat(res.value());
// Note: Don't forget to change this when adding arguments to expire commands.
RecordJournal(op_args, "PEXPIREAT"sv, ArgSlice{time});
RecordJournal(op_args, "PEXPIREAT"sv, ArgSlice{key, time});
}
}

View file

@ -150,7 +150,8 @@ std::string MakeMonitorMessage(const ConnectionState& conn_state,
if (conn_state.script_info.has_value()) {
absl::StrAppend(&message, " lua] ");
} else {
absl::StrAppend(&message, " ", connection->RemoteEndpointStr(), "] ");
auto endpoint = connection == nullptr ? "REPLICATION:0" : connection->RemoteEndpointStr();
absl::StrAppend(&message, " ", endpoint, "] ");
}
if (args.empty()) {
absl::StrAppend(&message, "error - empty cmd list!");

View file

@ -332,7 +332,7 @@ OpStatus OpMSet(const OpArgs& op_args, ArgSlice args) {
DCHECK(!args.empty() && args.size() % 2 == 0);
SetCmd::SetParams params;
SetCmd sg(op_args);
SetCmd sg(op_args, false);
for (size_t i = 0; i < args.size(); i += 2) {
DVLOG(1) << "MSet " << args[i] << ":" << args[i + 1];
@ -346,11 +346,11 @@ OpStatus OpMSet(const OpArgs& op_args, ArgSlice args) {
}
OpResult<void> SetGeneric(ConnectionContext* cntx, const SetCmd::SetParams& sparams,
string_view key, string_view value) {
string_view key, string_view value, bool manual_journal) {
DCHECK(cntx->transaction);
auto cb = [&](Transaction* t, EngineShard* shard) {
SetCmd sg(t->GetOpArgs(shard));
SetCmd sg(t->GetOpArgs(shard), manual_journal);
return sg.Set(sparams, key, value);
};
return cntx->transaction->ScheduleSingleHop(std::move(cb));
@ -417,6 +417,10 @@ OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value
shard->tiered_storage()->ScheduleOffload(op_args_.db_cntx.db_index, it);
}
if (manual_journal_ && op_args_.shard->journal()) {
RecordJournal(params, key, value);
}
return OpStatus::OK;
}
@ -477,9 +481,32 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt
}
db_slice.PostUpdate(op_args_.db_cntx.db_index, it, key);
if (manual_journal_ && op_args_.shard->journal()) {
RecordJournal(params, key, value);
}
return OpStatus::OK;
}
void SetCmd::RecordJournal(const SetParams& params, string_view key, string_view value) {
absl::InlinedVector<string_view, 5> cmds{}; // 4 is theoretical maximum;
cmds.insert(cmds.end(), {key, value});
std::string exp_str;
if (params.flags & SET_EXPIRE_AFTER_MS) {
exp_str = absl::StrCat(params.expire_after_ms + op_args_.db_cntx.time_now_ms);
cmds.insert(cmds.end(), {"PXAT", exp_str});
} else if (params.flags & SET_KEEP_EXPIRE) {
cmds.push_back("KEEPTTL");
}
// Skip NX/XX because SET operation was exectued.
// Skip GET, because its not important on replica.
dfly::RecordJournal(op_args_, "SET", ArgSlice{cmds});
}
void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
set_qps.Inc();
@ -550,7 +577,7 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
}
}
const auto result{SetGeneric(cntx, sparams, key, value)};
const auto result{SetGeneric(cntx, sparams, key, value, true)};
if (result == OpStatus::OK) {
return builder->SendStored();
@ -569,6 +596,10 @@ void StringFamily::SetEx(CmdArgList args, ConnectionContext* cntx) {
SetExGeneric(true, std::move(args), cntx);
}
void StringFamily::PSetEx(CmdArgList args, ConnectionContext* cntx) {
SetExGeneric(false, std::move(args), cntx);
}
void StringFamily::SetNx(CmdArgList args, ConnectionContext* cntx) {
// This is the same as calling the "Set" function, only in this case we are
// change the value only if the key does not exist. Otherwise the function
@ -580,7 +611,7 @@ void StringFamily::SetNx(CmdArgList args, ConnectionContext* cntx) {
SetCmd::SetParams sparams;
sparams.flags |= SetCmd::SET_IF_NOTEXIST;
sparams.memcache_flags = cntx->conn_state.memcache_flag;
const auto results{SetGeneric(cntx, std::move(sparams), key, value)};
const auto results{SetGeneric(cntx, std::move(sparams), key, value, false)};
SinkReplyBuilder* builder = cntx->reply_builder();
if (results == OpStatus::OK) {
return builder->SendLong(1); // this means that we successfully set the value
@ -657,7 +688,7 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) {
sparams.prev_val = &prev_val;
auto cb = [&](Transaction* t, EngineShard* shard) {
SetCmd cmd(t->GetOpArgs(shard));
SetCmd cmd(t->GetOpArgs(shard), false);
return cmd.Set(sparams, key, value);
};
@ -720,7 +751,21 @@ void StringFamily::GetEx(CmdArgList args, ConnectionContext* cntx) {
}
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpGet(t->GetOpArgs(shard), key, false, exp_params);
auto op_args = t->GetOpArgs(shard);
auto result = OpGet(op_args, key, false, exp_params);
// Replicate GETEX as PEXPIREAT or PERSIST
if (result.ok() && shard->journal()) {
if (exp_params.persist) {
RecordJournal(op_args, "PERSIST", {key});
} else {
auto [ignore, abs_time] = exp_params.Calculate(op_args.db_cntx.time_now_ms);
auto abs_time_str = absl::StrCat(abs_time);
RecordJournal(op_args, "PEXPIREAT", {key, abs_time_str});
}
}
return result;
};
DVLOG(1) << "Before Get::ScheduleSingleHopT " << key;
@ -888,13 +933,14 @@ void StringFamily::SetExGeneric(bool seconds, CmdArgList args, ConnectionContext
}
SetCmd::SetParams sparams;
sparams.flags |= SetCmd::SET_EXPIRE_AFTER_MS;
if (seconds)
sparams.expire_after_ms = uint64_t(unit_vals) * 1000;
else
sparams.expire_after_ms = unit_vals;
auto cb = [&](Transaction* t, EngineShard* shard) {
SetCmd sg(t->GetOpArgs(shard));
SetCmd sg(t->GetOpArgs(shard), true);
return sg.Set(sparams, key, value);
};
@ -1096,10 +1142,6 @@ void StringFamily::SetRange(CmdArgList args, ConnectionContext* cntx) {
}
}
void StringFamily::PSetEx(CmdArgList args, ConnectionContext* cntx) {
SetExGeneric(false, std::move(args), cntx);
}
auto StringFamily::OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction* t,
EngineShard* shard) -> MGetResponse {
auto args = t->GetShardArgs(shard->shard_id());
@ -1141,29 +1183,30 @@ void StringFamily::Shutdown() {
#define HFUNC(x) SetHandler(&StringFamily::x)
void StringFamily::Register(CommandRegistry* registry) {
*registry << CI{"SET", CO::WRITE | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(Set)
<< CI{"SETEX", CO::WRITE | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(SetEx)
<< CI{"SETNX", CO::WRITE | CO::DENYOOM, 3, 1, 1, 1}.HFUNC(SetNx)
<< CI{"PSETEX", CO::WRITE | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(PSetEx)
<< CI{"APPEND", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Append)
<< CI{"PREPEND", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Prepend)
<< CI{"INCR", CO::WRITE | CO::DENYOOM | CO::FAST, 2, 1, 1, 1}.HFUNC(Incr)
<< CI{"DECR", CO::WRITE | CO::DENYOOM | CO::FAST, 2, 1, 1, 1}.HFUNC(Decr)
<< CI{"INCRBY", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, 1}.HFUNC(IncrBy)
<< CI{"INCRBYFLOAT", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, 1}.HFUNC(IncrByFloat)
<< CI{"DECRBY", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, 1}.HFUNC(DecrBy)
<< CI{"GET", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Get)
<< CI{"GETDEL", CO::WRITE | CO::DENYOOM | CO::FAST, 2, 1, 1, 1}.HFUNC(GetDel)
<< CI{"GETEX", CO::WRITE | CO::DENYOOM | CO::FAST, -1, 1, 1, 1}.HFUNC(GetEx)
<< CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, 1}.HFUNC(GetSet)
<< CI{"MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING, -2, 1, -1, 1}.HFUNC(MGet)
<< CI{"MSET", CO::WRITE | CO::DENYOOM, -3, 1, -1, 2}.HFUNC(MSet)
<< CI{"MSETNX", CO::WRITE | CO::DENYOOM, -3, 1, -1, 2}.HFUNC(MSetNx)
<< CI{"STRLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(StrLen)
<< CI{"GETRANGE", CO::READONLY | CO::FAST, 4, 1, 1, 1}.HFUNC(GetRange)
<< CI{"SUBSTR", CO::READONLY | CO::FAST, 4, 1, 1, 1}.HFUNC(
GetRange) // Alias for GetRange
<< CI{"SETRANGE", CO::WRITE | CO::FAST | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(SetRange);
*registry
<< CI{"SET", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, -3, 1, 1, 1}.HFUNC(Set)
<< CI{"SETEX", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, 4, 1, 1, 1}.HFUNC(SetEx)
<< CI{"PSETEX", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, 4, 1, 1, 1}.HFUNC(PSetEx)
<< CI{"SETNX", CO::WRITE | CO::DENYOOM, 3, 1, 1, 1}.HFUNC(SetNx)
<< CI{"APPEND", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Append)
<< CI{"PREPEND", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Prepend)
<< CI{"INCR", CO::WRITE | CO::DENYOOM | CO::FAST, 2, 1, 1, 1}.HFUNC(Incr)
<< CI{"DECR", CO::WRITE | CO::DENYOOM | CO::FAST, 2, 1, 1, 1}.HFUNC(Decr)
<< CI{"INCRBY", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, 1}.HFUNC(IncrBy)
<< CI{"INCRBYFLOAT", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, 1}.HFUNC(IncrByFloat)
<< CI{"DECRBY", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, 1}.HFUNC(DecrBy)
<< CI{"GET", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Get)
<< CI{"GETDEL", CO::WRITE | CO::DENYOOM | CO::FAST, 2, 1, 1, 1}.HFUNC(GetDel)
<< CI{"GETEX", CO::WRITE | CO::DENYOOM | CO::FAST | CO::NO_AUTOJOURNAL, -1, 1, 1, 1}.HFUNC(
GetEx)
<< CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, 1}.HFUNC(GetSet)
<< CI{"MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING, -2, 1, -1, 1}.HFUNC(MGet)
<< CI{"MSET", CO::WRITE | CO::DENYOOM, -3, 1, -1, 2}.HFUNC(MSet)
<< CI{"MSETNX", CO::WRITE | CO::DENYOOM, -3, 1, -1, 2}.HFUNC(MSetNx)
<< CI{"STRLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(StrLen)
<< CI{"GETRANGE", CO::READONLY | CO::FAST, 4, 1, 1, 1}.HFUNC(GetRange)
<< CI{"SUBSTR", CO::READONLY | CO::FAST, 4, 1, 1, 1}.HFUNC(GetRange) // Alias for GetRange
<< CI{"SETRANGE", CO::WRITE | CO::FAST | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(SetRange);
}
} // namespace dfly

View file

@ -17,9 +17,11 @@ using facade::OpStatus;
class SetCmd {
const OpArgs op_args_;
bool manual_journal_;
public:
explicit SetCmd(const OpArgs& op_args) : op_args_(op_args) {
explicit SetCmd(const OpArgs& op_args, bool manual_journal)
: op_args_(op_args), manual_journal_{manual_journal} {
}
enum SetFlags {
@ -48,6 +50,8 @@ class SetCmd {
private:
OpStatus SetExisting(const SetParams& params, PrimeIterator it, ExpireIterator e_it,
std::string_view key, std::string_view value);
void RecordJournal(const SetParams& params, std::string_view key, std::string_view value);
};
class StringFamily {

View file

@ -4,6 +4,7 @@ import asyncio
import aioredis
import random
from itertools import chain, repeat
import re
from .utility import *
from . import dfly_args
@ -368,3 +369,92 @@ async def test_flushall(df_local_factory):
pipe.get(f"key-{i}")
vals = await pipe.execute()
assert all(v is not None for v in vals)
"""
Test journal rewrites.
"""
@dfly_args({"proactor_threads": 1})
@pytest.mark.asyncio
async def test_rewrites(df_local_factory):
CLOSE_TIMESTAMP = (int(time.time()) + 100)
CLOSE_TIMESTAMP_MS = CLOSE_TIMESTAMP * 1000
master = df_local_factory.create(port=BASE_PORT)
replica = df_local_factory.create(port=BASE_PORT+1)
master.start()
replica.start()
# Connect clients, connect replica to master
c_master = aioredis.Redis(port=master.port)
c_replica = aioredis.Redis(port=replica.port)
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await wait_available_async(c_replica)
# Make sure journal writer sends its first SELECT command on its only shard
await c_master.set("no-select-please", "ok")
await asyncio.sleep(0.5)
# Create monitor and bind utility functions
m_replica = c_replica.monitor()
async def check_rsp(rx):
print("waiting on", rx)
mcmd = (await m_replica.next_command())['command']
print("Got:", mcmd, "Regex", rx)
assert re.match(rx, mcmd)
async def skip_cmd():
await check_rsp(r".*")
async def check(cmd, rx):
await c_master.execute_command(cmd)
await check_rsp(rx)
async def check_expire(key):
ttl1 = await c_master.ttl(key)
ttl2 = await c_replica.ttl(key)
await skip_cmd()
assert abs(ttl1-ttl2) <= 1
async with m_replica:
# CHECK EXPIRE, PEXPIRE, PEXPIRE turn into EXPIREAT
await c_master.set("k-exp", "v")
await skip_cmd()
await check("EXPIRE k-exp 100", r"PEXPIREAT k-exp (.*?)")
await check_expire("k-exp")
await check("PEXPIRE k-exp 50000", r"PEXPIREAT k-exp (.*?)")
await check_expire("k-exp")
await check(f"EXPIREAT k-exp {CLOSE_TIMESTAMP}", rf"PEXPIREAT k-exp {CLOSE_TIMESTAMP_MS}")
# Check SPOP turns into SREM or SDEL
await c_master.sadd("k-set", "v1", "v2", "v3")
await skip_cmd()
await check("SPOP k-set 1", r"SREM k-set (v1|v2|v3)")
await check("SPOP k-set 2", r"DEL k-set")
# Check SET + {EX/PX/EXAT} + {XX/NX/GET} arguments turns into SET PXAT
await check(f"SET k v EX 100 NX GET", r"SET k v PXAT (.*?)")
await check_expire("k")
await check(f"SET k v PX 50000", r"SET k v PXAT (.*?)")
await check_expire("k")
# Exact expiry is skewed
await check(f"SET k v XX EXAT {CLOSE_TIMESTAMP}", rf"SET k v PXAT (.*?)")
await check_expire("k")
# Check SET + KEEPTTL doesn't loose KEEPTTL
await check(f"SET k v KEEPTTL", r"SET k v KEEPTTL")
# Check SETEX/PSETEX turn into SET PXAT
await check("SETEX k 100 v", r"SET k v PXAT (.*?)")
await check_expire("k")
await check("PSETEX k 500000 v", r"SET k v PXAT (.*?)")
await check_expire("k")
# Check GETEX turns into PEXPIREAT or PERSIST
await check("GETEX k PERSIST", r"PERSIST k")
await check_expire("k")
await check("GETEX k EX 100", r"PEXPIREAT k (.*?)")
await check_expire("k")