From db21b735f69b1ed8dff2d5115888dec3e4b5cd84 Mon Sep 17 00:00:00 2001 From: Roy Jacobson Date: Mon, 18 Sep 2023 13:59:41 +0300 Subject: [PATCH] feat(replication): Use a ring buffer with messages to serve replication. (#1835) * feat(replication): Use a ring buffer with messages to serve replication. * Fix libraries dep graph * Address PR feedback * nits * add a comment * Lower the default log length --- src/server/CMakeLists.txt | 2 +- src/server/journal/executor.cc | 11 +++-- src/server/journal/executor.h | 2 +- src/server/journal/journal.cc | 8 +++ src/server/journal/journal.h | 3 ++ src/server/journal/journal_slice.cc | 76 ++++++++++++++++++++--------- src/server/journal/journal_slice.h | 15 ++++-- src/server/journal/serializer.cc | 3 ++ src/server/journal/streamer.cc | 8 +-- src/server/journal/streamer.h | 1 - src/server/journal/types.h | 8 ++- src/server/rdb_load.cc | 30 ++++++++---- src/server/rdb_save.cc | 9 +--- src/server/rdb_save.h | 3 +- src/server/snapshot.cc | 11 ++--- src/server/snapshot.h | 2 +- 16 files changed, 127 insertions(+), 65 deletions(-) diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index bf2220a46..85fa5848d 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -17,6 +17,7 @@ endif() add_library(dfly_transaction db_slice.cc malloc_stats.cc engine_shard_set.cc blocking_controller.cc common.cc journal/journal.cc journal/types.cc journal/journal_slice.cc server_state.cc table.cc top_keys.cc transaction.cc + serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc ${TX_LINUX_SRCS} ) cxx_link(dfly_transaction dfly_core strings_lib) @@ -38,7 +39,6 @@ add_library(dragonfly_lib channel_store.cc command_registry.cc detail/snapshot_storage.cc set_family.cc stream_family.cc string_family.cc zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc - serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc top_keys.cc multi_command_squasher.cc hll_family.cc cluster/cluster_config.cc cluster/cluster_family.cc acl/user.cc acl/user_registry.cc acl/acl_family.cc acl/validator.cc acl/helpers.cc) diff --git a/src/server/journal/executor.cc b/src/server/journal/executor.cc index bebe55eb2..986ce1eb2 100644 --- a/src/server/journal/executor.cc +++ b/src/server/journal/executor.cc @@ -20,7 +20,7 @@ namespace dfly { namespace { // Build a CmdData from parts passed to absl::StrCat. template journal::ParsedEntry::CmdData BuildFromParts(Ts... parts) { - vector raw_parts{absl::StrCat(forward(parts))...}; + vector raw_parts{absl::StrCat(std::forward(parts))...}; auto cmd_str = accumulate(raw_parts.begin(), raw_parts.end(), std::string{}); auto buf = make_unique(cmd_str.size()); @@ -28,8 +28,8 @@ template journal::ParsedEntry::CmdData BuildFromParts(Ts... par CmdArgVec slice_parts{}; size_t start = 0; - for (auto part : raw_parts) { - slice_parts.push_back(MutableSlice{buf.get() + start, part.size()}); + for (const auto& part : raw_parts) { + slice_parts.emplace_back(buf.get() + start, part.size()); start += part.size(); } @@ -38,8 +38,9 @@ template journal::ParsedEntry::CmdData BuildFromParts(Ts... par } // namespace JournalExecutor::JournalExecutor(Service* service) - : service_{service}, reply_builder_{facade::ReplyMode::NONE}, conn_context_{nullptr, nullptr, - &reply_builder_} { + : service_{service}, + reply_builder_{facade::ReplyMode::NONE}, + conn_context_{nullptr, nullptr, &reply_builder_} { conn_context_.is_replicating = true; conn_context_.journal_emulated = true; } diff --git a/src/server/journal/executor.h b/src/server/journal/executor.h index 1d637d960..6ed756c89 100644 --- a/src/server/journal/executor.h +++ b/src/server/journal/executor.h @@ -16,7 +16,7 @@ class Service; // JournalExecutor allows executing journal entries. class JournalExecutor { public: - JournalExecutor(Service* service); + explicit JournalExecutor(Service* service); ~JournalExecutor(); JournalExecutor(JournalExecutor&&) = delete; diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 80cf91c75..07e190520 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -96,6 +96,14 @@ bool Journal::HasRegisteredCallbacks() const { return journal_slice.HasRegisteredCallbacks(); } +bool Journal::IsLSNInBuffer(LSN lsn) const { + return journal_slice.IsLSNInBuffer(lsn); +} + +std::string_view Journal::GetEntry(LSN lsn) const { + return journal_slice.GetEntry(lsn); +} + LSN Journal::GetLsn() const { return journal_slice.cur_lsn(); } diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index fb26a1c7a..3d7d024f7 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -39,6 +39,9 @@ class Journal { void UnregisterOnChange(uint32_t id); bool HasRegisteredCallbacks() const; + bool IsLSNInBuffer(LSN lsn) const; + std::string_view GetEntry(LSN lsn) const; + /* void AddCmd(TxId txid, Op opcode, Span args) { OpArgs(txid, opcode, args); diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index 7b6051488..6355b413f 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -5,12 +5,19 @@ #include "server/journal/journal_slice.h" #include +#include +#include #include #include #include +#include "base/function2.hpp" #include "base/logging.h" +#include "server/journal/serializer.h" + +ABSL_FLAG(int, shard_repl_backlog_len, 1 << 10, + "The length of the circular replication log per shard"); namespace dfly { namespace journal { @@ -34,12 +41,6 @@ string ShardName(std::string_view base, unsigned index) { CHECK(!__ec$) << "Error: " << __ec$ << " " << __ec$.message() << " for " << #x; \ } while (false) -struct JournalSlice::RingItem { - LSN lsn; - TxId txid; - Op opcode; -}; - JournalSlice::JournalSlice() { } @@ -48,11 +49,11 @@ JournalSlice::~JournalSlice() { } void JournalSlice::Init(unsigned index) { - // if (ring_buffer_) // calling this function multiple times is allowed and it's a no-op. - // return; + if (ring_buffer_) // calling this function multiple times is allowed and it's a no-op. + return; slice_index_ = index; - // ring_buffer_.emplace(128); // TODO: to make it configurable + ring_buffer_.emplace(absl::GetFlag(FLAGS_shard_repl_backlog_len)); } #if 0 @@ -116,20 +117,49 @@ error_code JournalSlice::Close() { } #endif +bool JournalSlice::IsLSNInBuffer(LSN lsn) const { + DCHECK(ring_buffer_); + + if (ring_buffer_->empty()) { + return false; + } + return (*ring_buffer_)[0].lsn <= lsn && lsn <= ((*ring_buffer_)[ring_buffer_->size() - 1].lsn); +} + +std::string_view JournalSlice::GetEntry(LSN lsn) const { + DCHECK(ring_buffer_ && IsLSNInBuffer(lsn)); + auto start = (*ring_buffer_)[0].lsn; + DCHECK((*ring_buffer_)[lsn - start].lsn == lsn); + return (*ring_buffer_)[lsn - start].data; +} + void JournalSlice::AddLogRecord(const Entry& entry, bool await) { - // DCHECK(ring_buffer_); - if (entry.opcode != Op::NOOP) { - lsn_++; -// TODO: This is preparation for AOC style journaling, currently unused. + DCHECK(ring_buffer_); + + JournalItem dummy; + JournalItem* item; + if (entry.opcode == Op::NOOP) { + item = &dummy; + item->lsn = -1; + item->opcode = entry.opcode; + item->data = ""; + } else { + // GetTail gives a pointer to a new tail entry in the buffer, possibly overriding the last entry + // if the buffer is full. + item = ring_buffer_->GetTail(true); + item->opcode = entry.opcode; + item->lsn = lsn_++; + + io::BufSink buf_sink{&ring_serialize_buf_}; + JournalWriter writer{&buf_sink}; + writer.Write(entry); + + item->data = io::View(ring_serialize_buf_.InputBuffer()); + ring_serialize_buf_.Clear(); + VLOG(2) << "Writing item [" << item->lsn << "]: " << entry.ToString(); + } + #if 0 - RingItem item; - item.lsn = prev_lsn; - - item.opcode = entry.opcode; - item.txid = entry.txid; - VLOG(1) << "Writing item [" << item.lsn << "]: " << entry.ToString(); - ring_buffer_->EmplaceOrOverride(move(item)); - if (shard_file_) { string line = absl::StrCat(item.lsn, " ", entry.txid, " ", entry.opcode, "\n"); error_code ec = shard_file_->Write(io::Buffer(line), file_offset_, 0); @@ -137,15 +167,15 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) { file_offset_ += line.size(); } #endif - } + // TODO: Remove the callbacks, replace with notifiers { std::shared_lock lk(cb_mu_); DVLOG(2) << "AddLogRecord: run callbacks for " << entry.ToString() << " num callbacks: " << change_cb_arr_.size(); for (const auto& k_v : change_cb_arr_) { - k_v.second(entry, await); + k_v.second(*item, await); } } } diff --git a/src/server/journal/journal_slice.h b/src/server/journal/journal_slice.h index cf1e728e8..769b4b728 100644 --- a/src/server/journal/journal_slice.h +++ b/src/server/journal/journal_slice.h @@ -45,19 +45,28 @@ class JournalSlice { void AddLogRecord(const Entry& entry, bool await); + // Register a callback that will be called every time a new entry is + // added to the journal. + // The callback receives the entry and a boolean that indicates whether + // awaiting (to apply backpressure) is allowed. uint32_t RegisterOnChange(ChangeCallback cb); void UnregisterOnChange(uint32_t); + bool HasRegisteredCallbacks() const { std::shared_lock lk(cb_mu_); return !change_cb_arr_.empty(); } - private: - struct RingItem; + /// Returns whether the journal entry with this LSN is available + /// from the buffer. + bool IsLSNInBuffer(LSN lsn) const; + std::string_view GetEntry(LSN lsn) const; + private: // std::string shard_path_; // std::unique_ptr shard_file_; - // std::optional> ring_buffer_; + std::optional> ring_buffer_; + base::IoBuf ring_serialize_buf_; mutable util::SharedMutex cb_mu_; std::vector> change_cb_arr_ ABSL_GUARDED_BY(cb_mu_); diff --git a/src/server/journal/serializer.cc b/src/server/journal/serializer.cc index 7a700d3cc..08da8d954 100644 --- a/src/server/journal/serializer.cc +++ b/src/server/journal/serializer.cc @@ -4,8 +4,11 @@ #include "server/journal/serializer.h" +#include + #include "base/io_buf.h" #include "base/logging.h" +#include "glog/logging.h" #include "io/io.h" #include "server/common.h" #include "server/error.h" diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index b1b092117..e152109ff 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -10,12 +10,12 @@ using namespace util; void JournalStreamer::Start(io::Sink* dest) { using namespace journal; write_fb_ = fb2::Fiber("journal_stream", &JournalStreamer::WriterFb, this, dest); - journal_cb_id_ = journal_->RegisterOnChange([this](const Entry& entry, bool allow_await) { - if (entry.opcode == Op::NOOP) { - // No recode to write, just await if data was written so consumer will read the data. + journal_cb_id_ = journal_->RegisterOnChange([this](const JournalItem& item, bool allow_await) { + if (item.opcode == Op::NOOP) { + // No record to write, just await if data was written so consumer will read the data. return AwaitIfWritten(); } - writer_.Write(entry); + Write(io::Buffer(item.data)); NotifyWritten(allow_await); }); } diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index 2f7e481e6..3a0d648ec 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -40,7 +40,6 @@ class JournalStreamer : protected BufferedStreamerBase { journal::Journal* journal_; Fiber write_fb_{}; - JournalWriter writer_{this}; }; } // namespace dfly diff --git a/src/server/journal/types.h b/src/server/journal/types.h index a467e2b2b..b4134227a 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -69,7 +69,13 @@ struct ParsedEntry : public EntryBase { std::string ToString() const; }; -using ChangeCallback = std::function; +struct JournalItem { + LSN lsn; + Op opcode; + std::string data; +}; + +using ChangeCallback = std::function; } // namespace journal } // namespace dfly diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index d83b35891..77091b028 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -4,6 +4,8 @@ #include "server/rdb_load.h" +#include "absl/strings/escaping.h" + extern "C" { #include "redis/intset.h" @@ -2147,20 +2149,28 @@ error_code RdbLoaderBase::HandleJournalBlob(Service* service) { while (done < num_entries) { journal::ParsedEntry entry{}; SET_OR_RETURN(journal_reader_.ReadEntry(), entry); + done++; - if (!entry.cmd.cmd_args.empty()) { - if (absl::EqualsIgnoreCase(facade::ToSV(entry.cmd.cmd_args[0]), "FLUSHALL") || - absl::EqualsIgnoreCase(facade::ToSV(entry.cmd.cmd_args[0]), "FLUSHDB")) { - // Applying a flush* operation in the middle of a load can cause out-of-sync deletions of - // data that should not be deleted, see https://github.com/dragonflydb/dragonfly/issues/1231 - // By returning an error we are effectively restarting the replication. - return RdbError(errc::unsupported_operation); - } + // EXEC entries are just for preserving atomicity of transactions. We don't create + // transactions and we don't care about atomicity when we're loading an RDB, so skip them. + // Currently rdb_save also filters those records out, but we filter them additionally here + // for better forward compatibility if we decide to change that. + if (entry.opcode == journal::Op::EXEC) + continue; + + if (entry.cmd.cmd_args.empty()) + return RdbError(errc::rdb_file_corrupted); + + if (absl::EqualsIgnoreCase(facade::ToSV(entry.cmd.cmd_args[0]), "FLUSHALL") || + absl::EqualsIgnoreCase(facade::ToSV(entry.cmd.cmd_args[0]), "FLUSHDB")) { + // Applying a flush* operation in the middle of a load can cause out-of-sync deletions of + // data that should not be deleted, see https://github.com/dragonflydb/dragonfly/issues/1231 + // By returning an error we are effectively restarting the replication. + return RdbError(errc::unsupported_operation); } + VLOG(1) << "Executing item: " << entry.ToString(); ex.Execute(entry.dbid, entry.cmd); - VLOG(1) << "Reading item: " << entry.ToString(); - done++; } return std::error_code{}; diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 495be76a1..f34055d65 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -733,15 +733,10 @@ io::Bytes RdbSerializer::PrepareFlush() { return mem_buf_.InputBuffer(); } -error_code RdbSerializer::WriteJournalEntry(const journal::Entry& entry) { - io::BufSink buf_sink{&journal_mem_buf_}; - JournalWriter writer{&buf_sink}; - writer.Write(entry); - +error_code RdbSerializer::WriteJournalEntry(std::string_view serialized_entry) { RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_JOURNAL_BLOB)); RETURN_ON_ERR(SaveLen(1)); - RETURN_ON_ERR(SaveString(io::View(journal_mem_buf_.InputBuffer()))); - journal_mem_buf_.Clear(); + RETURN_ON_ERR(SaveString(serialized_entry)); return error_code{}; } diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index f49b26ee3..f75be3f09 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -158,7 +158,7 @@ class RdbSerializer { } // Write journal entry as an embedded journal blob. - std::error_code WriteJournalEntry(const journal::Entry& entry); + std::error_code WriteJournalEntry(std::string_view entry); std::error_code SendJournalOffset(uint64_t journal_offset); @@ -188,7 +188,6 @@ class RdbSerializer { void AllocateCompressorOnce(); base::IoBuf mem_buf_; - base::IoBuf journal_mem_buf_; std::string tmp_str_; base::PODArray tmp_buf_; DbIndex last_entry_db_index_ = kInvalidDbId; diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 6771cae31..d1c8e83a5 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -260,14 +260,13 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) // transaction. // OnJournalEntry registers for changes in journal, the journal change function signature is // (const journal::Entry& entry, bool await) In snapshot flow we dont use the await argument. -void SliceSnapshot::OnJournalEntry(const journal::Entry& entry, bool unused_await_arg) { - // We ignore non payload entries like EXEC because we have no transactional ordering during - // LOAD phase on replica. - if (!entry.HasPayload()) { +void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool unused_await_arg) { + // We ignore EXEC and NOOP entries because we they have no meaning during + // the LOAD phase on replica. + if (item.opcode == journal::Op::NOOP || item.opcode == journal::Op::EXEC) return; - } - serializer_->WriteJournalEntry(entry); + serializer_->WriteJournalEntry(item.data); // This is the only place that flushes in streaming mode // once the iterate buckets fiber finished. diff --git a/src/server/snapshot.h b/src/server/snapshot.h index 75b6bb154..67ea7fccf 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -92,7 +92,7 @@ class SliceSnapshot { void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req); // Journal listener - void OnJournalEntry(const journal::Entry& entry, bool unused_await_arg); + void OnJournalEntry(const journal::JournalItem& item, bool unused_await_arg); // Close dest channel if not closed yet. void CloseRecordChannel();