fix(replication): do not log to journal on callback fail (#4392)

fix replication: do not log to journal on callback fail

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2025-01-02 09:39:31 +02:00 committed by GitHub
parent 413ec0a1cf
commit 3b082e42b8
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 8 additions and 4 deletions

View file

@ -704,7 +704,7 @@ void Transaction::RunCallback(EngineShard* shard) {
// Log to journal only once the command finished running
if ((coordinator_state_ & COORD_CONCLUDING) || (multi_ && multi_->concluding)) {
LogAutoJournalOnShard(shard);
LogAutoJournalOnShard(shard, result);
MaybeInvokeTrackingCb();
}
}
@ -1346,7 +1346,7 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
auto result = cb(this, shard);
db_slice.OnCbFinish();
LogAutoJournalOnShard(shard);
LogAutoJournalOnShard(shard, result);
MaybeInvokeTrackingCb();
DCHECK_EQ(result.flags, 0); // if it's sophisticated, we shouldn't squash it
@ -1438,7 +1438,7 @@ optional<string_view> Transaction::GetWakeKey(ShardId sid) const {
return ArgS(full_args_, sd.wake_key_pos);
}
void Transaction::LogAutoJournalOnShard(EngineShard* shard) {
void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult result) {
// TODO: For now, we ignore non shard coordination.
if (shard == nullptr)
return;
@ -1455,6 +1455,10 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) {
if (journal == nullptr)
return;
if (result.status != OpStatus::OK) {
return; // Do not log to journal if command execution failed.
}
// If autojournaling was disabled and not re-enabled the callback is writing to journal.
if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !re_enabled_auto_journal_) {
return;

View file

@ -559,7 +559,7 @@ class Transaction {
// Log command in shard's journal, if this is a write command with auto-journaling enabled.
// Should be called immediately after the last hop.
void LogAutoJournalOnShard(EngineShard* shard);
void LogAutoJournalOnShard(EngineShard* shard, RunnableResult shard_result);
// Whether the callback can be run directly on this thread without dispatching on the shard queue
bool CanRunInlined() const;