// Copyright 2022, DragonflyDB authors. All rights reserved. // See LICENSE for licensing terms. // #include "io/io.h" #include "io/io_buf.h" #include "server/common.h" namespace dfly { // Base for constructing buffered byte streams with backpressure // for single producer and consumer on the same thread. // // Use it as a io::Sink to write data from a producer fiber, // and ConsumeIntoSink to extract this data in a consumer fiber. // Use NotifyWritten to request the consumer to be woken up. // // Uses two base::IoBuf internally that are swapped in turns. class BufferedStreamerBase : public io::Sink { protected: // Initialize with global cancellation and optional stall conditions. BufferedStreamerBase(const Cancellation* cll, unsigned max_buffered_cnt = 5, unsigned max_buffered_mem = 8192) : cll_{cll}, max_buffered_cnt_{max_buffered_cnt}, max_buffered_mem_{max_buffered_mem} { } public: size_t GetTotalBufferCapacities() const; protected: // Write some data into the internal buffer. // // Consumer needs to be woken up manually with NotifyWritten to avoid waking it up for small // writes: // // while (should_write()) { // bsb->WriteSome(...); <- Write some data // bsb->WriteSome(...); // ... // bsb->NotifyWritten(); <- Wake up consumer after writes // } // bsb->Finalize(); <- Finalize to unblock consumer // io::Result WriteSome(const iovec* vec, uint32_t len) override; // Report that a batch of data has been written and the consumer can be woken up. // Blocks if the consumer if not keeping up, if allow_await is set to true. void NotifyWritten(bool allow_await); // Blocks the if the consumer if not keeping up. void AwaitIfWritten(); // Report producer finished. void Finalize(); // Consume whole stream to sink from the consumer fiber. Unblocks when cancelled or finalized. std::error_code ConsumeIntoSink(io::Sink* dest); // Whether the consumer is not keeping up. bool IsStalled(); // Whether the producer stopped or the context was cancelled. bool IsStopped(); protected: bool producer_done_ = false; // whether producer is done unsigned buffered_ = 0; // how many entries are buffered util::fb2::EventCount waker_; // two sided waker const Cancellation* cll_; // global cancellation unsigned max_buffered_cnt_; // Max buffered entries before stall unsigned max_buffered_mem_; // Max buffered mem before stall io::IoBuf producer_buf_, consumer_buf_; // Two buffers that are swapped in turns. }; } // namespace dfly