From 4ba1142cb35ba8c136541b4ce9e9a654f850eea2 Mon Sep 17 00:00:00 2001 From: Stepan Bagritsevich <43710058+BagritsevichStepan@users.noreply.github.com> Date: Wed, 5 Feb 2025 15:43:26 +0100 Subject: [PATCH] fix(stream_family): Fix journaling in the XADD and XTRIM commands (#4448) * fix(stream_family): Fix journaling in the XADD and XTRIM commands fixes dragonflydb#4202 Signed-off-by: Stepan Bagritsevich * fix(replication_test): Fix a typo Signed-off-by: Stepan Bagritsevich * feat(replication_test): Add test_stream_approximate_trimming test Signed-off-by: Stepan Bagritsevich * refactor: address comments Signed-off-by: Stepan Bagritsevich * refactor: address comments 2 Signed-off-by: Stepan Bagritsevich * fix(stream_family): Fix stream id journaling in the XADD command Signed-off-by: Stepan Bagritsevich --------- Signed-off-by: Stepan Bagritsevich --- src/facade/cmd_arg_parser.h | 4 + src/redis/stream.h | 12 +- src/redis/t_stream.c | 50 +++- src/server/stream_family.cc | 374 +++++++++++++++++----------- src/server/stream_family_test.cc | 2 +- tests/dragonfly/replication_test.py | 38 ++- 6 files changed, 320 insertions(+), 160 deletions(-) diff --git a/src/facade/cmd_arg_parser.h b/src/facade/cmd_arg_parser.h index 645453915..8c3b0c01f 100644 --- a/src/facade/cmd_arg_parser.h +++ b/src/facade/cmd_arg_parser.h @@ -174,6 +174,10 @@ struct CmdArgParser { return cur_i_ + i <= args_.size() && !error_; } + size_t GetCurrentIndex() const { + return cur_i_; + } + private: template std::optional> MapImpl(std::string_view arg, std::string_view tag, T&& value, diff --git a/src/redis/stream.h b/src/redis/stream.h index 9e03540ea..daa6809d6 100644 --- a/src/redis/stream.h +++ b/src/redis/stream.h @@ -125,6 +125,9 @@ typedef struct { /* Prototypes of exported APIs. 
*/ // struct client; +// Pass this as 'limit' to streamTrimByLength and streamTrimByID to use the default trim limit +#define NO_TRIM_LIMIT (-1) + /* Flags for streamCreateConsumer */ #define SCC_DEFAULT 0 #define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */ @@ -163,9 +166,12 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_ int streamDeleteItem(stream *s, streamID *id); void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id); long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id); -int64_t streamTrim(stream *s, streamAddTrimArgs *args); -int64_t streamTrimByLength(stream *s, long long maxlen, int approx); -int64_t streamTrimByID(stream *s, streamID minid, int approx); +int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id); + +// If you don't want to specify a limit, use NO_TRIM_LIMIT +int64_t streamTrimByLength(stream *s, long long maxlen, int approx, streamID *last_id, long long limit); +int64_t streamTrimByID(stream *s, streamID minid, int approx, streamID *last_id, long long limit); + void streamFreeCG(streamCG *cg); void streamDelConsumer(streamCG *cg, streamConsumer *consumer); void streamLastValidID(stream *s, streamID *maxid); diff --git a/src/redis/t_stream.c b/src/redis/t_stream.c index 27a10b100..64c5892c5 100644 --- a/src/redis/t_stream.c +++ b/src/redis/t_stream.c @@ -300,7 +300,7 @@ void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_i * that should be trimmed, there is a chance we will still have entries with * IDs < 'id' (or number of elements >= maxlen in case of MAXLEN). 
*/ -int64_t streamTrim(stream *s, streamAddTrimArgs *args) { +int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id) { size_t maxlen = args->maxlen; streamID *id = &args->minid; int approx = args->approx_trim; @@ -315,6 +315,8 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) { raxSeek(&ri,"^",NULL,0); int64_t deleted = 0; + streamID last_deleted_id = {0, 0}; // Initialize last deleted ID + while (raxNext(&ri)) { if (trim_strategy == TRIM_STRATEGY_MAXLEN && s->length <= maxlen) break; @@ -331,16 +333,24 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) { streamID master_id = {0}; /* For MINID */ if (trim_strategy == TRIM_STRATEGY_MAXLEN) { remove_node = s->length - entries >= maxlen; + if (remove_node) { + streamDecodeID(ri.key, &master_id); + // Write last ID to last_deleted_id + lpGetEdgeStreamID(lp, 0, &master_id, &last_deleted_id); + } } else { /* Read the master ID from the radix tree key. */ streamDecodeID(ri.key, &master_id); - + /* Read last ID. */ streamID last_id = {0, 0}; lpGetEdgeStreamID(lp, 0, &master_id, &last_id); /* We can remove the entire node id its last ID < 'id' */ remove_node = streamCompareID(&last_id, id) < 0; + if (remove_node) { + last_deleted_id = last_id; + } } if (remove_node) { @@ -356,6 +366,10 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) { * stop here. 
*/ if (approx) break; + if (trim_strategy == TRIM_STRATEGY_MAXLEN) { + streamDecodeID(ri.key, &master_id); + } + /* Now we have to trim entries from within 'lp' */ int64_t deleted_from_lp = 0; @@ -386,11 +400,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) { int64_t seq_delta = lpGetInteger(p); p = lpNext(lp, p); /* Skip ID seq delta */ - streamID currid = {0}; /* For MINID */ - if (trim_strategy == TRIM_STRATEGY_MINID) { - currid.ms = master_id.ms + ms_delta; - currid.seq = master_id.seq + seq_delta; - } + streamID currid = {master_id.ms + ms_delta, master_id.seq + seq_delta}; int stop; if (trim_strategy == TRIM_STRATEGY_MAXLEN) { @@ -422,6 +432,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) { deleted_from_lp++; s->length--; p = lp + delta; + last_deleted_id = currid; } } deleted += deleted_from_lp; @@ -458,29 +469,42 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) { streamGetEdgeID(s,1,1,&s->first_id); } + /* Set the last deleted ID, if applicable. */ + if (last_id) { + *last_id = last_deleted_id; + } + return deleted; } /* Trims a stream by length. Returns the number of deleted items. */ -int64_t streamTrimByLength(stream *s, long long maxlen, int approx) { +int64_t streamTrimByLength(stream *s, long long maxlen, int approx, streamID *last_id, long long limit) { + if (limit == NO_TRIM_LIMIT) { + limit = approx ? 100 * server.stream_node_max_entries : 0; + } + streamAddTrimArgs args = { .trim_strategy = TRIM_STRATEGY_MAXLEN, .approx_trim = approx, - .limit = approx ? 100 * server.stream_node_max_entries : 0, + .limit = limit, .maxlen = maxlen }; - return streamTrim(s, &args); + return streamTrim(s, &args, last_id); } /* Trims a stream by minimum ID. Returns the number of deleted items. */ -int64_t streamTrimByID(stream *s, streamID minid, int approx) { +int64_t streamTrimByID(stream *s, streamID minid, int approx, streamID *last_id, long long limit) { + if (limit == NO_TRIM_LIMIT) { + limit = approx ? 
100 * server.stream_node_max_entries : 0; + } + streamAddTrimArgs args = { .trim_strategy = TRIM_STRATEGY_MINID, .approx_trim = approx, - .limit = approx ? 100 * server.stream_node_max_entries : 0, + .limit = limit, .minid = minid }; - return streamTrim(s, &args); + return streamTrim(s, &args, last_id); } /* Initialize the stream iterator, so that we can call iterating functions diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index c64ddde04..d02e7e207 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -48,6 +48,14 @@ struct Record { using RecordVec = vector; +using nonstd::make_unexpected; + +template using ParseResult = io::Result; + +nonstd::unexpected_type CreateSyntaxError(std::string_view message) { + return make_unexpected(ErrorReply{message, kSyntaxErrType}); +} + struct ParsedStreamId { streamID val; @@ -66,25 +74,43 @@ struct RangeId { bool exclude = false; }; -enum class TrimStrategy { - kNone = TRIM_STRATEGY_NONE, - kMaxLen = TRIM_STRATEGY_MAXLEN, - kMinId = TRIM_STRATEGY_MINID, +struct TrimOpts { + bool IsMaxLen() const { + return std::holds_alternative(length_or_id); + } + + uint32_t AsMaxLen() const { + return std::get(length_or_id); + } + + const ParsedStreamId& AsMinId() const { + return std::get(length_or_id); + } + + // First is MaxLen, second is MinId. + std::variant length_or_id; + int32_t limit = NO_TRIM_LIMIT; + bool approx = false; }; -struct AddTrimOpts { - string_view key; - ParsedStreamId minid; - uint32_t max_len = kuint32max; - uint32_t limit = 0; - TrimStrategy trim_strategy = TrimStrategy::kNone; - bool trim_approx = false; - - // XADD only. +struct AddOpts { + std::optional trim_opts; ParsedStreamId parsed_id; bool no_mkstream = false; }; +/* Used to journal the XADD command. + The actual stream ID assigned after adding may differ from the one specified in the command. + So, for the replica, we need to specify the exact ID that was actually added. 
*/ +struct AddArgsJournaler { + void SetStreamId(std::string_view stream_id) { + add_args[stream_id_index] = stream_id; + } + + CmdArgVec add_args; + size_t stream_id_index; +}; + struct NACKInfo { streamID pel_id; string consumer_name; @@ -166,6 +192,8 @@ struct ReadOpts { bool noack = false; }; +const char kTrimOptionConflictErr[] = + "MAXLEN and MINID options at the same time are not compatible"; const char kInvalidStreamId[] = "Invalid stream ID specified as stream command argument"; const char kXGroupKeyNotFound[] = "The XGROUP subcommand requires the key to exist. " @@ -599,45 +627,44 @@ streamNACK* StreamCreateNACK(streamConsumer* consumer, uint64_t now_ms) { return nack; } -int StreamTrim(const AddTrimOpts& opts, stream* s) { - if (!opts.limit) { - if (opts.trim_strategy == TrimStrategy::kMaxLen) { - /* Notify xtrim event if needed. */ - return streamTrimByLength(s, opts.max_len, opts.trim_approx); - // TODO: when replicating, we should propagate it as exact limit in case of trimming. - } else if (opts.trim_strategy == TrimStrategy::kMinId) { - return streamTrimByID(s, opts.minid.val, opts.trim_approx); - } - } else { - streamAddTrimArgs trim_args = {}; - trim_args.trim_strategy = static_cast(opts.trim_strategy); - trim_args.approx_trim = opts.trim_approx; - trim_args.limit = opts.limit; - - if (opts.trim_strategy == TrimStrategy::kMaxLen) { - trim_args.maxlen = opts.max_len; - } else if (opts.trim_strategy == TrimStrategy::kMinId) { - trim_args.minid = opts.minid.val; - } - return streamTrim(s, &trim_args); - } - - return 0; +std::string StreamsIdToString(streamID id) { + return absl::StrCat(id.ms, "-", id.seq); } -OpResult OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgList args) { - DCHECK(!args.empty() && args.size() % 2 == 0); - auto& db_slice = op_args.GetDbSlice(); - DbSlice::AddOrFindResult add_res; +/* The first value represents the number of deleted items, and the second is the last trimmed ID. 
*/ +std::pair TrimStream(const TrimOpts& opts, stream* s) { + streamID last_id = {0, 0}; - if (opts.no_mkstream) { - auto res_it = db_slice.FindMutable(op_args.db_cntx, opts.key, OBJ_STREAM); - if (!res_it) { - return res_it.status(); + auto trim = [&]() { + if (opts.IsMaxLen()) { + return streamTrimByLength(s, opts.AsMaxLen(), opts.approx, &last_id, opts.limit); + } else { + const auto& min_id = opts.AsMinId().val; + return streamTrimByID(s, min_id, opts.approx, &last_id, opts.limit); } + }; + + const int64_t deleted_items_number = trim(); + return {deleted_items_number, last_id}; +} + +bool JournalAsMinId(const TrimOpts& opts) { + return opts.approx || opts.IsMaxLen(); +} + +OpResult OpAdd(const OpArgs& op_args, string_view key, const AddOpts& opts, + CmdArgList args, AddArgsJournaler journaler) { + DCHECK(!args.empty() && args.size() % 2 == 0); + + auto& db_slice = op_args.GetDbSlice(); + + DbSlice::AddOrFindResult add_res; + if (opts.no_mkstream) { + auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM); + RETURN_ON_BAD_STATUS(res_it); add_res = std::move(*res_it); } else { - auto op_res = db_slice.AddOrFind(op_args.db_cntx, opts.key); + auto op_res = db_slice.AddOrFind(op_args.db_cntx, key); RETURN_ON_BAD_STATUS(op_res); add_res = std::move(*op_res); } @@ -670,13 +697,42 @@ OpResult OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgL return OpStatus::OUT_OF_MEMORY; } - StreamTrim(opts, stream_inst); + std::pair trim_result{}; + if (opts.trim_opts) { + trim_result = TrimStream(opts.trim_opts.value(), stream_inst); + } mem_tracker.UpdateStreamSize(it->second); + if (op_args.shard->journal()) { + std::string result_id_as_string = StreamsIdToString(result_id); + + if (opts.trim_opts && JournalAsMinId(opts.trim_opts.value())) { + // We need to set exact MinId in the journal. 
+ std::string last_id = StreamsIdToString(trim_result.second); + CmdArgVec journal_args = {key, "MINID"sv, "="sv, last_id}; + journal_args.reserve(args.size() + 4); + + if (opts.no_mkstream) { + journal_args.push_back("NOMKSTREAM"sv); + } + + journal_args.push_back(result_id_as_string); + + for (size_t i = 0; i < args.size(); i++) { + journal_args.push_back(args[i]); + } + + RecordJournal(op_args, "XADD"sv, journal_args); + } else { + journaler.SetStreamId(result_id_as_string); + RecordJournal(op_args, "XADD"sv, journaler.add_args); + } + } + auto blocking_controller = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id()); if (blocking_controller) { - blocking_controller->AwakeWatched(op_args.db_cntx.db_index, opts.key); + blocking_controller->AwakeWatched(op_args.db_cntx.db_index, key); } return result_id; @@ -1934,9 +1990,10 @@ void XGroupHelp(CmdArgList args, const CommandContext& cmd_cntx) { return rb->SendSimpleStrArr(help_arr); } -OpResult OpTrim(const OpArgs& op_args, const AddTrimOpts& opts) { +OpResult OpTrim(const OpArgs& op_args, std::string_view key, const TrimOpts& opts, + bool journal_as_minid) { auto& db_slice = op_args.GetDbSlice(); - auto res_it = db_slice.FindMutable(op_args.db_cntx, opts.key, OBJ_STREAM); + auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM); if (!res_it) { if (res_it.status() == OpStatus::KEY_NOTFOUND) { return 0; @@ -1949,77 +2006,93 @@ OpResult OpTrim(const OpArgs& op_args, const AddTrimOpts& opts) { CompactObj& cobj = res_it->it->second; stream* s = (stream*)cobj.RObjPtr(); - auto res = StreamTrim(opts, s); + auto res = TrimStream(opts, s); mem_tracker.UpdateStreamSize(cobj); + + if (op_args.shard->journal() && journal_as_minid) { + std::string last_id = StreamsIdToString(res.second); + RecordJournal(op_args, "XTRIM"sv, ArgSlice{key, "MINID"sv, "="sv, last_id}); + } + + return res.first; +} + +ParseResult ParseTrimOpts(bool max_len, CmdArgParser* parser) { + TrimOpts opts; + opts.approx = 
parser->Check("~"); + if (!opts.approx) { + parser->Check("="); + } + + if (max_len) { + opts.length_or_id = parser->Next(); + } else { + ParsedStreamId parsed_id; + if (!ParseID(parser->Next(), false, 0, &parsed_id)) { + return CreateSyntaxError(kSyntaxErr); + } + + opts.length_or_id = parsed_id; // trivial copy + } + + if (parser->Check("LIMIT")) { + if (!opts.approx) { + return CreateSyntaxError(kSyntaxErr); + } + + opts.limit = parser->Next(); + } + + return opts; +} + +ParseResult ParseTrimOpts(CmdArgParser* parser) { + bool max_len = parser->Check("MAXLEN"); + if (!max_len) { + parser->ExpectTag("MINID"); + } + + auto res = ParseTrimOpts(max_len, parser); + + if (parser->Check("MAXLEN") || parser->Check("MINID")) { + return CreateSyntaxError(kTrimOptionConflictErr); + } + return res; } -optional> ParseAddOrTrimArgsOrReply(CmdArgList args, bool is_xadd, - SinkReplyBuilder* builder) { - AddTrimOpts opts; - opts.key = ArgS(args, 0); - - unsigned id_indx = 1; - for (; id_indx < args.size(); ++id_indx) { - string arg = absl::AsciiStrToUpper(ArgS(args, id_indx)); - size_t remaining_args = args.size() - id_indx - 1; - - if (is_xadd && arg == "NOMKSTREAM") { +ParseResult ParseAddOpts(CmdArgParser* parser) { + AddOpts opts; + while (parser->HasNext()) { + if (parser->Check("NOMKSTREAM")) { opts.no_mkstream = true; - } else if ((arg == "MAXLEN" || arg == "MINID") && remaining_args >= 1) { - if (opts.trim_strategy != TrimStrategy::kNone) { - builder->SendError("MAXLEN and MINID options at the same time are not compatible", - kSyntaxErr); - return std::nullopt; + continue; + } + + bool max_len = parser->Check("MAXLEN"); + if (max_len || parser->Check("MINID")) { + if (opts.trim_opts) { + return CreateSyntaxError(kTrimOptionConflictErr); } - if (arg == "MAXLEN") { - opts.trim_strategy = TrimStrategy::kMaxLen; - } else { - opts.trim_strategy = TrimStrategy::kMinId; + auto trim_opts = ParseTrimOpts(max_len, parser); + if (!trim_opts) { + return 
make_unexpected(trim_opts.error()); } - id_indx++; - arg = ArgS(args, id_indx); - if (remaining_args >= 2 && arg == "~") { - opts.trim_approx = true; - id_indx++; - arg = ArgS(args, id_indx); - } else if (remaining_args >= 2 && arg == "=") { - opts.trim_approx = false; - id_indx++; - arg = ArgS(args, id_indx); - } - - if (opts.trim_strategy == TrimStrategy::kMaxLen && !absl::SimpleAtoi(arg, &opts.max_len)) { - builder->SendError(kInvalidIntErr); - return std::nullopt; - } - if (opts.trim_strategy == TrimStrategy::kMinId && !ParseID(arg, false, 0, &opts.minid)) { - builder->SendError(kSyntaxErr); - return std::nullopt; - } - } else if (arg == "LIMIT" && remaining_args >= 1 && opts.trim_strategy != TrimStrategy::kNone) { - if (!opts.trim_approx) { - builder->SendError(kSyntaxErr); - return std::nullopt; - } - ++id_indx; - if (!absl::SimpleAtoi(ArgS(args, id_indx), &opts.limit)) { - builder->SendError(kSyntaxErr); - return std::nullopt; - } - } else if (is_xadd) { - // There are still remaining field args. - break; + opts.trim_opts = trim_opts.value(); // trivial copy } else { - builder->SendError(kSyntaxErr); - return std::nullopt; + // It is StreamId + std::string_view id = parser->Next(); + if (!ParseID(id, true, 0, &opts.parsed_id)) { + return CreateSyntaxError(kInvalidStreamId); + } + break; } } - return make_pair(opts, id_indx); + return opts; } struct StreamReplies { @@ -2547,46 +2620,47 @@ bool ParseXpendingOptions(CmdArgList& args, PendingOpts& opts, SinkReplyBuilder* } // namespace void StreamFamily::XAdd(CmdArgList args, const CommandContext& cmd_cntx) { - auto parse_resp = ParseAddOrTrimArgsOrReply(args, true, cmd_cntx.rb); - if (!parse_resp) { + CmdArgParser parser{args}; + auto* rb = static_cast(cmd_cntx.rb); + + string_view key = parser.Next(); + + auto parsed_add_opts = ParseAddOpts(&parser); + + if (auto err = parser.Error(); err || !parsed_add_opts) { + rb->SendError(!parsed_add_opts ? 
parsed_add_opts.error() : err->MakeReply()); return; } - auto add_opts = parse_resp->first; - auto id_indx = parse_resp->second; + // Save the index of the stream ID in the arguments list. + // We need this during journaling + // It is (parser.GetCurrentIndex() - 1) because the stream id is the last parsed argument in the + // ParseAddOpts + const size_t stream_id_index_in_args = parser.GetCurrentIndex() - 1; + AddArgsJournaler journaler{{args.begin(), args.end()}, stream_id_index_in_args}; - args.remove_prefix(id_indx); - if (args.size() < 2 || args.size() % 2 == 0) { - return cmd_cntx.rb->SendError(WrongNumArgsError("XADD"), kSyntaxErrType); + CmdArgList fields = parser.Tail(); + if (fields.empty() || fields.size() % 2 != 0) { + return rb->SendError(WrongNumArgsError("XADD"), kSyntaxErrType); } - string_view id = ArgS(args, 0); - - if (!ParseID(id, true, 0, &add_opts.parsed_id)) { - return cmd_cntx.rb->SendError(kInvalidStreamId, kSyntaxErrType); - } - - args.remove_prefix(1); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpAdd(t->GetOpArgs(shard), add_opts, args); + return OpAdd(t->GetOpArgs(shard), key, parsed_add_opts.value(), fields, journaler); }; OpResult add_result = cmd_cntx.tx->ScheduleSingleHopT(cb); - auto* rb = static_cast(cmd_cntx.rb); if (add_result) { - return rb->SendBulkString(StreamIdRepr(*add_result)); + rb->SendBulkString(StreamIdRepr(*add_result)); + } else { + if (add_result == OpStatus::KEY_NOTFOUND) { + rb->SendNull(); + } else if (add_result == OpStatus::STREAM_ID_SMALL) { + rb->SendError(LeqTopIdError("XADD")); + } else { + rb->SendError(add_result.status()); + } } - - if (add_result == OpStatus::KEY_NOTFOUND) { - return rb->SendNull(); - } - - if (add_result.status() == OpStatus::STREAM_ID_SMALL) { - return cmd_cntx.rb->SendError(LeqTopIdError("XADD")); - } - - return cmd_cntx.rb->SendError(add_result.status()); } absl::InlinedVector GetXclaimIds(CmdArgList& args) { @@ -3173,22 +3247,35 @@ void 
StreamFamily::XSetId(CmdArgList args, const CommandContext& cmd_cntx) { } void StreamFamily::XTrim(CmdArgList args, const CommandContext& cmd_cntx) { - auto parse_resp = ParseAddOrTrimArgsOrReply(args, true, cmd_cntx.rb); - if (!parse_resp) { + CmdArgParser parser{args}; + auto* rb = static_cast(cmd_cntx.rb); + + std::string_view key = parser.Next(); + + auto parsed_trim_opts = ParseTrimOpts(&parser); + if (!parsed_trim_opts || !parser.Finalize()) { + rb->SendError(!parsed_trim_opts ? parsed_trim_opts.error() : parser.Error()->MakeReply()); return; } - auto trim_opts = parse_resp->first; + auto& trim_opts = parsed_trim_opts.value(); + + // We can auto-journal if we are not trimming approximately or by maxlen + const bool enable_auto_journaling = !JournalAsMinId(trim_opts); + if (enable_auto_journaling) { + cmd_cntx.tx->ReviveAutoJournal(); + } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpTrim(t->GetOpArgs(shard), trim_opts); + return OpTrim(t->GetOpArgs(shard), key, trim_opts, !enable_auto_journaling); }; OpResult trim_result = cmd_cntx.tx->ScheduleSingleHopT(cb); if (trim_result) { - return cmd_cntx.rb->SendLong(*trim_result); + rb->SendLong(*trim_result); + } else { + rb->SendError(trim_result.status()); } - return cmd_cntx.rb->SendError(trim_result.status()); } void StreamFamily::XAck(CmdArgList args, const CommandContext& cmd_cntx) { @@ -3315,7 +3402,9 @@ void StreamFamily::Register(CommandRegistry* registry) { using CI = CommandId; registry->StartFamily(); constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::VARIADIC_KEYS; - *registry << CI{"XADD", CO::WRITE | CO::DENYOOM | CO::FAST, -5, 1, 1, acl::kXAdd}.HFUNC(XAdd) + *registry << CI{"XADD", CO::WRITE | CO::DENYOOM | CO::FAST | CO::NO_AUTOJOURNAL, -5, 1, 1, + acl::kXAdd} + .HFUNC(XAdd) << CI{"XCLAIM", CO::WRITE | CO::FAST, -6, 1, 1, acl::kXClaim}.HFUNC(XClaim) << CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, acl::kXDel}.HFUNC(XDel) << CI{"XGROUP", CO::WRITE | CO::DENYOOM, -3, 2, 2, 
acl::kXGroup}.HFUNC(XGroup) @@ -3327,7 +3416,8 @@ void StreamFamily::Register(CommandRegistry* registry) { << CI{"XREAD", kReadFlags, -3, 3, 3, acl::kXRead}.HFUNC(XRead) << CI{"XREADGROUP", kReadFlags, -6, 6, 6, acl::kXReadGroup}.HFUNC(XReadGroup) << CI{"XSETID", CO::WRITE, 3, 1, 1, acl::kXSetId}.HFUNC(XSetId) - << CI{"XTRIM", CO::WRITE | CO::FAST, -4, 1, 1, acl::kXTrim}.HFUNC(XTrim) + << CI{"XTRIM", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, -4, 1, 1, acl::kXTrim}.HFUNC( + XTrim) << CI{"_XGROUP_HELP", CO::NOSCRIPT | CO::HIDDEN, 2, 0, 0, acl::kXGroupHelp}.SetHandler( XGroupHelp) << CI{"XACK", CO::WRITE | CO::FAST, -4, 1, 1, acl::kXAck}.HFUNC(XAck) diff --git a/src/server/stream_family_test.cc b/src/server/stream_family_test.cc index 0872b7953..56e05f22e 100644 --- a/src/server/stream_family_test.cc +++ b/src/server/stream_family_test.cc @@ -693,7 +693,7 @@ TEST_F(StreamFamilyTest, XTrimInvalidArgs) { // Invalid limit. resp = Run({"xtrim", "foo", "maxlen", "~", "2", "limit", "nan"}); - EXPECT_THAT(resp, ErrArg("syntax error")); + EXPECT_THAT(resp, ErrArg("value is not an integer or out of range")); } TEST_F(StreamFamilyTest, XPending) { Run({"xadd", "foo", "1-0", "k1", "v1"}); diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index a542c01bf..add96839c 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -2715,7 +2715,7 @@ async def test_big_containers(df_factory, element_size, elements_number): logging.info(f"Replica Used memory {replica_used_memory}, peak memory {replica_peak_memory}") assert replica_peak_memory < 1.1 * replica_used_memory - # Check replica data consisten + # Check replica data consistent replica_data = await StaticSeeder.capture(c_replica) master_data = await StaticSeeder.capture(c_master) assert master_data == replica_data @@ -2734,3 +2734,39 @@ async def test_master_too_big(df_factory): # We should never sync due to used memory too high during full sync with 
pytest.raises(TimeoutError): await wait_available_async(c_replica, timeout=10) + + +@dfly_args({"proactor_threads": 4}) +async def test_stream_approximate_trimming(df_factory): + master = df_factory.create() + replica = df_factory.create() + + df_factory.start_all([master, replica]) + c_master = master.client() + c_replica = replica.client() + + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + await wait_for_replicas_state(c_replica) + + # Step 1: Populate master with 100 streams, each containing 200 entries + num_streams = 100 + entries_per_stream = 200 + + for i in range(num_streams): + stream_name = f"stream{i}" + for j in range(entries_per_stream): + await c_master.execute_command("XADD", stream_name, "*", f"field{j}", f"value{j}") + + # Step 2: Trim each stream to a random size between 70 and 200 + for i in range(num_streams): + stream_name = f"stream{i}" + trim_size = random.randint(70, entries_per_stream) + await c_master.execute_command("XTRIM", stream_name, "MAXLEN", "~", trim_size) + + # Wait for replica sync + await asyncio.sleep(1) + + # Check replica data consistent + master_data = await StaticSeeder.capture(c_master) + replica_data = await StaticSeeder.capture(c_replica) + assert master_data == replica_data