fix: add Transaction::Conclude (#1606)

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2023-07-31 15:37:29 +03:00 committed by GitHub
parent 366f50230b
commit eda941dca6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
9 changed files with 23 additions and 34 deletions

View file

@ -498,10 +498,6 @@ template <typename T> void HandleOpValueResult(const OpResult<T>& result, Connec
}
}
OpStatus NoOpCb(Transaction* t, EngineShard* shard) {
return OpStatus::OK;
}
// ------------------------------------------------------------------------- //
// Impl for the command functions
void BitPos(CmdArgList args, ConnectionContext* cntx) {
@ -627,7 +623,7 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) {
const auto joined_results = CombineResultOp(result_set, op);
// Second phase - save to targe key if successful
if (!joined_results) {
cntx->transaction->Execute(NoOpCb, true);
cntx->transaction->Conclude();
(*cntx)->SendError(joined_results.status());
return;
} else {

View file

@ -253,8 +253,7 @@ OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty
} else if (result.status() == OpStatus::KEY_NOTFOUND) {
// Close transaction and return.
if (is_multi) {
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
trans->Execute(std::move(cb), true);
trans->Conclude();
return OpStatus::TIMED_OUT;
}
@ -270,8 +269,7 @@ OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty
} else {
// Could be the wrong-type error.
// cleanups, locks removal etc.
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
trans->Execute(std::move(cb), true);
trans->Conclude();
DCHECK_NE(result.status(), OpStatus::KEY_NOTFOUND);
return result.status();

View file

@ -333,19 +333,15 @@ void Renamer::Find(Transaction* t) {
};
void Renamer::Finalize(Transaction* t, bool skip_exist_dest) {
auto cleanup = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
if (!src_res_.found) {
status_ = OpStatus::KEY_NOTFOUND;
t->Execute(move(cleanup), true);
t->Conclude();
return;
}
if (dest_res_.found && skip_exist_dest) {
status_ = OpStatus::KEY_EXISTS;
t->Execute(move(cleanup), true);
t->Conclude();
return;
}

View file

@ -242,7 +242,7 @@ OpResult<int> PFMergeInternal(CmdArgList args, ConnectionContext* cntx) {
trans->Execute(std::move(cb), false);
if (!success) {
trans->Execute([](Transaction*, EngineShard*) { return OpStatus::OK; }, true);
trans->Conclude();
return OpStatus::INVALID_VALUE;
}

View file

@ -441,10 +441,8 @@ OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view
if (!find_res[0] || find_res[1].status() == OpStatus::WRONG_TYPE) {
result = find_res[0] ? find_res[1] : find_res[0];
if (conclude_on_error) {
auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
trans->Execute(move(cb), true);
}
if (conclude_on_error)
trans->Conclude();
} else {
// Everything is ok, lets proceed with the mutations.
auto cb = [&](Transaction* t, EngineShard* shard) {
@ -880,8 +878,7 @@ OpResult<string> BPopPusher::RunSingle(Transaction* t, time_point tp) {
if (op_res.status() == OpStatus::KEY_NOTFOUND) {
op_res = OpStatus::TIMED_OUT;
}
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
t->Execute(std::move(cb), true);
t->Conclude();
return op_res;
}

View file

@ -517,10 +517,6 @@ SvArray ToSvArray(const absl::flat_hash_set<std::string_view>& set) {
return result;
}
OpStatus NoOpCb(Transaction* t, EngineShard* shard) {
return OpStatus::OK;
};
// if overwrite is true then OpAdd writes vals into the key and discards its previous value.
OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice vals, bool overwrite,
bool journal_update) {
@ -775,7 +771,7 @@ OpResult<unsigned> Mover::Commit(Transaction* t) {
}
if (noop) {
t->Execute(&NoOpCb, true);
t->Conclude();
} else {
t->Execute([this](Transaction* t, EngineShard* es) { return this->OpMutate(t, es); }, true);
}
@ -1284,7 +1280,7 @@ void SDiffStore(CmdArgList args, ConnectionContext* cntx) {
cntx->transaction->Execute(std::move(diff_cb), false);
ResultSetView rsv = DiffResultVec(result_set, src_shard);
if (!rsv) {
cntx->transaction->Execute(NoOpCb, true);
cntx->transaction->Conclude();
(*cntx)->SendError(rsv.status());
return;
}
@ -1364,7 +1360,7 @@ void SInterStore(CmdArgList args, ConnectionContext* cntx) {
OpResult<SvArray> result = InterResultVec(result_set, inter_shard_cnt.load(memory_order_relaxed));
if (!result) {
cntx->transaction->Execute(NoOpCb, true);
cntx->transaction->Conclude();
(*cntx)->SendError(result.status());
return;
}
@ -1426,7 +1422,7 @@ void SUnionStore(CmdArgList args, ConnectionContext* cntx) {
ResultSetView unionset = UnionResultVec(result_set);
if (!unionset) {
cntx->transaction->Execute(NoOpCb, true);
cntx->transaction->Conclude();
(*cntx)->SendError(unionset.status());
return;
}

View file

@ -1666,8 +1666,7 @@ void XReadBlock(ReadOpts opts, ConnectionContext* cntx) {
// entries.
if (opts.timeout == -1 || cntx->transaction->IsMulti()) {
// Close the transaction and release locks.
auto close_cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
cntx->transaction->Execute(std::move(close_cb), true);
cntx->transaction->Conclude();
return (*cntx)->SendNullArray();
}
@ -1735,8 +1734,7 @@ void XReadImpl(CmdArgList args, std::optional<ReadOpts> opts, ConnectionContext*
auto last_ids = StreamLastIDs(cntx->transaction);
if (!last_ids) {
// Close the transaction.
auto close_cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
cntx->transaction->Execute(std::move(close_cb), true);
cntx->transaction->Conclude();
if (last_ids.status() == OpStatus::WRONG_TYPE) {
(*cntx)->SendError(kWrongTypeErr);

View file

@ -897,6 +897,11 @@ void Transaction::ExecuteAsync() {
IterateActiveShards([&cb](PerShardData& sd, auto i) { shard_set->Add(i, cb); });
}
void Transaction::Conclude() {
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
Execute(std::move(cb), true);
}
void Transaction::RunQuickie(EngineShard* shard) {
DCHECK(!IsAtomicMulti());
DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC);

View file

@ -167,6 +167,9 @@ class Transaction {
// Can be used only for single key invocations, because it writes a into shared variable.
template <typename F> auto ScheduleSingleHopT(F&& f) -> decltype(f(this, nullptr));
// Conclude transaction
void Conclude();
// Called by engine shard to execute a transaction hop.
// txq_ooo is set to true if the transaction is running out of order
// not as the tx queue head.