From 409d22b1e6fcb94a4cea231beae713ffd27e258e Mon Sep 17 00:00:00 2001 From: Shahar Mike Date: Thu, 11 Jan 2024 12:56:09 +0200 Subject: [PATCH] feat(cluster): Add params to slot migration full sync cut (#2403) --- src/server/journal/streamer.cc | 13 +++++++++---- src/server/journal/streamer.h | 5 ++++- 2 files changed, 13 insertions(+), 5 deletions(-) diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 43c808849..0f3084dc7 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -49,9 +49,13 @@ void JournalStreamer::WriterFb(io::Sink* dest) { } } -RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, journal::Journal* journal, - Context* cntx) - : JournalStreamer(journal, cntx), db_slice_(slice), my_slots_(std::move(slots)) { +RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, uint32_t sync_id, uint32_t flow_id, + journal::Journal* journal, Context* cntx) + : JournalStreamer(journal, cntx), + db_slice_(slice), + my_slots_(std::move(slots)), + sync_id_(sync_id), + flow_id_(flow_id) { DCHECK(slice != nullptr); } @@ -79,7 +83,8 @@ void RestoreStreamer::Start(io::Sink* dest) { } } while (cursor); - WriteCommand(make_pair("DFLYMIGRATE", ArgSlice{"FULL-SYNC-CUT"})); + WriteCommand(make_pair( + "DFLYMIGRATE", ArgSlice{"FULL-SYNC-CUT", absl::StrCat(sync_id_), absl::StrCat(flow_id_)})); }); } diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index f77f8f27d..9b0799af3 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -53,7 +53,8 @@ class JournalStreamer : protected BufferedStreamerBase { // Only handles relevant slots, while ignoring all others. class RestoreStreamer : public JournalStreamer { public: - RestoreStreamer(DbSlice* slice, SlotSet slots, journal::Journal* journal, Context* cntx); + RestoreStreamer(DbSlice* slice, SlotSet slots, uint32_t sync_id, uint32_t flow_id, + journal::Journal* journal, Context* cntx); void Start(io::Sink* dest) override; void Cancel() override; @@ -70,6 +71,8 @@ class RestoreStreamer : public JournalStreamer { DbSlice* db_slice_; uint64_t snapshot_version_ = 0; SlotSet my_slots_; + uint32_t sync_id_; + uint32_t flow_id_; Fiber snapshot_fb_; Cancellation fiber_cancellation_; };