From eda941dca6fa32dfc313bdeb551a94fe699c1906 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Mon, 31 Jul 2023 15:37:29 +0300 Subject: [PATCH] fix: add Transaction::Conclude (#1606) Signed-off-by: Vladislav Oleshko --- src/server/bitops_family.cc | 6 +----- src/server/container_utils.cc | 6 ++---- src/server/generic_family.cc | 8 ++------ src/server/hll_family.cc | 2 +- src/server/list_family.cc | 9 +++------ src/server/set_family.cc | 12 ++++-------- src/server/stream_family.cc | 6 ++---- src/server/transaction.cc | 5 +++++ src/server/transaction.h | 3 +++ 9 files changed, 23 insertions(+), 34 deletions(-) diff --git a/src/server/bitops_family.cc b/src/server/bitops_family.cc index 4531e07d3..2bce75649 100644 --- a/src/server/bitops_family.cc +++ b/src/server/bitops_family.cc @@ -498,10 +498,6 @@ template void HandleOpValueResult(const OpResult& result, Connec } } -OpStatus NoOpCb(Transaction* t, EngineShard* shard) { - return OpStatus::OK; -} - // ------------------------------------------------------------------------- // // Impl for the command functions void BitPos(CmdArgList args, ConnectionContext* cntx) { @@ -627,7 +623,7 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) { const auto joined_results = CombineResultOp(result_set, op); // Second phase - save to targe key if successful if (!joined_results) { - cntx->transaction->Execute(NoOpCb, true); + cntx->transaction->Conclude(); (*cntx)->SendError(joined_results.status()); return; } else { diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc index 4e39567fc..a1d99c17f 100644 --- a/src/server/container_utils.cc +++ b/src/server/container_utils.cc @@ -253,8 +253,7 @@ OpResult RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty } else if (result.status() == OpStatus::KEY_NOTFOUND) { // Close transaction and return. if (is_multi) { - auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; - trans->Execute(std::move(cb), true); + trans->Conclude(); return OpStatus::TIMED_OUT; } @@ -270,8 +269,7 @@ OpResult RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty } else { // Could be the wrong-type error. // cleanups, locks removal etc. - auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; - trans->Execute(std::move(cb), true); + trans->Conclude(); DCHECK_NE(result.status(), OpStatus::KEY_NOTFOUND); return result.status(); diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 11011a275..0898158b0 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -333,19 +333,15 @@ void Renamer::Find(Transaction* t) { }; void Renamer::Finalize(Transaction* t, bool skip_exist_dest) { - auto cleanup = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; - if (!src_res_.found) { status_ = OpStatus::KEY_NOTFOUND; - - t->Execute(move(cleanup), true); + t->Conclude(); return; } if (dest_res_.found && skip_exist_dest) { status_ = OpStatus::KEY_EXISTS; - - t->Execute(move(cleanup), true); + t->Conclude(); return; } diff --git a/src/server/hll_family.cc b/src/server/hll_family.cc index be5402ecf..eec83e6eb 100644 --- a/src/server/hll_family.cc +++ b/src/server/hll_family.cc @@ -242,7 +242,7 @@ OpResult PFMergeInternal(CmdArgList args, ConnectionContext* cntx) { trans->Execute(std::move(cb), false); if (!success) { - trans->Execute([](Transaction*, EngineShard*) { return OpStatus::OK; }, true); + trans->Conclude(); return OpStatus::INVALID_VALUE; } diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 594e35338..3f2e2b4dc 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -441,10 +441,8 @@ OpResult MoveTwoShards(Transaction* trans, string_view src, string_view if (!find_res[0] || find_res[1].status() == OpStatus::WRONG_TYPE) { result = find_res[0] ? find_res[1] : find_res[0]; - if (conclude_on_error) { - auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; - trans->Execute(move(cb), true); - } + if (conclude_on_error) + trans->Conclude(); } else { // Everything is ok, lets proceed with the mutations. auto cb = [&](Transaction* t, EngineShard* shard) { @@ -880,8 +878,7 @@ OpResult BPopPusher::RunSingle(Transaction* t, time_point tp) { if (op_res.status() == OpStatus::KEY_NOTFOUND) { op_res = OpStatus::TIMED_OUT; } - auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; - t->Execute(std::move(cb), true); + t->Conclude(); return op_res; } diff --git a/src/server/set_family.cc b/src/server/set_family.cc index 93f99b324..20c0a7e2c 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -517,10 +517,6 @@ SvArray ToSvArray(const absl::flat_hash_set& set) { return result; } -OpStatus NoOpCb(Transaction* t, EngineShard* shard) { - return OpStatus::OK; -}; - // if overwrite is true then OpAdd writes vals into the key and discards its previous value. OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice vals, bool overwrite, bool journal_update) { @@ -775,7 +771,7 @@ OpResult Mover::Commit(Transaction* t) { } if (noop) { - t->Execute(&NoOpCb, true); + t->Conclude(); } else { t->Execute([this](Transaction* t, EngineShard* es) { return this->OpMutate(t, es); }, true); } @@ -1284,7 +1280,7 @@ void SDiffStore(CmdArgList args, ConnectionContext* cntx) { cntx->transaction->Execute(std::move(diff_cb), false); ResultSetView rsv = DiffResultVec(result_set, src_shard); if (!rsv) { - cntx->transaction->Execute(NoOpCb, true); + cntx->transaction->Conclude(); (*cntx)->SendError(rsv.status()); return; } @@ -1364,7 +1360,7 @@ void SInterStore(CmdArgList args, ConnectionContext* cntx) { OpResult result = InterResultVec(result_set, inter_shard_cnt.load(memory_order_relaxed)); if (!result) { - cntx->transaction->Execute(NoOpCb, true); + cntx->transaction->Conclude(); (*cntx)->SendError(result.status()); return; } @@ -1426,7 +1422,7 @@ void SUnionStore(CmdArgList args, ConnectionContext* cntx) { ResultSetView unionset = UnionResultVec(result_set); if (!unionset) { - cntx->transaction->Execute(NoOpCb, true); + cntx->transaction->Conclude(); (*cntx)->SendError(unionset.status()); return; } diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 5990b46f1..f40fad56d 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -1666,8 +1666,7 @@ void XReadBlock(ReadOpts opts, ConnectionContext* cntx) { // entries. if (opts.timeout == -1 || cntx->transaction->IsMulti()) { // Close the transaction and release locks. - auto close_cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; - cntx->transaction->Execute(std::move(close_cb), true); + cntx->transaction->Conclude(); return (*cntx)->SendNullArray(); } @@ -1735,8 +1734,7 @@ void XReadImpl(CmdArgList args, std::optional opts, ConnectionContext* auto last_ids = StreamLastIDs(cntx->transaction); if (!last_ids) { // Close the transaction. - auto close_cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; - cntx->transaction->Execute(std::move(close_cb), true); + cntx->transaction->Conclude(); if (last_ids.status() == OpStatus::WRONG_TYPE) { (*cntx)->SendError(kWrongTypeErr); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index ec74648d0..f823beedb 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -897,6 +897,11 @@ void Transaction::ExecuteAsync() { IterateActiveShards([&cb](PerShardData& sd, auto i) { shard_set->Add(i, cb); }); } +void Transaction::Conclude() { + auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; + Execute(std::move(cb), true); +} + void Transaction::RunQuickie(EngineShard* shard) { DCHECK(!IsAtomicMulti()); DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC); diff --git a/src/server/transaction.h b/src/server/transaction.h index 945054c21..b6c9e8795 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -167,6 +167,9 @@ class Transaction { // Can be used only for single key invocations, because it writes a into shared variable. template auto ScheduleSingleHopT(F&& f) -> decltype(f(this, nullptr)); + // Conclude transaction + void Conclude(); + // Called by engine shard to execute a transaction hop. // txq_ooo is set to true if the transaction is running out of order // not as the tx queue head.