server: control cluster migration speed with flag (#4734)

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2025-03-09 16:25:47 +02:00 committed by GitHub
parent 50a4bfd3cb
commit 17ebb35aca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 10 additions and 2 deletions

View file

@ -21,6 +21,9 @@ ABSL_FLAG(uint32_t, replication_timeout, 30000,
ABSL_FLAG(uint32_t, replication_stream_output_limit, 64_KB,
"Time to wait for the replication output buffer go below the throttle limit");
ABSL_FLAG(uint32_t, migration_buckets_serialization_threshold, 100,
"The Number of buckets to serialize on each iteration before yielding");
namespace dfly {
using namespace util;
using namespace journal;
@ -32,6 +35,7 @@ iovec IoVec(io::Bytes src) {
}
uint32_t replication_stream_output_limit_cached = 64_KB;
uint32_t migration_buckets_serialization_threshold_cached = 100;
} // namespace
@ -186,6 +190,8 @@ RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal
ExecutionState* cntx)
: JournalStreamer(journal, cntx), db_slice_(slice), my_slots_(std::move(slots)) {
DCHECK(slice != nullptr);
migration_buckets_serialization_threshold_cached =
absl::GetFlag(FLAGS_migration_buckets_serialization_threshold);
db_array_ = slice->databases(); // Inc ref to make sure DB isn't deleted while we use it
}
@ -229,7 +235,7 @@ void RestoreStreamer::Run() {
stats_.buckets_loop += WriteBucket(it);
});
if (++last_yield >= 100) {
if (++last_yield >= migration_buckets_serialization_threshold_cached) {
ThisFiber::Yield();
last_yield = 0;
}

View file

@ -2136,7 +2136,9 @@ async def test_cluster_migration_huge_container(df_factory: DflyInstanceFactory)
assert extract_int_after_prefix("buckets on_db_update ", line) == 0
@dfly_args({"proactor_threads": 2, "cluster_mode": "yes"})
@dfly_args(
{"proactor_threads": 2, "cluster_mode": "yes", "migration_buckets_serialization_threshold": 3}
)
@pytest.mark.parametrize("chunk_size", [1_000_000, 30])
@pytest.mark.asyncio
@pytest.mark.exclude_epoll