chore: cosmetic changes around Snapshot functions (#3652)

* chore: cosmetic changes around Snapshot functions

Some renames and added comments. Refactored the body of StartIncremental into a separate
fiber function (SwitchIncrementalFb) without any functional changes.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

* chore: fix comments

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Commit 264835e9c4 (parent 1d34bf735e), authored by Roman Gershman on 2024-09-08 09:25:41 +03:00, committed via GitHub.
7 changed files with 102 additions and 88 deletions
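
For orientation, the renamed teardown chain reads as follows. This is a condensed, illustrative summary assembled from the hunks below, not a literal excerpt; function bodies and error handling are elided:

// DflyCmd::StopFullSyncInThread(flow, shard)
//   -> flow->saver->StopFullSyncInShard(shard)       // was RdbSaver::StopSnapshotInShard
//        -> RdbSaver::Impl::StopSnapshotting(shard)
//             -> SliceSnapshot::Finalize()           // was SliceSnapshot::Stop()
//   -> flow->full_sync_fb.JoinIfNeeded()             // wait for the full-sync fiber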


@@ -112,20 +112,21 @@ void DflyCmd::ReplicaInfo::Cancel() {
// Update state and cancel context.
replica_state = SyncState::CANCELLED;
cntx.Cancel();
// Wait for tasks to finish.
shard_set->RunBlockingInParallel([this](EngineShard* shard) {
VLOG(2) << "Disconnecting flow " << shard->shard_id();
FlowInfo* flow = &flows[shard->shard_id()];
if (flow->cleanup) {
flow->cleanup();
}
VLOG(2) << "After flow cleanup " << shard->shard_id();
flow->full_sync_fb.JoinIfNeeded();
flow->conn = nullptr;
});
// Wait for error handler to quit.
cntx.JoinErrorHandler();
VLOG(1) << "Disconnecting replica " << address << ":" << listening_port;
}
DflyCmd::DflyCmd(ServerFamily* server_family) : sf_(server_family) {
@@ -598,10 +599,8 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineSha
}
void DflyCmd::StopFullSyncInThread(FlowInfo* flow, EngineShard* shard) {
// Shard can be null for io thread.
if (shard != nullptr) {
flow->saver->StopSnapshotInShard(shard);
}
DCHECK(shard);
flow->saver->StopFullSyncInShard(shard);
// Wait for full sync to finish.
flow->full_sync_fb.JoinIfNeeded();


@@ -601,6 +601,7 @@ void EngineShard::RemoveContTx(Transaction* tx) {
}
void EngineShard::Heartbeat() {
DVLOG(2) << " Heartbeat";
DCHECK(namespaces.IsInitialized());
CacheStats();


@@ -56,6 +56,10 @@ class EngineShard {
return shard_;
}
bool IsMyThread() const {
return this == shard_;
}
ShardId shard_id() const {
return shard_id_;
}


@@ -1324,7 +1324,7 @@ void RdbSaver::Impl::StartIncrementalSnapshotting(Context* cntx, EngineShard* sh
}
void RdbSaver::Impl::StopSnapshotting(EngineShard* shard) {
GetSnapshot(shard)->Stop();
GetSnapshot(shard)->Finalize();
}
void RdbSaver::Impl::Cancel() {
@@ -1334,7 +1334,7 @@ void RdbSaver::Impl::Cancel() {
auto& snapshot = GetSnapshot(shard);
if (snapshot)
snapshot->Cancel();
snapshot->StopChannel();
dfly::SliceSnapshot::DbRecord rec;
while (channel_.Pop(rec)) {
@@ -1479,7 +1479,7 @@ void RdbSaver::StartIncrementalSnapshotInShard(Context* cntx, EngineShard* shard
impl_->StartIncrementalSnapshotting(cntx, shard, start_lsn);
}
void RdbSaver::StopSnapshotInShard(EngineShard* shard) {
void RdbSaver::StopFullSyncInShard(EngineShard* shard) {
impl_->StopSnapshotting(shard);
}


@@ -92,8 +92,8 @@ class RdbSaver {
// Send only the incremental snapshot since start_lsn.
void StartIncrementalSnapshotInShard(Context* cntx, EngineShard* shard, LSN start_lsn);
// Stops serialization in journal streaming mode in the shard's thread.
void StopSnapshotInShard(EngineShard* shard);
// Stops full-sync serialization for replication in the shard's thread.
void StopFullSyncInShard(EngineShard* shard);
// Stores auxiliary (meta) values and header_info
std::error_code SaveHeader(const GlobalData& header_info);


@@ -90,79 +90,41 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot
snapshot_fb_ = fb2::Fiber("snapshot", [this, stream_journal, cll] {
IterateBucketsFb(cll, stream_journal);
db_slice_->UnregisterOnChange(snapshot_version_);
if (cll->IsCancelled()) {
Cancel();
} else if (!stream_journal) {
CloseRecordChannel();
// We stop the channel if we are performing backups (non-streaming) or if the full sync failed.
// For a successful full-sync we keep the channel in order to switch to streaming mode.
if (cll->IsCancelled() || !stream_journal) {
StopChannel();
}
});
}
void SliceSnapshot::StartIncremental(Context* cntx, LSN start_lsn) {
auto* journal = db_slice_->shard_owner()->journal();
DCHECK(journal);
serializer_ = std::make_unique<RdbSerializer>(compression_mode_);
snapshot_fb_ =
fb2::Fiber("incremental_snapshot", [this, journal, cntx, lsn = start_lsn]() mutable {
DCHECK(lsn <= journal->GetLsn()) << "The replica tried to sync from the future.";
VLOG(1) << "Starting incremental snapshot from lsn=" << lsn;
// The replica sends the LSN of the next entry it wants to receive.
while (!cntx->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
serializer_->WriteJournalEntry(journal->GetEntry(lsn));
PushSerializedToChannel(false);
lsn++;
}
VLOG(1) << "Last LSN sent in incremental snapshot was " << (lsn - 1);
// This check is safe, but it is not trivially safe.
// We rely here on the fact that JournalSlice::AddLogRecord can
// only preempt while holding the callback lock.
// That guarantees that if we have processed the last LSN the callback
// will only be added after JournalSlice::AddLogRecord has finished
// iterating its callbacks and we won't process the record twice.
// We have to make sure we don't preempt ourselves before registering the callback!
// GetLsn() is always the next lsn that we expect to create.
if (journal->GetLsn() == lsn) {
{
FiberAtomicGuard fg;
serializer_->SendFullSyncCut();
}
auto journal_cb = absl::bind_front(&SliceSnapshot::OnJournalEntry, this);
journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
PushSerializedToChannel(true);
} else {
// We stopped but we didn't manage to send the whole stream.
cntx->ReportError(
std::make_error_code(errc::state_not_recoverable),
absl::StrCat("Partial sync was unsuccessful because entry #", lsn,
" was dropped from the buffer. Current lsn=", journal->GetLsn()));
Cancel();
}
});
snapshot_fb_ = fb2::Fiber("incremental_snapshot", [cntx, start_lsn, this] {
this->SwitchIncrementalFb(cntx, start_lsn);
});
}
void SliceSnapshot::Stop() {
// Called only for replication use-case.
void SliceSnapshot::Finalize() {
DCHECK(db_slice_->shard_owner()->IsMyThread());
DCHECK(journal_cb_id_);
// Wait for serialization to finish in any case.
Join();
if (journal_cb_id_) {
auto* journal = db_slice_->shard_owner()->journal();
serializer_->SendJournalOffset(journal->GetLsn());
journal->UnregisterOnChange(journal_cb_id_);
}
snapshot_fb_.JoinIfNeeded();
auto* journal = db_slice_->shard_owner()->journal();
serializer_->SendJournalOffset(journal->GetLsn());
journal->UnregisterOnChange(journal_cb_id_);
journal_cb_id_ = 0;
PushSerializedToChannel(true);
CloseRecordChannel();
}
void SliceSnapshot::Cancel() {
VLOG(1) << "SliceSnapshot::Cancel";
void SliceSnapshot::StopChannel() {
VLOG(1) << "SliceSnapshot::StopChannel";
DCHECK(db_slice_->shard_owner()->IsMyThread());
// Cancel() might be called multiple times from different fibers of the same thread, but we
// should unregister the callback only once.
@@ -175,11 +137,6 @@ void SliceSnapshot::Cancel() {
CloseRecordChannel();
}
void SliceSnapshot::Join() {
// Fiber could have already been joined by Stop.
snapshot_fb_.JoinIfNeeded();
}
// The algorithm is to go over all the buckets and serialize those with
// version < snapshot_version_. In order to serialize each physical bucket exactly once we update
// bucket version to snapshot_version_ once it has been serialized.
@@ -251,6 +208,49 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll, bool send_full_syn
<< stats_.loop_serialized << "/" << stats_.side_saved << "/" << stats_.savecb_calls;
}
void SliceSnapshot::SwitchIncrementalFb(Context* cntx, LSN lsn) {
auto* journal = db_slice_->shard_owner()->journal();
DCHECK(journal);
DCHECK_LE(lsn, journal->GetLsn()) << "The replica tried to sync from the future.";
VLOG(1) << "Starting incremental snapshot from lsn=" << lsn;
// The replica sends the LSN of the next entry it wants to receive.
while (!cntx->IsCancelled() && journal->IsLSNInBuffer(lsn)) {
serializer_->WriteJournalEntry(journal->GetEntry(lsn));
PushSerializedToChannel(false);
lsn++;
}
VLOG(1) << "Last LSN sent in incremental snapshot was " << (lsn - 1);
// This check is safe, but it is not trivially safe.
// We rely here on the fact that JournalSlice::AddLogRecord can
// only preempt while holding the callback lock.
// That guarantees that if we have processed the last LSN the callback
// will only be added after JournalSlice::AddLogRecord has finished
// iterating its callbacks and we won't process the record twice.
// We have to make sure we don't preempt ourselves before registering the callback!
// GetLsn() is always the next lsn that we expect to create.
if (journal->GetLsn() == lsn) {
{
FiberAtomicGuard fg;
serializer_->SendFullSyncCut();
}
auto journal_cb = absl::bind_front(&SliceSnapshot::OnJournalEntry, this);
journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
PushSerializedToChannel(true);
} else {
// We stopped but we didn't manage to send the whole stream.
cntx->ReportError(
std::make_error_code(errc::state_not_recoverable),
absl::StrCat("Partial sync was unsuccessful because entry #", lsn,
" was dropped from the buffer. Current lsn=", journal->GetLsn()));
StopChannel();
}
}
bool SliceSnapshot::BucketSaveCb(PrimeIterator it) {
++stats_.savecb_calls;
@@ -288,7 +288,7 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
while (!it.is_done()) {
++result;
// might preempt
// might preempt due to big value serialization.
SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
++it;
}
@@ -349,7 +349,7 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
return false;
// Flush any of the leftovers to avoid interleavings
const auto serialized = Serialize();
size_t serialized = Serialize();
// Bucket serialization might have accumulated some delayed values.
// Because we can finally block in this function, we'll await and serialize them
@@ -359,7 +359,7 @@ bool SliceSnapshot::PushSerializedToChannel(bool force) {
delayed_entries_.pop_back();
}
const auto total_serialized = Serialize() + serialized;
size_t total_serialized = Serialize() + serialized;
return total_serialized > 0;
}
@@ -401,6 +401,7 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await)
}
void SliceSnapshot::CloseRecordChannel() {
DCHECK(db_slice_->shard_owner()->IsMyThread());
std::unique_lock lk(db_slice_->GetSerializationMutex());
CHECK(!serialize_bucket_running_);


@@ -79,22 +79,30 @@ class SliceSnapshot {
// called.
void StartIncremental(Context* cntx, LSN start_lsn);
// Stop snapshot. Only needs to be called for journal streaming mode.
void Stop();
// Finalizes the snapshot. Only called for replication.
// Blocking. Must be called from the Snapshot thread.
void Finalize();
// Wait for iteration fiber to stop.
void Join();
// Force stop. Needs to be called together with cancelling the context.
// Stops channel. Needs to be called together with cancelling the context.
// Snapshot can't always react to cancellation in streaming mode because the
// iteration fiber might have finished running by then.
void Cancel();
// Blocking. Must be called from the Snapshot thread.
void StopChannel();
// Waits for a regular, non-journal snapshot to finish.
// Called only for non-replication, backup use cases.
void Join() {
snapshot_fb_.JoinIfNeeded();
}
private:
// Main fiber that iterates over all buckets in the db slice
// Main snapshotting fiber that iterates over all buckets in the db slice
// and submits them to SerializeBucket.
void IterateBucketsFb(const Cancellation* cll, bool send_full_sync_cut);
// A fiber function that switches to the incremental mode
void SwitchIncrementalFb(Context* cntx, LSN lsn);
// Called on traversing cursor by IterateBucketsFb.
bool BucketSaveCb(PrimeIterator it);
@@ -117,10 +125,11 @@ class SliceSnapshot {
// Push serializer's internal buffer to channel.
// Push regardless of buffer size if force is true.
// Return if pushed.
// Return true if pushed. Can block. Is called from the snapshot thread.
bool PushSerializedToChannel(bool force);
// Helper function that flushes the serialized items into the RecordStream
// Helper function that flushes the serialized items into the RecordStream.
// Can block on the channel.
using FlushState = SerializerBase::FlushState;
size_t Serialize(FlushState flush_state = FlushState::kFlushMidEntry);
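
Taken together, the new header comments imply the following caller-side contract for SliceSnapshot. This is a minimal illustrative sketch, not actual Dragonfly code: the locals (snapshot, cll, cntx) are hypothetical, constructor arguments and the exact Start() signature are elided, and the success check stands in for whatever the real caller uses:

// Backup (non-streaming) path: iterate once, then wait for the iteration fiber.
snapshot->Start(/*stream_journal=*/false, cll /*, flush mode */);
// ... consume the produced records ...
snapshot->Join();  // non-replication, backup use case

// Replication path: keep streaming the journal after the initial pass, then either
// finalize cleanly or tear down together with cancelling the context.
snapshot->Start(/*stream_journal=*/true, cll /*, flush mode */);
// ... full sync runs, records flow through the channel ...
if (success) {
  snapshot->Finalize();    // blocking; must run in the snapshot (shard) thread
} else {
  cntx->Cancel();          // StopChannel() goes together with cancelling the context
  snapshot->StopChannel();
}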