diff --git a/src/server/replica.cc b/src/server/replica.cc
index 1202d9b02..4979cc077 100644
--- a/src/server/replica.cc
+++ b/src/server/replica.cc
@@ -808,7 +808,7 @@ void Replica::StableSyncDflyReadFb(Context* cntx) {
   io::PrefixSource ps{prefix, &ss};
 
   JournalReader reader{&ps, 0};
-
+  TransactionReader tx_reader{};
   while (!cntx->IsCancelled()) {
     waker_.await([&]() {
       return ((trans_data_queue_.size() < kYieldAfterItemsInQueue) || cntx->IsCancelled());
@@ -816,7 +816,7 @@ void Replica::StableSyncDflyReadFb(Context* cntx) {
     if (cntx->IsCancelled())
       break;
 
-    auto tx_data = TransactionData::ReadNext(&reader, cntx);
+    auto tx_data = tx_reader.NextTxData(&reader, cntx);
     if (!tx_data)
       break;
 
@@ -1187,6 +1187,7 @@ bool Replica::TransactionData::AddEntry(journal::ParsedEntry&& entry) {
       return true;
     case journal::Op::MULTI_COMMAND:
       commands.push_back(std::move(entry.cmd));
+      dbid = entry.dbid;
      return false;
    default:
      DCHECK(false) << "Unsupported opcode";
@@ -1198,18 +1199,31 @@ bool Replica::TransactionData::IsGlobalCmd() const {
   return commands.size() == 1 && commands.front().cmd_args.size() == 1;
 }
 
-auto Replica::TransactionData::ReadNext(JournalReader* reader, Context* cntx)
+// Expired entries within a MULTI...EXEC sequence that belong to a different database
+// should be executed outside the multi transaction.
+bool Replica::TransactionReader::ReturnEntryOOO(const journal::ParsedEntry& entry) {
+  return !tx_data_.commands.empty() && entry.opcode == journal::Op::EXPIRED &&
+         tx_data_.dbid != entry.dbid;
+}
+
+auto Replica::TransactionReader::NextTxData(JournalReader* reader, Context* cntx)
     -> optional<TransactionData> {
-  TransactionData out;
   io::Result<journal::ParsedEntry> res;
   do {
     if (res = reader->ReadEntry(); !res) {
       cntx->ReportError(res.error());
       return std::nullopt;
     }
-  } while (!cntx->IsCancelled() && !out.AddEntry(std::move(*res)));
 
-  return cntx->IsCancelled() ? std::nullopt : make_optional(std::move(out));
+    if (ReturnEntryOOO(*res)) {
+      TransactionData tmp_tx;
+      CHECK(tmp_tx.AddEntry(std::move(*res)));
+      return tmp_tx;
+    }
+
+  } while (!cntx->IsCancelled() && !tx_data_.AddEntry(std::move(*res)));
+
+  return cntx->IsCancelled() ? std::nullopt : make_optional(std::move(tx_data_));
 }
 
 }  // namespace dfly
diff --git a/src/server/replica.h b/src/server/replica.h
index 424b7709d..cfd806864 100644
--- a/src/server/replica.h
+++ b/src/server/replica.h
@@ -59,16 +59,21 @@ class Replica {
     bool AddEntry(journal::ParsedEntry&& entry);
 
     bool IsGlobalCmd() const;
-
-    // Collect next complete transaction data from journal reader.
-    static std::optional<TransactionData> ReadNext(JournalReader* reader, Context* cntx);
-
     TxId txid;
     DbIndex dbid;
     uint32_t shard_cnt;
     std::vector<journal::ParsedEntry::CmdData> commands;
   };
 
+  // Utility for reading TransactionData from a journal reader.
+  struct TransactionReader {
+    std::optional<TransactionData> NextTxData(JournalReader* reader, Context* cntx);
+    bool ReturnEntryOOO(const journal::ParsedEntry& entry);
+
+   private:
+    TransactionData tx_data_{};
+  };
+
   // Coorindator for multi shard execution.
   struct MultiShardExecution {
     boost::fibers::mutex map_mu;
diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py
index a4c69d6df..15e661437 100644
--- a/tests/dragonfly/replication_test.py
+++ b/tests/dragonfly/replication_test.py
@@ -29,7 +29,7 @@ replication_cases = [
     (8, [2, 2, 2, 2], dict(keys=4_000, dbcount=4)),
     (4, [8, 8], dict(keys=4_000, dbcount=4)),
     (4, [1] * 8, dict(keys=500, dbcount=2)),
-    #(1, [1], dict(keys=100, dbcount=2)),
+    (1, [1], dict(keys=100, dbcount=2)),
 ]
 
 
@@ -85,19 +85,22 @@ async def test_replication_all(df_local_factory, df_seeder_factory, t_master, t_
 async def check_replica_finished_exec(c_replica):
     info_stats = await c_replica.execute_command("INFO")
     tc1 = info_stats['total_commands_processed']
-    await asyncio.sleep(0.1)
+    await asyncio.sleep(0.5)
     info_stats = await c_replica.execute_command("INFO")
     tc2 = info_stats['total_commands_processed']
-    return tc1+1 == tc2  # Replica processed only the info command on above sleep.
+    # Replica processed only the info command on above sleep.
+    return tc1+1 == tc2
+
 
 async def check_all_replicas_finished(c_replicas):
     while True:
         await asyncio.sleep(1.0)
         is_finished_arr = await asyncio.gather(*(asyncio.create_task(check_replica_finished_exec(c))
-                          for c in c_replicas))
+                                                 for c in c_replicas))
         if all(is_finished_arr):
             break
+
 
 async def check_data(seeder, replicas, c_replicas):
     capture = await seeder.capture()
     for (replica, c_replica) in zip(replicas, c_replicas):
@@ -439,7 +442,7 @@ async def test_rewrites(df_local_factory):
     expected_cmds = len(rx_list)
     for i in range(expected_cmds):
         mcmd = (await get_next_command())
-        #check command matches one regex from list
+        # Check command matches one regex from list
         match_rx = list(filter(lambda rx: re.match(rx, mcmd), rx_list))
         assert len(match_rx) == 1
         rx_list.remove(match_rx[0])
@@ -527,7 +530,6 @@ async def test_rewrites(df_local_factory):
     # Check BRPOP turns into RPOP
     await check("BRPOP list 0", r"RPOP list")
 
-
     await c_master.lpush("list1s", "v1", "v2", "v3", "v4")
     await skip_cmd()
     # Check LMOVE turns into LPUSH LPOP on multi shard
@@ -578,30 +580,47 @@ async def test_expiry(df_local_factory, n_keys=1000):
     res = await c_replica.mget(k for k, _ in gen_test_data(n_keys))
     assert all(v is not None for v in res)
 
-    # Set key expries in 500ms
+    # Set keys with different expiry times in ms
     pipe = c_master.pipeline(transaction=True)
     for k, _ in gen_test_data(n_keys):
-        pipe.pexpire(k, 500)
+        ms = random.randint(100, 500)
+        pipe.pexpire(k, ms)
     await pipe.execute()
 
-    # Wait two seconds for heatbeat to pick them up
-    await asyncio.sleep(2.0)
+    # Send more traffic on different dbs while keys are expiring
+    for i in range(8):
+        is_multi = i % 2
+        c_master_db = aioredis.Redis(port=master.port, db=i)
+        pipe = c_master_db.pipeline(transaction=is_multi)
+        # Set simple keys n_keys*(i+1)..n_keys*(i+2) on master
+        start_key = n_keys*(i+1)
+        end_key = start_key + n_keys
+        batch_fill_data(client=pipe, gen=gen_test_data(
+            end_key, start_key), batch_size=20)
 
-    assert len(await c_master.keys()) == 0
-    assert len(await c_replica.keys()) == 0
+        await pipe.execute()
 
-    # Set keys
+    # Wait for master to expire keys
+    await asyncio.sleep(3.0)
+
+    # Check all keys with expiry have been deleted
+    res = await c_master.mget(k for k, _ in gen_test_data(n_keys))
+    assert all(v is None for v in res)
+    # Check replica finished executing the replicated commands
+    await check_all_replicas_finished([c_replica])
+    res = await c_replica.mget(k for k, _ in gen_test_data(n_keys))
+    assert all(v is None for v in res)
+
+    # Set expired keys again
     pipe = c_master.pipeline(transaction=False)
     batch_fill_data(pipe, gen_test_data(n_keys))
     for k, _ in gen_test_data(n_keys):
         pipe.pexpire(k, 500)
     await pipe.execute()
-    await asyncio.sleep(1.0)
-
     # Disconnect from master
     await c_replica.execute_command("REPLICAOF NO ONE")
-
     # Check replica expires keys on its own
     await asyncio.sleep(1.0)
-    assert len(await c_replica.keys()) == 0
+    res = await c_replica.mget(k for k, _ in gen_test_data(n_keys))
+    assert all(v is None for v in res)
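
For context: a minimal, self-contained C++ sketch of the dispatch rule that the TransactionReader above implements. Entry, TxData, and TxReader are simplified stand-ins for journal::ParsedEntry, Replica::TransactionData, and Replica::TransactionReader, not the real dragonfly types, and a std::queue stands in for the journal stream. While a MULTI transaction for one database is being accumulated, an EXPIRED entry that targets a different database is returned immediately as its own single-entry transaction instead of being folded into the in-flight MULTI. Keeping the partially read transaction in a member (tx_data_) across calls is what makes this possible, and is why the static TransactionData::ReadNext became the stateful TransactionReader::NextTxData.

#include <cstdint>
#include <iostream>
#include <optional>
#include <queue>
#include <string>
#include <utility>
#include <vector>

enum class Op { MULTI_COMMAND, EXEC, EXPIRED };

struct Entry {  // stand-in for journal::ParsedEntry
  Op opcode;
  uint16_t dbid;
  std::string cmd;
};

struct TxData {  // stand-in for Replica::TransactionData
  uint16_t dbid = 0;
  std::vector<std::string> commands;

  // Returns true once the collected data forms a complete, executable unit.
  bool AddEntry(Entry&& e) {
    switch (e.opcode) {
      case Op::EXPIRED:  // expirations execute as standalone transactions
        dbid = e.dbid;
        commands.push_back(std::move(e.cmd));
        return true;
      case Op::MULTI_COMMAND:  // keep accumulating until EXEC arrives
        dbid = e.dbid;
        commands.push_back(std::move(e.cmd));
        return false;
      case Op::EXEC:
        return true;
    }
    return false;
  }
};

class TxReader {  // stand-in for Replica::TransactionReader
 public:
  std::optional<TxData> NextTxData(std::queue<Entry>* journal) {
    while (!journal->empty()) {
      Entry e = std::move(journal->front());
      journal->pop();
      // The ReturnEntryOOO rule: an expiration for a *different* db while a
      // MULTI is in flight is returned out of order, as its own transaction.
      if (!tx_data_.commands.empty() && e.opcode == Op::EXPIRED &&
          e.dbid != tx_data_.dbid) {
        TxData tmp_tx;
        tmp_tx.AddEntry(std::move(e));
        return tmp_tx;
      }
      if (tx_data_.AddEntry(std::move(e)))
        return std::exchange(tx_data_, TxData{});
    }
    return std::nullopt;  // journal exhausted mid-transaction
  }

 private:
  TxData tx_data_;  // a partially read MULTI survives across calls
};

int main() {
  std::queue<Entry> journal;
  journal.push({Op::MULTI_COMMAND, 0, "SET k1 v1"});
  journal.push({Op::EXPIRED, 1, "DEL stale-key"});  // interleaved, db 1
  journal.push({Op::MULTI_COMMAND, 0, "SET k2 v2"});
  journal.push({Op::EXEC, 0, ""});

  TxReader reader;
  while (auto tx = reader.NextTxData(&journal))
    std::cout << "db " << tx->dbid << ": " << tx->commands.size()
              << " command(s)\n";
  // Prints the db-1 expiration first, then the complete db-0 MULTI pair.
}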