mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
test(replica): test journal rewrite with multi shards (#720)
Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
b0741b40ad
commit
d660787c6b
2 changed files with 92 additions and 12 deletions
|
@ -612,7 +612,9 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
|
|||
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !new_key);
|
||||
if (journal_update && op_args.shard->journal()) {
|
||||
RecordJournal(op_args, "DEL"sv, ArgSlice{key});
|
||||
if (overwrite) {
|
||||
RecordJournal(op_args, "DEL"sv, ArgSlice{key});
|
||||
}
|
||||
vector<string_view> mapped(vals.size() + 1);
|
||||
mapped[0] = key;
|
||||
std::copy(vals.begin(), vals.end(), mapped.begin() + 1);
|
||||
|
|
|
@ -375,7 +375,7 @@ Test journal rewrites.
|
|||
"""
|
||||
|
||||
|
||||
@dfly_args({"proactor_threads": 1})
|
||||
@dfly_args({"proactor_threads": 4})
|
||||
@pytest.mark.asyncio
|
||||
async def test_rewrites(df_local_factory):
|
||||
CLOSE_TIMESTAMP = (int(time.time()) + 100)
|
||||
|
@ -393,25 +393,48 @@ async def test_rewrites(df_local_factory):
|
|||
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
|
||||
await wait_available_async(c_replica)
|
||||
|
||||
# Make sure journal writer sends its first SELECT command on its only shard
|
||||
await c_master.set("no-select-please", "ok")
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
# Create monitor and bind utility functions
|
||||
m_replica = c_replica.monitor()
|
||||
|
||||
async def check_rsp(rx):
|
||||
print("waiting on", rx)
|
||||
async def get_next_command():
|
||||
mcmd = (await m_replica.next_command())['command']
|
||||
print("Got:", mcmd, "Regex", rx)
|
||||
assert re.match(rx, mcmd)
|
||||
# skip select command
|
||||
if (mcmd == "SELECT 0"):
|
||||
print("Got:", mcmd)
|
||||
mcmd = (await m_replica.next_command())['command']
|
||||
print("Got:", mcmd)
|
||||
return mcmd
|
||||
|
||||
async def is_match_rsp(rx):
|
||||
mcmd = (await get_next_command())
|
||||
print("Regex:", rx)
|
||||
return re.match(rx, mcmd)
|
||||
|
||||
async def skip_cmd():
|
||||
await check_rsp(r".*")
|
||||
await is_match_rsp(r".*")
|
||||
|
||||
async def check(cmd, rx):
|
||||
await c_master.execute_command(cmd)
|
||||
await check_rsp(rx)
|
||||
match = (await is_match_rsp(rx))
|
||||
assert match
|
||||
|
||||
async def check_list(cmd, rx_list):
|
||||
print("master cmd:", cmd)
|
||||
await c_master.execute_command(cmd)
|
||||
for rx in rx_list:
|
||||
match = (await is_match_rsp(rx))
|
||||
assert match
|
||||
|
||||
async def check_list_ooo(cmd, rx_list):
|
||||
print("master cmd:", cmd)
|
||||
await c_master.execute_command(cmd)
|
||||
expected_cmds = len(rx_list)
|
||||
for i in range(expected_cmds):
|
||||
mcmd = (await get_next_command())
|
||||
#check command matches one regex from list
|
||||
match_rx = list(filter(lambda rx: re.match(rx, mcmd), rx_list))
|
||||
assert len(match_rx) == 1
|
||||
rx_list.remove(match_rx[0])
|
||||
|
||||
async def check_expire(key):
|
||||
ttl1 = await c_master.ttl(key)
|
||||
|
@ -458,3 +481,58 @@ async def test_rewrites(df_local_factory):
|
|||
await check_expire("k")
|
||||
await check("GETEX k EX 100", r"PEXPIREAT k (.*?)")
|
||||
await check_expire("k")
|
||||
|
||||
# Check SDIFFSTORE turns into DEL and SADD
|
||||
await c_master.sadd("set1", "v1", "v2", "v3")
|
||||
await c_master.sadd("set2", "v1", "v2")
|
||||
await skip_cmd()
|
||||
await skip_cmd()
|
||||
await check_list("SDIFFSTORE k set1 set2", [r"DEL k", r"SADD k v3"])
|
||||
|
||||
# Check SINTERSTORE turns into DEL and SADD
|
||||
await check_list("SINTERSTORE k set1 set2", [r"DEL k", r"SADD k (.*?)"])
|
||||
|
||||
# Check SMOVE turns into SREM and SADD
|
||||
await check_list_ooo("SMOVE set1 set2 v3", [r"SREM set1 v3", r"SADD set2 v3"])
|
||||
|
||||
# Check SUNIONSTORE turns into DEL and SADD
|
||||
await check_list_ooo("SUNIONSTORE k set1 set2", [r"DEL k", r"SADD k (.*?)"])
|
||||
|
||||
await c_master.set("k1", "1000")
|
||||
await c_master.set("k2", "1100")
|
||||
await skip_cmd()
|
||||
await skip_cmd()
|
||||
# Check BITOP turns into SET
|
||||
await check("BITOP OR kdest k1 k2", r"SET kdest 1100")
|
||||
|
||||
# Check there is no rewrite for LMOVE on single shard
|
||||
await c_master.lpush("list", "v1", "v2", "v3", "v4")
|
||||
await skip_cmd()
|
||||
await check("LMOVE list list LEFT RIGHT", r"LMOVE list list LEFT RIGHT")
|
||||
|
||||
# Check there is no rewrite for RPOPLPUSH on single shard
|
||||
await check("RPOPLPUSH list list", r"RPOPLPUSH list list")
|
||||
# Check there is no rewrite for BRPOPLPUSH on single shard
|
||||
await check("BRPOPLPUSH list list 0", r"BRPOPLPUSH list list 0")
|
||||
|
||||
|
||||
await c_master.lpush("list1s", "v1", "v2", "v3", "v4")
|
||||
await skip_cmd()
|
||||
# Check LMOVE turns into LPUSH LPOP on multi shard
|
||||
await check_list_ooo("LMOVE list1s list2s LEFT LEFT", [r"LPUSH list2s v4", r"LPOP list1s"])
|
||||
# Check RPOPLPUSH turns into LPUSH RPOP on multi shard
|
||||
await check_list_ooo("RPOPLPUSH list1s list2s", [r"LPUSH list2s v1", r"RPOP list1s"])
|
||||
# Check BRPOPLPUSH turns into LPUSH RPOP on multi shard
|
||||
await check_list_ooo("BRPOPLPUSH list1s list2s 0", [r"LPUSH list2s v2", r"RPOP list1s"])
|
||||
|
||||
# MOVE runs as global command, check only one journal entry is sent
|
||||
await check("MOVE list2s 2", r"MOVE list2s 2")
|
||||
|
||||
await c_master.set("renamekey", "1000", px=50000)
|
||||
await skip_cmd()
|
||||
# Check RENAME turns into DEL SET and PEXPIREAT
|
||||
await check_list_ooo("RENAME renamekey renamed", [r"DEL renamekey", r"SET renamed 1000", r"PEXPIREAT renamed (.*?)"])
|
||||
await check_expire("renamed")
|
||||
# Check RENAMENX turns into DEL SET and PEXPIREAT
|
||||
await check_list_ooo("RENAMENX renamed renamekey", [r"DEL renamed", r"SET renamekey 1000", r"PEXPIREAT renamekey (.*?)"])
|
||||
await check_expire("renamekey")
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue