fix(replication): redis replication flush all before full sync (#946)

Signed-off-by: adi_holden <adi@dragonflydb.io>
adiholden 2023-03-15 09:45:04 +02:00 committed by GitHub
parent 999caa1e4f
commit bafad66fc3
2 changed files with 62 additions and 2 deletions


@@ -418,12 +418,12 @@ error_code Replica::InitiatePSync() {
  io::PrefixSource ps{io_buf.InputBuffer(), &ss};
  // Set LOADING state.
  // TODO: Flush db on retry.
  CHECK(service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING) == GlobalState::LOADING);
  absl::Cleanup cleanup = [this]() {
    service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
  };
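  // Wipe the replica's entire dataset before loading the master's snapshot, so no
  // stale keys from a previous sync survive the full sync.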
  JournalExecutor{&service_}.FlushAll();
  RdbLoader loader(NULL);
  loader.set_source_limit(snapshot_size);
  // TODO: to allow registering callbacks within loader to send '\n' pings back to master.
@@ -1122,6 +1122,7 @@ error_code Replica::ParseAndExecute(base::IoBuf* io_buf, ConnectionContext* cntx
    case RedisParser::OK:
      if (!resp_args_.empty()) {
        VLOG(2) << "Got command " << ToSV(resp_args_[0].GetBuf()) << "\n consumed: " << consumed;
        facade::RespToArgList(resp_args_, &cmd_str_args_);
        CmdArgList arg_list{cmd_str_args_.data(), cmd_str_args_.size()};
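        // Forward the command received from the master to the regular command dispatch path.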
        service_.DispatchCommand(arg_list, cntx);


@@ -151,7 +151,6 @@ replication_specs = [
@pytest.mark.asyncio
@pytest.mark.parametrize("t_replicas, seeder_config", replication_specs)
# @pytest.mark.skip(reason="Skipping until we fix replication from redis")
async def test_redis_replication_all(df_local_factory, df_seeder_factory, redis_server, t_replicas, seeder_config):
    master = redis_server
    c_master = aioredis.Redis(port=master.port)
@@ -199,3 +198,63 @@ async def test_redis_replication_all(df_local_factory, df_seeder_factory, redis_
    # Check data after stable state stream
    await await_synced_all(master.port, [replica.port for replica in replicas])
    await check_data(seeder, replicas, c_replicas)


master_disconnect_cases = [
    ([6], 1, dict(keys=4_000, dbcount=1, unsupported_types=[ValueType.JSON])),
    ([1, 4, 6], 3, dict(keys=1_000, dbcount=2, unsupported_types=[ValueType.JSON])),
]
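

# Restart the redis master while replication is running and verify the replicas
# recover, resync, and end up with the master's data.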
@pytest.mark.asyncio
@pytest.mark.parametrize("t_replicas, t_disconnect, seeder_config", master_disconnect_cases)
async def test_disconnect_master(df_local_factory, df_seeder_factory, redis_server, t_replicas, t_disconnect, seeder_config):
    master = redis_server
    c_master = aioredis.Redis(port=master.port)
    assert await c_master.ping()

    replicas = [
        df_local_factory.create(port=master.port+i+1, proactor_threads=t)
        for i, t in enumerate(t_replicas)
    ]

    # Fill master with test data
    seeder = df_seeder_factory.create(port=master.port, **seeder_config)
    await seeder.run(target_deviation=0.1)

    # Start replicas
    df_local_factory.start_all(replicas)

    c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]

    # Start data stream
    stream_task = asyncio.create_task(seeder.run())
    await asyncio.sleep(0.0)

    # Start replication
    async def run_replication(c_replica):
        await c_replica.execute_command("REPLICAOF localhost " + str(master.port))
        await wait_available_async(c_replica)

    await asyncio.gather(*(asyncio.create_task(run_replication(c))
                           for c in c_replicas))

    # Wait for streaming to finish
    assert not stream_task.done(), "Weak testcase. Increase number of streamed iterations to surpass full sync"
    seeder.stop()
    await stream_task
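
    # Each master restart should force the replicas through a fresh full sync,
    # which should flush the data they loaded previously (the behavior this commit adds).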
    for _ in range(t_disconnect):
        master.stop()
        await asyncio.sleep(1)
        master.start()
        await asyncio.sleep(1)
        # fill master with data
        await seeder.run(target_deviation=0.1)

    # Check data after stable state stream
    await asyncio.gather(*(asyncio.create_task(wait_available_async(c)) for c in c_replicas))
    await await_synced_all(master.port, [replica.port for replica in replicas])
    await check_data(seeder, replicas, c_replicas)