From eb80d576d586a18145f8fbdffa8620316333ab5b Mon Sep 17 00:00:00 2001
From: Roman Gershman
Date: Wed, 16 Apr 2025 07:48:23 +0300
Subject: [PATCH] chore: Make snapshotting more responsive (#4910)

* chore: Make snapshotting more responsive

This should improve the situation around #4787 - maybe not solve it
completely, but improve it significantly.

In my tests, snapshotting under read traffic with master
(memtier_benchmark --ratio 0:1 -d 256 --test-time=400 --distinct-client-seed --key-maximum=2000000 -c 5 -t 2 --pipeline=3)
caused a drop from 250K qps to 8K qps during the full sync phase.
With this PR, the throughput went up to 70-80K qps.

---------

Signed-off-by: Roman Gershman
---
 helio                               |  2 +-
 src/server/journal/journal.cc       |  1 -
 src/server/journal/journal_slice.cc | 35 -----------
 src/server/rdb_save.cc              |  4 +-
 src/server/snapshot.cc              | 96 ++++++++++++++---------------
 src/server/snapshot.h               |  2 +-
 src/server/tx_base.h                |  4 --
 tests/dragonfly/replication_test.py |  3 +-
 8 files changed, 51 insertions(+), 96 deletions(-)

diff --git a/helio b/helio
index 309cf5816..e1e3934b6 160000
--- a/helio
+++ b/helio
@@ -1 +1 @@
-Subproject commit 309cf5816cee5eb732f31311b73bcdb31f51d378
+Subproject commit e1e3934b656a258c58125c18c7524dd6438c5585
diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc
index cf04bc790..945931bc5 100644
--- a/src/server/journal/journal.cc
+++ b/src/server/journal/journal.cc
@@ -14,7 +14,6 @@
 namespace dfly {
 namespace journal {
 
-namespace fs = std::filesystem;
 using namespace std;
 using namespace util;
 
diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc
index 87a730871..82670c4cd 100644
--- a/src/server/journal/journal_slice.cc
+++ b/src/server/journal/journal_slice.cc
@@ -23,38 +23,11 @@
 namespace dfly {
 namespace journal {
 using namespace std;
 using namespace util;
-namespace fs = std::filesystem;
-
-namespace {
-
-/*
-string ShardName(std::string_view base, unsigned index) {
-  return absl::StrCat(base, "-", absl::Dec(index, absl::kZeroPad4), ".log");
-}
-
-uint32_t NextPowerOf2(uint32_t x) {
-  if (x < 2) {
-    return 1;
-  }
-  int log = 32 - __builtin_clz(x - 1);
-  return 1 << log;
-}
-
-*/
-
-}  // namespace
-
-#define CHECK_EC(x) \
-  do { \
-    auto __ec$ = (x); \
-    CHECK(!__ec$) << "Error: " << __ec$ << " " << __ec$.message() << " for " << #x; \
-  } while (false)
 
 JournalSlice::JournalSlice() {
 }
 
 JournalSlice::~JournalSlice() {
-  // CHECK(!shard_file_);
 }
 
 void JournalSlice::Init(unsigned index) {
@@ -175,14 +148,6 @@ void JournalSlice::AddLogRecord(const Entry& entry) {
     VLOG(2) << "Writing item [" << item.lsn << "]: " << entry.ToString();
   }
 
-#if 0
-  if (shard_file_) {
-    string line = absl::StrCat(item.lsn, " ", entry.txid, " ", entry.opcode, "\n");
-    error_code ec = shard_file_->Write(io::Buffer(line), file_offset_, 0);
-    CHECK_EC(ec);
-    file_offset_ += line.size();
-  }
-#endif
   CallOnChange(item);
 }
 
diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc
index 03a7231b7..8a17dc897 100644
--- a/src/server/rdb_save.cc
+++ b/src/server/rdb_save.cc
@@ -383,7 +383,7 @@ error_code RdbSerializer::SaveListObject(const PrimeValue& pv) {
             << "/" << node->sz;
 
     // Use listpack encoding
-    SaveLen(node->container);
+    RETURN_ON_ERR(SaveLen(node->container));
     if (quicklistNodeIsCompressed(node)) {
      void* data;
      size_t compress_len = quicklistGetLzf(node, &data);
@@ -910,7 +910,7 @@ size_t SerializerBase::SerializedLen() const {
 
 io::Bytes SerializerBase::PrepareFlush(SerializerBase::FlushState flush_state) {
   size_t sz = mem_buf_.InputLen();
   if (sz == 0)
-    return mem_buf_.InputBuffer();
+    return {};
 
   bool is_last_chunk = flush_state == FlushState::kFlushEndEntry;
   VLOG(2) << "PrepareFlush:" << is_last_chunk << " " << number_of_chunks_;
diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index 54a964c3c..0e4438a7d 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -7,8 +7,7 @@
 #include
 #include
 
-#include
-
+#include "base/cycle_clock.h"
 #include "base/flags.h"
 #include "base/logging.h"
 #include "core/heap_size.h"
@@ -27,12 +26,13 @@
 using namespace std;
 using namespace util;
 using namespace chrono_literals;
-using facade::operator""_MB;
 using facade::operator""_KB;
 
 namespace {
 thread_local absl::flat_hash_set tl_slice_snapshots;
 
-constexpr size_t kMinBlobSize = 32_KB;
+// Controls the chunk size for pushing serialized data. The larger the chunk, the more CPU
+// it may require (especially with compression), and the less responsive the server may be.
+constexpr size_t kMinBlobSize = 8_KB;
 
 }  // namespace
 
@@ -98,7 +98,8 @@ void SliceSnapshot::Start(bool stream_journal, SnapshotFlush allow_flush) {
 
   VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_;
 
-  snapshot_fb_ = fb2::Fiber("snapshot", [this, stream_journal] {
+  string fb_name = absl::StrCat("SliceSnapshot-", ProactorBase::me()->GetPoolIndex());
+  snapshot_fb_ = fb2::Fiber(fb_name, [this, stream_journal] {
     this->IterateBucketsFb(stream_journal);
     db_slice_->UnregisterOnChange(snapshot_version_);
     consumer_->Finalize();
@@ -114,7 +115,7 @@ void SliceSnapshot::StartIncremental(LSN start_lsn) {
 
 // Called only for replication use-case.
 void SliceSnapshot::FinalizeJournalStream(bool cancel) {
-  VLOG(1) << "Finalize Snapshot";
+  VLOG(1) << "FinalizeJournalStream";
   DCHECK(db_slice_->shard_owner()->IsMyThread());
   if (!journal_cb_id_) {  // Finalize only once.
     return;
@@ -129,7 +130,8 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {
   journal->UnregisterOnChange(cb_id);
 
   if (!cancel) {
-    serializer_->SendJournalOffset(journal->GetLsn());
+    // always succeeds because serializer_ flushes to a string.
+    std::ignore = serializer_->SendJournalOffset(journal->GetLsn());
     PushSerialized(true);
   }
 }
@@ -147,16 +149,13 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {
 
 // Serializes all the entries with version less than snapshot_version_.
 void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
-  {
-    auto fiber_name = absl::StrCat("SliceSnapshot-", ProactorBase::me()->GetPoolIndex());
-    ThisFiber::SetName(std::move(fiber_name));
-  }
-
   PrimeTable::Cursor cursor;
   for (DbIndex db_indx = 0; db_indx < db_array_.size(); ++db_indx) {
     stats_.keys_total += db_slice_->DbSize(db_indx);
   }
 
+  const uint64_t kCyclesPerJiffy = base::CycleClock::Frequency() >> 16;  // ~15usec.
+
   for (DbIndex db_indx = 0; db_indx < db_array_.size(); ++db_indx) {
     if (!cntx_->IsRunning())
       return;
 
     if (!db_array_[db_indx])
       continue;
 
-    uint64_t last_yield = 0;
     PrimeTable* pt = &db_array_[db_indx]->prime;
-    VLOG(1) << "Start traversing " << pt->size() << " items for index " << db_indx;
+
     do {
       if (!cntx_->IsRunning()) {
         return;
       }
 
       PrimeTable::Cursor next = pt->TraverseBuckets(
           cursor, [this, &db_indx](auto it) { return BucketSaveCb(db_indx, it); });
       cursor = next;
-      PushSerialized(false);
 
-      if (stats_.loop_serialized >= last_yield + 100) {
-        DVLOG(2) << "Before sleep " << ThisFiber::GetName();
-        ThisFiber::Yield();
-        DVLOG(2) << "After sleep";
-
-        last_yield = stats_.loop_serialized;
-        // Push in case other fibers (writes commands that pushed previous values)
-        // filled the buffer.
-        PushSerialized(false);
+      // If we did not flush the data and have not preempted,
+      // we may need to yield to other fibers to avoid holding the CPU for too long.
+      if (!PushSerialized(false)) {
+        if (ThisFiber::GetRunningTimeCycles() > kCyclesPerJiffy) {
+          ThisFiber::Yield();
+        }
       }
     } while (cursor);
 
@@ -214,7 +208,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
 
   // The replica sends the LSN of the next entry it wants to receive.
   while (cntx_->IsRunning() && journal->IsLSNInBuffer(lsn)) {
-    serializer_->WriteJournalEntry(journal->GetEntry(lsn));
+    std::ignore = serializer_->WriteJournalEntry(journal->GetEntry(lsn));
     PushSerialized(false);
     lsn++;
   }
@@ -231,10 +225,8 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
 
   // GetLsn() is always the next lsn that we expect to create.
   if (journal->GetLsn() == lsn) {
-    {
-      FiberAtomicGuard fg;
-      serializer_->SendFullSyncCut();
-    }
+    std::ignore = serializer_->SendFullSyncCut();
+
     auto journal_cb = [this](const journal::JournalItem& item, bool await) {
       OnJournalEntry(item, await);
     };
@@ -255,29 +247,22 @@ bool SliceSnapshot::BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator i
 
   ++stats_.savecb_calls;
 
-  auto check = [&](uint64_t v) {
-    if (v >= snapshot_version_) {
-      // either has been already serialized or added after snapshotting started.
-      DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << " at " << v;
-      ++stats_.skipped;
-      return false;
-    }
-    return true;
-  };
-
-  if (!check(it.GetVersion())) {
+  if (it.GetVersion() >= snapshot_version_) {
+    // either has been already serialized or added after snapshotting started.
+    DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << " at " << it.GetVersion();
+    ++stats_.skipped;
     return false;
   }
 
   db_slice_->FlushChangeToEarlierCallbacks(db_index, DbSlice::Iterator::FromPrime(it),
                                            snapshot_version_);
 
-  auto* blocking_counter = db_slice_->GetLatch();
+  auto* latch = db_slice_->GetLatch();
   // Locking this never preempts. We merely increment the underlying counter such that
   // if SerializeBucket preempts, Heartbeat() won't run because the blocking counter is not
   // zero.
-  std::lock_guard blocking_counter_guard(*blocking_counter);
+  std::lock_guard latch_guard(*latch);
 
   stats_.loop_serialized += SerializeBucket(db_index, it);
 
@@ -324,7 +309,8 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
 
 size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
   io::StringFile sfile;
-  serializer_->FlushToSink(&sfile, flush_state);
+  error_code ec = serializer_->FlushToSink(&sfile, flush_state);
+  CHECK(!ec);  // always succeeds
 
   size_t serialized = sfile.val.size();
   if (serialized == 0)
@@ -333,6 +319,8 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
 
   uint64_t id = rec_id_++;
   DVLOG(2) << "Pushing " << id;
 
+  uint64_t running_cycles = ThisFiber::GetRunningTimeCycles();
+
   fb2::NoOpLock lk;
 
   // We create a critical section here that ensures that records are pushed in sequential order.
@@ -351,6 +339,12 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
 
   VLOG(2) << "Pushed with Serialize() " << serialized;
 
+  // FlushToSink can be quite slow for large values or due to compression, therefore
+  // we counter-balance the CPU over-usage by forcing a sleep.
+  // We measure running_cycles before the preemption points, because they reset the counter.
+  uint64_t sleep_usec = (running_cycles * 1000'000 / base::CycleClock::Frequency()) / 2;
+  ThisFiber::SleepFor(chrono::microseconds(std::min(sleep_usec, 2000ul)));
+
   return serialized;
 }
 
@@ -419,19 +413,19 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
 // value. This is guaranteed by the fact that OnJournalEntry always runs after OnDbChange, and
 // no database switch can be performed between those two calls, because they are part of one
 // transaction.
-void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) {
-  // To enable journal flushing to sync after non auto journal command is executed we call
-  // TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no
-  // additional journal change to serialize, it simply invokes PushSerialized.
+// allow_flush is controlled by Journal::SetFlushMode
+// (usually it's true unless we are in the middle of a critical section that cannot preempt).
+void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool allow_flush) {
   {
-    // We should release the lock after we preempt
-    std::lock_guard guard(big_value_mu_);
+    // We grab the lock in case we are in the middle of serializing a bucket, so it serves as a
+    // barrier here for atomic serialization.
+    std::lock_guard barrier(big_value_mu_);
     if (item.opcode != journal::Op::NOOP) {
-      serializer_->WriteJournalEntry(item.data);
+      std::ignore = serializer_->WriteJournalEntry(item.data);
     }
   }
 
-  if (await) {
+  if (allow_flush) {
     // This is the only place that flushes in streaming mode
     // once the iterate buckets fiber finished.
     PushSerialized(false);
diff --git a/src/server/snapshot.h b/src/server/snapshot.h
index fe44841d9..f027b5e48 100644
--- a/src/server/snapshot.h
+++ b/src/server/snapshot.h
@@ -125,7 +125,7 @@ class SliceSnapshot {
   void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
 
   // Journal listener
-  void OnJournalEntry(const journal::JournalItem& item, bool allow_await);
+  void OnJournalEntry(const journal::JournalItem& item, bool allow_flush);
 
   // Push serializer's internal buffer.
   // Push regardless of buffer size if force is true.
diff --git a/src/server/tx_base.h b/src/server/tx_base.h
index bd964f3ea..fb9024190 100644
--- a/src/server/tx_base.h
+++ b/src/server/tx_base.h
@@ -227,8 +227,4 @@ void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args,
 // Might block the calling fiber unless Journal::SetFlushMode(false) is called.
 void RecordExpiryBlocking(DbIndex dbid, std::string_view key);
 
-// Trigger journal write to sink, no journal record will be added to journal.
-// Must be called from shard thread of journal to sink.
-void TriggerJournalWriteToSink();
-
 }  // namespace dfly
diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py
index a40762b65..b35132512 100644
--- a/tests/dragonfly/replication_test.py
+++ b/tests/dragonfly/replication_test.py
@@ -167,6 +167,7 @@ async def check_all_replicas_finished(c_replicas, c_master, timeout=20):
     start = time.time()
     while (time.time() - start) < timeout:
         if not waiting_for:
+            logging.debug("All replicas finished after %s seconds", time.time() - start)
             return
         await asyncio.sleep(0.2)
         m_offset = await c_master.execute_command("DFLY REPLICAOFFSET")
@@ -2715,7 +2716,7 @@ async def test_replication_timeout_on_full_sync_heartbeat_expiry(
     await asyncio.sleep(1)
 
     # replica will start resync
-    await check_all_replicas_finished([c_replica], c_master)
+    await check_all_replicas_finished([c_replica], c_master, 60)
 
     await assert_replica_reconnections(replica, 0)
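
A note on the new throttling in snapshot.cc above: IterateBucketsFb no longer yields every 100 serialized buckets; instead it yields whenever the fiber has been running for longer than roughly one "jiffy" (~15usec, derived from CycleClock::Frequency() >> 16) without having flushed anything. The standalone C++ sketch below illustrates the same time-budget pattern under stated assumptions: ProcessBucket and PushSerialized are hypothetical stand-ins, and std::chrono / std::this_thread replace the helio fiber and base::CycleClock APIs used in the patch.

#include <chrono>
#include <cstdint>
#include <thread>

using Clock = std::chrono::steady_clock;

// Hypothetical stand-in for serializing one bucket into an in-memory buffer.
void ProcessBucket(uint64_t bucket_id) {
  (void)bucket_id;
}

// Hypothetical stand-in for SliceSnapshot::PushSerialized: returns true only
// if it actually pushed data (and therefore preempted on I/O).
bool PushSerialized(bool force) {
  return force;
}

int main() {
  // Small CPU budget per scheduling slice, mirroring kCyclesPerJiffy (~15usec).
  constexpr auto kBudget = std::chrono::microseconds(15);

  auto slice_start = Clock::now();
  for (uint64_t b = 0; b < 1'000'000; ++b) {
    ProcessBucket(b);

    // If pushing did not preempt us, check how long we have held the CPU and
    // yield once we exceed the budget, so other work on this thread can run.
    if (!PushSerialized(false)) {
      if (Clock::now() - slice_start > kBudget) {
        std::this_thread::yield();   // the patch uses ThisFiber::Yield()
        slice_start = Clock::now();  // a fiber yield resets the running-time counter
      }
    }
  }
  PushSerialized(true);  // final flush
  return 0;
}

The design point is that the yield frequency now tracks actual CPU time spent rather than an item count, so a handful of very large buckets cannot monopolize the shard thread.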
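
The second throttle, in FlushSerialized, samples the fiber's running time before the preemption points (which reset the counter) and then sleeps for half of the CPU time the flush consumed, capped at 2ms. Below is a minimal sketch of that pacing idea only; std::chrono / std::this_thread stand in for base::CycleClock and the fiber sleep, and ExpensiveFlush is a hypothetical placeholder for serialization plus compression.

#include <algorithm>
#include <chrono>
#include <cstdint>
#include <thread>

using Clock = std::chrono::steady_clock;

// Hypothetical placeholder for an expensive, CPU-bound flush.
void ExpensiveFlush() {
  volatile uint64_t sink = 0;
  for (uint64_t i = 0; i < 1'000'000; ++i)
    sink += i;
}

int main() {
  for (int iter = 0; iter < 10; ++iter) {
    auto start = Clock::now();
    ExpensiveFlush();

    // How long we hogged the CPU for this chunk.
    int64_t busy_usec =
        std::chrono::duration_cast<std::chrono::microseconds>(Clock::now() - start).count();

    // Give back roughly half of it, but never sleep more than 2ms at a time,
    // mirroring the patch's sleep_usec = (busy / 2) capped at 2000.
    int64_t sleep_usec = std::min<int64_t>(busy_usec / 2, 2000);
    std::this_thread::sleep_for(std::chrono::microseconds(sleep_usec));
  }
  return 0;
}

Together with the smaller kMinBlobSize (8KB instead of 32KB), this keeps individual flushes short and gives competing fibers regular slots, which is how the patch trades a little snapshot throughput for much better command latency.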
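
Several hunks also replace bare serializer calls with std::ignore = serializer_->...; per the added comments, these calls always succeed because the serializer flushes into an in-memory string, so the returned error code is discarded explicitly rather than handled. A tiny model of that idiom, with WriteToStringSink as a hypothetical stand-in for the serializer call:

#include <system_error>
#include <tuple>  // std::ignore

// Hypothetical stand-in for a serializer method that reports errors but
// cannot fail when the underlying sink is an in-memory string.
[[nodiscard]] std::error_code WriteToStringSink() {
  return {};  // success
}

int main() {
  // Explicit, warning-free discard of a result that is known to be success.
  std::ignore = WriteToStringSink();
  return 0;
}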