From ccada875e0196ec65be8a11ba190642fd2999256 Mon Sep 17 00:00:00 2001 From: adiholden Date: Tue, 9 Jul 2024 14:48:31 +0300 Subject: [PATCH] feat(server): master stop sending exec opcode to replica (#3289) * feat server: master stop sending exec opcode to replica Signed-off-by: adi_holden --- src/server/journal/journal_test.cc | 4 +- src/server/journal/serializer.cc | 2 - src/server/main_service.cc | 1 - src/server/multi_command_squasher.cc | 5 -- src/server/multi_command_squasher.h | 3 +- src/server/snapshot.cc | 5 -- src/server/transaction.cc | 79 ++-------------------------- src/server/transaction.h | 26 +-------- src/server/tx_base.cc | 14 ++--- src/server/tx_base.h | 7 +-- 10 files changed, 14 insertions(+), 132 deletions(-) diff --git a/src/server/journal/journal_test.cc b/src/server/journal/journal_test.cc index 06313bf07..b15f3f46a 100644 --- a/src/server/journal/journal_test.cc +++ b/src/server/journal/journal_test.cc @@ -97,9 +97,7 @@ TEST(Journal, WriteRead) { {2, Op::COMMAND, 1, 1, nullopt, Payload("LPUSH", list("l", "v1", "v2"))}, {3, Op::COMMAND, 0, 1, nullopt, Payload("MSET", slice("D", "4"))}, {4, Op::COMMAND, 1, 1, nullopt, Payload("DEL", list("l1"))}, - {5, Op::COMMAND, 2, 1, nullopt, Payload("DEL", list("E", "2"))}, - {6, Op::MULTI_COMMAND, 2, 1, nullopt, Payload("SET", list("E", "2"))}, - {6, Op::EXEC, 2, 1, nullopt}}; + {5, Op::COMMAND, 2, 1, nullopt, Payload("DEL", list("E", "2"))}}; // Write all entries to a buffer. base::IoBuf buf; diff --git a/src/server/journal/serializer.cc b/src/server/journal/serializer.cc index 4938421ef..d62c5ddce 100644 --- a/src/server/journal/serializer.cc +++ b/src/server/journal/serializer.cc @@ -76,8 +76,6 @@ void JournalWriter::Write(const journal::Entry& entry) { return; case journal::Op::COMMAND: case journal::Op::EXPIRED: - case journal::Op::MULTI_COMMAND: - case journal::Op::EXEC: Write(entry.txid); Write(entry.shard_cnt); Write(entry.payload); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index d25bfb643..ed74e83e2 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1993,7 +1993,6 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret cntx->transaction = stub_tx.get(); result = interpreter->RunFunction(eval_args.sha, &error); - cntx->transaction->FIX_ConcludeJournalExec(); // flush journal cntx->transaction = tx; return OpStatus::OK; diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index 11abfe118..b9eeb6b54 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -116,7 +116,6 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm auto& sinfo = PrepareShardInfo(last_sid, slot_checker.GetUniqueSlotId()); - sinfo.had_writes |= cmd->Cid()->IsWriteOnly(); sinfo.cmds.push_back(cmd); order_.push_back(last_sid); @@ -280,10 +279,6 @@ void MultiCommandSquasher::Run() { // Set last txid. cntx_->last_command_debug.clock = cntx_->transaction->txid(); - if (!sharded_.empty()) - cntx_->transaction->ReportWritesSquashedMulti( - [this](ShardId sid) { return sharded_[sid].had_writes; }); - // UnlockMulti is a no-op for non-atomic multi transactions, // still called for correctness and future changes if (!IsAtomic()) { diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index 83f01d68c..26922b22b 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -30,10 +30,9 @@ class MultiCommandSquasher { private: // Per-shard execution info. struct ShardExecInfo { - ShardExecInfo() : had_writes{false}, cmds{}, replies{}, local_tx{nullptr} { + ShardExecInfo() : cmds{}, replies{}, local_tx{nullptr} { } - bool had_writes; std::vector cmds; // accumulated commands std::vector replies; boost::intrusive_ptr local_tx; // stub-mode tx for use inside shard diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 08a9ee8b8..8a202954d 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -351,11 +351,6 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) // no database switch can be performed between those two calls, because they are part of one // transaction. void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) { - // We ignore EXEC entries because we they have no meaning during - // the LOAD phase on replica. - if (item.opcode == journal::Op::EXEC) - return; - // To enable journal flushing to sync after non auto journal command is executed we call // TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no // additional journal change to serialize, it simply invokes PushSerializedToChannel. diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 002a20988..e7e7521ca 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -132,8 +132,6 @@ Transaction::Transaction(const CommandId* cid) : cid_{cid} { string_view cmd_name(cid_->name()); if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") { multi_.reset(new MultiData); - multi_->shard_journal_write.resize(shard_set->size(), false); - multi_->mode = NOT_DETERMINED; multi_->role = DEFAULT; } @@ -153,7 +151,6 @@ Transaction::Transaction(const Transaction* parent, ShardId shard_id, } multi_->role = SQUASHED_STUB; - multi_->shard_journal_write.resize(1); MultiUpdateWithParent(parent); if (slot_id.has_value()) { @@ -597,11 +594,6 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { // This is the last hop, so clear cont_trans if its held by the current tx shard->RemoveContTx(this); - if (IsAtomicMulti()) { // Can only be true if run through ScheduleSingleHop - DCHECK(cid_->IsMultiTransactional()); - MultiReportJournalOnShard(shard); - } - // It has 2 responsibilities. // 1: to go over potential wakened keys, verify them and activate watch queues. // 2: if this transaction was notified and finished running - to remove it from the head @@ -758,15 +750,6 @@ void Transaction::ScheduleInternal() { } } -void Transaction::ReportWritesSquashedMulti(absl::FunctionRef had_write) { - DCHECK(multi_); - for (unsigned i = 0; i < multi_->shard_journal_write.size(); i++) - multi_->shard_journal_write[i] |= had_write(i); - - // Update imemdiately if we decide to conclude after one hop without UnlockMulti - multi_->shard_journal_cnt = CalcMultiNumOfShardJournals(); -} - // Runs in the coordinator fiber. void Transaction::UnlockMulti() { VLOG(1) << "UnlockMulti " << DebugId(); @@ -782,8 +765,6 @@ void Transaction::UnlockMulti() { sharded_keys[sid].emplace_back(fp); } - multi_->shard_journal_cnt = ServerState::tlocal()->journal() ? CalcMultiNumOfShardJournals() : 0; - use_count_.fetch_add(shard_data_.size(), std::memory_order_relaxed); DCHECK_EQ(shard_data_.size(), shard_set->size()); @@ -798,16 +779,6 @@ void Transaction::UnlockMulti() { VLOG(1) << "UnlockMultiEnd " << DebugId(); } -uint32_t Transaction::CalcMultiNumOfShardJournals() const { - uint32_t shard_journals_cnt = 0; - for (bool was_shard_write : multi_->shard_journal_write) { - if (was_shard_write) { - ++shard_journals_cnt; - } - } - return shard_journals_cnt; -} - OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { Execute(cb, true); return local_result_; @@ -919,14 +890,6 @@ const absl::flat_hash_set>& Transaction::GetMultiFps( return multi_->tag_fps; } -void Transaction::FIX_ConcludeJournalExec() { - if (!multi_->shard_journal_write.front()) - return; - - multi_->shard_journal_cnt = 1; - MultiReportJournalOnShard(EngineShard::tlocal()); -} - string Transaction::DEBUG_PrintFailState(ShardId sid) const { auto res = StrCat( "usc: ", unique_shard_cnt_, ", name:", GetCId()->name(), @@ -1262,21 +1225,9 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) { return result; } -void Transaction::MultiReportJournalOnShard(EngineShard* shard) const { - DCHECK_EQ(EngineShard::tlocal(), shard); - auto* journal = shard->journal(); - size_t write_idx = multi_->role == SQUASHED_STUB ? 0 : shard->shard_id(); - if (journal != nullptr && multi_->shard_journal_write[write_idx]) { - journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, multi_->shard_journal_cnt, - unique_slot_checker_.GetUniqueSlotId(), {}, true); - } -} - void Transaction::UnlockMultiShardCb(absl::Span fps, EngineShard* shard) { DCHECK(multi_ && multi_->lock_mode); - MultiReportJournalOnShard(shard); - if (multi_->mode == GLOBAL) { shard->shard_lock()->Release(IntentLock::EXCLUSIVE); } else { @@ -1402,37 +1353,15 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul } // Record to journal autojournal commands, here we allow await which anables writing to sync // the journal change. - LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false, true); + LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, true); } void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, - uint32_t shard_cnt, bool multi_commands, - bool allow_await) const { + uint32_t shard_cnt, bool allow_await) const { auto journal = shard->journal(); CHECK(journal); - - if (multi_) { - if (multi_->role != SQUASHED_STUB) - multi_->shard_journal_write[shard->shard_id()] = true; - else - multi_->shard_journal_write[0] = true; - } - - bool is_multi = multi_commands || IsAtomicMulti(); - - auto opcode = is_multi ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND; - journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, unique_slot_checker_.GetUniqueSlotId(), - std::move(payload), allow_await); -} - -void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const { - if (multi_) { - return; - } - auto journal = shard->journal(); - CHECK(journal); - journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt, - unique_slot_checker_.GetUniqueSlotId(), {}, false); + journal->RecordEntry(txid_, journal::Op::COMMAND, db_index_, shard_cnt, + unique_slot_checker_.GetUniqueSlotId(), std::move(payload), allow_await); } void Transaction::ReviveAutoJournal() { diff --git a/src/server/transaction.h b/src/server/transaction.h index 2d36d40f5..eec467a2a 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -229,10 +229,6 @@ class Transaction { // Start multi in NON_ATOMIC mode. void StartMultiNonAtomic(); - // Report which shards had write commands that executed on stub transactions - // and thus did not mark itself in MultiData::shard_journal_write. - void ReportWritesSquashedMulti(absl::FunctionRef had_write); - // Unlock key locks of a multi transaction. void UnlockMulti(); @@ -325,14 +321,9 @@ class Transaction { // to it must not block. void PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdArgList args); - // Write a journal entry to a shard journal with the given payload. When logging a non-automatic - // journal command, multiple journal entries may be necessary. In this case, call with set - // multi_commands to true and call the FinishLogJournalOnShard function after logging the final - // entry. + // Write a journal entry to a shard journal with the given payload. void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, uint32_t shard_cnt, - bool multi_commands, bool allow_await) const; - - void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const; + bool allow_await) const; // Re-enable auto journal for commands marked as NO_AUTOJOURNAL. Call during setup. void ReviveAutoJournal(); @@ -343,9 +334,6 @@ class Transaction { // Get keys multi transaction was initialized with, normalized and unique const absl::flat_hash_set>& GetMultiFps() const; - // Send journal EXEC opcode after a series of MULTI commands on the currently active shard - void FIX_ConcludeJournalExec(); - // Print in-dept failure state for debugging. std::string DEBUG_PrintFailState(ShardId sid) const; @@ -442,13 +430,6 @@ class Transaction { bool concluding = false; unsigned cmd_seq_num = 0; // used for debugging purposes. - unsigned shard_journal_cnt; - - // The shard_journal_write vector variable is used to determine the number of shards - // involved in a multi-command transaction. This information is utilized by replicas when - // executing multi-command. For every write to a shard journal, the corresponding index in the - // vector is marked as true. - absl::InlinedVector shard_journal_write; }; enum CoordinatorState : uint8_t { @@ -543,9 +524,6 @@ class Transaction { // Set time_now_ms_ void InitTxTime(); - // If journaling is enabled, report final exec opcode to finish the chain of commands. - void MultiReportJournalOnShard(EngineShard* shard) const; - void UnlockMultiShardCb(absl::Span fps, EngineShard* shard); // In a multi-command transaction, we determine the number of shard journals that we wrote entries diff --git a/src/server/tx_base.cc b/src/server/tx_base.cc index 7d6f06e6d..b0fa25506 100644 --- a/src/server/tx_base.cc +++ b/src/server/tx_base.cc @@ -24,21 +24,15 @@ size_t ShardArgs::Size() const { } void RecordJournal(const OpArgs& op_args, string_view cmd, const ShardArgs& args, - uint32_t shard_cnt, bool multi_commands) { + uint32_t shard_cnt) { VLOG(2) << "Logging command " << cmd << " from txn " << op_args.tx->txid(); - op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, multi_commands, - false); + op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, false); } void RecordJournal(const OpArgs& op_args, std::string_view cmd, facade::ArgSlice args, - uint32_t shard_cnt, bool multi_commands) { + uint32_t shard_cnt) { VLOG(2) << "Logging command " << cmd << " from txn " << op_args.tx->txid(); - op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, multi_commands, - false); -} - -void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt) { - op_args.tx->FinishLogJournalOnShard(op_args.shard, shard_cnt); + op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, false); } void RecordExpiry(DbIndex dbid, string_view key) { diff --git a/src/server/tx_base.h b/src/server/tx_base.h index 9373bdb1d..9484b281c 100644 --- a/src/server/tx_base.h +++ b/src/server/tx_base.h @@ -201,12 +201,9 @@ class ShardArgs { // Record non auto journal command with own txid and dbid. void RecordJournal(const OpArgs& op_args, std::string_view cmd, const ShardArgs& args, - uint32_t shard_cnt = 1, bool multi_commands = false); + uint32_t shard_cnt = 1); void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args, - uint32_t shard_cnt = 1, bool multi_commands = false); - -// Record non auto journal command finish. Call only when command translates to multi commands. -void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt); + uint32_t shard_cnt = 1); // Record expiry in journal with independent transaction. Must be called from shard thread holding // key.