diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc
index f36be18bd..8934be18c 100644
--- a/src/server/db_slice.cc
+++ b/src/server/db_slice.cc
@@ -1167,7 +1167,6 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato
   }

   auto& db = db_arr_[cntx.db_index];
-
   auto expire_it = db->expire.Find(it->first);

   if (IsValid(expire_it)) {
@@ -1184,6 +1183,9 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato
                 << ", prime table size: " << db->prime.size() << util::fb2::GetStacktrace();
   }

+  DCHECK(shard_owner()->shard_lock()->Check(IntentLock::Mode::EXCLUSIVE))
+      << util::fb2::GetStacktrace();
+
   string scratch;
   string_view key = it->first.GetSlice(&scratch);

diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc
index 75bbabba5..9e154024e 100644
--- a/src/server/dflycmd.cc
+++ b/src/server/dflycmd.cc
@@ -665,9 +665,8 @@ void DflyCmd::StartStableSyncInThread(FlowInfo* flow, ExecutionState* exec_st, E
   DCHECK(shard);
   DCHECK(flow->conn);

-  flow->streamer.reset(new JournalStreamer(sf_->journal(), exec_st));
-  bool send_lsn = flow->version >= DflyVersion::VER4;
-  flow->streamer->Start(flow->conn->socket(), send_lsn);
+  flow->streamer.reset(new JournalStreamer(sf_->journal(), exec_st, JournalStreamer::SendLsn::YES));
+  flow->streamer->Start(flow->conn->socket());

   // Register cleanup.
   flow->cleanup = [flow]() {
diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc
index 648265997..b0afecab9 100644
--- a/src/server/journal/journal.cc
+++ b/src/server/journal/journal.cc
@@ -58,8 +58,8 @@ error_code Journal::Close() {
   return {};
 }

-uint32_t Journal::RegisterOnChange(ChangeCallback cb) {
-  return journal_slice.RegisterOnChange(cb);
+uint32_t Journal::RegisterOnChange(JournalConsumerInterface* consumer) {
+  return journal_slice.RegisterOnChange(consumer);
 }

 void Journal::UnregisterOnChange(uint32_t id) {
diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h
index d47d52b62..21fd18d77 100644
--- a/src/server/journal/journal.h
+++ b/src/server/journal/journal.h
@@ -25,7 +25,7 @@ class Journal {

   //******* The following functions must be called in the context of the owning shard *********//

-  uint32_t RegisterOnChange(ChangeCallback cb);
+  uint32_t RegisterOnChange(JournalConsumerInterface* consumer);
   void UnregisterOnChange(uint32_t id);
   bool HasRegisteredCallbacks() const;
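
Note: after this change, journal observers implement JournalConsumerInterface (defined in types.h further down) instead of handing the journal a std::function callback. A minimal sketch of a consumer under the new API; the class name and counting body are illustrative only and not part of this patch:

    #include <cstddef>

    #include "server/journal/types.h"

    // Hypothetical consumer: counts journal changes on the owning shard.
    class CountingConsumer : public dfly::journal::JournalConsumerInterface {
     public:
      // Invoked for every journal change on the owning shard; should not block.
      void ConsumeJournalChange(const dfly::journal::JournalItem& item) override {
        ++entries_seen_;
      }

      // Invoked in a second pass when flushing is allowed; may block for backpressure.
      void ThrottleIfNeeded() override {
      }

     private:
      size_t entries_seen_ = 0;
    };

The journal does not take ownership: RegisterOnChange returns an id, and the caller must invoke UnregisterOnChange(id) on the owning shard before the consumer is destroyed.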
diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc
index 82670c4cd..72f865268 100644
--- a/src/server/journal/journal_slice.cc
+++ b/src/server/journal/journal_slice.cc
@@ -156,29 +156,30 @@ void JournalSlice::CallOnChange(const JournalItem& item) {
   // Hence this lock prevents the UnregisterOnChange to start running in the middle of CallOnChange.
   // CallOnChange is atomic iff JournalSlice::SetFlushMode(false) is called before.
   std::shared_lock lk(cb_mu_);
-
-  const size_t size = change_cb_arr_.size();
-  auto k_v = change_cb_arr_.begin();
-  for (size_t i = 0; i < size; ++i) {
-    k_v->second(item, enable_journal_flush_);
-    ++k_v;
+  for (auto k_v : journal_consumers_arr_) {
+    k_v.second->ConsumeJournalChange(item);
+  }
+  if (enable_journal_flush_) {
+    for (auto k_v : journal_consumers_arr_) {
+      k_v.second->ThrottleIfNeeded();
+    }
   }
 }

-uint32_t JournalSlice::RegisterOnChange(ChangeCallback cb) {
+uint32_t JournalSlice::RegisterOnChange(JournalConsumerInterface* consumer) {
   // mutex lock isn't needed because iterators are not invalidated
   uint32_t id = next_cb_id_++;
-  change_cb_arr_.emplace_back(id, std::move(cb));
+  journal_consumers_arr_.emplace_back(id, consumer);
   return id;
 }

 void JournalSlice::UnregisterOnChange(uint32_t id) {
   // we need to wait until the callback is finished before removing it
   lock_guard lk(cb_mu_);
-  auto it = find_if(change_cb_arr_.begin(), change_cb_arr_.end(),
+  auto it = find_if(journal_consumers_arr_.begin(), journal_consumers_arr_.end(),
                     [id](const auto& e) { return e.first == id; });
-  CHECK(it != change_cb_arr_.end());
-  change_cb_arr_.erase(it);
+  CHECK(it != journal_consumers_arr_.end());
+  journal_consumers_arr_.erase(it);
 }

 }  // namespace journal
diff --git a/src/server/journal/journal_slice.h b/src/server/journal/journal_slice.h
index da0b18ea7..c8a9553ea 100644
--- a/src/server/journal/journal_slice.h
+++ b/src/server/journal/journal_slice.h
@@ -43,11 +43,11 @@ class JournalSlice {
   // added to the journal.
-  // The callback receives the entry and a boolean that indicates whether
-  // awaiting (to apply backpressure) is allowed.
-  uint32_t RegisterOnChange(ChangeCallback cb);
+  // The consumer is notified for each new entry and, when flushing is allowed,
+  // may be throttled afterwards (to apply backpressure).
+  uint32_t RegisterOnChange(JournalConsumerInterface* consumer);
   void UnregisterOnChange(uint32_t);

   bool HasRegisteredCallbacks() const {
-    return !change_cb_arr_.empty();
+    return !journal_consumers_arr_.empty();
   }

   /// Returns whether the journal entry with this LSN is available
@@ -70,7 +70,7 @@ class JournalSlice {
   base::IoBuf ring_serialize_buf_;

   mutable util::fb2::SharedMutex cb_mu_;  // to prevent removing callback during call
-  std::list<std::pair<uint32_t, ChangeCallback>> change_cb_arr_;
+  std::list<std::pair<uint32_t, JournalConsumerInterface*>> journal_consumers_arr_;

   LSN lsn_ = 1;
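
Because journal_consumers_arr_ now stores raw pointers, a registration must not outlive its consumer (note the CHECK in UnregisterOnChange above). A hypothetical RAII wrapper, not part of this patch, sketches the intended lifetime discipline; it assumes construction and destruction both happen on the owning shard, as RegisterOnChange/UnregisterOnChange require:

    #include <cstdint>

    #include "server/journal/journal.h"

    // Hypothetical helper: ties the registration lifetime to a scope.
    class ScopedJournalConsumer {
     public:
      ScopedJournalConsumer(dfly::journal::Journal* journal,
                            dfly::journal::JournalConsumerInterface* consumer)
          : journal_(journal), id_(journal->RegisterOnChange(consumer)) {}

      ~ScopedJournalConsumer() {
        journal_->UnregisterOnChange(id_);  // waits for an in-flight CallOnChange
      }

      // Owns the registration, hence non-copyable.
      ScopedJournalConsumer(const ScopedJournalConsumer&) = delete;
      ScopedJournalConsumer& operator=(const ScopedJournalConsumer&) = delete;

     private:
      dfly::journal::Journal* journal_;
      uint32_t id_;
    };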
diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc
index c976ac52a..46bf8dee9 100644
--- a/src/server/journal/streamer.cc
+++ b/src/server/journal/streamer.cc
@@ -39,8 +39,8 @@ uint32_t migration_buckets_serialization_threshold_cached = 100;

 }  // namespace

-JournalStreamer::JournalStreamer(journal::Journal* journal, ExecutionState* cntx)
-    : cntx_(cntx), journal_(journal) {
+JournalStreamer::JournalStreamer(journal::Journal* journal, ExecutionState* cntx, SendLsn send_lsn)
+    : cntx_(cntx), journal_(journal), send_lsn_(send_lsn) {
   // cache the flag to avoid accessing it later.
   replication_stream_output_limit_cached = absl::GetFlag(FLAGS_replication_stream_output_limit);
 }
@@ -52,35 +52,29 @@ JournalStreamer::~JournalStreamer() {
   VLOG(1) << "~JournalStreamer";
 }

-void JournalStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
+void JournalStreamer::ConsumeJournalChange(const JournalItem& item) {
+  if (!ShouldWrite(item)) {
+    return;
+  }
+
+  DCHECK_GT(item.lsn, last_lsn_written_);
+  Write(item.data);
+  time_t now = time(nullptr);
+  last_lsn_written_ = item.lsn;
+  // TODO: chain this to the previous Write call.
+  if (send_lsn_ == SendLsn::YES && now - last_lsn_time_ > 3) {
+    last_lsn_time_ = now;
+    io::StringSink sink;
+    JournalWriter writer(&sink);
+    writer.Write(Entry{journal::Op::LSN, item.lsn});
+    Write(std::move(sink).str());
+  }
+}
+
+void JournalStreamer::Start(util::FiberSocketBase* dest) {
   CHECK(dest_ == nullptr && dest != nullptr);
   dest_ = dest;
-  journal_cb_id_ =
-      journal_->RegisterOnChange([this, send_lsn](const JournalItem& item, bool allow_await) {
-        if (allow_await) {
-          ThrottleIfNeeded();
-          // No record to write, just await if data was written so consumer will read the data.
-          // TODO: shouldnt we trigger async write in noop??
-          if (item.opcode == Op::NOOP)
-            return;
-        }
-
-        if (!ShouldWrite(item)) {
-          return;
-        }
-
-        Write(item.data);
-        time_t now = time(nullptr);
-
-        // TODO: to chain it to the previous Write call.
-        if (send_lsn && now - last_lsn_time_ > 3) {
-          last_lsn_time_ = now;
-          io::StringSink sink;
-          JournalWriter writer(&sink);
-          writer.Write(Entry{journal::Op::LSN, item.lsn});
-          Write(std::move(sink).str());
-        }
-      });
+  journal_cb_id_ = journal_->RegisterOnChange(this);
 }

 void JournalStreamer::Cancel() {
@@ -188,14 +182,16 @@ bool JournalStreamer::IsStalled() const {

 RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal,
                                  ExecutionState* cntx)
-    : JournalStreamer(journal, cntx), db_slice_(slice), my_slots_(std::move(slots)) {
+    : JournalStreamer(journal, cntx, JournalStreamer::SendLsn::NO),
+      db_slice_(slice),
+      my_slots_(std::move(slots)) {
   DCHECK(slice != nullptr);
   migration_buckets_serialization_threshold_cached =
       absl::GetFlag(FLAGS_migration_buckets_serialization_threshold);
   db_array_ = slice->databases();  // Inc ref to make sure DB isn't deleted while we use it
 }

-void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
+void RestoreStreamer::Start(util::FiberSocketBase* dest) {
   if (!cntx_->IsRunning())
     return;
@@ -203,7 +199,7 @@ void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
   auto db_cb = absl::bind_front(&RestoreStreamer::OnDbChange, this);
   snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb));

-  JournalStreamer::Start(dest, send_lsn);
+  JournalStreamer::Start(dest);
 }

 void RestoreStreamer::Run() {
diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h
index d1d0bade9..16ba86e99 100644
--- a/src/server/journal/streamer.h
+++ b/src/server/journal/streamer.h
@@ -18,9 +18,10 @@ namespace dfly {

 // Buffered single-shard journal streamer that listens for journal changes with a
 // journal listener and writes them to a destination sink in a separate fiber.
-class JournalStreamer {
+class JournalStreamer : public journal::JournalConsumerInterface {
  public:
-  JournalStreamer(journal::Journal* journal, ExecutionState* cntx);
+  enum class SendLsn { NO = 0, YES = 1 };
+  JournalStreamer(journal::Journal* journal, ExecutionState* cntx, SendLsn send_lsn);
   virtual ~JournalStreamer();

   // Self referential.
@@ -28,7 +29,9 @@ class JournalStreamer {
   JournalStreamer(JournalStreamer&& other) = delete;

   // Register journal listener and start writer in fiber.
-  virtual void Start(util::FiberSocketBase* dest, bool send_lsn);
+  virtual void Start(util::FiberSocketBase* dest);
+
+  void ConsumeJournalChange(const journal::JournalItem& item);

   // Must be called on context cancellation for unblocking
   // and manual cleanup.
@@ -48,7 +51,7 @@ class JournalStreamer {
   void ThrottleIfNeeded();

   virtual bool ShouldWrite(const journal::JournalItem& item) const {
-    return cntx_->IsRunning();
+    return cntx_->IsRunning() && item.opcode != journal::Op::NOOP;
   }

   void WaitForInflightToComplete();
@@ -68,8 +71,10 @@ class JournalStreamer {
   size_t in_flight_bytes_ = 0, total_sent_ = 0;
   time_t last_lsn_time_ = 0;
+  LSN last_lsn_written_ = 0;
   util::fb2::EventCount waker_;
   uint32_t journal_cb_id_{0};
+  SendLsn send_lsn_;
 };

 // Serializes existing DB as RESTORE commands, and sends updates as regular commands.
@@ -80,7 +85,7 @@ class RestoreStreamer : public JournalStreamer {
                   ExecutionState* cntx);
   ~RestoreStreamer() override;

-  void Start(util::FiberSocketBase* dest, bool send_lsn = false) override;
+  void Start(util::FiberSocketBase* dest) override;

   void Run();
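
The bool send_lsn parameter thus moves from Start() into the constructor as a two-value enum, fixing the choice per streamer instance. The two call sites in this patch: the replica stable-sync flow in dflycmd.cc now always constructs with SendLsn::YES (previously this was gated on flow->version >= DflyVersion::VER4), while RestoreStreamer passes SendLsn::NO, presumably because slot migration has no use for LSN records. The replication call site, as it appears in dflycmd.cc above:

    // Stable-sync flow: LSN records are interleaved every few seconds.
    flow->streamer.reset(
        new JournalStreamer(sf_->journal(), exec_st, JournalStreamer::SendLsn::YES));
    flow->streamer->Start(flow->conn->socket());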
diff --git a/src/server/journal/types.h b/src/server/journal/types.h
index bb3904dbc..24183623a 100644
--- a/src/server/journal/types.h
+++ b/src/server/journal/types.h
@@ -87,7 +87,14 @@ struct JournalItem {
   std::optional<SlotId> slot;
 };

-using ChangeCallback = std::function<void(const JournalItem&, bool await)>;
+struct JournalConsumerInterface {
+  virtual ~JournalConsumerInterface() = default;
+
+  // Receives a journal change for serialization.
+  virtual void ConsumeJournalChange(const JournalItem& item) = 0;
+  // Applies backpressure: may block until pending serialized data is written.
+  virtual void ThrottleIfNeeded() = 0;
+};

 }  // namespace journal
 }  // namespace dfly
diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index 7b9c53602..9eaae4df2 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -75,10 +75,7 @@ void SliceSnapshot::Start(bool stream_journal, SnapshotFlush allow_flush) {
   if (stream_journal) {
     auto* journal = db_slice_->shard_owner()->journal();
     DCHECK(journal);
-    auto journal_cb = [this](const journal::JournalItem& item, bool await) {
-      OnJournalEntry(item, await);
-    };
-    journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
+    journal_cb_id_ = journal->RegisterOnChange(this);
   }

   const auto flush_threshold = ServerState::tlocal()->serialization_max_chunk_size;
@@ -131,6 +128,7 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {
   journal->UnregisterOnChange(cb_id);
   if (!cancel) {
     // always succeeds because serializer_ flushes to string.
+    VLOG(1) << "FinalizeJournalStream lsn: " << journal->GetLsn();
     std::ignore = serializer_->SendJournalOffset(journal->GetLsn());
     PushSerialized(true);
   }
@@ -227,10 +225,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {

   if (journal->GetLsn() == lsn) {
     std::ignore = serializer_->SendFullSyncCut();
-    auto journal_cb = [this](const journal::JournalItem& item, bool await) {
-      OnJournalEntry(item, await);
-    };
-    journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
+    journal_cb_id_ = journal->RegisterOnChange(this);
     PushSerialized(true);
   } else {
     // We stopped but we didn't manage to send the whole stream.
@@ -415,7 +410,7 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
 // transaction.
-// allow_flush is controlled by Journal::SetFlushMode
-// (usually it's true unless we are in the middle of a critical section that can not preempt).
-void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool allow_flush) {
+// Whether throttling follows is controlled by Journal::SetFlushMode (usually
+// it is allowed, unless we are in the middle of a critical section that cannot preempt).
+void SliceSnapshot::ConsumeJournalChange(const journal::JournalItem& item) {
   {
     // We grab the lock in case we are in the middle of serializing a bucket, so it serves as a
     // barrier here for atomic serialization.
@@ -424,12 +419,10 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool allow_
       std::ignore = serializer_->WriteJournalEntry(item.data);
     }
   }
+}

-  if (allow_flush) {
-    // This is the only place that flushes in streaming mode
-    // once the iterate buckets fiber finished.
-    PushSerialized(false);
-  }
+void SliceSnapshot::ThrottleIfNeeded() {
+  PushSerialized(false);
 }

 size_t SliceSnapshot::GetBufferCapacity() const {
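
The old OnJournalEntry(item, allow_flush) is thus split along the new interface: ConsumeJournalChange only serializes the entry under the big-value lock, while ThrottleIfNeeded flushes via PushSerialized(false). The allow_flush decision moves into JournalSlice::CallOnChange, which skips the throttle phase entirely while flushing is disabled. Condensed from the CallOnChange loop earlier in this patch (illustrative, not a verbatim quote):

    // Phase 1: every consumer serializes the entry; must not preempt.
    for (auto& [id, consumer] : journal_consumers_arr_)
      consumer->ConsumeJournalChange(item);

    // Phase 2: only when flushing is enabled (see Journal::SetFlushMode),
    // consumers may block to apply backpressure.
    if (enable_journal_flush_)
      for (auto& [id, consumer] : journal_consumers_arr_)
        consumer->ThrottleIfNeeded();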
diff --git a/src/server/snapshot.h b/src/server/snapshot.h
index f027b5e48..c49ae9f5f 100644
--- a/src/server/snapshot.h
+++ b/src/server/snapshot.h
@@ -47,7 +47,7 @@ struct Entry;
 // and submitting all values to an output sink.
 // In journal streaming mode, the snapshot continues submitting changes
 // over the sink until explicitly stopped.
-class SliceSnapshot {
+class SliceSnapshot : public journal::JournalConsumerInterface {
  public:
   // Represents a target for receiving snapshot data.
   struct SnapshotDataConsumerInterface {
@@ -103,6 +103,10 @@ class SliceSnapshot {

   RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const;

+  // Journal listener
+  void ConsumeJournalChange(const journal::JournalItem& item);
+  void ThrottleIfNeeded();
+
  private:
   // Main snapshotting fiber that iterates over all buckets in the db slice
   // and submits them to SerializeBucket.
diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index 0bbcc2668..578d9426d 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -149,11 +149,19 @@ cv_status Transaction::BatonBarrier::Wait(time_point tp) {

 Transaction::Guard::Guard(Transaction* tx) : tx(tx) {
   DCHECK(tx->cid_->opt_mask() & CO::GLOBAL_TRANS);
-  tx->Execute([](auto*, auto*) { return OpStatus::OK; }, false);
+  auto cb = [&](Transaction* t, EngineShard* shard) {
+    namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(false);
+    return OpStatus::OK;
+  };
+  tx->Execute(cb, false);
 }

 Transaction::Guard::~Guard() {
-  tx->Conclude();
+  auto cb = [&](Transaction* t, EngineShard* shard) {
+    namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(true);
+    return OpStatus::OK;
+  };
+  tx->Execute(cb, true);
   tx->Refurbish();
 }
diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py
index eaa81cee9..b3c41f679 100644
--- a/tests/dragonfly/replication_test.py
+++ b/tests/dragonfly/replication_test.py
@@ -2959,12 +2959,15 @@ async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFa
     await fill_task


-@pytest.mark.skip("temporarily skipped")
 async def test_bug_in_json_memory_tracking(df_factory: DflyInstanceFactory):
     """
     This test reproduces a bug in the JSON memory tracking.
     """
-    master = df_factory.create(proactor_threads=2, serialization_max_chunk_size=1)
+    master = df_factory.create(
+        proactor_threads=2,
+        serialization_max_chunk_size=1,
+        vmodule="replica=2,dflycmd=2,snapshot=1,rdb_save=1,rdb_load=1,journal_slice=2",
+    )
     replicas = [df_factory.create(proactor_threads=2) for i in range(2)]

     # Start instances and connect clients
@@ -2982,6 +2985,7 @@ async def test_bug_in_json_memory_tracking(df_factory: DflyInstanceFactory):

     seeder = SeederV2(key_target=50_000)
     fill_task = asyncio.create_task(seeder.run(master.client()))
+    await asyncio.sleep(0.2)

     for replica in c_replicas:
         await replica.execute_command(f"REPLICAOF LOCALHOST {master.port}")
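
Overall, the Transaction::Guard change pairs with the new DCHECK in db_slice.cc: expiry may now run only while the shard lock is held exclusively, so a global atomic section disables expirations for its whole duration. A sketch of the resulting usage pattern inside dfly code, assuming a global transaction; the commented body is illustrative, only Guard itself is from this patch:

    {
      Transaction::Guard guard{tx};  // hop 1: SetExpireAllowed(false) on every shard
      // ... perform the global atomic section, e.g. coordinate a full-sync cut,
      // without heartbeat expiry mutating tables mid-serialization ...
    }  // hop 2: SetExpireAllowed(true), conclude and refurbish the transaction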