From 8a2d6ad1f4e7d34bbdca3ce3bb85a843531ec0b1 Mon Sep 17 00:00:00 2001 From: Kostas Kyrimis Date: Fri, 19 Jul 2024 10:03:17 +0300 Subject: [PATCH] fix: ub in RegisterOnChange and regression tests for big values (#3336) * fix replication test flag name for big values * fix a bug that triggers ub when RegisterOnChange is called on flows that iterate over the callbacks and preempt * add a stress test for big value serialization Signed-off-by: kostas --- src/server/db_slice.cc | 7 +++++++ src/server/db_slice.h | 31 +++++++++++++++++++++++++++++ tests/dragonfly/replication_test.py | 11 ++++++---- 3 files changed, 45 insertions(+), 4 deletions(-) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index ce8dc756f..02761db42 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -17,6 +17,7 @@ #include "server/server_state.h" #include "server/tiered_storage.h" #include "strings/human_readable.h" +#include "util/fibers/fibers.h" #include "util/fibers/stacktrace.h" ABSL_FLAG(bool, enable_heartbeat_eviction, true, @@ -1114,11 +1115,13 @@ void DbSlice::ExpireAllIfNeeded() { } uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) { + block_counter_.Wait(); return change_cb_.emplace_back(NextVersion(), std::move(cb)).first; } void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound) { FetchedItemsRestorer fetched_restorer(&fetched_items_); + std::unique_lock lk(block_counter_); uint64_t bucket_version = it.GetVersion(); // change_cb_ is ordered by version. @@ -1139,6 +1142,7 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_ //! Unregisters the callback. void DbSlice::UnregisterOnChange(uint64_t id) { + block_counter_.Wait(); auto it = find_if(change_cb_.begin(), change_cb_.end(), [id](const auto& cb) { return cb.first == id; }); CHECK(it != change_cb_.end()); @@ -1543,7 +1547,10 @@ void DbSlice::OnCbFinish() { void DbSlice::CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const { FetchedItemsRestorer fetched_restorer(&fetched_items_); + std::unique_lock lk(block_counter_); + for (const auto& ccb : change_cb_) { + CHECK(ccb.second); ccb.second(id, cr); } } diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 2c032f3ea..613fdb84a 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -13,6 +13,7 @@ #include "server/conn_context.h" #include "server/table.h" #include "util/fibers/fibers.h" +#include "util/fibers/synchronization.h" namespace dfly { @@ -524,6 +525,36 @@ class DbSlice { void CallChangeCallbacks(DbIndex id, const ChangeReq& cr) const; private: + class LocalBlockingCounter { + public: + void lock() { + ++mutating; + } + + void unlock() { + --mutating; + if (mutating == 0) { + cond_var.notify_one(); + } + } + + void Wait() { + util::fb2::NoOpLock noop_lk_; + cond_var.wait(noop_lk_, [this]() { return mutating == 0; }); + } + + private: + util::fb2::CondVarAny cond_var; + size_t mutating = 0; + }; + + // We need this because registered callbacks might yield. If RegisterOnChange + // gets called after we preempt while iterating over the registered callbacks + // (let's say in FlushChangeToEarlierCallbacks) we will get UB, because we pushed + // into a vector which might get resized, invalidating the iterators that are being + // used by the preempted FlushChangeToEarlierCallbacks. LocalBlockingCounter + // protects us against this case. + mutable LocalBlockingCounter block_counter_; ShardId shard_id_; uint8_t caching_mode_ : 1; diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 505b20a53..0d7614d63 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -55,6 +55,7 @@ Test full replication pipeline. Test full sync with streaming changes and stable pytest.param( 8, [8, 8], dict(key_target=1_000_000, units=16), 50_000, False, marks=M_STRESS ), + pytest.param(8, [8, 8], dict(key_target=1_000_000, units=16), 50_000, True, marks=M_STRESS), ], ) @pytest.mark.parametrize("mode", [({}), ({"cache_mode": "true"})]) @@ -67,14 +68,16 @@ async def test_replication_all( big_value, mode, ): + args = {} if mode: - mode["maxmemory"] = str(t_master * 256) + "mb" + args["cache_mode"] = "true" + args["maxmemory"] = str(t_master * 256) + "mb" if big_value: - mode["compression_mode"] = 0 - mode["flush_big_entries_threshold"] = 4096 + args["compression_mode"] = 0 + args["serialization_max_chunk_size"] = 4096 - master = df_factory.create(admin_port=ADMIN_PORT, proactor_threads=t_master, **mode) + master = df_factory.create(admin_port=ADMIN_PORT, proactor_threads=t_master, **args) replicas = [ df_factory.create(admin_port=ADMIN_PORT + i + 1, proactor_threads=t) for i, t in enumerate(t_replicas)