Merge branch 'main' into RemoveAtomic

Roman Gershman 2025-05-05 19:01:58 +03:00 committed by GitHub
commit ca2d3b53c0
13 changed files with 97 additions and 78 deletions


@@ -1167,7 +1167,6 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato
   }
   auto& db = db_arr_[cntx.db_index];
   auto expire_it = db->expire.Find(it->first);
   if (IsValid(expire_it)) {
@@ -1184,6 +1183,9 @@ DbSlice::PrimeItAndExp DbSlice::ExpireIfNeeded(const Context& cntx, PrimeIterato
         << ", prime table size: " << db->prime.size() << util::fb2::GetStacktrace();
   }
+  DCHECK(shard_owner()->shard_lock()->Check(IntentLock::Mode::EXCLUSIVE))
+      << util::fb2::GetStacktrace();
   string scratch;
   string_view key = it->first.GetSlice(&scratch);


@@ -665,9 +665,8 @@ void DflyCmd::StartStableSyncInThread(FlowInfo* flow, ExecutionState* exec_st, E
   DCHECK(shard);
   DCHECK(flow->conn);
-  flow->streamer.reset(new JournalStreamer(sf_->journal(), exec_st));
-  bool send_lsn = flow->version >= DflyVersion::VER4;
-  flow->streamer->Start(flow->conn->socket(), send_lsn);
+  flow->streamer.reset(new JournalStreamer(sf_->journal(), exec_st, JournalStreamer::SendLsn::YES));
+  flow->streamer->Start(flow->conn->socket());
   // Register cleanup.
   flow->cleanup = [flow]() {


@@ -58,8 +58,8 @@ error_code Journal::Close() {
   return {};
 }
-uint32_t Journal::RegisterOnChange(ChangeCallback cb) {
-  return journal_slice.RegisterOnChange(cb);
+uint32_t Journal::RegisterOnChange(JournalConsumerInterface* consumer) {
+  return journal_slice.RegisterOnChange(consumer);
 }
 void Journal::UnregisterOnChange(uint32_t id) {


@@ -25,7 +25,7 @@ class Journal {
   //******* The following functions must be called in the context of the owning shard *********//
-  uint32_t RegisterOnChange(ChangeCallback cb);
+  uint32_t RegisterOnChange(JournalConsumerInterface* consumer);
   void UnregisterOnChange(uint32_t id);
   bool HasRegisteredCallbacks() const;


@@ -156,29 +156,30 @@ void JournalSlice::CallOnChange(const JournalItem& item) {
   // Hence this lock prevents UnregisterOnChange from starting to run in the middle of CallOnChange.
   // CallOnChange is atomic iff JournalSlice::SetFlushMode(false) was called beforehand.
   std::shared_lock lk(cb_mu_);
-  const size_t size = change_cb_arr_.size();
-  auto k_v = change_cb_arr_.begin();
-  for (size_t i = 0; i < size; ++i) {
-    k_v->second(item, enable_journal_flush_);
-    ++k_v;
+  for (auto k_v : journal_consumers_arr_) {
+    k_v.second->ConsumeJournalChange(item);
+  }
+  if (enable_journal_flush_) {
+    for (auto k_v : journal_consumers_arr_) {
+      k_v.second->ThrottleIfNeeded();
+    }
   }
 }
-uint32_t JournalSlice::RegisterOnChange(ChangeCallback cb) {
+uint32_t JournalSlice::RegisterOnChange(JournalConsumerInterface* consumer) {
   // No mutex is needed here: std::list iterators are not invalidated by insertion.
   uint32_t id = next_cb_id_++;
-  change_cb_arr_.emplace_back(id, std::move(cb));
+  journal_consumers_arr_.emplace_back(id, std::move(consumer));
   return id;
 }
 void JournalSlice::UnregisterOnChange(uint32_t id) {
   // We must wait until the callback finishes before removing it.
   lock_guard lk(cb_mu_);
-  auto it = find_if(change_cb_arr_.begin(), change_cb_arr_.end(),
+  auto it = find_if(journal_consumers_arr_.begin(), journal_consumers_arr_.end(),
                     [id](const auto& e) { return e.first == id; });
-  CHECK(it != change_cb_arr_.end());
-  change_cb_arr_.erase(it);
+  CHECK(it != journal_consumers_arr_.end());
+  journal_consumers_arr_.erase(it);
 }
 }  // namespace journal

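The rewritten CallOnChange above is the heart of this change: recording a journal change must never block (it can run inside a non-preemptible atomic section), while backpressure may block, so the two concerns are split into separate passes over the consumer list. A condensed sketch of that dispatch pattern, using simplified stand-in types (Item and Consumer here are illustrative, not the repo's definitions):

#include <cstdint>
#include <list>
#include <utility>

struct Item {};

struct Consumer {
  virtual ~Consumer() = default;
  virtual void ConsumeJournalChange(const Item& item) = 0;  // must not block
  virtual void ThrottleIfNeeded() = 0;                      // may block (backpressure)
};

// Mirrors JournalSlice::CallOnChange: phase 1 records the change with every
// consumer; phase 2 runs only when flushing is enabled, so a critical section
// that disabled flushing is never preempted between phase-1 calls.
void Dispatch(const Item& item, bool flush_enabled,
              const std::list<std::pair<uint32_t, Consumer*>>& consumers) {
  for (const auto& entry : consumers)
    entry.second->ConsumeJournalChange(item);
  if (flush_enabled) {
    for (const auto& entry : consumers)
      entry.second->ThrottleIfNeeded();
  }
}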

@@ -43,11 +43,11 @@ class JournalSlice {
   // added to the journal.
   // The callback receives the entry and a boolean that indicates whether
   // awaiting (to apply backpressure) is allowed.
-  uint32_t RegisterOnChange(ChangeCallback cb);
+  uint32_t RegisterOnChange(JournalConsumerInterface* consumer);
   void UnregisterOnChange(uint32_t);
   bool HasRegisteredCallbacks() const {
-    return !change_cb_arr_.empty();
+    return !journal_consumers_arr_.empty();
   }
   /// Returns whether the journal entry with this LSN is available
@@ -70,7 +70,7 @@
   base::IoBuf ring_serialize_buf_;
   mutable util::fb2::SharedMutex cb_mu_;  // to prevent removing callback during call
-  std::list<std::pair<uint32_t, ChangeCallback>> change_cb_arr_;
+  std::list<std::pair<uint32_t, JournalConsumerInterface*>> journal_consumers_arr_;
   LSN lsn_ = 1;


@@ -39,8 +39,8 @@ uint32_t migration_buckets_serialization_threshold_cached = 100;
 }  // namespace
-JournalStreamer::JournalStreamer(journal::Journal* journal, ExecutionState* cntx)
-    : cntx_(cntx), journal_(journal) {
+JournalStreamer::JournalStreamer(journal::Journal* journal, ExecutionState* cntx, SendLsn send_lsn)
+    : cntx_(cntx), journal_(journal), send_lsn_(send_lsn) {
   // cache the flag to avoid accessing it later.
   replication_stream_output_limit_cached = absl::GetFlag(FLAGS_replication_stream_output_limit);
 }
@@ -52,35 +52,29 @@ JournalStreamer::~JournalStreamer() {
   VLOG(1) << "~JournalStreamer";
 }
-void JournalStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
+void JournalStreamer::ConsumeJournalChange(const JournalItem& item) {
+  if (!ShouldWrite(item)) {
+    return;
+  }
+  DCHECK_GT(item.lsn, last_lsn_written_);
+  Write(item.data);
+  time_t now = time(nullptr);
+  last_lsn_written_ = item.lsn;
+  // TODO: chain this to the previous Write call.
+  if (send_lsn_ == SendLsn::YES && now - last_lsn_time_ > 3) {
+    last_lsn_time_ = now;
+    io::StringSink sink;
+    JournalWriter writer(&sink);
+    writer.Write(Entry{journal::Op::LSN, item.lsn});
+    Write(std::move(sink).str());
+  }
+}
+void JournalStreamer::Start(util::FiberSocketBase* dest) {
   CHECK(dest_ == nullptr && dest != nullptr);
   dest_ = dest;
-  journal_cb_id_ =
-      journal_->RegisterOnChange([this, send_lsn](const JournalItem& item, bool allow_await) {
-        if (allow_await) {
-          ThrottleIfNeeded();
-          // No record to write, just await if data was written so consumer will read the data.
-          // TODO: shouldnt we trigger async write in noop??
-          if (item.opcode == Op::NOOP)
-            return;
-        }
-        if (!ShouldWrite(item)) {
-          return;
-        }
-        Write(item.data);
-        time_t now = time(nullptr);
-        // TODO: to chain it to the previous Write call.
-        if (send_lsn && now - last_lsn_time_ > 3) {
-          last_lsn_time_ = now;
-          io::StringSink sink;
-          JournalWriter writer(&sink);
-          writer.Write(Entry{journal::Op::LSN, item.lsn});
-          Write(std::move(sink).str());
-        }
-      });
+  journal_cb_id_ = journal_->RegisterOnChange(this);
 }
 void JournalStreamer::Cancel() {
void JournalStreamer::Cancel() {
@@ -188,14 +182,16 @@ bool JournalStreamer::IsStalled() const {
 RestoreStreamer::RestoreStreamer(DbSlice* slice, cluster::SlotSet slots, journal::Journal* journal,
                                  ExecutionState* cntx)
-    : JournalStreamer(journal, cntx), db_slice_(slice), my_slots_(std::move(slots)) {
+    : JournalStreamer(journal, cntx, JournalStreamer::SendLsn::NO),
+      db_slice_(slice),
+      my_slots_(std::move(slots)) {
   DCHECK(slice != nullptr);
   migration_buckets_serialization_threshold_cached =
       absl::GetFlag(FLAGS_migration_buckets_serialization_threshold);
   db_array_ = slice->databases();  // Inc ref to make sure DB isn't deleted while we use it
 }
-void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
+void RestoreStreamer::Start(util::FiberSocketBase* dest) {
   if (!cntx_->IsRunning())
     return;
@@ -203,7 +199,7 @@ void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
   auto db_cb = absl::bind_front(&RestoreStreamer::OnDbChange, this);
   snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb));
-  JournalStreamer::Start(dest, send_lsn);
+  JournalStreamer::Start(dest);
 }
 void RestoreStreamer::Run() {

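ConsumeJournalChange above interleaves an Op::LSN record into the replication stream at most once every three seconds, so the replica can track its offset without per-entry overhead. A standalone sketch of just that rate-limiting shape (LsnHeartbeat and kPeriod are illustrative names; the three-second window mirrors the `now - last_lsn_time_ > 3` check above):

#include <ctime>

class LsnHeartbeat {
 public:
  // Returns true when an LSN record should be emitted: at most once per
  // kPeriod-second window, like JournalStreamer::ConsumeJournalChange.
  bool ShouldEmit(time_t now) {
    if (now - last_emit_ > kPeriod) {
      last_emit_ = now;
      return true;
    }
    return false;
  }

 private:
  static constexpr time_t kPeriod = 3;
  time_t last_emit_ = 0;
};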

@@ -18,9 +18,10 @@ namespace dfly {
 // Buffered single-shard journal streamer that registers as a journal listener and
 // writes the changes it receives to a destination sink in a separate fiber.
-class JournalStreamer {
+class JournalStreamer : public journal::JournalConsumerInterface {
  public:
-  JournalStreamer(journal::Journal* journal, ExecutionState* cntx);
+  enum class SendLsn { NO = 0, YES = 1 };
+  JournalStreamer(journal::Journal* journal, ExecutionState* cntx, SendLsn send_lsn);
   virtual ~JournalStreamer();
   // Self referential.
@@ -28,7 +29,9 @@ class JournalStreamer {
   JournalStreamer(JournalStreamer&& other) = delete;
   // Register journal listener and start writer in fiber.
-  virtual void Start(util::FiberSocketBase* dest, bool send_lsn);
+  virtual void Start(util::FiberSocketBase* dest);
+  void ConsumeJournalChange(const journal::JournalItem& item);
   // Must be called on context cancellation for unblocking
   // and manual cleanup.
@@ -48,7 +51,7 @@ class JournalStreamer {
   void ThrottleIfNeeded();
   virtual bool ShouldWrite(const journal::JournalItem& item) const {
-    return cntx_->IsRunning();
+    return cntx_->IsRunning() && item.opcode != journal::Op::NOOP;
   }
   void WaitForInflightToComplete();
@@ -68,8 +71,10 @@ class JournalStreamer {
   size_t in_flight_bytes_ = 0, total_sent_ = 0;
   time_t last_lsn_time_ = 0;
+  LSN last_lsn_written_ = 0;
   util::fb2::EventCount waker_;
   uint32_t journal_cb_id_{0};
+  SendLsn send_lsn_;
 };
 // Serializes existing DB as RESTORE commands, and sends updates as regular commands.
@@ -80,7 +85,7 @@ class RestoreStreamer : public JournalStreamer {
                   ExecutionState* cntx);
   ~RestoreStreamer() override;
-  void Start(util::FiberSocketBase* dest, bool send_lsn = false) override;
+  void Start(util::FiberSocketBase* dest) override;
   void Run();

@@ -87,7 +87,14 @@ struct JournalItem {
   std::optional<SlotId> slot;
 };
 using ChangeCallback = std::function<void(const JournalItem&, bool await)>;
+struct JournalConsumerInterface {
+  virtual ~JournalConsumerInterface() = default;
+  // Receives a journal change to serialize.
+  virtual void ConsumeJournalChange(const JournalItem& item) = 0;
+  // Waits until previously serialized data has been written (backpressure point).
+  virtual void ThrottleIfNeeded() = 0;
+};
 }  // namespace journal
 }  // namespace dfly

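A minimal sketch of implementing the new interface (CountingConsumer is hypothetical and only for illustration; the registration calls mirror Journal::RegisterOnChange / UnregisterOnChange from this diff, and the dragonfly headers defining JournalItem are assumed):

#include <cstddef>

class CountingConsumer : public dfly::journal::JournalConsumerInterface {
 public:
  void ConsumeJournalChange(const dfly::journal::JournalItem& item) override {
    pending_ += item.data.size();  // record the change; must not block
  }
  void ThrottleIfNeeded() override {
    pending_ = 0;  // flush point; a real consumer may block on I/O here
  }

 private:
  size_t pending_ = 0;
};

// Lifecycle on the owning shard, given a journal::Journal* j:
//   CountingConsumer consumer;
//   uint32_t id = j->RegisterOnChange(&consumer);
//   ...journal changes flow into the consumer...
//   j->UnregisterOnChange(id);  // blocks until an in-flight CallOnChange finishes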

@@ -75,10 +75,7 @@ void SliceSnapshot::Start(bool stream_journal, SnapshotFlush allow_flush) {
   if (stream_journal) {
     auto* journal = db_slice_->shard_owner()->journal();
     DCHECK(journal);
-    auto journal_cb = [this](const journal::JournalItem& item, bool await) {
-      OnJournalEntry(item, await);
-    };
-    journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
+    journal_cb_id_ = journal->RegisterOnChange(this);
   }
   const auto flush_threshold = ServerState::tlocal()->serialization_max_chunk_size;
@@ -131,6 +128,7 @@ void SliceSnapshot::FinalizeJournalStream(bool cancel) {
   journal->UnregisterOnChange(cb_id);
   if (!cancel) {
     // always succeeds because serializer_ flushes to string.
+    VLOG(1) << "FinalizeJournalStream lsn: " << journal->GetLsn();
     std::ignore = serializer_->SendJournalOffset(journal->GetLsn());
     PushSerialized(true);
   }
@@ -227,10 +225,7 @@ void SliceSnapshot::SwitchIncrementalFb(LSN lsn) {
   if (journal->GetLsn() == lsn) {
     std::ignore = serializer_->SendFullSyncCut();
-    auto journal_cb = [this](const journal::JournalItem& item, bool await) {
-      OnJournalEntry(item, await);
-    };
-    journal_cb_id_ = journal->RegisterOnChange(std::move(journal_cb));
+    journal_cb_id_ = journal->RegisterOnChange(this);
     PushSerialized(true);
   } else {
     // We stopped but we didn't manage to send the whole stream.
@@ -415,7 +410,7 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
 // transaction.
 // allow_flush is controlled by Journal::SetFlushMode
 // (usually it's true unless we are in the middle of a critical section that cannot preempt).
-void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool allow_flush) {
+void SliceSnapshot::ConsumeJournalChange(const journal::JournalItem& item) {
   {
     // We grab the lock in case we are in the middle of serializing a bucket, so it serves as a
     // barrier here for atomic serialization.
@@ -424,12 +419,10 @@ void SliceSnapshot::OnJournalEntry(const journal::JournalItem& item, bool allow_
       std::ignore = serializer_->WriteJournalEntry(item.data);
     }
   }
+}
-  if (allow_flush) {
-    // This is the only place that flushes in streaming mode
-    // once the iterate buckets fiber finished.
-    PushSerialized(false);
-  }
+void SliceSnapshot::ThrottleIfNeeded() {
+  PushSerialized(false);
 }
 size_t SliceSnapshot::GetBufferCapacity() const {

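The barrier comment in ConsumeJournalChange describes a mutex used purely for ordering: a journal entry must never land inside a half-serialized bucket. A reduced sketch of that idea (names are illustrative; dragonfly uses fiber-aware primitives rather than std::mutex):

#include <mutex>
#include <string>
#include <vector>

class BucketSerializer {
 public:
  // Bucket serialization holds the mutex for the whole bucket, so its bytes
  // appear contiguously in the output stream.
  void SerializeBucket(const std::vector<std::string>& values) {
    std::lock_guard<std::mutex> lk(mu_);
    for (const auto& v : values) out_ += v;
  }

  // Taking the same mutex makes this a barrier: the journal entry is appended
  // either before or after a bucket, never in the middle of one.
  void WriteJournalEntry(const std::string& entry) {
    std::lock_guard<std::mutex> lk(mu_);
    out_ += entry;
  }

 private:
  std::mutex mu_;
  std::string out_;
};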

@@ -47,7 +47,7 @@ struct Entry;
 // and submitting all values to an output sink.
 // In journal streaming mode, the snapshot continues submitting changes
 // over the sink until explicitly stopped.
-class SliceSnapshot {
+class SliceSnapshot : public journal::JournalConsumerInterface {
  public:
   // Represents a target for receiving snapshot data.
   struct SnapshotDataConsumerInterface {
@@ -103,6 +103,10 @@ class SliceSnapshot {
   RdbSaver::SnapshotStats GetCurrentSnapshotProgress() const;
+  // Journal listener
+  void ConsumeJournalChange(const journal::JournalItem& item);
+  void ThrottleIfNeeded();
  private:
   // Main snapshotting fiber that iterates over all buckets in the db slice
   // and submits them to SerializeBucket.


@@ -149,11 +149,19 @@ cv_status Transaction::BatonBarrier::Wait(time_point tp) {
 Transaction::Guard::Guard(Transaction* tx) : tx(tx) {
   DCHECK(tx->cid_->opt_mask() & CO::GLOBAL_TRANS);
-  tx->Execute([](auto*, auto*) { return OpStatus::OK; }, false);
+  auto cb = [&](Transaction* t, EngineShard* shard) {
+    namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(false);
+    return OpStatus::OK;
+  };
+  tx->Execute(cb, false);
 }
 Transaction::Guard::~Guard() {
-  tx->Conclude();
+  auto cb = [&](Transaction* t, EngineShard* shard) {
+    namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).SetExpireAllowed(true);
+    return OpStatus::OK;
+  };
+  tx->Execute(cb, true);
   tx->Refurbish();
 }

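Transaction::Guard now pairs its hops symmetrically: the constructor hop disables expirations on every shard and the destructor hop re-enables them, so the invariant is restored however the guarded scope exits. A self-contained sketch of that RAII shape (ShardBroadcast and ExpireGuard are illustrative, not the repo's API):

#include <functional>
#include <utility>

// Stand-in for a transaction hop that runs a callback on every shard.
using ShardBroadcast = std::function<void(bool expire_allowed)>;

class ExpireGuard {
 public:
  explicit ExpireGuard(ShardBroadcast hop) : hop_(std::move(hop)) {
    hop_(false);  // like the constructor above: SetExpireAllowed(false) everywhere
  }
  ~ExpireGuard() {
    hop_(true);  // like the destructor above: re-enable expirations on all paths
  }
  ExpireGuard(const ExpireGuard&) = delete;
  ExpireGuard& operator=(const ExpireGuard&) = delete;

 private:
  ShardBroadcast hop_;
};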

@@ -2959,12 +2959,15 @@ async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFa
     await fill_task
+@pytest.mark.skip("temporarily skipped")
 async def test_bug_in_json_memory_tracking(df_factory: DflyInstanceFactory):
     """
     This test reproduces a bug in the JSON memory tracking.
     """
-    master = df_factory.create(proactor_threads=2, serialization_max_chunk_size=1)
+    master = df_factory.create(
+        proactor_threads=2,
+        serialization_max_chunk_size=1,
+        vmodule="replica=2,dflycmd=2,snapshot=1,rdb_save=1,rdb_load=1,journal_slice=2",
+    )
     replicas = [df_factory.create(proactor_threads=2) for i in range(2)]
     # Start instances and connect clients
@@ -2982,6 +2985,7 @@ async def test_bug_in_json_memory_tracking(df_factory: DflyInstanceFactory):
     seeder = SeederV2(key_target=50_000)
     fill_task = asyncio.create_task(seeder.run(master.client()))
     await asyncio.sleep(0.2)
     for replica in c_replicas:
         await replica.execute_command(f"REPLICAOF LOCALHOST {master.port}")