mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
feat: add migrated keys statistic (#5043)
This commit is contained in:
parent
54328fd00e
commit
8c125a23e3
5 changed files with 13 additions and 5 deletions
|
@ -308,7 +308,7 @@ string EngineShard::TxQueueInfo::Format() const {
|
||||||
}
|
}
|
||||||
|
|
||||||
EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) {
|
EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) {
|
||||||
static_assert(sizeof(Stats) == 88);
|
static_assert(sizeof(Stats) == 96);
|
||||||
|
|
||||||
#define ADD(x) x += o.x
|
#define ADD(x) x += o.x
|
||||||
|
|
||||||
|
@ -323,6 +323,7 @@ EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o)
|
||||||
ADD(total_heartbeat_expired_keys);
|
ADD(total_heartbeat_expired_keys);
|
||||||
ADD(total_heartbeat_expired_bytes);
|
ADD(total_heartbeat_expired_bytes);
|
||||||
ADD(total_heartbeat_expired_calls);
|
ADD(total_heartbeat_expired_calls);
|
||||||
|
ADD(total_migrated_keys);
|
||||||
|
|
||||||
#undef ADD
|
#undef ADD
|
||||||
return *this;
|
return *this;
|
||||||
|
|
|
@ -48,6 +48,9 @@ class EngineShard {
|
||||||
uint64_t total_heartbeat_expired_bytes = 0;
|
uint64_t total_heartbeat_expired_bytes = 0;
|
||||||
uint64_t total_heartbeat_expired_calls = 0;
|
uint64_t total_heartbeat_expired_calls = 0;
|
||||||
|
|
||||||
|
// cluster stats
|
||||||
|
uint64_t total_migrated_keys = 0;
|
||||||
|
|
||||||
Stats& operator+=(const Stats&);
|
Stats& operator+=(const Stats&);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
|
@ -8,7 +8,7 @@
|
||||||
|
|
||||||
#include "base/flags.h"
|
#include "base/flags.h"
|
||||||
#include "base/logging.h"
|
#include "base/logging.h"
|
||||||
#include "server/cluster/cluster_defs.h"
|
#include "server/engine_shard.h"
|
||||||
#include "server/journal/cmd_serializer.h"
|
#include "server/journal/cmd_serializer.h"
|
||||||
#include "server/server_state.h"
|
#include "server/server_state.h"
|
||||||
#include "util/fibers/synchronization.h"
|
#include "util/fibers/synchronization.h"
|
||||||
|
@ -298,6 +298,7 @@ bool RestoreStreamer::ShouldWrite(SlotId slot_id) const {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
|
bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
|
||||||
|
auto& shard_stats = EngineShard::tlocal()->stats();
|
||||||
bool written = false;
|
bool written = false;
|
||||||
|
|
||||||
if (!it.is_done() && it.GetVersion() < snapshot_version_) {
|
if (!it.is_done() && it.GetVersion() < snapshot_version_) {
|
||||||
|
@ -309,7 +310,8 @@ bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
|
||||||
const auto& pv = it->second;
|
const auto& pv = it->second;
|
||||||
string_view key = it->first.GetSlice(&key_buffer);
|
string_view key = it->first.GetSlice(&key_buffer);
|
||||||
if (ShouldWrite(key)) {
|
if (ShouldWrite(key)) {
|
||||||
stats_.keys_written++;
|
++stats_.keys_written;
|
||||||
|
++shard_stats.total_migrated_keys;
|
||||||
uint64_t expire = 0;
|
uint64_t expire = 0;
|
||||||
if (pv.HasExpire()) {
|
if (pv.HasExpire()) {
|
||||||
auto eit = db_slice_->databases()[0]->expire.Find(it->first);
|
auto eit = db_slice_->databases()[0]->expire.Find(it->first);
|
||||||
|
|
|
@ -2828,6 +2828,7 @@ string ServerFamily::FormatInfoMetrics(const Metrics& m, std::string_view sectio
|
||||||
if (should_enter("CLUSTER")) {
|
if (should_enter("CLUSTER")) {
|
||||||
append("cluster_enabled", IsClusterEnabledOrEmulated());
|
append("cluster_enabled", IsClusterEnabledOrEmulated());
|
||||||
append("migration_errors_total", service_.cluster_family().MigrationsErrorsCount());
|
append("migration_errors_total", service_.cluster_family().MigrationsErrorsCount());
|
||||||
|
append("total_migrated_keys", m.shard_stats.total_migrated_keys);
|
||||||
}
|
}
|
||||||
|
|
||||||
return info;
|
return info;
|
||||||
|
|
|
@ -1492,8 +1492,9 @@ async def test_network_disconnect_during_migration(df_factory):
|
||||||
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
await push_config(json.dumps(generate_config(nodes)), [node.admin_client for node in nodes])
|
||||||
|
|
||||||
for _ in range(10):
|
for _ in range(10):
|
||||||
await asyncio.sleep(random.randint(0, 10) / 100)
|
await asyncio.sleep(random.randint(0, 50) / 100)
|
||||||
logging.debug("drop connections")
|
info = await nodes[0].admin_client.info("CLUSTER")
|
||||||
|
logging.debug("drop connection: %s", info)
|
||||||
proxy.drop_connection()
|
proxy.drop_connection()
|
||||||
logging.debug(
|
logging.debug(
|
||||||
await nodes[0].admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
|
await nodes[0].admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue