mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
fix: support XREAD ... STREAMS ... keys derivation (#1250)
This commit is contained in:
parent
cc0e264ed0
commit
9658eab036
3 changed files with 23 additions and 3 deletions
|
@ -618,7 +618,7 @@ vector<RecordVec> OpRead(const OpArgs& op_args, const ArgSlice& args, const Read
|
||||||
|
|
||||||
vector<RecordVec> response(args.size());
|
vector<RecordVec> response(args.size());
|
||||||
for (size_t i = 0; i < args.size(); ++i) {
|
for (size_t i = 0; i < args.size(); ++i) {
|
||||||
const string_view key = args[i];
|
string_view key = args[i];
|
||||||
|
|
||||||
range_opts.start = opts.stream_ids.at(key);
|
range_opts.start = opts.stream_ids.at(key);
|
||||||
|
|
||||||
|
@ -1446,7 +1446,8 @@ void StreamFamily::Register(CommandRegistry* registry) {
|
||||||
// Therefore the command has format:
|
// Therefore the command has format:
|
||||||
// XREAD COUNT <count> STREAMS <stream1> <stream2> <id1> <id2>
|
// XREAD COUNT <count> STREAMS <stream1> <stream2> <id1> <id2>
|
||||||
// Where the keys are <stream1> and <stream2>.
|
// Where the keys are <stream1> and <stream2>.
|
||||||
<< CI{"XREAD", CO::READONLY | CO::REVERSE_MAPPING, -4, 4, 5, 1}.HFUNC(XRead)
|
<< CI{"XREAD", CO::READONLY | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS, -3, 3, 3, 1}
|
||||||
|
.HFUNC(XRead)
|
||||||
<< CI{"XSETID", CO::WRITE | CO::DENYOOM, 3, 1, 1, 1}.HFUNC(XSetId)
|
<< CI{"XSETID", CO::WRITE | CO::DENYOOM, 3, 1, 1, 1}.HFUNC(XSetId)
|
||||||
<< CI{"_XGROUP_HELP", CO::NOSCRIPT | CO::HIDDEN, 2, 0, 0, 0}.SetHandler(XGroupHelp);
|
<< CI{"_XGROUP_HELP", CO::NOSCRIPT | CO::HIDDEN, 2, 0, 0, 0}.SetHandler(XGroupHelp);
|
||||||
}
|
}
|
||||||
|
|
|
@ -170,7 +170,7 @@ TEST_F(StreamFamilyTest, XReadInvalidArgs) {
|
||||||
|
|
||||||
// Missing STREAMS.
|
// Missing STREAMS.
|
||||||
resp = Run({"xread", "count", "5"});
|
resp = Run({"xread", "count", "5"});
|
||||||
EXPECT_THAT(resp, ErrArg("wrong number of arguments for 'xread' command"));
|
EXPECT_THAT(resp, ErrArg("syntax error"));
|
||||||
|
|
||||||
// Unbalanced list of streams.
|
// Unbalanced list of streams.
|
||||||
resp = Run({"xread", "count", "invalid", "streams", "s1", "s2", "s3", "0", "0"});
|
resp = Run({"xread", "count", "invalid", "streams", "s1", "s2", "s3", "0", "0"});
|
||||||
|
|
|
@ -1389,12 +1389,31 @@ OpResult<KeyIndex> DetermineKeys(const CommandId* cid, CmdArgList args) {
|
||||||
if (cid->opt_mask() & CO::VARIADIC_KEYS) {
|
if (cid->opt_mask() & CO::VARIADIC_KEYS) {
|
||||||
// ZUNION/INTER <num_keys> <key1> [<key2> ...]
|
// ZUNION/INTER <num_keys> <key1> [<key2> ...]
|
||||||
// EVAL <script> <num_keys>
|
// EVAL <script> <num_keys>
|
||||||
|
// XREAD ... STREAMS ...
|
||||||
if (args.size() < 2) {
|
if (args.size() < 2) {
|
||||||
return OpStatus::SYNTAX_ERR;
|
return OpStatus::SYNTAX_ERR;
|
||||||
}
|
}
|
||||||
|
|
||||||
string_view name{cid->name()};
|
string_view name{cid->name()};
|
||||||
|
|
||||||
|
if (name == "XREAD") {
|
||||||
|
for (size_t i = 0; i < args.size(); ++i) {
|
||||||
|
string_view arg = ArgS(args, i);
|
||||||
|
if (absl::EqualsIgnoreCase(arg, "STREAMS")) {
|
||||||
|
size_t left = args.size() - i - 1;
|
||||||
|
if (left < 2 || left % 2 != 0)
|
||||||
|
return OpStatus::SYNTAX_ERR;
|
||||||
|
|
||||||
|
key_index.start = i + 1;
|
||||||
|
key_index.end = key_index.start + (left / 2);
|
||||||
|
key_index.step = 1;
|
||||||
|
|
||||||
|
return key_index;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return OpStatus::SYNTAX_ERR;
|
||||||
|
}
|
||||||
|
|
||||||
if (absl::EndsWith(name, "STORE"))
|
if (absl::EndsWith(name, "STORE"))
|
||||||
key_index.bonus = 0; // Z<xxx>STORE <key> commands
|
key_index.bonus = 0; // Z<xxx>STORE <key> commands
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue