fix: Don't remove non-concluding tx from queue on ooo runs (#1427)

* fix: Don't remove non-concluding tx from queue on ooo runs

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2023-06-18 21:14:28 +03:00 committed by GitHub
parent 7ec5bd912c
commit 6d4d740d6e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 61 additions and 16 deletions

View file

@ -298,7 +298,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
DCHECK(continuation_trans_ == nullptr)
<< continuation_trans_->DebugId() << " when polling " << trans->DebugId();
bool keep = trans->RunInShard(this);
bool keep = trans->RunInShard(this, false);
if (keep) {
return;
}
@ -309,7 +309,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
trans = nullptr;
if (continuation_trans_->IsArmedInShard(sid)) {
bool to_keep = continuation_trans_->RunInShard(this);
bool to_keep = continuation_trans_->RunInShard(this, false);
DVLOG(1) << "RunContTrans: " << continuation_trans_->DebugId() << " keep: " << to_keep;
if (!to_keep) {
continuation_trans_ = nullptr;
@ -361,7 +361,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
dbg_id = head->DebugId();
}
bool keep = head->RunInShard(this);
bool keep = head->RunInShard(this, false);
// We should not access head from this point since RunInShard callback decrements refcount.
DLOG_IF(INFO, !dbg_id.empty()) << "RunHead " << dbg_id << ", keep " << keep;
@ -390,7 +390,14 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
}
++stats_.ooo_runs;
bool keep = trans->RunInShard(this);
bool txq_ooo = trans_mask & Transaction::OUT_OF_ORDER;
bool keep = trans->RunInShard(this, txq_ooo);
// If the transaction concluded, it must remove itself from the tx queue.
// Otherwise it is required to stay there to keep the relative order.
if (txq_ooo && !trans->IsMulti())
DCHECK_EQ(keep, trans->GetLocalTxqPos(sid) != TxQueue::kEnd);
DLOG_IF(INFO, !dbg_id.empty()) << "Eager run " << sid << ", " << dbg_id << ", keep " << keep;
}
}

View file

@ -423,7 +423,7 @@ string Transaction::DebugId() const {
}
// Runs in the dbslice thread. Returns true if transaction needs to be kept in the queue.
bool Transaction::RunInShard(EngineShard* shard) {
bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
DCHECK_GT(run_count_.load(memory_order_relaxed), 0u);
CHECK(cb_ptr_) << DebugId();
DCHECK_GT(txid_, 0u);
@ -491,9 +491,11 @@ bool Transaction::RunInShard(EngineShard* shard) {
// at least the coordinator thread owns the reference.
DCHECK_GE(GetUseCount(), 1u);
// we remove tx from tx-queue upon first invocation.
// if it needs to run again it runs via a dedicated continuation_trans_ state in EngineShard.
if (sd.pq_pos != TxQueue::kEnd) {
// If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation
// and successive hops are run by continuation_trans_ in engine shard.
// Otherwise we can remove ourselves only when we're concluding (so no more hops will follow).
bool remove_txq = is_concluding || !txq_ooo;
if (remove_txq && sd.pq_pos != TxQueue::kEnd) {
shard->txq()->Remove(sd.pq_pos);
sd.pq_pos = TxQueue::kEnd;
}
@ -1152,6 +1154,7 @@ OpStatus Transaction::WatchInShard(ArgSlice keys, EngineShard* shard) {
bc->AddWatched(keys, this);
sd.local_mask |= SUSPENDED_Q;
sd.local_mask &= ~OUT_OF_ORDER;
DVLOG(2) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask
<< ", first_key:" << keys.front();

View file

@ -166,9 +166,11 @@ class Transaction {
// Can be used only for single key invocations, because it writes a into shared variable.
template <typename F> auto ScheduleSingleHopT(F&& f) -> decltype(f(this, nullptr));
// Called by EngineShard when performing Execute over the tx queue.
// Called by engine shard to execute a transaction hop.
// txq_ooo is set to true if the transaction is running out of order
// not as the tx queue head.
// Returns true if transaction should be kept in the queue.
bool RunInShard(EngineShard* shard);
bool RunInShard(EngineShard* shard, bool txq_ooo);
// Registers transaction into watched queue and blocks until a) either notification is received.
// or b) tp is reached. If tp is time_point::max() then waits indefinitely.
@ -243,6 +245,10 @@ class Transaction {
return shard_data_[SidToId(sid)].local_mask;
}
uint32_t GetLocalTxqPos(ShardId sid) const {
return shard_data_[SidToId(sid)].pq_pos;
}
TxId txid() const {
return txid_;
}
@ -365,11 +371,10 @@ class Transaction {
COORD_SCHED = 1,
COORD_EXEC = 2,
// We are running the last execution step in multi-hop operation.
COORD_EXEC_CONCLUDING = 4,
COORD_BLOCKED = 8,
COORD_CANCELLED = 0x10,
COORD_OOO = 0x20,
COORD_EXEC_CONCLUDING = 1 << 2, // Whether its the last hop of a transaction
COORD_BLOCKED = 1 << 3,
COORD_CANCELLED = 1 << 4,
COORD_OOO = 1 << 5,
};
struct PerShardCache {

View file

@ -1,9 +1,10 @@
import os
import pytest
import redis
import asyncio
from redis import asyncio as aioredis
from . import dfly_multi_test_args
from . import dfly_multi_test_args, dfly_args
from .utility import batch_fill_data, gen_test_data
@ -48,3 +49,32 @@ async def test_password(df_local_factory, export_dfly_password):
client = aioredis.Redis(password=requirepass)
await client.ping()
dfly.stop()
"""
Make sure that multi-hop transactions can't run OOO.
"""
MULTI_HOPS = """
for i = 0, ARGV[1] do
redis.call('INCR', KEYS[1])
end
"""
@dfly_args({"proactor_threads": 1})
async def test_txq_ooo(async_client: aioredis.Redis, df_server):
async def task1(k, h):
c = aioredis.Redis(port=df_server.port)
for _ in range(100):
await c.eval(MULTI_HOPS, 1, k, h)
async def task2(k, n):
c = aioredis.Redis(port=df_server.port)
for _ in range(100):
pipe = c.pipeline(transaction=False)
pipe.lpush(k, 1)
for _ in range(n):
pipe.blpop(k, 0.001)
await pipe.execute()
await asyncio.gather(task1('i1', 2), task1('i2', 3), task2('l1', 2), task2('l1', 2), task2('l1', 5))