fix(bug): crash on takeover and info replication (#3282)

Signed-off-by: adi_holden <adi@dragonflydb.io>
Author: adiholden
Date: 2024-07-08 17:21:12 +03:00 (committed by GitHub)
parent fba902d0ac
commit 5c7c21b6c5
3 changed files with 30 additions and 28 deletions

src/server/server_family.cc

@@ -2258,9 +2258,12 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
  }
  if (should_enter("REPLICATION")) {
    ServerState& etl = *ServerState::tlocal();
    if (etl.is_master) {
    unique_lock lk(replicaof_mu_);
    // Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
    // ensuring eventual consistency of is_master. When determining if the server is a replica and
    // accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is
    // insufficient in this scenario.
    if (!replica_) {
      append("role", "master");
      append("connected_slaves", m.facade_stats.conn_stats.num_replicas);
      const auto& replicas = m.replication_metrics;
@@ -2274,10 +2277,6 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
    } else {
      append("role", GetFlag(FLAGS_info_replication_valkey_compatible) ? "slave" : "replica");
      // The replica pointer can still be mutated even while master=true,
      // we don't want to drop the replica object in this fiber
      unique_lock lk{replicaof_mu_};
      auto replication_info_cb = [&](Replica::Info rinfo) {
        append("master_host", rinfo.host);
        append("master_port", rinfo.port);
@@ -2737,8 +2736,12 @@ err:
void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) {
  auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
  ServerState& etl = *ServerState::tlocal();
  if (etl.is_master) {
  unique_lock lk(replicaof_mu_);
  // Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
  // ensuring eventual consistency of is_master. When determining if the server is a replica and
  // accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is
  // insufficient in this scenario.
  if (!replica_) {
    rb->StartArray(2);
    rb->SendBulkString("master");
    auto vec = dfly_cmd_->GetReplicasRoleInfo();
@@ -2751,7 +2754,6 @@ void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) {
    }
  } else {
    unique_lock lk{replicaof_mu_};
    rb->StartArray(4 + cluster_replicas_.size() * 3);
    rb->SendBulkString(GetFlag(FLAGS_info_replication_valkey_compatible) ? "slave" : "replica");
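The comment repeated in both hunks captures the root cause: is_master is a thread-local flag that is only eventually consistent with replica_, so INFO and ROLE could take the master branch while replica_ was still being mutated by a concurrent REPLTAKEOVER. Below is a minimal sketch of the corrected check-under-lock pattern; it is Python with hypothetical names purely for illustration, the authoritative fix being the C++ diff above.

# sketch.py - illustrative only; hypothetical names mirroring the C++ members.
import threading

class ReplicationState:
    def __init__(self):
        self.replicaof_mu = threading.Lock()  # plays the role of replicaof_mu_
        self.replica = None                   # plays the role of replica_
        self.is_master = True                 # flag published to other threads

    def start_replication(self, replica):
        # Flag and pointer flip together under the mutex, as in the server.
        with self.replicaof_mu:
            self.replica = replica
            self.is_master = False

    def role(self):
        # Buggy variant: branching on self.is_master without the lock can race
        # with start_replication() and touch self.replica mid-update.
        with self.replicaof_mu:           # take the lock first ...
            if self.replica is None:      # ... then branch on the object itself
                return "master"
            return "replica"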

tests/dragonfly/instance.py

@@ -336,7 +336,7 @@ class DflyInstanceFactory:
args.setdefault("noversion_check", None)
# MacOs does not set it automatically, so we need to set it manually
args.setdefault("maxmemory", "8G")
vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1,cluster_family=1"
vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1,cluster_family=1,dflycmd=1"
args.setdefault("vmodule", vmod)
args.setdefault("jsonpathv2")

tests/dragonfly/replication_test.py

@@ -1151,10 +1151,7 @@ take_over_cases = [
@pytest.mark.parametrize("master_threads, replica_threads", take_over_cases)
@pytest.mark.asyncio
async def test_take_over_counters(df_factory, master_threads, replica_threads):
    master = df_factory.create(
        proactor_threads=master_threads,
        logtostderr=True,
    )
    master = df_factory.create(proactor_threads=master_threads)
    replica1 = df_factory.create(proactor_threads=replica_threads)
    replica2 = df_factory.create(proactor_threads=replica_threads)
    replica3 = df_factory.create(proactor_threads=replica_threads)
@@ -1214,11 +1211,7 @@ async def test_take_over_seeder(
    request, df_factory, df_seeder_factory, master_threads, replica_threads
):
    tmp_file_name = "".join(random.choices(string.ascii_letters, k=10))
    master = df_factory.create(
        proactor_threads=master_threads,
        dbfilename=f"dump_{tmp_file_name}",
        logtostderr=True,
    )
    master = df_factory.create(proactor_threads=master_threads, dbfilename=f"dump_{tmp_file_name}")
    replica = df_factory.create(proactor_threads=replica_threads)
    df_factory.start_all([master, replica])
@@ -1229,17 +1222,27 @@ async def test_take_over_seeder(
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await wait_available_async(c_replica)
async def seed():
await seeder.run(target_ops=3000)
fill_task = asyncio.create_task(seeder.run())
fill_task = asyncio.create_task(seed())
stop_info = False
async def info_task():
my_client = replica.client()
while not stop_info:
info = await my_client.info("replication")
asyncio.sleep(0.5)
info_task = asyncio.create_task(info_task())
# Give the seeder a bit of time.
await asyncio.sleep(1)
await asyncio.sleep(3)
logging.debug("running repltakover")
await c_replica.execute_command(f"REPLTAKEOVER 5 SAVE")
logging.debug("after running repltakover")
seeder.stop()
assert await c_replica.execute_command("role") == ["master", []]
stop_info = True
# Need to wait a bit to give time to write the shutdown snapshot
await asyncio.sleep(1)
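The new info_task coroutine is what exercises the fixed code path: it issues INFO replication on a second connection while REPLTAKEOVER promotes the replica, the exact interleaving that crashed before this change. A condensed, standalone version of that repro loop might look as follows, assuming redis-py's asyncio client (redis >= 5 for aclose()); the port and function name are hypothetical.

import asyncio
import redis.asyncio as aioredis

async def poll_replication_info(port: int, stop: asyncio.Event):
    # Hammer INFO REPLICATION while a takeover is in flight; before the fix
    # this could observe is_master == true with replica_ mid-teardown.
    client = aioredis.Redis(port=port)
    while not stop.is_set():
        await client.info("replication")
        await asyncio.sleep(0.5)
    await client.aclose()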
@@ -1258,10 +1261,7 @@ async def test_take_over_seeder(
@pytest.mark.parametrize("master_threads, replica_threads", [[4, 4]])
@pytest.mark.asyncio
async def test_take_over_read_commands(df_factory, master_threads, replica_threads):
    master = df_factory.create(
        proactor_threads=master_threads,
        logtostderr=True,
    )
    master = df_factory.create(proactor_threads=master_threads)
    replica = df_factory.create(proactor_threads=replica_threads)
    df_factory.start_all([master, replica])