feat(server): Rewrite journal commands (basic) (#651)

This commit is contained in:
Vladislav 2023-01-16 14:52:46 +03:00 committed by GitHub
parent 96100382b9
commit 1eb227e318
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
15 changed files with 127 additions and 57 deletions

View file

@ -111,6 +111,8 @@ const char* OptName(CO::CommandOpt fl) {
return "global-trans";
case VARIADIC_KEYS:
return "variadic-keys";
case NO_AUTOJOURNAL:
return "custom-journal";
}
return "unknown";
}

View file

@ -19,20 +19,21 @@ class ConnectionContext;
namespace CO {
enum CommandOpt : uint32_t {
READONLY = 1,
FAST = 2,
WRITE = 4,
LOADING = 8,
DENYOOM = 0x10, // use-memory in redis.
REVERSE_MAPPING = 0x20,
READONLY = 1U << 0,
FAST = 1U << 1,
WRITE = 1U << 2,
LOADING = 1U << 3,
DENYOOM = 1U << 4, // use-memory in redis.
REVERSE_MAPPING = 1U << 5,
// arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc.
VARIADIC_KEYS = 0x40,
VARIADIC_KEYS = 1U << 6, // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc.
ADMIN = 0x80, // implies NOSCRIPT,
NOSCRIPT = 0x100,
BLOCKING = 0x200, // implies REVERSE_MAPPING
GLOBAL_TRANS = 0x1000,
ADMIN = 1U << 7, // implies NOSCRIPT,
NOSCRIPT = 1U << 8,
BLOCKING = 1U << 9, // implies REVERSE_MAPPING
GLOBAL_TRANS = 1U << 12,
NO_AUTOJOURNAL = 1U << 15, // Skip automatically logging command to journal inside transaction.
};
const char* OptName(CommandOpt fl);

View file

@ -19,8 +19,11 @@ extern "C" {
}
#include "base/logging.h"
#include "core/compact_object.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
#include "server/server_state.h"
#include "server/transaction.h"
namespace dfly {
@ -190,6 +193,10 @@ bool ParseDouble(string_view src, double* value) {
return true;
}
// Log a single-shard journal record for command `cmd` with arguments `args`.
// Delegates to Transaction::LogJournalOnShard, which supplies the owning
// transaction's txid and db index for the journal entry.
void OpArgs::RecordJournal(string_view cmd, ArgSlice args) const {
tx->LogJournalOnShard(shard, make_pair(cmd, args));
}
#define ADD(x) (x) += o.x
TieredStats& TieredStats::operator+=(const TieredStats& o) {

View file

@ -81,14 +81,18 @@ struct DbContext {
struct OpArgs {
EngineShard* shard;
TxId txid;
const Transaction* tx;
DbContext db_cntx;
OpArgs() : shard(nullptr), txid(0) {
OpArgs() : shard(nullptr), tx(nullptr) {
}
OpArgs(EngineShard* s, TxId i, const DbContext& cntx) : shard(s), txid(i), db_cntx(cntx) {
OpArgs(EngineShard* s, const Transaction* tx, const DbContext& cntx)
: shard(s), tx(tx), db_cntx(cntx) {
}
// Log single-shard journal command with own txid and dbid.
void RecordJournal(std::string_view cmd, ArgSlice args) const;
};
struct TieredStats {

View file

@ -13,6 +13,7 @@ extern "C" {
#include "base/logging.h"
#include "server/engine_shard_set.h"
#include "server/journal/journal.h"
#include "server/server_state.h"
#include "server/tiered_storage.h"
#include "util/fiber_sched_algo.h"
@ -587,27 +588,33 @@ PrimeIterator DbSlice::AddNew(const Context& cntx, string_view key, PrimeValue o
return it;
}
OpStatus DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it,
ExpireIterator expire_it, const ExpireParams& params) {
// Translate the stored expire parameters into a pair of deadlines:
//   .first  - delay relative to `now_ms`, in milliseconds;
//   .second - absolute timepoint, in milliseconds.
pair<int64_t, int64_t> DbSlice::ExpireParams::Calculate(int64_t now_ms) const {
  // Normalize the requested value to milliseconds.
  int64_t delta_ms = (unit == TimeUnit::SEC) ? value * 1000 : value;
  // An absolute input is converted to a relative delay against `now_ms`.
  int64_t rel_ms = absolute ? delta_ms - now_ms : delta_ms;
  return {rel_ms, now_ms + rel_ms};
}
OpResult<int64_t> DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it,
ExpireIterator expire_it, const ExpireParams& params) {
DCHECK(params.IsDefined());
DCHECK(IsValid(prime_it));
int64_t msec = (params.unit == TimeUnit::SEC) ? params.value * 1000 : params.value;
int64_t now_msec = cntx.time_now_ms;
int64_t rel_msec = params.absolute ? msec - now_msec : msec;
auto [rel_msec, abs_msec] = params.Calculate(cntx.time_now_ms);
if (rel_msec > kMaxExpireDeadlineSec * 1000) {
return OpStatus::OUT_OF_RANGE;
}
if (rel_msec <= 0 && !params.persist) {
CHECK(Del(cntx.db_index, prime_it));
return -1;
} else if (IsValid(expire_it)) {
expire_it->second = FromAbsoluteTime(now_msec + rel_msec);
expire_it->second = FromAbsoluteTime(abs_msec);
return abs_msec;
} else {
UpdateExpire(cntx.db_index, prime_it, params.persist ? 0 : rel_msec + now_msec);
UpdateExpire(cntx.db_index, prime_it, params.persist ? 0 : abs_msec);
return params.persist ? 0 : abs_msec;
}
return OpStatus::OK;
}
std::pair<PrimeIterator, bool> DbSlice::AddOrUpdateInternal(const Context& cntx,

View file

@ -97,6 +97,9 @@ class DbSlice {
bool IsDefined() const {
return persist || value > INT64_MIN;
}
// Calculate relative and absolute expiry timepoints, in milliseconds.
std::pair<int64_t, int64_t> Calculate(int64_t now_msec) const;
};
DbSlice(uint32_t index, bool caching_mode, EngineShard* owner);
@ -171,8 +174,10 @@ class DbSlice {
PrimeIterator AddNew(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false);
facade::OpStatus UpdateExpire(const Context& cntx, PrimeIterator prime_it, ExpireIterator exp_it,
const ExpireParams& params);
// Update entry expiration. Returns the expiration timepoint in absolute milliseconds, or -1 if
// the entry had already expired and was deleted.
facade::OpResult<int64_t> UpdateExpire(const Context& cntx, PrimeIterator prime_it,
ExpireIterator exp_it, const ExpireParams& params);
// Adds expiry information.
void AddExpire(DbIndex db_ind, PrimeIterator main_it, uint64_t at);

View file

@ -19,6 +19,7 @@ extern "C" {
#include "server/container_utils.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
#include "server/rdb_extensions.h"
#include "server/rdb_load.h"
#include "server/rdb_save.h"
@ -563,7 +564,6 @@ uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys
do {
ess->Await(sid, [&] {
OpArgs op_args{EngineShard::tlocal(), 0, db_cntx};
OpScan(op_args, scan_opts, &cursor, keys);
});
if (cursor == 0) {
@ -588,7 +588,21 @@ OpStatus OpExpire(const OpArgs& op_args, string_view key, const DbSlice::ExpireP
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;
return db_slice.UpdateExpire(op_args.db_cntx, it, expire_it, params);
auto res = db_slice.UpdateExpire(op_args.db_cntx, it, expire_it, params);
// If the value was deleted, replicate as DEL.
// Else, replicate as PEXPIREAT with exact time.
if (auto journal = op_args.shard->journal(); journal && res.ok()) {
if (res.value() == -1) {
op_args.RecordJournal("DEL"sv, ArgSlice{key});
} else {
auto time = absl::StrCat(res.value());
// Note: Don't forget to change this when adding arguments to expire commands.
op_args.RecordJournal("PEXPIREAT"sv, ArgSlice{time});
}
}
return res.status();
}
} // namespace
@ -1398,12 +1412,13 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"ECHO", CO::LOADING | CO::FAST, 2, 0, 0, 0}.HFUNC(Echo)
<< CI{"EXISTS", CO::READONLY | CO::FAST, -2, 1, -1, 1}.HFUNC(Exists)
<< CI{"TOUCH", CO::READONLY | CO::FAST, -2, 1, -1, 1}.HFUNC(Exists)
<< CI{"EXPIRE", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Expire)
<< CI{"EXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(ExpireAt)
<< CI{"EXPIRE", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(Expire)
<< CI{"EXPIREAT", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(ExpireAt)
<< CI{"PERSIST", CO::WRITE | CO::FAST, 2, 1, 1, 1}.HFUNC(Persist)
<< CI{"KEYS", CO::READONLY, 2, 0, 0, 0}.HFUNC(Keys)
<< CI{"PEXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(PexpireAt)
<< CI{"PEXPIRE", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Pexpire)
<< CI{"PEXPIREAT", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(
PexpireAt)
<< CI{"PEXPIRE", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(Pexpire)
<< CI{"RENAME", CO::WRITE, 3, 1, 2, 1}.HFUNC(Rename)
<< CI{"RENAMENX", CO::WRITE, 3, 1, 2, 1}.HFUNC(RenameNx)
<< CI{"SELECT", kSelectOpts, 2, 0, 0, 0}.HFUNC(Select)

View file

@ -111,8 +111,9 @@ bool Journal::EnterLameDuck() {
return res;
}
void Journal::RecordEntry(const Entry& entry) {
journal_slice.AddLogRecord(entry);
// Assemble a journal entry from the given fields and append it to this
// shard's journal slice.
void Journal::RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt,
                          Entry::Payload payload) {
  Entry record{txid, opcode, dbid, shard_cnt, std::move(payload)};
  journal_slice.AddLogRecord(std::move(record));
}
TxId Journal::GetLastTxId() {

View file

@ -53,7 +53,8 @@ class Journal {
*/
LSN GetLsn() const;
void RecordEntry(const Entry& entry);
void RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, Entry::Payload payload);
TxId GetLastTxId();
private:

View file

@ -20,6 +20,7 @@ extern "C" {
#include "server/container_utils.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
#include "server/transaction.h"
ABSL_DECLARE_FLAG(bool, use_set2);
@ -959,8 +960,13 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, unsigned count
return true;
});
/* Delete the set as it is now empty */
// Delete the set as it is now empty
CHECK(db_slice.Del(op_args.db_cntx.db_index, it));
// Replicate as DEL.
if (auto journal = op_args.shard->journal(); journal) {
op_args.RecordJournal("DEL"sv, ArgSlice{key});
}
} else {
SetType st{it->second.RObjPtr(), it->second.Encoding()};
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
@ -979,6 +985,15 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, unsigned count
} else {
result = PopStrSet(op_args.db_cntx, count, st);
}
// Replicate as SREM with removed keys, because SPOP is not deterministic.
if (auto journal = op_args.shard->journal(); journal) {
vector<string_view> mapped(result.size() + 1);
mapped[0] = key;
std::copy(result.begin(), result.end(), mapped.begin() + 1);
op_args.RecordJournal("SREM"sv, mapped);
}
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
}
return result;
@ -1545,7 +1560,7 @@ void SetFamily::Register(CommandRegistry* registry) {
<< CI{"SMOVE", CO::FAST | CO::WRITE, 4, 1, 2, 1}.HFUNC(SMove)
<< CI{"SREM", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(SRem)
<< CI{"SCARD", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(SCard)
<< CI{"SPOP", CO::WRITE | CO::FAST, -2, 1, 1, 1}.HFUNC(SPop)
<< CI{"SPOP", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, -2, 1, 1, 1}.HFUNC(SPop)
<< CI{"SUNION", CO::READONLY, -2, 1, -1, 1}.HFUNC(SUnion)
<< CI{"SUNIONSTORE", CO::WRITE | CO::DENYOOM, -3, 1, -1, 1}.HFUNC(SUnionStore)
<< CI{"SSCAN", CO::READONLY, -3, 1, 1, 1}.HFUNC(SScan);

View file

@ -211,7 +211,7 @@ OpResult<string> OpGet(const OpArgs& op_args, string_view key, bool del_hit = fa
if (exp_params.IsDefined()) {
DVLOG(1) << "Expire: " << key;
auto& db_slice = op_args.shard->db_slice();
OpStatus status = db_slice.UpdateExpire(op_args.db_cntx, it, it_expire, exp_params);
OpStatus status = db_slice.UpdateExpire(op_args.db_cntx, it, it_expire, exp_params).status();
if (status != OpStatus::OK)
return status;
}

View file

@ -373,7 +373,7 @@ bool Transaction::RunInShard(EngineShard* shard) {
/*************************************************************************/
if (!was_suspended && is_concluding) // Check last hop & non suspended.
LogJournalOnShard(shard);
LogAutoJournalOnShard(shard);
// at least the coordinator thread owns the reference.
DCHECK_GE(use_count(), 1u);
@ -801,7 +801,7 @@ void Transaction::RunQuickie(EngineShard* shard) {
LOG(FATAL) << "Unexpected exception " << e.what();
}
LogJournalOnShard(shard);
LogAutoJournalOnShard(shard);
sd.local_mask &= ~ARMED;
cb_ = nullptr; // We can do it because only a single shard runs the callback.
@ -1093,7 +1093,7 @@ void Transaction::UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard
void Transaction::UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, EngineShard* shard) {
auto journal = shard->journal();
if (journal != nullptr && journal->GetLastTxId() == txid_) {
journal->RecordEntry(journal::Entry{txid_, journal::Op::EXEC, db_index_, unique_shard_cnt_});
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, unique_shard_cnt_, {});
}
if (multi_->multi_opts & CO::GLOBAL_TRANS) {
@ -1216,12 +1216,13 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
return false;
}
void Transaction::LogJournalOnShard(EngineShard* shard) {
void Transaction::LogAutoJournalOnShard(EngineShard* shard) {
// TODO: For now, we ignore non shard coordination.
if (shard == nullptr)
return;
if ((cid_->opt_mask() & CO::WRITE) == 0)
// Ignore non-write commands or ones with disabled autojournal.
if ((cid_->opt_mask() & CO::WRITE) == 0 || (cid_->opt_mask() & CO::NO_AUTOJOURNAL) > 0)
return;
auto journal = shard->journal();
@ -1234,15 +1235,18 @@ void Transaction::LogJournalOnShard(EngineShard* shard) {
CHECK(!cmd_with_full_args_.empty());
entry_payload = cmd_with_full_args_;
} else {
entry_payload =
make_pair(facade::ToSV(cmd_with_full_args_.front()), ShardArgsInShard(shard->shard_id()));
}
journal::Op opcode = journal::Op::COMMAND;
if (multi_) {
opcode = journal::Op::MULTI_COMMAND;
auto cmd = facade::ToSV(cmd_with_full_args_.front());
entry_payload = make_pair(cmd, ShardArgsInShard(shard->shard_id()));
}
journal->RecordEntry(journal::Entry{txid_, opcode, db_index_, unique_shard_cnt_, entry_payload});
LogJournalOnShard(shard, std::move(entry_payload));
}
// Write `payload` into the journal of `shard` on behalf of this transaction.
// Entries of multi-transactions are tagged MULTI_COMMAND, others COMMAND.
void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload) const {
  auto* journal = shard->journal();
  CHECK(journal);
  journal::Op opcode = journal::Op::COMMAND;
  if (multi_)
    opcode = journal::Op::MULTI_COMMAND;
  journal->RecordEntry(txid_, opcode, db_index_, unique_shard_cnt_, std::move(payload));
}
void Transaction::BreakOnShutdown() {

View file

@ -16,6 +16,7 @@
#include "core/tx_queue.h"
#include "facade/op_status.h"
#include "server/common.h"
#include "server/journal/types.h"
#include "server/table.h"
#include "util/fibers/fibers_ext.h"
@ -181,7 +182,7 @@ class Transaction {
KeyLockArgs GetLockArgs(ShardId sid) const;
OpArgs GetOpArgs(EngineShard* shard) const {
return OpArgs{shard, txid_, db_context()};
return OpArgs{shard, this, db_context()};
}
DbContext db_context() const {
@ -192,6 +193,9 @@ class Transaction {
return db_index_;
}
// Log a journal entry on shard with payload.
void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload) const;
private:
struct LockCnt {
unsigned cnt[2] = {0, 0};
@ -245,9 +249,9 @@ class Transaction {
return use_count_.load(std::memory_order_relaxed);
}
// If needed, notify the journal of the executed command on the given shard.
// Log command in the journal of a shard for write commands with auto-journaling enabled.
// Should be called immediately after the last phase (hop).
void LogJournalOnShard(EngineShard* shard);
void LogAutoJournalOnShard(EngineShard* shard);
struct PerShardData {
uint32_t arg_start = 0; // Indices into args_ array.
@ -266,6 +270,7 @@ class Transaction {
PerShardData() = default;
};
enum { kPerShardSize = sizeof(PerShardData) };
struct Multi {

View file

@ -31,6 +31,7 @@ replication_cases = [
(1, [1], dict(keys=100, dbcount=2)),
]
@pytest.mark.asyncio
@pytest.mark.parametrize("t_master, t_replicas, seeder_config", replication_cases)
async def test_replication_all(df_local_factory, df_seeder_factory, t_master, t_replicas, seeder_config):
@ -316,12 +317,12 @@ async def test_disconnect_master(df_local_factory, df_seeder_factory, t_master,
assert await seeder.compare(capture, port=replica.port)
"""
Test the flushall command. Set data on the master, send FLUSHALL, then set more data.
Check the replica's keys at the end.
"""
@pytest.mark.asyncio
async def test_flushall(df_local_factory):
master = df_local_factory.create(port=BASE_PORT, proactor_threads=4)
@ -335,6 +336,7 @@ async def test_flushall(df_local_factory):
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
n_keys = 1000
def gen_test_data(start, end):
for i in range(start, end):
yield f"key-{i}", f"value-{i}"
@ -346,7 +348,8 @@ async def test_flushall(df_local_factory):
# flushall
pipe.flushall()
# Set simple keys n_keys..n_keys*2 on master
batch_fill_data(client=pipe, gen=gen_test_data(n_keys, n_keys*2), batch_size=3)
batch_fill_data(client=pipe, gen=gen_test_data(
n_keys, n_keys*2), batch_size=3)
await pipe.execute()

View file

@ -189,8 +189,8 @@ class CommandGenerator:
('SETRANGE {k} 10 {val}', ValueType.STRING),
('LPUSH {k} {val}', ValueType.LIST),
('LPOP {k}', ValueType.LIST),
# ('SADD {k} {val}', ValueType.SET),
# ('SPOP {k}', ValueType.SET),
('SADD {k} {val}', ValueType.SET),
('SPOP {k}', ValueType.SET),
# ('HSETNX {k} v0 {val}', ValueType.HSET),
# ('HINCRBY {k} v1 1', ValueType.HSET),
# ('ZPOPMIN {k} 1', ValueType.ZSET),