diff --git a/src/server/cluster/cluster_config.cc b/src/server/cluster/cluster_config.cc index 2a2d87ac0..416887594 100644 --- a/src/server/cluster/cluster_config.cc +++ b/src/server/cluster/cluster_config.cc @@ -4,6 +4,8 @@ extern "C" { #include "redis/crc16.h" } +#include + #include #include #include @@ -152,11 +154,7 @@ shared_ptr ClusterConfig::CreateFromConfig(string_view my_id, shard.master.id == my_id || any_of(shard.replicas.begin(), shard.replicas.end(), [&](const Node& node) { return node.id == my_id; }); if (owned_by_me) { - for (const auto& slot_range : shard.slot_ranges) { - for (SlotId i = slot_range.start; i <= slot_range.end; ++i) { - result->my_slots_.set(i); - } - } + result->my_slots_.Set(shard.slot_ranges, true); } } @@ -175,13 +173,13 @@ template optional ReadNumeric(const JsonType& obj) { return obj.as(); } -optional> GetClusterSlotRanges(const JsonType& slots) { +optional GetClusterSlotRanges(const JsonType& slots) { if (!slots.is_array()) { LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges is not an array " << slots; return nullopt; } - vector ranges; + SlotRanges ranges; for (const auto& range : slots.array_range()) { if (!range.is_object()) { @@ -301,22 +299,17 @@ shared_ptr ClusterConfig::CreateFromConfig(string_view my_id, std::shared_ptr ClusterConfig::CloneWithChanges(const std::vector& slots, bool enable) const { auto new_config = std::make_shared(*this); - - auto slot_set = ToSlotSet(slots); - - for (const auto s : slot_set) { - new_config->my_slots_.set(s, enable); - } + new_config->my_slots_.Set(slots, enable); return new_config; } bool ClusterConfig::IsMySlot(SlotId id) const { - if (id >= my_slots_.size()) { + if (id > ClusterConfig::kMaxSlotNum) { DCHECK(false) << "Requesting a non-existing slot id " << id; return false; } - return my_slots_.test(id); + return my_slots_.Contains(id); } bool ClusterConfig::IsMySlot(std::string_view key) const { @@ -324,7 +317,7 @@ bool ClusterConfig::IsMySlot(std::string_view key) const { } ClusterConfig::Node ClusterConfig::GetMasterNodeForSlot(SlotId id) const { - CHECK_LT(id, my_slots_.size()) << "Requesting a non-existing slot id " << id; + CHECK_LE(id, ClusterConfig::kMaxSlotNum) << "Requesting a non-existing slot id " << id; for (const auto& shard : config_) { for (const auto& range : shard.slot_ranges) { @@ -342,23 +335,8 @@ ClusterConfig::ClusterShards ClusterConfig::GetConfig() const { return config_; } -SlotSet ClusterConfig::GetOwnedSlots() const { - SlotSet set; - for (SlotId id = 0; id <= kMaxSlotNum; ++id) { - if (IsMySlot(id)) { - set.insert(id); - } - } - return set; -} - -SlotSet ToSlotSet(const std::vector& slots) { - SlotSet sset; - for (const auto& slot_range : slots) { - for (auto i = slot_range.start; i <= slot_range.end; ++i) - sset.insert(i); - } - return sset; +const SlotSet& ClusterConfig::GetOwnedSlots() const { + return my_slots_; } } // namespace dfly diff --git a/src/server/cluster/cluster_config.h b/src/server/cluster/cluster_config.h index e31f8af05..34a7ff9c1 100644 --- a/src/server/cluster/cluster_config.h +++ b/src/server/cluster/cluster_config.h @@ -4,24 +4,17 @@ #pragma once -#include - #include -#include #include #include #include #include "core/json/json_object.h" -#include "src/core/fibers.h" +#include "src/server/cluster/slot_set.h" #include "src/server/common.h" namespace dfly { -using SlotId = uint16_t; -// TODO consider to use bit set or some more compact way to store SlotId -using SlotSet = absl::flat_hash_set; - // MigrationState constants are ordered in state changing order enum class MigrationState : uint8_t { C_NO_STATE, @@ -43,13 +36,8 @@ class ClusterConfig { uint16_t port = 0; }; - struct SlotRange { - SlotId start = 0; - SlotId end = 0; - }; - struct ClusterShard { - std::vector slot_ranges; + SlotRanges slot_ranges; Node master; std::vector replicas; }; @@ -94,7 +82,7 @@ class ClusterConfig { ClusterShards GetConfig() const; - SlotSet GetOwnedSlots() const; + const SlotSet& GetOwnedSlots() const; private: struct SlotEntry { @@ -106,10 +94,7 @@ class ClusterConfig { ClusterShards config_; - // True bits in `my_slots_` indicate that this slot is owned by this node. - std::bitset my_slots_; + SlotSet my_slots_; }; -SlotSet ToSlotSet(const std::vector& slots); - } // namespace dfly diff --git a/src/server/cluster/cluster_config_test.cc b/src/server/cluster/cluster_config_test.cc index 6e1b603b0..f63f6c023 100644 --- a/src/server/cluster/cluster_config_test.cc +++ b/src/server/cluster/cluster_config_test.cc @@ -88,7 +88,7 @@ TEST_F(ClusterConfigTest, ConfigSetOk) { EXPECT_NE(config, nullptr); EXPECT_THAT(config->GetMasterNodeForSlot(0), NodeMatches(Node{.id = "other", .ip = "192.168.0.100", .port = 7000})); - EXPECT_THAT(config->GetOwnedSlots(), UnorderedElementsAre()); + EXPECT_TRUE(config->GetOwnedSlots().Empty()); } TEST_F(ClusterConfigTest, ConfigSetOkWithReplica) { @@ -114,14 +114,15 @@ TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) { .replicas = {{.id = "other-replica3", .ip = "192.168.0.105", .port = 7005}}}}); EXPECT_NE(config, nullptr); SlotSet owned_slots = config->GetOwnedSlots(); - EXPECT_EQ(owned_slots.size(), 5'000); + EXPECT_EQ(owned_slots.ToSlotRanges().size(), 1); + EXPECT_EQ(owned_slots.Count(), 5'000); { for (int i = 0; i <= 5'000; ++i) { EXPECT_THAT(config->GetMasterNodeForSlot(i), NodeMatches(Node{.id = "other-master", .ip = "192.168.0.100", .port = 7000})); EXPECT_FALSE(config->IsMySlot(i)); - EXPECT_FALSE(owned_slots.contains(i)); + EXPECT_FALSE(owned_slots.Contains(i)); } } { @@ -129,7 +130,7 @@ TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) { EXPECT_THAT(config->GetMasterNodeForSlot(i), NodeMatches(Node{.id = kMyId, .ip = "192.168.0.102", .port = 7002})); EXPECT_TRUE(config->IsMySlot(i)); - EXPECT_TRUE(owned_slots.contains(i)); + EXPECT_TRUE(owned_slots.Contains(i)); } } { @@ -137,7 +138,7 @@ TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) { EXPECT_THAT(config->GetMasterNodeForSlot(i), NodeMatches(Node{.id = "other-master3", .ip = "192.168.0.104", .port = 7004})); EXPECT_FALSE(config->IsMySlot(i)); - EXPECT_FALSE(owned_slots.contains(i)); + EXPECT_FALSE(owned_slots.Contains(i)); } } } diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index e858b9087..a14656104 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -37,7 +37,6 @@ using CI = CommandId; using ClusterShard = ClusterConfig::ClusterShard; using ClusterShards = ClusterConfig::ClusterShards; using Node = ClusterConfig::Node; -using SlotRange = ClusterConfig::SlotRange; constexpr char kIdNotFound[] = "syncid not found"; @@ -419,21 +418,11 @@ void ClusterFamily::DflyClusterMyId(CmdArgList args, ConnectionContext* cntx) { } namespace { -SlotSet GetDeletedSlots(bool is_first_config, const SlotSet& before, const SlotSet& after) { - SlotSet result; - for (SlotId id = 0; id <= ClusterConfig::kMaxSlotNum; ++id) { - if ((before.contains(id) || is_first_config) && !after.contains(id)) { - result.insert(id); - } - } - return result; -} - // Guards set configuration, so that we won't handle 2 in parallel. Mutex set_config_mu; void DeleteSlots(const SlotSet& slots) { - if (slots.empty()) { + if (slots.Empty()) { return; } @@ -448,16 +437,17 @@ void DeleteSlots(const SlotSet& slots) { } void WriteFlushSlotsToJournal(const SlotSet& slots) { - if (slots.empty()) { + if (slots.Empty()) { return; } // Build args vector args; - args.reserve(slots.size() + 1); + args.reserve(slots.Count() + 1); args.push_back("FLUSHSLOTS"); - for (const SlotId slot : slots) { - args.push_back(absl::StrCat(slot)); + for (SlotId slot = 0; slot <= SlotSet::kMaxSlot; ++slot) { + if (slots.Contains(slot)) + args.push_back(absl::StrCat(slot)); } // Build view @@ -510,12 +500,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) lock_guard gu(set_config_mu); - bool is_first_config = true; - SlotSet before; - if (tl_cluster_config != nullptr) { - is_first_config = false; - before = tl_cluster_config->GetOwnedSlots(); - } + SlotSet before = tl_cluster_config ? tl_cluster_config->GetOwnedSlots() : SlotSet(true); // Ignore blocked commands because we filter them with CancelBlockingOnThread DispatchTracker tracker{server_family_->GetListeners(), cntx->conn(), false /* ignore paused */, @@ -544,7 +529,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) SlotSet after = tl_cluster_config->GetOwnedSlots(); if (ServerState::tlocal()->is_master) { - auto deleted_slots = GetDeletedSlots(is_first_config, before, after); + auto deleted_slots = before.GetRemovedSlots(after); DeleteSlots(deleted_slots); WriteFlushSlotsToJournal(deleted_slots); } @@ -601,13 +586,12 @@ void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* c void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx) { SlotSet slots; - slots.reserve(args.size()); for (size_t i = 0; i < args.size(); ++i) { unsigned slot; if (!absl::SimpleAtoi(ArgS(args, i), &slot) || (slot > ClusterConfig::kMaxSlotNum)) { return cntx->SendError(kSyntaxErrType); } - slots.insert(static_cast(slot)); + slots.Set(static_cast(slot), true); } DeleteSlots(slots); @@ -714,7 +698,6 @@ void ClusterFamily::DflyClusterMigrationFinalize(CmdArgList args, ConnectionCont } // TODO implement blocking on migrated slots only - [[maybe_unused]] const auto deleted_slots = ToSlotSet(migration->GetSlots()); bool is_block_active = true; auto is_pause_in_progress = [&is_block_active] { return is_block_active; }; @@ -761,7 +744,7 @@ void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) { } ClusterSlotMigration* ClusterFamily::AddMigration(std::string host_ip, uint16_t port, - std::vector slots) { + SlotRanges slots) { lock_guard lk(migration_mu_); for (const auto& mj : incoming_migrations_jobs_) { if (auto info = mj->GetInfo(); info.host == host_ip && info.port == port) { @@ -787,7 +770,7 @@ void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) { CmdArgParser parser{args}; auto port = parser.Next(); - std::vector slots; + SlotRanges slots; do { auto [slot_start, slot_end] = parser.Next(); slots.emplace_back(SlotRange{slot_start, slot_end}); @@ -821,7 +804,7 @@ void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) { } uint32_t ClusterFamily::CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port, - std::vector slots) { + SlotRanges slots) { std::lock_guard lk(migration_mu_); auto sync_id = next_sync_id_++; auto err_handler = [](const GenericError& err) { diff --git a/src/server/cluster/cluster_family.h b/src/server/cluster/cluster_family.h index 57c0f8905..ea4327cf0 100644 --- a/src/server/cluster/cluster_family.h +++ b/src/server/cluster/cluster_family.h @@ -76,14 +76,12 @@ class ClusterFamily { void DflyMigrateAck(CmdArgList args, ConnectionContext* cntx); // create a ClusterSlotMigration entity which will execute migration - ClusterSlotMigration* AddMigration(std::string host_ip, uint16_t port, - std::vector slots); + ClusterSlotMigration* AddMigration(std::string host_ip, uint16_t port, SlotRanges slots); void RemoveFinishedIncomingMigrations(); // store info about migration and create unique session id - uint32_t CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port, - std::vector slots); + uint32_t CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port, SlotRanges slots); std::shared_ptr GetOutgoingMigration(uint32_t sync_id); diff --git a/src/server/cluster/cluster_slot_migration.cc b/src/server/cluster/cluster_slot_migration.cc index db596dc06..674bbacad 100644 --- a/src/server/cluster/cluster_slot_migration.cc +++ b/src/server/cluster/cluster_slot_migration.cc @@ -41,7 +41,7 @@ atomic_uint32_t next_local_sync_id{1}; } // namespace ClusterSlotMigration::ClusterSlotMigration(ClusterFamily* cl_fm, string host_ip, uint16_t port, - Service* se, std::vector slots) + Service* se, SlotRanges slots) : ProtocolClient(std::move(host_ip), port), cluster_family_(cl_fm), service_(*se), @@ -145,8 +145,6 @@ void ClusterSlotMigration::MainMigrationFb() { if (IsFinalized()) { state_ = MigrationState::C_FINISHED; - const auto added_slots = ToSlotSet(slots_); - auto cmd = absl::StrCat("DFLYMIGRATE ACK ", sync_id_); VLOG(1) << "send " << cmd; diff --git a/src/server/cluster/cluster_slot_migration.h b/src/server/cluster/cluster_slot_migration.h index 1f6d0b953..994996d15 100644 --- a/src/server/cluster/cluster_slot_migration.h +++ b/src/server/cluster/cluster_slot_migration.h @@ -22,7 +22,7 @@ class ClusterSlotMigration : private ProtocolClient { }; ClusterSlotMigration(ClusterFamily* cl_fm, std::string host_ip, uint16_t port, Service* se, - std::vector slots); + SlotRanges slots); ~ClusterSlotMigration(); // Initiate connection with source node and create migration fiber @@ -46,7 +46,7 @@ class ClusterSlotMigration : private ProtocolClient { void Stop(); - const std::vector& GetSlots() const { + const SlotRanges& GetSlots() const { return slots_; } @@ -65,7 +65,7 @@ class ClusterSlotMigration : private ProtocolClient { Service& service_; Mutex flows_op_mu_; std::vector> shard_flows_; - std::vector slots_; + SlotRanges slots_; uint32_t source_shards_num_ = 0; uint32_t sync_id_ = 0; uint32_t local_sync_id_ = 0; diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index a20e56aba..8edd5f050 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -44,8 +44,7 @@ class OutgoingMigration::SliceSlotMigration { }; OutgoingMigration::OutgoingMigration(std::uint32_t flows_num, std::string ip, uint16_t port, - std::vector slots, - Context::ErrHandler err_handler) + SlotRanges slots, Context::ErrHandler err_handler) : host_ip_(ip), port_(port), slots_(slots), cntx_(err_handler), slot_migrations_(flows_num) { } @@ -53,13 +52,11 @@ OutgoingMigration::~OutgoingMigration() = default; void OutgoingMigration::StartFlow(DbSlice* slice, uint32_t sync_id, journal::Journal* journal, io::Sink* dest) { - SlotSet sset = ToSlotSet(slots_); - const auto shard_id = slice->shard_id(); std::lock_guard lck(flows_mu_); slot_migrations_[shard_id] = - std::make_unique(slice, std::move(sset), sync_id, journal, &cntx_, dest); + std::make_unique(slice, slots_, sync_id, journal, &cntx_, dest); } void OutgoingMigration::Finalize(uint32_t shard_id) { diff --git a/src/server/cluster/outgoing_slot_migration.h b/src/server/cluster/outgoing_slot_migration.h index 0d73e913d..33c226d82 100644 --- a/src/server/cluster/outgoing_slot_migration.h +++ b/src/server/cluster/outgoing_slot_migration.h @@ -20,8 +20,8 @@ class OutgoingMigration { public: OutgoingMigration() = default; ~OutgoingMigration(); - OutgoingMigration(std::uint32_t flows_num, std::string ip, uint16_t port, - std::vector slots, Context::ErrHandler err_handler); + OutgoingMigration(std::uint32_t flows_num, std::string ip, uint16_t port, SlotRanges slots, + Context::ErrHandler err_handler); void StartFlow(DbSlice* slice, uint32_t sync_id, journal::Journal* journal, io::Sink* dest); @@ -38,7 +38,7 @@ class OutgoingMigration { return port_; }; - const std::vector& GetSlots() const { + const SlotRanges& GetSlots() const { return slots_; } @@ -50,7 +50,7 @@ class OutgoingMigration { private: std::string host_ip_; uint16_t port_; - std::vector slots_; + SlotRanges slots_; Context cntx_; mutable Mutex flows_mu_; std::vector> slot_migrations_ ABSL_GUARDED_BY(flows_mu_); diff --git a/src/server/cluster/slot_set.h b/src/server/cluster/slot_set.h new file mode 100644 index 000000000..431c40286 --- /dev/null +++ b/src/server/cluster/slot_set.h @@ -0,0 +1,97 @@ +// Copyright 2024, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include +#include +#include + +namespace dfly { + +using SlotId = uint16_t; + +struct SlotRange { + SlotId start = 0; + SlotId end = 0; +}; + +using SlotRanges = std::vector; + +class SlotSet { + public: + static constexpr SlotId kMaxSlot = 0x3FFF; + static constexpr SlotId kSlotsNumber = kMaxSlot + 1; + + SlotSet(bool full_house = false) : slots_(std::make_unique()) { + if (full_house) + slots_->flip(); + } + + SlotSet(const SlotRanges& slot_ranges) : SlotSet() { + Set(slot_ranges, true); + } + + SlotSet(const SlotSet& s) : SlotSet() { + *slots_ = *s.slots_; + } + + bool Contains(SlotId slot) const { + return slots_->test(slot); + } + + void Set(const SlotRanges& slot_ranges, bool value) { + for (const auto& slot_range : slot_ranges) { + for (auto i = slot_range.start; i <= slot_range.end; ++i) { + slots_->set(i); + } + } + } + + void Set(SlotId slot, bool value) { + slots_->set(slot, value); + } + + bool Empty() const { + return slots_->none(); + } + + size_t Count() const { + return slots_->count(); + } + + bool All() const { + return slots_->all(); + } + + // Get SlotSet that are absent in the slots + SlotSet GetRemovedSlots(SlotSet slots) { + slots.slots_->flip(); + *slots.slots_ &= *slots_; + return slots; + } + + SlotRanges ToSlotRanges() const { + SlotRanges res; + + for (SlotId i = 0; i < kSlotsNumber; ++i) { + if (!slots_->test(i)) { + continue; + } else { + auto& range = res.emplace_back(SlotRange{i, i}); + for (++i; i < kSlotsNumber && slots_->test(i); ++i) { + range.end = i; + } + } + } + + return res; + } + + private: + using BitsetType = std::bitset; + std::unique_ptr slots_; +}; + +} // namespace dfly diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index b17b921fa..de3a64af8 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -702,7 +702,7 @@ void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) { auto del_entry_cb = [&](PrimeTable::iterator it) { std::string_view key = it->first.GetSlice(&tmp); SlotId sid = ClusterConfig::KeySlot(key); - if (slot_ids.contains(sid) && it.GetVersion() < next_version) { + if (slot_ids.Contains(sid) && it.GetVersion() < next_version) { PerformDeletion(it, db_arr_[0].get()); } return true; @@ -1439,7 +1439,7 @@ void DbSlice::InvalidateDbWatches(DbIndex db_indx) { void DbSlice::InvalidateSlotWatches(const SlotSet& slot_ids) { for (const auto& [key, conn_list] : db_arr_[0]->watched_keys) { SlotId sid = ClusterConfig::KeySlot(key); - if (!slot_ids.contains(sid)) { + if (!slot_ids.Contains(sid)) { continue; } for (auto conn_ptr : conn_list) { diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 82f47c5cc..0f8659196 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -642,8 +642,8 @@ optional DebugCmd::ParsePopulateArgs(CmdArgList args) cntx_->SendError(end.status()); return nullopt; } - options.slot_range = ClusterConfig::SlotRange{.start = static_cast(start.value()), - .end = static_cast(end.value())}; + options.slot_range = SlotRange{.start = static_cast(start.value()), + .end = static_cast(end.value())}; } else { cntx_->SendError(kSyntaxErr); diff --git a/src/server/debugcmd.h b/src/server/debugcmd.h index 8dbfef3bd..034af15b3 100644 --- a/src/server/debugcmd.h +++ b/src/server/debugcmd.h @@ -22,7 +22,7 @@ class DebugCmd { std::string_view type{"STRING"}; uint32_t elements = 1; - std::optional slot_range; + std::optional slot_range; }; public: diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 37e80ee7c..95a519b66 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -133,7 +133,7 @@ bool RestoreStreamer::ShouldWrite(std::string_view key) const { } bool RestoreStreamer::ShouldWrite(SlotId slot_id) const { - return my_slots_.contains(slot_id); + return my_slots_.Contains(slot_id); } bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {