diff --git a/.github/workflows/test-fakeredis.yml b/.github/workflows/test-fakeredis.yml index f5e9f1d96..0d02c9773 100644 --- a/.github/workflows/test-fakeredis.yml +++ b/.github/workflows/test-fakeredis.yml @@ -78,6 +78,7 @@ jobs: --ignore test/test_geo_commands.py \ --ignore test/test_bitmap_commands.py \ --ignore test/test_json/ \ + --ignore test/test_mixins/test_bitmap_commands.py \ --junit-xml=results-tests.xml --html=report-tests.html -v continue-on-error: true # For now to mark the flow as successful diff --git a/src/core/size_tracking_channel.h b/src/core/size_tracking_channel.h index df7f442fd..894a82a5c 100644 --- a/src/core/size_tracking_channel.h +++ b/src/core/size_tracking_channel.h @@ -22,9 +22,11 @@ template > class Si // Here and below, we must accept a T instead of building it from variadic args, as we need to // know its size in case it is added. - void Push(T t) noexcept { - size_.fetch_add(t.size(), std::memory_order_relaxed); + size_t Push(T t) noexcept { + size_t tsize = t.size(); + size_t res = size_.fetch_add(tsize, std::memory_order_relaxed); queue_.Push(std::move(t)); + return res + tsize; } bool TryPush(T t) noexcept { diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 1d25c496b..c9b917d70 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -1071,42 +1071,6 @@ error_code AlignedBuffer::Flush() { class RdbSaver::Impl { private: - // This is a helper struct to pop records from channel while enfocing returned records order. - struct RecordsPopper { - RecordsPopper(bool enforce_order, SliceSnapshot::RecordChannel* c) - : enforce_order(enforce_order), channel(c) { - } - - // Blocking function, pops from channel. - // If enforce_order is enabled return the records by order. - // returns nullopt if channel was closed. - std::optional Pop(); - - // Non blocking function, trys to pop from channel. - // If enforce_order is enabled return the records by order. - // returns nullopt if nothing in channel. - std::optional TryPop(); - - private: - std::optional InternalPop(bool blocking); - // Checks if next record is in queue, if so set record_holder and return true, otherwise - // return false. - bool TryPopFromQueue(); - - struct Compare { - bool operator()(const SliceSnapshot::DbRecord& a, const SliceSnapshot::DbRecord& b) { - return a.id > b.id; - } - }; - // min heap holds the DbRecord that poped from channel OOO - std::priority_queue, Compare> - q_records; - uint64_t next_record_id = 0; - bool enforce_order; - SliceSnapshot::RecordChannel* channel; - SliceSnapshot::DbRecord record_holder; - }; - void CleanShardSnapshots(); public: @@ -1161,7 +1125,6 @@ class RdbSaver::Impl { // used for serializing non-body components in the calling fiber. RdbSerializer meta_serializer_; SliceSnapshot::RecordChannel channel_; - bool push_to_sink_with_order_ = false; std::optional aligned_buf_; // Single entry compression is compatible with redis rdb snapshot @@ -1185,9 +1148,6 @@ RdbSaver::Impl::Impl(bool align_writes, unsigned producers_len, CompressionMode aligned_buf_.emplace(kBufLen, sink); sink_ = &aligned_buf_.value(); } - if (sm == SaveMode::SINGLE_SHARD || sm == SaveMode::SINGLE_SHARD_WITH_SUMMARY) { - push_to_sink_with_order_ = true; - } DCHECK(producers_len > 0 || channel_.IsClosing()); save_mode_ = sm; @@ -1223,56 +1183,15 @@ error_code RdbSaver::Impl::SaveAuxFieldStrStr(string_view key, string_view val) return error_code{}; } -bool RdbSaver::Impl::RecordsPopper::TryPopFromQueue() { - if (enforce_order && !q_records.empty() && q_records.top().id == next_record_id) { - record_holder = std::move(const_cast(q_records.top())); - q_records.pop(); - ++next_record_id; - return true; - } - return false; -} - -std::optional RdbSaver::Impl::RecordsPopper::Pop() { - return InternalPop(true); -} - -std::optional RdbSaver::Impl::RecordsPopper::TryPop() { - return InternalPop(false); -} - -std::optional RdbSaver::Impl::RecordsPopper::InternalPop(bool blocking) { - if (TryPopFromQueue()) { - return std::move(record_holder); - } - - auto pop_fn = - blocking ? &SliceSnapshot::RecordChannel::Pop : &SliceSnapshot::RecordChannel::TryPop; - - while ((channel->*pop_fn)(record_holder)) { - if (!enforce_order) { - return std::move(record_holder); - } - if (record_holder.id == next_record_id) { - ++next_record_id; - return std::move(record_holder); - } - // record popped from channel is ooo, push to queue - q_records.emplace(std::move(record_holder)); - } - return std::nullopt; -} - error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { error_code io_error; - std::optional record; + SliceSnapshot::DbRecord record; - RecordsPopper records_popper(push_to_sink_with_order_, &channel_); auto& stats = ServerState::tlocal()->stats; // we can not exit on io-error since we spawn fibers that push data. // TODO: we may signal them to stop processing and exit asap in case of the error. - while ((record = records_popper.Pop())) { + while (channel_.Pop(record)) { if (io_error || cll->IsCancelled()) continue; @@ -1280,9 +1199,9 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { if (cll->IsCancelled()) continue; - DVLOG(2) << "Pulled " << record->id; + DVLOG(2) << "Pulled " << record.id; last_write_time_ns_ = absl::GetCurrentTimeNanos(); - io_error = sink_->Write(io::Buffer(record->value)); + io_error = sink_->Write(io::Buffer(record.value)); stats.rdb_save_usec += (absl::GetCurrentTimeNanos() - last_write_time_ns_) / 1'000; stats.rdb_save_count++; @@ -1291,14 +1210,14 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) { VLOG(1) << "Error writing to sink " << io_error.message(); break; } - } while ((record = records_popper.TryPop())); - } // while (records_popper.Pop()) + } while ((channel_.TryPop(record))); + } // while (channel_.Pop()) for (auto& ptr : shard_snapshots_) { ptr->Join(); } - DCHECK(!record.has_value() || !channel_.TryPop(*record)); + DCHECK(!channel_.TryPop(record)); return io_error; } diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index e6674f41a..2365f642e 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -21,14 +21,14 @@ #include "server/tiered_storage.h" #include "util/fibers/synchronization.h" -using facade::operator""_MB; - namespace dfly { using namespace std; using namespace util; using namespace chrono_literals; +using facade::operator""_MB; +using facade::operator""_KB; namespace { thread_local absl::flat_hash_set tl_slice_snapshots; } // namespace @@ -78,7 +78,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot flush_fun = [this, flush_threshold](size_t bytes_serialized, RdbSerializer::FlushState flush_state) { if (bytes_serialized > flush_threshold) { - auto serialized = Serialize(flush_state); + size_t serialized = FlushChannelRecord(flush_state); VLOG(2) << "FlushedToChannel " << serialized << " bytes"; } }; @@ -325,7 +325,7 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr } } -size_t SliceSnapshot::Serialize(SerializerBase::FlushState flush_state) { +size_t SliceSnapshot::FlushChannelRecord(SerializerBase::FlushState flush_state) { io::StringFile sfile; serializer_->FlushToSink(&sfile, flush_state); @@ -333,34 +333,51 @@ size_t SliceSnapshot::Serialize(SerializerBase::FlushState flush_state) { if (serialized == 0) return 0; - auto id = rec_id_++; - DVLOG(2) << "Pushed " << id; + uint64_t id = rec_id_++; + DVLOG(2) << "Pushing " << id; DbRecord db_rec{.id = id, .value = std::move(sfile.val)}; + fb2::NoOpLock lk; + + // We create a critical section here that ensures that records are pushed in sequential order. + // As a result, it is not possible for two fiber producers to push into channel concurrently. + // If A.id = 5, and then B.id = 6, and both are blocked here, it means that last_pushed_id_ < 4. + // Once last_pushed_id_ = 4, A will be unblocked, while B will wait until A finishes pushing and + // update last_pushed_id_ to 5. + seq_cond_.wait(lk, [&] { return id == this->last_pushed_id_ + 1; }); + + // Blocking point. + size_t channel_usage = dest_->Push(std::move(db_rec)); + DCHECK_EQ(last_pushed_id_ + 1, id); + last_pushed_id_ = id; + seq_cond_.notify_all(); + + VLOG(2) << "Pushed with Serialize() " << serialized + << " bytes, channel total usage: " << channel_usage; - dest_->Push(std::move(db_rec)); - if (serialized != 0) { - VLOG(2) << "Pushed with Serialize() " << serialized << " bytes"; - } return serialized; } bool SliceSnapshot::PushSerializedToChannel(bool force) { - if (!force && serializer_->SerializedLen() < 4096) + if (!force && serializer_->SerializedLen() < 4_KB) return false; // Flush any of the leftovers to avoid interleavings - size_t serialized = Serialize(); + size_t serialized = FlushChannelRecord(FlushState::kFlushMidEntry); - // Bucket serialization might have accumulated some delayed values. - // Because we can finally block in this function, we'll await and serialize them - while (!delayed_entries_.empty()) { - auto& entry = delayed_entries_.back(); - serializer_->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid, entry.mc_flags); - delayed_entries_.pop_back(); + if (!delayed_entries_.empty()) { + // Async bucket serialization might have accumulated some delayed values. + // Because we can finally block in this function, we'll await and serialize them + do { + auto& entry = delayed_entries_.back(); + serializer_->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid, + entry.mc_flags); + delayed_entries_.pop_back(); + } while (!delayed_entries_.empty()); + + // blocking point. + serialized += FlushChannelRecord(FlushState::kFlushMidEntry); } - - size_t total_serialized = Serialize() + serialized; - return total_serialized > 0; + return serialized > 0; } void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { diff --git a/src/server/snapshot.h b/src/server/snapshot.h index 79e924314..706d64614 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -131,7 +131,7 @@ class SliceSnapshot { // Helper function that flushes the serialized items into the RecordStream. // Can block on the channel. using FlushState = SerializerBase::FlushState; - size_t Serialize(FlushState flush_state = FlushState::kFlushMidEntry); + size_t FlushChannelRecord(FlushState flush_state); public: uint64_t snapshot_version() const { @@ -173,14 +173,15 @@ class SliceSnapshot { // Used for sanity checks. bool serialize_bucket_running_ = false; util::fb2::Fiber snapshot_fb_; // IterateEntriesFb - + util::fb2::CondVarAny seq_cond_; CompressionMode compression_mode_; RdbTypeFreqMap type_freq_map_; // version upper bound for entries that should be saved (not included). uint64_t snapshot_version_ = 0; uint32_t journal_cb_id_ = 0; - uint64_t rec_id_ = 0; + + uint64_t rec_id_ = 1, last_pushed_id_ = 0; struct Stats { size_t loop_serialized = 0; diff --git a/tests/fakeredis/test/test_mixins/test_geo_commands.py b/tests/fakeredis/test/test_mixins/test_geo_commands.py index 5e92e8811..b93743ed0 100644 --- a/tests/fakeredis/test/test_mixins/test_geo_commands.py +++ b/tests/fakeredis/test/test_mixins/test_geo_commands.py @@ -364,6 +364,7 @@ def test_georadius_errors(r: redis.Redis): testtools.raw_command(r, "geoadd", "newgroup", *bad_values) +@pytest.mark.unsupported_server_types("dragonfly") def test_geosearch(r: redis.Redis): values = ( 2.1909389952632,