From 08d2fa52e1705ba0ded51227f890e657e0d18137 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Thu, 25 Jan 2024 14:23:14 +0300 Subject: [PATCH] fix: fixes for v1.14.0 (#2473) * fix: fixes for v1.14.0 Stop writing to the replication ring_buffer Stop allocating in TopKeys Tighter CHECKs around tx execution. --------- Signed-off-by: Vladislav Oleshko --- src/server/journal/journal_slice.cc | 7 ++++--- src/server/top_keys.cc | 12 ++++++++---- src/server/transaction.cc | 18 ++++++++++-------- src/server/transaction.h | 2 +- 4 files changed, 23 insertions(+), 16 deletions(-) diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index 585af26ec..463403f22 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -31,7 +31,6 @@ namespace { string ShardName(std::string_view base, unsigned index) { return absl::StrCat(base, "-", absl::Dec(index, absl::kZeroPad4), ".log"); } -*/ uint32_t NextPowerOf2(uint32_t x) { if (x < 2) { @@ -41,6 +40,8 @@ uint32_t NextPowerOf2(uint32_t x) { return 1 << log; } +*/ + } // namespace #define CHECK_EC(x) \ @@ -61,7 +62,7 @@ void JournalSlice::Init(unsigned index) { return; slice_index_ = index; - ring_buffer_.emplace(NextPowerOf2(absl::GetFlag(FLAGS_shard_repl_backlog_len))); + ring_buffer_.emplace(2); } #if 0 @@ -156,7 +157,7 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) { FiberAtomicGuard fg; // GetTail gives a pointer to a new tail entry in the buffer, possibly overriding the last entry // if the buffer is full. - item = ring_buffer_->GetTail(true); + item = &dummy; item->opcode = entry.opcode; item->lsn = lsn_++; item->slot = entry.slot; diff --git a/src/server/top_keys.cc b/src/server/top_keys.cc index 27141cdd2..e92d00fc6 100644 --- a/src/server/top_keys.cc +++ b/src/server/top_keys.cc @@ -12,8 +12,8 @@ namespace dfly { -TopKeys::TopKeys(Options options) : options_(options) { - fingerprints_.resize(options_.buckets * options_.arrays); +TopKeys::TopKeys(Options options) + : options_(options), fingerprints_(options.enabled ? options_.buckets * options_.arrays : 0) { } void TopKeys::Touch(std::string_view key) { @@ -63,8 +63,11 @@ void TopKeys::Touch(std::string_view key) { } absl::flat_hash_map TopKeys::GetTopKeys() const { - absl::flat_hash_map results; + if (!IsEnabled()) { + return {}; + } + absl::flat_hash_map results; for (uint64_t array = 0; array < options_.arrays; ++array) { for (uint64_t bucket = 0; bucket < options_.buckets; ++bucket) { const Cell& cell = GetCell(array, bucket); @@ -73,7 +76,6 @@ absl::flat_hash_map TopKeys::GetTopKeys() const { } } } - return results; } @@ -82,12 +84,14 @@ bool TopKeys::IsEnabled() const { } TopKeys::Cell& TopKeys::GetCell(uint64_t array, uint64_t bucket) { + DCHECK(IsEnabled()); DCHECK(array < options_.arrays); DCHECK(bucket < options_.buckets); return fingerprints_[array * options_.buckets + bucket]; } const TopKeys::Cell& TopKeys::GetCell(uint64_t array, uint64_t bucket) const { + DCHECK(IsEnabled()); DCHECK(array < options_.arrays); DCHECK(bucket < options_.buckets); return fingerprints_[array * options_.buckets + bucket]; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index f09812bf3..ed607b7a4 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -501,9 +501,8 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { unsigned idx = SidToId(shard->shard_id()); auto& sd = shard_data_[idx]; - bool prev_armed = sd.is_armed.load(memory_order_relaxed); - DCHECK(prev_armed); - sd.is_armed.store(false, memory_order_relaxed); + CHECK(sd.is_armed.exchange(false, memory_order_relaxed)); + CHECK_GT(run_count_.load(memory_order_relaxed), 0u); VLOG(2) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id() << " " << sd.local_mask; @@ -618,7 +617,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { } } - CHECK_GE(DecreaseRunCnt(), 1u); + DecreaseRunCnt(); // From this point on we can not access 'this'. return !is_concluding; @@ -744,7 +743,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { // If DecreaseRunCnt were called before ScheduleUniqueShard finishes // then WaitForShardCallbacks below could exit before schedule_cb assigns return value // to was_ooo and cause stack corruption. - CHECK_GE(DecreaseRunCnt(), 1u); + DecreaseRunCnt(); } }; @@ -1332,7 +1331,7 @@ void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) { // Resume processing of transaction queue shard->PollExecution("unwatchcb", nullptr); - CHECK_GE(DecreaseRunCnt(), 1u); + DecreaseRunCnt(); } OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) { @@ -1396,7 +1395,7 @@ void Transaction::UnlockMultiShardCb(const KeyList& sharded_keys, EngineShard* s this->DecreaseRunCnt(); } -inline uint32_t Transaction::DecreaseRunCnt() { +void Transaction::DecreaseRunCnt() { // to protect against cases where Transaction is destroyed before run_ec_.notify // finishes running. We can not put it inside the (res == 1) block because then it's too late. ::boost::intrusive_ptr guard(this); @@ -1407,10 +1406,13 @@ inline uint32_t Transaction::DecreaseRunCnt() { // The fact that run_ec_.notify() does release operation is not enough, because // WaitForCallbacks might skip reading run_ec_ if run_count_ is already 0. uint32_t res = run_count_.fetch_sub(1, memory_order_release); + + CHECK_GE(res, 1u) << unique_shard_cnt_ << " " << unique_shard_id_ << " " << cid_->name() << " " + << use_count_.load(memory_order_relaxed) << " " << uint32_t(coordinator_state_); + if (res == 1) { run_ec_.notify(); } - return res; } bool Transaction::IsGlobal() const { diff --git a/src/server/transaction.h b/src/server/transaction.h index d02119ea3..82ded12cf 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -512,7 +512,7 @@ class Transaction { void LogAutoJournalOnShard(EngineShard* shard, RunnableResult shard_result); // Returns the previous value of run count. - uint32_t DecreaseRunCnt(); + void DecreaseRunCnt(); uint32_t GetUseCount() const { return use_count_.load(std::memory_order_relaxed);