mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
fix(cluster): fix incorrect version checking and resource double free (#2499)
* fix(cluster): fix incorrect version checking and resource double free
This commit is contained in:
parent
e4862f227d
commit
1dee082f86
3 changed files with 11 additions and 21 deletions
|
@ -12,15 +12,16 @@ namespace dfly {
|
||||||
class OutgoingMigration::SliceSlotMigration {
|
class OutgoingMigration::SliceSlotMigration {
|
||||||
public:
|
public:
|
||||||
SliceSlotMigration(DbSlice* slice, SlotSet slots, uint32_t sync_id, journal::Journal* journal,
|
SliceSlotMigration(DbSlice* slice, SlotSet slots, uint32_t sync_id, journal::Journal* journal,
|
||||||
Context* cntx)
|
Context* cntx, io::Sink* dest)
|
||||||
: streamer_(slice, std::move(slots), sync_id, journal, cntx) {
|
: streamer_(slice, std::move(slots), sync_id, journal, cntx) {
|
||||||
}
|
|
||||||
|
|
||||||
void Start(io::Sink* dest) {
|
|
||||||
streamer_.Start(dest);
|
streamer_.Start(dest);
|
||||||
state_ = MigrationState::C_FULL_SYNC;
|
state_ = MigrationState::C_FULL_SYNC;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
~SliceSlotMigration() {
|
||||||
|
streamer_.Cancel();
|
||||||
|
}
|
||||||
|
|
||||||
MigrationState GetState() const {
|
MigrationState GetState() const {
|
||||||
return state_ == MigrationState::C_FULL_SYNC && streamer_.IsSnapshotFinished()
|
return state_ == MigrationState::C_FULL_SYNC && streamer_.IsSnapshotFinished()
|
||||||
? MigrationState::C_STABLE_SYNC
|
? MigrationState::C_STABLE_SYNC
|
||||||
|
@ -52,8 +53,7 @@ void OutgoingMigration::StartFlow(DbSlice* slice, uint32_t sync_id, journal::Jou
|
||||||
|
|
||||||
std::lock_guard lck(flows_mu_);
|
std::lock_guard lck(flows_mu_);
|
||||||
slot_migrations_[shard_id] =
|
slot_migrations_[shard_id] =
|
||||||
std::make_unique<SliceSlotMigration>(slice, std::move(sset), sync_id, journal, &cntx_);
|
std::make_unique<SliceSlotMigration>(slice, std::move(sset), sync_id, journal, &cntx_, dest);
|
||||||
slot_migrations_[shard_id]->Start(dest);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
MigrationState OutgoingMigration::GetState() const {
|
MigrationState OutgoingMigration::GetState() const {
|
||||||
|
|
|
@ -99,12 +99,6 @@ void RestoreStreamer::Cancel() {
|
||||||
JournalStreamer::Cancel();
|
JournalStreamer::Cancel();
|
||||||
}
|
}
|
||||||
|
|
||||||
RestoreStreamer::~RestoreStreamer() {
|
|
||||||
fiber_cancellation_.Cancel();
|
|
||||||
snapshot_fb_.JoinIfNeeded();
|
|
||||||
db_slice_->UnregisterOnChange(snapshot_version_);
|
|
||||||
}
|
|
||||||
|
|
||||||
bool RestoreStreamer::ShouldWrite(const journal::JournalItem& item) const {
|
bool RestoreStreamer::ShouldWrite(const journal::JournalItem& item) const {
|
||||||
if (!item.slot.has_value()) {
|
if (!item.slot.has_value()) {
|
||||||
return false;
|
return false;
|
||||||
|
@ -122,11 +116,10 @@ bool RestoreStreamer::ShouldWrite(SlotId slot_id) const {
|
||||||
}
|
}
|
||||||
|
|
||||||
void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
|
void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
|
||||||
DCHECK_LT(it.GetVersion(), snapshot_version_);
|
|
||||||
it.SetVersion(snapshot_version_);
|
|
||||||
|
|
||||||
bool is_data_present = false;
|
bool is_data_present = false;
|
||||||
{
|
|
||||||
|
if (it.GetVersion() < snapshot_version_) {
|
||||||
|
it.SetVersion(snapshot_version_);
|
||||||
FiberAtomicGuard fg; // Can't switch fibers because that could invalidate iterator
|
FiberAtomicGuard fg; // Can't switch fibers because that could invalidate iterator
|
||||||
string key_buffer; // we can reuse it
|
string key_buffer; // we can reuse it
|
||||||
for (; !it.is_done(); ++it) {
|
for (; !it.is_done(); ++it) {
|
||||||
|
@ -157,9 +150,7 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req
|
||||||
PrimeTable* table = db_slice_->GetTables(0).first;
|
PrimeTable* table = db_slice_->GetTables(0).first;
|
||||||
|
|
||||||
if (const PrimeTable::bucket_iterator* bit = req.update()) {
|
if (const PrimeTable::bucket_iterator* bit = req.update()) {
|
||||||
if (bit->GetVersion() < snapshot_version_) {
|
WriteBucket(*bit);
|
||||||
WriteBucket(*bit);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
string_view key = get<string_view>(req.change);
|
string_view key = get<string_view>(req.change);
|
||||||
table->CVCUponInsert(snapshot_version_, key, [this](PrimeTable::bucket_iterator it) {
|
table->CVCUponInsert(snapshot_version_, key, [this](PrimeTable::bucket_iterator it) {
|
||||||
|
|
|
@ -57,14 +57,13 @@ class RestoreStreamer : public JournalStreamer {
|
||||||
Context* cntx);
|
Context* cntx);
|
||||||
|
|
||||||
void Start(io::Sink* dest) override;
|
void Start(io::Sink* dest) override;
|
||||||
|
// Cancel() must be called if Start() is called
|
||||||
void Cancel() override;
|
void Cancel() override;
|
||||||
|
|
||||||
bool IsSnapshotFinished() const {
|
bool IsSnapshotFinished() const {
|
||||||
return snapshot_finished_;
|
return snapshot_finished_;
|
||||||
}
|
}
|
||||||
|
|
||||||
~RestoreStreamer();
|
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
|
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
|
||||||
bool ShouldWrite(const journal::JournalItem& item) const override;
|
bool ShouldWrite(const journal::JournalItem& item) const override;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue