diff --git a/src/server/set_family.cc b/src/server/set_family.cc index a6be3b811..dc942b63d 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -612,7 +612,9 @@ OpResult OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !new_key); if (journal_update && op_args.shard->journal()) { - RecordJournal(op_args, "DEL"sv, ArgSlice{key}); + if (overwrite) { + RecordJournal(op_args, "DEL"sv, ArgSlice{key}); + } vector mapped(vals.size() + 1); mapped[0] = key; std::copy(vals.begin(), vals.end(), mapped.begin() + 1); diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index b80566a69..63f84994e 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -375,7 +375,7 @@ Test journal rewrites. """ -@dfly_args({"proactor_threads": 1}) +@dfly_args({"proactor_threads": 4}) @pytest.mark.asyncio async def test_rewrites(df_local_factory): CLOSE_TIMESTAMP = (int(time.time()) + 100) @@ -393,25 +393,48 @@ async def test_rewrites(df_local_factory): await c_replica.execute_command(f"REPLICAOF localhost {master.port}") await wait_available_async(c_replica) - # Make sure journal writer sends its first SELECT command on its only shard - await c_master.set("no-select-please", "ok") - await asyncio.sleep(0.5) - # Create monitor and bind utility functions m_replica = c_replica.monitor() - async def check_rsp(rx): - print("waiting on", rx) + async def get_next_command(): mcmd = (await m_replica.next_command())['command'] - print("Got:", mcmd, "Regex", rx) - assert re.match(rx, mcmd) + # skip select command + if (mcmd == "SELECT 0"): + print("Got:", mcmd) + mcmd = (await m_replica.next_command())['command'] + print("Got:", mcmd) + return mcmd + + async def is_match_rsp(rx): + mcmd = (await get_next_command()) + print("Regex:", rx) + return re.match(rx, mcmd) async def skip_cmd(): - await check_rsp(r".*") + await is_match_rsp(r".*") async def check(cmd, rx): await c_master.execute_command(cmd) - await check_rsp(rx) + match = (await is_match_rsp(rx)) + assert match + + async def check_list(cmd, rx_list): + print("master cmd:", cmd) + await c_master.execute_command(cmd) + for rx in rx_list: + match = (await is_match_rsp(rx)) + assert match + + async def check_list_ooo(cmd, rx_list): + print("master cmd:", cmd) + await c_master.execute_command(cmd) + expected_cmds = len(rx_list) + for i in range(expected_cmds): + mcmd = (await get_next_command()) + #check command matches one regex from list + match_rx = list(filter(lambda rx: re.match(rx, mcmd), rx_list)) + assert len(match_rx) == 1 + rx_list.remove(match_rx[0]) async def check_expire(key): ttl1 = await c_master.ttl(key) @@ -458,3 +481,58 @@ async def test_rewrites(df_local_factory): await check_expire("k") await check("GETEX k EX 100", r"PEXPIREAT k (.*?)") await check_expire("k") + + # Check SDIFFSTORE turns into DEL and SADD + await c_master.sadd("set1", "v1", "v2", "v3") + await c_master.sadd("set2", "v1", "v2") + await skip_cmd() + await skip_cmd() + await check_list("SDIFFSTORE k set1 set2", [r"DEL k", r"SADD k v3"]) + + # Check SINTERSTORE turns into DEL and SADD + await check_list("SINTERSTORE k set1 set2", [r"DEL k", r"SADD k (.*?)"]) + + # Check SMOVE turns into SREM and SADD + await check_list_ooo("SMOVE set1 set2 v3", [r"SREM set1 v3", r"SADD set2 v3"]) + + # Check SUNIONSTORE turns into DEL and SADD + await check_list_ooo("SUNIONSTORE k set1 set2", [r"DEL k", r"SADD k (.*?)"]) + + await c_master.set("k1", "1000") + await c_master.set("k2", "1100") + await skip_cmd() + await skip_cmd() + # Check BITOP turns into SET + await check("BITOP OR kdest k1 k2", r"SET kdest 1100") + + # Check there is no rewrite for LMOVE on single shard + await c_master.lpush("list", "v1", "v2", "v3", "v4") + await skip_cmd() + await check("LMOVE list list LEFT RIGHT", r"LMOVE list list LEFT RIGHT") + + # Check there is no rewrite for RPOPLPUSH on single shard + await check("RPOPLPUSH list list", r"RPOPLPUSH list list") + # Check there is no rewrite for BRPOPLPUSH on single shard + await check("BRPOPLPUSH list list 0", r"BRPOPLPUSH list list 0") + + + await c_master.lpush("list1s", "v1", "v2", "v3", "v4") + await skip_cmd() + # Check LMOVE turns into LPUSH LPOP on multi shard + await check_list_ooo("LMOVE list1s list2s LEFT LEFT", [r"LPUSH list2s v4", r"LPOP list1s"]) + # Check RPOPLPUSH turns into LPUSH RPOP on multi shard + await check_list_ooo("RPOPLPUSH list1s list2s", [r"LPUSH list2s v1", r"RPOP list1s"]) + # Check BRPOPLPUSH turns into LPUSH RPOP on multi shard + await check_list_ooo("BRPOPLPUSH list1s list2s 0", [r"LPUSH list2s v2", r"RPOP list1s"]) + + # MOVE runs as global command, check only one journal entry is sent + await check("MOVE list2s 2", r"MOVE list2s 2") + + await c_master.set("renamekey", "1000", px=50000) + await skip_cmd() + # Check RENAME turns into DEL SET and PEXPIREAT + await check_list_ooo("RENAME renamekey renamed", [r"DEL renamekey", r"SET renamed 1000", r"PEXPIREAT renamed (.*?)"]) + await check_expire("renamed") + # Check RENAMENX turns into DEL SET and PEXPIREAT + await check_list_ooo("RENAMENX renamed renamekey", [r"DEL renamed", r"SET renamekey 1000", r"PEXPIREAT renamekey (.*?)"]) + await check_expire("renamekey")