From 5d39521de35297db112bacab3ec80eaf7828b063 Mon Sep 17 00:00:00 2001 From: adiholden Date: Sun, 25 Dec 2022 14:03:49 +0200 Subject: [PATCH] feat(replica): Support FlushDB command for replication #580 (#591) --- src/server/journal/executor.cc | 2 +- src/server/journal/executor.h | 2 +- src/server/journal/serializer.cc | 6 +++ src/server/journal/serializer.h | 1 + src/server/journal/types.h | 13 ++++--- src/server/journal_test.cc | 15 ++++---- src/server/rdb_load.cc | 2 +- src/server/replica.cc | 65 ++++++++++++++++++++++++++++---- src/server/replica.h | 24 +++++++++++- src/server/transaction.cc | 2 +- 10 files changed, 107 insertions(+), 25 deletions(-) diff --git a/src/server/journal/executor.cc b/src/server/journal/executor.cc index b676926a7..193f001f4 100644 --- a/src/server/journal/executor.cc +++ b/src/server/journal/executor.cc @@ -12,7 +12,7 @@ namespace dfly { JournalExecutor::JournalExecutor(Service* service) : service_{service} { } -void JournalExecutor::Execute(journal::ParsedEntry&& entry) { +void JournalExecutor::Execute(journal::ParsedEntry& entry) { if (entry.payload) { io::NullSink null_sink; ConnectionContext conn_context{&null_sink, nullptr}; diff --git a/src/server/journal/executor.h b/src/server/journal/executor.h index 247d7e26c..5cf25c91a 100644 --- a/src/server/journal/executor.h +++ b/src/server/journal/executor.h @@ -14,7 +14,7 @@ class Service; class JournalExecutor { public: JournalExecutor(Service* service); - void Execute(journal::ParsedEntry&& entry); + void Execute(journal::ParsedEntry& entry); private: Service* service_; diff --git a/src/server/journal/serializer.cc b/src/server/journal/serializer.cc index 0d3fc53c1..ae337100b 100644 --- a/src/server/journal/serializer.cc +++ b/src/server/journal/serializer.cc @@ -70,6 +70,7 @@ error_code JournalWriter::Write(const journal::Entry& entry) { return Write(entry.dbid); case journal::Op::COMMAND: RETURN_ON_ERR(Write(entry.txid)); + RETURN_ON_ERR(Write(entry.shard_cnt)); return std::visit([this](const auto& payload) { return Write(payload); }, entry.payload); default: break; @@ -100,6 +101,10 @@ io::Result JournalReader::ReadU16(io::Source* source) { return ReadPackedUIntTyped(source); } +io::Result JournalReader::ReadU32(io::Source* source) { + return ReadPackedUIntTyped(source); +} + io::Result JournalReader::ReadU64(io::Source* source) { return ReadPackedUIntTyped(source); } @@ -153,6 +158,7 @@ io::Result JournalReader::ReadEntry(io::Source* source) { switch (entry.opcode) { case journal::Op::COMMAND: SET_OR_UNEXPECT(ReadU64(source), entry.txid); + SET_OR_UNEXPECT(ReadU32(source), entry.shard_cnt); entry.payload = CmdArgVec{}; if (auto ec = Read(source, &*entry.payload); ec) return make_unexpected(ec); diff --git a/src/server/journal/serializer.h b/src/server/journal/serializer.h index aceb85a2a..06a443af4 100644 --- a/src/server/journal/serializer.h +++ b/src/server/journal/serializer.h @@ -55,6 +55,7 @@ struct JournalReader { // TODO: Templated endian encoding to not repeat...? io::Result ReadU8(io::Source* source); io::Result ReadU16(io::Source* source); + io::Result ReadU32(io::Source* source); io::Result ReadU64(io::Source* source); // Read string into internal buffer and return size. diff --git a/src/server/journal/types.h b/src/server/journal/types.h index 58fc72835..40bcde1ea 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -22,6 +22,7 @@ struct EntryBase { TxId txid; Op opcode; DbIndex dbid; + uint32_t shard_cnt; }; // This struct represents a single journal entry. @@ -34,11 +35,11 @@ struct Entry : public EntryBase { std::pair // Command and its shard parts. >; - Entry(TxId txid, DbIndex dbid, Payload pl) - : EntryBase{txid, journal::Op::COMMAND, dbid}, payload{pl} { + Entry(TxId txid, DbIndex dbid, Payload pl, uint32_t shard_cnt) + : EntryBase{txid, journal::Op::COMMAND, dbid, shard_cnt}, payload{pl} { } - Entry(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid}, payload{} { + Entry(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid, 0}, payload{} { } Payload payload; @@ -50,11 +51,11 @@ struct ParsedEntry : public EntryBase { ParsedEntry() = default; - ParsedEntry(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid}, payload{} { + ParsedEntry(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid, 0}, payload{} { } - ParsedEntry(TxId txid, DbIndex dbid, Payload pl) - : EntryBase{txid, journal::Op::COMMAND, dbid}, payload{pl} { + ParsedEntry(TxId txid, DbIndex dbid, Payload pl, uint32_t shard_cnt) + : EntryBase{txid, journal::Op::COMMAND, dbid, shard_cnt}, payload{pl} { } Payload payload; diff --git a/src/server/journal_test.cc b/src/server/journal_test.cc index 8f63b18a5..c14f96898 100644 --- a/src/server/journal_test.cc +++ b/src/server/journal_test.cc @@ -95,13 +95,14 @@ TEST(Journal, WriteRead) { auto slice = [v = &slices](auto... ss) { return StoreSlice(v, ss...); }; auto list = [v = &lists](auto... ss) { return StoreList(v, ss...); }; - std::vector test_entries = {{0, 0, make_pair("MSET", slice("A", "1", "B", "2"))}, - {1, 0, make_pair("MSET", slice("C", "3"))}, - {2, 0, list("DEL", "A", "B")}, - {3, 1, list("LPUSH", "l", "v1", "v2")}, - {4, 0, make_pair("MSET", slice("D", "4"))}, - {5, 1, list("DEL", "l1")}, - {6, 2, list("SET", "E", "2")}}; + std::vector test_entries = { + {0, 0, make_pair("MSET", slice("A", "1", "B", "2")), 2}, + {0, 0, make_pair("MSET", slice("C", "3")), 2}, + {1, 0, list("DEL", "A", "B"), 2}, + {2, 1, list("LPUSH", "l", "v1", "v2"), 1}, + {3, 0, make_pair("MSET", slice("D", "4")), 1}, + {4, 1, list("DEL", "l1"), 1}, + {5, 2, list("SET", "E", "2"), 1}}; // Write all entries to string file. io::StringSink ss; diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 46adcfaa7..c777864e4 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1970,7 +1970,7 @@ error_code RdbLoaderBase::HandleJournalBlob(Service* service, DbIndex dbid) { while (done < num_entries) { journal::ParsedEntry entry{}; SET_OR_RETURN(journal_reader_.ReadEntry(&bs), entry); - ex.Execute(std::move(entry)); + ex.Execute(entry); done++; } diff --git a/src/server/replica.cc b/src/server/replica.cc index 50a269d26..4533841f5 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -97,9 +97,11 @@ Replica::Replica(string host, uint16_t port, Service* se) : service_(*se) { master_context_.port = port; } -Replica::Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* service) +Replica::Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* service, + std::shared_ptr shared_exe_data) : service_(*service), master_context_(context) { master_context_.dfly_flow_id = dfly_flow_id; + multi_shard_exe_ = shared_exe_data; } Replica::~Replica() { @@ -427,13 +429,13 @@ error_code Replica::InitiatePSync() { // Initialize and start sub-replica for each flow. error_code Replica::InitiateDflySync() { DCHECK_GT(num_df_flows_, 0u); - + multi_shard_exe_.reset(new MultiShardExecution()); shard_flows_.resize(num_df_flows_); for (unsigned i = 0; i < num_df_flows_; ++i) { - shard_flows_[i].reset(new Replica(master_context_, i, &service_)); + shard_flows_[i].reset(new Replica(master_context_, i, &service_, multi_shard_exe_)); } - // Blocked on untill all flows got full sync cut. + // Blocked on until all flows got full sync cut. fibers_ext::BlockingCounter sync_block{num_df_flows_}; auto err_handler = [this, sync_block](const auto& ge) mutable { @@ -705,14 +707,63 @@ void Replica::StableSyncDflyFb(Context* cntx) { cntx->Error(res.error(), "Journal format error"); return; } - - executor.Execute(std::move(res.value())); - + ExecuteEntry(&executor, res.value()); last_io_time_ = sock_->proactor()->GetMonotonicTimeNs(); } return; } +void Replica::ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry& entry) { + if (entry.shard_cnt <= 1) { // not multi shard cmd + executor->Execute(entry); + return; + } + + // Multi shard command flow: + // step 1: Fiber wait until all the fibers that should execute this tranaction got + // to the journal entry of the transaction. + // step 2: execute the command (All fibers) + // step 3: Fiber wait until all fibers finished the execution + // By step 1 we enforce that replica will execute multi shard commands that finished on master + // By step 3 we ensures the correctness of flushall/flushdb commands + + // TODO: this implemantaion does not support atomicity in replica + // Although multi shard transaction happen in step 2 very close to each other, + // user can query replica between executions. + // To support atomicity we should have one fiber in step 2 which will excute all the entries of + // the transaction together. In case of global comand such as flushdb the command can be executed + // by only one fiber. + + // TODO: support error handler in this flow + + // Only the first fiber to reach the transaction will create data for transaction in map + multi_shard_exe_->map_mu.lock(); + auto [it, was_insert] = multi_shard_exe_->tx_sync_execution.emplace(entry.txid, entry.shard_cnt); + + // Note: we must release the mutex befor calling wait on barrier + multi_shard_exe_->map_mu.unlock(); + + VLOG(2) << "txid: " << entry.txid << " unique_shard_cnt_: " << entry.shard_cnt + << " was_insert: " << was_insert; + + // step 1 + it->second.barrier.wait(); + // step 2 + executor->Execute(entry); + // step 3 + it->second.barrier.wait(); + + // Note: erase from map can be done only after all fibers returned from wait. + // The last fiber which will decrease the counter to 0 will be the one to erase the data from map + auto val = it->second.counter.fetch_sub(1, std::memory_order_relaxed); + VLOG(2) << "txid: " << entry.txid << " unique_shard_cnt_: " << entry.shard_cnt + << " counter: " << val; + if (val == 1) { + std::lock_guard lg{multi_shard_exe_->map_mu}; + multi_shard_exe_->tx_sync_execution.erase(entry.txid); + } +} + error_code Replica::ReadRespReply(base::IoBuf* io_buf, uint32_t* consumed) { DCHECK(parser_); diff --git a/src/server/replica.h b/src/server/replica.h index 59d48b4d6..93df10740 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -3,13 +3,16 @@ // #pragma once +#include #include +#include #include #include "base/io_buf.h" #include "facade/facade_types.h" #include "facade/redis_parser.h" #include "server/common.h" +#include "server/journal/types.h" #include "util/fiber_socket_base.h" #include "util/fibers/fibers_ext.h" @@ -21,6 +24,7 @@ namespace dfly { class Service; class ConnectionContext; +class JournalExecutor; class Replica { private: @@ -46,6 +50,19 @@ class Replica { R_SYNC_OK = 0x10, }; + struct MultiShardExecution { + boost::fibers::mutex map_mu; + + struct TxExecutionSync { + boost::fibers::barrier barrier; + std::atomic_uint32_t counter; + TxExecutionSync(uint32_t counter) : barrier(counter), counter(counter) { + } + }; + + std::unordered_map tx_sync_execution; + }; + public: Replica(std::string master_host, uint16_t port, Service* se); ~Replica(); @@ -81,7 +98,8 @@ class Replica { private: /* Main dlfly flow mode functions */ // Initialize as single dfly flow. - Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* service); + Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* service, + std::shared_ptr shared_exe_data); // Start replica initialized as dfly flow. std::error_code StartFullSyncFlow(util::fibers_ext::BlockingCounter block, Context* cntx); @@ -122,6 +140,8 @@ class Replica { // Send command, update last_io_time, return error. std::error_code SendCommand(std::string_view command, facade::ReqSerializer* serializer); + void ExecuteEntry(JournalExecutor* executor, journal::ParsedEntry& entry); + public: /* Utility */ struct Info { std::string host; @@ -154,6 +174,8 @@ class Replica { MasterContext master_context_; std::unique_ptr sock_; + std::shared_ptr multi_shard_exe_; + // MainReplicationFb in standalone mode, FullSyncDflyFb in flow mode. ::boost::fibers::fiber sync_fb_; std::vector> shard_flows_; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index f7b7f8842..57f9b6c2f 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1221,7 +1221,7 @@ void Transaction::LogJournalOnShard(EngineShard* shard) { entry_payload = make_pair(facade::ToSV(cmd_with_full_args_.front()), ShardArgsInShard(shard->shard_id())); } - journal->RecordEntry(journal::Entry{txid_, db_index_, entry_payload}); + journal->RecordEntry(journal::Entry{txid_, db_index_, entry_payload, unique_shard_cnt_}); } void Transaction::BreakOnShutdown() {