mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
deprecate RecordsPopper and serialize channel records during push (#3667)
chore: deprecate RecordsPopper and serialize channel records during push Records channel is redundant for DFS/replication because we have single producer/consumer scenario and both running on the same thread. Unfortunately we need it for RDB snapshotting. For non-rdb cases we could just pass a io sink to the snapshot producer, so that it would use it directly instead of StringFile inside FlushChannelRecord. This would reduce memory usage, eliminate yet another memory copy and generally would make everything simpler. For that to work, we must serialize the order of FlushChannelRecord, and that's implemented by this PR. Also fixes #3658. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
1306a91bda
commit
b7b96424e4
6 changed files with 55 additions and 114 deletions
1
.github/workflows/test-fakeredis.yml
vendored
1
.github/workflows/test-fakeredis.yml
vendored
|
@ -78,6 +78,7 @@ jobs:
|
|||
--ignore test/test_geo_commands.py \
|
||||
--ignore test/test_bitmap_commands.py \
|
||||
--ignore test/test_json/ \
|
||||
--ignore test/test_mixins/test_bitmap_commands.py \
|
||||
--junit-xml=results-tests.xml --html=report-tests.html -v
|
||||
continue-on-error: true # For now to mark the flow as successful
|
||||
|
||||
|
|
|
@ -22,9 +22,11 @@ template <typename T, typename Queue = folly::ProducerConsumerQueue<T>> class Si
|
|||
|
||||
// Here and below, we must accept a T instead of building it from variadic args, as we need to
|
||||
// know its size in case it is added.
|
||||
void Push(T t) noexcept {
|
||||
size_.fetch_add(t.size(), std::memory_order_relaxed);
|
||||
size_t Push(T t) noexcept {
|
||||
size_t tsize = t.size();
|
||||
size_t res = size_.fetch_add(tsize, std::memory_order_relaxed);
|
||||
queue_.Push(std::move(t));
|
||||
return res + tsize;
|
||||
}
|
||||
|
||||
bool TryPush(T t) noexcept {
|
||||
|
|
|
@ -1071,42 +1071,6 @@ error_code AlignedBuffer::Flush() {
|
|||
|
||||
class RdbSaver::Impl {
|
||||
private:
|
||||
// This is a helper struct to pop records from channel while enfocing returned records order.
|
||||
struct RecordsPopper {
|
||||
RecordsPopper(bool enforce_order, SliceSnapshot::RecordChannel* c)
|
||||
: enforce_order(enforce_order), channel(c) {
|
||||
}
|
||||
|
||||
// Blocking function, pops from channel.
|
||||
// If enforce_order is enabled return the records by order.
|
||||
// returns nullopt if channel was closed.
|
||||
std::optional<SliceSnapshot::DbRecord> Pop();
|
||||
|
||||
// Non blocking function, trys to pop from channel.
|
||||
// If enforce_order is enabled return the records by order.
|
||||
// returns nullopt if nothing in channel.
|
||||
std::optional<SliceSnapshot::DbRecord> TryPop();
|
||||
|
||||
private:
|
||||
std::optional<SliceSnapshot::DbRecord> InternalPop(bool blocking);
|
||||
// Checks if next record is in queue, if so set record_holder and return true, otherwise
|
||||
// return false.
|
||||
bool TryPopFromQueue();
|
||||
|
||||
struct Compare {
|
||||
bool operator()(const SliceSnapshot::DbRecord& a, const SliceSnapshot::DbRecord& b) {
|
||||
return a.id > b.id;
|
||||
}
|
||||
};
|
||||
// min heap holds the DbRecord that poped from channel OOO
|
||||
std::priority_queue<SliceSnapshot::DbRecord, std::vector<SliceSnapshot::DbRecord>, Compare>
|
||||
q_records;
|
||||
uint64_t next_record_id = 0;
|
||||
bool enforce_order;
|
||||
SliceSnapshot::RecordChannel* channel;
|
||||
SliceSnapshot::DbRecord record_holder;
|
||||
};
|
||||
|
||||
void CleanShardSnapshots();
|
||||
|
||||
public:
|
||||
|
@ -1161,7 +1125,6 @@ class RdbSaver::Impl {
|
|||
// used for serializing non-body components in the calling fiber.
|
||||
RdbSerializer meta_serializer_;
|
||||
SliceSnapshot::RecordChannel channel_;
|
||||
bool push_to_sink_with_order_ = false;
|
||||
std::optional<AlignedBuffer> aligned_buf_;
|
||||
|
||||
// Single entry compression is compatible with redis rdb snapshot
|
||||
|
@ -1185,9 +1148,6 @@ RdbSaver::Impl::Impl(bool align_writes, unsigned producers_len, CompressionMode
|
|||
aligned_buf_.emplace(kBufLen, sink);
|
||||
sink_ = &aligned_buf_.value();
|
||||
}
|
||||
if (sm == SaveMode::SINGLE_SHARD || sm == SaveMode::SINGLE_SHARD_WITH_SUMMARY) {
|
||||
push_to_sink_with_order_ = true;
|
||||
}
|
||||
|
||||
DCHECK(producers_len > 0 || channel_.IsClosing());
|
||||
save_mode_ = sm;
|
||||
|
@ -1223,56 +1183,15 @@ error_code RdbSaver::Impl::SaveAuxFieldStrStr(string_view key, string_view val)
|
|||
return error_code{};
|
||||
}
|
||||
|
||||
bool RdbSaver::Impl::RecordsPopper::TryPopFromQueue() {
|
||||
if (enforce_order && !q_records.empty() && q_records.top().id == next_record_id) {
|
||||
record_holder = std::move(const_cast<SliceSnapshot::DbRecord&>(q_records.top()));
|
||||
q_records.pop();
|
||||
++next_record_id;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
std::optional<SliceSnapshot::DbRecord> RdbSaver::Impl::RecordsPopper::Pop() {
|
||||
return InternalPop(true);
|
||||
}
|
||||
|
||||
std::optional<SliceSnapshot::DbRecord> RdbSaver::Impl::RecordsPopper::TryPop() {
|
||||
return InternalPop(false);
|
||||
}
|
||||
|
||||
std::optional<SliceSnapshot::DbRecord> RdbSaver::Impl::RecordsPopper::InternalPop(bool blocking) {
|
||||
if (TryPopFromQueue()) {
|
||||
return std::move(record_holder);
|
||||
}
|
||||
|
||||
auto pop_fn =
|
||||
blocking ? &SliceSnapshot::RecordChannel::Pop : &SliceSnapshot::RecordChannel::TryPop;
|
||||
|
||||
while ((channel->*pop_fn)(record_holder)) {
|
||||
if (!enforce_order) {
|
||||
return std::move(record_holder);
|
||||
}
|
||||
if (record_holder.id == next_record_id) {
|
||||
++next_record_id;
|
||||
return std::move(record_holder);
|
||||
}
|
||||
// record popped from channel is ooo, push to queue
|
||||
q_records.emplace(std::move(record_holder));
|
||||
}
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
|
||||
error_code io_error;
|
||||
std::optional<SliceSnapshot::DbRecord> record;
|
||||
SliceSnapshot::DbRecord record;
|
||||
|
||||
RecordsPopper records_popper(push_to_sink_with_order_, &channel_);
|
||||
auto& stats = ServerState::tlocal()->stats;
|
||||
|
||||
// we can not exit on io-error since we spawn fibers that push data.
|
||||
// TODO: we may signal them to stop processing and exit asap in case of the error.
|
||||
while ((record = records_popper.Pop())) {
|
||||
while (channel_.Pop(record)) {
|
||||
if (io_error || cll->IsCancelled())
|
||||
continue;
|
||||
|
||||
|
@ -1280,9 +1199,9 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
|
|||
if (cll->IsCancelled())
|
||||
continue;
|
||||
|
||||
DVLOG(2) << "Pulled " << record->id;
|
||||
DVLOG(2) << "Pulled " << record.id;
|
||||
last_write_time_ns_ = absl::GetCurrentTimeNanos();
|
||||
io_error = sink_->Write(io::Buffer(record->value));
|
||||
io_error = sink_->Write(io::Buffer(record.value));
|
||||
|
||||
stats.rdb_save_usec += (absl::GetCurrentTimeNanos() - last_write_time_ns_) / 1'000;
|
||||
stats.rdb_save_count++;
|
||||
|
@ -1291,14 +1210,14 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
|
|||
VLOG(1) << "Error writing to sink " << io_error.message();
|
||||
break;
|
||||
}
|
||||
} while ((record = records_popper.TryPop()));
|
||||
} // while (records_popper.Pop())
|
||||
} while ((channel_.TryPop(record)));
|
||||
} // while (channel_.Pop())
|
||||
|
||||
for (auto& ptr : shard_snapshots_) {
|
||||
ptr->Join();
|
||||
}
|
||||
|
||||
DCHECK(!record.has_value() || !channel_.TryPop(*record));
|
||||
DCHECK(!channel_.TryPop(record));
|
||||
|
||||
return io_error;
|
||||
}
|
||||
|
|
|
@ -21,14 +21,14 @@
|
|||
#include "server/tiered_storage.h"
|
||||
#include "util/fibers/synchronization.h"
|
||||
|
||||
using facade::operator""_MB;
|
||||
|
||||
namespace dfly {
|
||||
|
||||
using namespace std;
|
||||
using namespace util;
|
||||
using namespace chrono_literals;
|
||||
|
||||
using facade::operator""_MB;
|
||||
using facade::operator""_KB;
|
||||
namespace {
|
||||
thread_local absl::flat_hash_set<SliceSnapshot*> tl_slice_snapshots;
|
||||
} // namespace
|
||||
|
@ -78,7 +78,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll, Snapshot
|
|||
flush_fun = [this, flush_threshold](size_t bytes_serialized,
|
||||
RdbSerializer::FlushState flush_state) {
|
||||
if (bytes_serialized > flush_threshold) {
|
||||
auto serialized = Serialize(flush_state);
|
||||
size_t serialized = FlushChannelRecord(flush_state);
|
||||
VLOG(2) << "FlushedToChannel " << serialized << " bytes";
|
||||
}
|
||||
};
|
||||
|
@ -325,7 +325,7 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
|
|||
}
|
||||
}
|
||||
|
||||
size_t SliceSnapshot::Serialize(SerializerBase::FlushState flush_state) {
|
||||
size_t SliceSnapshot::FlushChannelRecord(SerializerBase::FlushState flush_state) {
|
||||
io::StringFile sfile;
|
||||
serializer_->FlushToSink(&sfile, flush_state);
|
||||
|
||||
|
@ -333,34 +333,51 @@ size_t SliceSnapshot::Serialize(SerializerBase::FlushState flush_state) {
|
|||
if (serialized == 0)
|
||||
return 0;
|
||||
|
||||
auto id = rec_id_++;
|
||||
DVLOG(2) << "Pushed " << id;
|
||||
uint64_t id = rec_id_++;
|
||||
DVLOG(2) << "Pushing " << id;
|
||||
DbRecord db_rec{.id = id, .value = std::move(sfile.val)};
|
||||
fb2::NoOpLock lk;
|
||||
|
||||
// We create a critical section here that ensures that records are pushed in sequential order.
|
||||
// As a result, it is not possible for two fiber producers to push into channel concurrently.
|
||||
// If A.id = 5, and then B.id = 6, and both are blocked here, it means that last_pushed_id_ < 4.
|
||||
// Once last_pushed_id_ = 4, A will be unblocked, while B will wait until A finishes pushing and
|
||||
// update last_pushed_id_ to 5.
|
||||
seq_cond_.wait(lk, [&] { return id == this->last_pushed_id_ + 1; });
|
||||
|
||||
// Blocking point.
|
||||
size_t channel_usage = dest_->Push(std::move(db_rec));
|
||||
DCHECK_EQ(last_pushed_id_ + 1, id);
|
||||
last_pushed_id_ = id;
|
||||
seq_cond_.notify_all();
|
||||
|
||||
VLOG(2) << "Pushed with Serialize() " << serialized
|
||||
<< " bytes, channel total usage: " << channel_usage;
|
||||
|
||||
dest_->Push(std::move(db_rec));
|
||||
if (serialized != 0) {
|
||||
VLOG(2) << "Pushed with Serialize() " << serialized << " bytes";
|
||||
}
|
||||
return serialized;
|
||||
}
|
||||
|
||||
bool SliceSnapshot::PushSerializedToChannel(bool force) {
|
||||
if (!force && serializer_->SerializedLen() < 4096)
|
||||
if (!force && serializer_->SerializedLen() < 4_KB)
|
||||
return false;
|
||||
|
||||
// Flush any of the leftovers to avoid interleavings
|
||||
size_t serialized = Serialize();
|
||||
size_t serialized = FlushChannelRecord(FlushState::kFlushMidEntry);
|
||||
|
||||
// Bucket serialization might have accumulated some delayed values.
|
||||
// Because we can finally block in this function, we'll await and serialize them
|
||||
while (!delayed_entries_.empty()) {
|
||||
auto& entry = delayed_entries_.back();
|
||||
serializer_->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid, entry.mc_flags);
|
||||
delayed_entries_.pop_back();
|
||||
if (!delayed_entries_.empty()) {
|
||||
// Async bucket serialization might have accumulated some delayed values.
|
||||
// Because we can finally block in this function, we'll await and serialize them
|
||||
do {
|
||||
auto& entry = delayed_entries_.back();
|
||||
serializer_->SaveEntry(entry.key, entry.value.Get(), entry.expire, entry.dbid,
|
||||
entry.mc_flags);
|
||||
delayed_entries_.pop_back();
|
||||
} while (!delayed_entries_.empty());
|
||||
|
||||
// blocking point.
|
||||
serialized += FlushChannelRecord(FlushState::kFlushMidEntry);
|
||||
}
|
||||
|
||||
size_t total_serialized = Serialize() + serialized;
|
||||
return total_serialized > 0;
|
||||
return serialized > 0;
|
||||
}
|
||||
|
||||
void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) {
|
||||
|
|
|
@ -131,7 +131,7 @@ class SliceSnapshot {
|
|||
// Helper function that flushes the serialized items into the RecordStream.
|
||||
// Can block on the channel.
|
||||
using FlushState = SerializerBase::FlushState;
|
||||
size_t Serialize(FlushState flush_state = FlushState::kFlushMidEntry);
|
||||
size_t FlushChannelRecord(FlushState flush_state);
|
||||
|
||||
public:
|
||||
uint64_t snapshot_version() const {
|
||||
|
@ -173,14 +173,15 @@ class SliceSnapshot {
|
|||
// Used for sanity checks.
|
||||
bool serialize_bucket_running_ = false;
|
||||
util::fb2::Fiber snapshot_fb_; // IterateEntriesFb
|
||||
|
||||
util::fb2::CondVarAny seq_cond_;
|
||||
CompressionMode compression_mode_;
|
||||
RdbTypeFreqMap type_freq_map_;
|
||||
|
||||
// version upper bound for entries that should be saved (not included).
|
||||
uint64_t snapshot_version_ = 0;
|
||||
uint32_t journal_cb_id_ = 0;
|
||||
uint64_t rec_id_ = 0;
|
||||
|
||||
uint64_t rec_id_ = 1, last_pushed_id_ = 0;
|
||||
|
||||
struct Stats {
|
||||
size_t loop_serialized = 0;
|
||||
|
|
|
@ -364,6 +364,7 @@ def test_georadius_errors(r: redis.Redis):
|
|||
testtools.raw_command(r, "geoadd", "newgroup", *bad_values)
|
||||
|
||||
|
||||
@pytest.mark.unsupported_server_types("dragonfly")
|
||||
def test_geosearch(r: redis.Redis):
|
||||
values = (
|
||||
2.1909389952632,
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue