diff --git a/src/server/journal/executor.cc b/src/server/journal/executor.cc index c4f4466dc..a437aadfe 100644 --- a/src/server/journal/executor.cc +++ b/src/server/journal/executor.cc @@ -15,22 +15,16 @@ JournalExecutor::JournalExecutor(Service* service) conn_context_.journal_emulated = true; } -void JournalExecutor::Execute(std::vector& entries) { - DCHECK_GT(entries.size(), 1U); - conn_context_.conn_state.db_index = entries.front().dbid; - +void JournalExecutor::Execute(DbIndex dbid, std::vector& cmds) { + DCHECK_GT(cmds.size(), 1U); + conn_context_.conn_state.db_index = dbid; std::string multi_cmd = {"MULTI"}; auto ms = MutableSlice{&multi_cmd[0], multi_cmd.size()}; auto span = CmdArgList{&ms, 1}; service_->DispatchCommand(span, &conn_context_); - for (auto& entry : entries) { - if (entry.payload) { - DCHECK_EQ(entry.dbid, conn_context_.conn_state.db_index); - span = CmdArgList{entry.payload->data(), entry.payload->size()}; - - service_->DispatchCommand(span, &conn_context_); - } + for (auto& cmd : cmds) { + Execute(cmd); } std::string exec_cmd = {"EXEC"}; @@ -39,13 +33,14 @@ void JournalExecutor::Execute(std::vector& entries) { service_->DispatchCommand(span, &conn_context_); } -void JournalExecutor::Execute(journal::ParsedEntry& entry) { - conn_context_.conn_state.db_index = entry.dbid; - if (entry.payload) { // TODO - when this is false? - auto span = CmdArgList{entry.payload->data(), entry.payload->size()}; +void JournalExecutor::Execute(DbIndex dbid, journal::ParsedEntry::CmdData& cmd) { + conn_context_.conn_state.db_index = dbid; + Execute(cmd); +} - service_->DispatchCommand(span, &conn_context_); - } +void JournalExecutor::Execute(journal::ParsedEntry::CmdData& cmd) { + auto span = CmdArgList{cmd.cmd_args.data(), cmd.cmd_args.size()}; + service_->DispatchCommand(span, &conn_context_); } } // namespace dfly diff --git a/src/server/journal/executor.h b/src/server/journal/executor.h index 3a1085006..98a3fe50f 100644 --- a/src/server/journal/executor.h +++ b/src/server/journal/executor.h @@ -14,10 +14,11 @@ class Service; class JournalExecutor { public: JournalExecutor(Service* service); - void Execute(std::vector& entries); - void Execute(journal::ParsedEntry& entry); + void Execute(DbIndex dbid, std::vector& cmds); + void Execute(DbIndex dbid, journal::ParsedEntry::CmdData& cmd); private: + void Execute(journal::ParsedEntry::CmdData& cmd); Service* service_; ConnectionContext conn_context_; io::NullSink null_sink_; diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 11634314e..4c0496e96 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -115,6 +115,10 @@ void Journal::RecordEntry(const Entry& entry) { journal_slice.AddLogRecord(entry); } +TxId Journal::GetLastTxId() { + return journal_slice.GetLastTxId(); +} + /* void Journal::OpArgs(TxId txid, Op opcode, Span keys) { DCHECK(journal_slice.IsOpen()); diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index 5434d9442..2631f968e 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -54,6 +54,7 @@ class Journal { LSN GetLsn() const; void RecordEntry(const Entry& entry); + TxId GetLastTxId(); private: mutable boost::fibers::mutex state_mu_; diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index 22d06f5ca..b982b054c 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -117,7 +117,7 @@ error_code JournalSlice::Close() { void JournalSlice::AddLogRecord(const Entry& entry) { DCHECK(ring_buffer_); - + last_txid_ = entry.txid; iterating_cb_arr_ = true; for (const auto& k_v : change_cb_arr_) { k_v.second(entry); diff --git a/src/server/journal/journal_slice.h b/src/server/journal/journal_slice.h index 2129d24c7..8971fc2c5 100644 --- a/src/server/journal/journal_slice.h +++ b/src/server/journal/journal_slice.h @@ -47,6 +47,10 @@ class JournalSlice { uint32_t RegisterOnChange(ChangeCallback cb); void UnregisterOnChange(uint32_t); + TxId GetLastTxId() { + return last_txid_; + } + private: struct RingItem; @@ -62,7 +66,7 @@ class JournalSlice { uint32_t slice_index_ = UINT32_MAX; uint32_t next_cb_id_ = 1; - + TxId last_txid_ = 0; std::error_code status_ec_; bool lameduck_ = false; diff --git a/src/server/journal/serializer.cc b/src/server/journal/serializer.cc index b35ed1216..732472f36 100644 --- a/src/server/journal/serializer.cc +++ b/src/server/journal/serializer.cc @@ -42,6 +42,11 @@ void JournalWriter::Write(std::string_view sv) { void JournalWriter::Write(CmdArgList args) { Write(args.size()); + size_t cmd_size = 0; + for (auto v : args) { + cmd_size += v.size(); + } + Write(cmd_size); for (auto v : args) Write(facade::ToSV(v)); } @@ -50,6 +55,13 @@ void JournalWriter::Write(std::pair args) { auto [cmd, tail_args] = args; Write(1 + tail_args.size()); + + size_t cmd_size = cmd.size(); + for (auto v : tail_args) { + cmd_size += v.size(); + } + Write(cmd_size); + Write(cmd); for (auto v : tail_args) Write(v); @@ -71,6 +83,8 @@ void JournalWriter::Write(const journal::Entry& entry) { case journal::Op::SELECT: return Write(entry.dbid); case journal::Op::COMMAND: + case journal::Op::MULTI_COMMAND: + case journal::Op::EXEC: Write(entry.txid); Write(entry.shard_cnt); return std::visit([this](const auto& payload) { return Write(payload); }, entry.payload); @@ -80,7 +94,7 @@ void JournalWriter::Write(const journal::Entry& entry) { } JournalReader::JournalReader(io::Source* source, DbIndex dbid) - : str_buf_{}, source_{source}, buf_{4096}, dbid_{dbid} { + : source_{source}, buf_{4_KB}, dbid_{dbid} { } void JournalReader::SetDb(DbIndex dbid) { @@ -134,63 +148,63 @@ template io::Result JournalReader::ReadUInt(); template io::Result JournalReader::ReadUInt(); template io::Result JournalReader::ReadUInt(); -io::Result JournalReader::ReadString() { +io::Result JournalReader::ReadString(char* buffer) { size_t size = 0; SET_OR_UNEXPECT(ReadUInt(), size); if (auto ec = EnsureRead(size); ec) return make_unexpected(ec); - unsigned offset = str_buf_.size(); - str_buf_.resize(offset + size); - buf_.ReadAndConsume(size, str_buf_.data() + offset); + buf_.ReadAndConsume(size, buffer); return size; } -std::error_code JournalReader::Read(CmdArgVec* vec) { +std::error_code JournalReader::ReadCommand(journal::ParsedEntry::CmdData* data) { size_t num_strings = 0; SET_OR_RETURN(ReadUInt(), num_strings); - vec->resize(num_strings); + data->cmd_args.resize(num_strings); + + size_t cmd_size = 0; + SET_OR_RETURN(ReadUInt(), cmd_size); // Read all strings consecutively. - str_buf_.clear(); - for (auto& span : *vec) { + data->command_buf = make_unique(cmd_size); + char* ptr = data->command_buf.get(); + for (auto& span : data->cmd_args) { size_t size; - SET_OR_RETURN(ReadString(), size); - span = MutableSlice{nullptr, size}; + SET_OR_RETURN(ReadString(ptr), size); + span = MutableSlice{ptr, size}; + ptr += size; } - - // Set span pointers, now that string buffer won't reallocate. - char* ptr = str_buf_.data(); - for (auto& span : *vec) { - span = {ptr, span.size()}; - ptr += span.size(); - } - return std::error_code{}; } io::Result JournalReader::ReadEntry() { - uint8_t opcode; - SET_OR_UNEXPECT(ReadUInt(), opcode); + uint8_t int_op; + SET_OR_UNEXPECT(ReadUInt(), int_op); + journal::Op opcode = static_cast(int_op); - journal::ParsedEntry entry{static_cast(opcode), dbid_}; + if (opcode == journal::Op::SELECT) { + SET_OR_UNEXPECT(ReadUInt(), dbid_); + return ReadEntry(); + } + + journal::ParsedEntry entry; + entry.dbid = dbid_; + entry.opcode = opcode; + + SET_OR_UNEXPECT(ReadUInt(), entry.txid); + SET_OR_UNEXPECT(ReadUInt(), entry.shard_cnt); + + if (opcode == journal::Op::EXEC) { + return entry; + } + + auto ec = ReadCommand(&entry.cmd); + if (ec) + return make_unexpected(ec); - switch (entry.opcode) { - case journal::Op::COMMAND: - SET_OR_UNEXPECT(ReadUInt(), entry.txid); - SET_OR_UNEXPECT(ReadUInt(), entry.shard_cnt); - entry.payload = CmdArgVec{}; - if (auto ec = Read(&*entry.payload); ec) - return make_unexpected(ec); - break; - case journal::Op::SELECT: - SET_OR_UNEXPECT(ReadUInt(), dbid_); - return ReadEntry(); - default: - break; - }; return entry; } diff --git a/src/server/journal/serializer.h b/src/server/journal/serializer.h index 574322cb3..880f07772 100644 --- a/src/server/journal/serializer.h +++ b/src/server/journal/serializer.h @@ -63,14 +63,13 @@ struct JournalReader { // Read unsigned integer in packed encoding. template io::Result ReadUInt(); - // Read and append string to string buffer, return size. - io::Result ReadString(); + // Read and copy to buffer, return size. + io::Result ReadString(char* buffer); // Read argument array into string buffer. - std::error_code Read(CmdArgVec* vec); + std::error_code ReadCommand(journal::ParsedEntry::CmdData* entry); private: - std::string str_buf_; // last parsed entry points here io::Source* source_; base::IoBuf buf_; DbIndex dbid_; diff --git a/src/server/journal/types.h b/src/server/journal/types.h index 40bcde1ea..b228665c1 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -16,6 +16,8 @@ enum class Op : uint8_t { NOOP = 0, SELECT = 6, COMMAND = 10, + MULTI_COMMAND = 11, + EXEC = 12, }; struct EntryBase { @@ -35,30 +37,25 @@ struct Entry : public EntryBase { std::pair // Command and its shard parts. >; - Entry(TxId txid, DbIndex dbid, Payload pl, uint32_t shard_cnt) - : EntryBase{txid, journal::Op::COMMAND, dbid, shard_cnt}, payload{pl} { + Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt, Payload pl) + : EntryBase{txid, opcode, dbid, shard_cnt}, payload{pl} { } Entry(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid, 0}, payload{} { } + Entry(TxId txid, journal::Op opcode, DbIndex dbid, uint32_t shard_cnt) + : EntryBase{txid, opcode, dbid, shard_cnt}, payload{} { + } Payload payload; }; struct ParsedEntry : public EntryBase { - // Payload represents the parsed command. - using Payload = std::optional; - - ParsedEntry() = default; - - ParsedEntry(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid, 0}, payload{} { - } - - ParsedEntry(TxId txid, DbIndex dbid, Payload pl, uint32_t shard_cnt) - : EntryBase{txid, journal::Op::COMMAND, dbid, shard_cnt}, payload{pl} { - } - - Payload payload; + struct CmdData { + std::unique_ptr command_buf; + CmdArgVec cmd_args; // represents the parsed command. + }; + CmdData cmd; }; using ChangeCallback = std::function; diff --git a/src/server/journal_test.cc b/src/server/journal_test.cc index 6c6bf8b89..62a70e104 100644 --- a/src/server/journal_test.cc +++ b/src/server/journal_test.cc @@ -47,7 +47,8 @@ struct EntryPayloadVisitor { std::string ExtractPayload(journal::ParsedEntry& entry) { std::string out; EntryPayloadVisitor visitor{&out}; - CmdArgList list{entry.payload->data(), entry.payload->size()}; + + CmdArgList list{entry.cmd.cmd_args.data(), entry.cmd.cmd_args.size()}; visitor(list); if (out.size() > 0 && out.back() == ' ') @@ -97,13 +98,15 @@ TEST(Journal, WriteRead) { auto list = [v = &lists](auto... ss) { return StoreList(v, ss...); }; std::vector test_entries = { - {0, 0, make_pair("MSET", slice("A", "1", "B", "2")), 2}, - {0, 0, make_pair("MSET", slice("C", "3")), 2}, - {1, 0, list("DEL", "A", "B"), 2}, - {2, 1, list("LPUSH", "l", "v1", "v2"), 1}, - {3, 0, make_pair("MSET", slice("D", "4")), 1}, - {4, 1, list("DEL", "l1"), 1}, - {5, 2, list("SET", "E", "2"), 1}}; + {0, journal::Op::COMMAND, 0, 2, make_pair("MSET", slice("A", "1", "B", "2"))}, + {0, journal::Op::COMMAND, 0, 2, make_pair("MSET", slice("C", "3"))}, + {1, journal::Op::COMMAND, 0, 2, list("DEL", "A", "B")}, + {2, journal::Op::COMMAND, 1, 1, list("LPUSH", "l", "v1", "v2")}, + {3, journal::Op::COMMAND, 0, 1, make_pair("MSET", slice("D", "4"))}, + {4, journal::Op::COMMAND, 1, 1, list("DEL", "l1")}, + {5, journal::Op::COMMAND, 2, 1, list("SET", "E", "2")}, + {6, journal::Op::MULTI_COMMAND, 2, 1, list("SET", "E", "2")}, + {6, journal::Op::EXEC, 2, 1}}; // Write all entries to string file. JournalWriter writer{}; diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 30cedc473..abdb02dd2 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1961,7 +1961,9 @@ error_code RdbLoaderBase::HandleJournalBlob(Service* service, DbIndex dbid) { while (done < num_entries) { journal::ParsedEntry entry{}; SET_OR_RETURN(journal_reader_.ReadEntry(), entry); - ex.Execute(entry); + if (entry.opcode == journal::Op::COMMAND || entry.opcode == journal::Op::MULTI_COMMAND) { + ex.Execute(entry.dbid, entry.cmd); + } done++; } diff --git a/src/server/replica.cc b/src/server/replica.cc index e815d3215..219e53f1f 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -751,21 +751,32 @@ void Replica::StableSyncDflyFb(Context* cntx) { JournalReader reader{&ps, 0}; JournalExecutor executor{&service_}; + while (!cntx->IsCancelled()) { - auto res = reader.ReadEntry(); - if (!res) { - cntx->ReportError(res.error(), "Journal format error"); - return; + TranactionData tx_data; + while (!cntx->IsCancelled()) { + auto res = reader.ReadEntry(); + if (!res) { + cntx->ReportError(res.error(), "Journal format error"); + return; + } + bool should_execute = tx_data.UpdateFromParsedEntry(std::move(*res)); + if (should_execute == true) { + break; + } } - ExecuteEntry(&executor, std::move(res.value())); + ExecuteCmd(&executor, std::move(tx_data), cntx); last_io_time_ = sock_->proactor()->GetMonotonicTimeNs(); } return; } -void Replica::ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry&& entry) { - if (entry.shard_cnt <= 1) { // not multi shard cmd - executor->Execute(entry); +void Replica::ExecuteCmd(JournalExecutor* executor, TranactionData&& tx_data, Context* cntx) { + if (cntx->IsCancelled()) { + return; + } + if (tx_data.shard_cnt <= 1) { // not multi shard cmd + executor->Execute(tx_data.dbid, tx_data.commands.front()); return; } @@ -781,17 +792,20 @@ void Replica::ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry&& ent // Only the first fiber to reach the transaction will create data for transaction in map multi_shard_exe_->map_mu.lock(); - auto [it, was_insert] = multi_shard_exe_->tx_sync_execution.emplace(entry.txid, entry.shard_cnt); - VLOG(2) << "txid: " << entry.txid << " unique_shard_cnt_: " << entry.shard_cnt + auto [it, was_insert] = + multi_shard_exe_->tx_sync_execution.emplace(tx_data.txid, tx_data.shard_cnt); + VLOG(2) << "txid: " << tx_data.txid << " unique_shard_cnt_: " << tx_data.shard_cnt << " was_insert: " << was_insert; - TxId txid = entry.txid; + TxId txid = tx_data.txid; // entries_vec will store all entries of trasaction and will be executed by the fiber that // inserted the txid to map. In case of global command the inserting fiber will executed his // entry. - bool global_cmd = (entry.payload.value().size() == 1); + bool global_cmd = (tx_data.commands.size() == 1 && tx_data.commands.front().cmd_args.size() == 1); if (!global_cmd) { - it->second.entries_vec.push_back(std::move(entry)); + for (auto& cmd : tx_data.commands) { + it->second.commands.push_back(std::move(cmd)); + } } auto& tx_sync = it->second; @@ -800,14 +814,17 @@ void Replica::ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry&& ent // step 1 tx_sync.barrier.wait(); + // step 2 if (was_insert) { if (global_cmd) { - executor->Execute(entry); + executor->Execute(tx_data.dbid, tx_data.commands.front()); + } else { - executor->Execute(tx_sync.entries_vec); + executor->Execute(tx_data.dbid, tx_sync.commands); } } + // step 3 tx_sync.barrier.wait(); @@ -1049,4 +1066,25 @@ error_code Replica::SendCommand(string_view command, ReqSerializer* serializer) return ec; } +bool Replica::TranactionData::UpdateFromParsedEntry(journal::ParsedEntry&& entry) { + if (entry.opcode == journal::Op::EXEC) { + shard_cnt = entry.shard_cnt; + dbid = entry.dbid; + txid = entry.txid; + return true; + } else if (entry.opcode == journal::Op::COMMAND) { + txid = entry.txid; + shard_cnt = entry.shard_cnt; + dbid = entry.dbid; + commands.push_back(std::move(entry.cmd)); + return true; + } else if (entry.opcode == journal::Op::MULTI_COMMAND) { + commands.push_back(std::move(entry.cmd)); + return false; + } else { + DCHECK(false) << "Unsupported opcode"; + } + return false; +} + } // namespace dfly diff --git a/src/server/replica.h b/src/server/replica.h index 9940fc74a..feac52fed 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -50,6 +50,17 @@ class Replica { R_SYNC_OK = 0x10, }; + // This class holds the commands of transaction in single shard. + // Once all commands recieved the command can be executed. + struct TranactionData { + TxId txid; + uint32_t shard_cnt; + DbIndex dbid; + std::vector commands; + // Update the data from ParsedEntry and return if its ready for execution. + bool UpdateFromParsedEntry(journal::ParsedEntry&& entry); + }; + struct MultiShardExecution { boost::fibers::mutex map_mu; @@ -58,7 +69,7 @@ class Replica { std::atomic_uint32_t counter; TxExecutionSync(uint32_t counter) : barrier(counter), counter(counter) { } - std::vector entries_vec; + std::vector commands; }; std::unordered_map tx_sync_execution; @@ -142,7 +153,7 @@ class Replica { // Send command, update last_io_time, return error. std::error_code SendCommand(std::string_view command, facade::ReqSerializer* serializer); - void ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry&& entry); + void ExecuteCmd(JournalExecutor* executor, TranactionData&& tx_data, Context* cntx); public: /* Utility */ struct Info { diff --git a/src/server/serializer_commons.h b/src/server/serializer_commons.h index b2b987035..e865dd2b7 100644 --- a/src/server/serializer_commons.h +++ b/src/server/serializer_commons.h @@ -19,7 +19,7 @@ using nonstd::make_unexpected; VLOG(1) << "Error while calling " #expr; \ return exp_val.error(); \ } \ - dest = exp_val.value(); \ + dest = std::move(exp_val.value()); \ } while (0) #define SET_OR_UNEXPECT(expr, dest) \ diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 57f9b6c2f..291bde53c 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -328,7 +328,8 @@ bool Transaction::RunInShard(EngineShard* shard) { // runnable concludes current operation, and should_release which tells // whether we should unlock the keys. should_release is false for multi and // equal to concluding otherwise. - bool should_release = (coordinator_state_ & COORD_EXEC_CONCLUDING) && !multi_; + bool is_concluding = (coordinator_state_ & COORD_EXEC_CONCLUDING); + bool should_release = is_concluding && !multi_; IntentLock::Mode mode = Mode(); // We make sure that we lock exactly once for each (multi-hop) transaction inside @@ -373,7 +374,7 @@ bool Transaction::RunInShard(EngineShard* shard) { /*************************************************************************/ - if (!was_suspended && should_release) // Check last hop & non suspended. + if (!was_suspended && is_concluding) // Check last hop & non suspended. LogJournalOnShard(shard); // at least the coordinator thread owns the reference. @@ -631,6 +632,10 @@ void Transaction::UnlockMulti() { sharded_keys[sid].push_back(k_v); } + if (ServerState::tlocal()->journal()) { + SetMultiUniqueShardCount(); + } + uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed); DCHECK_EQ(prev, 0u); @@ -643,6 +648,33 @@ void Transaction::UnlockMulti() { VLOG(1) << "UnlockMultiEnd " << DebugId(); } +void Transaction::SetMultiUniqueShardCount() { + uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed); + DCHECK_EQ(prev, 0u); + + std::atomic unique_shard_cnt = 0; + + auto update_shard_cnd = [&] { + EngineShard* shard = EngineShard::tlocal(); + auto journal = shard->journal(); + + if (journal != nullptr) { + TxId last_tx = journal->GetLastTxId(); + if (last_tx == txid_) { + unique_shard_cnt.fetch_add(1, std::memory_order_relaxed); + } + } + this->DecreaseRunCnt(); + }; + + for (ShardId i = 0; i < shard_data_.size(); ++i) { + shard_set->Add(i, std::move(update_shard_cnd)); + } + WaitForShardCallbacks(); + + unique_shard_cnt_ = unique_shard_cnt.load(std::memory_order_release); +} + void Transaction::Schedule() { if (multi_ && multi_->is_expanding) { LockMulti(); @@ -1080,6 +1112,11 @@ void Transaction::ExpireShardCb(EngineShard* shard) { } void Transaction::UnlockMultiShardCb(const std::vector& sharded_keys, EngineShard* shard) { + auto journal = shard->journal(); + if (journal != nullptr && journal->GetLastTxId() == txid_) { + journal->RecordEntry(journal::Entry{txid_, journal::Op::EXEC, db_index_, unique_shard_cnt_}); + } + if (multi_->multi_opts & CO::GLOBAL_TRANS) { shard->shard_lock()->Release(IntentLock::EXCLUSIVE); } @@ -1221,7 +1258,12 @@ void Transaction::LogJournalOnShard(EngineShard* shard) { entry_payload = make_pair(facade::ToSV(cmd_with_full_args_.front()), ShardArgsInShard(shard->shard_id())); } - journal->RecordEntry(journal::Entry{txid_, db_index_, entry_payload, unique_shard_cnt_}); + journal::Op opcode = journal::Op::COMMAND; + if (multi_) { + opcode = journal::Op::MULTI_COMMAND; + } + + journal->RecordEntry(journal::Entry{txid_, opcode, db_index_, unique_shard_cnt_, entry_payload}); } void Transaction::BreakOnShutdown() { diff --git a/src/server/transaction.h b/src/server/transaction.h index 6942a17ea..bfdeb2318 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -123,6 +123,12 @@ class Transaction { } void UnlockMulti(); + // In multi transaciton command we calculate the unique shard count of the trasaction + // after all transaciton commands where executed, by checking the last txid writen to + // all journals. + // This value is writen to journal so that replica we be able to apply the multi command + // atomicaly. + void SetMultiUniqueShardCount(); TxId txid() const { return txid_;