diff --git a/src/server/bitops_family.cc b/src/server/bitops_family.cc index 517184797..81fd4b5bc 100644 --- a/src/server/bitops_family.cc +++ b/src/server/bitops_family.cc @@ -577,6 +577,10 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) { if (find_res == OpStatus::OK) { operation.Commit(op_result); } + + if (shard->journal()) { + RecordJournal(t->GetOpArgs(shard), "SET", {dest_key, op_result}); + } } return OpStatus::OK; }; @@ -690,7 +694,7 @@ void BitOpsFamily::Register(CommandRegistry* registry) { << CI{"BITCOUNT", CO::READONLY, -2, 1, 1, 1}.SetHandler(&BitCount) << CI{"BITFIELD", CO::WRITE, -3, 1, 1, 1}.SetHandler(&BitField) << CI{"BITFIELD_RO", CO::READONLY, -5, 1, 1, 1}.SetHandler(&BitFieldRo) - << CI{"BITOP", CO::WRITE, -4, 2, -1, 1}.SetHandler(&BitOp) + << CI{"BITOP", CO::WRITE | CO::NO_AUTOJOURNAL, -4, 2, -1, 1}.SetHandler(&BitOp) << CI{"GETBIT", CO::READONLY | CO::FAST | CO::FAST, 3, 1, 1, 1}.SetHandler(&GetBit) << CI{"SETBIT", CO::WRITE, 4, 1, 1, 1}.SetHandler(&SetBit); } diff --git a/src/server/common.cc b/src/server/common.cc index 970a91763..d6bc03251 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -193,8 +193,13 @@ bool ParseDouble(string_view src, double* value) { return true; } -void OpArgs::RecordJournal(string_view cmd, ArgSlice args) const { - tx->LogJournalOnShard(shard, make_pair(cmd, args), 1); +void RecordJournal(const OpArgs& op_args, string_view cmd, ArgSlice args, uint32_t shard_cnt, + bool multi_commands) { + op_args.tx->LogJournalOnShard(op_args.shard, make_pair(cmd, args), shard_cnt, multi_commands); +} + +void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt) { + op_args.tx->FinishLogJournalOnShard(op_args.shard, shard_cnt); } #define ADD(x) (x) += o.x diff --git a/src/server/common.h b/src/server/common.h index 0e4b761fb..49b39bde2 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -90,11 +90,15 @@ struct OpArgs { OpArgs(EngineShard* s, const Transaction* tx, const DbContext& cntx) : 
shard(s), tx(tx), db_cntx(cntx) { } - - // Log single-shard journal command with own txid and dbid. - void RecordJournal(std::string_view cmd, ArgSlice args) const; }; +// Record a journal entry for a non-auto-journal command, with its own txid and dbid. +void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args, + uint32_t shard_cnt = 1, bool multi_commands = false); + +// Finish a non-auto-journal command. Call only when it translates to multiple journal commands. +void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt); + struct TieredStats { size_t tiered_reads = 0; size_t tiered_writes = 0; diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 7e83d3ffb..3cfc29fb7 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -370,7 +370,11 @@ OpStatus Renamer::MoveSrc(Transaction* t, EngineShard* es) { pv_ = std::move(it->second); it->second.SetExpire(has_expire); } + CHECK(es->db_slice().Del(t->GetDbIndex(), it)); // delete the entry with empty value in it. 
+ if (es->journal()) { + RecordJournal(t->GetOpArgs(es), "DEL", ArgSlice{src_res_.key}, 2); + } } return OpStatus::OK; @@ -406,6 +410,21 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) { if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) { es->blocking_controller()->AwakeWatched(t->GetDbIndex(), dest_key); } + if (es->journal()) { + OpArgs op_args = t->GetOpArgs(es); + string scratch; + // todo insert under multi exec + RecordJournal(op_args, "SET"sv, ArgSlice{dest_key, dest_it->second.GetSlice(&scratch)}, 2, + true); + if (dest_it->first.IsSticky()) { + RecordJournal(op_args, "STICK"sv, ArgSlice{dest_key}, 2, true); + } + if (dest_it->second.HasExpire()) { + auto time = absl::StrCat(src_res_.expire_ts); + RecordJournal(op_args, "PEXPIREAT"sv, ArgSlice{dest_key, time}, 2, true); + } + RecordJournalFinish(op_args, 2); + } } return OpStatus::OK; @@ -592,13 +611,13 @@ OpStatus OpExpire(const OpArgs& op_args, string_view key, const DbSlice::ExpireP // If the value was deleted, replicate as DEL. // Else, replicate as PEXPIREAT with exact time. - if (auto journal = op_args.shard->journal(); journal && res.ok()) { + if (op_args.shard->journal() && res.ok()) { if (res.value() == -1) { - op_args.RecordJournal("DEL"sv, ArgSlice{key}); + RecordJournal(op_args, "DEL"sv, ArgSlice{key}); } else { auto time = absl::StrCat(res.value()); // Note: Don't forget to change this when adding arguments to expire commands. - op_args.RecordJournal("PEXPIREAT"sv, ArgSlice{time}); + RecordJournal(op_args, "PEXPIREAT"sv, ArgSlice{key, time}); } } @@ -1195,7 +1214,12 @@ OpResult GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des if (transaction->GetUniqueShardCnt() == 1) { auto cb = [&](Transaction* t, EngineShard* shard) { - return OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest); + auto ec = OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest); + // In case of unique shard count we can use rename command in replica. 
+ if (ec.ok() && shard->journal()) { + RecordJournal(t->GetOpArgs(shard), "RENAME", {key[0], key[1]}); + } + return ec; }; OpResult result = transaction->ScheduleSingleHopT(std::move(cb)); @@ -1419,8 +1443,8 @@ void GenericFamily::Register(CommandRegistry* registry) { << CI{"PEXPIREAT", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC( PexpireAt) << CI{"PEXPIRE", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(Pexpire) - << CI{"RENAME", CO::WRITE, 3, 1, 2, 1}.HFUNC(Rename) - << CI{"RENAMENX", CO::WRITE, 3, 1, 2, 1}.HFUNC(RenameNx) + << CI{"RENAME", CO::WRITE | CO::NO_AUTOJOURNAL, 3, 1, 2, 1}.HFUNC(Rename) + << CI{"RENAMENX", CO::WRITE | CO::NO_AUTOJOURNAL, 3, 1, 2, 1}.HFUNC(RenameNx) << CI{"SELECT", kSelectOpts, 2, 0, 0, 0}.HFUNC(Select) << CI{"SCAN", CO::READONLY | CO::FAST | CO::LOADING, -2, 0, 0, 0}.HFUNC(Scan) << CI{"TTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Ttl) diff --git a/src/server/set_family.cc b/src/server/set_family.cc index c6c83c33b..a6be3b811 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -539,7 +539,7 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v auto it = db_slice.FindExt(op_args.db_cntx, key).first; db_slice.Del(op_args.db_cntx.db_index, it); if (journal_update && op_args.shard->journal()) { - op_args.RecordJournal("DEL"sv, ArgSlice{key}); + RecordJournal(op_args, "DEL"sv, ArgSlice{key}); } return 0; } @@ -612,11 +612,11 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !new_key); if (journal_update && op_args.shard->journal()) { - op_args.RecordJournal("DEL"sv, ArgSlice{key}); + RecordJournal(op_args, "DEL"sv, ArgSlice{key}); vector mapped(vals.size() + 1); mapped[0] = key; std::copy(vals.begin(), vals.end(), mapped.begin() + 1); - op_args.RecordJournal("SADD"sv, mapped); + RecordJournal(op_args, "SADD"sv, mapped); } return res; } @@ -689,7 +689,7 @@ OpResult OpRem(const OpArgs& 
op_args, string_view key, const ArgSlice& vector mapped(vals.size() + 1); mapped[0] = key; std::copy(vals.begin(), vals.end(), mapped.begin() + 1); - op_args.RecordJournal("SREM"sv, mapped); + RecordJournal(op_args, "SREM"sv, mapped); } return removed; @@ -981,8 +981,8 @@ OpResult OpPop(const OpArgs& op_args, string_view key, unsigned count CHECK(db_slice.Del(op_args.db_cntx.db_index, it)); // Replicate as DEL. - if (auto journal = op_args.shard->journal(); journal) { - op_args.RecordJournal("DEL"sv, ArgSlice{key}); + if (op_args.shard->journal()) { + RecordJournal(op_args, "DEL"sv, ArgSlice{key}); } } else { SetType st{it->second.RObjPtr(), it->second.Encoding()}; @@ -1004,11 +1004,11 @@ OpResult OpPop(const OpArgs& op_args, string_view key, unsigned count } // Replicate as SREM with removed keys, because SPOP is not deterministic. - if (auto journal = op_args.shard->journal(); journal) { + if (op_args.shard->journal()) { vector mapped(result.size() + 1); mapped[0] = key; std::copy(result.begin(), result.end(), mapped.begin() + 1); - op_args.RecordJournal("SREM"sv, mapped); + RecordJournal(op_args, "SREM"sv, mapped); } db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 0dba4bdde..5b5b56c38 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1238,17 +1238,26 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) { entry_payload = make_pair(cmd, GetShardArgs(shard->shard_id())); } - LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_); + LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false); } void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, - uint32_t shard_cnt) const { + uint32_t shard_cnt, bool multi_commands) const { auto journal = shard->journal(); CHECK(journal); - auto opcode = multi_ ? 
journal::Op::MULTI_COMMAND : journal::Op::COMMAND; + auto opcode = (multi_ || multi_commands) ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND; journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, std::move(payload)); } +void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const { + if (multi_) { + return; + } + auto journal = shard->journal(); + CHECK(journal); + journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt, {}); +} + void Transaction::BreakOnShutdown() { if (coordinator_state_ & COORD_BLOCKED) { coordinator_state_ |= COORD_CANCELLED; diff --git a/src/server/transaction.h b/src/server/transaction.h index aa470f896..0b114f68b 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -199,6 +199,14 @@ class Transaction { std::string DebugId() const; + // Write a journal entry to a shard journal with the given payload. When logging a non-automatic + // journal command, multiple journal entries may be necessary. In this case, set + // multi_commands to true and call the FinishLogJournalOnShard function after logging the final + // entry. + void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, uint32_t shard_cnt, + bool multi_commands) const; + void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const; + private: // Holds number of locks for each IntentLock::Mode: shared and exlusive. struct LockCnt {