mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
feat(cluster): Add `RestoreStreamer`. (#2390)
* feat(cluster): Add `RestoreStreamer`. `RestoreStreamer`, like `JournalStreamer`, streams journal changes to a sink. However, in addition, it traverses the DB like `RdbSerializer` and sends existing entries as `RESTORE` commands. Adding it required a bit of plumbing to get all journal changes to be slot-aware. In a follow-up PR I will remove the now unneeded `SerializerBase`. * Fix build * Fix bug * Remove unimplemented function * Iterate DB, drop support for db1+ * Send FULL-SYNC-CUT
This commit is contained in:
parent
f4ea42f2f6
commit
4874da8b5b
23 changed files with 311 additions and 99 deletions
|
@ -4,6 +4,10 @@
|
|||
|
||||
#include "server/journal/streamer.h"
|
||||
|
||||
#include <absl/functional/bind_front.h>
|
||||
|
||||
#include "base/logging.h"
|
||||
|
||||
namespace dfly {
|
||||
using namespace util;
|
||||
|
||||
|
@ -11,10 +15,15 @@ void JournalStreamer::Start(io::Sink* dest) {
|
|||
using namespace journal;
|
||||
write_fb_ = fb2::Fiber("journal_stream", &JournalStreamer::WriterFb, this, dest);
|
||||
journal_cb_id_ = journal_->RegisterOnChange([this](const JournalItem& item, bool allow_await) {
|
||||
if (!ShouldWrite(item)) {
|
||||
return;
|
||||
}
|
||||
|
||||
if (item.opcode == Op::NOOP) {
|
||||
// No record to write, just await if data was written so consumer will read the data.
|
||||
return AwaitIfWritten();
|
||||
}
|
||||
|
||||
Write(io::Buffer(item.data));
|
||||
NotifyWritten(allow_await);
|
||||
});
|
||||
|
@ -40,4 +49,127 @@ void JournalStreamer::WriterFb(io::Sink* dest) {
|
|||
}
|
||||
}
|
||||
|
||||
// Builds a streamer over `slice` restricted to the slot set `slots`.
// `journal` and `cntx` are forwarded unchanged to the JournalStreamer base.
// `slice` must not be null (checked in debug builds only).
RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, journal::Journal* journal,
                                 Context* cntx)
    : JournalStreamer(journal, cntx),
      db_slice_(slice),
      my_slots_(std::move(slots))  // take ownership of the by-value slot set
{
  DCHECK(slice != nullptr);
}
|
||||
|
||||
// Starts streaming into `dest`.
//
// Order of operations:
//   1. Register OnDbChange with the DbSlice; the value it returns becomes
//      snapshot_version_.
//   2. Start the base JournalStreamer on `dest`.
//   3. Launch the "slot-snapshot" fiber, which walks db0's prime table, serializes
//      each not-yet-serialized bucket via WriteBucket, and finally emits
//      `DFLYMIGRATE FULL-SYNC-CUT`.
//
// The fiber returns early (without sending FULL-SYNC-CUT) once
// fiber_cancellation_ reports cancellation.
void RestoreStreamer::Start(io::Sink* dest) {
  auto db_cb = absl::bind_front(&RestoreStreamer::OnDbChange, this);
  snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb));

  JournalStreamer::Start(dest);

  DCHECK(!snapshot_fb_.IsJoinable());
  snapshot_fb_ = fb2::Fiber("slot-snapshot", [this] {
    // Number of Traverse() steps between voluntary yields.
    constexpr uint64_t kYieldPeriod = 100;

    PrimeTable::Cursor cursor;
    uint64_t steps_since_yield = 0;
    PrimeTable* pt = &db_slice_->databases()[0]->prime;

    do {
      if (fiber_cancellation_.IsCancelled())
        return;

      // OnDbChange may already have serialized a bucket and stamped it with
      // snapshot_version_. Skip such buckets: WriteBucket requires a strictly older
      // version, and writing them again would send their entries a second time.
      cursor = pt->Traverse(cursor, [this](PrimeTable::bucket_iterator it) {
        if (it.GetVersion() < snapshot_version_) {
          WriteBucket(it);
        }
      });

      // The counter must be advanced here; otherwise the threshold is never
      // reached and the fiber never yields during the traversal.
      if (++steps_since_yield >= kYieldPeriod) {
        ThisFiber::Yield();
        steps_since_yield = 0;
      }
    } while (cursor);

    WriteCommand(make_pair("DFLYMIGRATE", ArgSlice{"FULL-SYNC-CUT"}));
  });
}
|
||||
|
||||
// Stops the streamer. The four steps below run strictly in this order:
// signal cancellation, join the snapshot fiber, unregister the DbSlice change
// callback, then cancel the JournalStreamer base.
void RestoreStreamer::Cancel() {
  // Ask the snapshot fiber to stop; it polls this flag on every traversal step.
  fiber_cancellation_.Cancel();

  // Wait for the snapshot fiber (if one was started) to finish.
  snapshot_fb_.JoinIfNeeded();

  // Drop the change callback that was registered under snapshot_version_.
  db_slice_->UnregisterOnChange(snapshot_version_);

  // Finally shut down the journal streaming part.
  JournalStreamer::Cancel();
}
|
||||
|
||||
// Returns true only for journal items that carry a slot id and whose slot
// belongs to my_slots_. Items without a slot are never written.
bool RestoreStreamer::ShouldWrite(const journal::JournalItem& item) const {
  return item.slot.has_value() && ShouldWrite(*item.slot);
}
|
||||
|
||||
// Returns whether `slot_id` is one of the slots this streamer was created for.
bool RestoreStreamer::ShouldWrite(SlotId slot_id) const {
  const bool is_my_slot = my_slots_.contains(slot_id);
  return is_my_slot;
}
|
||||
|
||||
// Serializes every entry of the bucket `it` points at.
//
// The bucket's version must be strictly below snapshot_version_ (debug-checked);
// it is raised to snapshot_version_ before any entry is written.
void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
  DCHECK_LT(it.GetVersion(), snapshot_version_);
  it.SetVersion(snapshot_version_);

  for (; !it.is_done(); ++it) {
    const auto& value = it->second;

    // GetSlice may use `scratch` as backing storage for the returned view.
    string scratch;
    string_view key = it->first.GetSlice(&scratch);

    // 0 is passed on when the value carries no expiry.
    uint64_t expire_at = 0;
    if (value.HasExpire()) {
      auto expire_it = db_slice_->databases()[0]->expire.Find(it->first);
      expire_at = db_slice_->ExpireTime(expire_it);
    }

    WriteEntry(key, value, expire_at);
  }
}
|
||||
|
||||
// DbSlice change callback (registered in Start).
//
// For an update request, the affected bucket is serialized if its version is
// still below snapshot_version_. Otherwise the request holds the key about to be
// inserted, and CVCUponInsert is asked to invoke WriteBucket on the buckets it
// reports for that key.
void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
  DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0";

  FiberAtomicGuard fg;
  PrimeTable* table = db_slice_->GetTables(0).first;

  const PrimeTable::bucket_iterator* updated_bucket = req.update();
  if (updated_bucket != nullptr) {
    // Existing entry is being changed: flush its bucket unless already serialized.
    if (updated_bucket->GetVersion() < snapshot_version_) {
      WriteBucket(*updated_bucket);
    }
    return;
  }

  // Not an update: the change holds the key that is going to be inserted.
  string_view key = get<string_view>(req.change);
  auto flush_bucket = [this](PrimeTable::bucket_iterator it) {
    DCHECK_LT(it.GetVersion(), snapshot_version_);
    WriteBucket(it);
  };
  table->CVCUponInsert(snapshot_version_, key, flush_bucket);
}
|
||||
|
||||
// Emits a `RESTORE <key> <expire_ms> <dump> ABSTTL` command for one entry.
//   key       - entry key
//   pv        - value, serialized through SerializerBase::DumpObject
//   expire_ms - expiry passed on verbatim as the TTL argument
void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pv, uint64_t expire_ms) {
  // Owners of the bytes that the string_view arguments below point into; they
  // must stay alive until WriteCommand returns.
  string expire_str = absl::StrCat(expire_ms);
  io::StringSink value_dump_sink;
  SerializerBase::DumpObject(pv, &value_dump_sink);

  absl::InlinedVector<string_view, 4> args;
  args.push_back(key);
  args.push_back(expire_str);
  args.push_back(value_dump_sink.str());
  args.push_back("ABSTTL");  // Means expire string is since epoch

  WriteCommand(make_pair("RESTORE", ArgSlice{args}));
}
|
||||
|
||||
// Wraps `cmd_payload` into a single-command journal entry and writes it through
// a JournalWriter that targets this streamer.
void RestoreStreamer::WriteCommand(journal::Entry::Payload cmd_payload) {
  journal::Entry entry(/* txid */ 0,
                       /* single command */ journal::Op::COMMAND,
                       /* db index */ 0,
                       /* shard count */ 1,
                       /* slot-id, but it is ignored at this level */ 0,
                       cmd_payload);

  JournalWriter writer{this};
  writer.Write(entry);
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue