fix(stream_family): Fix journaling in the XADD and XTRIM commands (#4448)

* fix(stream_family): Fix journaling in the XADD and XTRIM commands

fixes dragonflydb#4202

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* fix(replication_test): Fix a typo

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* feat(replication_test): Add test_stream_approximate_trimming test

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* refactor: address comments

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* refactor: address comments 2

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

* fix(stream_family): Fix stream id journaling in the XADD command

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>

---------

Signed-off-by: Stepan Bagritsevich <stefan@dragonflydb.io>
This commit is contained in:
Stepan Bagritsevich 2025-02-05 15:43:26 +01:00 committed by GitHub
parent 47a1305ea1
commit 4ba1142cb3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
6 changed files with 320 additions and 160 deletions

View file

@ -174,6 +174,10 @@ struct CmdArgParser {
return cur_i_ + i <= args_.size() && !error_; return cur_i_ + i <= args_.size() && !error_;
} }
size_t GetCurrentIndex() const {
return cur_i_;
}
private: private:
template <class T, class... Cases> template <class T, class... Cases>
std::optional<std::decay_t<T>> MapImpl(std::string_view arg, std::string_view tag, T&& value, std::optional<std::decay_t<T>> MapImpl(std::string_view arg, std::string_view tag, T&& value,

View file

@ -125,6 +125,9 @@ typedef struct {
/* Prototypes of exported APIs. */ /* Prototypes of exported APIs. */
// struct client; // struct client;
// Use this in streamTrimByLength and streamTrimByID
#define NO_TRIM_LIMIT (-1)
/* Flags for streamCreateConsumer */ /* Flags for streamCreateConsumer */
#define SCC_DEFAULT 0 #define SCC_DEFAULT 0
#define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */ #define SCC_NO_NOTIFY (1<<0) /* Do not notify key space if consumer created */
@ -163,9 +166,12 @@ int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_
int streamDeleteItem(stream *s, streamID *id); int streamDeleteItem(stream *s, streamID *id);
void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id); void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_id);
long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id); long long streamEstimateDistanceFromFirstEverEntry(stream *s, streamID *id);
int64_t streamTrim(stream *s, streamAddTrimArgs *args); int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id);
int64_t streamTrimByLength(stream *s, long long maxlen, int approx);
int64_t streamTrimByID(stream *s, streamID minid, int approx); // If you don't want to specify a limit, use NO_TRIM_LIMIT
int64_t streamTrimByLength(stream *s, long long maxlen, int approx, streamID *last_id, long long limit);
int64_t streamTrimByID(stream *s, streamID minid, int approx, streamID *last_id, long long limit);
void streamFreeCG(streamCG *cg); void streamFreeCG(streamCG *cg);
void streamDelConsumer(streamCG *cg, streamConsumer *consumer); void streamDelConsumer(streamCG *cg, streamConsumer *consumer);
void streamLastValidID(stream *s, streamID *maxid); void streamLastValidID(stream *s, streamID *maxid);

View file

@ -300,7 +300,7 @@ void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_i
* that should be trimmed, there is a chance we will still have entries with * that should be trimmed, there is a chance we will still have entries with
* IDs < 'id' (or number of elements >= maxlen in case of MAXLEN). * IDs < 'id' (or number of elements >= maxlen in case of MAXLEN).
*/ */
int64_t streamTrim(stream *s, streamAddTrimArgs *args) { int64_t streamTrim(stream *s, streamAddTrimArgs *args, streamID *last_id) {
size_t maxlen = args->maxlen; size_t maxlen = args->maxlen;
streamID *id = &args->minid; streamID *id = &args->minid;
int approx = args->approx_trim; int approx = args->approx_trim;
@ -315,6 +315,8 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
raxSeek(&ri,"^",NULL,0); raxSeek(&ri,"^",NULL,0);
int64_t deleted = 0; int64_t deleted = 0;
streamID last_deleted_id = {0, 0}; // Initialize last deleted ID
while (raxNext(&ri)) { while (raxNext(&ri)) {
if (trim_strategy == TRIM_STRATEGY_MAXLEN && s->length <= maxlen) if (trim_strategy == TRIM_STRATEGY_MAXLEN && s->length <= maxlen)
break; break;
@ -331,16 +333,24 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
streamID master_id = {0}; /* For MINID */ streamID master_id = {0}; /* For MINID */
if (trim_strategy == TRIM_STRATEGY_MAXLEN) { if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
remove_node = s->length - entries >= maxlen; remove_node = s->length - entries >= maxlen;
if (remove_node) {
streamDecodeID(ri.key, &master_id);
// Write last ID to last_deleted_id
lpGetEdgeStreamID(lp, 0, &master_id, &last_deleted_id);
}
} else { } else {
/* Read the master ID from the radix tree key. */ /* Read the master ID from the radix tree key. */
streamDecodeID(ri.key, &master_id); streamDecodeID(ri.key, &master_id);
/* Read last ID. */ /* Read last ID. */
streamID last_id = {0, 0}; streamID last_id = {0, 0};
lpGetEdgeStreamID(lp, 0, &master_id, &last_id); lpGetEdgeStreamID(lp, 0, &master_id, &last_id);
/* We can remove the entire node if its last ID < 'id' */ /* We can remove the entire node if its last ID < 'id' */
remove_node = streamCompareID(&last_id, id) < 0; remove_node = streamCompareID(&last_id, id) < 0;
if (remove_node) {
last_deleted_id = last_id;
}
} }
if (remove_node) { if (remove_node) {
@ -356,6 +366,10 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
* stop here. */ * stop here. */
if (approx) break; if (approx) break;
if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
streamDecodeID(ri.key, &master_id);
}
/* Now we have to trim entries from within 'lp' */ /* Now we have to trim entries from within 'lp' */
int64_t deleted_from_lp = 0; int64_t deleted_from_lp = 0;
@ -386,11 +400,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
int64_t seq_delta = lpGetInteger(p); int64_t seq_delta = lpGetInteger(p);
p = lpNext(lp, p); /* Skip ID seq delta */ p = lpNext(lp, p); /* Skip ID seq delta */
streamID currid = {0}; /* For MINID */ streamID currid = {master_id.ms + ms_delta, master_id.seq + seq_delta};
if (trim_strategy == TRIM_STRATEGY_MINID) {
currid.ms = master_id.ms + ms_delta;
currid.seq = master_id.seq + seq_delta;
}
int stop; int stop;
if (trim_strategy == TRIM_STRATEGY_MAXLEN) { if (trim_strategy == TRIM_STRATEGY_MAXLEN) {
@ -422,6 +432,7 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
deleted_from_lp++; deleted_from_lp++;
s->length--; s->length--;
p = lp + delta; p = lp + delta;
last_deleted_id = currid;
} }
} }
deleted += deleted_from_lp; deleted += deleted_from_lp;
@ -458,29 +469,42 @@ int64_t streamTrim(stream *s, streamAddTrimArgs *args) {
streamGetEdgeID(s,1,1,&s->first_id); streamGetEdgeID(s,1,1,&s->first_id);
} }
/* Set the last deleted ID, if applicable. */
if (last_id) {
*last_id = last_deleted_id;
}
return deleted; return deleted;
} }
/* Trims a stream by length. Returns the number of deleted items. */ /* Trims a stream by length. Returns the number of deleted items. */
int64_t streamTrimByLength(stream *s, long long maxlen, int approx) { int64_t streamTrimByLength(stream *s, long long maxlen, int approx, streamID *last_id, long long limit) {
if (limit == NO_TRIM_LIMIT) {
limit = approx ? 100 * server.stream_node_max_entries : 0;
}
streamAddTrimArgs args = { streamAddTrimArgs args = {
.trim_strategy = TRIM_STRATEGY_MAXLEN, .trim_strategy = TRIM_STRATEGY_MAXLEN,
.approx_trim = approx, .approx_trim = approx,
.limit = approx ? 100 * server.stream_node_max_entries : 0, .limit = limit,
.maxlen = maxlen .maxlen = maxlen
}; };
return streamTrim(s, &args); return streamTrim(s, &args, last_id);
} }
/* Trims a stream by minimum ID. Returns the number of deleted items. */ /* Trims a stream by minimum ID. Returns the number of deleted items. */
int64_t streamTrimByID(stream *s, streamID minid, int approx) { int64_t streamTrimByID(stream *s, streamID minid, int approx, streamID *last_id, long long limit) {
if (limit == NO_TRIM_LIMIT) {
limit = approx ? 100 * server.stream_node_max_entries : 0;
}
streamAddTrimArgs args = { streamAddTrimArgs args = {
.trim_strategy = TRIM_STRATEGY_MINID, .trim_strategy = TRIM_STRATEGY_MINID,
.approx_trim = approx, .approx_trim = approx,
.limit = approx ? 100 * server.stream_node_max_entries : 0, .limit = limit,
.minid = minid .minid = minid
}; };
return streamTrim(s, &args); return streamTrim(s, &args, last_id);
} }
/* Initialize the stream iterator, so that we can call iterating functions /* Initialize the stream iterator, so that we can call iterating functions

View file

@ -48,6 +48,14 @@ struct Record {
using RecordVec = vector<Record>; using RecordVec = vector<Record>;
using nonstd::make_unexpected;
template <typename T> using ParseResult = io::Result<T, ErrorReply>;
nonstd::unexpected_type<ErrorReply> CreateSyntaxError(std::string_view message) {
return make_unexpected(ErrorReply{message, kSyntaxErrType});
}
struct ParsedStreamId { struct ParsedStreamId {
streamID val; streamID val;
@ -66,25 +74,43 @@ struct RangeId {
bool exclude = false; bool exclude = false;
}; };
enum class TrimStrategy { struct TrimOpts {
kNone = TRIM_STRATEGY_NONE, bool IsMaxLen() const {
kMaxLen = TRIM_STRATEGY_MAXLEN, return std::holds_alternative<uint32_t>(length_or_id);
kMinId = TRIM_STRATEGY_MINID, }
uint32_t AsMaxLen() const {
return std::get<uint32_t>(length_or_id);
}
const ParsedStreamId& AsMinId() const {
return std::get<ParsedStreamId>(length_or_id);
}
// First is MaxLen, second is MinId.
std::variant<uint32_t, ParsedStreamId> length_or_id;
int32_t limit = NO_TRIM_LIMIT;
bool approx = false;
}; };
struct AddTrimOpts { struct AddOpts {
string_view key; std::optional<TrimOpts> trim_opts;
ParsedStreamId minid;
uint32_t max_len = kuint32max;
uint32_t limit = 0;
TrimStrategy trim_strategy = TrimStrategy::kNone;
bool trim_approx = false;
// XADD only.
ParsedStreamId parsed_id; ParsedStreamId parsed_id;
bool no_mkstream = false; bool no_mkstream = false;
}; };
/* Used to journal the XADD command.
The actual stream ID assigned after adding may differ from the one specified in the command.
So, for the replica, we need to specify the exact ID that was actually added. */
struct AddArgsJournaler {
void SetStreamId(std::string_view stream_id) {
add_args[stream_id_index] = stream_id;
}
CmdArgVec add_args;
size_t stream_id_index;
};
struct NACKInfo { struct NACKInfo {
streamID pel_id; streamID pel_id;
string consumer_name; string consumer_name;
@ -166,6 +192,8 @@ struct ReadOpts {
bool noack = false; bool noack = false;
}; };
const char kTrimOptionConflictErr[] =
"MAXLEN and MINID options at the same time are not compatible";
const char kInvalidStreamId[] = "Invalid stream ID specified as stream command argument"; const char kInvalidStreamId[] = "Invalid stream ID specified as stream command argument";
const char kXGroupKeyNotFound[] = const char kXGroupKeyNotFound[] =
"The XGROUP subcommand requires the key to exist. " "The XGROUP subcommand requires the key to exist. "
@ -599,45 +627,44 @@ streamNACK* StreamCreateNACK(streamConsumer* consumer, uint64_t now_ms) {
return nack; return nack;
} }
int StreamTrim(const AddTrimOpts& opts, stream* s) { std::string StreamsIdToString(streamID id) {
if (!opts.limit) { return absl::StrCat(id.ms, "-", id.seq);
if (opts.trim_strategy == TrimStrategy::kMaxLen) {
/* Notify xtrim event if needed. */
return streamTrimByLength(s, opts.max_len, opts.trim_approx);
// TODO: when replicating, we should propagate it as exact limit in case of trimming.
} else if (opts.trim_strategy == TrimStrategy::kMinId) {
return streamTrimByID(s, opts.minid.val, opts.trim_approx);
}
} else {
streamAddTrimArgs trim_args = {};
trim_args.trim_strategy = static_cast<int>(opts.trim_strategy);
trim_args.approx_trim = opts.trim_approx;
trim_args.limit = opts.limit;
if (opts.trim_strategy == TrimStrategy::kMaxLen) {
trim_args.maxlen = opts.max_len;
} else if (opts.trim_strategy == TrimStrategy::kMinId) {
trim_args.minid = opts.minid.val;
}
return streamTrim(s, &trim_args);
}
return 0;
} }
OpResult<streamID> OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgList args) { /* The first value represents the number of deleted items, and the second is the last trimmed ID. */
DCHECK(!args.empty() && args.size() % 2 == 0); std::pair<int64_t, streamID> TrimStream(const TrimOpts& opts, stream* s) {
auto& db_slice = op_args.GetDbSlice(); streamID last_id = {0, 0};
DbSlice::AddOrFindResult add_res;
if (opts.no_mkstream) { auto trim = [&]() {
auto res_it = db_slice.FindMutable(op_args.db_cntx, opts.key, OBJ_STREAM); if (opts.IsMaxLen()) {
if (!res_it) { return streamTrimByLength(s, opts.AsMaxLen(), opts.approx, &last_id, opts.limit);
return res_it.status(); } else {
const auto& min_id = opts.AsMinId().val;
return streamTrimByID(s, min_id, opts.approx, &last_id, opts.limit);
} }
};
const int64_t deleted_items_number = trim();
return {deleted_items_number, last_id};
}
bool JournalAsMinId(const TrimOpts& opts) {
return opts.approx || opts.IsMaxLen();
}
OpResult<streamID> OpAdd(const OpArgs& op_args, string_view key, const AddOpts& opts,
CmdArgList args, AddArgsJournaler journaler) {
DCHECK(!args.empty() && args.size() % 2 == 0);
auto& db_slice = op_args.GetDbSlice();
DbSlice::AddOrFindResult add_res;
if (opts.no_mkstream) {
auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM);
RETURN_ON_BAD_STATUS(res_it);
add_res = std::move(*res_it); add_res = std::move(*res_it);
} else { } else {
auto op_res = db_slice.AddOrFind(op_args.db_cntx, opts.key); auto op_res = db_slice.AddOrFind(op_args.db_cntx, key);
RETURN_ON_BAD_STATUS(op_res); RETURN_ON_BAD_STATUS(op_res);
add_res = std::move(*op_res); add_res = std::move(*op_res);
} }
@ -670,13 +697,42 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgL
return OpStatus::OUT_OF_MEMORY; return OpStatus::OUT_OF_MEMORY;
} }
StreamTrim(opts, stream_inst); std::pair<int64_t, streamID> trim_result{};
if (opts.trim_opts) {
trim_result = TrimStream(opts.trim_opts.value(), stream_inst);
}
mem_tracker.UpdateStreamSize(it->second); mem_tracker.UpdateStreamSize(it->second);
if (op_args.shard->journal()) {
std::string result_id_as_string = StreamsIdToString(result_id);
if (opts.trim_opts && JournalAsMinId(opts.trim_opts.value())) {
// We need to set exact MinId in the journal.
std::string last_id = StreamsIdToString(trim_result.second);
CmdArgVec journal_args = {key, "MINID"sv, "="sv, last_id};
journal_args.reserve(args.size() + 4);
if (opts.no_mkstream) {
journal_args.push_back("NOMKSTREAM"sv);
}
journal_args.push_back(result_id_as_string);
for (size_t i = 0; i < args.size(); i++) {
journal_args.push_back(args[i]);
}
RecordJournal(op_args, "XADD"sv, journal_args);
} else {
journaler.SetStreamId(result_id_as_string);
RecordJournal(op_args, "XADD"sv, journaler.add_args);
}
}
auto blocking_controller = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id()); auto blocking_controller = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
if (blocking_controller) { if (blocking_controller) {
blocking_controller->AwakeWatched(op_args.db_cntx.db_index, opts.key); blocking_controller->AwakeWatched(op_args.db_cntx.db_index, key);
} }
return result_id; return result_id;
@ -1934,9 +1990,10 @@ void XGroupHelp(CmdArgList args, const CommandContext& cmd_cntx) {
return rb->SendSimpleStrArr(help_arr); return rb->SendSimpleStrArr(help_arr);
} }
OpResult<int64_t> OpTrim(const OpArgs& op_args, const AddTrimOpts& opts) { OpResult<int64_t> OpTrim(const OpArgs& op_args, std::string_view key, const TrimOpts& opts,
bool journal_as_minid) {
auto& db_slice = op_args.GetDbSlice(); auto& db_slice = op_args.GetDbSlice();
auto res_it = db_slice.FindMutable(op_args.db_cntx, opts.key, OBJ_STREAM); auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM);
if (!res_it) { if (!res_it) {
if (res_it.status() == OpStatus::KEY_NOTFOUND) { if (res_it.status() == OpStatus::KEY_NOTFOUND) {
return 0; return 0;
@ -1949,77 +2006,93 @@ OpResult<int64_t> OpTrim(const OpArgs& op_args, const AddTrimOpts& opts) {
CompactObj& cobj = res_it->it->second; CompactObj& cobj = res_it->it->second;
stream* s = (stream*)cobj.RObjPtr(); stream* s = (stream*)cobj.RObjPtr();
auto res = StreamTrim(opts, s); auto res = TrimStream(opts, s);
mem_tracker.UpdateStreamSize(cobj); mem_tracker.UpdateStreamSize(cobj);
if (op_args.shard->journal() && journal_as_minid) {
std::string last_id = StreamsIdToString(res.second);
RecordJournal(op_args, "XTRIM"sv, ArgSlice{key, "MINID"sv, "="sv, last_id});
}
return res.first;
}
ParseResult<TrimOpts> ParseTrimOpts(bool max_len, CmdArgParser* parser) {
TrimOpts opts;
opts.approx = parser->Check("~");
if (!opts.approx) {
parser->Check("=");
}
if (max_len) {
opts.length_or_id = parser->Next<uint32_t>();
} else {
ParsedStreamId parsed_id;
if (!ParseID(parser->Next(), false, 0, &parsed_id)) {
return CreateSyntaxError(kSyntaxErr);
}
opts.length_or_id = parsed_id; // trivial copy
}
if (parser->Check("LIMIT")) {
if (!opts.approx) {
return CreateSyntaxError(kSyntaxErr);
}
opts.limit = parser->Next<uint32_t>();
}
return opts;
}
ParseResult<TrimOpts> ParseTrimOpts(CmdArgParser* parser) {
bool max_len = parser->Check("MAXLEN");
if (!max_len) {
parser->ExpectTag("MINID");
}
auto res = ParseTrimOpts(max_len, parser);
if (parser->Check("MAXLEN") || parser->Check("MINID")) {
return CreateSyntaxError(kTrimOptionConflictErr);
}
return res; return res;
} }
optional<pair<AddTrimOpts, unsigned>> ParseAddOrTrimArgsOrReply(CmdArgList args, bool is_xadd, ParseResult<AddOpts> ParseAddOpts(CmdArgParser* parser) {
SinkReplyBuilder* builder) { AddOpts opts;
AddTrimOpts opts; while (parser->HasNext()) {
opts.key = ArgS(args, 0); if (parser->Check("NOMKSTREAM")) {
unsigned id_indx = 1;
for (; id_indx < args.size(); ++id_indx) {
string arg = absl::AsciiStrToUpper(ArgS(args, id_indx));
size_t remaining_args = args.size() - id_indx - 1;
if (is_xadd && arg == "NOMKSTREAM") {
opts.no_mkstream = true; opts.no_mkstream = true;
} else if ((arg == "MAXLEN" || arg == "MINID") && remaining_args >= 1) { continue;
if (opts.trim_strategy != TrimStrategy::kNone) { }
builder->SendError("MAXLEN and MINID options at the same time are not compatible",
kSyntaxErr); bool max_len = parser->Check("MAXLEN");
return std::nullopt; if (max_len || parser->Check("MINID")) {
if (opts.trim_opts) {
return CreateSyntaxError(kTrimOptionConflictErr);
} }
if (arg == "MAXLEN") { auto trim_opts = ParseTrimOpts(max_len, parser);
opts.trim_strategy = TrimStrategy::kMaxLen; if (!trim_opts) {
} else { return make_unexpected(trim_opts.error());
opts.trim_strategy = TrimStrategy::kMinId;
} }
id_indx++; opts.trim_opts = trim_opts.value(); // trivial copy
arg = ArgS(args, id_indx);
if (remaining_args >= 2 && arg == "~") {
opts.trim_approx = true;
id_indx++;
arg = ArgS(args, id_indx);
} else if (remaining_args >= 2 && arg == "=") {
opts.trim_approx = false;
id_indx++;
arg = ArgS(args, id_indx);
}
if (opts.trim_strategy == TrimStrategy::kMaxLen && !absl::SimpleAtoi(arg, &opts.max_len)) {
builder->SendError(kInvalidIntErr);
return std::nullopt;
}
if (opts.trim_strategy == TrimStrategy::kMinId && !ParseID(arg, false, 0, &opts.minid)) {
builder->SendError(kSyntaxErr);
return std::nullopt;
}
} else if (arg == "LIMIT" && remaining_args >= 1 && opts.trim_strategy != TrimStrategy::kNone) {
if (!opts.trim_approx) {
builder->SendError(kSyntaxErr);
return std::nullopt;
}
++id_indx;
if (!absl::SimpleAtoi(ArgS(args, id_indx), &opts.limit)) {
builder->SendError(kSyntaxErr);
return std::nullopt;
}
} else if (is_xadd) {
// There are still remaining field args.
break;
} else { } else {
builder->SendError(kSyntaxErr); // It is StreamId
return std::nullopt; std::string_view id = parser->Next();
if (!ParseID(id, true, 0, &opts.parsed_id)) {
return CreateSyntaxError(kInvalidStreamId);
}
break;
} }
} }
return make_pair(opts, id_indx); return opts;
} }
struct StreamReplies { struct StreamReplies {
@ -2547,46 +2620,47 @@ bool ParseXpendingOptions(CmdArgList& args, PendingOpts& opts, SinkReplyBuilder*
} // namespace } // namespace
void StreamFamily::XAdd(CmdArgList args, const CommandContext& cmd_cntx) { void StreamFamily::XAdd(CmdArgList args, const CommandContext& cmd_cntx) {
auto parse_resp = ParseAddOrTrimArgsOrReply(args, true, cmd_cntx.rb); CmdArgParser parser{args};
if (!parse_resp) { auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
string_view key = parser.Next();
auto parsed_add_opts = ParseAddOpts(&parser);
if (auto err = parser.Error(); err || !parsed_add_opts) {
rb->SendError(!parsed_add_opts ? parsed_add_opts.error() : err->MakeReply());
return; return;
} }
auto add_opts = parse_resp->first; // Save the index of the stream ID in the arguments list.
auto id_indx = parse_resp->second; // We need this during journaling
// It is (parser.GetCurrentIndex() - 1) because the stream id is the last parsed argument in the
// ParseAddOpts
const size_t stream_id_index_in_args = parser.GetCurrentIndex() - 1;
AddArgsJournaler journaler{{args.begin(), args.end()}, stream_id_index_in_args};
args.remove_prefix(id_indx); CmdArgList fields = parser.Tail();
if (args.size() < 2 || args.size() % 2 == 0) { if (fields.empty() || fields.size() % 2 != 0) {
return cmd_cntx.rb->SendError(WrongNumArgsError("XADD"), kSyntaxErrType); return rb->SendError(WrongNumArgsError("XADD"), kSyntaxErrType);
} }
string_view id = ArgS(args, 0);
if (!ParseID(id, true, 0, &add_opts.parsed_id)) {
return cmd_cntx.rb->SendError(kInvalidStreamId, kSyntaxErrType);
}
args.remove_prefix(1);
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return OpAdd(t->GetOpArgs(shard), add_opts, args); return OpAdd(t->GetOpArgs(shard), key, parsed_add_opts.value(), fields, journaler);
}; };
OpResult<streamID> add_result = cmd_cntx.tx->ScheduleSingleHopT(cb); OpResult<streamID> add_result = cmd_cntx.tx->ScheduleSingleHopT(cb);
auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (add_result) { if (add_result) {
return rb->SendBulkString(StreamIdRepr(*add_result)); rb->SendBulkString(StreamIdRepr(*add_result));
} else {
if (add_result == OpStatus::KEY_NOTFOUND) {
rb->SendNull();
} else if (add_result == OpStatus::STREAM_ID_SMALL) {
rb->SendError(LeqTopIdError("XADD"));
} else {
rb->SendError(add_result.status());
}
} }
if (add_result == OpStatus::KEY_NOTFOUND) {
return rb->SendNull();
}
if (add_result.status() == OpStatus::STREAM_ID_SMALL) {
return cmd_cntx.rb->SendError(LeqTopIdError("XADD"));
}
return cmd_cntx.rb->SendError(add_result.status());
} }
absl::InlinedVector<streamID, 8> GetXclaimIds(CmdArgList& args) { absl::InlinedVector<streamID, 8> GetXclaimIds(CmdArgList& args) {
@ -3173,22 +3247,35 @@ void StreamFamily::XSetId(CmdArgList args, const CommandContext& cmd_cntx) {
} }
void StreamFamily::XTrim(CmdArgList args, const CommandContext& cmd_cntx) { void StreamFamily::XTrim(CmdArgList args, const CommandContext& cmd_cntx) {
auto parse_resp = ParseAddOrTrimArgsOrReply(args, true, cmd_cntx.rb); CmdArgParser parser{args};
if (!parse_resp) { auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
std::string_view key = parser.Next();
auto parsed_trim_opts = ParseTrimOpts(&parser);
if (!parsed_trim_opts || !parser.Finalize()) {
rb->SendError(!parsed_trim_opts ? parsed_trim_opts.error() : parser.Error()->MakeReply());
return; return;
} }
auto trim_opts = parse_resp->first; auto& trim_opts = parsed_trim_opts.value();
// We can auto-journal if we are not trimming approximately or by maxlen
const bool enable_auto_journaling = !JournalAsMinId(trim_opts);
if (enable_auto_journaling) {
cmd_cntx.tx->ReviveAutoJournal();
}
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return OpTrim(t->GetOpArgs(shard), trim_opts); return OpTrim(t->GetOpArgs(shard), key, trim_opts, !enable_auto_journaling);
}; };
OpResult<int64_t> trim_result = cmd_cntx.tx->ScheduleSingleHopT(cb); OpResult<int64_t> trim_result = cmd_cntx.tx->ScheduleSingleHopT(cb);
if (trim_result) { if (trim_result) {
return cmd_cntx.rb->SendLong(*trim_result); rb->SendLong(*trim_result);
} else {
rb->SendError(trim_result.status());
} }
return cmd_cntx.rb->SendError(trim_result.status());
} }
void StreamFamily::XAck(CmdArgList args, const CommandContext& cmd_cntx) { void StreamFamily::XAck(CmdArgList args, const CommandContext& cmd_cntx) {
@ -3315,7 +3402,9 @@ void StreamFamily::Register(CommandRegistry* registry) {
using CI = CommandId; using CI = CommandId;
registry->StartFamily(); registry->StartFamily();
constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::VARIADIC_KEYS; constexpr auto kReadFlags = CO::READONLY | CO::BLOCKING | CO::VARIADIC_KEYS;
*registry << CI{"XADD", CO::WRITE | CO::DENYOOM | CO::FAST, -5, 1, 1, acl::kXAdd}.HFUNC(XAdd) *registry << CI{"XADD", CO::WRITE | CO::DENYOOM | CO::FAST | CO::NO_AUTOJOURNAL, -5, 1, 1,
acl::kXAdd}
.HFUNC(XAdd)
<< CI{"XCLAIM", CO::WRITE | CO::FAST, -6, 1, 1, acl::kXClaim}.HFUNC(XClaim) << CI{"XCLAIM", CO::WRITE | CO::FAST, -6, 1, 1, acl::kXClaim}.HFUNC(XClaim)
<< CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, acl::kXDel}.HFUNC(XDel) << CI{"XDEL", CO::WRITE | CO::FAST, -3, 1, 1, acl::kXDel}.HFUNC(XDel)
<< CI{"XGROUP", CO::WRITE | CO::DENYOOM, -3, 2, 2, acl::kXGroup}.HFUNC(XGroup) << CI{"XGROUP", CO::WRITE | CO::DENYOOM, -3, 2, 2, acl::kXGroup}.HFUNC(XGroup)
@ -3327,7 +3416,8 @@ void StreamFamily::Register(CommandRegistry* registry) {
<< CI{"XREAD", kReadFlags, -3, 3, 3, acl::kXRead}.HFUNC(XRead) << CI{"XREAD", kReadFlags, -3, 3, 3, acl::kXRead}.HFUNC(XRead)
<< CI{"XREADGROUP", kReadFlags, -6, 6, 6, acl::kXReadGroup}.HFUNC(XReadGroup) << CI{"XREADGROUP", kReadFlags, -6, 6, 6, acl::kXReadGroup}.HFUNC(XReadGroup)
<< CI{"XSETID", CO::WRITE, 3, 1, 1, acl::kXSetId}.HFUNC(XSetId) << CI{"XSETID", CO::WRITE, 3, 1, 1, acl::kXSetId}.HFUNC(XSetId)
<< CI{"XTRIM", CO::WRITE | CO::FAST, -4, 1, 1, acl::kXTrim}.HFUNC(XTrim) << CI{"XTRIM", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, -4, 1, 1, acl::kXTrim}.HFUNC(
XTrim)
<< CI{"_XGROUP_HELP", CO::NOSCRIPT | CO::HIDDEN, 2, 0, 0, acl::kXGroupHelp}.SetHandler( << CI{"_XGROUP_HELP", CO::NOSCRIPT | CO::HIDDEN, 2, 0, 0, acl::kXGroupHelp}.SetHandler(
XGroupHelp) XGroupHelp)
<< CI{"XACK", CO::WRITE | CO::FAST, -4, 1, 1, acl::kXAck}.HFUNC(XAck) << CI{"XACK", CO::WRITE | CO::FAST, -4, 1, 1, acl::kXAck}.HFUNC(XAck)

View file

@ -693,7 +693,7 @@ TEST_F(StreamFamilyTest, XTrimInvalidArgs) {
// Invalid limit. // Invalid limit.
resp = Run({"xtrim", "foo", "maxlen", "~", "2", "limit", "nan"}); resp = Run({"xtrim", "foo", "maxlen", "~", "2", "limit", "nan"});
EXPECT_THAT(resp, ErrArg("syntax error")); EXPECT_THAT(resp, ErrArg("value is not an integer or out of range"));
} }
TEST_F(StreamFamilyTest, XPending) { TEST_F(StreamFamilyTest, XPending) {
Run({"xadd", "foo", "1-0", "k1", "v1"}); Run({"xadd", "foo", "1-0", "k1", "v1"});

View file

@ -2715,7 +2715,7 @@ async def test_big_containers(df_factory, element_size, elements_number):
logging.info(f"Replica Used memory {replica_used_memory}, peak memory {replica_peak_memory}") logging.info(f"Replica Used memory {replica_used_memory}, peak memory {replica_peak_memory}")
assert replica_peak_memory < 1.1 * replica_used_memory assert replica_peak_memory < 1.1 * replica_used_memory
# Check replica data consisten # Check replica data consistent
replica_data = await StaticSeeder.capture(c_replica) replica_data = await StaticSeeder.capture(c_replica)
master_data = await StaticSeeder.capture(c_master) master_data = await StaticSeeder.capture(c_master)
assert master_data == replica_data assert master_data == replica_data
@ -2734,3 +2734,39 @@ async def test_master_too_big(df_factory):
# We should never sync due to used memory too high during full sync # We should never sync due to used memory too high during full sync
with pytest.raises(TimeoutError): with pytest.raises(TimeoutError):
await wait_available_async(c_replica, timeout=10) await wait_available_async(c_replica, timeout=10)
@dfly_args({"proactor_threads": 4})
async def test_stream_approximate_trimming(df_factory):
master = df_factory.create()
replica = df_factory.create()
df_factory.start_all([master, replica])
c_master = master.client()
c_replica = replica.client()
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await wait_for_replicas_state(c_replica)
# Step 1: Populate master with 100 streams, each containing 200 entries
num_streams = 100
entries_per_stream = 200
for i in range(num_streams):
stream_name = f"stream{i}"
for j in range(entries_per_stream):
await c_master.execute_command("XADD", stream_name, "*", f"field{j}", f"value{j}")
# Step 2: Trim each stream to a random size between 70 and 200
for i in range(num_streams):
stream_name = f"stream{i}"
trim_size = random.randint(70, entries_per_stream)
await c_master.execute_command("XTRIM", stream_name, "MAXLEN", "~", trim_size)
# Wait for replica sync
await asyncio.sleep(1)
# Check replica data consistent
master_data = await StaticSeeder.capture(c_master)
replica_data = await StaticSeeder.capture(c_replica)
assert master_data == replica_data