From c45f7bfea547c78302688c8759f1155b85229d49 Mon Sep 17 00:00:00 2001 From: adiholden Date: Mon, 27 May 2024 14:44:37 +0300 Subject: [PATCH] feat(server): remove multi shard sync from replication (#3085) feat server: remove multi shard sync from replication Signed-off-by: adi_holden --- .github/actions/regression-tests/action.yml | 28 ++--- src/server/cluster/incoming_slot_migration.cc | 6 +- src/server/journal/tx_executor.cc | 47 ++----- src/server/journal/tx_executor.h | 12 +- src/server/replica.cc | 115 ++++-------------- src/server/replica.h | 10 +- 6 files changed, 54 insertions(+), 164 deletions(-) diff --git a/.github/actions/regression-tests/action.yml b/.github/actions/regression-tests/action.yml index 5bc8e5f12..bafa6a1b7 100644 --- a/.github/actions/regression-tests/action.yml +++ b/.github/actions/regression-tests/action.yml @@ -60,25 +60,21 @@ runs: # used by PyTests export DRAGONFLY_PATH="${GITHUB_WORKSPACE}/${{inputs.build-folder-name}}/${{inputs.dfly-executable}}" - run_pytest_with_args() { - timeout 20m pytest -m "${{inputs.filter}}" --durations=10 --color=yes --json-report \ - --json-report-file=rep1_report.json dragonfly/replication_test.py --log-cli-level=INFO \ - --df alsologtostderr $1 $2 || code=$? - # timeout returns 124 if we exceeded the timeout duration - if [[ $code -eq 124 ]]; then - echo "TIMEDOUT=1">> "$GITHUB_OUTPUT" - exit 1 - fi + timeout 20m pytest -m "${{inputs.filter}}" --durations=10 --color=yes --json-report \ + --json-report-file=rep1_report.json dragonfly/replication_test.py --log-cli-level=INFO \ + --df alsologtostderr $1 $2 || code=$? - # when a test fails in pytest it returns 1 but there are other return codes as well so we just check if the code is non zero - if [[ $code -ne 0 ]]; then - exit 1 - fi - } + # timeout returns 124 if we exceeded the timeout duration + if [[ $code -eq 124 ]]; then + echo "TIMEDOUT=1">> "$GITHUB_OUTPUT" + exit 1 + fi - (run_pytest_with_args --df enable_multi_shard_sync=true) - (run_pytest_with_args --df enable_multi_shard_sync=false) + # when a test fails in pytest it returns 1 but there are other return codes as well so we just check if the code is non zero + if [[ $code -ne 0 ]]; then + exit 1 + fi - name: Print last log on timeout if: failure() diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc index d4ea48a9a..61cb5ce77 100644 --- a/src/server/cluster/incoming_slot_migration.cc +++ b/src/server/cluster/incoming_slot_migration.cc @@ -38,7 +38,7 @@ class ClusterShardMigration { socket_ = nullptr; }); JournalReader reader{source, 0}; - TransactionReader tx_reader{false}; + TransactionReader tx_reader; while (!cntx->IsCancelled()) { auto tx_data = tx_reader.NextTxData(&reader, cntx); @@ -89,10 +89,10 @@ class ClusterShardMigration { CHECK(tx_data.shard_cnt <= 1); // we don't support sync for multishard execution if (!tx_data.IsGlobalCmd()) { VLOG(3) << "Execute cmd without sync between shards. txid: " << tx_data.txid; - executor_.Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands)); + executor_.Execute(tx_data.dbid, tx_data.command); } else { // TODO check which global commands should be supported - CHECK(false) << "We don't support command: " << ToSV(tx_data.commands.front().cmd_args[0]) + CHECK(false) << "We don't support command: " << ToSV(tx_data.command.cmd_args[0]) << "in cluster migration process."; } } diff --git a/src/server/journal/tx_executor.cc b/src/server/journal/tx_executor.cc index 612e60469..dec81bb5c 100644 --- a/src/server/journal/tx_executor.cc +++ b/src/server/journal/tx_executor.cc @@ -12,8 +12,6 @@ using namespace std; using namespace facade; -ABSL_DECLARE_FLAG(bool, enable_multi_shard_sync); - namespace dfly { bool MultiShardExecution::InsertTxToSharedMap(TxId txid, uint32_t shard_cnt) { @@ -50,7 +48,6 @@ void MultiShardExecution::CancelAllBlockingEntities() { } void TransactionData::AddEntry(journal::ParsedEntry&& entry) { - ++journal_rec_count; opcode = entry.opcode; switch (entry.opcode) { @@ -63,7 +60,7 @@ void TransactionData::AddEntry(journal::ParsedEntry&& entry) { case journal::Op::EXPIRED: case journal::Op::COMMAND: case journal::Op::MULTI_COMMAND: - commands.push_back(std::move(entry.cmd)); + command = std::move(entry.cmd); [[fallthrough]]; case journal::Op::EXEC: shard_cnt = entry.shard_cnt; @@ -76,11 +73,6 @@ void TransactionData::AddEntry(journal::ParsedEntry&& entry) { } bool TransactionData::IsGlobalCmd() const { - if (commands.size() > 1) { - return false; - } - - auto& command = commands.front(); if (command.cmd_args.empty()) { return false; } @@ -96,7 +88,7 @@ bool TransactionData::IsGlobalCmd() const { return false; } -TransactionData TransactionData::FromSingle(journal::ParsedEntry&& entry) { +TransactionData TransactionData::FromEntry(journal::ParsedEntry&& entry) { TransactionData data; data.AddEntry(std::move(entry)); return data; @@ -116,35 +108,14 @@ std::optional TransactionReader::NextTxData(JournalReader* read VLOG(2) << "read lsn: " << *lsn_; } - // Check if journal command can be executed right away. - // Expiration checks lock on master, so it never conflicts with running multi transactions. - if (res->opcode == journal::Op::EXPIRED || res->opcode == journal::Op::COMMAND || - res->opcode == journal::Op::PING || res->opcode == journal::Op::FIN || - res->opcode == journal::Op::LSN || - (res->opcode == journal::Op::MULTI_COMMAND && !accumulate_multi_)) { - TransactionData tx_data = TransactionData::FromSingle(std::move(res.value())); - if (lsn_.has_value() && tx_data.opcode == journal::Op::LSN) { - DCHECK_NE(tx_data.lsn, 0u); - LOG_IF_EVERY_N(WARNING, tx_data.lsn != *lsn_, 10000) - << "master lsn:" << tx_data.lsn << " replica lsn" << *lsn_; - DCHECK_EQ(tx_data.lsn, *lsn_); - } - return tx_data; - } - - // Otherwise, continue building multi command. - DCHECK(res->opcode == journal::Op::MULTI_COMMAND || res->opcode == journal::Op::EXEC); - DCHECK(res->txid > 0 || res->shard_cnt == 1); - - auto txid = res->txid; - auto& txdata = current_[txid]; - txdata.AddEntry(std::move(res.value())); - // accumulate multi until we get exec opcode. - if (txdata.opcode == journal::Op::EXEC) { - auto out = std::move(txdata); - current_.erase(txid); - return out; + TransactionData tx_data = TransactionData::FromEntry(std::move(res.value())); + if (lsn_.has_value() && tx_data.opcode == journal::Op::LSN) { + DCHECK_NE(tx_data.lsn, 0u); + LOG_IF_EVERY_N(WARNING, tx_data.lsn != *lsn_, 10000) + << "master lsn:" << tx_data.lsn << " replica lsn" << *lsn_; + DCHECK_EQ(tx_data.lsn, *lsn_); } + return tx_data; } return std::nullopt; diff --git a/src/server/journal/tx_executor.h b/src/server/journal/tx_executor.h index 4c4d99985..d4c401a37 100644 --- a/src/server/journal/tx_executor.h +++ b/src/server/journal/tx_executor.h @@ -43,13 +43,13 @@ struct TransactionData { bool IsGlobalCmd() const; - static TransactionData FromSingle(journal::ParsedEntry&& entry); + static TransactionData FromEntry(journal::ParsedEntry&& entry); TxId txid{0}; DbIndex dbid{0}; uint32_t shard_cnt{0}; - absl::InlinedVector commands{0}; - uint32_t journal_rec_count{0}; // Count number of source entries to check offset. + journal::ParsedEntry::CmdData command; + journal::Op opcode = journal::Op::NOOP; uint64_t lsn = 0; }; @@ -58,15 +58,11 @@ struct TransactionData { // The journal stream can contain interleaved data for multiple multi transactions, // expiries and out of order executed transactions that need to be grouped on the replica side. struct TransactionReader { - TransactionReader(bool accumulate_multi, std::optional lsn = std::nullopt) - : accumulate_multi_(accumulate_multi), lsn_(lsn) { + TransactionReader(std::optional lsn = std::nullopt) : lsn_(lsn) { } std::optional NextTxData(JournalReader* reader, Context* cntx); private: - // Stores ongoing multi transaction data. - absl::flat_hash_map current_; - bool accumulate_multi_ = false; std::optional lsn_ = 0; }; diff --git a/src/server/replica.cc b/src/server/replica.cc index 4cc453bdc..6fc784394 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -33,8 +33,6 @@ extern "C" { #include "strings/human_readable.h" ABSL_FLAG(int, replication_acks_interval, 3000, "Interval between acks in milliseconds."); -ABSL_FLAG(bool, enable_multi_shard_sync, false, - "Execute multi shards commands on replica synchronized"); ABSL_FLAG(int, master_connect_timeout_ms, 20000, "Timeout for establishing connection to a replication master"); ABSL_FLAG(int, master_reconnect_timeout_ms, 1000, @@ -746,10 +744,6 @@ error_code DflyShardReplica::StartStableSyncFlow(Context* cntx) { sync_fb_ = fb2::Fiber("shard_stable_sync_read", &DflyShardReplica::StableSyncDflyReadFb, this, cntx); - if (use_multi_shard_exe_sync_) { - execution_fb_ = - fb2::Fiber("shard_stable_sync_exec", &DflyShardReplica::StableSyncDflyExecFb, this, cntx); - } return std::error_code{}; } @@ -816,20 +810,13 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) { JournalReader reader{&ps, 0}; DCHECK_GE(journal_rec_executed_, 1u); - TransactionReader tx_reader{use_multi_shard_exe_sync_, - journal_rec_executed_.load(std::memory_order_relaxed) - 1}; + TransactionReader tx_reader{journal_rec_executed_.load(std::memory_order_relaxed) - 1}; if (master_context_.version > DflyVersion::VER0) { acks_fb_ = fb2::Fiber("shard_acks", &DflyShardReplica::StableSyncDflyAcksFb, this, cntx); } while (!cntx->IsCancelled()) { - shard_replica_waker_.await([&]() { - return ((trans_data_queue_.size() < kYieldAfterItemsInQueue) || cntx->IsCancelled()); - }); - if (cntx->IsCancelled()) - break; - auto tx_data = tx_reader.NextTxData(&reader, cntx); if (!tx_data) break; @@ -841,20 +828,10 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) { force_ping_ = true; journal_rec_executed_.fetch_add(1, std::memory_order_relaxed); } else if (tx_data->opcode == journal::Op::EXEC) { - if (use_multi_shard_exe_sync_) { - InsertTxDataToShardResource(std::move(*tx_data)); - } else { - // On no shard sync mode we execute multi commands once they are recieved, therefor when - // receiving exec opcode, we only increase the journal counting. - DCHECK_EQ(tx_data->commands.size(), 0u); - journal_rec_executed_.fetch_add(1, std::memory_order_relaxed); - } + journal_rec_executed_.fetch_add(1, std::memory_order_relaxed); } else { - if (use_multi_shard_exe_sync_) { - InsertTxDataToShardResource(std::move(*tx_data)); - } else { - ExecuteTxWithNoShardSync(std::move(*tx_data), cntx); - } + ExecuteTx(std::move(*tx_data), cntx); + journal_rec_executed_.fetch_add(1, std::memory_order_relaxed); } shard_replica_waker_.notifyAll(); } @@ -923,7 +900,6 @@ DflyShardReplica::DflyShardReplica(ServerContext server_context, MasterContext m master_context_(master_context), multi_shard_exe_(multi_shard_exe), flow_id_(flow_id) { - use_multi_shard_exe_sync_ = GetFlag(FLAGS_enable_multi_shard_sync); executor_ = std::make_unique(service); rdb_loader_ = std::make_unique(&service_); } @@ -932,53 +908,19 @@ DflyShardReplica::~DflyShardReplica() { JoinFlow(); } -void DflyShardReplica::ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx) { +void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, Context* cntx) { if (cntx->IsCancelled()) { return; } - bool was_insert = tx_data.IsGlobalCmd() && - multi_shard_exe_->InsertTxToSharedMap(tx_data.txid, tx_data.shard_cnt); - - ExecuteTx(std::move(tx_data), was_insert, cntx); -} - -void DflyShardReplica::InsertTxDataToShardResource(TransactionData&& tx_data) { - bool was_insert = false; - if (tx_data.shard_cnt > 1) { - was_insert = multi_shard_exe_->InsertTxToSharedMap(tx_data.txid, tx_data.shard_cnt); - } - - VLOG(2) << "txid: " << tx_data.txid << " pushed to queue"; - trans_data_queue_.emplace(std::move(tx_data), was_insert); -} - -void DflyShardReplica::StableSyncDflyExecFb(Context* cntx) { - while (!cntx->IsCancelled()) { - shard_replica_waker_.await( - [&]() { return (!trans_data_queue_.empty() || cntx->IsCancelled()); }); - if (cntx->IsCancelled()) { - return; - } - DCHECK(!trans_data_queue_.empty()); - auto& data = trans_data_queue_.front(); - ExecuteTx(std::move(data.first), data.second, cntx); - trans_data_queue_.pop(); - shard_replica_waker_.notifyAll(); - } -} - -void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx) { - if (cntx->IsCancelled()) { - return; - } - if (tx_data.shard_cnt <= 1 || (!use_multi_shard_exe_sync_ && !tx_data.IsGlobalCmd())) { + if (!tx_data.IsGlobalCmd()) { VLOG(2) << "Execute cmd without sync between shards. txid: " << tx_data.txid; - executor_->Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands)); - journal_rec_executed_.fetch_add(tx_data.journal_rec_count, std::memory_order_relaxed); + executor_->Execute(tx_data.dbid, tx_data.command); return; } + bool inserted_by_me = multi_shard_exe_->InsertTxToSharedMap(tx_data.txid, tx_data.shard_cnt); + auto& multi_shard_data = multi_shard_exe_->Find(tx_data.txid); VLOG(2) << "Execute txid: " << tx_data.txid << " waiting for data in all shards"; @@ -991,29 +933,23 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, return; VLOG(2) << "Execute txid: " << tx_data.txid << " block wait finished"; - if (tx_data.IsGlobalCmd()) { - VLOG(2) << "Execute txid: " << tx_data.txid << " global command execution"; - // Wait until all shards flows get to execution step of this transaction. - multi_shard_data.barrier.Wait(); - // Check if we woke up due to cancellation. - if (cntx_.IsCancelled()) - return; - // Global command will be executed only from one flow fiber. This ensure corectness of data in - // replica. - if (inserted_by_me) { - executor_->Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands)); - } - // Wait until exection is done, to make sure we done execute next commands while the global is - // executed. - multi_shard_data.barrier.Wait(); - // Check if we woke up due to cancellation. - if (cntx_.IsCancelled()) - return; - } else { // Non global command will be executed by each flow fiber - VLOG(2) << "Execute txid: " << tx_data.txid << " executing shard transaction commands"; - executor_->Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands)); + VLOG(2) << "Execute txid: " << tx_data.txid << " global command execution"; + // Wait until all shards flows get to execution step of this transaction. + multi_shard_data.barrier.Wait(); + // Check if we woke up due to cancellation. + if (cntx_.IsCancelled()) + return; + // Global command will be executed only from one flow fiber. This ensure corectness of data in + // replica. + if (inserted_by_me) { + executor_->Execute(tx_data.dbid, tx_data.command); } - journal_rec_executed_.fetch_add(tx_data.journal_rec_count, std::memory_order_relaxed); + // Wait until exection is done, to make sure we done execute next commands while the global is + // executed. + multi_shard_data.barrier.Wait(); + // Check if we woke up due to cancellation. + if (cntx_.IsCancelled()) + return; // Erase from map can be done only after all flow fibers executed the transaction commands. // The last fiber which will decrease the counter to 0 will be the one to erase the data from @@ -1164,7 +1100,6 @@ uint64_t DflyShardReplica::JournalExecutedCount() const { void DflyShardReplica::JoinFlow() { sync_fb_.JoinIfNeeded(); acks_fb_.JoinIfNeeded(); - execution_fb_.JoinIfNeeded(); } void DflyShardReplica::Cancel() { diff --git a/src/server/replica.h b/src/server/replica.h index dba0cc73a..6ce11f3be 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -203,11 +203,7 @@ class DflyShardReplica : public ProtocolClient { void StableSyncDflyAcksFb(Context* cntx); - void StableSyncDflyExecFb(Context* cntx); - - void ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx); - void InsertTxDataToShardResource(TransactionData&& tx_data); - void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx); + void ExecuteTx(TransactionData&& tx_data, Context* cntx); uint32_t FlowId() const; @@ -219,10 +215,7 @@ class DflyShardReplica : public ProtocolClient { std::optional leftover_buf_; - std::queue> trans_data_queue_; - static constexpr size_t kYieldAfterItemsInQueue = 50; util::fb2::EventCount shard_replica_waker_; // waker for trans_data_queue_ - bool use_multi_shard_exe_sync_; std::unique_ptr executor_; std::unique_ptr rdb_loader_; @@ -242,7 +235,6 @@ class DflyShardReplica : public ProtocolClient { size_t ack_offs_ = 0; bool force_ping_ = false; - util::fb2::Fiber execution_fb_; std::shared_ptr multi_shard_exe_; uint32_t flow_id_ = UINT32_MAX; // Flow id if replica acts as a dfly flow.