diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 824a37ba1..0553aa01e 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -29,7 +29,6 @@ iovec IoVec(io::Bytes src) { return iovec{const_cast(src.data()), src.size()}; } -constexpr size_t kFlushThreshold = 2_KB; uint32_t replication_stream_output_limit_cached = 64_KB; } // namespace @@ -90,44 +89,42 @@ void JournalStreamer::Write(std::string_view str) { DCHECK(!str.empty()); DVLOG(2) << "Writing " << str.size() << " bytes"; - // If we do not have any in flight requests we send the string right a way. - // We can not aggregate it since we do not know when the next update will follow. size_t total_pending = pending_buf_.size() + str.size(); - if (in_flight_bytes_ == 0 || total_pending > kFlushThreshold) { - // because of potential SOO with strings we allocate explicitly on heap - uint8_t* buf(new uint8_t[str.size()]); - - // TODO: it is possible to remove these redundant copies if we adjust high level - // interfaces to pass reference-counted buffers. - memcpy(buf, str.data(), str.size()); - in_flight_bytes_ += total_pending; - - iovec v[2]; - unsigned next_buf_id = 0; - - if (!pending_buf_.empty()) { - v[0] = IoVec(pending_buf_); - ++next_buf_id; - } - v[next_buf_id++] = IoVec(io::Bytes(buf, str.size())); - - dest_->AsyncWrite( - v, next_buf_id, - [buf0 = std::move(pending_buf_), buf, this, len = total_pending](std::error_code ec) { - delete[] buf; - OnCompletion(ec, len); - }); - + if (in_flight_bytes_ > 0) { + // We can not flush data while there are in flight requests because AsyncWrite + // is not atomic. Therefore, we just aggregate. + size_t tail = pending_buf_.size(); + pending_buf_.resize(pending_buf_.size() + str.size()); + memcpy(pending_buf_.data() + tail, str.data(), str.size()); return; } - DCHECK_GT(in_flight_bytes_, 0u); - DCHECK_LE(pending_buf_.size() + str.size(), kFlushThreshold); + // If we do not have any in flight requests we send the string right a way. + // We can not aggregate it since we do not know when the next update will follow. + // because of potential SOO with strings, we allocate explicitly on heap. + uint8_t* buf(new uint8_t[str.size()]); - // Aggregate - size_t tail = pending_buf_.size(); - pending_buf_.resize(pending_buf_.size() + str.size()); - memcpy(pending_buf_.data() + tail, str.data(), str.size()); + // TODO: it is possible to remove these redundant copies if we adjust high level + // interfaces to pass reference-counted buffers. + memcpy(buf, str.data(), str.size()); + in_flight_bytes_ += total_pending; + total_sent_ += total_pending; + + iovec v[2]; + unsigned next_buf_id = 0; + + if (!pending_buf_.empty()) { + v[0] = IoVec(pending_buf_); + ++next_buf_id; + } + v[next_buf_id++] = IoVec(io::Bytes(buf, str.size())); + + dest_->AsyncWrite( + v, next_buf_id, + [buf0 = std::move(pending_buf_), buf, this, len = total_pending](std::error_code ec) { + delete[] buf; + OnCompletion(ec, len); + }); } void JournalStreamer::OnCompletion(std::error_code ec, size_t len) { @@ -160,13 +157,14 @@ void JournalStreamer::ThrottleIfNeeded() { auto next = chrono::steady_clock::now() + chrono::milliseconds(absl::GetFlag(FLAGS_replication_stream_timeout)); - auto inflight_start = in_flight_bytes_; + size_t inflight_start = in_flight_bytes_; + size_t sent_start = total_sent_; std::cv_status status = waker_.await_until([this]() { return !IsStalled() || IsStopped(); }, next); if (status == std::cv_status::timeout) { - LOG(WARNING) << "Stream timed out, inflight bytes start: " << inflight_start - << ", end: " << in_flight_bytes_; + LOG(WARNING) << "Stream timed out, inflight bytes/sent start: " << inflight_start << "/" + << sent_start << ", end: " << in_flight_bytes_ << "/" << total_sent_; cntx_->ReportError(make_error_code(errc::stream_timeout)); } } @@ -182,7 +180,7 @@ void JournalStreamer::WaitForInflightToComplete() { } bool JournalStreamer::IsStalled() const { - return in_flight_bytes_ >= replication_stream_output_limit_cached; + return in_flight_bytes_ + pending_buf_.size() >= replication_stream_output_limit_cached; } RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal, diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index 2837fae83..f9af83de6 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -63,7 +63,8 @@ class JournalStreamer { journal::Journal* journal_; std::vector pending_buf_; - size_t in_flight_bytes_ = 0; + size_t in_flight_bytes_ = 0, total_sent_ = 0; + time_t last_lsn_time_ = 0; util::fb2::EventCount waker_; uint32_t journal_cb_id_{0};