feat(list family): impl rename journal commands (#704)

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2023-01-19 14:34:25 +02:00 committed by GitHub
parent 160aeaa2d8
commit 086edd9707
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 55 additions and 32 deletions

View file

@ -1224,9 +1224,7 @@ OpResult<void> GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des
auto cb = [&](Transaction* t, EngineShard* shard) {
auto ec = OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest);
// Incase of uniqe shard count we can use rename command in replica.
if (ec.ok() && shard->journal()) {
RecordJournal(t->GetOpArgs(shard), "RENAME", {key[0], key[1]});
}
t->RenableAutoJournal();
return ec;
};
OpResult<void> result = transaction->ScheduleSingleHopT(std::move(cb));

View file

@ -381,7 +381,8 @@ OpResult<string> Peek(const OpArgs& op_args, string_view key, ListDir dir, bool
}
OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
bool skip_notexist, absl::Span<std::string_view> vals) {
bool skip_notexist, absl::Span<std::string_view> vals,
bool journal_rewrite) {
DVLOG(1) << "OpPush " << key;
EngineShard* es = op_args.shard;
@ -433,12 +434,19 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
} else {
es->db_slice().PostUpdate(op_args.db_cntx.db_index, it, key, true);
}
if (journal_rewrite && op_args.shard->journal()) {
string command = dir == ListDir::LEFT ? "LPUSH" : "RPUSH";
vector<string_view> mapped(vals.size() + 1);
mapped[0] = key;
std::copy(vals.begin(), vals.end(), mapped.begin() + 1);
RecordJournal(op_args, command, mapped, 2);
}
return quicklistCount(ql);
}
OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, ListDir dir, uint32_t count,
bool return_results) {
bool return_results, bool journal_rewrite) {
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> it_res = db_slice.Find(op_args.db_cntx, key, OBJ_LIST);
if (!it_res)
@ -469,7 +477,10 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, ListDir dir, u
if (quicklistCount(ql) == 0) {
CHECK(db_slice.Del(op_args.db_cntx.db_index, it));
}
if (op_args.shard->journal() && journal_rewrite) {
string command = dir == ListDir::LEFT ? "LPOP" : "RPOP";
RecordJournal(op_args, command, ArgSlice{key}, 2);
}
return res;
}
@ -512,9 +523,9 @@ OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view
if (is_dest) {
string_view val{find_res[0].value()};
absl::Span<string_view> span{&val, 1};
OpPush(op_args, args.front(), dest_dir, false, span);
OpPush(op_args, args.front(), dest_dir, false, span, true);
} else {
OpPop(op_args, args.front(), src_dir, 1, false);
OpPop(op_args, args.front(), src_dir, 1, false, true);
}
return OpStatus::OK;
};
@ -784,7 +795,10 @@ void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, Lis
if (cntx->transaction->GetUniqueShardCnt() == 1) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir);
auto ec = OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir);
// On single shard we can use the auto journal flow.
t->RenableAutoJournal();
return ec;
};
result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
@ -869,6 +883,7 @@ OpResult<string> BPopPusher::RunSingle(Transaction* t, time_point tp) {
bool is_multi = t->IsMulti();
auto cb_move = [&](Transaction* t, EngineShard* shard) {
op_res = OpMoveSingleShard(t->GetOpArgs(shard), pop_key_, push_key_, popdir_, pushdir_);
t->RenableAutoJournal(); // With single shard run auto journal flow.
return OpStatus::OK;
};
t->Execute(cb_move, false);
@ -1248,7 +1263,7 @@ void ListFamily::PushGeneric(ListDir dir, bool skip_notexists, CmdArgList args,
}
absl::Span<std::string_view> span{vals.data(), vals.size()};
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpPush(t->GetOpArgs(shard), key, dir, skip_notexists, span);
return OpPush(t->GetOpArgs(shard), key, dir, skip_notexists, span, false);
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
@ -1282,7 +1297,7 @@ void ListFamily::PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cnt
}
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpPop(t->GetOpArgs(shard), key, dir, count, true);
return OpPop(t->GetOpArgs(shard), key, dir, count, true, false);
};
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
@ -1315,26 +1330,28 @@ using CI = CommandId;
#define HFUNC(x) SetHandler(&ListFamily::x)
void ListFamily::Register(CommandRegistry* registry) {
*registry << CI{"LPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(LPush)
<< CI{"LPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(LPushX)
<< CI{"LPOP", CO::WRITE | CO::FAST | CO::DENYOOM, -2, 1, 1, 1}.HFUNC(LPop)
<< CI{"RPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPush)
<< CI{"RPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPushX)
<< CI{"RPOP", CO::WRITE | CO::FAST | CO::DENYOOM, -2, 1, 1, 1}.HFUNC(RPop)
<< CI{"RPOPLPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, 3, 1, 2, 1}.SetHandler(RPopLPush)
<< CI{"BRPOPLPUSH", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, 4, 1, 2, 1}.SetHandler(
BRPopLPush)
<< CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BLPop)
<< CI{"BRPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BRPop)
<< CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen)
<< CI{"LPOS", CO::READONLY | CO::FAST, -3, 1, 1, 1}.HFUNC(LPos)
<< CI{"LINDEX", CO::READONLY, 3, 1, 1, 1}.HFUNC(LIndex)
<< CI{"LINSERT", CO::WRITE, 5, 1, 1, 1}.HFUNC(LInsert)
<< CI{"LRANGE", CO::READONLY, 4, 1, 1, 1}.HFUNC(LRange)
<< CI{"LSET", CO::WRITE | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(LSet)
<< CI{"LTRIM", CO::WRITE, 4, 1, 1, 1}.HFUNC(LTrim)
<< CI{"LREM", CO::WRITE, 4, 1, 1, 1}.HFUNC(LRem)
<< CI{"LMOVE", CO::WRITE | CO::DENYOOM, 5, 1, 2, 1}.HFUNC(LMove);
*registry
<< CI{"LPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(LPush)
<< CI{"LPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(LPushX)
<< CI{"LPOP", CO::WRITE | CO::FAST | CO::DENYOOM, -2, 1, 1, 1}.HFUNC(LPop)
<< CI{"RPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPush)
<< CI{"RPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPushX)
<< CI{"RPOP", CO::WRITE | CO::FAST | CO::DENYOOM, -2, 1, 1, 1}.HFUNC(RPop)
<< CI{"RPOPLPUSH", CO::WRITE | CO::FAST | CO::DENYOOM | CO::NO_AUTOJOURNAL, 3, 1, 2, 1}
.SetHandler(RPopLPush)
<< CI{"BRPOPLPUSH", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING | CO::NO_AUTOJOURNAL, 4, 1, 2, 1}
.SetHandler(BRPopLPush)
<< CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BLPop)
<< CI{"BRPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BRPop)
<< CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen)
<< CI{"LPOS", CO::READONLY | CO::FAST, -3, 1, 1, 1}.HFUNC(LPos)
<< CI{"LINDEX", CO::READONLY, 3, 1, 1, 1}.HFUNC(LIndex)
<< CI{"LINSERT", CO::WRITE, 5, 1, 1, 1}.HFUNC(LInsert)
<< CI{"LRANGE", CO::READONLY, 4, 1, 1, 1}.HFUNC(LRange)
<< CI{"LSET", CO::WRITE | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(LSet)
<< CI{"LTRIM", CO::WRITE, 4, 1, 1, 1}.HFUNC(LTrim)
<< CI{"LREM", CO::WRITE, 4, 1, 1, 1}.HFUNC(LRem)
<< CI{"LMOVE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, 5, 1, 2, 1}.HFUNC(LMove);
}
} // namespace dfly

View file

@ -1209,7 +1209,8 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) {
return;
// Ignore non-write commands or ones with disabled autojournal.
if ((cid_->opt_mask() & CO::WRITE) == 0 || (cid_->opt_mask() & CO::NO_AUTOJOURNAL) > 0)
if ((cid_->opt_mask() & CO::WRITE) == 0 || ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) > 0 &&
!renabled_auto_journal_.load(memory_order_relaxed)))
return;
auto journal = shard->journal();

View file

@ -124,6 +124,11 @@ class Transaction {
void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload,
uint32_t shard_cnt) const;
// In some cases for non auto-journaling commands we want to enable the auto journal flow.
void RenableAutoJournal() {
renabled_auto_journal_.store(true, std::memory_order_relaxed);
}
// Unlock key locks of a multi transaction.
void UnlockMulti();
@ -349,6 +354,8 @@ class Transaction {
// Stores the full undivided command.
CmdArgList cmd_with_full_args_;
std::atomic<bool> renabled_auto_journal_ =
false; // True if NO_AUTOJOURNAL command asked to enable auto journal
// Reverse argument mapping for ReverseArgIndex to convert from shard index to original index.
std::vector<uint32_t> reverse_index_;