From e0f86697f9fddda6412dead2a4a5f9217a75aef9 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Sun, 4 Feb 2024 20:28:44 +0300 Subject: [PATCH] fix: fix script replication (#2531) * fix: fix script replication Single key script replication was previously broken because the EXEC entry wasn't sent. Send it manually --------- Signed-off-by: Vladislav Oleshko --- src/server/journal/tx_executor.cc | 2 +- src/server/main_service.cc | 1 + src/server/transaction.cc | 26 ++++++++++++++++++----- src/server/transaction.h | 3 +++ tests/dragonfly/replication_test.py | 32 +++++++++++++++++++++++++++++ 5 files changed, 58 insertions(+), 6 deletions(-) diff --git a/src/server/journal/tx_executor.cc b/src/server/journal/tx_executor.cc index 58f92e1ed..521873975 100644 --- a/src/server/journal/tx_executor.cc +++ b/src/server/journal/tx_executor.cc @@ -119,7 +119,7 @@ std::optional<TransactionData> TransactionReader::NextTxData(JournalReader* read // Otherwise, continue building multi command. DCHECK(res->opcode == journal::Op::MULTI_COMMAND || res->opcode == journal::Op::EXEC); - DCHECK(res->txid > 0); + DCHECK(res->txid > 0 || res->shard_cnt == 1); auto txid = res->txid; auto& txdata = current_[txid]; diff --git a/src/server/main_service.cc b/src/server/main_service.cc index bd1f3b3c1..268e81dbe 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1850,6 +1850,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret cntx->transaction = stub_tx.get(); result = interpreter->RunFunction(eval_args.sha, &error); + cntx->transaction->FIX_ConcludeJournalExec(); // flush journal cntx->transaction = tx; return OpStatus::OK; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index eff17ce1e..2ff3c4b5b 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -188,7 +188,9 @@ Transaction::Transaction(const Transaction* parent, ShardId shard_id, std::optio // Use squashing mechanism for inline execution of single-shard EVAL 
multi_->mode = LOCK_AHEAD; } + multi_->role = SQUASHED_STUB; + multi_->shard_journal_write.resize(1); time_now_ms_ = parent->time_now_ms_; @@ -966,6 +968,16 @@ const absl::flat_hash_set<std::string>& Transaction::GetMultiKeys() const { return multi_->frozen_keys_set; } +void Transaction::FIX_ConcludeJournalExec() { + if (!multi_->shard_journal_write.front()) + return; + + if (auto journal = EngineShard::tlocal()->journal(); journal != nullptr) { + journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, 1, + unique_slot_checker_.GetUniqueSlotId(), {}, false); + } +} + void Transaction::EnableShard(ShardId sid) { unique_shard_cnt_ = 1; unique_shard_id_ = sid; @@ -1464,8 +1476,13 @@ void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload& bool allow_await) const { auto journal = shard->journal(); CHECK(journal); - if (multi_ && multi_->role != SQUASHED_STUB) - multi_->shard_journal_write[shard->shard_id()] = true; + + if (multi_) { + if (multi_->role != SQUASHED_STUB) + multi_->shard_journal_write[shard->shard_id()] = true; + else + multi_->shard_journal_write[0] = true; + } bool is_multi = multi_commands || IsAtomicMulti(); @@ -1486,9 +1503,8 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt void Transaction::CancelBlocking(std::function status_cb) { // We're on the owning thread of this transaction, so we can safely access it's data below. - // We still need to claim the blocking barrier, but as this function is often called blindly, we - // want to check first if it makes sense to even proceed. - if (blocking_barrier_.IsClaimed()) + // First, check if it makes sense to proceed. 
+ if (blocking_barrier_.IsClaimed() || cid_ == nullptr || (cid_->opt_mask() & CO::BLOCKING) == 0) return; OpStatus status = OpStatus::CANCELLED; diff --git a/src/server/transaction.h b/src/server/transaction.h index 15710d1c7..0fd0fd3e4 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -343,6 +343,9 @@ class Transaction { // Get keys multi transaction was initialized with, normalized and unique const absl::flat_hash_set<std::string>& GetMultiKeys() const; + // Send journal EXEC opcode after a series of MULTI commands on the currently active shard + void FIX_ConcludeJournalExec(); + private: // Holds number of locks for each IntentLock::Mode: shared and exlusive. struct LockCnt { diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 5e9740291..c97168d5b 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -769,6 +769,38 @@ async def test_expiry(df_local_factory: DflyInstanceFactory, n_keys=1000): assert all(v is None for v in res) +@dfly_args({"proactor_threads": 4}) +async def test_simple_scripts(df_local_factory: DflyInstanceFactory): + master = df_local_factory.create() + replicas = [df_local_factory.create() for _ in range(2)] + df_local_factory.start_all([master] + replicas) + + c_replicas = [replica.client() for replica in replicas] + c_master = master.client() + + # Connect replicas and wait for sync to finish + for c_replica in c_replicas: + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + await check_all_replicas_finished([c_replica], c_master) + + # Generate some scripts and run them + keys = ["a", "b", "c", "d", "e"] + for i in range(len(keys) + 1): + script = "" + subkeys = keys[:i] + for key in subkeys: + script += f"redis.call('INCR', '{key}')" + script += f"redis.call('INCR', '{key}')" + + await c_master.eval(script, len(subkeys), *subkeys) + + # Wait for replicas + await check_all_replicas_finished([c_replica], c_master) + + for c_replica 
in c_replicas: + assert (await c_replica.mget(keys)) == ["10", "8", "6", "4", "2"] + + """ Test script replication.