fix(redis replication): remove partial sync flow ,not supported yet (#2865)

* fix redis replicaiton: remove partial sync flow ,not supported yet

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2024-04-10 09:27:02 +03:00 committed by GitHub
parent fffeb4bacd
commit eb164be596
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 80 additions and 4 deletions

View file

@ -359,7 +359,8 @@ error_code Replica::InitiatePSync() {
int64_t offs = -1;
if (!master_context_.master_repl_id.empty()) { // in case we synced before
id = master_context_.master_repl_id; // provide the replication offset and master id
offs = repl_offs_; // to try incremental sync.
// TBD: for incremental sync send repl_offs_, not supported yet.
// offs = repl_offs_;
}
RETURN_ON_ERR(SendCommand(StrCat("PSYNC ", id, " ", offs)));
@ -1136,6 +1137,11 @@ error_code Replica::ParseReplicationHeader(base::IoBuf* io_buf, PSyncResponse* d
// That could change due to redis failovers.
// TODO: part sync
dest->fullsync.emplace<size_t>(0);
LOG(ERROR) << "Partial replication not supported yet";
return std::make_error_code(std::errc::not_supported);
} else {
LOG(ERROR) << "Unknown replication header";
return bad_header();
}
return error_code{};

View file

@ -5,6 +5,7 @@ from redis import asyncio as aioredis
import subprocess
from .utility import *
from .instance import DflyInstanceFactory
from .proxy import Proxy
class RedisServer:
@ -55,8 +56,6 @@ async def await_synced(c_master: aioredis.Redis, c_replica: aioredis.Redis, dbco
timeout -= 1
await c_master.close()
await c_replica.close()
if timeout == 0:
breakpoint()
assert timeout > 0, "Timeout while waiting for replica to sync"
@ -229,7 +228,7 @@ master_disconnect_cases = [
@pytest.mark.parametrize("t_replicas, t_disconnect, seeder_config", master_disconnect_cases)
async def test_disconnect_master(
async def test_redis_master_restart(
df_local_factory,
df_seeder_factory,
redis_server,
@ -286,3 +285,74 @@ async def test_disconnect_master(
await asyncio.gather(*(asyncio.create_task(wait_available_async(c)) for c in c_replicas))
await await_synced_all(c_master, c_replicas)
await check_data(seeder, replicas, c_replicas)
master_disconnect_cases = [
([6], dict(keys=4_000, dbcount=1, unsupported_types=[ValueType.JSON])),
pytest.param(
[1, 4, 6],
dict(keys=1_000, dbcount=2, unsupported_types=[ValueType.JSON]),
marks=pytest.mark.slow,
),
]
@pytest.mark.parametrize("t_replicas, seeder_config", master_disconnect_cases)
async def test_disconnect_master(
df_local_factory,
df_seeder_factory,
redis_server,
t_replicas,
seeder_config,
port_picker,
):
master = redis_server
c_master = aioredis.Redis(port=master.port)
assert await c_master.ping()
proxy = Proxy("127.0.0.1", 1114, "127.0.0.1", master.port)
await proxy.start()
proxy_task = asyncio.create_task(proxy.serve())
replicas = [
df_local_factory.create(port=port_picker.get_available_port(), proactor_threads=t)
for i, t in enumerate(t_replicas)
]
# Fill master with test data
seeder = df_seeder_factory.create(port=master.port, **seeder_config)
await seeder.run(target_deviation=0.1)
# Start replicas
df_local_factory.start_all(replicas)
c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]
# Start data stream
stream_task = asyncio.create_task(seeder.run())
await asyncio.sleep(0.5)
# Start replication
async def run_replication(c_replica):
await c_replica.execute_command("REPLICAOF localhost " + str(proxy.port))
await wait_available_async(c_replica)
await asyncio.gather(*(asyncio.create_task(run_replication(c)) for c in c_replicas))
# Break the connection between master and replica
await proxy.close(proxy_task)
await asyncio.sleep(2)
await proxy.start()
proxy_task = asyncio.create_task(proxy.serve())
# finish streaming data
await asyncio.sleep(1)
seeder.stop()
await stream_task
# Check data after stable state stream
await asyncio.gather(*(asyncio.create_task(wait_available_async(c)) for c in c_replicas))
await await_synced_all(c_master, c_replicas)
await check_data(seeder, replicas, c_replicas)
await proxy.close(proxy_task)