fix: fix cluster incorrect keys status (#3083)

* fix: fix cluster incorrect keys status
Author: Borys
Date: 2024-05-26 15:10:01 +03:00 (committed by GitHub)
Parent: 5534ee5c23
Commit: 0dea257f41
12 changed files with 78 additions and 38 deletions

@@ -28,7 +28,7 @@ endif()
add_library(dfly_transaction db_slice.cc malloc_stats.cc blocking_controller.cc
command_registry.cc cluster/unique_slot_checker.cc
command_registry.cc cluster/cluster_utility.cc
journal/tx_executor.cc
common.cc journal/journal.cc journal/types.cc journal/journal_slice.cc
server_state.cc table.cc top_keys.cc transaction.cc tx_base.cc

@@ -660,26 +660,6 @@ static string_view StateToStr(MigrationState state) {
return "UNDEFINED_STATE"sv;
}
static uint64_t GetKeyCount(const SlotRanges& slots) {
atomic_uint64_t keys = 0;
shard_set->pool()->AwaitFiberOnAll([&](auto*) {
EngineShard* shard = EngineShard::tlocal();
if (shard == nullptr)
return;
uint64_t shard_keys = 0;
for (const SlotRange& range : slots) {
for (SlotId slot = range.start; slot <= range.end; slot++) {
shard_keys += shard->db_slice().GetSlotStats(slot).key_count;
}
}
keys.fetch_add(shard_keys);
});
return keys.load();
}
void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
CmdArgParser parser(args);
@@ -698,22 +678,21 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext*
reply.reserve(incoming_migrations_jobs_.size() + outgoing_migration_jobs_.size());
auto append_answer = [rb, &reply](string_view direction, string_view node_id, string_view filter,
MigrationState state, const SlotRanges& slots,
string_view error) {
MigrationState state, size_t keys_number, string_view error) {
if (filter.empty() || filter == node_id) {
error = error.empty() ? "0" : error;
reply.push_back(absl::StrCat(direction, " ", node_id, " ", StateToStr(state),
" keys:", GetKeyCount(slots), " errors: ", error));
" keys:", keys_number, " errors: ", error));
}
};
for (const auto& m : incoming_migrations_jobs_) {
// TODO add error status
append_answer("in", m->GetSourceID(), node_id, m->GetState(), m->GetSlots(), "");
append_answer("in", m->GetSourceID(), node_id, m->GetState(), m->GetKeyCount(), "");
}
for (const auto& migration : outgoing_migration_jobs_) {
append_answer("out", migration->GetMigrationInfo().node_id, node_id, migration->GetState(),
migration->GetSlots(), migration->GetErrorStr());
migration->GetKeyCount(), migration->GetErrorStr());
}
if (reply.empty()) {
@@ -927,6 +906,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
if (!migration)
return cntx->SendError(kIdNotFound);
// TODO add timeout for join because it can fail
migration->Join();
VLOG(1) << "Migration is joined for " << source_id;
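
The status line assembled by append_answer above has the shape "<direction> <node-id> <state> keys:<N> errors: <error-or-0>", which is what the updated test below matches against. A minimal, self-contained sketch of that formatting (simplified signature and names, not the actual handler code):

#include <cstddef>
#include <iostream>
#include <string>
#include <string_view>

#include "absl/strings/str_cat.h"

// Simplified mirror of append_answer: one line per migration, with the key
// count now taken from the migration object instead of a live slot scan.
std::string FormatStatusLine(std::string_view direction, std::string_view node_id,
                             std::string_view state, size_t keys_number,
                             std::string_view error) {
  std::string_view err = error.empty() ? std::string_view("0") : error;
  return absl::StrCat(direction, " ", node_id, " ", state, " keys:", keys_number,
                      " errors: ", err);
}

int main() {
  // Prints: out <node-id> FINISHED keys:7 errors: 0
  std::cout << FormatStatusLine("out", "<node-id>", "FINISHED", 7, "") << "\n";
  return 0;
}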

@@ -1,6 +1,7 @@
#include "server/cluster/unique_slot_checker.h"
#include "server/cluster/cluster_utility.h"
#include "server/cluster/cluster_defs.h"
#include "server/engine_shard_set.h"
using namespace std;
@@ -37,4 +38,24 @@ optional<SlotId> UniqueSlotChecker::GetUniqueSlotId() const {
return slot_id_;
}
uint64_t GetKeyCount(const SlotRanges& slots) {
std::atomic_uint64_t keys = 0;
shard_set->pool()->AwaitFiberOnAll([&](auto*) {
EngineShard* shard = EngineShard::tlocal();
if (shard == nullptr)
return;
uint64_t shard_keys = 0;
for (const SlotRange& range : slots) {
for (SlotId slot = range.start; slot <= range.end; slot++) {
shard_keys += shard->db_slice().GetSlotStats(slot).key_count;
}
}
keys.fetch_add(shard_keys);
});
return keys.load();
}
} // namespace dfly::cluster
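
GetKeyCount above fans the computation out to every shard: each shard walks the requested slot ranges, sums GetSlotStats(slot).key_count locally, and publishes a single fetch_add, so the shared atomic is updated once per shard rather than once per slot. A self-contained analogue of that aggregation pattern, using plain std::thread in place of the shard pool's AwaitFiberOnAll and made-up per-slot counts:

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <iostream>
#include <thread>
#include <vector>

int main() {
  // Made-up per-shard, per-slot key counts standing in for GetSlotStats(slot).key_count.
  std::vector<std::vector<uint64_t>> slot_keys_per_shard = {{3, 0, 2}, {1, 4, 0}, {0, 0, 7}};

  std::atomic<uint64_t> keys{0};
  std::vector<std::thread> workers;
  for (size_t i = 0; i < slot_keys_per_shard.size(); ++i) {
    workers.emplace_back([&keys, &slot_keys_per_shard, i] {
      uint64_t shard_keys = 0;  // accumulate locally, off the shared atomic
      for (uint64_t count : slot_keys_per_shard[i]) {
        shard_keys += count;
      }
      keys.fetch_add(shard_keys);  // one atomic update per shard
    });
  }
  for (auto& w : workers) {
    w.join();
  }

  std::cout << "keys: " << keys.load() << "\n";  // prints keys: 17
  return 0;
}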

@@ -24,4 +24,6 @@ class UniqueSlotChecker {
std::optional<SlotId> slot_id_;
};
uint64_t GetKeyCount(const SlotRanges& slots);
} // namespace dfly::cluster

@@ -6,6 +6,7 @@
#include "absl/cleanup/cleanup.h"
#include "base/logging.h"
#include "cluster_utility.h"
#include "server/error.h"
#include "server/journal/executor.h"
#include "server/journal/tx_executor.h"
@@ -50,7 +51,7 @@ class ClusterShardMigration {
while (tx_data->opcode == journal::Op::FIN) {
VLOG(2) << "Attempt to finalize flow " << source_shard_id_;
bc->Dec(); // we can Join the flow now
// if we get new data attempt is failed
// if we get new data, attempt is failed
if (tx_data = tx_reader.NextTxData(&reader, cntx); !tx_data) {
VLOG(1) << "Finalized flow " << source_shard_id_;
return;
@@ -120,8 +121,10 @@ IncomingSlotMigration::~IncomingSlotMigration() {
}
void IncomingSlotMigration::Join() {
// TODO add timeout
bc_->Wait();
state_ = MigrationState::C_FINISHED;
state_.store(MigrationState::C_FINISHED);
keys_number_ = cluster::GetKeyCount(slots_);
}
void IncomingSlotMigration::Cancel() {
@@ -141,4 +144,11 @@ void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* sou
shard_flows_[shard]->Start(&cntx_, source, bc_);
}
size_t IncomingSlotMigration::GetKeyCount() const {
if (state_.load() == MigrationState::C_FINISHED) {
return keys_number_;
}
return cluster::GetKeyCount(slots_);
}
} // namespace dfly::cluster
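
The incoming migration (and the outgoing one further below) now distinguishes two cases: while the migration is running, GetKeyCount() computes a live number from the slot statistics; once the state reaches C_FINISHED, it returns the value snapshotted in Join(), so writes that arrive afterwards no longer move the reported figure. A simplified, self-contained sketch of that freeze-at-finish pattern (the class and member names are illustrative, not the real Dragonfly types):

#include <atomic>
#include <cstddef>
#include <functional>
#include <iostream>
#include <vector>

// Illustrative names only; not the real Dragonfly types.
class MigrationKeyStat {
 public:
  explicit MigrationKeyStat(std::function<size_t()> live_count)
      : live_count_(std::move(live_count)) {}

  // Called once when the migration completes: snapshot the count so that
  // later writes to the keyspace no longer change the reported statistic.
  void Finish() {
    frozen_keys_ = live_count_();
    finished_.store(true);
  }

  // Live number while running, frozen snapshot once finished.
  size_t GetKeyCount() const {
    return finished_.load() ? frozen_keys_ : live_count_();
  }

 private:
  std::function<size_t()> live_count_;
  std::atomic<bool> finished_{false};
  size_t frozen_keys_ = 0;
};

int main() {
  std::vector<int> keyspace = {1, 2, 3, 4, 5, 6, 7};  // seven "migrated" keys
  MigrationKeyStat stat([&keyspace] { return keyspace.size(); });

  stat.Finish();           // snapshot taken: 7
  keyspace.push_back(8);   // writes landing after the migration finished...
  keyspace.push_back(9);

  std::cout << "keys:" << stat.GetKeyCount() << "\n";  // ...still prints keys:7
  return 0;
}

This is exactly the situation the updated test exercises: KEY20 and KEY21 are written after the migration finished, yet the status keeps reporting keys:7.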

@@ -47,6 +47,8 @@ class IncomingSlotMigration {
return source_id_;
}
size_t GetKeyCount() const;
private:
std::string source_id_;
Service& service_;
@@ -54,6 +56,9 @@ class IncomingSlotMigration {
SlotRanges slots_;
std::atomic<MigrationState> state_ = MigrationState::C_NO_STATE;
Context cntx_;
// when migration is finished we need to store number of migrated keys
// because new request can add or remove keys and we get incorrect statistic
size_t keys_number_ = 0;
util::fb2::BlockingCounter bc_;
};

@@ -11,6 +11,7 @@
#include "absl/cleanup/cleanup.h"
#include "base/logging.h"
#include "cluster_family.h"
#include "cluster_utility.h"
#include "server/db_slice.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
@@ -270,6 +271,7 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
auto is_error = CheckFlowsForErrors();
Finish(is_error);
if (!is_error) {
keys_number_ = cluster::GetKeyCount(migration_info_.slot_ranges);
cf_->UpdateConfig(migration_info_.slot_ranges, false);
VLOG(1) << "Config is updated for " << cf_->MyID();
}
@@ -295,4 +297,11 @@ bool OutgoingMigration::CheckFlowsForErrors() {
}
return false;
}
size_t OutgoingMigration::GetKeyCount() const {
if (state_ == MigrationState::C_FINISHED) {
return keys_number_;
}
return cluster::GetKeyCount(migration_info_.slot_ranges);
}
} // namespace dfly::cluster
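
On the outgoing side the ordering inside FinalizeMigration matters: the key count is snapshotted before UpdateConfig() removes the migrated slot ranges from this node, so the reported number reflects what was handed over rather than whatever the node happens to hold afterwards. A toy, self-contained illustration of why snapshot-then-hand-off is the safe order (the map merely stands in for the data of the migrated slots):

#include <cstddef>
#include <iostream>
#include <map>
#include <string>

int main() {
  // Stand-in for the keys living in the migrated slot ranges on the outgoing node.
  std::map<std::string, std::string> migrated_slot_data = {
      {"KEY1", "value"}, {"KEY2", "value"}, {"KEY3", "value"}};

  // 1) Snapshot while this node still owns and holds the data...
  size_t keys_number = migrated_slot_data.size();

  // 2) ...then give the slots away; dropping the local copy models the node
  //    no longer owning that data after the config update.
  migrated_slot_data.clear();

  std::cout << "keys:" << keys_number << "\n";  // keys:3, not keys:0
  return 0;
}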

@@ -53,6 +53,8 @@ class OutgoingMigration : private ProtocolClient {
return last_error_.Format();
}
size_t GetKeyCount() const;
static constexpr long kInvalidAttempt = -1;
static constexpr std::string_view kUnknownMigration = "UNKNOWN_MIGRATION";
@@ -84,6 +86,10 @@ class OutgoingMigration : private ProtocolClient {
mutable util::fb2::Mutex state_mu_;
MigrationState state_ ABSL_GUARDED_BY(state_mu_) = MigrationState::C_NO_STATE;
// when migration is finished we need to store number of migrated keys
// because new request can add or remove keys and we get incorrect statistic
size_t keys_number_ = 0;
};
} // namespace dfly::cluster

@@ -35,7 +35,7 @@ extern "C" {
#include "server/bitops_family.h"
#include "server/bloom_family.h"
#include "server/cluster/cluster_family.h"
#include "server/cluster/unique_slot_checker.h"
#include "server/cluster/cluster_utility.h"
#include "server/conn_context.h"
#include "server/error.h"
#include "server/generic_family.h"

@@ -8,7 +8,7 @@
#include "base/logging.h"
#include "facade/dragonfly_connection.h"
#include "server/cluster/unique_slot_checker.h"
#include "server/cluster/cluster_utility.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/engine_shard_set.h"

@@ -18,7 +18,7 @@
#include "core/intent_lock.h"
#include "core/tx_queue.h"
#include "facade/op_status.h"
#include "server/cluster/unique_slot_checker.h"
#include "server/cluster/cluster_utility.h"
#include "server/common.h"
#include "server/journal/types.h"
#include "server/table.h"

@@ -1067,12 +1067,17 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
c_nodes_admin,
)
await asyncio.sleep(0.5)
await wait_for_status(c_nodes_admin[1], node_ids[0], "FINISHED")
while "FINISHED" not in await c_nodes_admin[1].execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[0]
):
await asyncio.sleep(0.05)
assert await c_nodes[1].set("KEY20", "value")
assert await c_nodes[1].set("KEY21", "value")
assert (
await c_nodes_admin[0].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[1])
).startswith(f"""out {node_ids[1]} FINISHED keys:7""")
assert (
await c_nodes_admin[1].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS", node_ids[0])
).startswith(f"""in {node_ids[0]} FINISHED keys:7""")
await push_config(
config.replace("LAST_SLOT_CUTOFF", "2999").replace("NEXT_SLOT_CUTOFF", "3000"),
@@ -1099,7 +1104,9 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
assert await c_nodes[1].get("KEY17") == "value"
assert await c_nodes[1].get("KEY18") == "value"
assert await c_nodes[1].get("KEY19") == "value"
assert await c_nodes[1].execute_command("DBSIZE") == 17
assert await c_nodes[1].get("KEY20") == "value"
assert await c_nodes[1].get("KEY21") == "value"
assert await c_nodes[1].execute_command("DBSIZE") == 19
assert (
await c_nodes_admin[1].execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS") == "NO_STATE"
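
For completeness, the same status can be queried from any Redis-protocol client, not only redis-py as in the test above. A hedged C++ illustration using hiredis (the library choice, endpoint, and node id are placeholders for illustration and are not part of this change):

#include <cstdio>

#include <hiredis/hiredis.h>

int main() {
  // Placeholder endpoint; in practice connect to the instance's admin port.
  redisContext* ctx = redisConnect("127.0.0.1", 6379);
  if (ctx == nullptr || ctx->err != 0) {
    return 1;
  }

  // Same command the test issues: DFLYCLUSTER SLOT-MIGRATION-STATUS <node-id>.
  auto* reply = static_cast<redisReply*>(
      redisCommand(ctx, "DFLYCLUSTER SLOT-MIGRATION-STATUS %s", "<source-node-id>"));
  if (reply != nullptr) {
    if (reply->type == REDIS_REPLY_STATUS || reply->type == REDIS_REPLY_STRING) {
      // Expected shape once a migration completes:
      // "in <source-node-id> FINISHED keys:7 errors: 0"
      std::printf("%s\n", reply->str);
    }
    freeReplyObject(reply);
  }
  redisFree(ctx);
  return 0;
}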