From b7b96424e43ac6e6f18e68cc1f22358566788fbb Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Mon, 9 Sep 2024 09:19:04 +0300 Subject: [PATCH] deprecate RecordsPopper and serialize channel records during push (#3667) chore: deprecate RecordsPopper and serialize channel records during push Records channel is redundant for DFS/replication because we have single producer/consumer scenario and both running on the same thread. Unfortunately we need it for RDB snapshotting. For non-rdb cases we could just pass a io sink to the snapshot producer, so that it would use it directly instead of StringFile inside FlushChannelRecord. This would reduce memory usage, eliminate yet another memory copy and generally would make everything simpler. For that to work, we must serialize the order of FlushChannelRecord, and that's implemented by this PR. Also fixes #3658. Signed-off-by: Roman Gershman --- .github/workflows/test-fakeredis.yml | 1 + src/core/size_tracking_channel.h | 6 +- src/server/rdb_save.cc | 95 ++----------------- src/server/snapshot.cc | 59 ++++++++---- src/server/snapshot.h | 7 +- .../test/test_mixins/test_geo_commands.py | 1 + 6 files changed, 55 insertions(+), 114 deletions(-) diff --git a/.github/workflows/test-fakeredis.yml b/.github/workflows/test-fakeredis.yml index f5e9f1d96..0d02c9773 100644 --- a/.github/workflows/test-fakeredis.yml +++ b/.github/workflows/test-fakeredis.yml @@ -78,6 +78,7 @@ jobs: --ignore test/test_geo_commands.py \ --ignore test/test_bitmap_commands.py \ --ignore test/test_json/ \ + --ignore test/test_mixins/test_bitmap_commands.py \ --junit-xml=results-tests.xml --html=report-tests.html -v continue-on-error: true # For now to mark the flow as successful diff --git a/src/core/size_tracking_channel.h b/src/core/size_tracking_channel.h index df7f442fd..894a82a5c 100644 --- a/src/core/size_tracking_channel.h +++ b/src/core/size_tracking_channel.h @@ -22,9 +22,11 @@ template > class Si // Here and below, we must accept a T instead of building it from variadic args, as we need to // know its size in case it is added. - void Push(T t) noexcept { - size_.fetch_add(t.size(), std::memory_order_relaxed); + size_t Push(T t) noexcept { + size_t tsize = t.size(); + size_t res = size_.fetch_add(tsize, std::memory_order_relaxed); queue_.Push(std::move(t)); + return res + tsize; } bool TryPush(T t) noexcept { diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 1d25c496b..c9b917d70 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -1071,42 +1071,6 @@ error_code AlignedBuffer::Flush() { class RdbSaver::Impl { private: - // This is a helper struct to pop records from channel while enfocing returned records order. - struct RecordsPopper { - RecordsPopper(bool enforce_order, SliceSnapshot::RecordChannel* c) - : enforce_order(enforce_order), channel(c) { - } - - // Blocking function, pops from channel. - // If enforce_order is enabled return the records by order. - // returns nullopt if channel was closed. - std::optional Pop(); - - // Non blocking function, trys to pop from channel. - // If enforce_order is enabled return the records by order. - // returns nullopt if nothing in channel. - std::optional TryPop(); - - private: - std::optional InternalPop(bool blocking); - // Checks if next record is in queue, if so set record_holder and return true, otherwise - // return false. - bool TryPopFromQueue(); - - struct Compare { - bool operator()(const SliceSnapshot::DbRecord& a, const SliceSnapshot::DbRecord& b) { - return a.id > b.id; - } - }; - // min heap holds the DbRecord that poped from channel OOO - std::priority_queue, Compare> - q_records; - uint64_t next_record_id = 0; - bool enforce_order; - SliceSnapshot::RecordChannel* channel; - SliceSnapshot::DbRecord record_holder; - }; - void CleanShardSnapshots(); public: @@ -1161,7 +1125,6 @@ class RdbSaver::Impl { // used for serializing non-body components in the calling fiber. RdbSerializer meta_serializer_; SliceSnapshot::RecordChannel channel_; - bool push_to_sink_with_order_ = false; std::optional aligned_buf_; // Single entry compression is compatible with redis rdb snapshot @@ -1185,9 +1148,6 @@ RdbSaver::Impl::Impl(bool align_writes, unsigned producers_len, CompressionMode aligned_buf_.emplace(kBufLen, sink); sink_ = &aligned_buf_.value(); } - if (sm == SaveMode::SINGLE_SHARD || sm == SaveMode::SINGLE_SHARD_WITH_SUMMARY) { - push_to_sink_with_order_ = true; - } DCHECK(producers_len > 0 || channel_.IsClosing()); save_mode_ = sm; @@ -1223,56 +1183,15 @@ error_code RdbSaver::Impl::SaveAuxFieldStrStr(string_view key, string_view val) return error_code{}; } -bool RdbSaver::Impl::RecordsPopper::TryPopFromQueue() { - if (enforce_order && !q_records.empty() && q_records.top().id == next_record_id) { - record_holder = std::move(const_cast(q_records.top())); - q_records.pop(); - ++next_record_id; - return true; - } - return false; -} - -std::optional RdbSaver::Impl::RecordsPopper::Pop() { - return InternalPop(true); -} - -std::optional RdbSaver::Impl::RecordsPopper::TryPop() { - return InternalPop(false); -} - -std::optional RdbSaver::Impl::RecordsPopper::InternalPop(bool blocking) { - if (TryPopFromQueue()) { - return std::move(record_holder); - } - - auto pop_fn = - blocking ? &SliceSnapshot::RecordChannel::Pop : &SliceSnapshot::RecordChannel::TryPop; - - while ((channel->*pop_fn)(record_holder)) { - if (!enforce_order) { - return std::move(record_holder); - } - if (record_holder.id == next_record_id) { - ++next_record_id; - return std::move(record_holder); - } - // record popped from channel is ooo, push to queue - q_records.emplace(std::move(record_holder)); - } - return std::nullopt; -} - error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { error_code io_error; - std::optional record; + SliceSnapshot::DbRecord record; - RecordsPopper records_popper(push_to_sink_with_order_, &channel_); auto& stats = ServerState::tlocal()->stats; // we can not exit on io-error since we spawn fibers that push data. // TODO: we may signal them to stop processing and exit asap in case of the error. - while ((record = records_popper.Pop())) { + while (channel_.Pop(record)) { if (io_error || cll->IsCancelled()) continue; @@ -1280,9 +1199,9 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { if (cll->IsCancelled()) continue; - DVLOG(2) << "Pulled " << record->id; + DVLOG(2) << "Pulled " << record.id; last_write_time_ns_ = absl::GetCurrentTimeNanos(); - io_error = sink_->Write(io::Buffer(record->value)); + io_error = sink_->Write(io::Buffer(record.value)); stats.rdb_save_usec += (absl::GetCurrentTimeNanos() - last_write_time_ns_) / 1'000; stats.rdb_save_count++; @@ -1291,14 +1210,14 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { VLOG(1) << "Error writing to sink " << io_error.message(); break; } - } while ((record = records_popper.TryPop())); - } // while (records_popper.Pop()) + } while ((channel_.TryPop(record))); + } // while (channel_.Pop()) for (auto& ptr : shard_snapshots_) { ptr->Join(); } - DCHECK(!record.has_value() || !channel_.TryPop(*record)); + DCHECK(!channel_.TryPop(record)); return io_error; } diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index e6674f41a..2365f642e 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -21,14 +21,14 @@ #include "server/tiered_storage.h" #include "util/fibers/synchronization.h" -using facade::operator""_MB; - namespace dfly { using namespace std; using namespace util; using namespace chrono_literals; +using facade::operator""_MB; +using facade::operator""_KB; namespace { thread_local absl::flat_hash_set tl_slice_snapshots; } // namespace @@ -78,7 +78,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot flush_fun = [this, flush_threshold](size_t bytes_serialized, RdbSerializer::FlushState flush_state) { if (bytes_serialized > flush_threshold) { - auto serialized = Serialize(flush_state); + size_t serialized = FlushChannelRecord(flush_state); VLOG(2) << "FlushedToChannel " << serialized << " bytes"; } }; @@ -325,7 +325,7 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr } } -size_t SliceSnapshot::Serialize(SerializerBase::FlushState flush_state) { +size_t SliceSnapshot::FlushChannelRecord(SerializerBase::FlushState flush_state) { io::StringFile sfile; serializer_->FlushToSink(&sfile, flush_state); @@ -333,34 +333,51 @@ size_t SliceSnapshot::Serialize(SerializerBase::FlushState flush_state) { if (serialized == 0) return 0; - auto id = rec_id_++; - DVLOG(2) << "Pushed " << id; + uint64_t id = rec_id_++; + DVLOG(2) << "Pushing " << id; DbRecord db_rec{.id = id, .value = std::move(sfile.val)}; + fb2::NoOpLock lk; + + // We create a critical section here that ensures that records are pushed in sequential order. + // As a result, it is not possible for two fiber producers to push into channel concurrently. + // If A.id = 5, and then B.id = 6, and both are blocked here, it means that last_pushed_id_ < 4. + // Once last_pushed_id_ = 4, A will be unblocked, while B will wait until A finishes pushing and + // update last_pushed_id_ to 5. + seq_cond_.wait(lk, [&] { return id == this->last_pushed_id_ + 1; }); + + // Blocking point. + size_t channel_usage = dest_->Push(std::move(db_rec)); + DCHECK_EQ(last_pushed_id_ + 1, id); + last_pushed_id_ = id; + seq_cond_.notify_all(); + + VLOG(2) << "Pushed with Serialize() " << serialized + << " bytes, channel total usage: " << channel_usage; - dest_->Push(std::move(db_rec)); - if (serialized != 0) { - VLOG(2) << "Pushed with Serialize() " << serialized << " bytes"; - } return serialized; } bool SliceSnapshot::PushSerializedToChannel(bool force) { - if (!force && serializer_->SerializedLen() < 4096) + if (!force && serializer_->SerializedLen() < 4_KB) return false; // Flush any of the leftovers to avoid interleavings - size_t serialized = Serialize(); + size_t serialized = FlushChannelRecord(FlushState::kFlushMidEntry); - // Bucket serialization might have accumulated some delayed values. - // Because we can finally block in this function, we'll await and serialize them - while (!delayed_entries_.empty()) { - auto& entry = delayed_entries_.back(); - serializer_->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid, entry.mc_flags); - delayed_entries_.pop_back(); + if (!delayed_entries_.empty()) { + // Async bucket serialization might have accumulated some delayed values. + // Because we can finally block in this function, we'll await and serialize them + do { + auto& entry = delayed_entries_.back(); + serializer_->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid, + entry.mc_flags); + delayed_entries_.pop_back(); + } while (!delayed_entries_.empty()); + + // blocking point. + serialized += FlushChannelRecord(FlushState::kFlushMidEntry); } - - size_t total_serialized = Serialize() + serialized; - return total_serialized > 0; + return serialized > 0; } void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { diff --git a/src/server/snapshot.h b/src/server/snapshot.h index 79e924314..706d64614 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -131,7 +131,7 @@ class SliceSnapshot { // Helper function that flushes the serialized items into the RecordStream. // Can block on the channel. using FlushState = SerializerBase::FlushState; - size_t Serialize(FlushState flush_state = FlushState::kFlushMidEntry); + size_t FlushChannelRecord(FlushState flush_state); public: uint64_t snapshot_version() const { @@ -173,14 +173,15 @@ class SliceSnapshot { // Used for sanity checks. bool serialize_bucket_running_ = false; util::fb2::Fiber snapshot_fb_; // IterateEntriesFb - + util::fb2::CondVarAny seq_cond_; CompressionMode compression_mode_; RdbTypeFreqMap type_freq_map_; // version upper bound for entries that should be saved (not included). uint64_t snapshot_version_ = 0; uint32_t journal_cb_id_ = 0; - uint64_t rec_id_ = 0; + + uint64_t rec_id_ = 1, last_pushed_id_ = 0; struct Stats { size_t loop_serialized = 0; diff --git a/tests/fakeredis/test/test_mixins/test_geo_commands.py b/tests/fakeredis/test/test_mixins/test_geo_commands.py index 5e92e8811..b93743ed0 100644 --- a/tests/fakeredis/test/test_mixins/test_geo_commands.py +++ b/tests/fakeredis/test/test_mixins/test_geo_commands.py @@ -364,6 +364,7 @@ def test_georadius_errors(r: redis.Redis): testtools.raw_command(r, "geoadd", "newgroup", *bad_values) +@pytest.mark.unsupported_server_types("dragonfly") def test_geosearch(r: redis.Redis): values = ( 2.1909389952632,