fix(replication): Correctly replicate commands even when OOM (#2428)

* fix(replication): Correctly replicate commands even when OOM

Before this change, an OOM error inside a shard callback could lead to data
inconsistency between the master and the replica. For example, a command like
`LMOVE` could mutate data on one shard but fail on another.

After this change, callbacks that result in an OOM correctly replicate their
work (none, partial, or complete) to the replicas.

Note that `MSET` and `MSETNX` required special handling: they are the only
commands that can _create_ multiple keys, so some of the keys may be created
while others fail.

Fixes #2381
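
To make the intended behavior concrete, here is a minimal client-level sketch of
the partial-success case (hypothetical key names and ports, plain redis-py
rather than the project's test fixtures; it assumes a master with a tight
maxmemory limit and an attached replica):

import redis

master = redis.Redis(port=6379)   # hypothetical master
replica = redis.Redis(port=6380)  # hypothetical replica of that master

try:
    # Some of these keys may land on a shard that is out of memory.
    master.mset({"k1": "v1", "k2": "v2", "k3": "v3"})
except redis.exceptions.ResponseError:
    # The client sees the OOM error, but keys routed to healthy shards may
    # already have been written on the master.
    pass

# After this change the replica converges to exactly the keys the master kept
# (once replication catches up), instead of silently diverging.
assert sorted(master.keys("k*")) == sorted(replica.keys("k*"))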

* fixes

* test fix

* RecordJournal

* UNDO idiocy

* 2 shards

* fix pytest
Shahar Mike 2024-01-18 12:29:59 +02:00 committed by GitHub
parent b66db852f9
commit 2f0287429d
4 changed files with 96 additions and 69 deletions


@@ -323,21 +323,38 @@ int64_t AbsExpiryToTtl(int64_t abs_expiry_time, bool as_milli) {
}
// Returns true if keys were set, false otherwise.
OpStatus OpMSet(const OpArgs& op_args, ArgSlice args) {
void OpMSet(const OpArgs& op_args, ArgSlice args, atomic_bool* success) {
DCHECK(!args.empty() && args.size() % 2 == 0);
SetCmd::SetParams params;
SetCmd sg(op_args, false);
for (size_t i = 0; i < args.size(); i += 2) {
size_t i = 0;
for (; i < args.size(); i += 2) {
DVLOG(1) << "MSet " << args[i] << ":" << args[i + 1];
OpResult<optional<string>> res = sg.Set(params, args[i], args[i + 1]);
if (res.status() != OpStatus::OK) { // OOM for example.
return res.status();
success->store(false);
break;
}
}
return OpStatus::OK;
if (auto journal = op_args.shard->journal(); journal) {
// We write a custom journal because an OOM in the above loop could lead to partial success, so
// we replicate only what was changed.
string_view cmd;
ArgSlice cmd_args;
if (i == 0) {
// All shards must record the tx was executed for the replica to execute it, so we send a PING
// in case nothing was changed
cmd = "PING";
} else {
// journal [0, i)
cmd = "MSET";
cmd_args = ArgSlice(&args[0], i);
}
RecordJournal(op_args, cmd, cmd_args, op_args.tx->GetUniqueShardCnt());
}
}
// See comment for SetCmd::Set() for when and how OpResult's value (i.e. optional<string>) is set.
@@ -1227,13 +1244,15 @@ void StringFamily::MSet(CmdArgList args, ConnectionContext* cntx) {
LOG(INFO) << "MSET/" << transaction->GetUniqueShardCnt() << str;
}
atomic_bool success = true;
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->GetShardArgs(shard->shard_id());
return OpMSet(t->GetOpArgs(shard), args);
OpMSet(t->GetOpArgs(shard), args, &success);
return OpStatus::OK;
};
OpStatus status = transaction->ScheduleSingleHop(std::move(cb));
if (status == OpStatus::OK) {
if (success.load()) {
cntx->SendOk();
} else {
cntx->SendError(status);
@@ -1261,19 +1280,20 @@ void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) {
};
transaction->Execute(std::move(cb), false);
bool to_skip = exists.load(memory_order_relaxed) == true;
const bool to_skip = exists.load(memory_order_relaxed);
atomic_bool success = true;
auto epilog_cb = [&](Transaction* t, EngineShard* shard) {
if (to_skip)
return OpStatus::OK;
auto args = t->GetShardArgs(shard->shard_id());
return OpMSet(t->GetOpArgs(shard), std::move(args));
OpMSet(t->GetOpArgs(shard), std::move(args), &success);
return OpStatus::OK;
};
transaction->Execute(std::move(epilog_cb), true);
cntx->SendLong(to_skip ? 0 : 1);
cntx->SendLong(to_skip || !success.load() ? 0 : 1);
}
void StringFamily::StrLen(CmdArgList args, ConnectionContext* cntx) {
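
A hedged client-level sketch of the contract implemented by the epilog above
(hypothetical keys and port, plain redis-py): MSETNX replies 1 only when every
key was newly created; it replies 0 when any key already existed and, with this
change, also when an OOM prevented the full set from being applied, in which
case only the pairs that were actually written are replicated.

import redis

r = redis.Redis(port=6379)  # hypothetical local instance
r.delete("a", "b", "c")
assert r.msetnx({"a": "1", "b": "2"})      # all keys new -> 1
assert not r.msetnx({"a": "x", "c": "3"})  # "a" exists -> 0, nothing written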
@@ -1500,6 +1520,9 @@ constexpr uint32_t kClThrottle = THROTTLE;
} // namespace acl
void StringFamily::Register(CommandRegistry* registry) {
constexpr uint32_t kMSetMask =
CO::WRITE | CO::DENYOOM | CO::INTERLEAVED_KEYS | CO::NO_AUTOJOURNAL;
registry->StartFamily();
*registry
<< CI{"SET", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, -3, 1, 1, acl::kSet}.HFUNC(Set)
@@ -1522,10 +1545,8 @@ void StringFamily::Register(CommandRegistry* registry) {
<< CI{"GETSET", CO::WRITE | CO::DENYOOM | CO::FAST, 3, 1, 1, acl::kGetSet}.HFUNC(GetSet)
<< CI{"MGET", CO::READONLY | CO::FAST | CO::REVERSE_MAPPING, -2, 1, -1, acl::kMGet}.HFUNC(
MGet)
<< CI{"MSET", CO::WRITE | CO::DENYOOM | CO::INTERLEAVED_KEYS, -3, 1, -1, acl::kMSet}.HFUNC(
MSet)
<< CI{"MSETNX", CO::WRITE | CO::DENYOOM | CO::INTERLEAVED_KEYS, -3, 1, -1, acl::kMSetNx}
.HFUNC(MSetNx)
<< CI{"MSET", kMSetMask, -3, 1, -1, acl::kMSet}.HFUNC(MSet)
<< CI{"MSETNX", kMSetMask, -3, 1, -1, acl::kMSetNx}.HFUNC(MSetNx)
<< CI{"STRLEN", CO::READONLY | CO::FAST, 2, 1, 1, acl::kStrLen}.HFUNC(StrLen)
<< CI{"GETRANGE", CO::READONLY | CO::FAST, 4, 1, 1, acl::kGetRange}.HFUNC(GetRange)
<< CI{"SUBSTR", CO::READONLY | CO::FAST, 4, 1, 1, acl::kSubStr}.HFUNC(


@@ -555,7 +555,7 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
// Log to journal only once the command finished running
if (is_concluding || (multi_ && multi_->concluding))
LogAutoJournalOnShard(shard);
LogAutoJournalOnShard(shard, result);
// If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation
// and successive hops are run by continuation_trans_ in engine shard.
@@ -1047,6 +1047,10 @@ ShardId Transaction::GetUniqueShard() const {
return unique_shard_id_;
}
optional<SlotId> Transaction::GetUniqueSlotId() const {
return unique_slot_checker_.GetUniqueSlotId();
}
KeyLockArgs Transaction::GetLockArgs(ShardId sid) const {
KeyLockArgs res;
res.db_index = db_index_;
@@ -1089,7 +1093,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
DCHECK_EQ(sd.is_armed, false);
unlocked_keys = false;
} else {
LogAutoJournalOnShard(shard);
LogAutoJournalOnShard(shard, result);
}
}
@@ -1327,7 +1331,7 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
auto* shard = EngineShard::tlocal();
auto result = cb(this, shard);
shard->db_slice().OnCbFinish();
LogAutoJournalOnShard(shard);
LogAutoJournalOnShard(shard, result);
DCHECK_EQ(result.flags, 0); // if it's sophisticated, we shouldn't squash it
return result;
@@ -1454,7 +1458,7 @@ optional<string_view> Transaction::GetWakeKey(ShardId sid) const {
return GetShardArgs(sid).at(sd.wake_key_pos);
}
void Transaction::LogAutoJournalOnShard(EngineShard* shard) {
void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult result) {
// TODO: For now, we ignore non shard coordination.
if (shard == nullptr)
return;
@@ -1467,14 +1471,22 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) {
if (cid_->IsWriteOnly() == 0 && (cid_->opt_mask() & CO::NO_KEY_TRANSACTIONAL) == 0)
return;
// If autojournaling was disabled and not re-enabled, skip it
if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !renabled_auto_journal_.load(memory_order_relaxed))
return;
auto journal = shard->journal();
if (journal == nullptr)
return;
if (result.status != OpStatus::OK) {
// We log NOOP even for NO_AUTOJOURNAL commands because the non-success status could have been
// due to OOM in a single shard, while other shards succeeded
journal->RecordEntry(txid_, journal::Op::NOOP, db_index_, unique_shard_cnt_,
unique_slot_checker_.GetUniqueSlotId(), journal::Entry::Payload{}, true);
return;
}
// If autojournaling was disabled and not re-enabled, skip it
if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) && !renabled_auto_journal_.load(memory_order_relaxed))
return;
// TODO: Handle complex commands like LMPOP correctly once they are implemented.
journal::Entry::Payload entry_payload;


@@ -295,6 +295,8 @@ class Transaction {
// This method is meaningless if GetUniqueShardCnt() != 1.
ShardId GetUniqueShard() const;
std::optional<SlotId> GetUniqueSlotId() const;
bool IsMulti() const {
return bool(multi_);
}
@@ -518,7 +520,7 @@ class Transaction {
// Log command in shard's journal, if this is a write command with auto-journaling enabled.
// Should be called immediately after the last phase (hop).
void LogAutoJournalOnShard(EngineShard* shard);
void LogAutoJournalOnShard(EngineShard* shard, RunnableResult shard_result);
// Returns the previous value of run count.
uint32_t DecreaseRunCnt();


@@ -1838,52 +1838,6 @@ async def test_replicaof_reject_on_load(df_local_factory, df_seeder_factory):
replica.stop()
# note: please be careful if you want to change any of the parameters used in this test.
# changing parameters without extensive testing may easily lead to weak testing case assertion
# which means eviction may not get triggered.
@pytest.mark.asyncio
@pytest.mark.skip(reason="Failing due to bug in replication on command errors")
async def test_policy_based_eviction_propagation(df_local_factory, df_seeder_factory):
master = df_local_factory.create(
proactor_threads=1, cache_mode="true", maxmemory="256mb", enable_heartbeat_eviction="false"
)
replica = df_local_factory.create(proactor_threads=1)
df_local_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
await c_master.execute_command("DEBUG POPULATE 6000 size 44000")
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await wait_available_async(c_replica)
seeder = df_seeder_factory.create(
port=master.port,
keys=500,
val_size=200,
stop_on_failure=False,
unsupported_types=[
ValueType.JSON,
ValueType.LIST,
ValueType.SET,
ValueType.HSET,
ValueType.ZSET,
],
)
await seeder.run(target_deviation=0.1)
info = await c_master.info("stats")
assert info["evicted_keys"] > 0, "Weak testcase: policy based eviction was not triggered."
await check_all_replicas_finished([c_replica], c_master)
keys_master = await c_master.execute_command("keys *")
keys_replica = await c_replica.execute_command("keys *")
assert set(keys_master) == set(keys_replica)
await disconnect_clients(c_master, *[c_replica])
@pytest.mark.asyncio
async def test_heartbeat_eviction_propagation(df_local_factory):
master = df_local_factory.create(
@@ -1919,3 +1873,41 @@ async def test_heartbeat_eviction_propagation(df_local_factory):
keys_replica = await c_replica.execute_command("keys *")
assert set(keys_master) == set(keys_replica)
await disconnect_clients(c_master, *[c_replica])
@pytest.mark.asyncio
async def test_policy_based_eviction_propagation(df_local_factory, df_seeder_factory):
master = df_local_factory.create(
proactor_threads=2,
cache_mode="true",
maxmemory="512mb",
logtostdout="true",
enable_heartbeat_eviction="false",
)
replica = df_local_factory.create(proactor_threads=2)
df_local_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
await c_master.execute_command("DEBUG POPULATE 6000 size 88000")
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await wait_available_async(c_replica)
seeder = df_seeder_factory.create(
port=master.port, keys=500, val_size=1000, stop_on_failure=False
)
await seeder.run(target_deviation=0.1)
info = await c_master.info("stats")
assert info["evicted_keys"] > 0, "Weak testcase: policy based eviction was not triggered."
await check_all_replicas_finished([c_replica], c_master)
keys_master = await c_master.execute_command("keys k*")
keys_replica = await c_replica.execute_command("keys k*")
assert len(keys_master) == len(keys_replica)
assert set(keys_master) == set(keys_replica)
await disconnect_clients(c_master, *[c_replica])