From 2f0287429d8d0138f16d291d716628e2a9cd206a Mon Sep 17 00:00:00 2001 From: Shahar Mike Date: Thu, 18 Jan 2024 12:29:59 +0200 Subject: [PATCH] fix(replication): Correctly replicate commands even when OOM (#2428) * fix(replication): Correctly replicate commands even when OOM Before this change, OOM in shard callbacks could have led to data inconsistency between the master and the replica. For example, commands which mutated data on 1 shard but failed on another, like `LMOVE`. After this change, callbacks that result in an OOM will correctly replicate their work (none, partial or complete) to replicas. Note that `MSET` and `MSETNX` required special handling, in that they are the only commands that can _create_ multiple keys, and so some of them can fail. Fixes #2381 * fixes * test fix * RecordJournal * UNDO idiotnessness * 2 shards * fix pytest --- src/server/string_family.cc | 49 ++++++++++++----- src/server/transaction.cc | 28 +++++++--- src/server/transaction.h | 4 +- tests/dragonfly/replication_test.py | 84 +++++++++++++---------------- 4 files changed, 96 insertions(+), 69 deletions(-) diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 6b1f91e72..257443b4b 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -323,21 +323,38 @@ int64_t AbsExpiryToTtl(int64_t abs_expiry_time, bool as_milli) { } // Returns true if keys were set, false otherwise. -OpStatus OpMSet(const OpArgs& op_args, ArgSlice args) { +void OpMSet(const OpArgs& op_args, ArgSlice args, atomic_bool* success) { DCHECK(!args.empty() && args.size() % 2 == 0); SetCmd::SetParams params; SetCmd sg(op_args, false); - for (size_t i = 0; i < args.size(); i += 2) { + size_t i = 0; + for (; i < args.size(); i += 2) { DVLOG(1) << "MSet " << args[i] << ":" << args[i + 1]; OpResult> res = sg.Set(params, args[i], args[i + 1]); if (res.status() != OpStatus::OK) { // OOM for example. - return res.status(); + success->store(false); + break; } } - return OpStatus::OK; + if (auto journal = op_args.shard->journal(); journal) { + // We write a custom journal because an OOM in the above loop could lead to partial success, so + // we replicate only what was changed. + string_view cmd; + ArgSlice cmd_args; + if (i == 0) { + // All shards must record the tx was executed for the replica to execute it, so we send a PING + // in case nothing was changed + cmd = "PING"; + } else { + // journal [0, i) + cmd = "MSET"; + cmd_args = ArgSlice(&args[0], i); + } + RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt()); + } } // See comment for SetCmd::Set() for when and how OpResult's value (i.e. optional) is set. @@ -1227,13 +1244,15 @@ void StringFamily::MSet(CmdArgList args, ConnectionContext* cntx) { LOG(INFO) << "MSET/" << transaction->GetUniqueShardCnt() << str; } + atomic_bool success = true; auto cb = [&](Transaction* t, EngineShard* shard) { auto args = t->GetShardArgs(shard->shard_id()); - return OpMSet(t->GetOpArgs(shard), args); + OpMSet(t->GetOpArgs(shard), args, &success); + return OpStatus::OK; }; OpStatus status = transaction->ScheduleSingleHop(std::move(cb)); - if (status == OpStatus::OK) { + if (success.load()) { cntx->SendOk(); } else { cntx->SendError(status); @@ -1261,19 +1280,20 @@ void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) { }; transaction->Execute(std::move(cb), false); - bool to_skip = exists.load(memory_order_relaxed) == true; + const bool to_skip = exists.load(memory_order_relaxed); + atomic_bool success = true; auto epilog_cb = [&](Transaction* t, EngineShard* shard) { if (to_skip) return OpStatus::OK; auto args = t->GetShardArgs(shard->shard_id()); - return OpMSet(t->GetOpArgs(shard), std::move(args)); + OpMSet(t->GetOpArgs(shard), std::move(args), &success); + return OpStatus::OK; }; - transaction->Execute(std::move(epilog_cb), true); - cntx->SendLong(to_skip ? 0 : 1); + cntx->SendLong(to_skip || !success.load() ? 0 : 1); } void StringFamily::StrLen(CmdArgList args, ConnectionContext* cntx) { @@ -1500,6 +1520,9 @@ constexpr uint32_t kClThrottle = THROTTLE; } // namespace acl void StringFamily::Register(CommandRegistry* registry) { + constexpr uint32_t kMSetMask = + CO::WRITE | CO::DENYOOM | CO::INTERLEAVED_KEYS | CO::NO_AUTOJOURNAL; + registry->StartFamily(); *registry << CI{"SET", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, -3, 1, 1, acl::kSet}.HFUNC(Set) @@ -1522,10 +1545,8 @@ void StringFamily::Register(CommandRegistry* registry) { << CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, acl::kGetSet}.HFUNC(GetSet) << CI{"MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING, -2, 1, -1, acl::kMGet}.HFUNC( MGet) - << CI{"MSET", CO::WRITE | CO::DENYOOM | CO::INTERLEAVED_KEYS, -3, 1, -1, acl::kMSet}.HFUNC( - MSet) - << CI{"MSETNX", CO::WRITE | CO::DENYOOM | CO::INTERLEAVED_KEYS, -3, 1, -1, acl::kMSetNx} - .HFUNC(MSetNx) + << CI{"MSET", kMSetMask, -3, 1, -1, acl::kMSet}.HFUNC(MSet) + << CI{"MSETNX", kMSetMask, -3, 1, -1, acl::kMSetNx}.HFUNC(MSetNx) << CI{"STRLEN", CO::READONLY | CO::FAST, 2, 1, 1, acl::kStrLen}.HFUNC(StrLen) << CI{"GETRANGE", CO::READONLY | CO::FAST, 4, 1, 1, acl::kGetRange}.HFUNC(GetRange) << CI{"SUBSTR", CO::READONLY | CO::FAST, 4, 1, 1, acl::kSubStr}.HFUNC( diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 8d7450568..9bf1dc5a8 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -555,7 +555,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { // Log to jounrnal only once the command finished running if (is_concluding || (multi_ && multi_->concluding)) - LogAutoJournalOnShard(shard); + LogAutoJournalOnShard(shard, result); // If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation // and successive hops are run by continuation_trans_ in engine shard. @@ -1047,6 +1047,10 @@ ShardId Transaction::GetUniqueShard() const { return unique_shard_id_; } +optional Transaction::GetUniqueSlotId() const { + return unique_slot_checker_.GetUniqueSlotId(); +} + KeyLockArgs Transaction::GetLockArgs(ShardId sid) const { KeyLockArgs res; res.db_index = db_index_; @@ -1089,7 +1093,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { DCHECK_EQ(sd.is_armed, false); unlocked_keys = false; } else { - LogAutoJournalOnShard(shard); + LogAutoJournalOnShard(shard, result); } } @@ -1327,7 +1331,7 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) { auto* shard = EngineShard::tlocal(); auto result = cb(this, shard); shard->db_slice().OnCbFinish(); - LogAutoJournalOnShard(shard); + LogAutoJournalOnShard(shard, result); DCHECK_EQ(result.flags, 0); // if it's sophisticated, we shouldn't squash it return result; @@ -1454,7 +1458,7 @@ optional Transaction::GetWakeKey(ShardId sid) const { return GetShardArgs(sid).at(sd.wake_key_pos); } -void Transaction::LogAutoJournalOnShard(EngineShard* shard) { +void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult result) { // TODO: For now, we ignore non shard coordination. if (shard == nullptr) return; @@ -1467,14 +1471,22 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) { if (cid_->IsWriteOnly() == 0 && (cid_->opt_mask() & CO::NO_KEY_TRANSACTIONAL) == 0) return; - // If autojournaling was disabled and not re-enabled, skip it - if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !renabled_auto_journal_.load(memory_order_relaxed)) - return; - auto journal = shard->journal(); if (journal == nullptr) return; + if (result.status != OpStatus::OK) { + // We log NOOP even for NO_AUTOJOURNAL commands because the non-success status could have been + // due to OOM in a single shard, while other shards succeeded + journal->RecordEntry(txid_, journal::Op::NOOP, db_index_, unique_shard_cnt_, + unique_slot_checker_.GetUniqueSlotId(), journal::Entry::Payload{}, true); + return; + } + + // If autojournaling was disabled and not re-enabled, skip it + if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !renabled_auto_journal_.load(memory_order_relaxed)) + return; + // TODO: Handle complex commands like LMPOP correctly once they are implemented. journal::Entry::Payload entry_payload; diff --git a/src/server/transaction.h b/src/server/transaction.h index c6ecb518f..18d16c1ce 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -295,6 +295,8 @@ class Transaction { // This method is meaningless if GetUniqueShardCnt() != 1. ShardId GetUniqueShard() const; + std::optional GetUniqueSlotId() const; + bool IsMulti() const { return bool(multi_); } @@ -518,7 +520,7 @@ class Transaction { // Log command in shard's journal, if this is a write command with auto-journaling enabled. // Should be called immediately after the last phase (hop). - void LogAutoJournalOnShard(EngineShard* shard); + void LogAutoJournalOnShard(EngineShard* shard, RunnableResult shard_result); // Returns the previous value of run count. uint32_t DecreaseRunCnt(); diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 539c7c5cf..acac25812 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -1838,52 +1838,6 @@ async def test_replicaof_reject_on_load(df_local_factory, df_seeder_factory): replica.stop() -# note: please be careful if you want to change any of the parameters used in this test. -# changing parameters without extensive testing may easily lead to weak testing case assertion -# which means eviction may not get triggered. -@pytest.mark.asyncio -@pytest.mark.skip(reason="Failing due to bug in replication on command errors") -async def test_policy_based_eviction_propagation(df_local_factory, df_seeder_factory): - master = df_local_factory.create( - proactor_threads=1, cache_mode="true", maxmemory="256mb", enable_heartbeat_eviction="false" - ) - replica = df_local_factory.create(proactor_threads=1) - df_local_factory.start_all([master, replica]) - - c_master = master.client() - c_replica = replica.client() - - await c_master.execute_command("DEBUG POPULATE 6000 size 44000") - - await c_replica.execute_command(f"REPLICAOF localhost {master.port}") - await wait_available_async(c_replica) - - seeder = df_seeder_factory.create( - port=master.port, - keys=500, - val_size=200, - stop_on_failure=False, - unsupported_types=[ - ValueType.JSON, - ValueType.LIST, - ValueType.SET, - ValueType.HSET, - ValueType.ZSET, - ], - ) - await seeder.run(target_deviation=0.1) - - info = await c_master.info("stats") - assert info["evicted_keys"] > 0, "Weak testcase: policy based eviction was not triggered." - - await check_all_replicas_finished([c_replica], c_master) - keys_master = await c_master.execute_command("keys *") - keys_replica = await c_replica.execute_command("keys *") - - assert set(keys_master) == set(keys_replica) - await disconnect_clients(c_master, *[c_replica]) - - @pytest.mark.asyncio async def test_heartbeat_eviction_propagation(df_local_factory): master = df_local_factory.create( @@ -1919,3 +1873,41 @@ async def test_heartbeat_eviction_propagation(df_local_factory): keys_replica = await c_replica.execute_command("keys *") assert set(keys_master) == set(keys_replica) await disconnect_clients(c_master, *[c_replica]) + + +@pytest.mark.asyncio +async def test_policy_based_eviction_propagation(df_local_factory, df_seeder_factory): + master = df_local_factory.create( + proactor_threads=2, + cache_mode="true", + maxmemory="512mb", + logtostdout="true", + enable_heartbeat_eviction="false", + ) + replica = df_local_factory.create(proactor_threads=2) + df_local_factory.start_all([master, replica]) + + c_master = master.client() + c_replica = replica.client() + + await c_master.execute_command("DEBUG POPULATE 6000 size 88000") + + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + await wait_available_async(c_replica) + + seeder = df_seeder_factory.create( + port=master.port, keys=500, val_size=1000, stop_on_failure=False + ) + await seeder.run(target_deviation=0.1) + + info = await c_master.info("stats") + assert info["evicted_keys"] > 0, "Weak testcase: policy based eviction was not triggered." + + await check_all_replicas_finished([c_replica], c_master) + keys_master = await c_master.execute_command("keys k*") + keys_replica = await c_replica.execute_command("keys k*") + + assert len(keys_master) == len(keys_replica) + assert set(keys_master) == set(keys_replica) + + await disconnect_clients(c_master, *[c_replica])