diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc index 0ed07768a..0ca02598b 100644 --- a/src/server/cluster/incoming_slot_migration.cc +++ b/src/server/cluster/incoming_slot_migration.cc @@ -94,7 +94,7 @@ class ClusterShardMigration { if (tx_data->opcode == journal::Op::PING) { // TODO check about ping logic } else { - ExecuteTxWithNoShardSync(std::move(*tx_data), cntx); + ExecuteTx(std::move(*tx_data), cntx); } } @@ -125,11 +125,10 @@ class ClusterShardMigration { } private: - void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx) { + void ExecuteTx(TransactionData&& tx_data, Context* cntx) { if (cntx->IsCancelled()) { return; } - CHECK(tx_data.shard_cnt <= 1); // we don't support sync for multishard execution if (!tx_data.IsGlobalCmd()) { executor_.Execute(tx_data.dbid, tx_data.command); } else { diff --git a/src/server/journal/tx_executor.cc b/src/server/journal/tx_executor.cc index 734a59637..f163f6973 100644 --- a/src/server/journal/tx_executor.cc +++ b/src/server/journal/tx_executor.cc @@ -60,7 +60,6 @@ void TransactionData::AddEntry(journal::ParsedEntry&& entry) { case journal::Op::EXPIRED: case journal::Op::COMMAND: command = std::move(entry.cmd); - shard_cnt = entry.shard_cnt; dbid = entry.dbid; txid = entry.txid; return; diff --git a/src/server/journal/tx_executor.h b/src/server/journal/tx_executor.h index d4c401a37..4ed9d2ec9 100644 --- a/src/server/journal/tx_executor.h +++ b/src/server/journal/tx_executor.h @@ -47,7 +47,6 @@ struct TransactionData { TxId txid{0}; DbIndex dbid{0}; - uint32_t shard_cnt{0}; journal::ParsedEntry::CmdData command; journal::Op opcode = journal::Op::NOOP; diff --git a/src/server/journal/types.h b/src/server/journal/types.h index f03af1409..b460cfeb5 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -28,7 +28,8 @@ struct EntryBase { TxId txid; Op opcode; DbIndex dbid; - uint32_t shard_cnt; + uint32_t shard_cnt; // This field is no longer used by the replica, but we continue to serialize + // and deserialize it to maintain backward compatibility. std::optional slot; LSN lsn{0}; }; diff --git a/src/server/replica.cc b/src/server/replica.cc index 928ad6104..977ec0f88 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -966,7 +966,8 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, Context* cntx) { return; } - bool inserted_by_me = multi_shard_exe_->InsertTxToSharedMap(tx_data.txid, tx_data.shard_cnt); + bool inserted_by_me = + multi_shard_exe_->InsertTxToSharedMap(tx_data.txid, master_context_.num_flows); auto& multi_shard_data = multi_shard_exe_->Find(tx_data.txid); diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 537611c5f..ac4679d5f 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -399,17 +399,12 @@ OpStatus OpMSet(const OpArgs& op_args, const ShardArgs& args) { if (stored * 2 == args.Size()) { RecordJournal(op_args, "MSET", args, op_args.tx->GetUniqueShardCnt()); DCHECK_EQ(result, OpStatus::OK); - return result; + } else if (stored > 0) { + vector store_args(args.begin(), args.end()); + store_args.resize(stored * 2); + RecordJournal(op_args, "MSET", store_args, op_args.tx->GetUniqueShardCnt()); } - - // Even without changes, we have to send a dummy command like PING for the - // replica to ack - string_view cmd = stored == 0 ? "PING" : "MSET"; - vector store_args(args.begin(), args.end()); - store_args.resize(stored * 2); - RecordJournal(op_args, cmd, store_args, op_args.tx->GetUniqueShardCnt()); } - return result; } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 9abf8b346..3e180f871 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -704,7 +704,7 @@ void Transaction::RunCallback(EngineShard* shard) { // Log to journal only once the command finished running if ((coordinator_state_ & COORD_CONCLUDING) || (multi_ && multi_->concluding)) { - LogAutoJournalOnShard(shard, result); + LogAutoJournalOnShard(shard); MaybeInvokeTrackingCb(); } } @@ -1346,7 +1346,7 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) { auto result = cb(this, shard); db_slice.OnCbFinish(); - LogAutoJournalOnShard(shard, result); + LogAutoJournalOnShard(shard); MaybeInvokeTrackingCb(); DCHECK_EQ(result.flags, 0); // if it's sophisticated, we shouldn't squash it @@ -1438,7 +1438,7 @@ optional Transaction::GetWakeKey(ShardId sid) const { return ArgS(full_args_, sd.wake_key_pos); } -void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult result) { +void Transaction::LogAutoJournalOnShard(EngineShard* shard) { // TODO: For now, we ignore non shard coordination. if (shard == nullptr) return; @@ -1455,20 +1455,8 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul if (journal == nullptr) return; - if (result.status != OpStatus::OK) { - // We log NOOP even for NO_AUTOJOURNAL commands because the non-success status could have been - // due to OOM in a single shard, while other shards succeeded - journal->RecordEntry(txid_, journal::Op::NOOP, db_index_, unique_shard_cnt_, - unique_slot_checker_.GetUniqueSlotId(), journal::Entry::Payload{}, true); - return; - } - // If autojournaling was disabled and not re-enabled the callback is writing to journal. - // We do not allow preemption in callbacks and therefor the call to RecordJournal from - // from callbacks does not allow await. - // To make sure we flush the changes to sync we call TriggerJournalWriteToSink here. if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !re_enabled_auto_journal_) { - TriggerJournalWriteToSink(); return; } diff --git a/src/server/transaction.h b/src/server/transaction.h index 5e209eb96..c0eb37ff8 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -559,7 +559,7 @@ class Transaction { // Log command in shard's journal, if this is a write command with auto-journaling enabled. // Should be called immediately after the last hop. - void LogAutoJournalOnShard(EngineShard* shard, RunnableResult shard_result); + void LogAutoJournalOnShard(EngineShard* shard); // Whether the callback can be run directly on this thread without dispatching on the shard queue bool CanRunInlined() const;