diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index e9c34ed3c..dfe671234 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -21,7 +21,7 @@ add_library(dragonfly_lib channel_slice.cc command_registry.cc snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc set_family.cc stream_family.cc string_family.cc zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc - serializer_commons.cc journal/serializer.cc journal/executor.cc) + serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc) cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib http_client_lib absl::random_random TRDP::jsoncons zstd TRDP::lz4) diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 72f6e27f6..a8459c646 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -12,9 +12,8 @@ #include "facade/dragonfly_connection.h" #include "server/engine_shard_set.h" #include "server/error.h" -#include "server/io_utils.h" #include "server/journal/journal.h" -#include "server/journal/serializer.h" +#include "server/journal/streamer.h" #include "server/rdb_save.h" #include "server/script_mgr.h" #include "server/server_family.h" @@ -29,7 +28,6 @@ namespace dfly { using namespace facade; using namespace std; -using namespace util::fibers_ext; using util::ProactorBase; namespace { @@ -60,63 +58,6 @@ struct TransactionGuard { Transaction* t; }; - -// Buffered single-shard journal streamer that listens for journal changes with a -// journal listener and writes them to a destination sink in a separate fiber. -class JournalStreamer : protected BufferedStreamerBase { - public: - JournalStreamer(journal::Journal* journal, Context* cntx) - : BufferedStreamerBase{cntx->GetCancellation()}, cntx_{cntx}, - journal_cb_id_{0}, journal_{journal}, write_fb_{}, writer_{this} { - } - - // Self referential. - JournalStreamer(const JournalStreamer& other) = delete; - JournalStreamer(JournalStreamer&& other) = delete; - - // Register journal listener and start writer in fiber. - void Start(io::Sink* dest); - - // Must be called on context cancellation for unblocking - // and manual cleanup. - void Cancel(); - - private: - // Writer fiber that steals buffer contents and writes them to dest. - void WriterFb(io::Sink* dest); - - private: - Context* cntx_; - - uint32_t journal_cb_id_; - journal::Journal* journal_; - - Fiber write_fb_; - JournalWriter writer_; -}; - -void JournalStreamer::Start(io::Sink* dest) { - write_fb_ = Fiber(&JournalStreamer::WriterFb, this, dest); - journal_cb_id_ = journal_->RegisterOnChange([this](const journal::Entry& entry) { - writer_.Write(entry); - NotifyWritten(); - }); -} - -void JournalStreamer::Cancel() { - journal_->UnregisterOnChange(journal_cb_id_); - Finalize(); - - if (write_fb_.IsJoinable()) - write_fb_.Join(); -} - -void JournalStreamer::WriterFb(io::Sink* dest) { - if (auto ec = ConsumeIntoSink(dest); ec) { - cntx_->ReportError(ec); - } -} - } // namespace DflyCmd::ReplicaRoleInfo::ReplicaRoleInfo(std::string address, uint32_t listening_port, @@ -175,6 +116,10 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) { return Expire(args, cntx); } + if (sub_cmd == "REPLICAOFFSET" && args.size() == 3) { + return ReplicaOffset(args, cntx); + } + rb->SendError(kSyntaxErr); } @@ -303,7 +248,8 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) { absl::InsecureBitGen gen; string eof_token = GetRandomHex(gen, 40); - replica_ptr->flows[flow_id] = FlowInfo{cntx->owner(), eof_token}; + replica_ptr->flows[flow_id].conn = cntx->owner(); + replica_ptr->flows[flow_id].eof_token = eof_token; listener_->Migrate(cntx->owner(), shard_set->pool()->at(flow_id)); rb->StartArray(2); @@ -392,6 +338,28 @@ void DflyCmd::Expire(CmdArgList args, ConnectionContext* cntx) { return rb->SendOk(); } +void DflyCmd::ReplicaOffset(CmdArgList args, ConnectionContext* cntx) { + RedisReplyBuilder* rb = static_cast(cntx->reply_builder()); + string_view sync_id_str = ArgS(args, 2); + + VLOG(1) << "Got DFLY REPLICAOFFSET " << sync_id_str; + auto [sync_id, replica_ptr] = GetReplicaInfoOrReply(sync_id_str, rb); + if (!sync_id) + return; + + string result; + unique_lock lk(replica_ptr->mu); + rb->StartArray(replica_ptr->flows.size()); + for (size_t flow_id = 0; flow_id < replica_ptr->flows.size(); ++flow_id) { + JournalStreamer* streamer = replica_ptr->flows[flow_id].streamer.get(); + if (streamer) { + rb->SendLong(streamer->GetRecordCount()); + } else { + rb->SendLong(0); + } + } +} + OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { DCHECK(!flow->full_sync_fb.IsJoinable()); @@ -431,18 +399,17 @@ void DflyCmd::StopFullSyncInThread(FlowInfo* flow, EngineShard* shard) { OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) { // Create streamer for shard flows. - JournalStreamer* streamer = nullptr; + if (shard != nullptr) { - streamer = new JournalStreamer{sf_->journal(), cntx}; - streamer->Start(flow->conn->socket()); + flow->streamer.reset(new JournalStreamer(sf_->journal(), cntx)); + flow->streamer->Start(flow->conn->socket()); } // Register cleanup. - flow->cleanup = [this, streamer, flow]() { + flow->cleanup = [this, flow]() { flow->TryShutdownSocket(); - if (streamer) { - streamer->Cancel(); - delete streamer; + if (flow->streamer) { + flow->streamer->Cancel(); } }; @@ -654,4 +621,10 @@ void DflyCmd::FlowInfo::TryShutdownSocket() { } } +DflyCmd::FlowInfo::~FlowInfo() { +} + +DflyCmd::FlowInfo::FlowInfo() { +} + } // namespace dfly diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index 8b634d2e9..e6643c959 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -27,10 +27,7 @@ namespace dfly { class EngineShardSet; class ServerFamily; class RdbSaver; - -namespace journal { -class Journal; -} // namespace journal +class JournalStreamer; // DflyCmd is responsible for managing replication. A master instance can be connected // to many replica instances, what is more, each of them can open multiple connections. @@ -83,10 +80,8 @@ class DflyCmd { // Stores information related to a single flow. struct FlowInfo { - FlowInfo() = default; - FlowInfo(facade::Connection* conn, const std::string& eof_token) - : conn{conn}, eof_token{eof_token} {}; - + FlowInfo(); + ~FlowInfo(); // Shutdown associated socket if its still open. void TryShutdownSocket(); @@ -94,6 +89,7 @@ class DflyCmd { util::fibers_ext::Fiber full_sync_fb; // Full sync fiber. std::unique_ptr saver; // Saver used by the full sync phase. + std::unique_ptr streamer; std::string eof_token; std::function cleanup; // Optional cleanup for cancellation. @@ -164,6 +160,10 @@ class DflyCmd { // Check all keys for expiry. void Expire(CmdArgList args, ConnectionContext* cntx); + // REPLICAOFFSET + // Return journal records num sent for each flow of replication. + void ReplicaOffset(CmdArgList args, ConnectionContext* cntx); + // Start full sync in thread. Start FullSyncFb. Called for each flow. facade::OpStatus StartFullSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard); diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc new file mode 100644 index 000000000..0b6cc11b7 --- /dev/null +++ b/src/server/journal/streamer.cc @@ -0,0 +1,36 @@ +// Copyright 2022, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/journal/streamer.h" + +namespace dfly { + +void JournalStreamer::Start(io::Sink* dest) { + write_fb_ = util::fibers_ext::Fiber(&JournalStreamer::WriterFb, this, dest); + journal_cb_id_ = journal_->RegisterOnChange([this](const journal::Entry& entry) { + writer_.Write(entry); + record_cnt_.fetch_add(1, std::memory_order_relaxed); + NotifyWritten(); + }); +} + +uint64_t JournalStreamer::GetRecordCount() const { + return record_cnt_.load(std::memory_order_relaxed); +} + +void JournalStreamer::Cancel() { + journal_->UnregisterOnChange(journal_cb_id_); + Finalize(); + + if (write_fb_.IsJoinable()) + write_fb_.Join(); +} + +void JournalStreamer::WriterFb(io::Sink* dest) { + if (auto ec = ConsumeIntoSink(dest); ec) { + cntx_->ReportError(ec); + } +} + +} // namespace dfly diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h new file mode 100644 index 000000000..243997b18 --- /dev/null +++ b/src/server/journal/streamer.h @@ -0,0 +1,50 @@ +// Copyright 2022, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include "server/io_utils.h" +#include "server/journal/journal.h" +#include "server/journal/serializer.h" +#include "util/fibers/fiber.h" + +namespace dfly { + +// Buffered single-shard journal streamer that listens for journal changes with a +// journal listener and writes them to a destination sink in a separate fiber. +class JournalStreamer : protected BufferedStreamerBase { + public: + JournalStreamer(journal::Journal* journal, Context* cntx) + : BufferedStreamerBase{cntx->GetCancellation()}, cntx_{cntx}, + journal_cb_id_{0}, journal_{journal}, write_fb_{}, writer_{this} { + } + + // Self referential. + JournalStreamer(const JournalStreamer& other) = delete; + JournalStreamer(JournalStreamer&& other) = delete; + + // Register journal listener and start writer in fiber. + void Start(io::Sink* dest); + + // Must be called on context cancellation for unblocking + // and manual cleanup. + void Cancel(); + uint64_t GetRecordCount() const; + + private: + // Writer fiber that steals buffer contents and writes them to dest. + void WriterFb(io::Sink* dest); + + private: + Context* cntx_; + + uint32_t journal_cb_id_; + journal::Journal* journal_; + + util::fibers_ext::Fiber write_fb_; + JournalWriter writer_; + std::atomic_uint64_t record_cnt_; +}; + +} // namespace dfly