feat(server): master stop sending exec opcode to replica (#3289)

* feat server: master stop sending exec opcode to replica

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2024-07-09 14:48:31 +03:00 committed by GitHub
parent 5c7c21b6c5
commit ccada875e0
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 14 additions and 132 deletions

View file

@ -97,9 +97,7 @@ TEST(Journal, WriteRead) {
{2, Op::COMMAND, 1, 1, nullopt, Payload("LPUSH", list("l", "v1", "v2"))}, {2, Op::COMMAND, 1, 1, nullopt, Payload("LPUSH", list("l", "v1", "v2"))},
{3, Op::COMMAND, 0, 1, nullopt, Payload("MSET", slice("D", "4"))}, {3, Op::COMMAND, 0, 1, nullopt, Payload("MSET", slice("D", "4"))},
{4, Op::COMMAND, 1, 1, nullopt, Payload("DEL", list("l1"))}, {4, Op::COMMAND, 1, 1, nullopt, Payload("DEL", list("l1"))},
{5, Op::COMMAND, 2, 1, nullopt, Payload("DEL", list("E", "2"))}, {5, Op::COMMAND, 2, 1, nullopt, Payload("DEL", list("E", "2"))}};
{6, Op::MULTI_COMMAND, 2, 1, nullopt, Payload("SET", list("E", "2"))},
{6, Op::EXEC, 2, 1, nullopt}};
// Write all entries to a buffer. // Write all entries to a buffer.
base::IoBuf buf; base::IoBuf buf;

View file

@ -76,8 +76,6 @@ void JournalWriter::Write(const journal::Entry& entry) {
return; return;
case journal::Op::COMMAND: case journal::Op::COMMAND:
case journal::Op::EXPIRED: case journal::Op::EXPIRED:
case journal::Op::MULTI_COMMAND:
case journal::Op::EXEC:
Write(entry.txid); Write(entry.txid);
Write(entry.shard_cnt); Write(entry.shard_cnt);
Write(entry.payload); Write(entry.payload);

View file

@ -1993,7 +1993,6 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
cntx->transaction = stub_tx.get(); cntx->transaction = stub_tx.get();
result = interpreter->RunFunction(eval_args.sha, &error); result = interpreter->RunFunction(eval_args.sha, &error);
cntx->transaction->FIX_ConcludeJournalExec(); // flush journal
cntx->transaction = tx; cntx->transaction = tx;
return OpStatus::OK; return OpStatus::OK;

View file

@ -116,7 +116,6 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm
auto& sinfo = PrepareShardInfo(last_sid, slot_checker.GetUniqueSlotId()); auto& sinfo = PrepareShardInfo(last_sid, slot_checker.GetUniqueSlotId());
sinfo.had_writes |= cmd->Cid()->IsWriteOnly();
sinfo.cmds.push_back(cmd); sinfo.cmds.push_back(cmd);
order_.push_back(last_sid); order_.push_back(last_sid);
@ -280,10 +279,6 @@ void MultiCommandSquasher::Run() {
// Set last txid. // Set last txid.
cntx_->last_command_debug.clock = cntx_->transaction->txid(); cntx_->last_command_debug.clock = cntx_->transaction->txid();
if (!sharded_.empty())
cntx_->transaction->ReportWritesSquashedMulti(
[this](ShardId sid) { return sharded_[sid].had_writes; });
// UnlockMulti is a no-op for non-atomic multi transactions, // UnlockMulti is a no-op for non-atomic multi transactions,
// still called for correctness and future changes // still called for correctness and future changes
if (!IsAtomic()) { if (!IsAtomic()) {

View file

@ -30,10 +30,9 @@ class MultiCommandSquasher {
private: private:
// Per-shard execution info. // Per-shard execution info.
struct ShardExecInfo { struct ShardExecInfo {
ShardExecInfo() : had_writes{false}, cmds{}, replies{}, local_tx{nullptr} { ShardExecInfo() : cmds{}, replies{}, local_tx{nullptr} {
} }
bool had_writes;
std::vector<StoredCmd*> cmds; // accumulated commands std::vector<StoredCmd*> cmds; // accumulated commands
std::vector<facade::CapturingReplyBuilder::Payload> replies; std::vector<facade::CapturingReplyBuilder::Payload> replies;
boost::intrusive_ptr<Transaction> local_tx; // stub-mode tx for use inside shard boost::intrusive_ptr<Transaction> local_tx; // stub-mode tx for use inside shard

View file

@ -351,11 +351,6 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
// no database switch can be performed between those two calls, because they are part of one // no database switch can be performed between those two calls, because they are part of one
// transaction. // transaction.
void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) { void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool await) {
// We ignore EXEC entries because they have no meaning during
// the LOAD phase on replica.
if (item.opcode == journal::Op::EXEC)
return;
// To enable journal flushing to sync after non auto journal command is executed we call // To enable journal flushing to sync after non auto journal command is executed we call
// TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no // TriggerJournalWriteToSink. This call uses the NOOP opcode with await=true. Since there is no
// additional journal change to serialize, it simply invokes PushSerializedToChannel. // additional journal change to serialize, it simply invokes PushSerializedToChannel.

View file

@ -132,8 +132,6 @@ Transaction::Transaction(const CommandId* cid) : cid_{cid} {
string_view cmd_name(cid_->name()); string_view cmd_name(cid_->name());
if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") { if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") {
multi_.reset(new MultiData); multi_.reset(new MultiData);
multi_->shard_journal_write.resize(shard_set->size(), false);
multi_->mode = NOT_DETERMINED; multi_->mode = NOT_DETERMINED;
multi_->role = DEFAULT; multi_->role = DEFAULT;
} }
@ -153,7 +151,6 @@ Transaction::Transaction(const Transaction* parent, ShardId shard_id,
} }
multi_->role = SQUASHED_STUB; multi_->role = SQUASHED_STUB;
multi_->shard_journal_write.resize(1);
MultiUpdateWithParent(parent); MultiUpdateWithParent(parent);
if (slot_id.has_value()) { if (slot_id.has_value()) {
@ -597,11 +594,6 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) {
// This is the last hop, so clear cont_trans if its held by the current tx // This is the last hop, so clear cont_trans if its held by the current tx
shard->RemoveContTx(this); shard->RemoveContTx(this);
if (IsAtomicMulti()) { // Can only be true if run through ScheduleSingleHop
DCHECK(cid_->IsMultiTransactional());
MultiReportJournalOnShard(shard);
}
// It has 2 responsibilities. // It has 2 responsibilities.
// 1: to go over potential wakened keys, verify them and activate watch queues. // 1: to go over potential wakened keys, verify them and activate watch queues.
// 2: if this transaction was notified and finished running - to remove it from the head // 2: if this transaction was notified and finished running - to remove it from the head
@ -758,15 +750,6 @@ void Transaction::ScheduleInternal() {
} }
} }
void Transaction::ReportWritesSquashedMulti(absl::FunctionRef<bool(ShardId)> had_write) {
DCHECK(multi_);
for (unsigned i = 0; i < multi_->shard_journal_write.size(); i++)
multi_->shard_journal_write[i] |= had_write(i);
// Update immediately if we decide to conclude after one hop without UnlockMulti
multi_->shard_journal_cnt = CalcMultiNumOfShardJournals();
}
// Runs in the coordinator fiber. // Runs in the coordinator fiber.
void Transaction::UnlockMulti() { void Transaction::UnlockMulti() {
VLOG(1) << "UnlockMulti " << DebugId(); VLOG(1) << "UnlockMulti " << DebugId();
@ -782,8 +765,6 @@ void Transaction::UnlockMulti() {
sharded_keys[sid].emplace_back(fp); sharded_keys[sid].emplace_back(fp);
} }
multi_->shard_journal_cnt = ServerState::tlocal()->journal() ? CalcMultiNumOfShardJournals() : 0;
use_count_.fetch_add(shard_data_.size(), std::memory_order_relaxed); use_count_.fetch_add(shard_data_.size(), std::memory_order_relaxed);
DCHECK_EQ(shard_data_.size(), shard_set->size()); DCHECK_EQ(shard_data_.size(), shard_set->size());
@ -798,16 +779,6 @@ void Transaction::UnlockMulti() {
VLOG(1) << "UnlockMultiEnd " << DebugId(); VLOG(1) << "UnlockMultiEnd " << DebugId();
} }
uint32_t Transaction::CalcMultiNumOfShardJournals() const {
uint32_t shard_journals_cnt = 0;
for (bool was_shard_write : multi_->shard_journal_write) {
if (was_shard_write) {
++shard_journals_cnt;
}
}
return shard_journals_cnt;
}
OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
Execute(cb, true); Execute(cb, true);
return local_result_; return local_result_;
@ -919,14 +890,6 @@ const absl::flat_hash_set<std::pair<ShardId, LockFp>>& Transaction::GetMultiFps(
return multi_->tag_fps; return multi_->tag_fps;
} }
void Transaction::FIX_ConcludeJournalExec() {
if (!multi_->shard_journal_write.front())
return;
multi_->shard_journal_cnt = 1;
MultiReportJournalOnShard(EngineShard::tlocal());
}
string Transaction::DEBUG_PrintFailState(ShardId sid) const { string Transaction::DEBUG_PrintFailState(ShardId sid) const {
auto res = StrCat( auto res = StrCat(
"usc: ", unique_shard_cnt_, ", name:", GetCId()->name(), "usc: ", unique_shard_cnt_, ", name:", GetCId()->name(),
@ -1262,21 +1225,9 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
return result; return result;
} }
void Transaction::MultiReportJournalOnShard(EngineShard* shard) const {
DCHECK_EQ(EngineShard::tlocal(), shard);
auto* journal = shard->journal();
size_t write_idx = multi_->role == SQUASHED_STUB ? 0 : shard->shard_id();
if (journal != nullptr && multi_->shard_journal_write[write_idx]) {
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, multi_->shard_journal_cnt,
unique_slot_checker_.GetUniqueSlotId(), {}, true);
}
}
void Transaction::UnlockMultiShardCb(absl::Span<const LockFp> fps, EngineShard* shard) { void Transaction::UnlockMultiShardCb(absl::Span<const LockFp> fps, EngineShard* shard) {
DCHECK(multi_ && multi_->lock_mode); DCHECK(multi_ && multi_->lock_mode);
MultiReportJournalOnShard(shard);
if (multi_->mode == GLOBAL) { if (multi_->mode == GLOBAL) {
shard->shard_lock()->Release(IntentLock::EXCLUSIVE); shard->shard_lock()->Release(IntentLock::EXCLUSIVE);
} else { } else {
@ -1402,37 +1353,15 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul
} }
// Record to journal autojournal commands, here we allow await which enables writing to sync // Record to journal autojournal commands, here we allow await which enables writing to sync
// the journal change. // the journal change.
LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false, true); LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, true);
} }
void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload,
uint32_t shard_cnt, bool multi_commands, uint32_t shard_cnt, bool allow_await) const {
bool allow_await) const {
auto journal = shard->journal(); auto journal = shard->journal();
CHECK(journal); CHECK(journal);
journal->RecordEntry(txid_, journal::Op::COMMAND, db_index_, shard_cnt,
if (multi_) { unique_slot_checker_.GetUniqueSlotId(), std::move(payload), allow_await);
if (multi_->role != SQUASHED_STUB)
multi_->shard_journal_write[shard->shard_id()] = true;
else
multi_->shard_journal_write[0] = true;
}
bool is_multi = multi_commands || IsAtomicMulti();
auto opcode = is_multi ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND;
journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, unique_slot_checker_.GetUniqueSlotId(),
std::move(payload), allow_await);
}
void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const {
if (multi_) {
return;
}
auto journal = shard->journal();
CHECK(journal);
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt,
unique_slot_checker_.GetUniqueSlotId(), {}, false);
} }
void Transaction::ReviveAutoJournal() { void Transaction::ReviveAutoJournal() {

View file

@ -229,10 +229,6 @@ class Transaction {
// Start multi in NON_ATOMIC mode. // Start multi in NON_ATOMIC mode.
void StartMultiNonAtomic(); void StartMultiNonAtomic();
// Report which shards had write commands that executed on stub transactions
// and thus did not mark itself in MultiData::shard_journal_write.
void ReportWritesSquashedMulti(absl::FunctionRef<bool(ShardId)> had_write);
// Unlock key locks of a multi transaction. // Unlock key locks of a multi transaction.
void UnlockMulti(); void UnlockMulti();
@ -325,14 +321,9 @@ class Transaction {
// to it must not block. // to it must not block.
void PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdArgList args); void PrepareMultiForScheduleSingleHop(ShardId sid, DbIndex db, CmdArgList args);
// Write a journal entry to a shard journal with the given payload. When logging a non-automatic // Write a journal entry to a shard journal with the given payload.
// journal command, multiple journal entries may be necessary. In this case, call with set
// multi_commands to true and call the FinishLogJournalOnShard function after logging the final
// entry.
void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, uint32_t shard_cnt, void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, uint32_t shard_cnt,
bool multi_commands, bool allow_await) const; bool allow_await) const;
void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const;
// Re-enable auto journal for commands marked as NO_AUTOJOURNAL. Call during setup. // Re-enable auto journal for commands marked as NO_AUTOJOURNAL. Call during setup.
void ReviveAutoJournal(); void ReviveAutoJournal();
@ -343,9 +334,6 @@ class Transaction {
// Get keys multi transaction was initialized with, normalized and unique // Get keys multi transaction was initialized with, normalized and unique
const absl::flat_hash_set<std::pair<ShardId, LockFp>>& GetMultiFps() const; const absl::flat_hash_set<std::pair<ShardId, LockFp>>& GetMultiFps() const;
// Send journal EXEC opcode after a series of MULTI commands on the currently active shard
void FIX_ConcludeJournalExec();
// Print in-depth failure state for debugging. // Print in-depth failure state for debugging.
std::string DEBUG_PrintFailState(ShardId sid) const; std::string DEBUG_PrintFailState(ShardId sid) const;
@ -442,13 +430,6 @@ class Transaction {
bool concluding = false; bool concluding = false;
unsigned cmd_seq_num = 0; // used for debugging purposes. unsigned cmd_seq_num = 0; // used for debugging purposes.
unsigned shard_journal_cnt;
// The shard_journal_write vector variable is used to determine the number of shards
// involved in a multi-command transaction. This information is utilized by replicas when
// executing multi-command. For every write to a shard journal, the corresponding index in the
// vector is marked as true.
absl::InlinedVector<bool, 4> shard_journal_write;
}; };
enum CoordinatorState : uint8_t { enum CoordinatorState : uint8_t {
@ -543,9 +524,6 @@ class Transaction {
// Set time_now_ms_ // Set time_now_ms_
void InitTxTime(); void InitTxTime();
// If journaling is enabled, report final exec opcode to finish the chain of commands.
void MultiReportJournalOnShard(EngineShard* shard) const;
void UnlockMultiShardCb(absl::Span<const LockFp> fps, EngineShard* shard); void UnlockMultiShardCb(absl::Span<const LockFp> fps, EngineShard* shard);
// In a multi-command transaction, we determine the number of shard journals that we wrote entries // In a multi-command transaction, we determine the number of shard journals that we wrote entries

View file

@ -24,21 +24,15 @@ size_t ShardArgs::Size() const {
} }
void RecordJournal(const OpArgs& op_args, string_view cmd, const ShardArgs& args, void RecordJournal(const OpArgs& op_args, string_view cmd, const ShardArgs& args,
uint32_t shard_cnt, bool multi_commands) { uint32_t shard_cnt) {
VLOG(2) << "Logging command " << cmd << " from txn " << op_args.tx->txid(); VLOG(2) << "Logging command " << cmd << " from txn " << op_args.tx->txid();
op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, multi_commands, op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, false);
false);
} }
void RecordJournal(const OpArgs& op_args, std::string_view cmd, facade::ArgSlice args, void RecordJournal(const OpArgs& op_args, std::string_view cmd, facade::ArgSlice args,
uint32_t shard_cnt, bool multi_commands) { uint32_t shard_cnt) {
VLOG(2) << "Logging command " << cmd << " from txn " << op_args.tx->txid(); VLOG(2) << "Logging command " << cmd << " from txn " << op_args.tx->txid();
op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, multi_commands, op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, false);
false);
}
void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt) {
op_args.tx->FinishLogJournalOnShard(op_args.shard, shard_cnt);
} }
void RecordExpiry(DbIndex dbid, string_view key) { void RecordExpiry(DbIndex dbid, string_view key) {

View file

@ -201,12 +201,9 @@ class ShardArgs {
// Record non auto journal command with own txid and dbid. // Record non auto journal command with own txid and dbid.
void RecordJournal(const OpArgs& op_args, std::string_view cmd, const ShardArgs& args, void RecordJournal(const OpArgs& op_args, std::string_view cmd, const ShardArgs& args,
uint32_t shard_cnt = 1, bool multi_commands = false); uint32_t shard_cnt = 1);
void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args, void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args,
uint32_t shard_cnt = 1, bool multi_commands = false); uint32_t shard_cnt = 1);
// Record non auto journal command finish. Call only when command translates to multi commands.
void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt);
// Record expiry in journal with independent transaction. Must be called from shard thread holding // Record expiry in journal with independent transaction. Must be called from shard thread holding
// key. // key.