(generic family): journal support rename command (#698)

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2023-01-18 12:14:18 +02:00 committed by GitHub
parent be74fa0a5b
commit f175127837
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 77 additions and 23 deletions

View file

@ -577,6 +577,10 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) {
if (find_res == OpStatus::OK) { if (find_res == OpStatus::OK) {
operation.Commit(op_result); operation.Commit(op_result);
} }
if (shard->journal()) {
RecordJournal(t->GetOpArgs(shard), "SET", {dest_key, op_result});
}
} }
return OpStatus::OK; return OpStatus::OK;
}; };
@ -690,7 +694,7 @@ void BitOpsFamily::Register(CommandRegistry* registry) {
<< CI{"BITCOUNT", CO::READONLY, -2, 1, 1, 1}.SetHandler(&BitCount) << CI{"BITCOUNT", CO::READONLY, -2, 1, 1, 1}.SetHandler(&BitCount)
<< CI{"BITFIELD", CO::WRITE, -3, 1, 1, 1}.SetHandler(&BitField) << CI{"BITFIELD", CO::WRITE, -3, 1, 1, 1}.SetHandler(&BitField)
<< CI{"BITFIELD_RO", CO::READONLY, -5, 1, 1, 1}.SetHandler(&BitFieldRo) << CI{"BITFIELD_RO", CO::READONLY, -5, 1, 1, 1}.SetHandler(&BitFieldRo)
<< CI{"BITOP", CO::WRITE, -4, 2, -1, 1}.SetHandler(&BitOp) << CI{"BITOP", CO::WRITE | CO::NO_AUTOJOURNAL, -4, 2, -1, 1}.SetHandler(&BitOp)
<< CI{"GETBIT", CO::READONLY | CO::FAST | CO::FAST, 3, 1, 1, 1}.SetHandler(&GetBit) << CI{"GETBIT", CO::READONLY | CO::FAST | CO::FAST, 3, 1, 1, 1}.SetHandler(&GetBit)
<< CI{"SETBIT", CO::WRITE, 4, 1, 1, 1}.SetHandler(&SetBit); << CI{"SETBIT", CO::WRITE, 4, 1, 1, 1}.SetHandler(&SetBit);
} }

View file

@ -193,8 +193,13 @@ bool ParseDouble(string_view src, double* value) {
return true; return true;
} }
void OpArgs::RecordJournal(string_view cmd, ArgSlice args) const { void RecordJournal(const OpArgs& op_args, string_view cmd, ArgSlice args, uint32_t shard_cnt,
tx->LogJournalOnShard(shard, make_pair(cmd, args), 1); bool multi_commands) {
op_args.tx->LogJournalOnShard(op_args.shard, make_pair(cmd, args), shard_cnt, multi_commands);
}
void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt) {
op_args.tx->FinishLogJournalOnShard(op_args.shard, shard_cnt);
} }
#define ADD(x) (x) += o.x #define ADD(x) (x) += o.x

View file

@ -90,11 +90,15 @@ struct OpArgs {
OpArgs(EngineShard* s, const Transaction* tx, const DbContext& cntx) OpArgs(EngineShard* s, const Transaction* tx, const DbContext& cntx)
: shard(s), tx(tx), db_cntx(cntx) { : shard(s), tx(tx), db_cntx(cntx) {
} }
// Log single-shard journal command with own txid and dbid.
void RecordJournal(std::string_view cmd, ArgSlice args) const;
}; };
// Record non auto journal command with own txid and dbid.
void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args,
uint32_t shard_cnt = 1, bool multi_commands = false);
// Record non auto journal command finish. Call only when command translates to multi commands.
void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt);
struct TieredStats { struct TieredStats {
size_t tiered_reads = 0; size_t tiered_reads = 0;
size_t tiered_writes = 0; size_t tiered_writes = 0;

View file

@ -370,7 +370,11 @@ OpStatus Renamer::MoveSrc(Transaction* t, EngineShard* es) {
pv_ = std::move(it->second); pv_ = std::move(it->second);
it->second.SetExpire(has_expire); it->second.SetExpire(has_expire);
} }
CHECK(es->db_slice().Del(t->GetDbIndex(), it)); // delete the entry with empty value in it. CHECK(es->db_slice().Del(t->GetDbIndex(), it)); // delete the entry with empty value in it.
if (es->journal()) {
RecordJournal(t->GetOpArgs(es), "DEL", ArgSlice{src_res_.key}, 2);
}
} }
return OpStatus::OK; return OpStatus::OK;
@ -406,6 +410,21 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) {
if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) { if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) {
es->blocking_controller()->AwakeWatched(t->GetDbIndex(), dest_key); es->blocking_controller()->AwakeWatched(t->GetDbIndex(), dest_key);
} }
if (es->journal()) {
OpArgs op_args = t->GetOpArgs(es);
string scratch;
// todo insert under multi exec
RecordJournal(op_args, "SET"sv, ArgSlice{dest_key, dest_it->second.GetSlice(&scratch)}, 2,
true);
if (dest_it->first.IsSticky()) {
RecordJournal(op_args, "STICK"sv, ArgSlice{dest_key}, 2, true);
}
if (dest_it->second.HasExpire()) {
auto time = absl::StrCat(src_res_.expire_ts);
RecordJournal(op_args, "PEXPIREAT"sv, ArgSlice{time}, 2, true);
}
RecordJournalFinish(op_args, 2);
}
} }
return OpStatus::OK; return OpStatus::OK;
@ -592,13 +611,13 @@ OpStatus OpExpire(const OpArgs& op_args, string_view key, const DbSlice::ExpireP
// If the value was deleted, replicate as DEL. // If the value was deleted, replicate as DEL.
// Else, replicate as PEXPIREAT with exact time. // Else, replicate as PEXPIREAT with exact time.
if (auto journal = op_args.shard->journal(); journal && res.ok()) { if (op_args.shard->journal() && res.ok()) {
if (res.value() == -1) { if (res.value() == -1) {
op_args.RecordJournal("DEL"sv, ArgSlice{key}); RecordJournal(op_args, "DEL"sv, ArgSlice{key});
} else { } else {
auto time = absl::StrCat(res.value()); auto time = absl::StrCat(res.value());
// Note: Don't forget to change this when adding arguments to expire commands. // Note: Don't forget to change this when adding arguments to expire commands.
op_args.RecordJournal("PEXPIREAT"sv, ArgSlice{time}); RecordJournal(op_args, "PEXPIREAT"sv, ArgSlice{time});
} }
} }
@ -1195,7 +1214,12 @@ OpResult<void> GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des
if (transaction->GetUniqueShardCnt() == 1) { if (transaction->GetUniqueShardCnt() == 1) {
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest); auto ec = OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest);
// Incase of uniqe shard count we can use rename command in replica.
if (ec.ok() && shard->journal()) {
RecordJournal(t->GetOpArgs(shard), "RENAME", {key[0], key[1]});
}
return ec;
}; };
OpResult<void> result = transaction->ScheduleSingleHopT(std::move(cb)); OpResult<void> result = transaction->ScheduleSingleHopT(std::move(cb));
@ -1419,8 +1443,8 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"PEXPIREAT", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC( << CI{"PEXPIREAT", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(
PexpireAt) PexpireAt)
<< CI{"PEXPIRE", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(Pexpire) << CI{"PEXPIRE", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(Pexpire)
<< CI{"RENAME", CO::WRITE, 3, 1, 2, 1}.HFUNC(Rename) << CI{"RENAME", CO::WRITE | CO::NO_AUTOJOURNAL, 3, 1, 2, 1}.HFUNC(Rename)
<< CI{"RENAMENX", CO::WRITE, 3, 1, 2, 1}.HFUNC(RenameNx) << CI{"RENAMENX", CO::WRITE | CO::NO_AUTOJOURNAL, 3, 1, 2, 1}.HFUNC(RenameNx)
<< CI{"SELECT", kSelectOpts, 2, 0, 0, 0}.HFUNC(Select) << CI{"SELECT", kSelectOpts, 2, 0, 0, 0}.HFUNC(Select)
<< CI{"SCAN", CO::READONLY | CO::FAST | CO::LOADING, -2, 0, 0, 0}.HFUNC(Scan) << CI{"SCAN", CO::READONLY | CO::FAST | CO::LOADING, -2, 0, 0, 0}.HFUNC(Scan)
<< CI{"TTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Ttl) << CI{"TTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Ttl)

View file

@ -539,7 +539,7 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
auto it = db_slice.FindExt(op_args.db_cntx, key).first; auto it = db_slice.FindExt(op_args.db_cntx, key).first;
db_slice.Del(op_args.db_cntx.db_index, it); db_slice.Del(op_args.db_cntx.db_index, it);
if (journal_update && op_args.shard->journal()) { if (journal_update && op_args.shard->journal()) {
op_args.RecordJournal("DEL"sv, ArgSlice{key}); RecordJournal(op_args, "DEL"sv, ArgSlice{key});
} }
return 0; return 0;
} }
@ -612,11 +612,11 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !new_key); db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !new_key);
if (journal_update && op_args.shard->journal()) { if (journal_update && op_args.shard->journal()) {
op_args.RecordJournal("DEL"sv, ArgSlice{key}); RecordJournal(op_args, "DEL"sv, ArgSlice{key});
vector<string_view> mapped(vals.size() + 1); vector<string_view> mapped(vals.size() + 1);
mapped[0] = key; mapped[0] = key;
std::copy(vals.begin(), vals.end(), mapped.begin() + 1); std::copy(vals.begin(), vals.end(), mapped.begin() + 1);
op_args.RecordJournal("SADD"sv, mapped); RecordJournal(op_args, "SADD"sv, mapped);
} }
return res; return res;
} }
@ -689,7 +689,7 @@ OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, const ArgSlice&
vector<string_view> mapped(vals.size() + 1); vector<string_view> mapped(vals.size() + 1);
mapped[0] = key; mapped[0] = key;
std::copy(vals.begin(), vals.end(), mapped.begin() + 1); std::copy(vals.begin(), vals.end(), mapped.begin() + 1);
op_args.RecordJournal("SREM"sv, mapped); RecordJournal(op_args, "SREM"sv, mapped);
} }
return removed; return removed;
@ -981,8 +981,8 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, unsigned count
CHECK(db_slice.Del(op_args.db_cntx.db_index, it)); CHECK(db_slice.Del(op_args.db_cntx.db_index, it));
// Replicate as DEL. // Replicate as DEL.
if (auto journal = op_args.shard->journal(); journal) { if (op_args.shard->journal()) {
op_args.RecordJournal("DEL"sv, ArgSlice{key}); RecordJournal(op_args, "DEL"sv, ArgSlice{key});
} }
} else { } else {
SetType st{it->second.RObjPtr(), it->second.Encoding()}; SetType st{it->second.RObjPtr(), it->second.Encoding()};
@ -1004,11 +1004,11 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, unsigned count
} }
// Replicate as SREM with removed keys, because SPOP is not deterministic. // Replicate as SREM with removed keys, because SPOP is not deterministic.
if (auto journal = op_args.shard->journal(); journal) { if (op_args.shard->journal()) {
vector<string_view> mapped(result.size() + 1); vector<string_view> mapped(result.size() + 1);
mapped[0] = key; mapped[0] = key;
std::copy(result.begin(), result.end(), mapped.begin() + 1); std::copy(result.begin(), result.end(), mapped.begin() + 1);
op_args.RecordJournal("SREM"sv, mapped); RecordJournal(op_args, "SREM"sv, mapped);
} }
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);

View file

@ -1238,17 +1238,26 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) {
entry_payload = make_pair(cmd, GetShardArgs(shard->shard_id())); entry_payload = make_pair(cmd, GetShardArgs(shard->shard_id()));
} }
LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_); LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false);
} }
void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload,
uint32_t shard_cnt) const { uint32_t shard_cnt, bool multi_commands) const {
auto journal = shard->journal(); auto journal = shard->journal();
CHECK(journal); CHECK(journal);
auto opcode = multi_ ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND; auto opcode = (multi_ || multi_commands) ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND;
journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, std::move(payload)); journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, std::move(payload));
} }
void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const {
if (multi_) {
return;
}
auto journal = shard->journal();
CHECK(journal);
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt, {});
}
void Transaction::BreakOnShutdown() { void Transaction::BreakOnShutdown() {
if (coordinator_state_ & COORD_BLOCKED) { if (coordinator_state_ & COORD_BLOCKED) {
coordinator_state_ |= COORD_CANCELLED; coordinator_state_ |= COORD_CANCELLED;

View file

@ -199,6 +199,14 @@ class Transaction {
std::string DebugId() const; std::string DebugId() const;
// Write a journal entry to a shard journal with the given payload. When logging a non-automatic
// journal command, multiple journal entries may be necessary. In this case, call with set
// multi_commands to true and call the FinishLogJournalOnShard function after logging the final
// entry.
void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, uint32_t shard_cnt,
bool multi_commands) const;
void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const;
private: private:
// Holds number of locks for each IntentLock::Mode: shared and exlusive. // Holds number of locks for each IntentLock::Mode: shared and exlusive.
struct LockCnt { struct LockCnt {