test(cluster-migration): Fix some bugs and add cluster migration fuzzy tests (#2572)

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Co-authored-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Shahar Mike 2024-02-12 13:47:34 +02:00 committed by GitHub
parent 6cd2f05a22
commit 6d11f86091
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
13 changed files with 231 additions and 29 deletions

View file

@ -798,7 +798,7 @@ void ClusterFamily::RemoveOutgoingMigration(uint32_t sync_id) {
}
void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "Create slot migration config";
VLOG(1) << "Create slot migration config " << args;
CmdArgParser parser{args};
auto port = parser.Next<uint16_t>();
@ -812,8 +812,7 @@ void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendError(err->MakeReply());
if (!tl_cluster_config) {
cntx->SendError(kClusterNotConfigured);
return;
return cntx->SendError(kClusterNotConfigured);
}
for (const auto& migration_range : slots) {
@ -821,8 +820,7 @@ void ClusterFamily::MigrationConf(CmdArgList args, ConnectionContext* cntx) {
if (!tl_cluster_config->IsMySlot(i)) {
VLOG(1) << "Invalid migration slot " << i << " in range " << migration_range.start << ':'
<< migration_range.end;
cntx->SendError("Invalid slots range");
return;
return cntx->SendError("Invalid slots range");
}
}
}
@ -883,6 +881,7 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
}
void ClusterFamily::DflyMigrateFullSyncCut(CmdArgList args, ConnectionContext* cntx) {
CHECK(cntx->slot_migration_id != 0);
CmdArgParser parser{args};
auto [sync_id, shard_id] = parser.Next<uint32_t, uint32_t>();
@ -894,9 +893,9 @@ void ClusterFamily::DflyMigrateFullSyncCut(CmdArgList args, ConnectionContext* c
<< " sync_id: " << sync_id << " shard_id: " << shard_id << " shard";
std::lock_guard lck(migration_mu_);
auto migration_it =
std::find_if(incoming_migrations_jobs_.begin(), incoming_migrations_jobs_.end(),
[sync_id = sync_id](const auto& el) { return el->GetSyncId() == sync_id; });
auto migration_it = std::find_if(
incoming_migrations_jobs_.begin(), incoming_migrations_jobs_.end(),
[cntx](const auto& el) { return cntx->slot_migration_id == el->GetLocalSyncId(); });
if (migration_it == incoming_migrations_jobs_.end()) {
LOG(WARNING) << "Couldn't find migration id";

View file

@ -89,7 +89,7 @@ class ClusterFamily {
std::vector<std::unique_ptr<ClusterSlotMigration>> incoming_migrations_jobs_
ABSL_GUARDED_BY(migration_mu_);
uint32_t next_sync_id_ = 1;
uint32_t next_sync_id_ ABSL_GUARDED_BY(migration_mu_) = 1;
// holds all outgoing slots migrations that are currently in progress
using OutgoingMigrationMap = absl::btree_map<uint32_t, std::shared_ptr<OutgoingMigration>>;
OutgoingMigrationMap outgoing_migration_jobs_;

View file

@ -21,10 +21,11 @@ using namespace facade;
using namespace util;
using absl::GetFlag;
ClusterShardMigration::ClusterShardMigration(ServerContext server_context, uint32_t shard_id,
uint32_t sync_id, Service* service)
ClusterShardMigration::ClusterShardMigration(ServerContext server_context, uint32_t local_sync_id,
uint32_t shard_id, uint32_t sync_id, Service* service)
: ProtocolClient(server_context), source_shard_id_(shard_id), sync_id_(sync_id) {
executor_ = std::make_unique<JournalExecutor>(service);
executor_->connection_context()->slot_migration_id = local_sync_id;
}
ClusterShardMigration::~ClusterShardMigration() {

View file

@ -17,8 +17,8 @@ class MultiShardExecution;
// It is created per shard on the target node to initiate FLOW step.
class ClusterShardMigration : public ProtocolClient {
public:
ClusterShardMigration(ServerContext server_context, uint32_t shard_id, uint32_t sync_id,
Service* service);
ClusterShardMigration(ServerContext server_context, uint32_t local_sync_id, uint32_t shard_id,
uint32_t sync_id, Service* service);
~ClusterShardMigration();
std::error_code StartSyncFlow(Context* cntx);

View file

@ -35,11 +35,14 @@ vector<vector<unsigned>> Partition(unsigned num_flows) {
return partition;
}
atomic_uint32_t next_local_sync_id{1};
} // namespace
ClusterSlotMigration::ClusterSlotMigration(string host_ip, uint16_t port, Service* se,
std::vector<ClusterConfig::SlotRange> slots)
: ProtocolClient(std::move(host_ip), port), service_(*se), slots_(std::move(slots)) {
local_sync_id_ = next_local_sync_id.fetch_add(1);
}
ClusterSlotMigration::~ClusterSlotMigration() {
@ -125,7 +128,7 @@ void ClusterSlotMigration::Stop() {
}
void ClusterSlotMigration::MainMigrationFb() {
VLOG(1) << "Main migration fiber started";
VLOG(1) << "Main migration fiber started " << sync_id_;
state_ = MigrationState::C_FULL_SYNC;
@ -143,7 +146,8 @@ void ClusterSlotMigration::MainMigrationFb() {
std::error_code ClusterSlotMigration::InitiateSlotsMigration() {
shard_flows_.resize(source_shards_num_);
for (unsigned i = 0; i < source_shards_num_; ++i) {
shard_flows_[i].reset(new ClusterShardMigration(server(), i, sync_id_, &service_));
shard_flows_[i].reset(
new ClusterShardMigration(server(), local_sync_id_, i, sync_id_, &service_));
}
absl::Cleanup cleanup = [this]() {

View file

@ -31,6 +31,12 @@ class ClusterSlotMigration : ProtocolClient {
return sync_id_;
}
// Remote sync ids uniquely identifies a sync *remotely*. However, multiple remote sources can
// use the same id, so we need a local id as well.
uint32_t GetLocalSyncId() const {
return local_sync_id_;
}
MigrationState GetState() const {
return state_;
}
@ -56,6 +62,7 @@ class ClusterSlotMigration : ProtocolClient {
std::vector<ClusterConfig::SlotRange> slots_;
uint32_t source_shards_num_ = 0;
uint32_t sync_id_ = 0;
uint32_t local_sync_id_ = 0;
MigrationState state_ = MigrationState::C_NO_STATE;
Fiber sync_fb_;

View file

@ -184,6 +184,7 @@ class ConnectionContext : public facade::ConnectionContext {
Transaction* transaction = nullptr;
const CommandId* cid = nullptr;
ConnectionState conn_state;
uint32_t slot_migration_id = 0; // Which slot migration ID belongs to this connection
DbIndex db_index() const {
return conn_state.db_index;

View file

@ -26,6 +26,10 @@ class JournalExecutor {
void FlushAll(); // Execute FLUSHALL.
ConnectionContext* connection_context() {
return &conn_context_;
}
private:
void Execute(journal::ParsedEntry::CmdData& cmd);

View file

@ -75,7 +75,8 @@ void RestoreStreamer::Start(io::Sink* dest) {
if (fiber_cancellation_.IsCancelled())
return;
cursor = pt->Traverse(cursor, absl::bind_front(&RestoreStreamer::WriteBucket, this));
cursor =
pt->Traverse(cursor, [this](PrimeTable::bucket_iterator it) { WriteBucket(it, true); });
++last_yield;
if (last_yield >= 100) {
@ -101,6 +102,10 @@ void RestoreStreamer::SendFinalize() {
NotifyWritten(true);
}
RestoreStreamer::~RestoreStreamer() {
CHECK(!snapshot_fb_.IsJoinable());
}
void RestoreStreamer::Cancel() {
fiber_cancellation_.Cancel();
snapshot_fb_.JoinIfNeeded();
@ -124,7 +129,7 @@ bool RestoreStreamer::ShouldWrite(SlotId slot_id) const {
return my_slots_.contains(slot_id);
}
void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it, bool allow_await) {
bool is_data_present = false;
if (it.GetVersion() < snapshot_version_) {
@ -149,7 +154,7 @@ void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) {
}
if (is_data_present)
NotifyWritten(true);
NotifyWritten(allow_await);
}
void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
@ -159,12 +164,12 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req
PrimeTable* table = db_slice_->GetTables(0).first;
if (const PrimeTable::bucket_iterator* bit = req.update()) {
WriteBucket(*bit);
WriteBucket(*bit, false);
} else {
string_view key = get<string_view>(req.change);
table->CVCUponInsert(snapshot_version_, key, [this](PrimeTable::bucket_iterator it) {
DCHECK_LT(it.GetVersion(), snapshot_version_);
WriteBucket(it);
WriteBucket(it, false);
});
}
}

View file

@ -55,6 +55,7 @@ class RestoreStreamer : public JournalStreamer {
public:
RestoreStreamer(DbSlice* slice, SlotSet slots, uint32_t sync_id, journal::Journal* journal,
Context* cntx);
~RestoreStreamer() override;
void Start(io::Sink* dest) override;
// Cancel() must be called if Start() is called
@ -72,7 +73,7 @@ class RestoreStreamer : public JournalStreamer {
bool ShouldWrite(std::string_view key) const;
bool ShouldWrite(SlotId slot_id) const;
void WriteBucket(PrimeTable::bucket_iterator it);
void WriteBucket(PrimeTable::bucket_iterator it, bool allow_await);
void WriteEntry(string_view key, const PrimeValue& pv, uint64_t expire_ms);
void WriteCommand(journal::Entry::Payload cmd_payload);

View file

@ -5,7 +5,7 @@ import redis
from redis import asyncio as aioredis
import asyncio
from .instance import DflyInstanceFactory
from .instance import DflyInstanceFactory, DflyInstance
from .utility import *
from .replication_test import check_all_replicas_finished
@ -953,3 +953,178 @@ async def test_cluster_data_migration(df_local_factory: DflyInstanceFactory):
assert await c_nodes[1].execute_command("DBSIZE") == 17
await close_clients(*c_nodes, *c_nodes_admin)
from dataclasses import dataclass
@dataclass
class NodeInfo:
instance: DflyInstance
client: aioredis.Redis
admin_client: aioredis.Redis
slots: list
next_slots: list
sync_ids: list
@dfly_args({"proactor_threads": 4, "cluster_mode": "yes"})
async def test_cluster_fuzzymigration(df_local_factory: DflyInstanceFactory, df_seeder_factory):
node_count = 3
instances = [
df_local_factory.create(
port=BASE_PORT + i,
admin_port=BASE_PORT + i + 1000,
vmodule="cluster_family=9,cluster_slot_migration=9",
)
for i in range(node_count)
]
df_local_factory.start_all(instances)
nodes = [
NodeInfo(
instance=instance,
client=instance.client(),
admin_client=instance.admin_client(),
slots=[],
next_slots=[],
sync_ids=[],
)
for instance in instances
]
cluster_client = aioredis.RedisCluster(host="localhost", port=nodes[0].instance.port)
async def generate_config():
return [
{
"slot_ranges": [{"start": s, "end": e} for (s, e) in node.slots],
"master": {
"id": await get_node_id(node.admin_client),
"ip": "localhost",
"port": node.instance.port,
},
"replicas": [],
}
for node in nodes
]
# Generate equally sized ranges and distribute by nodes
step = 1000
for slot_range in [(s, min(s + step - 1, 16383)) for s in range(0, 16383, step)]:
nodes[random.randint(0, node_count - 1)].slots.append(slot_range)
# Push config to all nodes
await push_config(json.dumps(await generate_config()), [node.admin_client for node in nodes])
# Fill instances with some data
seeder = df_seeder_factory.create(port=nodes[0].instance.port, cluster_mode=True)
await seeder.run(target_deviation=0.1)
fill_task = asyncio.create_task(seeder.run())
# Generate migration plan
for node_idx, node in enumerate(nodes):
random.shuffle(node.slots)
# Decide on number of outgoing slot ranges
outgoing = [[] for _ in range(node_count)]
num_outgoing = random.randint(0, len(node.slots))
# Distribute first 0..num_outgoing
for slot_range in node.slots[:num_outgoing]:
dest_idx = random.randint(0, node_count - 1)
while dest_idx == node_idx:
dest_idx = random.randint(0, node_count - 1)
outgoing[dest_idx].append(slot_range)
for dest_idx, dest_slots in enumerate(outgoing):
if len(dest_slots) == 0:
continue
print(node_idx, "migrates to", dest_idx, "slots", dest_slots)
sync_id = await nodes[dest_idx].admin_client.execute_command(
"DFLYCLUSTER",
"START-SLOT-MIGRATION",
"127.0.0.1",
node.instance.admin_port,
*itertools.chain(*dest_slots),
)
nodes[node_idx].sync_ids.append(sync_id)
nodes[dest_idx].next_slots.extend(dest_slots)
keeping = node.slots[num_outgoing:]
node.next_slots.extend(keeping)
# Busy loop for migrations to finish - all in stable state
iterations = 0
while True:
for node in nodes:
states = await node.admin_client.execute_command("DFLYCLUSTER", "SLOT-MIGRATION-STATUS")
print(states)
if not all(s.endswith("STABLE_SYNC") for s in states) and not states == "NO_STATE":
break
else:
break
iterations += 1
assert iterations < 100
await asyncio.sleep(0.1)
# Give seeder one more second
await asyncio.sleep(1.0)
# Stop seeder
seeder.stop()
await fill_task
# Counter that pushes values to a list
async def list_counter(key, client: aioredis.RedisCluster):
for i in itertools.count(start=1):
await client.lpush(key, i)
# Start ten counters
counter_keys = [f"_counter{i}" for i in range(10)]
counters = [asyncio.create_task(list_counter(key, cluster_client)) for key in counter_keys]
# Finalize slot migration
for node in nodes:
for sync_id in node.sync_ids:
assert "OK" == await node.admin_client.execute_command(
"DFLYCLUSTER", "SLOT-MIGRATION-FINALIZE", sync_id
)
# Stop counters
for counter in counters:
counter.cancel()
# Push new config
await push_config(json.dumps(await generate_config()), [node.admin_client for node in nodes])
# Generate capture, capture ignores counter keys
capture = await seeder.capture()
# Transfer nodes
for node in nodes:
node.slots = node.next_slots
node.new_slots = []
# Check counter consistency
# TODO: This fails and exposes a REAL BUG!
# for key in counter_keys:
# counter_list = await cluster_client.lrange(key, 0, -1)
# for i, j in zip(counter_list, counter_list[1:]):
# print(f"comparing {i}, {j}")
# assert int(i) == int(j) + 1, f"huh? {counter_list}"
# Compare capture
assert await seeder.compare(capture, nodes[0].instance.port)
await disconnect_clients(
*[node.admin_client for node in nodes], *[node.client for node in nodes]
)
await close_clients(
cluster_client, *[node.admin_client for node in nodes], *[node.client for node in nodes]
)

View file

@ -330,7 +330,7 @@ class DflyInstanceFactory:
def create(self, existing_port=None, **kwargs) -> DflyInstance:
args = {**self.args, **kwargs}
args.setdefault("dbfilename", "")
vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1"
vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1,cluster_family=1"
args.setdefault("vmodule", vmod)
for k, v in args.items():

View file

@ -445,10 +445,18 @@ class DflySeeder:
def target(self, key_cnt):
self.gen.key_cnt_target = key_cnt
def _make_client(self, **kwargs):
if self.cluster_mode:
return aioredis.RedisCluster(host="localhost", **kwargs)
else:
return aioredis.Redis(**kwargs)
async def _capture_db(self, port, target_db, keys):
client = aioredis.Redis(port=port, db=target_db)
client = self._make_client(port=port, db=target_db)
capture = DataCapture(await self._capture_entries(client, keys))
await client.connection_pool.disconnect()
if hasattr(client, "connection_pool"):
await client.connection_pool.disconnect()
return capture
async def _generator_task(self, queues, target_ops=None, target_deviation=None):
@ -506,10 +514,7 @@ class DflySeeder:
return submitted
async def _executor_task(self, db, queue):
if self.cluster_mode:
client = aioredis.RedisCluster(host="localhost", port=self.port, db=db)
else:
client = aioredis.Redis(port=self.port, db=db)
client = self._make_client(port=self.port, db=db)
while True:
tx_data = await queue.get()