Support MINID, NOMKSTREAM and LIMIT options for XADD command (#1201)

Signed-off-by: Abhradeep Chakraborty <chakrabortyabhradeep79@gmail.com>
This commit is contained in:
Abhradeep Chakraborty 2023-05-17 12:46:49 +05:30 committed by GitHub
parent 70d9a79af7
commit b374947c38
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
5 changed files with 123 additions and 39 deletions

View file

@ -1,6 +1,9 @@
default_stages: [commit]
exclude: 'src\/redis\/.*'
exclude: 'contrib\/charts\/dragonfly\/ci\/.*'
exclude: |
(?x)(
src/redis/.* |
contrib/charts/dragonfly/ci/.*
)
repos:
- repo: local
hooks:

View file

@ -105,6 +105,26 @@ typedef struct streamPropInfo {
robj *groupname;
} streamPropInfo;
typedef struct {
/* XADD options */
streamID id; /* User-provided ID, for XADD only. */
int id_given; /* Was an ID different than "*" specified? for XADD only. */
int seq_given; /* Was an ID different than "ms-*" specified? for XADD only. */
int no_mkstream; /* if set to 1 do not create new stream */
/* XADD + XTRIM common options */
int trim_strategy; /* TRIM_STRATEGY_* */
int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */
int approx_trim; /* If 1 only delete whole radix tree nodes, so
* the trim argument is not applied verbatim. */
long long limit; /* Maximum amount of entries to trim. If 0, no limitation
* on the amount of trimming work is enforced. */
/* TRIM_STRATEGY_MAXLEN options */
long long maxlen; /* After trimming, leave stream at this length . */
/* TRIM_STRATEGY_MINID options */
streamID minid; /* Trim by ID (No stream entries with ID < 'minid' will remain) */
} streamAddTrimArgs;
/* Prototypes of exported APIs. */
// struct client;
@ -119,6 +139,10 @@ typedef struct streamPropInfo {
#define SCG_INVALID_ENTRIES_READ -1
#define TRIM_STRATEGY_NONE 0
#define TRIM_STRATEGY_MAXLEN 1
#define TRIM_STRATEGY_MINID 2
stream *streamNew(void);
void freeStream(stream *s);
unsigned long streamLength(const robj *subject);
@ -147,6 +171,7 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
int streamDeleteItem(stream *s, streamID *id);
void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id);
long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id);
int64_t streamTrim(stream *s, streamAddTrimArgs *args);
int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
int64_t streamTrimByID(stream *s, streamID minid, int approx);
void streamFreeCG(streamCG *cg);

View file

@ -673,30 +673,6 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
return C_OK;
}
typedef struct {
/* XADD options */
streamID id; /* User-provided ID, for XADD only. */
int id_given; /* Was an ID different than "*" specified? for XADD only. */
int seq_given; /* Was an ID different than "ms-*" specified? for XADD only. */
int no_mkstream; /* if set to 1 do not create new stream */
/* XADD + XTRIM common options */
int trim_strategy; /* TRIM_STRATEGY_* */
int trim_strategy_arg_idx; /* Index of the count in MAXLEN/MINID, for rewriting. */
int approx_trim; /* If 1 only delete whole radix tree nodes, so
* the trim argument is not applied verbatim. */
long long limit; /* Maximum amount of entries to trim. If 0, no limitation
* on the amount of trimming work is enforced. */
/* TRIM_STRATEGY_MAXLEN options */
long long maxlen; /* After trimming, leave stream at this length . */
/* TRIM_STRATEGY_MINID options */
streamID minid; /* Trim by ID (No stream entries with ID < 'minid' will remain) */
} streamAddTrimArgs;
#define TRIM_STRATEGY_NONE 0
#define TRIM_STRATEGY_MAXLEN 1
#define TRIM_STRATEGY_MINID 2
/* Trim the stream 's' according to args->trim_strategy, and return the
* number of elements removed from the stream. The 'approx' option, if non-zero,
* specifies that the trimming must be performed in a approximated way in

View file

@ -43,10 +43,20 @@ struct RangeId {
bool exclude = false;
};
enum class TrimStrategy {
kAddOptsTrimNone = TRIM_STRATEGY_NONE,
kAddOptsTrimMaxLen = TRIM_STRATEGY_MAXLEN,
kAddOptsTrimMinId = TRIM_STRATEGY_MINID,
};
struct AddOpts {
ParsedStreamId parsed_id;
uint32_t max_limit = kuint32max;
bool max_limit_approx = false;
ParsedStreamId minid;
uint32_t max_len = kuint32max;
uint32_t limit = 0;
TrimStrategy trim_strategy = TrimStrategy::kAddOptsTrimNone;
bool trim_approx = false;
bool no_mkstream = false;
};
struct GroupInfo {
@ -471,10 +481,19 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const AddOpts&
auto& db_slice = op_args.shard->db_slice();
pair<PrimeIterator, bool> add_res;
try {
add_res = db_slice.AddOrFind(op_args.db_cntx, key);
} catch (bad_alloc&) {
return OpStatus::OUT_OF_MEMORY;
if (opts.no_mkstream) {
auto res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM);
if (!res_it) {
return res_it.status();
}
add_res.first = res_it.value();
add_res.second = false;
} else {
try {
add_res = db_slice.AddOrFind(op_args.db_cntx, key);
} catch (bad_alloc&) {
return OpStatus::OUT_OF_MEMORY;
}
}
robj* stream_obj = nullptr;
@ -508,10 +527,26 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const AddOpts&
return OpStatus::OUT_OF_MEMORY;
}
if (opts.max_limit < kuint32max) {
/* Notify xtrim event if needed. */
streamTrimByLength(stream_inst, opts.max_limit, opts.max_limit_approx);
// TODO: when replicating, we should propagate it as exact limit in case of trimming.
if (!opts.limit) {
if (opts.trim_strategy == TrimStrategy::kAddOptsTrimMaxLen) {
/* Notify xtrim event if needed. */
streamTrimByLength(stream_inst, opts.max_len, opts.trim_approx);
// TODO: when replicating, we should propagate it as exact limit in case of trimming.
} else if (opts.trim_strategy == TrimStrategy::kAddOptsTrimMinId) {
streamTrimByID(stream_inst, opts.minid.val, opts.trim_approx);
}
} else {
streamAddTrimArgs add_args = {
.trim_strategy = static_cast<int>(opts.trim_strategy),
.approx_trim = opts.trim_approx,
.limit = opts.limit,
};
if (opts.trim_strategy == TrimStrategy::kAddOptsTrimMaxLen) {
add_args.maxlen = opts.max_len;
} else if (opts.trim_strategy == TrimStrategy::kAddOptsTrimMinId) {
add_args.minid = opts.minid.val;
}
streamTrim(stream_inst, &add_args);
}
return result_id;
}
@ -927,17 +962,38 @@ void StreamFamily::XAdd(CmdArgList args, ConnectionContext* cntx) {
for (; id_indx < args.size(); ++id_indx) {
ToUpper(&args[id_indx]);
string_view arg = ArgS(args, id_indx);
if (arg == "MAXLEN") {
if (arg == "NOMKSTREAM") {
add_opts.no_mkstream = true;
} else if (arg == "MAXLEN" || arg == "MINID") {
if (arg == "MAXLEN") {
add_opts.trim_strategy = TrimStrategy::kAddOptsTrimMaxLen;
} else {
add_opts.trim_strategy = TrimStrategy::kAddOptsTrimMinId;
}
if (id_indx + 2 >= args.size()) {
return (*cntx)->SendError(kSyntaxErr);
}
++id_indx;
if (ArgS(args, id_indx) == "~") {
add_opts.max_limit_approx = true;
add_opts.trim_approx = true;
++id_indx;
}
arg = ArgS(args, id_indx);
if (!absl::SimpleAtoi(arg, &add_opts.max_limit)) {
if (add_opts.trim_strategy == TrimStrategy::kAddOptsTrimMaxLen &&
!absl::SimpleAtoi(arg, &add_opts.max_len)) {
return (*cntx)->SendError(kSyntaxErr);
}
if (add_opts.trim_strategy == TrimStrategy::kAddOptsTrimMinId &&
!ParseID(arg, false, 0, &add_opts.minid)) {
return (*cntx)->SendError(kSyntaxErr);
}
} else if (arg == "LIMIT" && add_opts.trim_strategy != TrimStrategy::kAddOptsTrimNone) {
if (id_indx + 2 >= args.size() || !add_opts.trim_approx) {
return (*cntx)->SendError(kSyntaxErr);
}
++id_indx;
if (!absl::SimpleAtoi(ArgS(args, id_indx), &add_opts.limit)) {
return (*cntx)->SendError(kSyntaxErr);
}
} else {

View file

@ -39,6 +39,12 @@ TEST_F(StreamFamilyTest, Add) {
resp = Run({"xadd", "key", "badid", "f1", "val1"});
EXPECT_THAT(resp, ErrArg("Invalid stream ID"));
resp = Run({"xadd", "key", "nomkstream", "*", "field2", "value2"});
ASSERT_THAT(resp, ArgType(RespExpr::STRING));
resp = Run({"xadd", "noexist", "nomkstream", "*", "field", "value"});
EXPECT_THAT(resp, ErrArg("no such key"));
}
TEST_F(StreamFamilyTest, AddExtended) {
@ -62,6 +68,24 @@ TEST_F(StreamFamilyTest, AddExtended) {
auto resp3 = Run({"xadd", "key", id2, "f1", "val1"});
EXPECT_THAT(resp3, ErrArg("equal or smaller than"));
Run({"xadd", "key2", "5-0", "field", "val"});
Run({"xadd", "key2", "6-0", "field1", "val1"});
Run({"xadd", "key2", "7-0", "field2", "val2"});
auto resp = Run({"xadd", "key2", "minid", "6", "*", "field3", "val3"});
EXPECT_THAT(Run({"xlen", "key2"}), IntArg(3));
EXPECT_THAT(Run({"xrange", "key2", "5-0", "5-0"}), ArrLen(0));
for (int i = 0; i < 700; i++) {
Run({"xadd", "key3", "*", "field", "val"});
}
resp = Run({"xadd", "key3", "maxlen", "~", "500", "*", "field", "val"});
EXPECT_THAT(Run({"xlen", "key3"}), IntArg(501));
for (int i = 0; i < 700; i++) {
Run({"xadd", "key4", "*", "field", "val"});
}
resp = Run({"xadd", "key4", "maxlen", "~", "500", "limit", "100", "*", "field", "val"});
EXPECT_THAT(Run({"xlen", "key4"}), IntArg(601));
}
TEST_F(StreamFamilyTest, Range) {