From 7f56a435c4a3551fa83628e051c321f0b33450d3 Mon Sep 17 00:00:00 2001 From: adiholden Date: Sun, 23 Apr 2023 23:46:51 +0300 Subject: [PATCH] bug(server): replicate scripts in stable state (#1114) * bug(server): replicate scripts in stable state --------- Signed-off-by: adi_holden --- src/server/command_registry.cc | 4 +++- src/server/command_registry.h | 1 + src/server/db_slice.cc | 7 +++++- src/server/script_mgr.cc | 5 ++++ src/server/server_family.cc | 2 +- src/server/transaction.cc | 31 +++++++++++++++++++++---- src/server/transaction.h | 3 +++ tests/dragonfly/replication_test.py | 36 +++++++++++++++++++++++++++++ 8 files changed, 81 insertions(+), 8 deletions(-) diff --git a/src/server/command_registry.cc b/src/server/command_registry.cc index 9315145f6..1b8df89ea 100644 --- a/src/server/command_registry.cc +++ b/src/server/command_registry.cc @@ -29,7 +29,7 @@ CommandId::CommandId(const char* name, uint32_t mask, int8_t arity, int8_t first } bool CommandId::IsTransactional() const { - if (first_key_ > 0 || (opt_mask_ & CO::GLOBAL_TRANS)) + if (first_key_ > 0 || (opt_mask_ & CO::GLOBAL_TRANS) || (opt_mask_ & CO::NO_KEY_JOURNAL)) return true; string_view name{name_}; @@ -124,6 +124,8 @@ const char* OptName(CO::CommandOpt fl) { return "variadic-keys"; case NO_AUTOJOURNAL: return "custom-journal"; + case NO_KEY_JOURNAL: + return "no-key-journal"; } return "unknown"; } diff --git a/src/server/command_registry.h b/src/server/command_registry.h index 6bc1ac7f7..c7fdb2662 100644 --- a/src/server/command_registry.h +++ b/src/server/command_registry.h @@ -34,6 +34,7 @@ enum CommandOpt : uint32_t { GLOBAL_TRANS = 1U << 12, NO_AUTOJOURNAL = 1U << 15, // Skip automatically logging command to journal inside transaction. + NO_KEY_JOURNAL = 1U << 16, // Command with no keys that need to be journaled }; const char* OptName(CommandOpt fl); diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 83e8d2e92..738dedb99 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -674,6 +674,9 @@ size_t DbSlice::DbSize(DbIndex db_ind) const { } bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) { + if (lock_args.args.empty()) { + return true; + } DCHECK_GT(lock_args.key_step, 0u); auto& lt = db_arr_[lock_args.db_index]->trans_locks; @@ -700,6 +703,9 @@ bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) { } void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) { + if (lock_args.args.empty()) { + return; + } DVLOG(2) << "Release " << IntentLock::ModeName(mode) << " for " << lock_args.args[0]; if (lock_args.args.size() == 1) { Release(mode, lock_args.db_index, lock_args.args.front(), 1); @@ -729,7 +735,6 @@ bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, string_view key) co } bool DbSlice::CheckLock(IntentLock::Mode mode, const KeyLockArgs& lock_args) const { - DCHECK(!lock_args.args.empty()); const auto& lt = db_arr_[lock_args.db_index]->trans_locks; for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { auto s = lock_args.args[i]; diff --git a/src/server/script_mgr.cc b/src/server/script_mgr.cc index 14a6f0282..b1d6385a9 100644 --- a/src/server/script_mgr.cc +++ b/src/server/script_mgr.cc @@ -20,6 +20,7 @@ #include "facade/error.h" #include "server/engine_shard_set.h" #include "server/server_state.h" +#include "server/transaction.h" ABSL_FLAG(std::string, default_lua_config, "", "Configure default mode for running Lua scripts: \n - Use 'allow-undeclared-keys' to " @@ -126,6 +127,10 @@ void ScriptMgr::LoadCmd(CmdArgList args, ConnectionContext* cntx) { if (!res) return (*cntx)->SendError(res.error().Format()); + // Schedule empty callback inorder to journal command via transaction framework. + auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; + + cntx->transaction->ScheduleSingleHop(std::move(cb)); return (*cntx)->SendBulkString(res.value()); } diff --git a/src/server/server_family.cc b/src/server/server_family.cc index a93beca98..a2f10b3b7 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -2111,7 +2111,7 @@ void ServerFamily::Register(CommandRegistry* registry) { << CI{"REPLCONF", CO::ADMIN | CO::LOADING, -1, 0, 0, 0}.HFUNC(ReplConf) << CI{"ROLE", CO::LOADING | CO::FAST | CO::NOSCRIPT, 1, 0, 0, 0}.HFUNC(Role) << CI{"SLOWLOG", CO::ADMIN | CO::FAST, -2, 0, 0, 0}.SetHandler(SlowLog) - << CI{"SCRIPT", CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(Script) + << CI{"SCRIPT", CO::NOSCRIPT | CO::NO_KEY_JOURNAL, -2, 0, 0, 0}.HFUNC(Script) << CI{"DFLY", CO::ADMIN | CO::GLOBAL_TRANS, -2, 0, 0, 0}.HFUNC(Dfly); } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 99ac19770..45d1e289d 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -83,6 +83,14 @@ void Transaction::InitGlobal() { sd.local_mask = ACTIVE; } +void Transaction::InitNoKey() { + // No key command will use the first shard. + unique_shard_cnt_ = 1; + unique_shard_id_ = 0; + shard_data_.resize(1); + shard_data_.front().local_mask |= ACTIVE; +} + void Transaction::BuildShardIndex(KeyIndex key_index, bool rev_mapping, std::vector* out) { auto args = full_args_; @@ -310,6 +318,11 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) { return OpStatus::OK; } + if ((cid_->opt_mask() & CO::NO_KEY_JOURNAL) > 0) { + InitNoKey(); + return OpStatus::OK; + } + DCHECK_EQ(unique_shard_cnt_, 0u); DCHECK(args_.empty()); @@ -916,6 +929,7 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const { res.db_index = db_index_; res.key_step = cid_->key_arg_step(); res.args = GetShardArgs(sid); + DCHECK(!res.args.empty() || (cid_->opt_mask() & CO::NO_KEY_JOURNAL)); return res; } @@ -947,6 +961,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { sd.pq_pos = shard->txq()->Insert(this); DCHECK_EQ(0, sd.local_mask & KEYLOCK_ACQUIRED); + shard->db_slice().Acquire(mode, lock_args); sd.local_mask |= KEYLOCK_ACQUIRED; @@ -1057,8 +1072,6 @@ bool Transaction::CancelShardCb(EngineShard* shard) { // runs in engine-shard thread. ArgSlice Transaction::GetShardArgs(ShardId sid) const { - DCHECK(!args_.empty()); - // We can read unique_shard_cnt_ only because ShardArgsInShard is called after IsArmedInShard // barrier. if (unique_shard_cnt_ == 1) { @@ -1289,10 +1302,18 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) { if (multi_ && multi_->role == SQUASHER) return; - // Ignore non-write commands or ones with disabled autojournal. - if ((cid_->opt_mask() & CO::WRITE) == 0 || ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) > 0 && - !renabled_auto_journal_.load(memory_order_relaxed))) + bool journal_by_cmd_mask = true; + if ((cid_->opt_mask() & CO::NO_KEY_JOURNAL) > 0) { + journal_by_cmd_mask = true; // Enforce journaling for commands that dont change the db. + } else if ((cid_->opt_mask() & CO::WRITE) == 0) { + journal_by_cmd_mask = false; // Non-write command are not journaled. + } else if ((cid_->opt_mask() & CO::NO_AUTOJOURNAL) > 0 && + !renabled_auto_journal_.load(memory_order_relaxed)) { + journal_by_cmd_mask = false; // Command disabled auto journal. + } + if (!journal_by_cmd_mask) { return; + } auto journal = shard->journal(); if (journal == nullptr) diff --git a/src/server/transaction.h b/src/server/transaction.h index 2331292ef..68bb43a5d 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -389,6 +389,9 @@ class Transaction { // Init as a global transaction. void InitGlobal(); + // Init when command has no keys and it need to use transaction framework + void InitNoKey(); + // Init with a set of keys. void InitByKeys(KeyIndex keys); diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 13363c586..635ce02cd 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -835,3 +835,39 @@ async def test_auth_master(df_local_factory, n_keys=20): assert all(v is not None for v in res) await c_master.connection_pool.disconnect() await c_replica.connection_pool.disconnect() + + +SCRIPT_TEMPLATE = "return {}" + + +@dfly_args({"proactor_threads": 2}) +async def test_script_transfer(df_local_factory): + master = df_local_factory.create(port=BASE_PORT) + replica = df_local_factory.create(port=BASE_PORT+1) + + df_local_factory.start_all([master, replica]) + + c_master = aioredis.Redis(port=master.port) + c_replica = aioredis.Redis(port=replica.port) + + # Load some scripts into master ahead + scripts = [] + for i in range(0, 10): + sha = await c_master.script_load(SCRIPT_TEMPLATE.format(i)) + scripts.append(sha) + + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + await wait_available_async(c_replica) + + # transfer in stable state + for i in range(10, 20): + sha = await c_master.script_load(SCRIPT_TEMPLATE.format(i)) + scripts.append(sha) + + await check_all_replicas_finished([c_replica], c_master) + await c_replica.execute_command("REPLICAOF NO ONE") + + for i, sha in enumerate(scripts): + assert await c_replica.evalsha(sha, 0) == i + await c_master.connection_pool.disconnect() + await c_replica.connection_pool.disconnect()