diff --git a/src/server/engine_shard.cc b/src/server/engine_shard.cc
index 395f54036..98354cd96 100644
--- a/src/server/engine_shard.cc
+++ b/src/server/engine_shard.cc
@@ -308,7 +308,7 @@ string EngineShard::TxQueueInfo::Format() const {
 }
 
 EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) {
-  static_assert(sizeof(Stats) == 88);
+  static_assert(sizeof(Stats) == 96);
 
 #define ADD(x) x += o.x
 
@@ -323,6 +323,7 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o)
   ADD(total_heartbeat_expired_keys);
   ADD(total_heartbeat_expired_bytes);
   ADD(total_heartbeat_expired_calls);
+  ADD(total_migrated_keys);
 
 #undef ADD
   return *this;
diff --git a/src/server/engine_shard.h b/src/server/engine_shard.h
index 03b29bc2e..321c681bb 100644
--- a/src/server/engine_shard.h
+++ b/src/server/engine_shard.h
@@ -48,6 +48,9 @@ class EngineShard {
     uint64_t total_heartbeat_expired_bytes = 0;
     uint64_t total_heartbeat_expired_calls = 0;
 
+    // cluster stats
+    uint64_t total_migrated_keys = 0;
+
     Stats& operator+=(const Stats&);
   };
 
diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index 46bf8dee9..db57b6128 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -8,7 +8,7 @@
 
 #include "base/flags.h"
 #include "base/logging.h"
-#include "server/cluster/cluster_defs.h"
+#include "server/engine_shard.h"
 #include "server/journal/cmd_serializer.h"
 #include "server/server_state.h"
 #include "util/fibers/synchronization.h"
@@ -298,6 +298,7 @@ bool RestoreStreamer::ShouldWrite(SlotId slot_id) const {
 }
 
 bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
+  auto& shard_stats = EngineShard::tlocal()->stats();
   bool written = false;
 
   if (!it.is_done() && it.GetVersion() < snapshot_version_) {
@@ -309,7 +310,8 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
       const auto& pv = it->second;
       string_view key = it->first.GetSlice(&key_buffer);
       if (ShouldWrite(key)) {
-        stats_.keys_written++;
+        ++stats_.keys_written;
+        ++shard_stats.total_migrated_keys;
         uint64_t expire = 0;
         if (pv.HasExpire()) {
          auto eit = db_slice_->databases()[0]->expire.Find(it->first);
diff --git a/src/server/server_family.cc b/src/server/server_family.cc
index 967d15db0..bbd9d09b7 100644
--- a/src/server/server_family.cc
+++ b/src/server/server_family.cc
@@ -2828,6 +2828,7 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
   if (should_enter("CLUSTER")) {
     append("cluster_enabled", IsClusterEnabledOrEmulated());
     append("migration_errors_total", service_.cluster_family().MigrationsErrorsCount());
+    append("total_migrated_keys", m.shard_stats.total_migrated_keys);
   }
 
   return info;
diff --git a/tests/dragonfly/cluster_test.py b/tests/dragonfly/cluster_test.py
index d4f2838be..ac5faa7c0 100644
--- a/tests/dragonfly/cluster_test.py
+++ b/tests/dragonfly/cluster_test.py
@@ -1492,8 +1492,9 @@ async def test_network_disconnect_during_migration(df_factory):
     await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
 
     for _ in range(10):
-        await asyncio.sleep(random.randint(0, 10) / 100)
-        logging.debug("drop connections")
+        await asyncio.sleep(random.randint(0, 50) / 100)
+        info = await nodes[0].admin_client.info("CLUSTER")
+        logging.debug("drop connection: %s", info)
         proxy.drop_connection()
         logging.debug(
             await nodes[0].admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
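
A minimal sketch of how the new counter could be asserted from a cluster test, assuming the async admin client already used in cluster_test.py; the "CLUSTER" section name and the total_migrated_keys field come from the FormatInfoMetrics change above, while the helper name and threshold are hypothetical:

```python
# Hypothetical helper (not part of this diff): after a slot migration completes,
# the source node's INFO CLUSTER section should report how many keys it streamed out.
async def assert_total_migrated_keys(node, expected_min: int):
    info = await node.admin_client.info("CLUSTER")  # section added in FormatInfoMetrics
    # "total_migrated_keys" is the field appended above, aggregated across shards
    # via EngineShard::Stats::operator+=.
    assert int(info["total_migrated_keys"]) >= expected_min
```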