Mirror of https://github.com/dragonflydb/dragonfly.git, synced 2025-05-12 10:55:46 +02:00
chore: minor simplification around WaitWatch (#4580)
1. Rename FinalizeWatched to RemovedWatched.
2. Simplify how watched keys are passed: instead of a callback, callers pass either a single key or a signal that all the shard keys should be used. Less generic, but more explicit.
3. Invert the result of the RunInShard() function to improve readability.

All in all, there should be no functional changes besides logging.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
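In short, the interface change, assembled from the hunks below (surrounding declarations elided):

// Before: callers supplied a callback producing the watch keys per shard.
using WaitKeysProvider =
    std::function<std::variant<ShardArgs, ArgSlice>(Transaction*, EngineShard* shard)>;
facade::OpStatus WaitOnWatch(const time_point& tp, WaitKeysProvider cb, KeyReadyChecker krc,
                             bool* block_flag, bool* pause_flag);

// After: callers pass either a single key or the kShardArgs sentinel meaning
// "watch all the shard keys of the transaction".
static constexpr std::nullopt_t kShardArgs{std::nullopt};
using WaitKeys = std::optional<std::string_view>;
facade::OpStatus WaitOnWatch(const time_point& tp, WaitKeys keys, KeyReadyChecker krc,
                             bool* block_flag, bool* pause_flag);

// Typical call sites after the change:
tx->WaitOnWatch(tp, pop_key_, key_checker, &cntx->blocked, &cntx->paused);  // one key
tx->WaitOnWatch(tp, Transaction::kShardArgs, key_checker, &cntx->blocked,
                &cntx->paused);  // all shard keys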
Parent: 1e15e55cee
Commit: 13313507bf
9 changed files with 69 additions and 67 deletions
@@ -119,7 +119,7 @@ bool BlockingController::DbWatchTable::AddAwakeEvent(string_view key) {
 }
 
 // Removes tx from its watch queues if tx appears there.
-void BlockingController::FinalizeWatched(Keys keys, Transaction* tx) {
+void BlockingController::RemovedWatched(Keys keys, Transaction* tx) {
   DCHECK(tx);
 
   VLOG(1) << "FinalizeBlocking [" << owner_->shard_id() << "]" << tx->DebugId();

@@ -136,7 +136,7 @@ void BlockingController::FinalizeWatched(Keys keys, Transaction* tx) {
 
   // Add keys of processed transaction so we could awake the next one in the queue
   // in case those keys still exist.
-  for (string_view key : base::it::Wrap(facade::kToSV, keys)) {
+  for (string_view key : keys) {
     bool removed_awakened = wt.UnwatchTx(key, tx);
     CHECK(!removed_awakened || removed)
         << tx->DebugId() << " " << key << " " << tx->DEBUG_GetLocalMask(owner_->shard_id());

@@ -207,7 +207,7 @@ void BlockingController::AddWatched(Keys watch_keys, KeyReadyChecker krc, Transa
 
   DbWatchTable& wt = *dbit->second;
 
-  for (auto key : base::it::Wrap(facade::kToSV, watch_keys)) {
+  for (auto key : watch_keys) {
     auto [res, inserted] = wt.queue_map.emplace(key, nullptr);
     if (inserted) {
       res->second.reset(new WatchQueue);

@@ -22,7 +22,7 @@ class BlockingController {
   explicit BlockingController(EngineShard* owner, Namespace* ns);
   ~BlockingController();
 
-  using Keys = std::variant<ShardArgs, ArgSlice>;
+  using Keys = ShardArgs;
 
   bool HasAwakedTransaction() const {
     return !awakened_transactions_.empty();

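With Keys collapsed to one concrete type, call sites iterate the keys directly instead of going through the kToSV adapter, as the two loops above show. A minimal sketch of the resulting shape, with std::vector<std::string> standing in for ShardArgs (whose definition is not part of this diff):

#include <string>
#include <string_view>
#include <vector>

using Keys = std::vector<std::string>;  // stand-in for ShardArgs

void UnwatchAll(const Keys& keys) {
  for (std::string_view key : keys) {  // previously: base::it::Wrap(facade::kToSV, keys)
    (void)key;  // the real code calls wt.UnwatchTx(key, tx) here
  }
}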
@@ -32,7 +32,8 @@ class BlockingController {
     return awakened_transactions_;
   }
 
-  void FinalizeWatched(Keys keys, Transaction* tx);
+  // Removes transaction from watching these keys.
+  void RemovedWatched(Keys keys, Transaction* tx);
 
   // go over potential wakened keys, verify them and activate watch queues.
   void NotifyPending();

@@ -87,7 +87,7 @@ TEST_F(BlockingControllerTest, Basic) {
         keys, [](auto...) { return true; }, t);
     EXPECT_EQ(1, bc.NumWatched(0));
 
-    bc.FinalizeWatched(keys, t);
+    bc.RemovedWatched(keys, t);
     EXPECT_EQ(0, bc.NumWatched(0));
     return OpStatus::OK;
   });

@@ -98,10 +98,8 @@ TEST_F(BlockingControllerTest, Timeout) {
   bool blocked;
   bool paused;
 
-  auto cb = [&](Transaction* t, EngineShard* shard) { return trans_->GetShardArgs(0); };
-
   facade::OpStatus status = trans_->WaitOnWatch(
-      tp, cb, [](auto...) { return true; }, &blocked, &paused);
+      tp, Transaction::kShardArgs, [](auto...) { return true; }, &blocked, &paused);
 
   EXPECT_EQ(status, facade::OpStatus::TIMED_OUT);
   unsigned num_watched = shard_set->Await(

@@ -384,14 +384,14 @@ OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty
     limit_tp = steady_clock::now() + milliseconds(limit_ms);
   }
 
-  auto wcb = [](Transaction* t, EngineShard* shard) { return t->GetShardArgs(shard->shard_id()); };
   auto* ns = &trans->GetNamespace();
   const auto key_checker = [req_obj_type, ns](EngineShard* owner, const DbContext& context,
                                               Transaction*, std::string_view key) -> bool {
     return ns->GetDbSlice(owner->shard_id()).FindReadOnly(context, key, req_obj_type).ok();
   };
 
-  auto status = trans->WaitOnWatch(limit_tp, std::move(wcb), key_checker, block_flag, pause_flag);
+  auto status =
+      trans->WaitOnWatch(limit_tp, Transaction::kShardArgs, key_checker, block_flag, pause_flag);
 
   if (status != OpStatus::OK)
     return status;

@@ -602,7 +602,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
             << trans->DEBUG_GetLocalMask(sid);
 
     // Commands like BRPOPLPUSH don't conclude immediately
-    if (trans->RunInShard(this, false)) {
+    if (!trans->RunInShard(this, false)) {
       // execution is blocked while HasAwakedTransaction() returns true, so no need to set
       // continuation_trans_. Moreover, setting it for wakened multi-hop transactions may lead to
      // inconcistency, see BLMoveSimultaneously test.

@@ -614,15 +614,11 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
     continuation_trans_ = nullptr;
   }
 
-  string dbg_id;
   bool update_stats = false;
 
-  auto run = [this, &dbg_id, &update_stats](Transaction* tx, bool is_ooo) -> bool /* keep */ {
-    dbg_id = VLOG_IS_ON(1) ? tx->DebugId() : "";
-    bool keep = tx->RunInShard(this, is_ooo);
-    DLOG_IF(INFO, !dbg_id.empty()) << dbg_id << ", keep " << keep << ", ooo " << is_ooo;
+  auto run = [this, &update_stats](Transaction* tx, bool is_ooo) -> bool /* concluding */ {
     update_stats = true;
-    return keep;
+    return tx->RunInShard(this, is_ooo);
   };
 
   // Check the currently running transaction, we have to handle it first until it concludes

@@ -632,9 +628,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
       trans = nullptr;
 
     if ((is_self && disarmed) || continuation_trans_->DisarmInShard(sid)) {
-      if (bool keep = run(continuation_trans_, false); !keep) {
-        // if this holds, we can remove this check altogether.
-        DCHECK(continuation_trans_ == nullptr);
+      if (bool concludes = run(continuation_trans_, false); concludes) {
        continuation_trans_ = nullptr;
      }
    }

@@ -672,24 +666,27 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
     DCHECK_LT(committed_txid_, txid);  // strictly increasing when processed via txq
     committed_txid_ = txid;
 
-    if (bool keep = run(head, false); keep)
+    DCHECK(!continuation_trans_);  // while() check above ensures this.
+    if (bool concludes = run(head, false); !concludes) {
       continuation_trans_ = head;
+    }
   }
 
   // If we disarmed, but didn't find ourselves in the loop, run now.
   if (trans && disarmed) {
     DCHECK(trans_mask & (Transaction::OUT_OF_ORDER | Transaction::SUSPENDED_Q));
+    CHECK(trans != continuation_trans_);
 
     bool is_ooo = trans_mask & Transaction::OUT_OF_ORDER;
-    bool keep = run(trans, is_ooo);
-    if (is_ooo && !keep) {
+    bool concludes = run(trans, is_ooo);
+    if (is_ooo && concludes) {
       stats_.tx_ooo_total++;
     }
 
     // If the transaction concluded, it must remove itself from the tx queue.
     // Otherwise it is required to stay there to keep the relative order.
     if (is_ooo && !trans->IsMulti())
-      DCHECK_EQ(keep, trans->DEBUG_GetTxqPosInShard(sid) != TxQueue::kEnd);
+      LOG_IF(DFATAL, concludes != (trans->DEBUG_GetTxqPosInShard(sid) == TxQueue::kEnd));
   }
   if (update_stats) {
     CacheStats();

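The third change from the commit message flips the meaning of RunInShard()'s return value, which reads more naturally at call sites such as the ones above:

// Before: bool keep = tx->RunInShard(this, is_ooo);       // true => tx stays in the queue
// After:  bool concludes = tx->RunInShard(this, is_ooo);  // true => tx finished
if (bool concludes = run(head, false); !concludes) {
  continuation_trans_ = head;  // not concluded yet, so remember it as the continuation
}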
@@ -507,10 +507,13 @@ OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view
       // blocking_controller does not have to be set with non-blocking transactions.
       auto blocking_controller = t->GetNamespace().GetBlockingController(shard->shard_id());
       if (blocking_controller) {
+        IndexSlice slice(0, 1);
+        ShardArgs sa{absl::MakeSpan(&src, 1), absl::MakeSpan(&slice, 1)};
+
         // hack, again. since we hacked which queue we are waiting on (see RunPair)
         // we must clean-up src key here manually. See RunPair why we do this.
         // in short- we suspended on "src" on both shards.
-        blocking_controller->FinalizeWatched(ArgSlice({src}), t);
+        blocking_controller->RemovedWatched(sa, t);
       }
     } else {
       DVLOG(1) << "Popping value from list: " << key;

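The hunk above introduces the wrapping idiom this commit relies on: since RemovedWatched() now accepts only ShardArgs, a single key is presented as a one-element ShardArgs built from one argument span plus one IndexSlice covering it. The same pattern reappears in ExpireBlocking() and WaitOnWatch() below:

IndexSlice slice(0, 1);  // the single argument occupies index range [0, 1)
ShardArgs sa{absl::MakeSpan(&src, 1), absl::MakeSpan(&slice, 1)};  // one-key view
blocking_controller->RemovedWatched(sa, t);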
@@ -1027,14 +1030,13 @@ OpResult<string> BPopPusher::RunSingle(time_point tp, Transaction* tx, Connectio
     return op_res;
   }
 
-  auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice(&pop_key_, 1); };
-
   const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
                               std::string_view key) -> bool {
     return context.GetDbSlice(owner->shard_id()).FindReadOnly(context, key, OBJ_LIST).ok();
   };
 
   // Block
-  auto status = tx->WaitOnWatch(tp, std::move(wcb), key_checker, &(cntx->blocked), &(cntx->paused));
+  auto status = tx->WaitOnWatch(tp, pop_key_, key_checker, &(cntx->blocked), &(cntx->paused));
   if (status != OpStatus::OK)
     return status;

@@ -1054,18 +1056,16 @@ OpResult<string> BPopPusher::RunPair(time_point tp, Transaction* tx, ConnectionC
     return op_res;
   }
 
-  // a hack: we watch in both shards for pop_key but only in the source shard it's relevant.
-  // Therefore we follow the regular flow of watching the key but for the destination shard it
-  // will never be triggerred.
-  // This allows us to run Transaction::Execute on watched transactions in both shards.
-  auto wcb = [&](Transaction* t, EngineShard* shard) { return ArgSlice(&this->pop_key_, 1); };
-
   const auto key_checker = [](EngineShard* owner, const DbContext& context, Transaction*,
                               std::string_view key) -> bool {
     return context.GetDbSlice(owner->shard_id()).FindReadOnly(context, key, OBJ_LIST).ok();
   };
 
-  if (auto status = tx->WaitOnWatch(tp, std::move(wcb), key_checker, &cntx->blocked, &cntx->paused);
+  // a hack: we watch in both shards for pop_key but only in the source shard it's relevant.
+  // Therefore we follow the regular flow of watching the key but for the destination shard it
+  // will never be triggerred.
+  // This allows us to run Transaction::Execute on watched transactions in both shards.
+  if (auto status = tx->WaitOnWatch(tp, pop_key_, key_checker, &cntx->blocked, &cntx->paused);
       status != OpStatus::OK)
     return status;
 

@@ -2339,8 +2339,6 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder,
     return rb->SendNullArray();
   }
 
-  auto wcb = [](Transaction* t, EngineShard* shard) { return t->GetShardArgs(shard->shard_id()); };
-
   auto tp = (opts->timeout) ? chrono::steady_clock::now() + chrono::milliseconds(opts->timeout)
                             : Transaction::time_point::max();
 

@@ -2372,7 +2370,8 @@ void XReadBlock(ReadOpts* opts, Transaction* tx, SinkReplyBuilder* builder,
     return streamCompareID(&last_id, &sitem.group->last_id) > 0;
   };
 
-  if (auto status = tx->WaitOnWatch(tp, std::move(wcb), key_checker, &cntx->blocked, &cntx->paused);
+  if (auto status =
+          tx->WaitOnWatch(tp, Transaction::kShardArgs, key_checker, &cntx->blocked, &cntx->paused);
       status != OpStatus::OK)
     return rb->SendNullArray();
 

@@ -576,7 +576,7 @@ void Transaction::PrepareMultiForScheduleSingleHop(Namespace* ns, ShardId sid, D
     StoreKeysInArgs(*key_index);
 }
 
-// Runs in the dbslice thread. Returns true if the transaction continues running in the thread.
+// Runs in the dbslice thread. Returns true if the transaction concluded.
 bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
   DCHECK_GT(txid_, 0u);
   CHECK(cb_ptr_) << DebugId();

@@ -591,7 +591,6 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
 
   bool was_suspended = sd.local_mask & SUSPENDED_Q;
   bool awaked_prerun = sd.local_mask & AWAKED_Q;
-
   IntentLock::Mode mode = LockMode();
 
   DCHECK(IsGlobal() || (sd.local_mask & KEYLOCK_ACQUIRED) || (multi_ && multi_->mode == GLOBAL));

@@ -650,7 +649,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
 
     if (auto* bcontroller = namespace_->GetBlockingController(shard->shard_id()); bcontroller) {
       if (awaked_prerun || was_suspended) {
-        bcontroller->FinalizeWatched(GetShardArgs(idx), this);
+        bcontroller->RemovedWatched(GetShardArgs(idx), this);
       }
 
       // Wake only if no tx queue head is currently running

@@ -662,7 +661,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
   }
 
   FinishHop();  // From this point on we can not access 'this'.
-  return !is_concluding;
+  return is_concluding;
 }
 
 void Transaction::RunCallback(EngineShard* shard) {

@@ -995,14 +994,20 @@ void Transaction::EnableAllShards() {
 
 // runs in coordinator thread.
 // Marks the transaction as expired and removes it from the waiting queue.
-void Transaction::ExpireBlocking(WaitKeysProvider wcb) {
+void Transaction::ExpireBlocking(WaitKeys wkeys) {
   DCHECK(!IsGlobal());
   DVLOG(1) << "ExpireBlocking " << DebugId();
   run_barrier_.Start(unique_shard_cnt_);
 
-  auto expire_cb = [this, &wcb] {
+  auto expire_cb = [this, &wkeys] {
     EngineShard* es = EngineShard::tlocal();
-    ExpireShardCb(wcb(this, es), es);
+    if (wkeys) {
+      IndexSlice is(0, 1);
+      ShardArgs sa(absl::MakeSpan(&wkeys.value(), 1), absl::MakeSpan(&is, 1));
+      ExpireShardCb(sa, es);
+    } else {
+      ExpireShardCb(GetShardArgs(es->shard_id()), es);
+    }
   };
   IterateActiveShards([&expire_cb](PerShardData& sd, auto i) { shard_set->Add(i, expire_cb); });
 

@@ -1250,8 +1255,8 @@ ShardArgs Transaction::GetShardArgs(ShardId sid) const {
           absl::MakeSpan(args_slices_.data() + sd.slice_start, sd.slice_count)};
 }
 
-OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider,
-                                  KeyReadyChecker krc, bool* block_flag, bool* pause_flag) {
+OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeys wkeys, KeyReadyChecker krc,
+                                  bool* block_flag, bool* pause_flag) {
   if (blocking_barrier_.IsClaimed()) {  // Might have been cancelled ahead by a dropping connection
     Conclude();
     return OpStatus::CANCELLED;

@@ -1261,8 +1266,14 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p
 
   // Register keys on active shards blocking controllers and mark shard state as suspended.
   auto cb = [&](Transaction* t, EngineShard* shard) {
-    auto keys = wkeys_provider(t, shard);
-    return t->WatchInShard(&t->GetNamespace(), keys, shard, krc);
+    if (wkeys) {  // single string_view.
+      IndexSlice is(0, 1);
+      ShardArgs sa(absl::MakeSpan(&wkeys.value(), 1), absl::MakeSpan(&is, 1));
+      t->WatchInShard(&t->GetNamespace(), sa, shard, krc);
+    } else {
+      t->WatchInShard(&t->GetNamespace(), t->GetShardArgs(shard->shard_id()), shard, krc);
+    }
+    return OpStatus::OK;
   };
   Execute(std::move(cb), true);
 

@@ -1296,13 +1307,13 @@ OpStatus Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_p
 
   // If we don't follow up with an "action" hop, we must clean up manually on all shards.
   if (result != OpStatus::OK)
-    ExpireBlocking(wkeys_provider);
+    ExpireBlocking(std::move(wkeys));
 
   return result;
 }
 
-OpStatus Transaction::WatchInShard(Namespace* ns, BlockingController::Keys keys, EngineShard* shard,
-                                   KeyReadyChecker krc) {
+void Transaction::WatchInShard(Namespace* ns, ShardArgs keys, EngineShard* shard,
+                               KeyReadyChecker krc) {
   auto& sd = shard_data_[SidToId(shard->shard_id())];
 
   CHECK_EQ(0, sd.local_mask & SUSPENDED_Q);

@@ -1311,11 +1322,9 @@ OpStatus Transaction::WatchInShard(Namespace* ns, BlockingController::Keys keys,
 
   ns->GetOrAddBlockingController(shard)->AddWatched(keys, std::move(krc), this);
   DVLOG(2) << "WatchInShard " << DebugId();
-
-  return OpStatus::OK;
 }
 
-void Transaction::ExpireShardCb(BlockingController::Keys keys, EngineShard* shard) {
+void Transaction::ExpireShardCb(ShardArgs keys, EngineShard* shard) {
   // Blocking transactions don't release keys when suspending, release them now.
   auto lock_args = GetLockArgs(shard->shard_id());
   GetDbSlice(shard->shard_id()).Release(LockMode(), lock_args);

@@ -1323,7 +1332,7 @@ void Transaction::ExpireShardCb(BlockingController::Keys keys, EngineShard* shar
   auto& sd = shard_data_[SidToId(shard->shard_id())];
   sd.local_mask &= ~KEYLOCK_ACQUIRED;
 
-  namespace_->GetBlockingController(shard->shard_id())->FinalizeWatched(keys, this);
+  namespace_->GetBlockingController(shard->shard_id())->RemovedWatched(keys, this);
   DCHECK(!namespace_->GetBlockingController(shard->shard_id())
               ->awakened_transactions()
               .contains(this));

@@ -132,9 +132,9 @@ class Transaction {
   // Callacks should return `OpStatus` which is implicitly converitble to `RunnableResult`!
   using RunnableType = absl::FunctionRef<RunnableResult(Transaction* t, EngineShard*)>;
 
-  // Provides keys to block on for specific shard.
-  using WaitKeysProvider =
-      std::function<std::variant<ShardArgs, ArgSlice>(Transaction*, EngineShard* shard)>;
+  static constexpr std::nullopt_t kShardArgs{std::nullopt};
+  // Provides an override to watch a specific key or kShardArgs to watch all keys in the shard.
+  using WaitKeys = std::optional<std::string_view>;
 
   // Modes in which a multi transaction can run.
   enum MultiMode {

@@ -207,15 +207,14 @@ class Transaction {
 
   // Called by engine shard to execute a transaction hop.
   // txq_ooo is set to true if the transaction is running out of order
-  // not as the tx queue head.
-  // Returns true if the transaction continues running in the thread
+  // not as the tx queue head. Returns true if the transaction concludes.
   bool RunInShard(EngineShard* shard, bool txq_ooo);
 
   // Registers transaction into watched queue and blocks until a) either notification is received.
   // or b) tp is reached. If tp is time_point::max() then waits indefinitely.
   // Expects that the transaction had been scheduled before, and uses Execute(.., true) to register.
   // Returns false if timeout occurred, true if was notified by one of the keys.
-  facade::OpStatus WaitOnWatch(const time_point& tp, WaitKeysProvider cb, KeyReadyChecker krc,
+  facade::OpStatus WaitOnWatch(const time_point& tp, WaitKeys keys, KeyReadyChecker krc,
                                bool* block_flag, bool* pause_flag);
 
   // Returns true if transaction is awaked, false if it's timed-out and can be removed from the

@@ -529,13 +528,12 @@ class Transaction {
   void RunCallback(EngineShard* shard);
 
   // Adds itself to watched queue in the shard. Must run in that shard thread.
-  OpStatus WatchInShard(Namespace* ns, std::variant<ShardArgs, ArgSlice> keys, EngineShard* shard,
-                        KeyReadyChecker krc);
+  void WatchInShard(Namespace* ns, ShardArgs keys, EngineShard* shard, KeyReadyChecker krc);
 
   // Expire blocking transaction, unlock keys and unregister it from the blocking controller
-  void ExpireBlocking(WaitKeysProvider wcb);
+  void ExpireBlocking(WaitKeys keys);
 
-  void ExpireShardCb(std::variant<ShardArgs, ArgSlice> keys, EngineShard* shard);
+  void ExpireShardCb(ShardArgs keys, EngineShard* shard);
 
   // Returns true if we need to follow up with PollExecution on this shard.
   bool CancelShardCb(EngineShard* shard);