refactor: create one type for slots set #2459 (#2645)

* refactor: create one type for slot ranges #2459
This commit is contained in:
Borys 2024-02-23 14:10:42 +02:00 committed by GitHub
parent bcae2dfb46
commit 8771ab32a6
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
14 changed files with 148 additions and 111 deletions

View file

@ -4,6 +4,8 @@ extern "C" {
#include "redis/crc16.h"
}
#include <absl/container/flat_hash_set.h>
#include <jsoncons/json.hpp>
#include <shared_mutex>
#include <string_view>
@ -152,11 +154,7 @@ shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
shard.master.id == my_id || any_of(shard.replicas.begin(), shard.replicas.end(),
[&](const Node& node) { return node.id == my_id; });
if (owned_by_me) {
for (const auto& slot_range : shard.slot_ranges) {
for (SlotId i = slot_range.start; i <= slot_range.end; ++i) {
result->my_slots_.set(i);
}
}
result->my_slots_.Set(shard.slot_ranges, true);
}
}
@ -175,13 +173,13 @@ template <typename T> optional<T> ReadNumeric(const JsonType& obj) {
return obj.as<T>();
}
optional<vector<ClusterConfig::SlotRange>> GetClusterSlotRanges(const JsonType& slots) {
optional<SlotRanges> GetClusterSlotRanges(const JsonType& slots) {
if (!slots.is_array()) {
LOG(WARNING) << kInvalidConfigPrefix << "slot_ranges is not an array " << slots;
return nullopt;
}
vector<ClusterConfig::SlotRange> ranges;
SlotRanges ranges;
for (const auto& range : slots.array_range()) {
if (!range.is_object()) {
@ -301,22 +299,17 @@ shared_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
std::shared_ptr<ClusterConfig> ClusterConfig::CloneWithChanges(const std::vector<SlotRange>& slots,
bool enable) const {
auto new_config = std::make_shared<ClusterConfig>(*this);
auto slot_set = ToSlotSet(slots);
for (const auto s : slot_set) {
new_config->my_slots_.set(s, enable);
}
new_config->my_slots_.Set(slots, enable);
return new_config;
}
bool ClusterConfig::IsMySlot(SlotId id) const {
if (id >= my_slots_.size()) {
if (id > ClusterConfig::kMaxSlotNum) {
DCHECK(false) << "Requesting a non-existing slot id " << id;
return false;
}
return my_slots_.test(id);
return my_slots_.Contains(id);
}
bool ClusterConfig::IsMySlot(std::string_view key) const {
@ -324,7 +317,7 @@ bool ClusterConfig::IsMySlot(std::string_view key) const {
}
ClusterConfig::Node ClusterConfig::GetMasterNodeForSlot(SlotId id) const {
CHECK_LT(id, my_slots_.size()) << "Requesting a non-existing slot id " << id;
CHECK_LE(id, ClusterConfig::kMaxSlotNum) << "Requesting a non-existing slot id " << id;
for (const auto& shard : config_) {
for (const auto& range : shard.slot_ranges) {
@ -342,23 +335,8 @@ ClusterConfig::ClusterShards ClusterConfig::GetConfig() const {
return config_;
}
SlotSet ClusterConfig::GetOwnedSlots() const {
SlotSet set;
for (SlotId id = 0; id <= kMaxSlotNum; ++id) {
if (IsMySlot(id)) {
set.insert(id);
}
}
return set;
}
SlotSet ToSlotSet(const std::vector<ClusterConfig::SlotRange>& slots) {
SlotSet sset;
for (const auto& slot_range : slots) {
for (auto i = slot_range.start; i <= slot_range.end; ++i)
sset.insert(i);
}
return sset;
const SlotSet& ClusterConfig::GetOwnedSlots() const {
return my_slots_;
}
} // namespace dfly

View file

@ -4,24 +4,17 @@
#pragma once
#include <absl/container/flat_hash_set.h>
#include <array>
#include <bitset>
#include <memory>
#include <string_view>
#include <vector>
#include "core/json/json_object.h"
#include "src/core/fibers.h"
#include "src/server/cluster/slot_set.h"
#include "src/server/common.h"
namespace dfly {
using SlotId = uint16_t;
// TODO consider to use bit set or some more compact way to store SlotId
using SlotSet = absl::flat_hash_set<SlotId>;
// MigrationState constants are ordered in state changing order
enum class MigrationState : uint8_t {
C_NO_STATE,
@ -43,13 +36,8 @@ class ClusterConfig {
uint16_t port = 0;
};
struct SlotRange {
SlotId start = 0;
SlotId end = 0;
};
struct ClusterShard {
std::vector<SlotRange> slot_ranges;
SlotRanges slot_ranges;
Node master;
std::vector<Node> replicas;
};
@ -94,7 +82,7 @@ class ClusterConfig {
ClusterShards GetConfig() const;
SlotSet GetOwnedSlots() const;
const SlotSet& GetOwnedSlots() const;
private:
struct SlotEntry {
@ -106,10 +94,7 @@ class ClusterConfig {
ClusterShards config_;
// True bits in `my_slots_` indicate that this slot is owned by this node.
std::bitset<kMaxSlotNum + 1> my_slots_;
SlotSet my_slots_;
};
SlotSet ToSlotSet(const std::vector<ClusterConfig::SlotRange>& slots);
} // namespace dfly

View file

@ -88,7 +88,7 @@ TEST_F(ClusterConfigTest, ConfigSetOk) {
EXPECT_NE(config, nullptr);
EXPECT_THAT(config->GetMasterNodeForSlot(0),
NodeMatches(Node{.id = "other", .ip = "192.168.0.100", .port = 7000}));
EXPECT_THAT(config->GetOwnedSlots(), UnorderedElementsAre());
EXPECT_TRUE(config->GetOwnedSlots().Empty());
}
TEST_F(ClusterConfigTest, ConfigSetOkWithReplica) {
@ -114,14 +114,15 @@ TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) {
.replicas = {{.id = "other-replica3", .ip = "192.168.0.105", .port = 7005}}}});
EXPECT_NE(config, nullptr);
SlotSet owned_slots = config->GetOwnedSlots();
EXPECT_EQ(owned_slots.size(), 5'000);
EXPECT_EQ(owned_slots.ToSlotRanges().size(), 1);
EXPECT_EQ(owned_slots.Count(), 5'000);
{
for (int i = 0; i <= 5'000; ++i) {
EXPECT_THAT(config->GetMasterNodeForSlot(i),
NodeMatches(Node{.id = "other-master", .ip = "192.168.0.100", .port = 7000}));
EXPECT_FALSE(config->IsMySlot(i));
EXPECT_FALSE(owned_slots.contains(i));
EXPECT_FALSE(owned_slots.Contains(i));
}
}
{
@ -129,7 +130,7 @@ TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) {
EXPECT_THAT(config->GetMasterNodeForSlot(i),
NodeMatches(Node{.id = kMyId, .ip = "192.168.0.102", .port = 7002}));
EXPECT_TRUE(config->IsMySlot(i));
EXPECT_TRUE(owned_slots.contains(i));
EXPECT_TRUE(owned_slots.Contains(i));
}
}
{
@ -137,7 +138,7 @@ TEST_F(ClusterConfigTest, ConfigSetMultipleInstances) {
EXPECT_THAT(config->GetMasterNodeForSlot(i),
NodeMatches(Node{.id = "other-master3", .ip = "192.168.0.104", .port = 7004}));
EXPECT_FALSE(config->IsMySlot(i));
EXPECT_FALSE(owned_slots.contains(i));
EXPECT_FALSE(owned_slots.Contains(i));
}
}
}

View file

@ -37,7 +37,6 @@ using CI = CommandId;
using ClusterShard = ClusterConfig::ClusterShard;
using ClusterShards = ClusterConfig::ClusterShards;
using Node = ClusterConfig::Node;
using SlotRange = ClusterConfig::SlotRange;
constexpr char kIdNotFound[] = "syncid not found";
@ -419,21 +418,11 @@ void ClusterFamily::DflyClusterMyId(CmdArgList args, ConnectionContext* cntx) {
}
namespace {
SlotSet GetDeletedSlots(bool is_first_config, const SlotSet& before, const SlotSet& after) {
SlotSet result;
for (SlotId id = 0; id <= ClusterConfig::kMaxSlotNum; ++id) {
if ((before.contains(id) || is_first_config) && !after.contains(id)) {
result.insert(id);
}
}
return result;
}
// Guards set configuration, so that we won't handle 2 in parallel.
Mutex set_config_mu;
void DeleteSlots(const SlotSet& slots) {
if (slots.empty()) {
if (slots.Empty()) {
return;
}
@ -448,16 +437,17 @@ void DeleteSlots(const SlotSet& slots) {
}
void WriteFlushSlotsToJournal(const SlotSet& slots) {
if (slots.empty()) {
if (slots.Empty()) {
return;
}
// Build args
vector<string> args;
args.reserve(slots.size() + 1);
args.reserve(slots.Count() + 1);
args.push_back("FLUSHSLOTS");
for (const SlotId slot : slots) {
args.push_back(absl::StrCat(slot));
for (SlotId slot = 0; slot <= SlotSet::kMaxSlot; ++slot) {
if (slots.Contains(slot))
args.push_back(absl::StrCat(slot));
}
// Build view
@ -510,12 +500,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
lock_guard gu(set_config_mu);
bool is_first_config = true;
SlotSet before;
if (tl_cluster_config != nullptr) {
is_first_config = false;
before = tl_cluster_config->GetOwnedSlots();
}
SlotSet before = tl_cluster_config ? tl_cluster_config->GetOwnedSlots() : SlotSet(true);
// Ignore blocked commands because we filter them with CancelBlockingOnThread
DispatchTracker tracker{server_family_->GetListeners(), cntx->conn(), false /* ignore paused */,
@ -544,7 +529,7 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
SlotSet after = tl_cluster_config->GetOwnedSlots();
if (ServerState::tlocal()->is_master) {
auto deleted_slots = GetDeletedSlots(is_first_config, before, after);
auto deleted_slots = before.GetRemovedSlots(after);
DeleteSlots(deleted_slots);
WriteFlushSlotsToJournal(deleted_slots);
}
@ -601,13 +586,12 @@ void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* c
void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx) {
SlotSet slots;
slots.reserve(args.size());
for (size_t i = 0; i < args.size(); ++i) {
unsigned slot;
if (!absl::SimpleAtoi(ArgS(args, i), &slot) || (slot > ClusterConfig::kMaxSlotNum)) {
return cntx->SendError(kSyntaxErrType);
}
slots.insert(static_cast<SlotId>(slot));
slots.Set(static_cast<SlotId>(slot), true);
}
DeleteSlots(slots);
@ -714,7 +698,6 @@ void ClusterFamily::DflyClusterMigrationFinalize(CmdArgList args, ConnectionCont
}
// TODO implement blocking on migrated slots only
[[maybe_unused]] const auto deleted_slots = ToSlotSet(migration->GetSlots());
bool is_block_active = true;
auto is_pause_in_progress = [&is_block_active] { return is_block_active; };
@ -761,7 +744,7 @@ void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
}
ClusterSlotMigration* ClusterFamily::AddMigration(std::string host_ip, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots) {
SlotRanges slots) {
lock_guard lk(migration_mu_);
for (const auto& mj : incoming_migrations_jobs_) {
if (auto info = mj->GetInfo(); info.host == host_ip && info.port == port) {
@ -787,7 +770,7 @@ void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) {
CmdArgParser parser{args};
auto port = parser.Next<uint16_t>();
std::vector<ClusterConfig::SlotRange> slots;
SlotRanges slots;
do {
auto [slot_start, slot_end] = parser.Next<SlotId, SlotId>();
slots.emplace_back(SlotRange{slot_start, slot_end});
@ -821,7 +804,7 @@ void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) {
}
uint32_t ClusterFamily::CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots) {
SlotRanges slots) {
std::lock_guard lk(migration_mu_);
auto sync_id = next_sync_id_++;
auto err_handler = [](const GenericError& err) {

View file

@ -76,14 +76,12 @@ class ClusterFamily {
void DflyMigrateAck(CmdArgList args, ConnectionContext* cntx);
// create a ClusterSlotMigration entity which will execute migration
ClusterSlotMigration* AddMigration(std::string host_ip, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots);
ClusterSlotMigration* AddMigration(std::string host_ip, uint16_t port, SlotRanges slots);
void RemoveFinishedIncomingMigrations();
// store info about migration and create unique session id
uint32_t CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots);
uint32_t CreateOutgoingMigration(ConnectionContext* cntx, uint16_t port, SlotRanges slots);
std::shared_ptr<OutgoingMigration> GetOutgoingMigration(uint32_t sync_id);

View file

@ -41,7 +41,7 @@ atomic_uint32_t next_local_sync_id{1};
} // namespace
ClusterSlotMigration::ClusterSlotMigration(ClusterFamily* cl_fm, string host_ip, uint16_t port,
Service* se, std::vector<ClusterConfig::SlotRange> slots)
Service* se, SlotRanges slots)
: ProtocolClient(std::move(host_ip), port),
cluster_family_(cl_fm),
service_(*se),
@ -145,8 +145,6 @@ void ClusterSlotMigration::MainMigrationFb() {
if (IsFinalized()) {
state_ = MigrationState::C_FINISHED;
const auto added_slots = ToSlotSet(slots_);
auto cmd = absl::StrCat("DFLYMIGRATE ACK ", sync_id_);
VLOG(1) << "send " << cmd;

View file

@ -22,7 +22,7 @@ class ClusterSlotMigration : private ProtocolClient {
};
ClusterSlotMigration(ClusterFamily* cl_fm, std::string host_ip, uint16_t port, Service* se,
std::vector<ClusterConfig::SlotRange> slots);
SlotRanges slots);
~ClusterSlotMigration();
// Initiate connection with source node and create migration fiber
@ -46,7 +46,7 @@ class ClusterSlotMigration : private ProtocolClient {
void Stop();
const std::vector<ClusterConfig::SlotRange>& GetSlots() const {
const SlotRanges& GetSlots() const {
return slots_;
}
@ -65,7 +65,7 @@ class ClusterSlotMigration : private ProtocolClient {
Service& service_;
Mutex flows_op_mu_;
std::vector<std::unique_ptr<ClusterShardMigration>> shard_flows_;
std::vector<ClusterConfig::SlotRange> slots_;
SlotRanges slots_;
uint32_t source_shards_num_ = 0;
uint32_t sync_id_ = 0;
uint32_t local_sync_id_ = 0;

View file

@ -44,8 +44,7 @@ class OutgoingMigration::SliceSlotMigration {
};
OutgoingMigration::OutgoingMigration(std::uint32_t flows_num, std::string ip, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots,
Context::ErrHandler err_handler)
SlotRanges slots, Context::ErrHandler err_handler)
: host_ip_(ip), port_(port), slots_(slots), cntx_(err_handler), slot_migrations_(flows_num) {
}
@ -53,13 +52,11 @@ OutgoingMigration::~OutgoingMigration() = default;
void OutgoingMigration::StartFlow(DbSlice* slice, uint32_t sync_id, journal::Journal* journal,
io::Sink* dest) {
SlotSet sset = ToSlotSet(slots_);
const auto shard_id = slice->shard_id();
std::lock_guard lck(flows_mu_);
slot_migrations_[shard_id] =
std::make_unique<SliceSlotMigration>(slice, std::move(sset), sync_id, journal, &cntx_, dest);
std::make_unique<SliceSlotMigration>(slice, slots_, sync_id, journal, &cntx_, dest);
}
void OutgoingMigration::Finalize(uint32_t shard_id) {

View file

@ -20,8 +20,8 @@ class OutgoingMigration {
public:
OutgoingMigration() = default;
~OutgoingMigration();
OutgoingMigration(std::uint32_t flows_num, std::string ip, uint16_t port,
std::vector<ClusterConfig::SlotRange> slots, Context::ErrHandler err_handler);
OutgoingMigration(std::uint32_t flows_num, std::string ip, uint16_t port, SlotRanges slots,
Context::ErrHandler err_handler);
void StartFlow(DbSlice* slice, uint32_t sync_id, journal::Journal* journal, io::Sink* dest);
@ -38,7 +38,7 @@ class OutgoingMigration {
return port_;
};
const std::vector<ClusterConfig::SlotRange>& GetSlots() const {
const SlotRanges& GetSlots() const {
return slots_;
}
@ -50,7 +50,7 @@ class OutgoingMigration {
private:
std::string host_ip_;
uint16_t port_;
std::vector<ClusterConfig::SlotRange> slots_;
SlotRanges slots_;
Context cntx_;
mutable Mutex flows_mu_;
std::vector<std::unique_ptr<SliceSlotMigration>> slot_migrations_ ABSL_GUARDED_BY(flows_mu_);

View file

@ -0,0 +1,97 @@
// Copyright 2024, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <bitset>
#include <memory>
#include <vector>
namespace dfly {
using SlotId = uint16_t;
struct SlotRange {
SlotId start = 0;
SlotId end = 0;
};
using SlotRanges = std::vector<SlotRange>;
class SlotSet {
public:
static constexpr SlotId kMaxSlot = 0x3FFF;
static constexpr SlotId kSlotsNumber = kMaxSlot + 1;
SlotSet(bool full_house = false) : slots_(std::make_unique<BitsetType>()) {
if (full_house)
slots_->flip();
}
SlotSet(const SlotRanges& slot_ranges) : SlotSet() {
Set(slot_ranges, true);
}
SlotSet(const SlotSet& s) : SlotSet() {
*slots_ = *s.slots_;
}
bool Contains(SlotId slot) const {
return slots_->test(slot);
}
void Set(const SlotRanges& slot_ranges, bool value) {
for (const auto& slot_range : slot_ranges) {
for (auto i = slot_range.start; i <= slot_range.end; ++i) {
slots_->set(i);
}
}
}
void Set(SlotId slot, bool value) {
slots_->set(slot, value);
}
bool Empty() const {
return slots_->none();
}
size_t Count() const {
return slots_->count();
}
bool All() const {
return slots_->all();
}
// Get SlotSet that are absent in the slots
SlotSet GetRemovedSlots(SlotSet slots) {
slots.slots_->flip();
*slots.slots_ &= *slots_;
return slots;
}
SlotRanges ToSlotRanges() const {
SlotRanges res;
for (SlotId i = 0; i < kSlotsNumber; ++i) {
if (!slots_->test(i)) {
continue;
} else {
auto& range = res.emplace_back(SlotRange{i, i});
for (++i; i < kSlotsNumber && slots_->test(i); ++i) {
range.end = i;
}
}
}
return res;
}
private:
using BitsetType = std::bitset<kSlotsNumber>;
std::unique_ptr<BitsetType> slots_;
};
} // namespace dfly

View file

@ -702,7 +702,7 @@ void DbSlice::FlushSlotsFb(const SlotSet& slot_ids) {
auto del_entry_cb = [&](PrimeTable::iterator it) {
std::string_view key = it->first.GetSlice(&tmp);
SlotId sid = ClusterConfig::KeySlot(key);
if (slot_ids.contains(sid) && it.GetVersion() < next_version) {
if (slot_ids.Contains(sid) && it.GetVersion() < next_version) {
PerformDeletion(it, db_arr_[0].get());
}
return true;
@ -1439,7 +1439,7 @@ void DbSlice::InvalidateDbWatches(DbIndex db_indx) {
void DbSlice::InvalidateSlotWatches(const SlotSet& slot_ids) {
for (const auto& [key, conn_list] : db_arr_[0]->watched_keys) {
SlotId sid = ClusterConfig::KeySlot(key);
if (!slot_ids.contains(sid)) {
if (!slot_ids.Contains(sid)) {
continue;
}
for (auto conn_ptr : conn_list) {

View file

@ -642,8 +642,8 @@ optional<DebugCmd::PopulateOptions> DebugCmd::ParsePopulateArgs(CmdArgList args)
cntx_->SendError(end.status());
return nullopt;
}
options.slot_range = ClusterConfig::SlotRange{.start = static_cast<SlotId>(start.value()),
.end = static_cast<SlotId>(end.value())};
options.slot_range = SlotRange{.start = static_cast<SlotId>(start.value()),
.end = static_cast<SlotId>(end.value())};
} else {
cntx_->SendError(kSyntaxErr);

View file

@ -22,7 +22,7 @@ class DebugCmd {
std::string_view type{"STRING"};
uint32_t elements = 1;
std::optional<ClusterConfig::SlotRange> slot_range;
std::optional<SlotRange> slot_range;
};
public:

View file

@ -133,7 +133,7 @@ bool RestoreStreamer::ShouldWrite(std::string_view key) const {
}
bool RestoreStreamer::ShouldWrite(SlotId slot_id) const {
return my_slots_.contains(slot_id);
return my_slots_.Contains(slot_id);
}
bool RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {