chore(transaction): Introduce RunCallback (#2760)

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2024-03-22 16:02:02 +03:00 committed by GitHub
parent 9db825013a
commit 3aa4a29834
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 64 additions and 45 deletions

View file

@ -7,6 +7,7 @@
#include <absl/strings/match.h>
#include "base/logging.h"
#include "glog/logging.h"
#include "server/blocking_controller.h"
#include "server/command_registry.h"
#include "server/db_slice.h"
@ -573,7 +574,6 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
bool was_suspended = sd.local_mask & SUSPENDED_Q;
bool awaked_prerun = sd.local_mask & AWAKED_Q;
bool is_concluding = coordinator_state_ & COORD_CONCLUDING;
IntentLock::Mode mode = LockMode();
@ -581,53 +581,14 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
DCHECK(!txq_ooo || (sd.local_mask & OUT_OF_ORDER));
/*************************************************************************/
// Actually running the callback.
// If you change the logic here, also please change the logic
RunnableResult result;
try {
// if a transaction is suspended, we still run it because of brpoplpush/blmove case
// that needs to run lpush on its suspended shard.
result = (*cb_ptr_)(this, shard);
if (unique_shard_cnt_ == 1) {
cb_ptr_ = nullptr; // We can do it because only a single thread runs the callback.
local_result_ = result;
} else {
if (result == OpStatus::OUT_OF_MEMORY) {
absl::base_internal::SpinLockHolder lk{&local_result_mu_};
CHECK(local_result_ == OpStatus::OK || local_result_ == OpStatus::OUT_OF_MEMORY);
local_result_ = result;
} else {
CHECK_EQ(OpStatus::OK, result);
}
}
} catch (std::bad_alloc&) {
LOG_FIRST_N(ERROR, 16) << " out of memory"; // TODO: to log at most once per sec.
absl::base_internal::SpinLockHolder lk{&local_result_mu_};
local_result_ = OpStatus::OUT_OF_MEMORY;
} catch (std::exception& e) {
LOG(FATAL) << "Unexpected exception " << e.what();
}
RunCallback(shard);
/*************************************************************************/
// at least the coordinator thread owns the reference.
DCHECK_GE(GetUseCount(), 1u);
shard->db_slice().OnCbFinish();
// Handle result flags to alter behaviour.
if (result.flags & RunnableResult::AVOID_CONCLUDING) {
// Multi shard callbacks should either all or none choose to conclude. Because they can't
// communicate, the must know their decision ahead, consequently there is no point in using this
// flag.
CHECK_EQ(unique_shard_cnt_, 1u);
DCHECK(is_concluding || multi_->concluding);
is_concluding = false;
}
// Log to jounrnal only once the command finished running
if (is_concluding || (multi_ && multi_->concluding))
LogAutoJournalOnShard(shard, result);
bool is_concluding = coordinator_state_ & COORD_CONCLUDING;
// If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation
// and successive hops are run by continuation_trans_ in engine shard.
@ -692,6 +653,54 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
return !is_concluding;
}
void Transaction::RunCallback(EngineShard* shard) {
DCHECK_EQ(EngineShard::tlocal(), shard);
// Actually running the callback.
// If you change the logic here, also please change the logic
RunnableResult result;
try {
// if a transaction is suspended, we still run it because of brpoplpush/blmove case
// that needs to run lpush on its suspended shard.
result = (*cb_ptr_)(this, shard);
if (unique_shard_cnt_ == 1) {
cb_ptr_ = nullptr; // We can do it because only a single thread runs the callback.
local_result_ = result;
} else {
if (result == OpStatus::OUT_OF_MEMORY) {
absl::base_internal::SpinLockHolder lk{&local_result_mu_};
CHECK(local_result_ == OpStatus::OK || local_result_ == OpStatus::OUT_OF_MEMORY);
local_result_ = result;
} else {
CHECK_EQ(OpStatus::OK, result);
}
}
} catch (std::bad_alloc&) {
LOG_FIRST_N(ERROR, 16) << " out of memory"; // TODO: to log at most once per sec.
absl::base_internal::SpinLockHolder lk{&local_result_mu_};
local_result_ = OpStatus::OUT_OF_MEMORY;
} catch (std::exception& e) {
LOG(FATAL) << "Unexpected exception " << e.what();
}
shard->db_slice().OnCbFinish();
// Handle result flags to alter behaviour.
if (result.flags & RunnableResult::AVOID_CONCLUDING) {
// Multi shard callbacks should either all or none choose to conclude. Because they can't
// communicate, the must know their decision ahead, consequently there is no point in using this
// flag.
CHECK_EQ(unique_shard_cnt_, 1u);
DCHECK((coordinator_state_ & COORD_CONCLUDING) || multi_->concluding);
coordinator_state_ &= ~COORD_CONCLUDING; // safe because single shard
}
// Log to jounrnal only once the command finished running
if ((coordinator_state_ & COORD_CONCLUDING) || (multi_ && multi_->concluding))
LogAutoJournalOnShard(shard, result);
}
// TODO: For multi-transactions we should be able to deduce mode() at run-time based
// on the context. For regular multi-transactions we can actually inspect all commands.
// For eval-like transactions - we can decide based on the command flavor (EVAL/EVALRO) or
@ -941,9 +950,7 @@ void Transaction::ExecuteAsync() {
poll_flags.set(i, true);
});
auto* ss = ServerState::tlocal();
if (unique_shard_cnt_ == 1 && ss->thread_index() == unique_shard_id_ &&
ss->AllowInlineScheduling()) {
if (CanRunInlined()) {
DVLOG(1) << "Short-circuit ExecuteAsync " << DebugId();
EngineShard::tlocal()->PollExecution("exec_cb", this);
return;
@ -1597,6 +1604,12 @@ void Transaction::CancelBlocking(std::function<OpStatus(ArgSlice)> status_cb) {
blocking_barrier_.Close();
}
bool Transaction::CanRunInlined() const {
auto* ss = ServerState::tlocal();
return unique_shard_cnt_ == 1 && unique_shard_id_ == ss->thread_index() &&
ss->AllowInlineScheduling();
}
OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
KeyIndex key_index;