From f3ce3ce0c80c90d21ba90ec8250f08da9a9e2533 Mon Sep 17 00:00:00 2001 From: Stepan Bagritsevich <43710058+BagritsevichStepan@users.noreply.github.com> Date: Tue, 11 Feb 2025 16:37:29 +0100 Subject: [PATCH] fix(stream_family): Fix replication for the XADD and XTRIM commands (#4591) * chore(stream): Revert changes in the redis code Signed-off-by: Stepan Bagritsevich * fix(stream_family): Fix replication for the XADD and XTRIM commands Signed-off-by: Stepan Bagritsevich --------- Signed-off-by: Stepan Bagritsevich --- src/core/interpreter.cc | 3 +- src/redis/stream.h | 12 +--- src/redis/t_stream.c | 50 ++++--------- src/server/stream_family.cc | 87 ++++++++++++++++------- tests/dragonfly/replication_test.py | 10 +++ tests/dragonfly/seeder/script-hashlib.lua | 4 ++ 6 files changed, 93 insertions(+), 73 deletions(-) diff --git a/src/core/interpreter.cc b/src/core/interpreter.cc index ded0515fb..99d347b69 100644 --- a/src/core/interpreter.cc +++ b/src/core/interpreter.cc @@ -76,8 +76,7 @@ struct StringCollectorTranslator : public ObjectExplorer { values.emplace_back(str); } void OnArrayStart(unsigned len) final { - CHECK(values.empty()); - values.reserve(len); + values.reserve(values.size() + len); } void OnArrayEnd() final { } diff --git a/src/redis/stream.h b/src/redis/stream.h index daa6809d6..9e03540ea 100644 --- a/src/redis/stream.h +++ b/src/redis/stream.h @@ -125,9 +125,6 @@ typedef struct { /* Prototypes of exported APIs. */ // struct client; -// Use this to in streamTrimByLength and streamTrimByID -#define NO_TRIM_LIMIT (-1) - /* Flags for streamCreateConsumer */ #define SCC_DEFAULT 0 #define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */ @@ -166,12 +163,9 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_ int streamDeleteItem(stream *s, streamID *id); void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id); long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id); -int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id); - -// If you don't want to specify a limit, use NO_TRIM_LIMIT -int64_t streamTrimByLength(stream *s, long long maxlen, int approx, streamID *last_id, long long limit); -int64_t streamTrimByID(stream *s, streamID minid, int approx, streamID *last_id, long long limit); - +int64_t streamTrim(stream *s, streamAddTrimArgs *args); +int64_t streamTrimByLength(stream *s, long long maxlen, int approx); +int64_t streamTrimByID(stream *s, streamID minid, int approx); void streamFreeCG(streamCG *cg); void streamDelConsumer(streamCG *cg, streamConsumer *consumer); void streamLastValidID(stream *s, streamID *maxid); diff --git a/src/redis/t_stream.c b/src/redis/t_stream.c index 64c5892c5..27a10b100 100644 --- a/src/redis/t_stream.c +++ b/src/redis/t_stream.c @@ -300,7 +300,7 @@ void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_i * that should be trimmed, there is a chance we will still have entries with * IDs < 'id' (or number of elements >= maxlen in case of MAXLEN). */ -int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id) { +int64_t streamTrim(stream *s, streamAddTrimArgs *args) { size_t maxlen = args->maxlen; streamID *id = &args->minid; int approx = args->approx_trim; @@ -315,8 +315,6 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id) { raxSeek(&ri,"^",NULL,0); int64_t deleted = 0; - streamID last_deleted_id = {0, 0}; // Initialize last deleted ID - while (raxNext(&ri)) { if (trim_strategy == TRIM_STRATEGY_MAXLEN && s->length <= maxlen) break; @@ -333,24 +331,16 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id) { streamID master_id = {0}; /* For MINID */ if (trim_strategy == TRIM_STRATEGY_MAXLEN) { remove_node = s->length - entries >= maxlen; - if (remove_node) { - streamDecodeID(ri.key, &master_id); - // Write last ID to last_deleted_id - lpGetEdgeStreamID(lp, 0, &master_id, &last_deleted_id); - } } else { /* Read the master ID from the radix tree key. */ streamDecodeID(ri.key, &master_id); - + /* Read last ID. */ streamID last_id = {0, 0}; lpGetEdgeStreamID(lp, 0, &master_id, &last_id); /* We can remove the entire node id its last ID < 'id' */ remove_node = streamCompareID(&last_id, id) < 0; - if (remove_node) { - last_deleted_id = last_id; - } } if (remove_node) { @@ -366,10 +356,6 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id) { * stop here. */ if (approx) break; - if (trim_strategy == TRIM_STRATEGY_MAXLEN) { - streamDecodeID(ri.key, &master_id); - } - /* Now we have to trim entries from within 'lp' */ int64_t deleted_from_lp = 0; @@ -400,7 +386,11 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id) { int64_t seq_delta = lpGetInteger(p); p = lpNext(lp, p); /* Skip ID seq delta */ - streamID currid = {master_id.ms + ms_delta, master_id.seq + seq_delta}; + streamID currid = {0}; /* For MINID */ + if (trim_strategy == TRIM_STRATEGY_MINID) { + currid.ms = master_id.ms + ms_delta; + currid.seq = master_id.seq + seq_delta; + } int stop; if (trim_strategy == TRIM_STRATEGY_MAXLEN) { @@ -432,7 +422,6 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id) { deleted_from_lp++; s->length--; p = lp + delta; - last_deleted_id = currid; } } deleted += deleted_from_lp; @@ -469,42 +458,29 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id) { streamGetEdgeID(s,1,1,&s->first_id); } - /* Set the last deleted ID, if applicable. */ - if (last_id) { - *last_id = last_deleted_id; - } - return deleted; } /* Trims a stream by length. Returns the number of deleted items. */ -int64_t streamTrimByLength(stream *s, long long maxlen, int approx, streamID *last_id, long long limit) { - if (limit == NO_TRIM_LIMIT) { - limit = approx ? 100 * server.stream_node_max_entries : 0; - } - +int64_t streamTrimByLength(stream *s, long long maxlen, int approx) { streamAddTrimArgs args = { .trim_strategy = TRIM_STRATEGY_MAXLEN, .approx_trim = approx, - .limit = limit, + .limit = approx ? 100 * server.stream_node_max_entries : 0, .maxlen = maxlen }; - return streamTrim(s, &args, last_id); + return streamTrim(s, &args); } /* Trims a stream by minimum ID. Returns the number of deleted items. */ -int64_t streamTrimByID(stream *s, streamID minid, int approx, streamID *last_id, long long limit) { - if (limit == NO_TRIM_LIMIT) { - limit = approx ? 100 * server.stream_node_max_entries : 0; - } - +int64_t streamTrimByID(stream *s, streamID minid, int approx) { streamAddTrimArgs args = { .trim_strategy = TRIM_STRATEGY_MINID, .approx_trim = approx, - .limit = limit, + .limit = approx ? 100 * server.stream_node_max_entries : 0, .minid = minid }; - return streamTrim(s, &args, last_id); + return streamTrim(s, &args); } /* Initialize the stream iterator, so that we can call iterating functions diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index a4e6cb082..402fbe691 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -75,6 +75,12 @@ struct RangeId { }; struct TrimOpts { + static constexpr int32_t kNoTrimLimit = -1; + + bool HasLimit() const { + return limit != kNoTrimLimit; + } + bool IsMaxLen() const { return std::holds_alternative(length_or_id); } @@ -89,7 +95,7 @@ struct TrimOpts { // First is MaxLen, second is MinId. std::variant length_or_id; - int32_t limit = NO_TRIM_LIMIT; + int32_t limit = kNoTrimLimit; bool approx = false; }; @@ -631,21 +637,30 @@ std::string StreamsIdToString(streamID id) { return absl::StrCat(id.ms, "-", id.seq); } -/* The first value represents the number of deleted items, and the second is the last trimmed ID. */ -std::pair TrimStream(const TrimOpts& opts, stream* s) { - streamID last_id = {0, 0}; - - auto trim = [&]() { +/* Return value represents the number of deleted items. */ +int64_t TrimStream(const TrimOpts& opts, stream* s) { + if (!opts.HasLimit()) { if (opts.IsMaxLen()) { - return streamTrimByLength(s, opts.AsMaxLen(), opts.approx, &last_id, opts.limit); + return streamTrimByLength(s, opts.AsMaxLen(), opts.approx); } else { const auto& min_id = opts.AsMinId().val; - return streamTrimByID(s, min_id, opts.approx, &last_id, opts.limit); + return streamTrimByID(s, min_id, opts.approx); } - }; + } - const int64_t deleted_items_number = trim(); - return {deleted_items_number, last_id}; + streamAddTrimArgs trim_args = {}; + trim_args.approx_trim = opts.approx; + trim_args.limit = opts.limit; + + if (opts.IsMaxLen()) { + trim_args.trim_strategy = TRIM_STRATEGY_MAXLEN; + trim_args.maxlen = opts.AsMaxLen(); + } else { + trim_args.trim_strategy = TRIM_STRATEGY_MINID; + trim_args.minid = opts.AsMinId().val; + } + + return streamTrim(s, &trim_args); } bool JournalAsMinId(const TrimOpts& opts) { @@ -697,30 +712,44 @@ OpResult OpAdd(const OpArgs& op_args, string_view key, const AddOpts& return OpStatus::OUT_OF_MEMORY; } - std::pair trim_result{}; if (opts.trim_opts) { - trim_result = TrimStream(opts.trim_opts.value(), stream_inst); + int64_t deleted_items_number = TrimStream(opts.trim_opts.value(), stream_inst); + VLOG(2) << "Trimmed " << deleted_items_number << " items from stream " << key + << " during the XADD command"; } mem_tracker.UpdateStreamSize(it->second); if (op_args.shard->journal()) { std::string result_id_as_string = StreamsIdToString(result_id); + const bool stream_is_empty = stream_inst->length == 0; - if (opts.trim_opts && JournalAsMinId(opts.trim_opts.value())) { - // We need to set exact MinId in the journal. - std::string last_id = StreamsIdToString(trim_result.second); - CmdArgVec journal_args = {key, "MINID"sv, "="sv, last_id}; + if (opts.trim_opts && (stream_is_empty || JournalAsMinId(opts.trim_opts.value()))) { + std::string last_id; + + CmdArgVec journal_args = {key}; journal_args.reserve(args.size() + 4); - if (opts.no_mkstream) { - journal_args.push_back("NOMKSTREAM"sv); + if (stream_is_empty) { + // We need remove the whole stream in replica + journal_args.emplace_back("MAXLEN"sv); + journal_args.emplace_back("0"sv); + } else { + // We need to set exact MinId in the journal. + // For this we are using new first_id from the stream + last_id = StreamsIdToString(stream_inst->first_id); + journal_args.emplace_back("MINID"sv); + journal_args.emplace_back(last_id); } - journal_args.push_back(result_id_as_string); + if (opts.no_mkstream) { + journal_args.emplace_back("NOMKSTREAM"sv); + } + + journal_args.emplace_back(result_id_as_string); for (size_t i = 0; i < args.size(); i++) { - journal_args.push_back(args[i]); + journal_args.emplace_back(args[i]); } RecordJournal(op_args, "XADD"sv, journal_args); @@ -2006,16 +2035,24 @@ OpResult OpTrim(const OpArgs& op_args, std::string_view key, const Trim CompactObj& cobj = res_it->it->second; stream* s = (stream*)cobj.RObjPtr(); - auto res = TrimStream(opts, s); + int64_t deleted_items_number = TrimStream(opts, s); mem_tracker.UpdateStreamSize(cobj); if (op_args.shard->journal() && journal_as_minid) { - std::string last_id = StreamsIdToString(res.second); - RecordJournal(op_args, "XTRIM"sv, ArgSlice{key, "MINID"sv, "="sv, last_id}); + const bool stream_is_empty = s->length == 0; + if (stream_is_empty) { + // We need remove the whole stream in replica + RecordJournal(op_args, "XTRIM"sv, ArgSlice{key, "MAXLEN"sv, "0"sv}); + } else { + // We need to set exact MinId in the journal. + // For this we are using new first_id from the stream + std::string last_id = StreamsIdToString(s->first_id); + RecordJournal(op_args, "XTRIM"sv, ArgSlice{key, "MINID"sv, last_id}); + } } - return res.first; + return deleted_items_number; } ParseResult ParseTrimOpts(bool max_len, CmdArgParser* parser) { diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 5e4b195ad..1e0867aec 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2790,6 +2790,16 @@ async def test_stream_approximate_trimming(df_factory): replica_data = await StaticSeeder.capture(c_replica) assert master_data == replica_data + # Step 3: Trim all streams to 0 + for i in range(num_streams): + stream_name = f"stream{i}" + await c_master.execute_command("XTRIM", stream_name, "MAXLEN", "0") + + # Check replica data consistent + master_data = await StaticSeeder.capture(c_master) + replica_data = await StaticSeeder.capture(c_replica) + assert master_data == replica_data + async def test_preempt_in_atomic_section_of_heartbeat(df_factory: DflyInstanceFactory): master = df_factory.create(proactor_threads=1, serialization_max_chunk_size=100000000000) diff --git a/tests/dragonfly/seeder/script-hashlib.lua b/tests/dragonfly/seeder/script-hashlib.lua index a2580d427..48eab90f0 100644 --- a/tests/dragonfly/seeder/script-hashlib.lua +++ b/tests/dragonfly/seeder/script-hashlib.lua @@ -29,3 +29,7 @@ function LH_funcs.json(key, hash) -- add values to hash, note JSON.GET returns just a string return dragonfly.ihash(hash, false, 'JSON.GET', key) end + +function LH_funcs.stream(key, hash) + return dragonfly.ihash(hash, false, 'XRANGE', key, '-', '+') +end