fix: fixes for v1.14.0 (#2473)

* fix: fixes for v1.14.0

Stop writing to the replication ring_buffer.
Stop allocating in TopKeys when it is disabled.
Tighten CHECKs around tx execution.

---------

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2024-01-25 14:23:14 +03:00 committed by GitHub
parent f69f2ec0ca
commit 08d2fa52e1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 23 additions and 16 deletions

View file

@ -31,7 +31,6 @@ namespace {
string ShardName(std::string_view base, unsigned index) { string ShardName(std::string_view base, unsigned index) {
return absl::StrCat(base, "-", absl::Dec(index, absl::kZeroPad4), ".log"); return absl::StrCat(base, "-", absl::Dec(index, absl::kZeroPad4), ".log");
} }
*/
uint32_t NextPowerOf2(uint32_t x) { uint32_t NextPowerOf2(uint32_t x) {
if (x < 2) { if (x < 2) {
@ -41,6 +40,8 @@ uint32_t NextPowerOf2(uint32_t x) {
return 1 << log; return 1 << log;
} }
*/
} // namespace } // namespace
#define CHECK_EC(x) \ #define CHECK_EC(x) \
@ -61,7 +62,7 @@ void JournalSlice::Init(unsigned index) {
return; return;
slice_index_ = index; slice_index_ = index;
ring_buffer_.emplace(NextPowerOf2(absl::GetFlag(FLAGS_shard_repl_backlog_len))); ring_buffer_.emplace(2);
} }
#if 0 #if 0
@ -156,7 +157,7 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
FiberAtomicGuard fg; FiberAtomicGuard fg;
// GetTail gives a pointer to a new tail entry in the buffer, possibly overriding the last entry // GetTail gives a pointer to a new tail entry in the buffer, possibly overriding the last entry
// if the buffer is full. // if the buffer is full.
item = ring_buffer_->GetTail(true); item = &dummy;
item->opcode = entry.opcode; item->opcode = entry.opcode;
item->lsn = lsn_++; item->lsn = lsn_++;
item->slot = entry.slot; item->slot = entry.slot;

View file

@ -12,8 +12,8 @@
namespace dfly { namespace dfly {
TopKeys::TopKeys(Options options) : options_(options) { TopKeys::TopKeys(Options options)
fingerprints_.resize(options_.buckets * options_.arrays); : options_(options), fingerprints_(options.enabled ? options_.buckets * options_.arrays : 0) {
} }
void TopKeys::Touch(std::string_view key) { void TopKeys::Touch(std::string_view key) {
@ -63,8 +63,11 @@ void TopKeys::Touch(std::string_view key) {
} }
absl::flat_hash_map<std::string, uint64_t> TopKeys::GetTopKeys() const { absl::flat_hash_map<std::string, uint64_t> TopKeys::GetTopKeys() const {
absl::flat_hash_map<std::string, uint64_t> results; if (!IsEnabled()) {
return {};
}
absl::flat_hash_map<std::string, uint64_t> results;
for (uint64_t array = 0; array < options_.arrays; ++array) { for (uint64_t array = 0; array < options_.arrays; ++array) {
for (uint64_t bucket = 0; bucket < options_.buckets; ++bucket) { for (uint64_t bucket = 0; bucket < options_.buckets; ++bucket) {
const Cell& cell = GetCell(array, bucket); const Cell& cell = GetCell(array, bucket);
@ -73,7 +76,6 @@ absl::flat_hash_map<std::string, uint64_t> TopKeys::GetTopKeys() const {
} }
} }
} }
return results; return results;
} }
@ -82,12 +84,14 @@ bool TopKeys::IsEnabled() const {
} }
TopKeys::Cell& TopKeys::GetCell(uint64_t array, uint64_t bucket) { TopKeys::Cell& TopKeys::GetCell(uint64_t array, uint64_t bucket) {
DCHECK(IsEnabled());
DCHECK(array < options_.arrays); DCHECK(array < options_.arrays);
DCHECK(bucket < options_.buckets); DCHECK(bucket < options_.buckets);
return fingerprints_[array * options_.buckets + bucket]; return fingerprints_[array * options_.buckets + bucket];
} }
const TopKeys::Cell& TopKeys::GetCell(uint64_t array, uint64_t bucket) const { const TopKeys::Cell& TopKeys::GetCell(uint64_t array, uint64_t bucket) const {
DCHECK(IsEnabled());
DCHECK(array < options_.arrays); DCHECK(array < options_.arrays);
DCHECK(bucket < options_.buckets); DCHECK(bucket < options_.buckets);
return fingerprints_[array * options_.buckets + bucket]; return fingerprints_[array * options_.buckets + bucket];

View file

@ -501,9 +501,8 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
unsigned idx = SidToId(shard->shard_id()); unsigned idx = SidToId(shard->shard_id());
auto& sd = shard_data_[idx]; auto& sd = shard_data_[idx];
bool prev_armed = sd.is_armed.load(memory_order_relaxed); CHECK(sd.is_armed.exchange(false, memory_order_relaxed));
DCHECK(prev_armed); CHECK_GT(run_count_.load(memory_order_relaxed), 0u);
sd.is_armed.store(false, memory_order_relaxed);
VLOG(2) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id() << " " << sd.local_mask; VLOG(2) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id() << " " << sd.local_mask;
@ -618,7 +617,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
} }
} }
CHECK_GE(DecreaseRunCnt(), 1u); DecreaseRunCnt();
// From this point on we can not access 'this'. // From this point on we can not access 'this'.
return !is_concluding; return !is_concluding;
@ -744,7 +743,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
// If DecreaseRunCnt were called before ScheduleUniqueShard finishes // If DecreaseRunCnt were called before ScheduleUniqueShard finishes
// then WaitForShardCallbacks below could exit before schedule_cb assigns return value // then WaitForShardCallbacks below could exit before schedule_cb assigns return value
// to was_ooo and cause stack corruption. // to was_ooo and cause stack corruption.
CHECK_GE(DecreaseRunCnt(), 1u); DecreaseRunCnt();
} }
}; };
@ -1332,7 +1331,7 @@ void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) {
// Resume processing of transaction queue // Resume processing of transaction queue
shard->PollExecution("unwatchcb", nullptr); shard->PollExecution("unwatchcb", nullptr);
CHECK_GE(DecreaseRunCnt(), 1u); DecreaseRunCnt();
} }
OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) { OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
@ -1396,7 +1395,7 @@ void Transaction::UnlockMultiShardCb(const KeyList& sharded_keys, EngineShard* s
this->DecreaseRunCnt(); this->DecreaseRunCnt();
} }
inline uint32_t Transaction::DecreaseRunCnt() { void Transaction::DecreaseRunCnt() {
// to protect against cases where Transaction is destroyed before run_ec_.notify // to protect against cases where Transaction is destroyed before run_ec_.notify
// finishes running. We can not put it inside the (res == 1) block because then it's too late. // finishes running. We can not put it inside the (res == 1) block because then it's too late.
::boost::intrusive_ptr guard(this); ::boost::intrusive_ptr guard(this);
@ -1407,10 +1406,13 @@ inline uint32_t Transaction::DecreaseRunCnt() {
// The fact that run_ec_.notify() does release operation is not enough, because // The fact that run_ec_.notify() does release operation is not enough, because
// WaitForCallbacks might skip reading run_ec_ if run_count_ is already 0. // WaitForCallbacks might skip reading run_ec_ if run_count_ is already 0.
uint32_t res = run_count_.fetch_sub(1, memory_order_release); uint32_t res = run_count_.fetch_sub(1, memory_order_release);
CHECK_GE(res, 1u) << unique_shard_cnt_ << " " << unique_shard_id_ << " " << cid_->name() << " "
<< use_count_.load(memory_order_relaxed) << " " << uint32_t(coordinator_state_);
if (res == 1) { if (res == 1) {
run_ec_.notify(); run_ec_.notify();
} }
return res;
} }
bool Transaction::IsGlobal() const { bool Transaction::IsGlobal() const {

View file

@ -512,7 +512,7 @@ class Transaction {
void LogAutoJournalOnShard(EngineShard* shard, RunnableResult shard_result); void LogAutoJournalOnShard(EngineShard* shard, RunnableResult shard_result);
// Returns the previous value of run count. // Returns the previous value of run count.
uint32_t DecreaseRunCnt(); void DecreaseRunCnt();
uint32_t GetUseCount() const { uint32_t GetUseCount() const {
return use_count_.load(std::memory_order_relaxed); return use_count_.load(std::memory_order_relaxed);