mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
chore: Streamer is rewritten with async interface (#3108)
* chore: Streamer is rewritten with async interface Signed-off-by: Roman Gershman <roman@dragonflydb.io> --------- Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
d2ae0ab75c
commit
a80063189e
6 changed files with 203 additions and 230 deletions
|
@ -52,7 +52,7 @@ add_library(dragonfly_lib bloom_family.cc engine_shard_set.cc channel_store.cc
|
||||||
detail/save_stages_controller.cc
|
detail/save_stages_controller.cc
|
||||||
detail/snapshot_storage.cc
|
detail/snapshot_storage.cc
|
||||||
set_family.cc stream_family.cc string_family.cc
|
set_family.cc stream_family.cc string_family.cc
|
||||||
zset_family.cc version.cc bitops_family.cc container_utils.cc io_utils.cc
|
zset_family.cc version.cc bitops_family.cc container_utils.cc
|
||||||
top_keys.cc multi_command_squasher.cc hll_family.cc
|
top_keys.cc multi_command_squasher.cc hll_family.cc
|
||||||
${DF_SEARCH_SRCS}
|
${DF_SEARCH_SRCS}
|
||||||
${DF_LINUX_SRCS}
|
${DF_LINUX_SRCS}
|
||||||
|
|
|
@ -406,7 +406,6 @@ GenericError Context::ReportErrorInternal(GenericError&& err) {
|
||||||
|
|
||||||
if (err_handler_)
|
if (err_handler_)
|
||||||
err_handler_fb_ = fb2::Fiber("report_internal_error", err_handler_, err_);
|
err_handler_fb_ = fb2::Fiber("report_internal_error", err_handler_, err_);
|
||||||
|
|
||||||
Cancellation::Cancel();
|
Cancellation::Cancel();
|
||||||
return err_;
|
return err_;
|
||||||
}
|
}
|
||||||
|
|
|
@ -1,94 +0,0 @@
|
||||||
// Copyright 2022, DragonflyDB authors. All rights reserved.
|
|
||||||
// See LICENSE for licensing terms.
|
|
||||||
//
|
|
||||||
|
|
||||||
#include "server/io_utils.h"
|
|
||||||
|
|
||||||
#include "base/flags.h"
|
|
||||||
#include "server/error.h"
|
|
||||||
|
|
||||||
using namespace std;
|
|
||||||
|
|
||||||
namespace dfly {
|
|
||||||
|
|
||||||
io::Result<size_t> BufferedStreamerBase::WriteSome(const iovec* vec, uint32_t len) {
|
|
||||||
// Shrink producer_buf_ only if it is empty, its capacity reached 10 times more than max buffer
|
|
||||||
// memory and the write len is less than max buffer memory.
|
|
||||||
if (producer_buf_.InputLen() == 0 && producer_buf_.Capacity() > max_buffered_mem_ * 10) {
|
|
||||||
uint32_t write_len = 0;
|
|
||||||
for (uint32_t i = 0; i < len; ++i) {
|
|
||||||
write_len += vec->iov_len;
|
|
||||||
}
|
|
||||||
if (write_len < max_buffered_mem_) {
|
|
||||||
producer_buf_ = io::IoBuf{max_buffered_mem_};
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return io::BufSink{&producer_buf_}.WriteSome(vec, len);
|
|
||||||
}
|
|
||||||
|
|
||||||
void BufferedStreamerBase::NotifyWritten(bool allow_await) {
|
|
||||||
if (IsStopped())
|
|
||||||
return;
|
|
||||||
buffered_++;
|
|
||||||
// Wake up the consumer.
|
|
||||||
waker_.notify();
|
|
||||||
// Block if we're stalled because the consumer is not keeping up.
|
|
||||||
if (allow_await) {
|
|
||||||
waker_.await([this]() { return !IsStalled() || IsStopped(); });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
void BufferedStreamerBase::AwaitIfWritten() {
|
|
||||||
if (IsStopped())
|
|
||||||
return;
|
|
||||||
if (buffered_) {
|
|
||||||
waker_.await([this]() { return !IsStalled() || IsStopped(); });
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
error_code BufferedStreamerBase::ConsumeIntoSink(io::Sink* dest) {
|
|
||||||
while (!IsStopped()) {
|
|
||||||
// Wait for more data or stop signal.
|
|
||||||
waker_.await([this]() { return buffered_ > 0 || IsStopped(); });
|
|
||||||
// Break immediately on cancellation.
|
|
||||||
if (IsStopped()) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
// Swap producer and consumer buffers
|
|
||||||
std::swap(producer_buf_, consumer_buf_);
|
|
||||||
buffered_ = 0;
|
|
||||||
|
|
||||||
// If producer stalled, notify we consumed data and it can unblock.
|
|
||||||
waker_.notifyAll();
|
|
||||||
|
|
||||||
// Write data and check for errors.
|
|
||||||
if (auto ec = dest->Write(consumer_buf_.InputBuffer()); ec) {
|
|
||||||
Finalize(); // Finalize on error to unblock prodcer immediately.
|
|
||||||
return ec;
|
|
||||||
}
|
|
||||||
|
|
||||||
consumer_buf_.Clear();
|
|
||||||
}
|
|
||||||
return std::error_code{};
|
|
||||||
}
|
|
||||||
|
|
||||||
void BufferedStreamerBase::Finalize() {
|
|
||||||
producer_done_ = true;
|
|
||||||
waker_.notifyAll();
|
|
||||||
}
|
|
||||||
|
|
||||||
bool BufferedStreamerBase::IsStopped() {
|
|
||||||
return cll_->IsCancelled() || producer_done_;
|
|
||||||
}
|
|
||||||
|
|
||||||
bool BufferedStreamerBase::IsStalled() {
|
|
||||||
return buffered_ > max_buffered_cnt_ || producer_buf_.InputLen() > max_buffered_mem_;
|
|
||||||
}
|
|
||||||
|
|
||||||
size_t BufferedStreamerBase::GetTotalBufferCapacities() const {
|
|
||||||
return consumer_buf_.Capacity() + producer_buf_.Capacity();
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace dfly
|
|
|
@ -1,78 +0,0 @@
|
||||||
// Copyright 2022, DragonflyDB authors. All rights reserved.
|
|
||||||
// See LICENSE for licensing terms.
|
|
||||||
//
|
|
||||||
|
|
||||||
#include "io/io.h"
|
|
||||||
#include "io/io_buf.h"
|
|
||||||
#include "server/common.h"
|
|
||||||
|
|
||||||
namespace dfly {
|
|
||||||
|
|
||||||
// Base for constructing buffered byte streams with backpressure
|
|
||||||
// for single producer and consumer on the same thread.
|
|
||||||
//
|
|
||||||
// Use it as a io::Sink to write data from a producer fiber,
|
|
||||||
// and ConsumeIntoSink to extract this data in a consumer fiber.
|
|
||||||
// Use NotifyWritten to request the consumer to be woken up.
|
|
||||||
//
|
|
||||||
// Uses two base::IoBuf internally that are swapped in turns.
|
|
||||||
class BufferedStreamerBase : public io::Sink {
|
|
||||||
protected:
|
|
||||||
// Initialize with global cancellation and optional stall conditions.
|
|
||||||
BufferedStreamerBase(const Cancellation* cll, unsigned max_buffered_cnt = 5,
|
|
||||||
unsigned max_buffered_mem = 8192)
|
|
||||||
: cll_{cll}, max_buffered_cnt_{max_buffered_cnt}, max_buffered_mem_{max_buffered_mem} {
|
|
||||||
}
|
|
||||||
|
|
||||||
public:
|
|
||||||
size_t GetTotalBufferCapacities() const;
|
|
||||||
|
|
||||||
protected:
|
|
||||||
// Write some data into the internal buffer.
|
|
||||||
//
|
|
||||||
// Consumer needs to be woken up manually with NotifyWritten to avoid waking it up for small
|
|
||||||
// writes:
|
|
||||||
//
|
|
||||||
// while (should_write()) {
|
|
||||||
// bsb->WriteSome(...); <- Write some data
|
|
||||||
// bsb->WriteSome(...);
|
|
||||||
// ...
|
|
||||||
// bsb->NotifyWritten(); <- Wake up consumer after writes
|
|
||||||
// }
|
|
||||||
// bsb->Finalize(); <- Finalize to unblock consumer
|
|
||||||
//
|
|
||||||
io::Result<size_t> WriteSome(const iovec* vec, uint32_t len) override;
|
|
||||||
|
|
||||||
// Report that a batch of data has been written and the consumer can be woken up.
|
|
||||||
// Blocks if the consumer if not keeping up, if allow_await is set to true.
|
|
||||||
void NotifyWritten(bool allow_await);
|
|
||||||
|
|
||||||
// Blocks the if the consumer if not keeping up.
|
|
||||||
void AwaitIfWritten();
|
|
||||||
|
|
||||||
// Report producer finished.
|
|
||||||
void Finalize();
|
|
||||||
|
|
||||||
// Consume whole stream to sink from the consumer fiber. Unblocks when cancelled or finalized.
|
|
||||||
std::error_code ConsumeIntoSink(io::Sink* dest);
|
|
||||||
|
|
||||||
// Whether the consumer is not keeping up.
|
|
||||||
bool IsStalled();
|
|
||||||
|
|
||||||
// Whether the producer stopped or the context was cancelled.
|
|
||||||
bool IsStopped();
|
|
||||||
|
|
||||||
protected:
|
|
||||||
bool producer_done_ = false; // whether producer is done
|
|
||||||
unsigned buffered_ = 0; // how many entries are buffered
|
|
||||||
util::fb2::EventCount waker_; // two sided waker
|
|
||||||
|
|
||||||
const Cancellation* cll_; // global cancellation
|
|
||||||
|
|
||||||
unsigned max_buffered_cnt_; // Max buffered entries before stall
|
|
||||||
unsigned max_buffered_mem_; // Max buffered mem before stall
|
|
||||||
|
|
||||||
io::IoBuf producer_buf_, consumer_buf_; // Two buffers that are swapped in turns.
|
|
||||||
};
|
|
||||||
|
|
||||||
} // namespace dfly
|
|
|
@ -6,67 +6,191 @@
|
||||||
|
|
||||||
#include <absl/functional/bind_front.h>
|
#include <absl/functional/bind_front.h>
|
||||||
|
|
||||||
|
#include "base/flags.h"
|
||||||
#include "base/logging.h"
|
#include "base/logging.h"
|
||||||
#include "server/cluster/cluster_defs.h"
|
#include "server/cluster/cluster_defs.h"
|
||||||
|
|
||||||
|
using namespace facade;
|
||||||
|
|
||||||
|
ABSL_FLAG(uint32_t, replication_stream_timeout, 500,
|
||||||
|
"Time in milliseconds to wait for the replication output buffer go below "
|
||||||
|
"the throttle limit.");
|
||||||
|
ABSL_FLAG(uint32_t, replication_stream_output_limit, 64_KB,
|
||||||
|
"Time to wait for the replication output buffer go below the throttle limit");
|
||||||
|
|
||||||
namespace dfly {
|
namespace dfly {
|
||||||
using namespace util;
|
using namespace util;
|
||||||
|
using namespace journal;
|
||||||
|
|
||||||
void JournalStreamer::Start(io::Sink* dest, bool send_lsn) {
|
namespace {
|
||||||
using namespace journal;
|
|
||||||
write_fb_ = fb2::Fiber("journal_stream", &JournalStreamer::WriterFb, this, dest);
|
iovec IoVec(io::Bytes src) {
|
||||||
|
return iovec{const_cast<uint8_t*>(src.data()), src.size()};
|
||||||
|
}
|
||||||
|
|
||||||
|
constexpr size_t kFlushThreshold = 2_KB;
|
||||||
|
uint32_t replication_stream_output_limit_cached = 64_KB;
|
||||||
|
|
||||||
|
} // namespace
|
||||||
|
|
||||||
|
JournalStreamer::JournalStreamer(journal::Journal* journal, Context* cntx)
|
||||||
|
: journal_(journal), cntx_(cntx) {
|
||||||
|
// cache the flag to avoid accessing it later.
|
||||||
|
replication_stream_output_limit_cached = absl::GetFlag(FLAGS_replication_stream_output_limit);
|
||||||
|
}
|
||||||
|
|
||||||
|
JournalStreamer::~JournalStreamer() {
|
||||||
|
DCHECK_EQ(in_flight_bytes_, 0u);
|
||||||
|
VLOG(1) << "~JournalStreamer";
|
||||||
|
}
|
||||||
|
|
||||||
|
void JournalStreamer::Start(io::AsyncSink* dest, bool send_lsn) {
|
||||||
|
CHECK(dest_ == nullptr && dest != nullptr);
|
||||||
|
dest_ = dest;
|
||||||
journal_cb_id_ =
|
journal_cb_id_ =
|
||||||
journal_->RegisterOnChange([this, send_lsn](const JournalItem& item, bool allow_await) {
|
journal_->RegisterOnChange([this, send_lsn](const JournalItem& item, bool allow_await) {
|
||||||
|
if (allow_await) {
|
||||||
|
ThrottleIfNeeded();
|
||||||
|
// No record to write, just await if data was written so consumer will read the data.
|
||||||
|
if (item.opcode == Op::NOOP)
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
if (!ShouldWrite(item)) {
|
if (!ShouldWrite(item)) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
if (item.opcode == Op::NOOP) {
|
Write(item.data);
|
||||||
// No record to write, just await if data was written so consumer will read the data.
|
|
||||||
return AwaitIfWritten();
|
|
||||||
}
|
|
||||||
|
|
||||||
Write(io::Buffer(item.data));
|
|
||||||
time_t now = time(nullptr);
|
time_t now = time(nullptr);
|
||||||
|
|
||||||
|
// TODO: to chain it to the previous Write call.
|
||||||
if (send_lsn && now - last_lsn_time_ > 3) {
|
if (send_lsn && now - last_lsn_time_ > 3) {
|
||||||
last_lsn_time_ = now;
|
last_lsn_time_ = now;
|
||||||
base::IoBuf tmp;
|
io::StringSink sink;
|
||||||
io::BufSink sink(&tmp);
|
|
||||||
JournalWriter writer(&sink);
|
JournalWriter writer(&sink);
|
||||||
writer.Write(Entry{journal::Op::LSN, item.lsn});
|
writer.Write(Entry{journal::Op::LSN, item.lsn});
|
||||||
Write(io::Buffer(io::View(tmp.InputBuffer())));
|
Write(sink.str());
|
||||||
}
|
}
|
||||||
NotifyWritten(allow_await);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void JournalStreamer::Cancel() {
|
void JournalStreamer::Cancel() {
|
||||||
Finalize(); // Finalize must be called before UnregisterOnChange because we first need to stop
|
VLOG(1) << "JournalStreamer::Cancel";
|
||||||
// writing to buffer and notify the all the producers.
|
waker_.notifyAll();
|
||||||
// Writing to journal holds mutex protecting change_cb_arr_, than the fiber can
|
|
||||||
// preemt when calling NotifyWritten and it will not run again till notified.
|
|
||||||
// UnregisterOnChange will try to lock the mutex therefor calling UnregisterOnChange
|
|
||||||
// before Finalize may cause deadlock.
|
|
||||||
journal_->UnregisterOnChange(journal_cb_id_);
|
journal_->UnregisterOnChange(journal_cb_id_);
|
||||||
|
WaitForInflightToComplete();
|
||||||
|
}
|
||||||
|
|
||||||
if (write_fb_.IsJoinable()) {
|
size_t JournalStreamer::GetTotalBufferCapacities() const {
|
||||||
write_fb_.Join();
|
return in_flight_bytes_ + pending_buf_.capacity();
|
||||||
|
}
|
||||||
|
|
||||||
|
void JournalStreamer::Write(std::string_view str) {
|
||||||
|
DCHECK(!str.empty());
|
||||||
|
DVLOG(2) << "Writing " << str.size() << " bytes";
|
||||||
|
|
||||||
|
// If we do not have any in flight requests we send the string right a way.
|
||||||
|
// We can not aggregate it since we do not know when the next update will follow.
|
||||||
|
size_t total_pending = pending_buf_.size() + str.size();
|
||||||
|
if (in_flight_bytes_ == 0 || total_pending > kFlushThreshold) {
|
||||||
|
// because of potential SOO with strings we allocate explicitly on heap
|
||||||
|
uint8_t* buf(new uint8_t[str.size()]);
|
||||||
|
|
||||||
|
// TODO: it is possible to remove these redundant copies if we adjust high level
|
||||||
|
// interfaces to pass reference-counted buffers.
|
||||||
|
memcpy(buf, str.data(), str.size());
|
||||||
|
in_flight_bytes_ += total_pending;
|
||||||
|
|
||||||
|
iovec v[2];
|
||||||
|
unsigned next_buf_id = 0;
|
||||||
|
|
||||||
|
if (!pending_buf_.empty()) {
|
||||||
|
v[0] = IoVec(pending_buf_);
|
||||||
|
++next_buf_id;
|
||||||
|
}
|
||||||
|
v[next_buf_id++] = IoVec(io::Bytes(buf, str.size()));
|
||||||
|
|
||||||
|
dest_->AsyncWrite(
|
||||||
|
v, next_buf_id,
|
||||||
|
[buf0 = std::move(pending_buf_), buf, this, len = total_pending](std::error_code ec) {
|
||||||
|
delete[] buf;
|
||||||
|
OnCompletion(ec, len);
|
||||||
|
});
|
||||||
|
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
DCHECK_GT(in_flight_bytes_, 0u);
|
||||||
|
DCHECK_LE(pending_buf_.size() + str.size(), kFlushThreshold);
|
||||||
|
|
||||||
|
// Aggregate
|
||||||
|
size_t tail = pending_buf_.size();
|
||||||
|
pending_buf_.resize(pending_buf_.size() + str.size());
|
||||||
|
memcpy(pending_buf_.data() + tail, str.data(), str.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
void JournalStreamer::OnCompletion(std::error_code ec, size_t len) {
|
||||||
|
DCHECK_GE(in_flight_bytes_, len);
|
||||||
|
|
||||||
|
DVLOG(2) << "Completing from " << in_flight_bytes_ << " to " << in_flight_bytes_ - len;
|
||||||
|
in_flight_bytes_ -= len;
|
||||||
|
if (ec && !IsStopped()) {
|
||||||
|
cntx_->ReportError(ec);
|
||||||
|
} else if (in_flight_bytes_ == 0 && !pending_buf_.empty() && !IsStopped()) {
|
||||||
|
// If everything was sent but we have a pending buf, flush it.
|
||||||
|
io::Bytes src(pending_buf_);
|
||||||
|
in_flight_bytes_ += src.size();
|
||||||
|
dest_->AsyncWrite(src, [buf = std::move(pending_buf_), this](std::error_code ec) {
|
||||||
|
OnCompletion(ec, buf.size());
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
// notify ThrottleIfNeeded or WaitForInflightToComplete that waits
|
||||||
|
// for all the completions to finish.
|
||||||
|
// ThrottleIfNeeded can run from multiple fibers in the journal thread.
|
||||||
|
// For example, from Heartbeat calling TriggerJournalWriteToSink to flush potential
|
||||||
|
// expiration deletions and there are other cases as well.
|
||||||
|
waker_.notifyAll();
|
||||||
|
}
|
||||||
|
|
||||||
|
void JournalStreamer::ThrottleIfNeeded() {
|
||||||
|
if (IsStopped() || !IsStalled())
|
||||||
|
return;
|
||||||
|
|
||||||
|
auto next = chrono::steady_clock::now() +
|
||||||
|
chrono::milliseconds(absl::GetFlag(FLAGS_replication_stream_timeout));
|
||||||
|
auto inflight_start = in_flight_bytes_;
|
||||||
|
|
||||||
|
std::cv_status status =
|
||||||
|
waker_.await_until([this]() { return !IsStalled() || IsStopped(); }, next);
|
||||||
|
if (status == std::cv_status::timeout) {
|
||||||
|
LOG(WARNING) << "Stream timed out, inflight bytes start: " << inflight_start
|
||||||
|
<< ", end: " << in_flight_bytes_;
|
||||||
|
cntx_->ReportError(make_error_code(errc::stream_timeout));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
void JournalStreamer::WriterFb(io::Sink* dest) {
|
void JournalStreamer::WaitForInflightToComplete() {
|
||||||
if (auto ec = ConsumeIntoSink(dest); ec) {
|
while (in_flight_bytes_) {
|
||||||
cntx_->ReportError(ec);
|
auto next = chrono::steady_clock::now() + 1s;
|
||||||
|
std::cv_status status =
|
||||||
|
waker_.await_until([this] { return this->in_flight_bytes_ == 0; }, next);
|
||||||
|
LOG_IF(WARNING, status == std::cv_status::timeout)
|
||||||
|
<< "Waiting for inflight bytes " << in_flight_bytes_;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bool JournalStreamer::IsStalled() const {
|
||||||
|
return in_flight_bytes_ >= replication_stream_output_limit_cached;
|
||||||
|
}
|
||||||
|
|
||||||
RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal,
|
RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal,
|
||||||
Context* cntx)
|
Context* cntx)
|
||||||
: JournalStreamer(journal, cntx), db_slice_(slice), my_slots_(std::move(slots)) {
|
: JournalStreamer(journal, cntx), db_slice_(slice), my_slots_(std::move(slots)) {
|
||||||
DCHECK(slice != nullptr);
|
DCHECK(slice != nullptr);
|
||||||
}
|
}
|
||||||
|
|
||||||
void RestoreStreamer::Start(io::Sink* dest, bool send_lsn) {
|
void RestoreStreamer::Start(io::AsyncSink* dest, bool send_lsn) {
|
||||||
VLOG(1) << "RestoreStreamer start";
|
VLOG(1) << "RestoreStreamer start";
|
||||||
auto db_cb = absl::bind_front(&RestoreStreamer::OnDbChange, this);
|
auto db_cb = absl::bind_front(&RestoreStreamer::OnDbChange, this);
|
||||||
snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb));
|
snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb));
|
||||||
|
@ -78,7 +202,7 @@ void RestoreStreamer::Start(io::Sink* dest, bool send_lsn) {
|
||||||
PrimeTable* pt = &db_slice_->databases()[0]->prime;
|
PrimeTable* pt = &db_slice_->databases()[0]->prime;
|
||||||
|
|
||||||
do {
|
do {
|
||||||
if (fiber_cancellation_.IsCancelled())
|
if (fiber_cancelled_)
|
||||||
return;
|
return;
|
||||||
|
|
||||||
bool written = false;
|
bool written = false;
|
||||||
|
@ -90,11 +214,10 @@ void RestoreStreamer::Start(io::Sink* dest, bool send_lsn) {
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
if (written) {
|
if (written) {
|
||||||
NotifyWritten(true);
|
ThrottleIfNeeded();
|
||||||
}
|
}
|
||||||
++last_yield;
|
|
||||||
|
|
||||||
if (last_yield >= 100) {
|
if (++last_yield >= 100) {
|
||||||
ThisFiber::Yield();
|
ThisFiber::Yield();
|
||||||
last_yield = 0;
|
last_yield = 0;
|
||||||
}
|
}
|
||||||
|
@ -105,9 +228,14 @@ void RestoreStreamer::SendFinalize() {
|
||||||
VLOG(1) << "RestoreStreamer FIN opcode for : " << db_slice_->shard_id();
|
VLOG(1) << "RestoreStreamer FIN opcode for : " << db_slice_->shard_id();
|
||||||
journal::Entry entry(journal::Op::FIN, 0 /*db_id*/, 0 /*slot_id*/);
|
journal::Entry entry(journal::Op::FIN, 0 /*db_id*/, 0 /*slot_id*/);
|
||||||
|
|
||||||
JournalWriter writer{this};
|
io::StringSink sink;
|
||||||
|
JournalWriter writer{&sink};
|
||||||
writer.Write(entry);
|
writer.Write(entry);
|
||||||
NotifyWritten(true);
|
Write(sink.str());
|
||||||
|
|
||||||
|
// TODO: is the intent here to flush everything?
|
||||||
|
//
|
||||||
|
ThrottleIfNeeded();
|
||||||
}
|
}
|
||||||
|
|
||||||
RestoreStreamer::~RestoreStreamer() {
|
RestoreStreamer::~RestoreStreamer() {
|
||||||
|
@ -117,7 +245,7 @@ void RestoreStreamer::Cancel() {
|
||||||
auto sver = snapshot_version_;
|
auto sver = snapshot_version_;
|
||||||
snapshot_version_ = 0; // to prevent double cancel in another fiber
|
snapshot_version_ = 0; // to prevent double cancel in another fiber
|
||||||
if (sver != 0) {
|
if (sver != 0) {
|
||||||
fiber_cancellation_.Cancel();
|
fiber_cancelled_ = true;
|
||||||
db_slice_->UnregisterOnChange(sver);
|
db_slice_->UnregisterOnChange(sver);
|
||||||
JournalStreamer::Cancel();
|
JournalStreamer::Cancel();
|
||||||
}
|
}
|
||||||
|
@ -176,16 +304,12 @@ void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req
|
||||||
PrimeTable* table = db_slice_->GetTables(0).first;
|
PrimeTable* table = db_slice_->GetTables(0).first;
|
||||||
|
|
||||||
if (const PrimeTable::bucket_iterator* bit = req.update()) {
|
if (const PrimeTable::bucket_iterator* bit = req.update()) {
|
||||||
if (WriteBucket(*bit)) {
|
WriteBucket(*bit);
|
||||||
NotifyWritten(false);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
string_view key = get<string_view>(req.change);
|
string_view key = get<string_view>(req.change);
|
||||||
table->CVCUponInsert(snapshot_version_, key, [this](PrimeTable::bucket_iterator it) {
|
table->CVCUponInsert(snapshot_version_, key, [this](PrimeTable::bucket_iterator it) {
|
||||||
DCHECK_LT(it.GetVersion(), snapshot_version_);
|
DCHECK_LT(it.GetVersion(), snapshot_version_);
|
||||||
if (WriteBucket(it)) {
|
WriteBucket(it);
|
||||||
NotifyWritten(false);
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -214,8 +338,12 @@ void RestoreStreamer::WriteCommand(journal::Entry::Payload cmd_payload) {
|
||||||
0, // slot-id, but it is ignored at this level
|
0, // slot-id, but it is ignored at this level
|
||||||
cmd_payload);
|
cmd_payload);
|
||||||
|
|
||||||
JournalWriter writer{this};
|
// TODO: From WriteEntry to till Write we tripple copy the PrimeValue. It's ver in-efficient and
|
||||||
|
// will burn CPU for large values.
|
||||||
|
io::StringSink sink;
|
||||||
|
JournalWriter writer{&sink};
|
||||||
writer.Write(entry);
|
writer.Write(entry);
|
||||||
|
Write(sink.str());
|
||||||
}
|
}
|
||||||
|
|
||||||
} // namespace dfly
|
} // namespace dfly
|
||||||
|
|
|
@ -5,7 +5,6 @@
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
#include "server/db_slice.h"
|
#include "server/db_slice.h"
|
||||||
#include "server/io_utils.h"
|
|
||||||
#include "server/journal/journal.h"
|
#include "server/journal/journal.h"
|
||||||
#include "server/journal/serializer.h"
|
#include "server/journal/serializer.h"
|
||||||
#include "server/rdb_save.h"
|
#include "server/rdb_save.h"
|
||||||
|
@ -14,39 +13,58 @@ namespace dfly {
|
||||||
|
|
||||||
// Buffered single-shard journal streamer that listens for journal changes with a
|
// Buffered single-shard journal streamer that listens for journal changes with a
|
||||||
// journal listener and writes them to a destination sink in a separate fiber.
|
// journal listener and writes them to a destination sink in a separate fiber.
|
||||||
class JournalStreamer : protected BufferedStreamerBase {
|
class JournalStreamer {
|
||||||
public:
|
public:
|
||||||
JournalStreamer(journal::Journal* journal, Context* cntx)
|
JournalStreamer(journal::Journal* journal, Context* cntx);
|
||||||
: BufferedStreamerBase{cntx->GetCancellation()}, cntx_{cntx}, journal_{journal} {
|
virtual ~JournalStreamer();
|
||||||
}
|
|
||||||
|
|
||||||
// Self referential.
|
// Self referential.
|
||||||
JournalStreamer(const JournalStreamer& other) = delete;
|
JournalStreamer(const JournalStreamer& other) = delete;
|
||||||
JournalStreamer(JournalStreamer&& other) = delete;
|
JournalStreamer(JournalStreamer&& other) = delete;
|
||||||
|
|
||||||
// Register journal listener and start writer in fiber.
|
// Register journal listener and start writer in fiber.
|
||||||
virtual void Start(io::Sink* dest, bool send_lsn);
|
virtual void Start(io::AsyncSink* dest, bool send_lsn);
|
||||||
|
|
||||||
// Must be called on context cancellation for unblocking
|
// Must be called on context cancellation for unblocking
|
||||||
// and manual cleanup.
|
// and manual cleanup.
|
||||||
virtual void Cancel();
|
virtual void Cancel();
|
||||||
|
|
||||||
using BufferedStreamerBase::GetTotalBufferCapacities;
|
size_t GetTotalBufferCapacities() const;
|
||||||
|
|
||||||
|
protected:
|
||||||
|
// TODO: we copy the string on each write because JournalItem may be passed to multiple
|
||||||
|
// streamers so we can not move it. However, if we would either wrap JournalItem in shared_ptr
|
||||||
|
// or wrap JournalItem::data in shared_ptr, we can avoid the cost of copying strings.
|
||||||
|
// Also, for small strings it's more peformant to copy to the intermediate buffer than
|
||||||
|
// to issue an io operation.
|
||||||
|
void Write(std::string_view str);
|
||||||
|
|
||||||
|
// Blocks the if the consumer if not keeping up.
|
||||||
|
void ThrottleIfNeeded();
|
||||||
|
|
||||||
private:
|
|
||||||
// Writer fiber that steals buffer contents and writes them to dest.
|
|
||||||
void WriterFb(io::Sink* dest);
|
|
||||||
virtual bool ShouldWrite(const journal::JournalItem& item) const {
|
virtual bool ShouldWrite(const journal::JournalItem& item) const {
|
||||||
return true;
|
return !IsStopped();
|
||||||
}
|
}
|
||||||
|
|
||||||
Context* cntx_;
|
void WaitForInflightToComplete();
|
||||||
|
|
||||||
|
private:
|
||||||
|
void OnCompletion(std::error_code ec, size_t len);
|
||||||
|
|
||||||
|
bool IsStopped() const {
|
||||||
|
return cntx_->IsCancelled();
|
||||||
|
}
|
||||||
|
|
||||||
|
bool IsStalled() const;
|
||||||
|
|
||||||
uint32_t journal_cb_id_{0};
|
|
||||||
journal::Journal* journal_;
|
journal::Journal* journal_;
|
||||||
|
Context* cntx_;
|
||||||
|
io::AsyncSink* dest_ = nullptr;
|
||||||
|
std::vector<uint8_t> pending_buf_;
|
||||||
|
size_t in_flight_bytes_ = 0;
|
||||||
time_t last_lsn_time_ = 0;
|
time_t last_lsn_time_ = 0;
|
||||||
|
util::fb2::EventCount waker_;
|
||||||
util::fb2::Fiber write_fb_{};
|
uint32_t journal_cb_id_{0};
|
||||||
};
|
};
|
||||||
|
|
||||||
// Serializes existing DB as RESTORE commands, and sends updates as regular commands.
|
// Serializes existing DB as RESTORE commands, and sends updates as regular commands.
|
||||||
|
@ -56,7 +74,7 @@ class RestoreStreamer : public JournalStreamer {
|
||||||
RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal, Context* cntx);
|
RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal, Context* cntx);
|
||||||
~RestoreStreamer() override;
|
~RestoreStreamer() override;
|
||||||
|
|
||||||
void Start(io::Sink* dest, bool send_lsn = false) override;
|
void Start(io::AsyncSink* dest, bool send_lsn = false) override;
|
||||||
// Cancel() must be called if Start() is called
|
// Cancel() must be called if Start() is called
|
||||||
void Cancel() override;
|
void Cancel() override;
|
||||||
|
|
||||||
|
@ -80,7 +98,7 @@ class RestoreStreamer : public JournalStreamer {
|
||||||
DbSlice* db_slice_;
|
DbSlice* db_slice_;
|
||||||
uint64_t snapshot_version_ = 0;
|
uint64_t snapshot_version_ = 0;
|
||||||
cluster::SlotSet my_slots_;
|
cluster::SlotSet my_slots_;
|
||||||
Cancellation fiber_cancellation_;
|
bool fiber_cancelled_ = false;
|
||||||
bool snapshot_finished_ = false;
|
bool snapshot_finished_ = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue