fix: reduce memory consumption during migration (#4017)

* refactor: reduce memory consumption for RestoreStreamer
* fix: add throttling to RestoreStreamer::WriteBucket
Borys authored on 2024-11-03 17:03:45 +02:00; committed by GitHub
parent 5a597cf6cc
commit e4b468d953
3 changed files with 32 additions and 52 deletions


@@ -90,6 +90,7 @@ void JournalStreamer::Write(std::string_view str) {
   DVLOG(2) << "Writing " << str.size() << " bytes";
 
+  size_t total_pending = pending_buf_.size() + str.size();
 
   if (in_flight_bytes_ > 0) {
     // We can not flush data while there are in flight requests because AsyncWrite
     // is not atomic. Therefore, we just aggregate.
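The added total_pending bookkeeping feeds the streamer's back-pressure: writes are aggregated while an async write is in flight, and the producer is stalled once too many bytes pile up. The following is only an illustrative sketch of that pattern, not the actual JournalStreamer; the class name, the stub Flush()/ThrottleIfNeeded() bodies, and the kMaxPending constant are assumptions introduced for the example.

#include <cstddef>
#include <string>
#include <string_view>

// Illustrative back-pressure sketch (hypothetical class, not Dragonfly's JournalStreamer).
class BufferedStreamer {
 public:
  void Write(std::string_view str) {
    std::size_t total_pending = pending_buf_.size() + str.size();
    pending_buf_.append(str);
    if (in_flight_bytes_ == 0) {
      Flush();  // nothing in flight: start sending the aggregated buffer
    }
    // Otherwise we only aggregate: the in-flight async write cannot be extended atomically.
    if (total_pending > kMaxPending) {
      ThrottleIfNeeded();  // stall the producer until the backlog drains
    }
  }

 private:
  void Flush() { /* would issue an async write and account for in_flight_bytes_ */ }
  void ThrottleIfNeeded() { /* would wait until pending/in-flight bytes drop below the cap */ }

  std::string pending_buf_;
  std::size_t in_flight_bytes_ = 0;
  static constexpr std::size_t kMaxPending = 1 << 20;  // assumed 1 MiB cap, illustration only
};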
@@ -212,17 +213,11 @@ void RestoreStreamer::Run() {
     if (fiber_cancelled_)
       return;
 
-    bool written = false;
     cursor = db_slice_->Traverse(pt, cursor, [&](PrimeTable::bucket_iterator it) {
       db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
                                                DbSlice::Iterator::FromPrime(it), snapshot_version_);
-      if (WriteBucket(it)) {
-        written = true;
-      }
+      WriteBucket(it);
     });
 
-    if (written) {
-      ThrottleIfNeeded();
-    }
-
     if (++last_yield >= 100) {
       ThisFiber::Yield();
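With the written flag removed, the traversal body above reduces to a single call per bucket; throttling now happens inside WriteBucket itself (see the next hunks). A condensed view of the new loop body, as a sketch with the surrounding loop and yield bookkeeping elided:

// Condensed sketch of the new traversal body in RestoreStreamer::Run() (plus-lines only).
cursor = db_slice_->Traverse(pt, cursor, [&](PrimeTable::bucket_iterator it) {
  db_slice_->FlushChangeToEarlierCallbacks(0 /*db_id always 0 for cluster*/,
                                           DbSlice::Iterator::FromPrime(it), snapshot_version_);
  WriteBucket(it);  // serializes the bucket and throttles itself if the sink backs up
});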
@@ -282,18 +277,15 @@ bool RestoreStreamer::ShouldWrite(cluster::SlotId slot_id) const {
   return my_slots_.Contains(slot_id);
 }
 
-bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
-  bool written = false;
-
+void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
   if (it.GetVersion() < snapshot_version_) {
     FiberAtomicGuard fg;
     it.SetVersion(snapshot_version_);
     string key_buffer;  // we can reuse it
     for (; !it.is_done(); ++it) {
       const auto& pv = it->second;
       string_view key = it->first.GetSlice(&key_buffer);
       if (ShouldWrite(key)) {
-        written = true;
         uint64_t expire = 0;
         if (pv.HasExpire()) {
           auto eit = db_slice_->databases()[0]->expire.Find(it->first);
@@ -304,8 +296,7 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
       }
     }
   }
-
-  return written;
+  ThrottleIfNeeded();
 }
 
 void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
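Taken together, the two hunks above turn WriteBucket from a bool-returning helper into a void function that throttles itself after serializing a bucket. Reassembled from the diff, the new function has roughly this shape; the expire resolution and WriteEntry call elided by the diff are summarized in a comment:

// Sketch of RestoreStreamer::WriteBucket after this commit, reassembled from the hunks above.
void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
  if (it.GetVersion() < snapshot_version_) {
    FiberAtomicGuard fg;
    it.SetVersion(snapshot_version_);
    string key_buffer;  // we can reuse it
    for (; !it.is_done(); ++it) {
      const auto& pv = it->second;
      string_view key = it->first.GetSlice(&key_buffer);
      if (ShouldWrite(key)) {
        // ... resolve the absolute expire time and call WriteEntry(), unchanged by this diff ...
      }
    }
  }
  ThrottleIfNeeded();  // per-bucket back-pressure, instead of the per-traversal check in Run()
}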
@@ -332,33 +323,31 @@ void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const Pr
   string expire_str = absl::StrCat(expire_ms);
   args.push_back(expire_str);
 
-  io::StringSink value_dump_sink;
-  SerializerBase::DumpObject(pv, &value_dump_sink);
-  args.push_back(value_dump_sink.str());
-
-  args.push_back("ABSTTL");  // Means expire string is since epoch
-
-  if (pk.IsSticky()) {
-    args.push_back("STICK");
-  }
-
-  WriteCommand(journal::Entry::Payload("RESTORE", ArgSlice(args)));
-}
-
-void RestoreStreamer::WriteCommand(journal::Entry::Payload cmd_payload) {
-  journal::Entry entry(0,                     // txid
-                       journal::Op::COMMAND,  // single command
-                       0,                     // db index
-                       1,                     // shard count
-                       0,                     // slot-id, but it is ignored at this level
-                       cmd_payload);
-
-  // TODO: From WriteEntry to till Write we tripple copy the PrimeValue. It's ver in-efficient and
-  // will burn CPU for large values.
-  io::StringSink sink;
-  JournalWriter writer{&sink};
-  writer.Write(entry);
-  Write(sink.str());
+  io::StringSink restore_cmd_sink;
+  {  // to destroy extra copy
+    io::StringSink value_dump_sink;
+    SerializerBase::DumpObject(pv, &value_dump_sink);
+    args.push_back(value_dump_sink.str());
+
+    args.push_back("ABSTTL");  // Means expire string is since epoch
+
+    if (pk.IsSticky()) {
+      args.push_back("STICK");
+    }
+
+    journal::Entry entry(0,                     // txid
+                         journal::Op::COMMAND,  // single command
+                         0,                     // db index
+                         1,                     // shard count
+                         0,                     // slot-id, but it is ignored at this level
+                         journal::Entry::Payload("RESTORE", ArgSlice(args)));
+    JournalWriter writer{&restore_cmd_sink};
+    writer.Write(entry);
+  }
+
+  // TODO: From DumpObject to till Write we tripple copy the PrimeValue. It's very inefficient and
+  // will burn CPU for large values.
+  Write(restore_cmd_sink.str());
 }
 
 } // namespace dfly
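The hunk above is the memory-reduction part: the former WriteCommand helper is folded into WriteEntry, and the intermediate value dump lives only inside a scoped block, so the extra copy is freed before the serialized RESTORE command is handed to Write. Reassembled from the plus-lines; the args setup at the top is not shown in the hunk and is an assumption here:

// Sketch of RestoreStreamer::WriteEntry after this commit, reassembled from the hunk above.
void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv,
                                 uint64_t expire_ms) {
  absl::InlinedVector<string_view, 5> args;  // assumed; the hunk does not show the args setup
  args.push_back(key);
  string expire_str = absl::StrCat(expire_ms);
  args.push_back(expire_str);

  io::StringSink restore_cmd_sink;
  {  // to destroy extra copy
    io::StringSink value_dump_sink;
    SerializerBase::DumpObject(pv, &value_dump_sink);
    args.push_back(value_dump_sink.str());
    args.push_back("ABSTTL");  // expire_str is an absolute (since-epoch) timestamp
    if (pk.IsSticky()) {
      args.push_back("STICK");
    }
    journal::Entry entry(0, journal::Op::COMMAND, 0, 1, 0,
                         journal::Entry::Payload("RESTORE", ArgSlice(args)));
    JournalWriter writer{&restore_cmd_sink};
    writer.Write(entry);
  }  // value_dump_sink, which holds a full copy of the value, is released here

  Write(restore_cmd_sink.str());  // only the serialized RESTORE command remains in memory
}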


@@ -97,9 +97,8 @@ class RestoreStreamer : public JournalStreamer {
   bool ShouldWrite(cluster::SlotId slot_id) const;
 
   // Returns whether anything was written
-  bool WriteBucket(PrimeTable::bucket_iterator it);
+  void WriteBucket(PrimeTable::bucket_iterator it);
   void WriteEntry(string_view key, const PrimeValue& pk, const PrimeValue& pv, uint64_t expire_ms);
-  void WriteCommand(journal::Entry::Payload cmd_payload);
 
   DbSlice* db_slice_;
   DbTableArray db_array_;


@@ -1994,7 +1994,7 @@ async def test_replicate_disconnect_redis_cluster(redis_cluster, df_factory, df_
 
 
 @pytest.mark.skip("Takes more than 10 minutes")
-@dfly_args({"proactor_threads": 12, "cluster_mode": "yes"})
+@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
 async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFactory):
     # Check data migration from one node to another
     instances = [
@@ -2002,29 +2002,21 @@ async def test_cluster_memory_consumption_migration(df_factory: DflyInstanceFact
             maxmemory="15G",
             port=BASE_PORT + i,
             admin_port=BASE_PORT + i + 1000,
-            vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
+            vmodule="streamer=9",
         )
-        for i in range(2)
+        for i in range(3)
     ]
 
-    replica = df_factory.create(
-        port=BASE_PORT + 3,
-        admin_port=BASE_PORT + 3 + 1000,
-        vmodule="outgoing_slot_migration=9,cluster_family=9,incoming_slot_migration=9",
-    )
-
-    df_factory.start_all(instances + [replica])
+    df_factory.start_all(instances)
 
     nodes = [(await create_node_info(instance)) for instance in instances]
     nodes[0].slots = [(0, 16383)]
     for i in range(1, len(instances)):
         nodes[i].slots = []
 
-    await replica.admin_client().execute_command(f"replicaof localhost {nodes[0].instance.port}")
-
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
-    await nodes[0].client.execute_command("DEBUG POPULATE 22500000 test 1000 RAND SLOTS 0 16383")
+    await nodes[0].client.execute_command("DEBUG POPULATE 5000000 test 1000 RAND SLOTS 0 16383")
 
     await asyncio.sleep(2)