chore: Make snapshotting more responsive (#4910)

* chore: Make snapshotting more responsive

This should improve the situation around #4787 -
it may not solve it completely, but it should improve it significantly.

In my tests, when snapshotting under read traffic against master
(memtier_benchmark --ratio 0:1 -d 256 --test-time=400 --distinct-client-seed --key-maximum=2000000 -c 5 -t 2 --pipeline=3),
throughput dropped from 250K qps to 8K qps during the full-sync phase.

With this PR, the throughput went up to 70-80K qps.
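
The core of the change is a CPU-budget throttle in the snapshot fiber: yield once the fiber
has been running longer than a small budget (~15 usec), and after each flush sleep for about
half of the CPU time just consumed, capped at 2ms. Below is a minimal portable sketch of that
policy; the real code uses helio's base::CycleClock and ThisFiber APIs, so the clock, thread
primitives and the CpuBudget name here are illustrative assumptions, not the actual implementation.

// Sketch only: std::chrono and std::this_thread stand in for helio's
// cycle clock and fiber primitives.
#include <algorithm>
#include <chrono>
#include <thread>

using namespace std::chrono;

class CpuBudget {
 public:
  // Yield if we have been running longer than the ~15 usec budget ("jiffy").
  void MaybeYield() {
    if (steady_clock::now() - start_ > kJiffy) {
      std::this_thread::yield();  // the PR uses ThisFiber::Yield()
      start_ = steady_clock::now();
    }
  }

  // After flushing a serialized chunk, sleep for half of the CPU time the
  // flush consumed, capped at 2ms, so that regular traffic gets scheduled.
  void SleepAfterFlush(microseconds running_time) {
    std::this_thread::sleep_for(std::min(running_time / 2, microseconds(2000)));
    start_ = steady_clock::now();
  }

 private:
  static constexpr microseconds kJiffy{15};
  steady_clock::time_point start_ = steady_clock::now();
};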

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Roman Gershman committed 2025-04-16 07:48:23 +03:00
parent b86b692461
commit eb80d576d5
8 changed files with 51 additions and 96 deletions

helio (submodule)

@@ -1 +1 @@
-Subproject commit 309cf5816cee5eb732f31311b73bcdb31f51d378
+Subproject commit e1e3934b656a258c58125c18c7524dd6438c5585


@@ -14,7 +14,6 @@
namespace dfly {
namespace journal {
-namespace fs = std::filesystem;
using namespace std;
using namespace util;


@@ -23,38 +23,11 @@ namespace dfly {
namespace journal {
using namespace std;
using namespace util;
-namespace fs = std::filesystem;
namespace {
-/*
-string ShardName(std::string_view base, unsigned index) {
-  return absl::StrCat(base, "-", absl::Dec(index, absl::kZeroPad4), ".log");
-}
-uint32_t NextPowerOf2(uint32_t x) {
-  if (x < 2) {
-    return 1;
-  }
-  int log = 32 - __builtin_clz(x - 1);
-  return 1 << log;
-}
-*/
} // namespace
-#define CHECK_EC(x) \
-  do { \
-    auto __ec$ = (x); \
-    CHECK(!__ec$) << "Error: " << __ec$ << " " << __ec$.message() << " for " << #x; \
-  } while (false)
JournalSlice::JournalSlice() {
}
JournalSlice::~JournalSlice() {
-  // CHECK(!shard_file_);
}
void JournalSlice::Init(unsigned index) {
@@ -175,14 +148,6 @@ void JournalSlice::AddLogRecord(const Entry& entry) {
  VLOG(2) << "Writing item [" << item.lsn << "]: " << entry.ToString();
}
-#if 0
-  if (shard_file_) {
-    string line = absl::StrCat(item.lsn, " ", entry.txid, " ", entry.opcode, "\n");
-    error_code ec = shard_file_->Write(io::Buffer(line), file_offset_, 0);
-    CHECK_EC(ec);
-    file_offset_ += line.size();
-  }
-#endif
  CallOnChange(item);
}


@@ -383,7 +383,7 @@ error_code RdbSerializer::SaveListObject(const PrimeValue& pv) {
        << "/" << node->sz;
    // Use listpack encoding
-    SaveLen(node->container);
+    RETURN_ON_ERR(SaveLen(node->container));
if (quicklistNodeIsCompressed(node)) {
void* data;
size_t compress_len = quicklistGetLzf(node, &data);
@@ -910,7 +910,7 @@ size_t SerializerBase::SerializedLen() const {
io::Bytes SerializerBase::PrepareFlush(SerializerBase::FlushState flush_state) {
  size_t sz = mem_buf_.InputLen();
  if (sz == 0)
-    return mem_buf_.InputBuffer();
+    return {};
bool is_last_chunk = flush_state == FlushState::kFlushEndEntry;
VLOG(2) << "PrepareFlush:" << is_last_chunk << " " << number_of_chunks_;


@@ -7,8 +7,7 @@
#include <absl/strings/match.h>
#include <absl/strings/str_cat.h>
-#include <mutex>
+#include "base/cycle_clock.h"
-#include "base/flags.h"
#include "base/logging.h"
#include "core/heap_size.h"
@@ -27,12 +26,13 @@ using namespace std;
using namespace util;
using namespace chrono_literals;
-using facade::operator""_MB;
using facade::operator""_KB;
namespace {
thread_local absl::flat_hash_set<SliceSnapshot*> tl_slice_snapshots;
-constexpr size_t kMinBlobSize = 32_KB;
+// Controls the chunk size for pushing serialized data. The larger the chunk, the more CPU
+// it may require (especially with compression), and the less responsive the server may be.
+constexpr size_t kMinBlobSize = 8_KB;
} // namespace
@@ -98,7 +98,8 @@ void SliceSnapshot::Start(bool stream_journal, SnapshotFlush allow_flush) {
  VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_;
-  snapshot_fb_ = fb2::Fiber("snapshot", [this, stream_journal] {
+  string fb_name = absl::StrCat("SliceSnapshot-", ProactorBase::me()->GetPoolIndex());
+  snapshot_fb_ = fb2::Fiber(fb_name, [this, stream_journal] {
this->IterateBucketsFb(stream_journal);
db_slice_->UnregisterOnChange(snapshot_version_);
consumer_->Finalize();
@@ -114,7 +115,7 @@ void SliceSnapshot::StartIncremental(LSN start_lsn) {
// Called only for the replication use-case.
void SliceSnapshot::FinalizeJournalStream(bool cancel) {
-  VLOG(1) << "Finalize Snapshot";
+  VLOG(1) << "FinalizeJournalStream";
DCHECK(db_slice_->shard_owner()->IsMyThread());
if (!journal_cb_id_) { // Finalize only once.
return;
@@ -129,7 +130,8 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {
  journal->UnregisterOnChange(cb_id);
  if (!cancel) {
-    serializer_->SendJournalOffset(journal->GetLsn());
+    // Always succeeds because serializer_ flushes to a string.
+    std::ignore = serializer_->SendJournalOffset(journal->GetLsn());
PushSerialized(true);
}
}
@@ -147,16 +149,13 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {
// Serializes all the entries with version less than snapshot_version_.
void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
-  {
-    auto fiber_name = absl::StrCat("SliceSnapshot-", ProactorBase::me()->GetPoolIndex());
-    ThisFiber::SetName(std::move(fiber_name));
-  }
  PrimeTable::Cursor cursor;
  for (DbIndex db_indx = 0; db_indx < db_array_.size(); ++db_indx) {
    stats_.keys_total += db_slice_->DbSize(db_indx);
  }
+  const uint64_t kCyclesPerJiffy = base::CycleClock::Frequency() >> 16;  // ~15usec.
for (DbIndex db_indx = 0; db_indx < db_array_.size(); ++db_indx) {
if (!cntx_->IsRunning())
return;
@@ -164,10 +163,9 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
    if (!db_array_[db_indx])
      continue;
-    uint64_t last_yield = 0;
PrimeTable* pt = &db_array_[db_indx]->prime;
VLOG(1) << "Start traversing " << pt->size() << " items for index " << db_indx;
do {
if (!cntx_->IsRunning()) {
return;
@@ -176,17 +174,13 @@ void SliceSnapshot::IterateBucketsFb(bool send_full_sync_cut) {
      PrimeTable::Cursor next = pt->TraverseBuckets(
          cursor, [this, &db_indx](auto it) { return BucketSaveCb(db_indx, it); });
      cursor = next;
-      PushSerialized(false);
-      if (stats_.loop_serialized >= last_yield + 100) {
-        DVLOG(2) << "Before sleep " << ThisFiber::GetName();
+      // If we did not flush the data and have not preempted,
+      // we may need to yield to other fibers to avoid grabbing the CPU for too long.
+      if (!PushSerialized(false)) {
+        if (ThisFiber::GetRunningTimeCycles() > kCyclesPerJiffy) {
          ThisFiber::Yield();
-        DVLOG(2) << "After sleep";
-        last_yield = stats_.loop_serialized;
-        // Push in case other fibers (write commands that pushed previous values)
-        // filled the buffer.
-        PushSerialized(false);
        }
      }
    } while (cursor);
@@ -214,7 +208,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
  // The replica sends the LSN of the next entry it wants to receive.
  while (cntx_->IsRunning() && journal->IsLSNInBuffer(lsn)) {
-    serializer_->WriteJournalEntry(journal->GetEntry(lsn));
+    std::ignore = serializer_->WriteJournalEntry(journal->GetEntry(lsn));
PushSerialized(false);
lsn++;
}
@@ -231,10 +225,8 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
  // GetLsn() is always the next lsn that we expect to create.
  if (journal->GetLsn() == lsn) {
-    {
-      FiberAtomicGuard fg;
-      serializer_->SendFullSyncCut();
-    }
+    std::ignore = serializer_->SendFullSyncCut();
auto journal_cb = [this](const journal::JournalItem& item, bool await) {
OnJournalEntry(item, await);
};
@@ -255,29 +247,22 @@ bool SliceSnapshot::BucketSaveCb(DbIndex db_index, PrimeTable::bucket_iterator i
  ++stats_.savecb_calls;
-  auto check = [&](uint64_t v) {
-    if (v >= snapshot_version_) {
+  if (it.GetVersion() >= snapshot_version_) {
    // it has either been serialized already or was added after snapshotting started.
-      DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << " at " << v;
+    DVLOG(3) << "Skipped " << it.segment_id() << ":" << it.bucket_id() << " at " << it.GetVersion();
    ++stats_.skipped;
    return false;
  }
-    return true;
-  };
-  if (!check(it.GetVersion())) {
-    return false;
-  }
  db_slice_->FlushChangeToEarlierCallbacks(db_index, DbSlice::Iterator::FromPrime(it),
                                           snapshot_version_);
-  auto* blocking_counter = db_slice_->GetLatch();
+  auto* latch = db_slice_->GetLatch();
  // Locking this never preempts. We merely increment the underlying counter so that
  // if SerializeBucket preempts, Heartbeat() won't run because the counter is not
  // zero.
-  std::lock_guard blocking_counter_guard(*blocking_counter);
+  std::lock_guard latch_guard(*latch);
stats_.loop_serialized += SerializeBucket(db_index, it);
@@ -324,7 +309,8 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
  io::StringFile sfile;
-  serializer_->FlushToSink(&sfile, flush_state);
+  error_code ec = serializer_->FlushToSink(&sfile, flush_state);
+  CHECK(!ec);  // always succeeds
size_t serialized = sfile.val.size();
if (serialized == 0)
@@ -333,6 +319,8 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
  uint64_t id = rec_id_++;
  DVLOG(2) << "Pushing " << id;
+  uint64_t running_cycles = ThisFiber::GetRunningTimeCycles();
fb2::NoOpLock lk;
// We create a critical section here that ensures that records are pushed in sequential order.
@@ -351,6 +339,12 @@ size_t SliceSnapshot::FlushSerialized(SerializerBase::FlushState flush_state) {
  VLOG(2) << "Pushed with Serialize() " << serialized;
+  // FlushToSink can be quite slow for large values or due to compression, therefore
+  // we counter-balance CPU over-usage by forcing a sleep.
+  // We measure running_cycles before the preemption points, because they reset the counter.
+  uint64_t sleep_usec = (running_cycles * 1000'000 / base::CycleClock::Frequency()) / 2;
+  ThisFiber::SleepFor(chrono::microseconds(std::min(sleep_usec, 2000ul)));
return serialized;
}
@@ -419,19 +413,19 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
// value. This is guaranteed by the fact that OnJournalEntry always runs after OnDbChange, and
// no database switch can be performed between those two calls, because they are part of one
// transaction.
-void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) {
-  // To enable journal flushing to sync after a non-auto journal command is executed, we call
-  // TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is
-  // no additional journal change to serialize, it simply invokes PushSerialized.
+// allow_flush is controlled by Journal::SetFlushMode
+// (usually it's true, unless we are in the middle of a critical section that cannot preempt).
+void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool allow_flush) {
  {
-    // We should release the lock after we preempt
-    std::lock_guard guard(big_value_mu_);
+    // We grab the lock in case we are in the middle of serializing a bucket, so it serves as a
+    // barrier here for atomic serialization.
+    std::lock_guard barrier(big_value_mu_);
    if (item.opcode != journal::Op::NOOP) {
-      serializer_->WriteJournalEntry(item.data);
+      std::ignore = serializer_->WriteJournalEntry(item.data);
    }
  }
-  if (await) {
+  if (allow_flush) {
// This is the only place that flushes in streaming mode
// once the iterate-buckets fiber has finished.
PushSerialized(false);
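
To make the two throttling constants above concrete, here is a worked example that assumes a
hypothetical 3 GHz cycle clock (the real frequency comes from base::CycleClock::Frequency()):

#include <algorithm>
#include <cstdint>
#include <cstdio>

int main() {
  constexpr uint64_t kFreq = 3'000'000'000ULL;  // 3 GHz - an assumption
  uint64_t jiffy_cycles = kFreq >> 16;          // 45776 cycles
  printf("jiffy ~= %.1f usec\n", jiffy_cycles * 1e6 / kFreq);  // ~15.3 usec

  // Suppose a flush burned 12 msec of CPU: sleep for half of it, capped at 2 msec.
  uint64_t running_cycles = kFreq / 1000 * 12;
  uint64_t sleep_usec = (running_cycles * 1'000'000 / kFreq) / 2;  // 6000
  sleep_usec = std::min<uint64_t>(sleep_usec, 2000);               // capped
  printf("sleep for %u usec\n", (unsigned)sleep_usec);  // 2000
  return 0;
}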


@@ -125,7 +125,7 @@ class SliceSnapshot {
  void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
  // Journal listener
-  void OnJournalEntry(const journal::JournalItem& item, bool allow_await);
+  void OnJournalEntry(const journal::JournalItem& item, bool allow_flush);
// Push serializer's internal buffer.
// Push regardless of buffer size if force is true.


@@ -227,8 +227,4 @@ void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args,
// Might block the calling fiber unless Journal::SetFlushMode(false) is called.
void RecordExpiryBlocking(DbIndex dbid, std::string_view key);
-// Trigger journal write to sink; no journal record will be added to the journal.
-// Must be called from the shard thread of the journal to sink.
-void TriggerJournalWriteToSink();
} // namespace dfly


@@ -167,6 +167,7 @@ async def check_all_replicas_finished(c_replicas, c_master, timeout=20):
    start = time.time()
    while (time.time() - start) < timeout:
        if not waiting_for:
+            logging.debug("All replicas finished after %s seconds", time.time() - start)
            return
        await asyncio.sleep(0.2)
    m_offset = await c_master.execute_command("DFLY REPLICAOFFSET")
@@ -2715,7 +2716,7 @@ async def test_replication_timeout_on_full_sync_heartbeat_expiry(
    await asyncio.sleep(1)  # replica will start resync
-    await check_all_replicas_finished([c_replica], c_master)
+    await check_all_replicas_finished([c_replica], c_master, 60)
    await assert_replica_reconnections(replica, 0)