feat(server): remove multi shard sync from replication (#3085)

Signed-off-by: adi_holden <adi@dragonflydb.io>
Author: adiholden, 2024-05-27 14:44:37 +03:00 (committed by GitHub)
Commit: c45f7bfea5 (parent 3474eebbb7)

6 changed files with 54 additions and 164 deletions
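
This change removes support for synchronized multi-shard execution on the replica during stable sync. The enable_multi_shard_sync flag is deleted, along with the per-shard execution fiber (StableSyncDflyExecFb), the pending-transaction queue (trans_data_queue_) and its waker, and the multi-command accumulation in TransactionReader. Each journal entry is now wrapped into a TransactionData that holds a single command and is executed as soon as it is read; only global commands still coordinate across flow fibers, through the barrier in MultiShardExecution.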


@@ -60,25 +60,21 @@ runs:
# used by PyTests
export DRAGONFLY_PATH="${GITHUB_WORKSPACE}/${{inputs.build-folder-name}}/${{inputs.dfly-executable}}"
run_pytest_with_args() {
timeout 20m pytest -m "${{inputs.filter}}" --durations=10 --color=yes --json-report \
--json-report-file=rep1_report.json dragonfly/replication_test.py --log-cli-level=INFO \
--df alsologtostderr $1 $2 || code=$?
# timeout returns 124 if we exceeded the timeout duration
if [[ $code -eq 124 ]]; then
echo "TIMEDOUT=1">> "$GITHUB_OUTPUT"
exit 1
fi
timeout 20m pytest -m "${{inputs.filter}}" --durations=10 --color=yes --json-report \
--json-report-file=rep1_report.json dragonfly/replication_test.py --log-cli-level=INFO \
--df alsologtostderr $1 $2 || code=$?
# when a test fails, pytest returns 1, but other non-zero return codes are possible as well, so we just check that the code is non-zero
if [[ $code -ne 0 ]]; then
exit 1
fi
}
# timeout returns 124 if we exceeded the timeout duration
if [[ $code -eq 124 ]]; then
echo "TIMEDOUT=1">> "$GITHUB_OUTPUT"
exit 1
fi
(run_pytest_with_args --df enable_multi_shard_sync=true)
(run_pytest_with_args --df enable_multi_shard_sync=false)
# when a test fails, pytest returns 1, but other non-zero return codes are possible as well, so we just check that the code is non-zero
if [[ $code -ne 0 ]]; then
exit 1
fi
- name: Print last log on timeout
if: failure()
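
With the flag gone, the two parameterized invocations, (run_pytest_with_args --df enable_multi_shard_sync=true) and (run_pytest_with_args --df enable_multi_shard_sync=false), are dropped from the hunk above and the replication suite now runs once.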


@@ -38,7 +38,7 @@ class ClusterShardMigration {
socket_ = nullptr;
});
JournalReader reader{source, 0};
TransactionReader tx_reader{false};
TransactionReader tx_reader;
while (!cntx->IsCancelled()) {
auto tx_data = tx_reader.NextTxData(&reader, cntx);
@@ -89,10 +89,10 @@ class ClusterShardMigration {
CHECK(tx_data.shard_cnt <= 1); // we don't support sync for multishard execution
if (!tx_data.IsGlobalCmd()) {
VLOG(3) << "Execute cmd without sync between shards. txid: " << tx_data.txid;
executor_.Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands));
executor_.Execute(tx_data.dbid, tx_data.command);
} else {
// TODO check which global commands should be supported
CHECK(false) << "We don't support command: " << ToSV(tx_data.commands.front().cmd_args[0])
CHECK(false) << "We don't support command: " << ToSV(tx_data.command.cmd_args[0])
<< "in cluster migration process.";
}
}


@@ -12,8 +12,6 @@
using namespace std;
using namespace facade;
ABSL_DECLARE_FLAG(bool, enable_multi_shard_sync);
namespace dfly {
bool MultiShardExecution::InsertTxToSharedMap(TxId txid, uint32_t shard_cnt) {
@@ -50,7 +48,6 @@ void MultiShardExecution::CancelAllBlockingEntities() {
}
void TransactionData::AddEntry(journal::ParsedEntry&& entry) {
++journal_rec_count;
opcode = entry.opcode;
switch (entry.opcode) {
@@ -63,7 +60,7 @@ void TransactionData::AddEntry(journal::ParsedEntry&& entry) {
case journal::Op::EXPIRED:
case journal::Op::COMMAND:
case journal::Op::MULTI_COMMAND:
commands.push_back(std::move(entry.cmd));
command = std::move(entry.cmd);
[[fallthrough]];
case journal::Op::EXEC:
shard_cnt = entry.shard_cnt;
@@ -76,11 +73,6 @@ void TransactionData::AddEntry(journal::ParsedEntry&& entry) {
}
bool TransactionData::IsGlobalCmd() const {
if (commands.size() > 1) {
return false;
}
auto& command = commands.front();
if (command.cmd_args.empty()) {
return false;
}
@@ -96,7 +88,7 @@ bool TransactionData::IsGlobalCmd() const {
return false;
}
TransactionData TransactionData::FromSingle(journal::ParsedEntry&& entry) {
TransactionData TransactionData::FromEntry(journal::ParsedEntry&& entry) {
TransactionData data;
data.AddEntry(std::move(entry));
return data;
@@ -116,35 +108,14 @@ std::optional<TransactionData> TransactionReader::NextTxData(JournalReader* read
VLOG(2) << "read lsn: " << *lsn_;
}
// Check if journal command can be executed right away.
// Expiration checks lock on master, so it never conflicts with running multi transactions.
if (res->opcode == journal::Op::EXPIRED || res->opcode == journal::Op::COMMAND ||
res->opcode == journal::Op::PING || res->opcode == journal::Op::FIN ||
res->opcode == journal::Op::LSN ||
(res->opcode == journal::Op::MULTI_COMMAND && !accumulate_multi_)) {
TransactionData tx_data = TransactionData::FromSingle(std::move(res.value()));
if (lsn_.has_value() && tx_data.opcode == journal::Op::LSN) {
DCHECK_NE(tx_data.lsn, 0u);
LOG_IF_EVERY_N(WARNING, tx_data.lsn != *lsn_, 10000)
<< "master lsn:" << tx_data.lsn << " replica lsn" << *lsn_;
DCHECK_EQ(tx_data.lsn, *lsn_);
}
return tx_data;
}
// Otherwise, continue building multi command.
DCHECK(res->opcode == journal::Op::MULTI_COMMAND || res->opcode == journal::Op::EXEC);
DCHECK(res->txid > 0 || res->shard_cnt == 1);
auto txid = res->txid;
auto& txdata = current_[txid];
txdata.AddEntry(std::move(res.value()));
// accumulate multi until we get exec opcode.
if (txdata.opcode == journal::Op::EXEC) {
auto out = std::move(txdata);
current_.erase(txid);
return out;
TransactionData tx_data = TransactionData::FromEntry(std::move(res.value()));
if (lsn_.has_value() && tx_data.opcode == journal::Op::LSN) {
DCHECK_NE(tx_data.lsn, 0u);
LOG_IF_EVERY_N(WARNING, tx_data.lsn != *lsn_, 10000)
<< "master lsn:" << tx_data.lsn << " replica lsn" << *lsn_;
DCHECK_EQ(tx_data.lsn, *lsn_);
}
return tx_data;
}
return std::nullopt;
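
What follows is a hypothetical, self-contained sketch of the simplified reader (stand-in types; the real ParsedEntry, TransactionData and TransactionReader live in dragonfly's journal code): every entry is converted into a TransactionData immediately via FromEntry(), which is why the per-txid accumulation map (current_) and the accumulate_multi_ flag can be deleted.

// Stand-in types, simplified for illustration only.
#include <cstdint>
#include <cstdio>
#include <optional>
#include <string>
#include <utility>
#include <vector>

enum class Op { COMMAND, MULTI_COMMAND, EXEC, PING, FIN, LSN, NOOP };

struct ParsedEntry {
  Op opcode = Op::NOOP;
  uint64_t txid = 0;
  uint32_t shard_cnt = 0;
  std::string cmd;  // simplified single-command payload
};

struct TransactionData {
  // Replaces FromSingle(): every entry becomes a TransactionData directly.
  static TransactionData FromEntry(ParsedEntry&& entry) {
    return TransactionData{entry.opcode, entry.txid, entry.shard_cnt,
                           std::move(entry.cmd)};
  }
  Op opcode = Op::NOOP;
  uint64_t txid = 0;
  uint32_t shard_cnt = 0;
  std::string command;  // a single command, no longer a vector of commands
};

// Before the change, the reader accumulated MULTI_COMMAND entries per txid
// and returned the transaction only once its EXEC entry arrived. Now it is
// a pass-through over the entry stream:
std::optional<TransactionData> NextTxData(std::vector<ParsedEntry>* stream) {
  if (stream->empty())
    return std::nullopt;
  TransactionData out = TransactionData::FromEntry(std::move(stream->front()));
  stream->erase(stream->begin());
  return out;
}

int main() {
  std::vector<ParsedEntry> stream{{Op::MULTI_COMMAND, 7, 2, "SET k1 v1"},
                                  {Op::EXEC, 7, 2, ""}};
  while (auto tx = NextTxData(&stream))
    std::printf("txid=%llu opcode=%d cmd=%s\n",
                static_cast<unsigned long long>(tx->txid),
                static_cast<int>(tx->opcode), tx->command.c_str());
}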


@@ -43,13 +43,13 @@ struct TransactionData {
bool IsGlobalCmd() const;
static TransactionData FromSingle(journal::ParsedEntry&& entry);
static TransactionData FromEntry(journal::ParsedEntry&& entry);
TxId txid{0};
DbIndex dbid{0};
uint32_t shard_cnt{0};
absl::InlinedVector<journal::ParsedEntry::CmdData, 1> commands{0};
uint32_t journal_rec_count{0}; // Count number of source entries to check offset.
journal::ParsedEntry::CmdData command;
journal::Op opcode = journal::Op::NOOP;
uint64_t lsn = 0;
};
@@ -58,15 +58,11 @@ struct TransactionData {
// The journal stream can contain interleaved data for multiple multi transactions,
// expiries and out of order executed transactions that need to be grouped on the replica side.
struct TransactionReader {
TransactionReader(bool accumulate_multi, std::optional<uint64_t> lsn = std::nullopt)
: accumulate_multi_(accumulate_multi), lsn_(lsn) {
TransactionReader(std::optional<uint64_t> lsn = std::nullopt) : lsn_(lsn) {
}
std::optional<TransactionData> NextTxData(JournalReader* reader, Context* cntx);
private:
// Stores ongoing multi transaction data.
absl::flat_hash_map<TxId, TransactionData> current_;
bool accumulate_multi_ = false;
std::optional<uint64_t> lsn_ = 0;
};


@@ -33,8 +33,6 @@ extern "C" {
#include "strings/human_readable.h"
ABSL_FLAG(int, replication_acks_interval, 3000, "Interval between acks in milliseconds.");
ABSL_FLAG(bool, enable_multi_shard_sync, false,
"Execute multi shards commands on replica synchronized");
ABSL_FLAG(int, master_connect_timeout_ms, 20000,
"Timeout for establishing connection to a replication master");
ABSL_FLAG(int, master_reconnect_timeout_ms, 1000,
@@ -746,10 +744,6 @@ error_code DflyShardReplica::StartStableSyncFlow(Context* cntx) {
sync_fb_ =
fb2::Fiber("shard_stable_sync_read", &DflyShardReplica::StableSyncDflyReadFb, this, cntx);
if (use_multi_shard_exe_sync_) {
execution_fb_ =
fb2::Fiber("shard_stable_sync_exec", &DflyShardReplica::StableSyncDflyExecFb, this, cntx);
}
return std::error_code{};
}
@@ -816,20 +810,13 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) {
JournalReader reader{&ps, 0};
DCHECK_GE(journal_rec_executed_, 1u);
TransactionReader tx_reader{use_multi_shard_exe_sync_,
journal_rec_executed_.load(std::memory_order_relaxed) - 1};
TransactionReader tx_reader{journal_rec_executed_.load(std::memory_order_relaxed) - 1};
if (master_context_.version > DflyVersion::VER0) {
acks_fb_ = fb2::Fiber("shard_acks", &DflyShardReplica::StableSyncDflyAcksFb, this, cntx);
}
while (!cntx->IsCancelled()) {
shard_replica_waker_.await([&]() {
return ((trans_data_queue_.size() < kYieldAfterItemsInQueue) || cntx->IsCancelled());
});
if (cntx->IsCancelled())
break;
auto tx_data = tx_reader.NextTxData(&reader, cntx);
if (!tx_data)
break;
@@ -841,20 +828,10 @@ void DflyShardReplica::StableSyncDflyReadFb(Context* cntx) {
force_ping_ = true;
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
} else if (tx_data->opcode == journal::Op::EXEC) {
if (use_multi_shard_exe_sync_) {
InsertTxDataToShardResource(std::move(*tx_data));
} else {
// In no-shard-sync mode we execute multi commands as soon as they are received, therefore when
// receiving the exec opcode, we only increase the journal count.
DCHECK_EQ(tx_data->commands.size(), 0u);
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
}
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
} else {
if (use_multi_shard_exe_sync_) {
InsertTxDataToShardResource(std::move(*tx_data));
} else {
ExecuteTxWithNoShardSync(std::move(*tx_data), cntx);
}
ExecuteTx(std::move(*tx_data), cntx);
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
}
shard_replica_waker_.notifyAll();
}
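
The consolidated loop above can be modeled in isolation. In this hypothetical sketch (plain structs and an atomic counter stand in for the journal types and journal_rec_executed_), MULTI_COMMAND entries execute the moment they arrive, and the closing EXEC only advances the executed-records counter, which is exactly why the removed branch could DCHECK that an EXEC record carries no commands.

#include <atomic>
#include <cstdint>
#include <cstdio>
#include <vector>

enum class Op { COMMAND, MULTI_COMMAND, EXEC, PING };

struct TxData {
  Op opcode;
  const char* command;  // unused for EXEC/PING records
};

std::atomic<uint64_t> journal_rec_executed{0};  // models journal_rec_executed_

void Execute(const TxData& tx) {
  std::printf("exec: %s\n", tx.command);
}

void ReadLoop(std::vector<TxData> stream) {
  for (const TxData& tx : stream) {
    if (tx.opcode == Op::PING || tx.opcode == Op::EXEC) {
      // PING keeps the flow alive; EXEC closes a multi transaction whose
      // commands already ran on receipt. Either way, only the counter moves.
      journal_rec_executed.fetch_add(1, std::memory_order_relaxed);
    } else {
      Execute(tx);  // COMMAND / MULTI_COMMAND run immediately
      journal_rec_executed.fetch_add(1, std::memory_order_relaxed);
    }
  }
}

int main() {
  ReadLoop({{Op::MULTI_COMMAND, "SET k1 v1"},
            {Op::MULTI_COMMAND, "SET k2 v2"},
            {Op::EXEC, "(exec)"},
            {Op::COMMAND, "SET k3 v3"}});
  std::printf("records executed: %llu\n",
              static_cast<unsigned long long>(journal_rec_executed.load()));
}
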
@@ -923,7 +900,6 @@ DflyShardReplica::DflyShardReplica(ServerContext server_context, MasterContext m
master_context_(master_context),
multi_shard_exe_(multi_shard_exe),
flow_id_(flow_id) {
use_multi_shard_exe_sync_ = GetFlag(FLAGS_enable_multi_shard_sync);
executor_ = std::make_unique<JournalExecutor>(service);
rdb_loader_ = std::make_unique<RdbLoader>(&service_);
}
@@ -932,53 +908,19 @@ DflyShardReplica::~DflyShardReplica() {
JoinFlow();
}
void DflyShardReplica::ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx) {
void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, Context* cntx) {
if (cntx->IsCancelled()) {
return;
}
bool was_insert = tx_data.IsGlobalCmd() &&
multi_shard_exe_->InsertTxToSharedMap(tx_data.txid, tx_data.shard_cnt);
ExecuteTx(std::move(tx_data), was_insert, cntx);
}
void DflyShardReplica::InsertTxDataToShardResource(TransactionData&& tx_data) {
bool was_insert = false;
if (tx_data.shard_cnt > 1) {
was_insert = multi_shard_exe_->InsertTxToSharedMap(tx_data.txid, tx_data.shard_cnt);
}
VLOG(2) << "txid: " << tx_data.txid << " pushed to queue";
trans_data_queue_.emplace(std::move(tx_data), was_insert);
}
void DflyShardReplica::StableSyncDflyExecFb(Context* cntx) {
while (!cntx->IsCancelled()) {
shard_replica_waker_.await(
[&]() { return (!trans_data_queue_.empty() || cntx->IsCancelled()); });
if (cntx->IsCancelled()) {
return;
}
DCHECK(!trans_data_queue_.empty());
auto& data = trans_data_queue_.front();
ExecuteTx(std::move(data.first), data.second, cntx);
trans_data_queue_.pop();
shard_replica_waker_.notifyAll();
}
}
void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx) {
if (cntx->IsCancelled()) {
return;
}
if (tx_data.shard_cnt <= 1 || (!use_multi_shard_exe_sync_ && !tx_data.IsGlobalCmd())) {
if (!tx_data.IsGlobalCmd()) {
VLOG(2) << "Execute cmd without sync between shards. txid: " << tx_data.txid;
executor_->Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands));
journal_rec_executed_.fetch_add(tx_data.journal_rec_count, std::memory_order_relaxed);
executor_->Execute(tx_data.dbid, tx_data.command);
return;
}
bool inserted_by_me = multi_shard_exe_->InsertTxToSharedMap(tx_data.txid, tx_data.shard_cnt);
auto& multi_shard_data = multi_shard_exe_->Find(tx_data.txid);
VLOG(2) << "Execute txid: " << tx_data.txid << " waiting for data in all shards";
@@ -991,29 +933,23 @@ void DflyShardReplica::ExecuteTx(TransactionData&& tx_data, bool inserted_by_me,
return;
VLOG(2) << "Execute txid: " << tx_data.txid << " block wait finished";
if (tx_data.IsGlobalCmd()) {
VLOG(2) << "Execute txid: " << tx_data.txid << " global command execution";
// Wait until all shards flows get to execution step of this transaction.
multi_shard_data.barrier.Wait();
// Check if we woke up due to cancellation.
if (cntx_.IsCancelled())
return;
// A global command will be executed from only one flow fiber. This ensures correctness of data
// in the replica.
if (inserted_by_me) {
executor_->Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands));
}
// Wait until execution is done, to make sure we don't execute the next commands while the
// global command is executed.
multi_shard_data.barrier.Wait();
// Check if we woke up due to cancellation.
if (cntx_.IsCancelled())
return;
} else { // Non global command will be executed by each flow fiber
VLOG(2) << "Execute txid: " << tx_data.txid << " executing shard transaction commands";
executor_->Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands));
VLOG(2) << "Execute txid: " << tx_data.txid << " global command execution";
// Wait until all shards flows get to execution step of this transaction.
multi_shard_data.barrier.Wait();
// Check if we woke up due to cancellation.
if (cntx_.IsCancelled())
return;
// A global command will be executed from only one flow fiber. This ensures correctness of data
// in the replica.
if (inserted_by_me) {
executor_->Execute(tx_data.dbid, tx_data.command);
}
journal_rec_executed_.fetch_add(tx_data.journal_rec_count, std::memory_order_relaxed);
// Wait until execution is done, to make sure we don't execute the next commands while the
// global command is executed.
multi_shard_data.barrier.Wait();
// Check if we woke up due to cancellation.
if (cntx_.IsCancelled())
return;
// Erasing from the map can be done only after all flow fibers have executed the transaction
// commands. The last fiber to decrease the counter to 0 will be the one to erase the data from
@@ -1164,7 +1100,6 @@ uint64_t DflyShardReplica::JournalExecutedCount() const {
void DflyShardReplica::JoinFlow() {
sync_fb_.JoinIfNeeded();
acks_fb_.JoinIfNeeded();
execution_fb_.JoinIfNeeded();
}
void DflyShardReplica::Cancel() {
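
What survives of cross-shard coordination is the global-command path in ExecuteTx above. The following is a hypothetical, self-contained analogue: std::thread and std::barrier stand in for dragonfly's fibers and the barrier inside MultiShardExecution. Exactly one flow, the one that won the InsertTxToSharedMap race, executes the command, and the two barrier waits bracket the execution so that no flow advances past a global command while it is still being applied.

#include <atomic>
#include <barrier>   // C++20
#include <cstdio>
#include <thread>
#include <vector>

int main() {
  constexpr int kFlows = 3;           // one "flow fiber" per master shard
  std::barrier sync_point(kFlows);    // models multi_shard_data.barrier
  std::atomic<bool> inserted{false};  // models InsertTxToSharedMap()

  auto flow = [&](int id) {
    // The first flow to reach this transaction becomes the executor.
    bool inserted_by_me = !inserted.exchange(true);
    sync_point.arrive_and_wait();  // wait until every flow reached the tx
    if (inserted_by_me)
      std::printf("flow %d executes the global command (e.g. FLUSHDB)\n", id);
    sync_point.arrive_and_wait();  // nobody moves on before execution is done
    std::printf("flow %d continues with the next journal entry\n", id);
  };

  std::vector<std::thread> flows;
  for (int i = 0; i < kFlows; ++i)
    flows.emplace_back(flow, i);
  for (auto& t : flows)
    t.join();
}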


@@ -203,11 +203,7 @@ class DflyShardReplica : public ProtocolClient {
void StableSyncDflyAcksFb(Context* cntx);
void StableSyncDflyExecFb(Context* cntx);
void ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx);
void InsertTxDataToShardResource(TransactionData&& tx_data);
void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx);
void ExecuteTx(TransactionData&& tx_data, Context* cntx);
uint32_t FlowId() const;
@@ -219,10 +215,7 @@ class DflyShardReplica : public ProtocolClient {
std::optional<base::IoBuf> leftover_buf_;
std::queue<std::pair<TransactionData, bool>> trans_data_queue_;
static constexpr size_t kYieldAfterItemsInQueue = 50;
util::fb2::EventCount shard_replica_waker_; // waker for trans_data_queue_
bool use_multi_shard_exe_sync_;
std::unique_ptr<JournalExecutor> executor_;
std::unique_ptr<RdbLoader> rdb_loader_;
@@ -242,7 +235,6 @@ class DflyShardReplica : public ProtocolClient {
size_t ack_offs_ = 0;
bool force_ping_ = false;
util::fb2::Fiber execution_fb_;
std::shared_ptr<MultiShardExecution> multi_shard_exe_;
uint32_t flow_id_ = UINT32_MAX; // Flow id if replica acts as a dfly flow.