chore: Pull helio with new BlockingCounter (#2711)
Pull helio with new BlockingCounter and fix all usages to use the -> operator.

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Parent: 9ba532a826
Commit: a3dc9382bb
8 changed files with 23 additions and 24 deletions
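
The mechanical change (bc.Dec() becoming bc->Dec(), bc.Wait() becoming bc->Wait(), and so on) follows from what the new helio BlockingCounter appears to be: a cheap, copyable handle whose operations are reached through operator->, with every copy sharing one underlying count. The stand-in below is only a sketch with hypothetical internals (helio's real counter is fiber-aware, not mutex-based), but it reproduces the API surface this diff exercises: Add, Dec, Wait, Cancel.

    #include <condition_variable>
    #include <memory>
    #include <mutex>

    // Stand-in for util::fb2::BlockingCounter (illustrative internals only).
    // Copies are cheap and all alias one shared count via shared_ptr.
    class BlockingCounter {
      struct State {
        std::mutex mu;
        std::condition_variable cv;
        long count = 0;
        bool cancelled = false;

        void Add(long n) {
          std::lock_guard lk(mu);
          count += n;
        }
        void Dec() {
          std::lock_guard lk(mu);
          if (--count <= 0) cv.notify_all();
        }
        void Cancel() {  // unblocks waiters without draining the count
          std::lock_guard lk(mu);
          cancelled = true;
          cv.notify_all();
        }
        void Wait() {
          std::unique_lock lk(mu);
          cv.wait(lk, [this] { return count <= 0 || cancelled; });
        }
      };

      std::shared_ptr<State> state_ = std::make_shared<State>();

     public:
      explicit BlockingCounter(long initial) { state_->count = initial; }

      // operator-> is the reason every call site in this diff gains an arrow.
      State* operator->() { return state_.get(); }
    };

Because copies alias the same state, the lambdas in the hunks below can capture bc by value and the caller's Wait still observes every Dec.
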
helio
@@ -1 +1 @@
-Subproject commit aae233db752422608ac11b93b9edbe948b25cbf1
+Subproject commit 4829d23bca125186e0d34faebf294a7579acb398

@@ -476,7 +476,7 @@ void Connection::DispatchOperations::operator()(const MigrationRequestMessage& m
 void Connection::DispatchOperations::operator()(CheckpointMessage msg) {
   VLOG(2) << "Decremented checkpoint at " << self->DebugInfo();
 
-  msg.bc.Dec();
+  msg.bc->Dec();
 }
 
 void Connection::DispatchOperations::operator()(const InvalidationMessage& msg) {
@@ -1458,7 +1458,7 @@ void Connection::SendCheckpoint(fb2::BlockingCounter bc, bool ignore_paused, boo
   VLOG(2) << "Sent checkpoint to " << DebugInfo();
 
-  bc.Add(1);
+  bc->Add(1);
   SendAsync({CheckpointMessage{bc}});
 }
 
@@ -423,7 +423,6 @@ DispatchTracker::DispatchTracker(absl::Span<facade::Listener* const> listeners,
       issuer_{issuer},
       ignore_paused_{ignore_paused},
       ignore_blocked_{ignore_blocked} {
-  bc_ = make_unique<util::fb2::BlockingCounter>(0);
 }
 
 void DispatchTracker::TrackOnThread() {
@@ -436,7 +435,7 @@ bool DispatchTracker::Wait(absl::Duration duration) {
   if (!res && ignore_blocked_) {
     // We track all connections again because a connection might became blocked between the time
     // we call tracking the last time.
-    bc_.reset(new util::fb2::BlockingCounter(0));
+    bc_ = BlockingCounter{0};
     TrackAll();
     res = bc_->WaitFor(absl::ToChronoMilliseconds(duration));
   }
@@ -450,7 +449,7 @@ void DispatchTracker::TrackAll() {
 
 void DispatchTracker::Handle(unsigned thread_index, util::Connection* conn) {
   if (auto* fconn = static_cast<facade::Connection*>(conn); fconn != issuer_)
-    fconn->SendCheckpoint(*bc_, ignore_paused_, ignore_blocked_);
+    fconn->SendCheckpoint(bc_, ignore_paused_, ignore_blocked_);
 }
 
 }  // namespace facade
@@ -106,7 +106,7 @@ class DispatchTracker {
 
   std::vector<facade::Listener*> listeners_;
   facade::Connection* issuer_;
-  std::unique_ptr<util::fb2::BlockingCounter> bc_;
+  util::fb2::BlockingCounter bc_{0};  // tracks number of pending checkpoints
   bool ignore_paused_;
   bool ignore_blocked_;
 };
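
Holding the counter by value (see the header hunk above) lets DispatchTracker::Wait reset it by plain assignment instead of unique_ptr::reset: assigning BlockingCounter{0} swaps in fresh shared state, so handles still held by connections from the previous tracking round can no longer satisfy the new wait. A minimal demonstration, reusing the stand-in class sketched earlier (the stale handle is hypothetical, purely to show the aliasing cut):

    int main() {
      BlockingCounter bc{1};       // one pending checkpoint
      BlockingCounter stale = bc;  // copy held by a slow connection

      bc = BlockingCounter{0};     // reset-by-assignment, as in the retry path

      stale->Dec();                // decrements only the *old* shared state
      bc->Wait();                  // returns immediately: the new count is 0
      return 0;
    }
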
@@ -322,15 +322,15 @@ class EngineShardSet {
   // The functions running inside the shard queue run atomically (sequentially)
   // with respect each other on the same shard.
   template <typename U> void AwaitRunningOnShardQueue(U&& func) {
-    util::fb2::BlockingCounter bc{unsigned(shard_queue_.size())};
+    util::fb2::BlockingCounter bc(shard_queue_.size());
     for (size_t i = 0; i < shard_queue_.size(); ++i) {
       Add(i, [&func, bc]() mutable {
         func(EngineShard::tlocal());
-        bc.Dec();
+        bc->Dec();
       });
     }
 
-    bc.Wait();
+    bc->Wait();
   }
 
   // Used in tests
@@ -353,14 +353,14 @@ void EngineShardSet::RunBriefInParallel(U&& func, P&& pred) const {
     if (!pred(i))
       continue;
 
-    bc.Add(1);
+    bc->Add(1);
     util::ProactorBase* dest = pp_->at(i);
     dest->DispatchBrief([&func, bc]() mutable {
       func(EngineShard::tlocal());
-      bc.Dec();
+      bc->Dec();
     });
   }
-  bc.Wait();
+  bc->Wait();
 }
 
 template <typename U, typename P> void EngineShardSet::RunBlockingInParallel(U&& func, P&& pred) {
@@ -374,16 +374,16 @@ template <typename U, typename P> void EngineShardSet::RunBlockingInParallel(U&&
     if (!pred(i))
      continue;
 
-    bc.Add(1);
+    bc->Add(1);
     util::ProactorBase* dest = pp_->at(i);
 
     // the "Dispatch" call spawns a fiber underneath.
     dest->Dispatch([&func, bc]() mutable {
       func(EngineShard::tlocal());
-      bc.Dec();
+      bc->Dec();
     });
   }
-  bc.Wait();
+  bc->Wait();
 }
 
 ShardId Shard(std::string_view v, ShardId shard_num);
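
All three EngineShardSet helpers follow one fan-out/join shape: charge the counter (either up front in the constructor or with bc->Add(1) per task), have every dispatched task end with bc->Dec(), and block the caller on bc->Wait(). Capturing bc by value in each lambda is what makes this safe, since every copy aliases the caller's count. A hedged sketch of the shape, with std::thread standing in for helio's proactor dispatch and the stand-in counter from above:

    #include <thread>
    #include <vector>

    // Fan-out/join in the RunBriefInParallel shape (sketch, not the real API).
    template <typename F>
    void RunOnAllThreads(unsigned n, F func) {
      BlockingCounter bc{0};
      std::vector<std::thread> workers;
      for (unsigned i = 0; i < n; ++i) {
        bc->Add(1);                  // one ticket per dispatched task
        workers.emplace_back([func, bc, i]() mutable {
          func(i);
          bc->Dec();                 // this copy aliases the caller's count
        });
      }
      bc->Wait();                    // returns once every task has reported
      for (auto& t : workers) t.join();
    }

Note the first hunk also changes the construction from bc{unsigned(shard_queue_.size())} to bc(shard_queue_.size()), pre-charging the count instead of Add-ing per task; either way, each dispatched task pairs with exactly one Dec.
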
@@ -23,7 +23,7 @@ bool MultiShardExecution::InsertTxToSharedMap(TxId txid, uint32_t shard_cnt) {
 
   VLOG(2) << "txid: " << txid << " unique_shard_cnt_: " << shard_cnt
           << " was_insert: " << was_insert;
-  it->second.block.Dec();
+  it->second.block->Dec();
 
   return was_insert;
 }
@@ -45,7 +45,7 @@ void MultiShardExecution::CancelAllBlockingEntities() {
   lock_guard lk{map_mu};
   for (auto& tx_data : tx_sync_execution) {
     tx_data.second.barrier.Cancel();
-    tx_data.second.block.Cancel();
+    tx_data.second.block->Cancel();
   }
 }
 
@@ -2027,9 +2027,9 @@ void RdbLoader::FinishLoad(absl::Time start_time, size_t* keys_loaded) {
     FlushShardAsync(i);
 
     // Send sentinel callbacks to ensure that all previous messages have been processed.
-    shard_set->Add(i, [bc]() mutable { bc.Dec(); });
+    shard_set->Add(i, [bc]() mutable { bc->Dec(); });
   }
-  bc.Wait();  // wait for sentinels to report.
+  bc->Wait();  // wait for sentinels to report.
 
   absl::Duration dur = absl::Now() - start_time;
   load_time_ = double(absl::ToInt64Milliseconds(dur)) / 1000;
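
The RdbLoader hunk above is the sentinel idiom: after FlushShardAsync queues the real work, a trivial bc->Dec() task is appended to each shard queue, so bc->Wait() returns only once every queue has drained past everything submitted before the sentinel. A sketch of the idea over a hypothetical serial TaskQueue (the real shard queues run on their own threads; this mock runs them inline just to show the ordering argument):

    #include <deque>
    #include <functional>

    using Task = std::function<void()>;

    // Hypothetical serial queue: tasks run strictly in FIFO order.
    struct TaskQueue {
      std::deque<Task> tasks;
      void Add(Task t) { tasks.push_back(std::move(t)); }
      void RunAll() {
        for (auto& t : tasks) t();
        tasks.clear();
      }
    };

    // Wait until all work already queued on every queue has been processed.
    void DrainBarrier(TaskQueue* queues, unsigned n) {
      BlockingCounter bc{long(n)};  // pre-charge: one sentinel per queue
      for (unsigned i = 0; i < n; ++i) {
        // FIFO order guarantees the sentinel runs after all earlier tasks.
        queues[i].Add([bc]() mutable { bc->Dec(); });
      }
      // The real shard queues execute concurrently; running them inline
      // here keeps the sketch self-contained.
      for (unsigned i = 0; i < n; ++i)
        queues[i].RunAll();
      bc->Wait();  // all sentinels ran, so all prior work is done
    }
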
@@ -451,7 +451,7 @@ error_code Replica::InitiateDflySync() {
   // Switch to new error handler that closes flow sockets.
   auto err_handler = [this, sync_block](const auto& ge) mutable {
     // Unblock this function.
-    sync_block.Cancel();
+    sync_block->Cancel();
 
     // Make sure the flows are not in a state transition
     lock_guard lk{flows_op_mu_};
@@ -519,7 +519,7 @@ error_code Replica::InitiateDflySync() {
   // Wait for all flows to receive full sync cut.
   // In case of an error, this is unblocked by the error handler.
   VLOG(1) << "Waiting for all full sync cut confirmations";
-  sync_block.Wait();
+  sync_block->Wait();
 
   // Check if we woke up due to cancellation.
   if (cntx_.IsCancelled())
@@ -731,7 +731,7 @@ void DflyShardReplica::FullSyncDflyFb(std::string eof_token, BlockingCounter bc,
   RdbLoader loader(&service_);
   loader.SetFullSyncCutCb([bc, ran = false]() mutable {
     if (!ran) {
-      bc.Dec();
+      bc->Dec();
       ran = true;
     }
   });
@@ -951,7 +951,7 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, bool inserted_by_me,
   // Wait until shards flows got transaction data and inserted to map.
   // This step enforces that replica will execute multi shard commands that finished on master
   // and replica recieved all the commands from all shards.
-  multi_shard_data.block.Wait();
+  multi_shard_data.block->Wait();
   // Check if we woke up due to cancellation.
   if (cntx_.IsCancelled())
     return;
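
In the Replica hunks the counter doubles as a cancellable latch: each flow calls sync_block->Dec() when it reaches the full-sync cut, while the error handler calls sync_block->Cancel() to release the waiter early, and the waiter then distinguishes the two outcomes via cntx_.IsCancelled(). A compact sketch of that interplay using the stand-in counter (the threads and simulated error are illustrative, not the replica's actual fiber layout):

    #include <thread>

    int main() {
      BlockingCounter sync_block{2};  // e.g. two replication flows

      std::thread flow([sync_block]() mutable {
        sync_block->Dec();            // first flow confirms the full-sync cut
      });
      std::thread error_path([sync_block]() mutable {
        sync_block->Cancel();         // error handler releases the waiter early
      });

      sync_block->Wait();  // returns on count == 0 *or* after Cancel()
      // The real code then checks cntx_.IsCancelled() to tell which happened.

      flow.join();
      error_path.join();
      return 0;
    }
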