feat(cluster): Add params to slot migration full sync cut (#2403)

This commit is contained in:
Shahar Mike 2024-01-11 12:56:09 +02:00 committed by GitHub
parent 7b61268533
commit 409d22b1e6
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 13 additions and 5 deletions

View file

@ -49,9 +49,13 @@ void JournalStreamer::WriterFb(io::Sink* dest) {
} }
} }
RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, journal::Journal* journal, RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, uint32_t sync_id, uint32_t flow_id,
Context* cntx) journal::Journal* journal, Context* cntx)
: JournalStreamer(journal, cntx), db_slice_(slice), my_slots_(std::move(slots)) { : JournalStreamer(journal, cntx),
db_slice_(slice),
my_slots_(std::move(slots)),
sync_id_(sync_id),
flow_id_(flow_id) {
DCHECK(slice != nullptr); DCHECK(slice != nullptr);
} }
@ -79,7 +83,8 @@ void RestoreStreamer::Start(io::Sink* dest) {
} }
} while (cursor); } while (cursor);
WriteCommand(make_pair("DFLYMIGRATE", ArgSlice{"FULL-SYNC-CUT"})); WriteCommand(make_pair(
"DFLYMIGRATE", ArgSlice{"FULL-SYNC-CUT", absl::StrCat(sync_id_), absl::StrCat(flow_id_)}));
}); });
} }

View file

@ -53,7 +53,8 @@ class JournalStreamer : protected BufferedStreamerBase {
// Only handles relevant slots, while ignoring all others. // Only handles relevant slots, while ignoring all others.
class RestoreStreamer : public JournalStreamer { class RestoreStreamer : public JournalStreamer {
public: public:
RestoreStreamer(DbSlice* slice, SlotSet slots, journal::Journal* journal, Context* cntx); RestoreStreamer(DbSlice* slice, SlotSet slots, uint32_t sync_id, uint32_t flow_id,
journal::Journal* journal, Context* cntx);
void Start(io::Sink* dest) override; void Start(io::Sink* dest) override;
void Cancel() override; void Cancel() override;
@ -70,6 +71,8 @@ class RestoreStreamer : public JournalStreamer {
DbSlice* db_slice_; DbSlice* db_slice_;
uint64_t snapshot_version_ = 0; uint64_t snapshot_version_ = 0;
SlotSet my_slots_; SlotSet my_slots_;
uint32_t sync_id_;
uint32_t flow_id_;
Fiber snapshot_fb_; Fiber snapshot_fb_;
Cancellation fiber_cancellation_; Cancellation fiber_cancellation_;
}; };