diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 6b1f91e72..257443b4b 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -323,21 +323,38 @@ int64_t AbsExpiryToTtl(int64_t abs_expiry_time, bool as_milli) { } // Returns true if keys were set, false otherwise. -OpStatus OpMSet(const OpArgs& op_args, ArgSlice args) { +void OpMSet(const OpArgs& op_args, ArgSlice args, atomic_bool* success) { DCHECK(!args.empty() && args.size() % 2 == 0); SetCmd::SetParams params; SetCmd sg(op_args, false); - for (size_t i = 0; i < args.size(); i += 2) { + size_t i = 0; + for (; i < args.size(); i += 2) { DVLOG(1) << "MSet " << args[i] << ":" << args[i + 1]; OpResult> res = sg.Set(params, args[i], args[i + 1]); if (res.status() != OpStatus::OK) { // OOM for example. - return res.status(); + success->store(false); + break; } } - return OpStatus::OK; + if (auto journal = op_args.shard->journal(); journal) { + // We write a custom journal because an OOM in the above loop could lead to partial success, so + // we replicate only what was changed. + string_view cmd; + ArgSlice cmd_args; + if (i == 0) { + // All shards must record the tx was executed for the replica to execute it, so we send a PING + // in case nothing was changed + cmd = "PING"; + } else { + // journal [0, i) + cmd = "MSET"; + cmd_args = ArgSlice(&args[0], i); + } + RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt()); + } } // See comment for SetCmd::Set() for when and how OpResult's value (i.e. optional) is set. @@ -1227,13 +1244,15 @@ void StringFamily::MSet(CmdArgList args, ConnectionContext* cntx) { LOG(INFO) << "MSET/" << transaction->GetUniqueShardCnt() << str; } + atomic_bool success = true; auto cb = [&](Transaction* t, EngineShard* shard) { auto args = t->GetShardArgs(shard->shard_id()); - return OpMSet(t->GetOpArgs(shard), args); + OpMSet(t->GetOpArgs(shard), args, &success); + return OpStatus::OK; }; OpStatus status = transaction->ScheduleSingleHop(std::move(cb)); - if (status == OpStatus::OK) { + if (success.load()) { cntx->SendOk(); } else { cntx->SendError(status); @@ -1261,19 +1280,20 @@ void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) { }; transaction->Execute(std::move(cb), false); - bool to_skip = exists.load(memory_order_relaxed) == true; + const bool to_skip = exists.load(memory_order_relaxed); + atomic_bool success = true; auto epilog_cb = [&](Transaction* t, EngineShard* shard) { if (to_skip) return OpStatus::OK; auto args = t->GetShardArgs(shard->shard_id()); - return OpMSet(t->GetOpArgs(shard), std::move(args)); + OpMSet(t->GetOpArgs(shard), std::move(args), &success); + return OpStatus::OK; }; - transaction->Execute(std::move(epilog_cb), true); - cntx->SendLong(to_skip ? 0 : 1); + cntx->SendLong(to_skip || !success.load() ? 0 : 1); } void StringFamily::StrLen(CmdArgList args, ConnectionContext* cntx) { @@ -1500,6 +1520,9 @@ constexpr uint32_t kClThrottle = THROTTLE; } // namespace acl void StringFamily::Register(CommandRegistry* registry) { + constexpr uint32_t kMSetMask = + CO::WRITE | CO::DENYOOM | CO::INTERLEAVED_KEYS | CO::NO_AUTOJOURNAL; + registry->StartFamily(); *registry << CI{"SET", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, -3, 1, 1, acl::kSet}.HFUNC(Set) @@ -1522,10 +1545,8 @@ void StringFamily::Register(CommandRegistry* registry) { << CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, acl::kGetSet}.HFUNC(GetSet) << CI{"MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING, -2, 1, -1, acl::kMGet}.HFUNC( MGet) - << CI{"MSET", CO::WRITE | CO::DENYOOM | CO::INTERLEAVED_KEYS, -3, 1, -1, acl::kMSet}.HFUNC( - MSet) - << CI{"MSETNX", CO::WRITE | CO::DENYOOM | CO::INTERLEAVED_KEYS, -3, 1, -1, acl::kMSetNx} - .HFUNC(MSetNx) + << CI{"MSET", kMSetMask, -3, 1, -1, acl::kMSet}.HFUNC(MSet) + << CI{"MSETNX", kMSetMask, -3, 1, -1, acl::kMSetNx}.HFUNC(MSetNx) << CI{"STRLEN", CO::READONLY | CO::FAST, 2, 1, 1, acl::kStrLen}.HFUNC(StrLen) << CI{"GETRANGE", CO::READONLY | CO::FAST, 4, 1, 1, acl::kGetRange}.HFUNC(GetRange) << CI{"SUBSTR", CO::READONLY | CO::FAST, 4, 1, 1, acl::kSubStr}.HFUNC( diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 8d7450568..9bf1dc5a8 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -555,7 +555,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { // Log to jounrnal only once the command finished running if (is_concluding || (multi_ && multi_->concluding)) - LogAutoJournalOnShard(shard); + LogAutoJournalOnShard(shard, result); // If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation // and successive hops are run by continuation_trans_ in engine shard. @@ -1047,6 +1047,10 @@ ShardId Transaction::GetUniqueShard() const { return unique_shard_id_; } +optional Transaction::GetUniqueSlotId() const { + return unique_slot_checker_.GetUniqueSlotId(); +} + KeyLockArgs Transaction::GetLockArgs(ShardId sid) const { KeyLockArgs res; res.db_index = db_index_; @@ -1089,7 +1093,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { DCHECK_EQ(sd.is_armed, false); unlocked_keys = false; } else { - LogAutoJournalOnShard(shard); + LogAutoJournalOnShard(shard, result); } } @@ -1327,7 +1331,7 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) { auto* shard = EngineShard::tlocal(); auto result = cb(this, shard); shard->db_slice().OnCbFinish(); - LogAutoJournalOnShard(shard); + LogAutoJournalOnShard(shard, result); DCHECK_EQ(result.flags, 0); // if it's sophisticated, we shouldn't squash it return result; @@ -1454,7 +1458,7 @@ optional Transaction::GetWakeKey(ShardId sid) const { return GetShardArgs(sid).at(sd.wake_key_pos); } -void Transaction::LogAutoJournalOnShard(EngineShard* shard) { +void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult result) { // TODO: For now, we ignore non shard coordination. if (shard == nullptr) return; @@ -1467,14 +1471,22 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) { if (cid_->IsWriteOnly() == 0 && (cid_->opt_mask() & CO::NO_KEY_TRANSACTIONAL) == 0) return; - // If autojournaling was disabled and not re-enabled, skip it - if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !renabled_auto_journal_.load(memory_order_relaxed)) - return; - auto journal = shard->journal(); if (journal == nullptr) return; + if (result.status != OpStatus::OK) { + // We log NOOP even for NO_AUTOJOURNAL commands because the non-success status could have been + // due to OOM in a single shard, while other shards succeeded + journal->RecordEntry(txid_, journal::Op::NOOP, db_index_, unique_shard_cnt_, + unique_slot_checker_.GetUniqueSlotId(), journal::Entry::Payload{}, true); + return; + } + + // If autojournaling was disabled and not re-enabled, skip it + if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !renabled_auto_journal_.load(memory_order_relaxed)) + return; + // TODO: Handle complex commands like LMPOP correctly once they are implemented. journal::Entry::Payload entry_payload; diff --git a/src/server/transaction.h b/src/server/transaction.h index c6ecb518f..18d16c1ce 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -295,6 +295,8 @@ class Transaction { // This method is meaningless if GetUniqueShardCnt() != 1. ShardId GetUniqueShard() const; + std::optional GetUniqueSlotId() const; + bool IsMulti() const { return bool(multi_); } @@ -518,7 +520,7 @@ class Transaction { // Log command in shard's journal, if this is a write command with auto-journaling enabled. // Should be called immediately after the last phase (hop). - void LogAutoJournalOnShard(EngineShard* shard); + void LogAutoJournalOnShard(EngineShard* shard, RunnableResult shard_result); // Returns the previous value of run count. uint32_t DecreaseRunCnt(); diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 539c7c5cf..acac25812 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -1838,52 +1838,6 @@ async def test_replicaof_reject_on_load(df_local_factory, df_seeder_factory): replica.stop() -# note: please be careful if you want to change any of the parameters used in this test. -# changing parameters without extensive testing may easily lead to weak testing case assertion -# which means eviction may not get triggered. -@pytest.mark.asyncio -@pytest.mark.skip(reason="Failing due to bug in replication on command errors") -async def test_policy_based_eviction_propagation(df_local_factory, df_seeder_factory): - master = df_local_factory.create( - proactor_threads=1, cache_mode="true", maxmemory="256mb", enable_heartbeat_eviction="false" - ) - replica = df_local_factory.create(proactor_threads=1) - df_local_factory.start_all([master, replica]) - - c_master = master.client() - c_replica = replica.client() - - await c_master.execute_command("DEBUG POPULATE 6000 size 44000") - - await c_replica.execute_command(f"REPLICAOF localhost {master.port}") - await wait_available_async(c_replica) - - seeder = df_seeder_factory.create( - port=master.port, - keys=500, - val_size=200, - stop_on_failure=False, - unsupported_types=[ - ValueType.JSON, - ValueType.LIST, - ValueType.SET, - ValueType.HSET, - ValueType.ZSET, - ], - ) - await seeder.run(target_deviation=0.1) - - info = await c_master.info("stats") - assert info["evicted_keys"] > 0, "Weak testcase: policy based eviction was not triggered." - - await check_all_replicas_finished([c_replica], c_master) - keys_master = await c_master.execute_command("keys *") - keys_replica = await c_replica.execute_command("keys *") - - assert set(keys_master) == set(keys_replica) - await disconnect_clients(c_master, *[c_replica]) - - @pytest.mark.asyncio async def test_heartbeat_eviction_propagation(df_local_factory): master = df_local_factory.create( @@ -1919,3 +1873,41 @@ async def test_heartbeat_eviction_propagation(df_local_factory): keys_replica = await c_replica.execute_command("keys *") assert set(keys_master) == set(keys_replica) await disconnect_clients(c_master, *[c_replica]) + + +@pytest.mark.asyncio +async def test_policy_based_eviction_propagation(df_local_factory, df_seeder_factory): + master = df_local_factory.create( + proactor_threads=2, + cache_mode="true", + maxmemory="512mb", + logtostdout="true", + enable_heartbeat_eviction="false", + ) + replica = df_local_factory.create(proactor_threads=2) + df_local_factory.start_all([master, replica]) + + c_master = master.client() + c_replica = replica.client() + + await c_master.execute_command("DEBUG POPULATE 6000 size 88000") + + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + await wait_available_async(c_replica) + + seeder = df_seeder_factory.create( + port=master.port, keys=500, val_size=1000, stop_on_failure=False + ) + await seeder.run(target_deviation=0.1) + + info = await c_master.info("stats") + assert info["evicted_keys"] > 0, "Weak testcase: policy based eviction was not triggered." + + await check_all_replicas_finished([c_replica], c_master) + keys_master = await c_master.execute_command("keys k*") + keys_replica = await c_replica.execute_command("keys k*") + + assert len(keys_master) == len(keys_replica) + assert set(keys_master) == set(keys_replica) + + await disconnect_clients(c_master, *[c_replica])