Clean-ups in transaction code.

Roman Gershman 2022-04-29 15:50:16 +03:00
parent 2966d04743
commit afd52d5571
5 changed files with 68 additions and 73 deletions

View file

@@ -211,16 +211,6 @@ void BlockingController::AwakeWatched(DbIndex db_index, string_view db_key) {
}
}
void BlockingController::RegisterAwaitForConverge(Transaction* t) {
TxId notify_id = t->notify_txid();
DVLOG(1) << "RegisterForConverge " << t->DebugId() << " at notify " << notify_id;
// t->notify_txid might improve in parallel. it does not matter since convergence
// will happen even with stale notify_id.
waiting_convergence_.emplace(notify_id, t);
}
// Internal function called from ProcessAwakened().
// Marks the queue as active and notifies the first transaction in the queue.
void BlockingController::NotifyWatchQueue(WatchQueue* wq) {
@@ -245,6 +235,8 @@ void BlockingController::NotifyWatchQueue(WatchQueue* wq) {
} while (!queue.empty());
}
#if 0
void BlockingController::OnTxFinish() {
VLOG(1) << "OnTxFinish [" << owner_->shard_id() << "]";
@@ -276,9 +268,17 @@ void BlockingController::OnTxFinish() {
} while (!waiting_convergence_.empty());
}
void BlockingController::NotifyConvergence(Transaction* tx) {
LOG(FATAL) << "TBD";
void BlockingController::RegisterAwaitForConverge(Transaction* t) {
TxId notify_id = t->notify_txid();
DVLOG(1) << "RegisterForConverge " << t->DebugId() << " at notify " << notify_id;
// t->notify_txid might improve in parallel. it does not matter since convergence
// will happen even with stale notify_id.
waiting_convergence_.emplace(notify_id, t);
}
#endif
size_t BlockingController::NumWatched(DbIndex db_indx) const {
auto it = watched_dbs_.find(db_indx);

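The convergence hooks moved under #if 0 above keep their waiters in a btree_multimap ordered by the TxId each transaction registered with (waiting_convergence_), so a converge step can release every waiter whose notify id has already been reached. Below is a minimal standalone sketch of that ordered-drain pattern; FakeTx, DrainConverged, and the notify_id <= committed release condition are illustrative assumptions, not the library's code.

#include <cstdint>
#include <map>
#include <string>
#include <vector>

using TxId = uint64_t;

// Toy stand-in for Transaction: only what the sketch needs.
struct FakeTx {
  std::string name;
};

// Mirrors absl::btree_multimap<TxId, Transaction*> waiting_convergence_:
// waiters are keyed by the notify TxId they registered with.
std::multimap<TxId, FakeTx*> waiting_convergence;

void RegisterAwaitForConverge(TxId notify_id, FakeTx* t) {
  // A stale (too small) notify_id is harmless: the waiter is simply
  // released no later than it would have been with a fresher id.
  waiting_convergence.emplace(notify_id, t);
}

// Hypothetical drain step: once every shard has advanced past `committed`,
// all waiters registered with notify_id <= committed can be released.
std::vector<FakeTx*> DrainConverged(TxId committed) {
  std::vector<FakeTx*> ready;
  auto end = waiting_convergence.upper_bound(committed);
  for (auto it = waiting_convergence.begin(); it != end; ++it)
    ready.push_back(it->second);
  waiting_convergence.erase(waiting_convergence.begin(), end);
  return ready;
}
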
View file

@@ -44,9 +44,9 @@ class BlockingController {
// Called from operations that create keys like lpush, rename etc.
void AwakeWatched(DbIndex db_index, std::string_view db_key);
void OnTxFinish();
// void OnTxFinish();
void RegisterAwaitForConverge(Transaction* t);
// void RegisterAwaitForConverge(Transaction* t);
size_t NumWatched(DbIndex db_indx) const;
@@ -60,7 +60,7 @@ class BlockingController {
/// or null if all transactions in the queue have expired..
void NotifyWatchQueue(WatchQueue* wq);
void NotifyConvergence(Transaction* tx);
// void NotifyConvergence(Transaction* tx);
EngineShard* owner_;
@@ -75,6 +75,6 @@ class BlockingController {
// could awaken arbitrary number of keys.
absl::flat_hash_set<Transaction*> awakened_transactions_;
absl::btree_multimap<TxId, Transaction*> waiting_convergence_;
// absl::btree_multimap<TxId, Transaction*> waiting_convergence_;
};
} // namespace dfly

View file

@@ -143,8 +143,6 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
DVLOG(1) << "RunContTrans: " << continuation_trans_->DebugId() << " keep: " << to_keep;
if (!to_keep) {
continuation_trans_ = nullptr;
if (blocking_controller_)
blocking_controller_->OnTxFinish();
}
}
}
@@ -196,9 +194,6 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
continuation_trans_ = head;
break;
}
if (blocking_controller_)
blocking_controller_->OnTxFinish();
} // while(!txq_.Empty())
} else { // if (continuation_trans_ == nullptr && !has_awaked_trans)
DVLOG(1) << "Skipped TxQueue " << continuation_trans_ << " " << has_awaked_trans;
@@ -242,8 +237,6 @@ void EngineShard::ShutdownMulti(Transaction* multi) {
if (continuation_trans_ == multi) {
continuation_trans_ = nullptr;
}
if (blocking_controller_)
blocking_controller_->OnTxFinish();
}
#if 0

View file

@@ -543,56 +543,11 @@ void Transaction::UnlockMulti() {
sharded_keys[sid].push_back(k_v);
}
auto cb = [&] {
EngineShard* shard = EngineShard::tlocal();
if (multi_->multi_opts & CO::GLOBAL_TRANS) {
shard->shard_lock()->Release(IntentLock::EXCLUSIVE);
}
ShardId sid = shard->shard_id();
for (const auto& k_v : sharded_keys[sid]) {
auto release = [&](IntentLock::Mode mode) {
if (k_v.second.cnt[mode]) {
shard->db_slice().Release(mode, this->db_index_, k_v.first, k_v.second.cnt[mode]);
}
};
release(IntentLock::SHARED);
release(IntentLock::EXCLUSIVE);
}
auto& sd = shard_data_[SidToId(shard->shard_id())];
// It does not have to be that all shards in multi transaction execute this tx.
// Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from
// there.
if (sd.pq_pos != TxQueue::kEnd) {
DVLOG(1) << "unlockmulti: TxPopFront " << DebugId();
TxQueue* txq = shard->txq();
DCHECK(!txq->Empty());
Transaction* trans = absl::get<Transaction*>(txq->Front());
DCHECK(trans == this);
txq->PopFront();
sd.pq_pos = TxQueue::kEnd;
}
shard->ShutdownMulti(this);
// notify awakened transactions.
if (shard->blocking_controller())
shard->blocking_controller()->RunStep(nullptr);
shard->PollExecution("unlockmulti", nullptr);
this->DecreaseRunCnt();
};
uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed);
DCHECK_EQ(prev, 0u);
for (ShardId i = 0; i < shard_data_.size(); ++i) {
ess_->Add(i, cb);
ess_->Add(i, [&] { UnlockMultiShardCb(sharded_keys, EngineShard::tlocal()); });
}
WaitForShardCallbacks();
DCHECK_GE(use_count(), 1u);
@@ -1044,6 +999,49 @@ void Transaction::ExpireShardCb(EngineShard* shard) {
CHECK_GE(DecreaseRunCnt(), 1u);
}
void Transaction::UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, EngineShard* shard) {
if (multi_->multi_opts & CO::GLOBAL_TRANS) {
shard->shard_lock()->Release(IntentLock::EXCLUSIVE);
}
ShardId sid = shard->shard_id();
for (const auto& k_v : sharded_keys[sid]) {
auto release = [&](IntentLock::Mode mode) {
if (k_v.second.cnt[mode]) {
shard->db_slice().Release(mode, db_index_, k_v.first, k_v.second.cnt[mode]);
}
};
release(IntentLock::SHARED);
release(IntentLock::EXCLUSIVE);
}
auto& sd = shard_data_[SidToId(shard->shard_id())];
// It does not have to be that all shards in multi transaction execute this tx.
// Hence it could stay in the tx queue. We perform the necessary cleanup and remove it from
// there.
if (sd.pq_pos != TxQueue::kEnd) {
DVLOG(1) << "unlockmulti: TxPopFront " << DebugId();
TxQueue* txq = shard->txq();
DCHECK(!txq->Empty());
Transaction* trans = absl::get<Transaction*>(txq->Front());
DCHECK(trans == this);
txq->PopFront();
sd.pq_pos = TxQueue::kEnd;
}
shard->ShutdownMulti(this);
// notify awakened transactions.
if (shard->blocking_controller())
shard->blocking_controller()->RunStep(nullptr);
shard->PollExecution("unlockmulti", nullptr);
this->DecreaseRunCnt();
}
#if 0
// HasResultConverged has detailed documentation on how convergence is determined.
void Transaction::CheckForConvergence(EngineShard* shard) {

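The UnlockMulti change above keeps the original dispatch shape and only moves the per-shard work into UnlockMultiShardCb: run_count_ is bumped by the number of shards, one callback is queued per shard via ess_->Add, and WaitForShardCallbacks blocks until every callback has called DecreaseRunCnt. A minimal standalone sketch of that fan-out/await pattern follows, with std::thread and a condition variable standing in for the engine's proactor pool and event-count; every name in it is an illustrative stand-in.

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <mutex>
#include <thread>
#include <vector>

// Stand-ins for run_count_ / run_ec_: an atomic counter plus a condition
// variable that is signalled when the counter drops to zero.
std::atomic<uint32_t> run_count{0};
std::mutex mu;
std::condition_variable run_cv;

void DecreaseRunCnt() {
  if (run_count.fetch_sub(1, std::memory_order_acq_rel) == 1) {
    std::lock_guard<std::mutex> lk(mu);
    run_cv.notify_all();
  }
}

void WaitForShardCallbacks() {
  std::unique_lock<std::mutex> lk(mu);
  run_cv.wait(lk, [] { return run_count.load(std::memory_order_relaxed) == 0; });
}

// Stand-in for UnlockMultiShardCb: do the per-shard cleanup, then signal.
void UnlockShard(unsigned sid) {
  std::printf("releasing locks on shard %u\n", sid);
  DecreaseRunCnt();
}

int main() {
  const unsigned kShards = 4;
  run_count.fetch_add(kShards, std::memory_order_relaxed);  // like run_count_.fetch_add(shard_data_.size())
  std::vector<std::thread> pool;
  for (unsigned sid = 0; sid < kShards; ++sid)  // like ess_->Add(i, ...)
    pool.emplace_back(UnlockShard, sid);
  WaitForShardCallbacks();  // like run_ec_.await(...)
  for (auto& t : pool) t.join();
}
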
View file

@@ -187,6 +187,13 @@ class Transaction {
KeyLockArgs GetLockArgs(ShardId sid) const;
private:
struct LockCnt {
unsigned cnt[2] = {0, 0};
};
using KeyList = std::vector<std::pair<std::string_view, LockCnt>>;
unsigned SidToId(ShardId sid) const {
return sid < shard_data_.size() ? sid : 0;
}
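
LockCnt and KeyList move into the private section next to SidToId because UnlockMultiShardCb (declared below) now receives the per-shard key lists as an argument instead of capturing them in a lambda. SidToId itself just maps a shard id onto an index into shard_data_; here is a quick sketch of that mapping, assuming the fallback-to-zero branch corresponds to a transaction that keeps only a single PerShardData slot.

#include <cassert>
#include <vector>

using ShardId = unsigned;
struct PerShardData {};

// Mirrors Transaction::SidToId: a dedicated slot per shard when shard_data
// has one entry per shard, otherwise (presumably the single-shard case)
// every shard id collapses onto slot 0.
unsigned SidToId(ShardId sid, const std::vector<PerShardData>& shard_data) {
  return sid < shard_data.size() ? sid : 0;
}

int main() {
  std::vector<PerShardData> multi(8), single(1);
  assert(SidToId(3, multi) == 3);   // multi-shard: one slot per shard
  assert(SidToId(3, single) == 0);  // single slot: everything maps to 0
}
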
@@ -215,7 +222,7 @@ class Transaction {
OpStatus AddToWatchedShardCb(EngineShard* shard);
bool RemoveFromWatchedShardCb(EngineShard* shard);
void ExpireShardCb(EngineShard* shard);
void CheckForConvergence(EngineShard* shard);
void UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, EngineShard* shard);
void WaitForShardCallbacks() {
run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); });
@@ -251,9 +258,6 @@ class Transaction {
};
enum { kPerShardSize = sizeof(PerShardData) };
struct LockCnt {
unsigned cnt[2] = {0, 0};
};
struct Multi {
absl::flat_hash_map<std::string_view, LockCnt> locks;
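
Multi::locks accumulates one LockCnt per key for the whole multi transaction; UnlockMulti turns that map into one KeyList per shard (the sharded_keys vector handed to UnlockMultiShardCb in the diff above). Below is a rough sketch of that bucketing step, where ShardOf is a hypothetical stand-in for the key-to-shard routing that the hunk does not show.

#include <cstdint>
#include <functional>
#include <string_view>
#include <unordered_map>
#include <utility>
#include <vector>

using ShardId = uint16_t;

struct LockCnt {
  unsigned cnt[2] = {0, 0};  // one counter per IntentLock::Mode
};

using KeyList = std::vector<std::pair<std::string_view, LockCnt>>;

// Hypothetical key-to-shard routing; the real shard mapping is not part of
// this hunk.
ShardId ShardOf(std::string_view key, ShardId shard_count) {
  return static_cast<ShardId>(std::hash<std::string_view>{}(key) % shard_count);
}

// Bucket the multi-transaction lock counts into one KeyList per shard, so
// each shard callback only touches the keys it owns.
std::vector<KeyList> BucketLocksByShard(
    const std::unordered_map<std::string_view, LockCnt>& locks, ShardId shard_count) {
  std::vector<KeyList> sharded_keys(shard_count);
  for (const auto& k_v : locks)
    sharded_keys[ShardOf(k_v.first, shard_count)].push_back(k_v);
  return sharded_keys;
}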