fix(cluster): fix slot filtration to RestoreStreamer (#2477)

* fix(cluster): fix slot filtration to RestoreStreamer

* test: add cluster data migration test
This commit is contained in:
Borys 2024-01-28 12:29:54 +02:00 committed by GitHub
parent d608ec9c62
commit 43808da27f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 136 additions and 42 deletions

View file

@ -113,6 +113,10 @@ bool RestoreStreamer::ShouldWrite(const journal::JournalItem& item) const {
return ShouldWrite(*item.slot);
}
bool RestoreStreamer::ShouldWrite(std::string_view key) const {
return ShouldWrite(ClusterConfig::KeySlot(key));
}
bool RestoreStreamer::ShouldWrite(SlotId slot_id) const {
return my_slots_.contains(slot_id);
}
@ -121,28 +125,29 @@ void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
DCHECK_LT(it.GetVersion(), snapshot_version_);
it.SetVersion(snapshot_version_);
bool is_data_present = false;
{
FiberAtomicGuard fg; // Can't switch fibers because that could invalidate iterator
while (!it.is_done()) {
string key_buffer; // we can reuse it
for (; !it.is_done(); ++it) {
const auto& pv = it->second;
string key_buffer;
string_view key = it->first.GetSlice(&key_buffer);
if (ShouldWrite(key)) {
is_data_present = true;
uint64_t expire = 0;
if (pv.HasExpire()) {
auto eit = db_slice_->databases()[0]->expire.Find(it->first);
expire = db_slice_->ExpireTime(eit);
uint64_t expire = 0;
if (pv.HasExpire()) {
auto eit = db_slice_->databases()[0]->expire.Find(it->first);
expire = db_slice_->ExpireTime(eit);
}
WriteEntry(key, pv, expire);
}
WriteEntry(key, pv, expire);
++it;
}
}
NotifyWritten(true);
if (is_data_present)
NotifyWritten(true);
}
void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
@ -166,7 +171,6 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req
void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pv, uint64_t expire_ms) {
absl::InlinedVector<string_view, 4> args;
args.push_back(key);
string expire_str = absl::StrCat(expire_ms);