From a80063189e60a7de0c51f4fd0b73980f52cd54f2 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 13 Jun 2024 12:29:06 +0300 Subject: [PATCH] chore: Streamer is rewritten with async interface (#3108) * chore: Streamer is rewritten with async interface Signed-off-by: Roman Gershman --------- Signed-off-by: Roman Gershman --- src/server/CMakeLists.txt | 2 +- src/server/common.cc | 1 - src/server/io_utils.cc | 94 --------------- src/server/io_utils.h | 78 ------------- src/server/journal/streamer.cc | 206 ++++++++++++++++++++++++++------- src/server/journal/streamer.h | 52 ++++++--- 6 files changed, 203 insertions(+), 230 deletions(-) delete mode 100644 src/server/io_utils.cc delete mode 100644 src/server/io_utils.h diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index b1052b332..852f84bb3 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -52,7 +52,7 @@ add_library(dragonfly_lib bloom_family.cc engine_shard_set.cc channel_store.cc detail/save_stages_controller.cc detail/snapshot_storage.cc set_family.cc stream_family.cc string_family.cc - zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc + zset_family.cc version.cc bitops_family.cc container_utils.cc top_keys.cc multi_command_squasher.cc hll_family.cc ${DF_SEARCH_SRCS} ${DF_LINUX_SRCS} diff --git a/src/server/common.cc b/src/server/common.cc index d28576cd7..58218f635 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -406,7 +406,6 @@ GenericError Context::ReportErrorInternal(GenericError&& err) { if (err_handler_) err_handler_fb_ = fb2::Fiber("report_internal_error", err_handler_, err_); - Cancellation::Cancel(); return err_; } diff --git a/src/server/io_utils.cc b/src/server/io_utils.cc deleted file mode 100644 index 7dcb28fc4..000000000 --- a/src/server/io_utils.cc +++ /dev/null @@ -1,94 +0,0 @@ -// Copyright 2022, DragonflyDB authors. All rights reserved. -// See LICENSE for licensing terms. -// - -#include "server/io_utils.h" - -#include "base/flags.h" -#include "server/error.h" - -using namespace std; - -namespace dfly { - -io::Result BufferedStreamerBase::WriteSome(const iovec* vec, uint32_t len) { - // Shrink producer_buf_ only if it is empty, its capacity reached 10 times more than max buffer - // memory and the write len is less than max buffer memory. - if (producer_buf_.InputLen() == 0 && producer_buf_.Capacity() > max_buffered_mem_ * 10) { - uint32_t write_len = 0; - for (uint32_t i = 0; i < len; ++i) { - write_len += vec->iov_len; - } - if (write_len < max_buffered_mem_) { - producer_buf_ = io::IoBuf{max_buffered_mem_}; - } - } - - return io::BufSink{&producer_buf_}.WriteSome(vec, len); -} - -void BufferedStreamerBase::NotifyWritten(bool allow_await) { - if (IsStopped()) - return; - buffered_++; - // Wake up the consumer. - waker_.notify(); - // Block if we're stalled because the consumer is not keeping up. - if (allow_await) { - waker_.await([this]() { return !IsStalled() || IsStopped(); }); - } -} - -void BufferedStreamerBase::AwaitIfWritten() { - if (IsStopped()) - return; - if (buffered_) { - waker_.await([this]() { return !IsStalled() || IsStopped(); }); - } -} - -error_code BufferedStreamerBase::ConsumeIntoSink(io::Sink* dest) { - while (!IsStopped()) { - // Wait for more data or stop signal. - waker_.await([this]() { return buffered_ > 0 || IsStopped(); }); - // Break immediately on cancellation. - if (IsStopped()) { - break; - } - - // Swap producer and consumer buffers - std::swap(producer_buf_, consumer_buf_); - buffered_ = 0; - - // If producer stalled, notify we consumed data and it can unblock. - waker_.notifyAll(); - - // Write data and check for errors. - if (auto ec = dest->Write(consumer_buf_.InputBuffer()); ec) { - Finalize(); // Finalize on error to unblock prodcer immediately. - return ec; - } - - consumer_buf_.Clear(); - } - return std::error_code{}; -} - -void BufferedStreamerBase::Finalize() { - producer_done_ = true; - waker_.notifyAll(); -} - -bool BufferedStreamerBase::IsStopped() { - return cll_->IsCancelled() || producer_done_; -} - -bool BufferedStreamerBase::IsStalled() { - return buffered_ > max_buffered_cnt_ || producer_buf_.InputLen() > max_buffered_mem_; -} - -size_t BufferedStreamerBase::GetTotalBufferCapacities() const { - return consumer_buf_.Capacity() + producer_buf_.Capacity(); -} - -} // namespace dfly diff --git a/src/server/io_utils.h b/src/server/io_utils.h deleted file mode 100644 index da93694fe..000000000 --- a/src/server/io_utils.h +++ /dev/null @@ -1,78 +0,0 @@ -// Copyright 2022, DragonflyDB authors. All rights reserved. -// See LICENSE for licensing terms. -// - -#include "io/io.h" -#include "io/io_buf.h" -#include "server/common.h" - -namespace dfly { - -// Base for constructing buffered byte streams with backpressure -// for single producer and consumer on the same thread. -// -// Use it as a io::Sink to write data from a producer fiber, -// and ConsumeIntoSink to extract this data in a consumer fiber. -// Use NotifyWritten to request the consumer to be woken up. -// -// Uses two base::IoBuf internally that are swapped in turns. -class BufferedStreamerBase : public io::Sink { - protected: - // Initialize with global cancellation and optional stall conditions. - BufferedStreamerBase(const Cancellation* cll, unsigned max_buffered_cnt = 5, - unsigned max_buffered_mem = 8192) - : cll_{cll}, max_buffered_cnt_{max_buffered_cnt}, max_buffered_mem_{max_buffered_mem} { - } - - public: - size_t GetTotalBufferCapacities() const; - - protected: - // Write some data into the internal buffer. - // - // Consumer needs to be woken up manually with NotifyWritten to avoid waking it up for small - // writes: - // - // while (should_write()) { - // bsb->WriteSome(...); <- Write some data - // bsb->WriteSome(...); - // ... - // bsb->NotifyWritten(); <- Wake up consumer after writes - // } - // bsb->Finalize(); <- Finalize to unblock consumer - // - io::Result WriteSome(const iovec* vec, uint32_t len) override; - - // Report that a batch of data has been written and the consumer can be woken up. - // Blocks if the consumer if not keeping up, if allow_await is set to true. - void NotifyWritten(bool allow_await); - - // Blocks the if the consumer if not keeping up. - void AwaitIfWritten(); - - // Report producer finished. - void Finalize(); - - // Consume whole stream to sink from the consumer fiber. Unblocks when cancelled or finalized. - std::error_code ConsumeIntoSink(io::Sink* dest); - - // Whether the consumer is not keeping up. - bool IsStalled(); - - // Whether the producer stopped or the context was cancelled. - bool IsStopped(); - - protected: - bool producer_done_ = false; // whether producer is done - unsigned buffered_ = 0; // how many entries are buffered - util::fb2::EventCount waker_; // two sided waker - - const Cancellation* cll_; // global cancellation - - unsigned max_buffered_cnt_; // Max buffered entries before stall - unsigned max_buffered_mem_; // Max buffered mem before stall - - io::IoBuf producer_buf_, consumer_buf_; // Two buffers that are swapped in turns. -}; - -} // namespace dfly diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 2495eb8c7..c00c2ae87 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -6,67 +6,191 @@ #include +#include "base/flags.h" #include "base/logging.h" #include "server/cluster/cluster_defs.h" +using namespace facade; + +ABSL_FLAG(uint32_t, replication_stream_timeout, 500, + "Time in milliseconds to wait for the replication output buffer go below " + "the throttle limit."); +ABSL_FLAG(uint32_t, replication_stream_output_limit, 64_KB, + "Time to wait for the replication output buffer go below the throttle limit"); + namespace dfly { using namespace util; +using namespace journal; -void JournalStreamer::Start(io::Sink* dest, bool send_lsn) { - using namespace journal; - write_fb_ = fb2::Fiber("journal_stream", &JournalStreamer::WriterFb, this, dest); +namespace { + +iovec IoVec(io::Bytes src) { + return iovec{const_cast(src.data()), src.size()}; +} + +constexpr size_t kFlushThreshold = 2_KB; +uint32_t replication_stream_output_limit_cached = 64_KB; + +} // namespace + +JournalStreamer::JournalStreamer(journal::Journal* journal, Context* cntx) + : journal_(journal), cntx_(cntx) { + // cache the flag to avoid accessing it later. + replication_stream_output_limit_cached = absl::GetFlag(FLAGS_replication_stream_output_limit); +} + +JournalStreamer::~JournalStreamer() { + DCHECK_EQ(in_flight_bytes_, 0u); + VLOG(1) << "~JournalStreamer"; +} + +void JournalStreamer::Start(io::AsyncSink* dest, bool send_lsn) { + CHECK(dest_ == nullptr && dest != nullptr); + dest_ = dest; journal_cb_id_ = journal_->RegisterOnChange([this, send_lsn](const JournalItem& item, bool allow_await) { + if (allow_await) { + ThrottleIfNeeded(); + // No record to write, just await if data was written so consumer will read the data. + if (item.opcode == Op::NOOP) + return; + } + if (!ShouldWrite(item)) { return; } - if (item.opcode == Op::NOOP) { - // No record to write, just await if data was written so consumer will read the data. - return AwaitIfWritten(); - } - - Write(io::Buffer(item.data)); + Write(item.data); time_t now = time(nullptr); + + // TODO: to chain it to the previous Write call. if (send_lsn && now - last_lsn_time_ > 3) { last_lsn_time_ = now; - base::IoBuf tmp; - io::BufSink sink(&tmp); + io::StringSink sink; JournalWriter writer(&sink); writer.Write(Entry{journal::Op::LSN, item.lsn}); - Write(io::Buffer(io::View(tmp.InputBuffer()))); + Write(sink.str()); } - NotifyWritten(allow_await); }); } void JournalStreamer::Cancel() { - Finalize(); // Finalize must be called before UnregisterOnChange because we first need to stop - // writing to buffer and notify the all the producers. - // Writing to journal holds mutex protecting change_cb_arr_, than the fiber can - // preemt when calling NotifyWritten and it will not run again till notified. - // UnregisterOnChange will try to lock the mutex therefor calling UnregisterOnChange - // before Finalize may cause deadlock. + VLOG(1) << "JournalStreamer::Cancel"; + waker_.notifyAll(); journal_->UnregisterOnChange(journal_cb_id_); + WaitForInflightToComplete(); +} - if (write_fb_.IsJoinable()) { - write_fb_.Join(); +size_t JournalStreamer::GetTotalBufferCapacities() const { + return in_flight_bytes_ + pending_buf_.capacity(); +} + +void JournalStreamer::Write(std::string_view str) { + DCHECK(!str.empty()); + DVLOG(2) << "Writing " << str.size() << " bytes"; + + // If we do not have any in flight requests we send the string right a way. + // We can not aggregate it since we do not know when the next update will follow. + size_t total_pending = pending_buf_.size() + str.size(); + if (in_flight_bytes_ == 0 || total_pending > kFlushThreshold) { + // because of potential SOO with strings we allocate explicitly on heap + uint8_t* buf(new uint8_t[str.size()]); + + // TODO: it is possible to remove these redundant copies if we adjust high level + // interfaces to pass reference-counted buffers. + memcpy(buf, str.data(), str.size()); + in_flight_bytes_ += total_pending; + + iovec v[2]; + unsigned next_buf_id = 0; + + if (!pending_buf_.empty()) { + v[0] = IoVec(pending_buf_); + ++next_buf_id; + } + v[next_buf_id++] = IoVec(io::Bytes(buf, str.size())); + + dest_->AsyncWrite( + v, next_buf_id, + [buf0 = std::move(pending_buf_), buf, this, len = total_pending](std::error_code ec) { + delete[] buf; + OnCompletion(ec, len); + }); + + return; + } + + DCHECK_GT(in_flight_bytes_, 0u); + DCHECK_LE(pending_buf_.size() + str.size(), kFlushThreshold); + + // Aggregate + size_t tail = pending_buf_.size(); + pending_buf_.resize(pending_buf_.size() + str.size()); + memcpy(pending_buf_.data() + tail, str.data(), str.size()); +} + +void JournalStreamer::OnCompletion(std::error_code ec, size_t len) { + DCHECK_GE(in_flight_bytes_, len); + + DVLOG(2) << "Completing from " << in_flight_bytes_ << " to " << in_flight_bytes_ - len; + in_flight_bytes_ -= len; + if (ec && !IsStopped()) { + cntx_->ReportError(ec); + } else if (in_flight_bytes_ == 0 && !pending_buf_.empty() && !IsStopped()) { + // If everything was sent but we have a pending buf, flush it. + io::Bytes src(pending_buf_); + in_flight_bytes_ += src.size(); + dest_->AsyncWrite(src, [buf = std::move(pending_buf_), this](std::error_code ec) { + OnCompletion(ec, buf.size()); + }); + } + + // notify ThrottleIfNeeded or WaitForInflightToComplete that waits + // for all the completions to finish. + // ThrottleIfNeeded can run from multiple fibers in the journal thread. + // For example, from Heartbeat calling TriggerJournalWriteToSink to flush potential + // expiration deletions and there are other cases as well. + waker_.notifyAll(); +} + +void JournalStreamer::ThrottleIfNeeded() { + if (IsStopped() || !IsStalled()) + return; + + auto next = chrono::steady_clock::now() + + chrono::milliseconds(absl::GetFlag(FLAGS_replication_stream_timeout)); + auto inflight_start = in_flight_bytes_; + + std::cv_status status = + waker_.await_until([this]() { return !IsStalled() || IsStopped(); }, next); + if (status == std::cv_status::timeout) { + LOG(WARNING) << "Stream timed out, inflight bytes start: " << inflight_start + << ", end: " << in_flight_bytes_; + cntx_->ReportError(make_error_code(errc::stream_timeout)); } } -void JournalStreamer::WriterFb(io::Sink* dest) { - if (auto ec = ConsumeIntoSink(dest); ec) { - cntx_->ReportError(ec); +void JournalStreamer::WaitForInflightToComplete() { + while (in_flight_bytes_) { + auto next = chrono::steady_clock::now() + 1s; + std::cv_status status = + waker_.await_until([this] { return this->in_flight_bytes_ == 0; }, next); + LOG_IF(WARNING, status == std::cv_status::timeout) + << "Waiting for inflight bytes " << in_flight_bytes_; } } +bool JournalStreamer::IsStalled() const { + return in_flight_bytes_ >= replication_stream_output_limit_cached; +} + RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal, Context* cntx) : JournalStreamer(journal, cntx), db_slice_(slice), my_slots_(std::move(slots)) { DCHECK(slice != nullptr); } -void RestoreStreamer::Start(io::Sink* dest, bool send_lsn) { +void RestoreStreamer::Start(io::AsyncSink* dest, bool send_lsn) { VLOG(1) << "RestoreStreamer start"; auto db_cb = absl::bind_front(&RestoreStreamer::OnDbChange, this); snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb)); @@ -78,7 +202,7 @@ void RestoreStreamer::Start(io::Sink* dest, bool send_lsn) { PrimeTable* pt = &db_slice_->databases()[0]->prime; do { - if (fiber_cancellation_.IsCancelled()) + if (fiber_cancelled_) return; bool written = false; @@ -90,11 +214,10 @@ void RestoreStreamer::Start(io::Sink* dest, bool send_lsn) { } }); if (written) { - NotifyWritten(true); + ThrottleIfNeeded(); } - ++last_yield; - if (last_yield >= 100) { + if (++last_yield >= 100) { ThisFiber::Yield(); last_yield = 0; } @@ -105,9 +228,14 @@ void RestoreStreamer::SendFinalize() { VLOG(1) << "RestoreStreamer FIN opcode for : " << db_slice_->shard_id(); journal::Entry entry(journal::Op::FIN, 0 /*db_id*/, 0 /*slot_id*/); - JournalWriter writer{this}; + io::StringSink sink; + JournalWriter writer{&sink}; writer.Write(entry); - NotifyWritten(true); + Write(sink.str()); + + // TODO: is the intent here to flush everything? + // + ThrottleIfNeeded(); } RestoreStreamer::~RestoreStreamer() { @@ -117,7 +245,7 @@ void RestoreStreamer::Cancel() { auto sver = snapshot_version_; snapshot_version_ = 0; // to prevent double cancel in another fiber if (sver != 0) { - fiber_cancellation_.Cancel(); + fiber_cancelled_ = true; db_slice_->UnregisterOnChange(sver); JournalStreamer::Cancel(); } @@ -176,16 +304,12 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req PrimeTable* table = db_slice_->GetTables(0).first; if (const PrimeTable::bucket_iterator* bit = req.update()) { - if (WriteBucket(*bit)) { - NotifyWritten(false); - } + WriteBucket(*bit); } else { string_view key = get(req.change); table->CVCUponInsert(snapshot_version_, key, [this](PrimeTable::bucket_iterator it) { DCHECK_LT(it.GetVersion(), snapshot_version_); - if (WriteBucket(it)) { - NotifyWritten(false); - } + WriteBucket(it); }); } } @@ -214,8 +338,12 @@ void RestoreStreamer::WriteCommand(journal::Entry::Payload cmd_payload) { 0, // slot-id, but it is ignored at this level cmd_payload); - JournalWriter writer{this}; + // TODO: From WriteEntry to till Write we tripple copy the PrimeValue. It's ver in-efficient and + // will burn CPU for large values. + io::StringSink sink; + JournalWriter writer{&sink}; writer.Write(entry); + Write(sink.str()); } } // namespace dfly diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index 9652a0344..7cb8b34bf 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -5,7 +5,6 @@ #pragma once #include "server/db_slice.h" -#include "server/io_utils.h" #include "server/journal/journal.h" #include "server/journal/serializer.h" #include "server/rdb_save.h" @@ -14,39 +13,58 @@ namespace dfly { // Buffered single-shard journal streamer that listens for journal changes with a // journal listener and writes them to a destination sink in a separate fiber. -class JournalStreamer : protected BufferedStreamerBase { +class JournalStreamer { public: - JournalStreamer(journal::Journal* journal, Context* cntx) - : BufferedStreamerBase{cntx->GetCancellation()}, cntx_{cntx}, journal_{journal} { - } + JournalStreamer(journal::Journal* journal, Context* cntx); + virtual ~JournalStreamer(); // Self referential. JournalStreamer(const JournalStreamer& other) = delete; JournalStreamer(JournalStreamer&& other) = delete; // Register journal listener and start writer in fiber. - virtual void Start(io::Sink* dest, bool send_lsn); + virtual void Start(io::AsyncSink* dest, bool send_lsn); // Must be called on context cancellation for unblocking // and manual cleanup. virtual void Cancel(); - using BufferedStreamerBase::GetTotalBufferCapacities; + size_t GetTotalBufferCapacities() const; + + protected: + // TODO: we copy the string on each write because JournalItem may be passed to multiple + // streamers so we can not move it. However, if we would either wrap JournalItem in shared_ptr + // or wrap JournalItem::data in shared_ptr, we can avoid the cost of copying strings. + // Also, for small strings it's more peformant to copy to the intermediate buffer than + // to issue an io operation. + void Write(std::string_view str); + + // Blocks the if the consumer if not keeping up. + void ThrottleIfNeeded(); - private: - // Writer fiber that steals buffer contents and writes them to dest. - void WriterFb(io::Sink* dest); virtual bool ShouldWrite(const journal::JournalItem& item) const { - return true; + return !IsStopped(); } - Context* cntx_; + void WaitForInflightToComplete(); + + private: + void OnCompletion(std::error_code ec, size_t len); + + bool IsStopped() const { + return cntx_->IsCancelled(); + } + + bool IsStalled() const; - uint32_t journal_cb_id_{0}; journal::Journal* journal_; + Context* cntx_; + io::AsyncSink* dest_ = nullptr; + std::vector pending_buf_; + size_t in_flight_bytes_ = 0; time_t last_lsn_time_ = 0; - - util::fb2::Fiber write_fb_{}; + util::fb2::EventCount waker_; + uint32_t journal_cb_id_{0}; }; // Serializes existing DB as RESTORE commands, and sends updates as regular commands. @@ -56,7 +74,7 @@ class RestoreStreamer : public JournalStreamer { RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal, Context* cntx); ~RestoreStreamer() override; - void Start(io::Sink* dest, bool send_lsn = false) override; + void Start(io::AsyncSink* dest, bool send_lsn = false) override; // Cancel() must be called if Start() is called void Cancel() override; @@ -80,7 +98,7 @@ class RestoreStreamer : public JournalStreamer { DbSlice* db_slice_; uint64_t snapshot_version_ = 0; cluster::SlotSet my_slots_; - Cancellation fiber_cancellation_; + bool fiber_cancelled_ = false; bool snapshot_finished_ = false; };