fix: Make replica's REPLCONF IP-ADDRESS optional (#3473)

**Background**

In v1.21.0 we introduced support for `--announce_ip` for replicas to
announce their public IP addresses.

Like Valkey, this uses `REPLCONF IP-ADDRESS` to announce their IP
address.

**The issue**

Older Dragonfly releases (<1.21) did not support this feature. The
master side simply returned an error for such `REPLCONF` attempts,
however the replica code failed the replication, resulting in
incompatible versions.

**The fix**

The fix is simple, just log an error if the master did not respect
`REPLCONF IP-ADDRESS`. We can make this non-optional in the future
again.

However, in addition, I added a regression test to make sure we are
backwards compatible with v1.19.2. We'll bump this up every once in a
while.
This commit is contained in:
Shahar Mike 2024-08-08 17:22:30 +03:00 committed by Roman Gershman
parent 420046aac8
commit 25fc8bce2a
No known key found for this signature in database
GPG key ID: 6568CCAB9736B618
3 changed files with 74 additions and 2 deletions

View file

@ -289,7 +289,8 @@ error_code Replica::Greet() {
auto announce_ip = absl::GetFlag(FLAGS_announce_ip);
if (!announce_ip.empty()) {
RETURN_ON_ERR(SendCommandAndReadResponse(StrCat("REPLCONF ip-address ", announce_ip)));
PC_RETURN_ON_BAD_RESPONSE(CheckRespIsSimpleReply("OK"));
LOG_IF(WARNING, !CheckRespIsSimpleReply("OK"))
<< "Master did not OK announced IP address, perhaps it is using an old version";
}
// Corresponds to server.repl_state == REPL_STATE_SEND_CAPA

View file

@ -334,7 +334,7 @@ class DflyInstanceFactory:
self.params = params
self.instances = []
def create(self, existing_port=None, **kwargs) -> DflyInstance:
def create(self, existing_port=None, path=None, **kwargs) -> DflyInstance:
args = {**self.args, **kwargs}
args.setdefault("dbfilename", "")
args.setdefault("noversion_check", None)
@ -352,6 +352,10 @@ class DflyInstanceFactory:
params = dataclasses.replace(self.params, existing_port=existing_port)
else:
params = self.params
if path is not None:
params = dataclasses.replace(self.params, path=path)
instance = DflyInstance(params, args)
self.instances.append(instance)
return instance

View file

@ -4,8 +4,11 @@ import re
import pytest
import asyncio
import async_timeout
import platform
import pymemcache
import logging
import tarfile
import urllib.request
from redis import asyncio as aioredis
from .utility import *
from .instance import DflyInstanceFactory, DflyInstance
@ -2232,3 +2235,67 @@ async def test_announce_ip_port(df_factory):
host, port, _ = node[0]
assert host == "overrode-host"
assert port == "1337"
def download_dragonfly_release(version):
path = f"/tmp/{tmp_file_name()}"
os.mkdir(path)
gzfile = f"{path}/dragonfly.tar.gz"
logging.debug(f"Downloading Dragonfly release into {gzfile}...")
# Download
urllib.request.urlretrieve(
f"https://github.com/dragonflydb/dragonfly/releases/download/{version}/dragonfly-x86_64.tar.gz",
gzfile,
)
# Extract
file = tarfile.open(gzfile)
file.extractall(path)
file.close()
# Return path
return f"{path}/dragonfly-x86_64"
@pytest.mark.parametrize(
"cluster_mode, announce_ip, announce_port",
[
("", "localhost", 7000),
("emulated", "", 0),
("emulated", "localhost", 7000),
],
)
async def test_replicate_old_master(
df_factory: DflyInstanceFactory, cluster_mode, announce_ip, announce_port
):
cpu = platform.processor()
if cpu != "x86_64":
pytest.skip(f"Supported only on x64, running on {cpu}")
dfly_version = "v1.19.2"
released_dfly_path = download_dragonfly_release(dfly_version)
master = df_factory.create(path=released_dfly_path, cluster_mode=cluster_mode)
replica = df_factory.create(
cluster_mode=cluster_mode, announce_ip=announce_ip, announce_port=announce_port
)
df_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
assert (
f"df-{dfly_version}"
== (await c_master.execute_command("info", "server"))["dragonfly_version"]
)
assert "df-dev" == (await c_replica.execute_command("info", "server"))["dragonfly_version"]
await c_master.execute_command("set", "k1", "v1")
assert await c_replica.execute_command(f"REPLICAOF localhost {master.port}") == "OK"
await wait_available_async(c_replica)
assert await c_replica.execute_command("get", "k1") == "v1"
await disconnect_clients(c_master, c_replica)