feat(cluster): Use thread-local cluster config (#1361)

To support this, I refactored some of the code:

* We no longer have `IsConfigured()` and `SetConfig()`, now
  configuration validation is done via instantiation
* As a result, we check if `tl_cluster_config` is `nullptr` or not to
  determine whether the cluster has been configured
* Reduce the size of the config by only storing 1 bit per slot (whether it's owned locally or not)
* Pushing new configuration is done via copy-c'tor

While at it, add a small test function to remove `sleep()`s and wait in a less fragile way.

Fixes #1357
This commit is contained in:
Chaka 2023-06-08 15:18:06 +03:00 committed by GitHub
parent 891155c927
commit ff338bebe2
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 496 additions and 393 deletions

View file

@ -1,3 +1,5 @@
#include <optional>
extern "C" {
#include "redis/crc16.h"
}
@ -32,10 +34,6 @@ SlotId ClusterConfig::KeySlot(string_view key) {
return crc16(tag.data(), tag.length()) & kMaxSlotNum;
}
ClusterConfig::ClusterConfig(string_view my_id) : my_id_(my_id) {
cluster_enabled = true;
}
namespace {
bool HasValidNodeIds(const ClusterConfig::ClusterShards& new_config) {
absl::flat_hash_set<string_view> nodes;
@ -60,11 +58,10 @@ bool HasValidNodeIds(const ClusterConfig::ClusterShards& new_config) {
return true;
}
} // namespace
bool ClusterConfig::IsConfigValid(const ClusterShards& new_config) {
bool IsConfigValid(const ClusterConfig::ClusterShards& new_config) {
// Make sure that all slots are set exactly once.
array<bool, tuple_size<decltype(slots_)>::value> slots_found = {};
array<bool, ClusterConfig::kMaxSlotNum + 1> slots_found = {};
if (!HasValidNodeIds(new_config)) {
return false;
@ -103,38 +100,33 @@ bool ClusterConfig::IsConfigValid(const ClusterShards& new_config) {
return true;
}
} // namespace
optional<SlotSet> ClusterConfig::SetConfig(const vector<ClusterShard>& new_config) {
if (!IsConfigValid(new_config)) {
return nullopt;
/* static */
unique_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
const ClusterShards& config) {
if (!IsConfigValid(config)) {
return nullptr;
}
// When set config is called the first time, deleted_slots will contain all slots which are not
// allocated to this node. This makes sure that if there is data in server that was loaded from
// disk (rdb_load), then after the call to set config the server data will contain only data from
// the node owned slots.
bool is_first_config = !IsConfigured();
unique_ptr<ClusterConfig> result(new ClusterConfig());
lock_guard gu(mu_);
result->config_ = config;
config_ = new_config;
SlotSet deleted_slots;
for (const auto& shard : config_) {
for (const auto& slot_range : shard.slot_ranges) {
bool owned_by_me =
shard.master.id == my_id_ || any_of(shard.replicas.begin(), shard.replicas.end(),
[&](const Node& node) { return node.id == my_id_; });
for (SlotId i = slot_range.start; i <= slot_range.end; ++i) {
if ((slots_[i].owned_by_me || is_first_config) && !owned_by_me) {
deleted_slots.insert(i);
for (const auto& shard : result->config_) {
bool owned_by_me =
shard.master.id == my_id || any_of(shard.replicas.begin(), shard.replicas.end(),
[&](const Node& node) { return node.id == my_id; });
if (owned_by_me) {
for (const auto& slot_range : shard.slot_ranges) {
for (SlotId i = slot_range.start; i <= slot_range.end; ++i) {
result->my_slots_.set(i);
}
slots_[i] = {.shard = &shard, .owned_by_me = owned_by_me};
}
}
}
return deleted_slots;
return result;
}
namespace {
@ -261,43 +253,53 @@ optional<ClusterConfig::ClusterShards> BuildClusterConfigFromJson(const JsonType
}
} // namespace
optional<SlotSet> ClusterConfig::SetConfig(const JsonType& json) {
optional<ClusterShards> config = BuildClusterConfigFromJson(json);
/* static */
unique_ptr<ClusterConfig> ClusterConfig::CreateFromConfig(string_view my_id,
const JsonType& json_config) {
optional<ClusterShards> config = BuildClusterConfigFromJson(json_config);
if (!config.has_value()) {
return nullopt;
return nullptr;
}
return SetConfig(config.value());
return CreateFromConfig(my_id, config.value());
}
bool ClusterConfig::IsMySlot(SlotId id) const {
if (id >= slots_.size()) {
if (id >= my_slots_.size()) {
DCHECK(false) << "Requesting a non-existing slot id " << id;
return false;
}
shared_lock gu(mu_);
return slots_[id].owned_by_me;
return my_slots_.test(id);
}
ClusterConfig::Node ClusterConfig::GetMasterNodeForSlot(SlotId id) const {
shared_lock gu(mu_);
CHECK_LT(id, my_slots_.size()) << "Requesting a non-existing slot id " << id;
CHECK_LT(id, slots_.size()) << "Requesting a non-existing slot id " << id;
CHECK_NE(slots_[id].shard, nullptr)
<< "Calling GetMasterNodeForSlot(" << id << ") before SetConfig()";
for (const auto& shard : config_) {
for (const auto& range : shard.slot_ranges) {
if (id >= range.start && id <= range.end) {
return shard.master;
}
}
}
return slots_[id].shard->master;
DCHECK(false) << "Can't find master node for slot " << id;
return {};
}
ClusterConfig::ClusterShards ClusterConfig::GetConfig() const {
shared_lock gu(mu_);
return config_;
}
bool ClusterConfig::IsConfigured() const {
shared_lock gu(mu_);
return !config_.empty();
SlotSet ClusterConfig::GetOwnedSlots() const {
SlotSet set;
for (SlotId id = 0; id <= kMaxSlotNum; ++id) {
if (IsMySlot(id)) {
set.insert(id);
}
}
return set;
}
} // namespace dfly