From b374947c38a1e65872790abaee467b4ed71511b0 Mon Sep 17 00:00:00 2001 From: Abhradeep Chakraborty Date: Wed, 17 May 2023 12:46:49 +0530 Subject: [PATCH] Support MINID, NOMKSTREAM and LIMIT options for XADD command (#1201) Signed-off-by: Abhradeep Chakraborty --- .pre-commit-config.yaml | 7 ++- src/redis/stream.h | 25 ++++++++++ src/redis/t_stream.c | 24 ---------- src/server/stream_family.cc | 82 +++++++++++++++++++++++++++----- src/server/stream_family_test.cc | 24 ++++++++++ 5 files changed, 123 insertions(+), 39 deletions(-) diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index b1599f856..f293f1855 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,9 @@ default_stages: [commit] -exclude: 'src\/redis\/.*' -exclude: 'contrib\/charts\/dragonfly\/ci\/.*' +exclude: | + (?x)( + src/redis/.* | + contrib/charts/dragonfly/ci/.* + ) repos: - repo: local hooks: diff --git a/src/redis/stream.h b/src/redis/stream.h index 745f3ab65..4ca981e1c 100644 --- a/src/redis/stream.h +++ b/src/redis/stream.h @@ -105,6 +105,26 @@ typedef struct streamPropInfo { robj *groupname; } streamPropInfo; +typedef struct { + /* XADD options */ + streamID id; /* User-provided ID, for XADD only. */ + int id_given; /* Was an ID different than "*" specified? for XADD only. */ + int seq_given; /* Was an ID different than "ms-*" specified? for XADD only. */ + int no_mkstream; /* if set to 1 do not create new stream */ + + /* XADD + XTRIM common options */ + int trim_strategy; /* TRIM_STRATEGY_* */ + int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */ + int approx_trim; /* If 1 only delete whole radix tree nodes, so + * the trim argument is not applied verbatim. */ + long long limit; /* Maximum amount of entries to trim. If 0, no limitation + * on the amount of trimming work is enforced. */ + /* TRIM_STRATEGY_MAXLEN options */ + long long maxlen; /* After trimming, leave stream at this length . */ + /* TRIM_STRATEGY_MINID options */ + streamID minid; /* Trim by ID (No stream entries with ID < 'minid' will remain) */ +} streamAddTrimArgs; + /* Prototypes of exported APIs. */ // struct client; @@ -119,6 +139,10 @@ typedef struct streamPropInfo { #define SCG_INVALID_ENTRIES_READ -1 +#define TRIM_STRATEGY_NONE 0 +#define TRIM_STRATEGY_MAXLEN 1 +#define TRIM_STRATEGY_MINID 2 + stream *streamNew(void); void freeStream(stream *s); unsigned long streamLength(const robj *subject); @@ -147,6 +171,7 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_ int streamDeleteItem(stream *s, streamID *id); void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id); long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id); +int64_t streamTrim(stream *s, streamAddTrimArgs *args); int64_t streamTrimByLength(stream *s, long long maxlen, int approx); int64_t streamTrimByID(stream *s, streamID minid, int approx); void streamFreeCG(streamCG *cg); diff --git a/src/redis/t_stream.c b/src/redis/t_stream.c index 9ca2577af..464817769 100644 --- a/src/redis/t_stream.c +++ b/src/redis/t_stream.c @@ -673,30 +673,6 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_ return C_OK; } -typedef struct { - /* XADD options */ - streamID id; /* User-provided ID, for XADD only. */ - int id_given; /* Was an ID different than "*" specified? for XADD only. */ - int seq_given; /* Was an ID different than "ms-*" specified? for XADD only. */ - int no_mkstream; /* if set to 1 do not create new stream */ - - /* XADD + XTRIM common options */ - int trim_strategy; /* TRIM_STRATEGY_* */ - int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */ - int approx_trim; /* If 1 only delete whole radix tree nodes, so - * the trim argument is not applied verbatim. */ - long long limit; /* Maximum amount of entries to trim. If 0, no limitation - * on the amount of trimming work is enforced. */ - /* TRIM_STRATEGY_MAXLEN options */ - long long maxlen; /* After trimming, leave stream at this length . */ - /* TRIM_STRATEGY_MINID options */ - streamID minid; /* Trim by ID (No stream entries with ID < 'minid' will remain) */ -} streamAddTrimArgs; - -#define TRIM_STRATEGY_NONE 0 -#define TRIM_STRATEGY_MAXLEN 1 -#define TRIM_STRATEGY_MINID 2 - /* Trim the stream 's' according to args->trim_strategy, and return the * number of elements removed from the stream. The 'approx' option, if non-zero, * specifies that the trimming must be performed in a approximated way in diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 62a896064..3227f0559 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -43,10 +43,20 @@ struct RangeId { bool exclude = false; }; +enum class TrimStrategy { + kAddOptsTrimNone = TRIM_STRATEGY_NONE, + kAddOptsTrimMaxLen = TRIM_STRATEGY_MAXLEN, + kAddOptsTrimMinId = TRIM_STRATEGY_MINID, +}; + struct AddOpts { ParsedStreamId parsed_id; - uint32_t max_limit = kuint32max; - bool max_limit_approx = false; + ParsedStreamId minid; + uint32_t max_len = kuint32max; + uint32_t limit = 0; + TrimStrategy trim_strategy = TrimStrategy::kAddOptsTrimNone; + bool trim_approx = false; + bool no_mkstream = false; }; struct GroupInfo { @@ -471,10 +481,19 @@ OpResult OpAdd(const OpArgs& op_args, string_view key, const AddOpts& auto& db_slice = op_args.shard->db_slice(); pair add_res; - try { - add_res = db_slice.AddOrFind(op_args.db_cntx, key); - } catch (bad_alloc&) { - return OpStatus::OUT_OF_MEMORY; + if (opts.no_mkstream) { + auto res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM); + if (!res_it) { + return res_it.status(); + } + add_res.first = res_it.value(); + add_res.second = false; + } else { + try { + add_res = db_slice.AddOrFind(op_args.db_cntx, key); + } catch (bad_alloc&) { + return OpStatus::OUT_OF_MEMORY; + } } robj* stream_obj = nullptr; @@ -508,10 +527,26 @@ OpResult OpAdd(const OpArgs& op_args, string_view key, const AddOpts& return OpStatus::OUT_OF_MEMORY; } - if (opts.max_limit < kuint32max) { - /* Notify xtrim event if needed. */ - streamTrimByLength(stream_inst, opts.max_limit, opts.max_limit_approx); - // TODO: when replicating, we should propagate it as exact limit in case of trimming. + if (!opts.limit) { + if (opts.trim_strategy == TrimStrategy::kAddOptsTrimMaxLen) { + /* Notify xtrim event if needed. */ + streamTrimByLength(stream_inst, opts.max_len, opts.trim_approx); + // TODO: when replicating, we should propagate it as exact limit in case of trimming. + } else if (opts.trim_strategy == TrimStrategy::kAddOptsTrimMinId) { + streamTrimByID(stream_inst, opts.minid.val, opts.trim_approx); + } + } else { + streamAddTrimArgs add_args = { + .trim_strategy = static_cast(opts.trim_strategy), + .approx_trim = opts.trim_approx, + .limit = opts.limit, + }; + if (opts.trim_strategy == TrimStrategy::kAddOptsTrimMaxLen) { + add_args.maxlen = opts.max_len; + } else if (opts.trim_strategy == TrimStrategy::kAddOptsTrimMinId) { + add_args.minid = opts.minid.val; + } + streamTrim(stream_inst, &add_args); } return result_id; } @@ -927,17 +962,38 @@ void StreamFamily::XAdd(CmdArgList args, ConnectionContext* cntx) { for (; id_indx < args.size(); ++id_indx) { ToUpper(&args[id_indx]); string_view arg = ArgS(args, id_indx); - if (arg == "MAXLEN") { + if (arg == "NOMKSTREAM") { + add_opts.no_mkstream = true; + } else if (arg == "MAXLEN" || arg == "MINID") { + if (arg == "MAXLEN") { + add_opts.trim_strategy = TrimStrategy::kAddOptsTrimMaxLen; + } else { + add_opts.trim_strategy = TrimStrategy::kAddOptsTrimMinId; + } if (id_indx + 2 >= args.size()) { return (*cntx)->SendError(kSyntaxErr); } ++id_indx; if (ArgS(args, id_indx) == "~") { - add_opts.max_limit_approx = true; + add_opts.trim_approx = true; ++id_indx; } arg = ArgS(args, id_indx); - if (!absl::SimpleAtoi(arg, &add_opts.max_limit)) { + if (add_opts.trim_strategy == TrimStrategy::kAddOptsTrimMaxLen && + !absl::SimpleAtoi(arg, &add_opts.max_len)) { + return (*cntx)->SendError(kSyntaxErr); + } + if (add_opts.trim_strategy == TrimStrategy::kAddOptsTrimMinId && + !ParseID(arg, false, 0, &add_opts.minid)) { + return (*cntx)->SendError(kSyntaxErr); + } + + } else if (arg == "LIMIT" && add_opts.trim_strategy != TrimStrategy::kAddOptsTrimNone) { + if (id_indx + 2 >= args.size() || !add_opts.trim_approx) { + return (*cntx)->SendError(kSyntaxErr); + } + ++id_indx; + if (!absl::SimpleAtoi(ArgS(args, id_indx), &add_opts.limit)) { return (*cntx)->SendError(kSyntaxErr); } } else { diff --git a/src/server/stream_family_test.cc b/src/server/stream_family_test.cc index b43da4366..1b26cc37c 100644 --- a/src/server/stream_family_test.cc +++ b/src/server/stream_family_test.cc @@ -39,6 +39,12 @@ TEST_F(StreamFamilyTest, Add) { resp = Run({"xadd", "key", "badid", "f1", "val1"}); EXPECT_THAT(resp, ErrArg("Invalid stream ID")); + + resp = Run({"xadd", "key", "nomkstream", "*", "field2", "value2"}); + ASSERT_THAT(resp, ArgType(RespExpr::STRING)); + + resp = Run({"xadd", "noexist", "nomkstream", "*", "field", "value"}); + EXPECT_THAT(resp, ErrArg("no such key")); } TEST_F(StreamFamilyTest, AddExtended) { @@ -62,6 +68,24 @@ TEST_F(StreamFamilyTest, AddExtended) { auto resp3 = Run({"xadd", "key", id2, "f1", "val1"}); EXPECT_THAT(resp3, ErrArg("equal or smaller than")); + + Run({"xadd", "key2", "5-0", "field", "val"}); + Run({"xadd", "key2", "6-0", "field1", "val1"}); + Run({"xadd", "key2", "7-0", "field2", "val2"}); + auto resp = Run({"xadd", "key2", "minid", "6", "*", "field3", "val3"}); + EXPECT_THAT(Run({"xlen", "key2"}), IntArg(3)); + EXPECT_THAT(Run({"xrange", "key2", "5-0", "5-0"}), ArrLen(0)); + + for (int i = 0; i < 700; i++) { + Run({"xadd", "key3", "*", "field", "val"}); + } + resp = Run({"xadd", "key3", "maxlen", "~", "500", "*", "field", "val"}); + EXPECT_THAT(Run({"xlen", "key3"}), IntArg(501)); + for (int i = 0; i < 700; i++) { + Run({"xadd", "key4", "*", "field", "val"}); + } + resp = Run({"xadd", "key4", "maxlen", "~", "500", "limit", "100", "*", "field", "val"}); + EXPECT_THAT(Run({"xlen", "key4"}), IntArg(601)); } TEST_F(StreamFamilyTest, Range) {