From 6d4d740d6e2a060cbbbecd987ee438cc6e60de79 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Sun, 18 Jun 2023 21:14:28 +0300 Subject: [PATCH] fix: Don't remove non-concluding tx from queue on ooo runs (#1427) * fix: Don't remove non-concluding tx from queue on ooo runs --------- Signed-off-by: Vladislav Oleshko --- src/server/engine_shard_set.cc | 15 +++++++++++---- src/server/transaction.cc | 11 +++++++---- src/server/transaction.h | 19 ++++++++++++------- tests/dragonfly/generic_test.py | 32 +++++++++++++++++++++++++++++++- 4 files changed, 61 insertions(+), 16 deletions(-) diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 7ff8b4ef9..ed98853be 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -298,7 +298,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { DCHECK(continuation_trans_ == nullptr) << continuation_trans_->DebugId() << " when polling " << trans->DebugId(); - bool keep = trans->RunInShard(this); + bool keep = trans->RunInShard(this, false); if (keep) { return; } @@ -309,7 +309,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { trans = nullptr; if (continuation_trans_->IsArmedInShard(sid)) { - bool to_keep = continuation_trans_->RunInShard(this); + bool to_keep = continuation_trans_->RunInShard(this, false); DVLOG(1) << "RunContTrans: " << continuation_trans_->DebugId() << " keep: " << to_keep; if (!to_keep) { continuation_trans_ = nullptr; @@ -361,7 +361,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { dbg_id = head->DebugId(); } - bool keep = head->RunInShard(this); + bool keep = head->RunInShard(this, false); // We should not access head from this point since RunInShard callback decrements refcount. 
DLOG_IF(INFO, !dbg_id.empty()) << "RunHead " << dbg_id << ", keep " << keep; @@ -390,7 +390,14 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { } ++stats_.ooo_runs; - bool keep = trans->RunInShard(this); + bool txq_ooo = trans_mask & Transaction::OUT_OF_ORDER; + bool keep = trans->RunInShard(this, txq_ooo); + + // If the transaction concluded, it must remove itself from the tx queue. + // Otherwise it is required to stay there to keep the relative order. + if (txq_ooo && !trans->IsMulti()) + DCHECK_EQ(keep, trans->GetLocalTxqPos(sid) != TxQueue::kEnd); + DLOG_IF(INFO, !dbg_id.empty()) << "Eager run " << sid << ", " << dbg_id << ", keep " << keep; } } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 54396be79..29a54ea78 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -423,7 +423,7 @@ string Transaction::DebugId() const { } // Runs in the dbslice thread. Returns true if transaction needs to be kept in the queue. -bool Transaction::RunInShard(EngineShard* shard) { +bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { DCHECK_GT(run_count_.load(memory_order_relaxed), 0u); CHECK(cb_ptr_) << DebugId(); DCHECK_GT(txid_, 0u); @@ -491,9 +491,11 @@ bool Transaction::RunInShard(EngineShard* shard) { // at least the coordinator thread owns the reference. DCHECK_GE(GetUseCount(), 1u); - // we remove tx from tx-queue upon first invocation. - // if it needs to run again it runs via a dedicated continuation_trans_ state in EngineShard. - if (sd.pq_pos != TxQueue::kEnd) { + // If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation + // and successive hops are run by continuation_trans_ in engine shard. + // Otherwise we can remove ourselves only when we're concluding (so no more hops will follow). 
+ bool remove_txq = is_concluding || !txq_ooo; + if (remove_txq && sd.pq_pos != TxQueue::kEnd) { shard->txq()->Remove(sd.pq_pos); sd.pq_pos = TxQueue::kEnd; } @@ -1152,6 +1154,7 @@ OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard) { bc->AddWatched(keys, this); sd.local_mask |= SUSPENDED_Q; + sd.local_mask &= ~OUT_OF_ORDER; DVLOG(2) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask << ", first_key:" << keys.front(); diff --git a/src/server/transaction.h b/src/server/transaction.h index 4e3bc7d57..3d171d95c 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -166,9 +166,11 @@ class Transaction { // Can be used only for single key invocations, because it writes a into shared variable. template auto ScheduleSingleHopT(F&& f) -> decltype(f(this, nullptr)); - // Called by EngineShard when performing Execute over the tx queue. + // Called by engine shard to execute a transaction hop. + // txq_ooo is set to true if the transaction is running out of order + // not as the tx queue head. // Returns true if transaction should be kept in the queue. - bool RunInShard(EngineShard* shard); + bool RunInShard(EngineShard* shard, bool txq_ooo); // Registers transaction into watched queue and blocks until a) either notification is received. // or b) tp is reached. If tp is time_point::max() then waits indefinitely. @@ -243,6 +245,10 @@ class Transaction { return shard_data_[SidToId(sid)].local_mask; } + uint32_t GetLocalTxqPos(ShardId sid) const { + return shard_data_[SidToId(sid)].pq_pos; + } + TxId txid() const { return txid_; } @@ -365,11 +371,10 @@ class Transaction { COORD_SCHED = 1, COORD_EXEC = 2, - // We are running the last execution step in multi-hop operation. 
- COORD_EXEC_CONCLUDING = 4, - COORD_BLOCKED = 8, - COORD_CANCELLED = 0x10, - COORD_OOO = 0x20, + COORD_EXEC_CONCLUDING = 1 << 2, // Whether it's the last hop of a transaction + COORD_BLOCKED = 1 << 3, + COORD_CANCELLED = 1 << 4, + COORD_OOO = 1 << 5, }; struct PerShardCache { diff --git a/tests/dragonfly/generic_test.py b/tests/dragonfly/generic_test.py index 9b5c89a9d..c25d0d8c5 100644 --- a/tests/dragonfly/generic_test.py +++ b/tests/dragonfly/generic_test.py @@ -1,9 +1,10 @@ import os import pytest import redis +import asyncio from redis import asyncio as aioredis -from . import dfly_multi_test_args +from . import dfly_multi_test_args, dfly_args from .utility import batch_fill_data, gen_test_data @@ -48,3 +49,32 @@ async def test_password(df_local_factory, export_dfly_password): client = aioredis.Redis(password=requirepass) await client.ping() dfly.stop() + + +""" +Make sure that multi-hop transactions can't run OOO. +""" + +MULTI_HOPS = """ +for i = 0, ARGV[1] do + redis.call('INCR', KEYS[1]) +end +""" + +@dfly_args({"proactor_threads": 1}) +async def test_txq_ooo(async_client: aioredis.Redis, df_server): + async def task1(k, h): + c = aioredis.Redis(port=df_server.port) + for _ in range(100): + await c.eval(MULTI_HOPS, 1, k, h) + + async def task2(k, n): + c = aioredis.Redis(port=df_server.port) + for _ in range(100): + pipe = c.pipeline(transaction=False) + pipe.lpush(k, 1) + for _ in range(n): + pipe.blpop(k, 0.001) + await pipe.execute() + + await asyncio.gather(task1('i1', 2), task1('i2', 3), task2('l1', 2), task2('l1', 2), task2('l1', 5))