chore: update command interface for hset/set families (#4209)

This commit is contained in:
Roman Gershman 2024-11-27 16:00:30 +02:00 committed by GitHub
parent bd143e4b81
commit 57fd5f16a7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 189 additions and 196 deletions

View file

@ -745,7 +745,7 @@ OpResult<vector<long>> OpHExpire(const OpArgs& op_args, string_view key, uint32_
} }
// HSETEX key [NX] tll_sec field value field value ... // HSETEX key [NX] tll_sec field value field value ...
void HSetEx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) { void HSetEx(CmdArgList args, const CommandContext& cmd_cntx) {
CmdArgParser parser{args}; CmdArgParser parser{args};
string_view key = parser.Next(); string_view key = parser.Next();
@ -757,13 +757,14 @@ void HSetEx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, Connect
constexpr uint32_t kMaxTtl = (1UL << 26); constexpr uint32_t kMaxTtl = (1UL << 26);
if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) { if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) {
return builder->SendError(kInvalidIntErr); return cmd_cntx.rb->SendError(kInvalidIntErr);
} }
CmdArgList fields = parser.Tail(); CmdArgList fields = parser.Tail();
if (fields.size() % 2 != 0) { if (fields.size() % 2 != 0) {
return builder->SendError(facade::WrongNumArgsError(cntx->cid->name()), kSyntaxErrType); return cmd_cntx.rb->SendError(facade::WrongNumArgsError(cmd_cntx.conn_cntx->cid->name()),
kSyntaxErrType);
} }
OpSetParams op_sp{skip_if_exists, ttl_sec}; OpSetParams op_sp{skip_if_exists, ttl_sec};
@ -772,17 +773,17 @@ void HSetEx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, Connect
return OpSet(t->GetOpArgs(shard), key, fields, op_sp); return OpSet(t->GetOpArgs(shard), key, fields, op_sp);
}; };
OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
builder->SendLong(*result); cmd_cntx.rb->SendLong(*result);
} else { } else {
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} }
} // namespace } // namespace
void HSetFamily::HDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void HSetFamily::HDel(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
args.remove_prefix(1); args.remove_prefix(1);
@ -790,28 +791,28 @@ void HSetFamily::HDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde
return OpDel(t->GetOpArgs(shard), key, args); return OpDel(t->GetOpArgs(shard), key, args);
}; };
OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result || result.status() == OpStatus::KEY_NOTFOUND) { if (result || result.status() == OpStatus::KEY_NOTFOUND) {
builder->SendLong(*result); cmd_cntx.rb->SendLong(*result);
} else { } else {
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} }
void HSetFamily::HLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void HSetFamily::HLen(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) { return OpLen(t->GetOpArgs(shard), key); }; auto cb = [&](Transaction* t, EngineShard* shard) { return OpLen(t->GetOpArgs(shard), key); };
OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
builder->SendLong(*result); cmd_cntx.rb->SendLong(*result);
} else { } else {
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} }
void HSetFamily::HExists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void HSetFamily::HExists(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view field = ArgS(args, 1); string_view field = ArgS(args, 1);
@ -819,46 +820,46 @@ void HSetFamily::HExists(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
return OpExist(t->GetOpArgs(shard), key, field); return OpExist(t->GetOpArgs(shard), key, field);
}; };
OpResult<int> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<int> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
builder->SendLong(*result); cmd_cntx.rb->SendLong(*result);
} else { } else {
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} }
void HSetFamily::HExpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void HSetFamily::HExpire(CmdArgList args, const CommandContext& cmd_cntx) {
CmdArgParser parser{args}; CmdArgParser parser{args};
string_view key = parser.Next(); string_view key = parser.Next();
string_view ttl_str = parser.Next(); string_view ttl_str = parser.Next();
uint32_t ttl_sec; uint32_t ttl_sec;
constexpr uint32_t kMaxTtl = (1UL << 26); constexpr uint32_t kMaxTtl = (1UL << 26);
if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) { if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) {
return builder->SendError(kInvalidIntErr); return cmd_cntx.rb->SendError(kInvalidIntErr);
} }
if (!static_cast<bool>(parser.Check("FIELDS"sv))) { if (!static_cast<bool>(parser.Check("FIELDS"sv))) {
return builder->SendError("Mandatory argument FIELDS is missing or not at the right position", return cmd_cntx.rb->SendError(
kSyntaxErrType); "Mandatory argument FIELDS is missing or not at the right position", kSyntaxErrType);
} }
string_view numFieldsStr = parser.Next(); string_view numFieldsStr = parser.Next();
uint32_t numFields; uint32_t numFields;
if (!absl::SimpleAtoi(numFieldsStr, &numFields) || numFields == 0) { if (!absl::SimpleAtoi(numFieldsStr, &numFields) || numFields == 0) {
return builder->SendError(kInvalidIntErr); return cmd_cntx.rb->SendError(kInvalidIntErr);
} }
CmdArgList fields = parser.Tail(); CmdArgList fields = parser.Tail();
if (fields.size() != numFields) { if (fields.size() != numFields) {
return builder->SendError("The `numfields` parameter must match the number of arguments", return cmd_cntx.rb->SendError("The `numfields` parameter must match the number of arguments",
kSyntaxErrType); kSyntaxErrType);
} }
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return OpHExpire(t->GetOpArgs(shard), key, ttl_sec, fields); return OpHExpire(t->GetOpArgs(shard), key, ttl_sec, fields);
}; };
OpResult<vector<long>> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<vector<long>> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (result) { if (result) {
rb->StartArray(result->size()); rb->StartArray(result->size());
const auto& array = result.value(); const auto& array = result.value();
@ -866,11 +867,11 @@ void HSetFamily::HExpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
rb->SendLong(v); rb->SendLong(v);
} }
} else { } else {
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} }
void HSetFamily::HMGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void HSetFamily::HMGet(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
args.remove_prefix(1); args.remove_prefix(1);
@ -878,12 +879,11 @@ void HSetFamily::HMGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* build
return OpHMGet(t->GetOpArgs(shard), key, args); return OpHMGet(t->GetOpArgs(shard), key, args);
}; };
OpResult<vector<OptStr>> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<vector<OptStr>> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (result) { if (result) {
SinkReplyBuilder::ReplyAggregator agg(builder); SinkReplyBuilder::ReplyAggregator agg(rb);
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(result->size()); rb->StartArray(result->size());
for (const auto& val : *result) { for (const auto& val : *result) {
if (val) { if (val) {
@ -893,18 +893,18 @@ void HSetFamily::HMGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* build
} }
} }
} else if (result.status() == OpStatus::KEY_NOTFOUND) { } else if (result.status() == OpStatus::KEY_NOTFOUND) {
SinkReplyBuilder::ReplyAggregator agg(builder); SinkReplyBuilder::ReplyAggregator agg(rb);
rb->StartArray(args.size()); rb->StartArray(args.size());
for (unsigned i = 0; i < args.size(); ++i) { for (unsigned i = 0; i < args.size(); ++i) {
rb->SendNull(); rb->SendNull();
} }
} else { } else {
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} }
void HSetFamily::HGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void HSetFamily::HGet(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view field = ArgS(args, 1); string_view field = ArgS(args, 1);
@ -912,27 +912,27 @@ void HSetFamily::HGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde
return OpGet(t->GetOpArgs(shard), key, field); return OpGet(t->GetOpArgs(shard), key, field);
}; };
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
OpResult<string> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<string> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
rb->SendBulkString(*result); rb->SendBulkString(*result);
} else { } else {
if (result.status() == OpStatus::KEY_NOTFOUND) { if (result.status() == OpStatus::KEY_NOTFOUND) {
rb->SendNull(); rb->SendNull();
} else { } else {
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} }
} }
void HSetFamily::HIncrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void HSetFamily::HIncrBy(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view field = ArgS(args, 1); string_view field = ArgS(args, 1);
string_view incrs = ArgS(args, 2); string_view incrs = ArgS(args, 2);
int64_t ival = 0; int64_t ival = 0;
if (!absl::SimpleAtoi(incrs, &ival)) { if (!absl::SimpleAtoi(incrs, &ival)) {
return builder->SendError(kInvalidIntErr); return cmd_cntx.rb->SendError(kInvalidIntErr);
} }
IncrByParam param{ival}; IncrByParam param{ival};
@ -941,33 +941,33 @@ void HSetFamily::HIncrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
return OpIncrBy(t->GetOpArgs(shard), key, field, &param); return OpIncrBy(t->GetOpArgs(shard), key, field, &param);
}; };
OpStatus status = tx->ScheduleSingleHop(std::move(cb)); OpStatus status = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
if (status == OpStatus::OK) { if (status == OpStatus::OK) {
builder->SendLong(get<int64_t>(param)); cmd_cntx.rb->SendLong(get<int64_t>(param));
} else { } else {
switch (status) { switch (status) {
case OpStatus::INVALID_VALUE: case OpStatus::INVALID_VALUE:
builder->SendError("hash value is not an integer"); cmd_cntx.rb->SendError("hash value is not an integer");
break; break;
case OpStatus::OUT_OF_RANGE: case OpStatus::OUT_OF_RANGE:
builder->SendError(kIncrOverflow); cmd_cntx.rb->SendError(kIncrOverflow);
break; break;
default: default:
builder->SendError(status); cmd_cntx.rb->SendError(status);
break; break;
} }
} }
} }
void HSetFamily::HIncrByFloat(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void HSetFamily::HIncrByFloat(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view field = ArgS(args, 1); string_view field = ArgS(args, 1);
string_view incrs = ArgS(args, 2); string_view incrs = ArgS(args, 2);
double dval = 0; double dval = 0;
if (!absl::SimpleAtod(incrs, &dval)) { if (!absl::SimpleAtod(incrs, &dval)) {
return builder->SendError(kInvalidFloatErr); return cmd_cntx.rb->SendError(kInvalidFloatErr);
} }
IncrByParam param{dval}; IncrByParam param{dval};
@ -976,55 +976,55 @@ void HSetFamily::HIncrByFloat(CmdArgList args, Transaction* tx, SinkReplyBuilder
return OpIncrBy(t->GetOpArgs(shard), key, field, &param); return OpIncrBy(t->GetOpArgs(shard), key, field, &param);
}; };
OpStatus status = tx->ScheduleSingleHop(std::move(cb)); OpStatus status = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
if (status == OpStatus::OK) { if (status == OpStatus::OK) {
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
rb->SendDouble(get<double>(param)); rb->SendDouble(get<double>(param));
} else { } else {
switch (status) { switch (status) {
case OpStatus::INVALID_VALUE: case OpStatus::INVALID_VALUE:
builder->SendError("hash value is not a float"); cmd_cntx.rb->SendError("hash value is not a float");
break; break;
default: default:
builder->SendError(status); cmd_cntx.rb->SendError(status);
break; break;
} }
} }
} }
void HSetFamily::HKeys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void HSetFamily::HKeys(CmdArgList args, const CommandContext& cmd_cntx) {
HGetGeneric(args, FIELDS, tx, builder); HGetGeneric(args, FIELDS, cmd_cntx.tx, cmd_cntx.rb);
} }
void HSetFamily::HVals(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void HSetFamily::HVals(CmdArgList args, const CommandContext& cmd_cntx) {
HGetGeneric(args, VALUES, tx, builder); HGetGeneric(args, VALUES, cmd_cntx.tx, cmd_cntx.rb);
} }
void HSetFamily::HGetAll(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void HSetFamily::HGetAll(CmdArgList args, const CommandContext& cmd_cntx) {
HGetGeneric(args, GetAllMode::FIELDS | GetAllMode::VALUES, tx, builder); HGetGeneric(args, GetAllMode::FIELDS | GetAllMode::VALUES, cmd_cntx.tx, cmd_cntx.rb);
} }
void HSetFamily::HScan(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void HSetFamily::HScan(CmdArgList args, const CommandContext& cmd_cntx) {
std::string_view key = ArgS(args, 0); std::string_view key = ArgS(args, 0);
std::string_view token = ArgS(args, 1); std::string_view token = ArgS(args, 1);
uint64_t cursor = 0; uint64_t cursor = 0;
if (!absl::SimpleAtoi(token, &cursor)) { if (!absl::SimpleAtoi(token, &cursor)) {
return builder->SendError("invalid cursor"); return cmd_cntx.rb->SendError("invalid cursor");
} }
// HSCAN key cursor [MATCH pattern] [COUNT count] // HSCAN key cursor [MATCH pattern] [COUNT count]
if (args.size() > 6) { if (args.size() > 6) {
DVLOG(1) << "got " << args.size() << " this is more than it should be"; DVLOG(1) << "got " << args.size() << " this is more than it should be";
return builder->SendError(kSyntaxErr); return cmd_cntx.rb->SendError(kSyntaxErr);
} }
OpResult<ScanOpts> ops = ScanOpts::TryFrom(args.subspan(2)); OpResult<ScanOpts> ops = ScanOpts::TryFrom(args.subspan(2));
if (!ops) { if (!ops) {
DVLOG(1) << "HScan invalid args - return " << ops << " to the user"; DVLOG(1) << "HScan invalid args - return " << ops << " to the user";
return builder->SendError(ops.status()); return cmd_cntx.rb->SendError(ops.status());
} }
ScanOpts scan_op = ops.value(); ScanOpts scan_op = ops.value();
@ -1033,8 +1033,8 @@ void HSetFamily::HScan(CmdArgList args, Transaction* tx, SinkReplyBuilder* build
return OpScan(t->GetOpArgs(shard), key, &cursor, scan_op); return OpScan(t->GetOpArgs(shard), key, &cursor, scan_op);
}; };
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
OpResult<StringVec> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<StringVec> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result.status() != OpStatus::WRONG_TYPE) { if (result.status() != OpStatus::WRONG_TYPE) {
rb->StartArray(2); rb->StartArray(2);
rb->SendBulkString(absl::StrCat(cursor)); rb->SendBulkString(absl::StrCat(cursor));
@ -1043,18 +1043,17 @@ void HSetFamily::HScan(CmdArgList args, Transaction* tx, SinkReplyBuilder* build
rb->SendBulkString(k); rb->SendBulkString(k);
} }
} else { } else {
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} }
void HSetFamily::HSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void HSetFamily::HSet(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view cmd{cntx->cid->name()}; string_view cmd{cmd_cntx.conn_cntx->cid->name()};
if (args.size() % 2 != 1) { if (args.size() % 2 != 1) {
return builder->SendError(facade::WrongNumArgsError(cmd), kSyntaxErrType); return cmd_cntx.rb->SendError(facade::WrongNumArgsError(cmd), kSyntaxErrType);
} }
args.remove_prefix(1); args.remove_prefix(1);
@ -1062,16 +1061,16 @@ void HSetFamily::HSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builde
return OpSet(t->GetOpArgs(shard), key, args); return OpSet(t->GetOpArgs(shard), key, args);
}; };
OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result && cmd == "HSET") { if (result && cmd == "HSET") {
builder->SendLong(*result); cmd_cntx.rb->SendLong(*result);
} else { } else {
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} }
void HSetFamily::HSetNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void HSetFamily::HSetNx(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
args.remove_prefix(1); args.remove_prefix(1);
@ -1079,15 +1078,15 @@ void HSetFamily::HSetNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
return OpSet(t->GetOpArgs(shard), key, args, OpSetParams{.skip_if_exists = true}); return OpSet(t->GetOpArgs(shard), key, args, OpSetParams{.skip_if_exists = true});
}; };
OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
builder->SendLong(*result); cmd_cntx.rb->SendLong(*result);
} else { } else {
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} }
void HSetFamily::HStrLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void HSetFamily::HStrLen(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view field = ArgS(args, 1); string_view field = ArgS(args, 1);
@ -1095,11 +1094,11 @@ void HSetFamily::HStrLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
return OpStrLen(t->GetOpArgs(shard), key, field); return OpStrLen(t->GetOpArgs(shard), key, field);
}; };
OpResult<size_t> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<size_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
builder->SendLong(*result); cmd_cntx.rb->SendLong(*result);
} else { } else {
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} }
@ -1111,10 +1110,10 @@ void StrVecEmplaceBack(StringVec& str_vec, const listpackEntry& lp) {
str_vec.emplace_back(absl::StrCat(lp.lval)); str_vec.emplace_back(absl::StrCat(lp.lval));
} }
void HSetFamily::HRandField(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void HSetFamily::HRandField(CmdArgList args, const CommandContext& cmd_cntx) {
if (args.size() > 3) { if (args.size() > 3) {
DVLOG(1) << "Wrong number of command arguments: " << args.size(); DVLOG(1) << "Wrong number of command arguments: " << args.size();
return builder->SendError(kSyntaxErr); return cmd_cntx.rb->SendError(kSyntaxErr);
} }
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
@ -1122,13 +1121,13 @@ void HSetFamily::HRandField(CmdArgList args, Transaction* tx, SinkReplyBuilder*
bool with_values = false; bool with_values = false;
if ((args.size() > 1) && (!SimpleAtoi(ArgS(args, 1), &count))) { if ((args.size() > 1) && (!SimpleAtoi(ArgS(args, 1), &count))) {
return builder->SendError("count value is not an integer", kSyntaxErrType); return cmd_cntx.rb->SendError("count value is not an integer", kSyntaxErrType);
} }
if (args.size() == 3) { if (args.size() == 3) {
string arg = absl::AsciiStrToUpper(ArgS(args, 2)); string arg = absl::AsciiStrToUpper(ArgS(args, 2));
if (arg != "WITHVALUES") if (arg != "WITHVALUES")
return builder->SendError(kSyntaxErr); return cmd_cntx.rb->SendError(kSyntaxErr);
else else
with_values = true; with_values = true;
} }
@ -1214,8 +1213,8 @@ void HSetFamily::HRandField(CmdArgList args, Transaction* tx, SinkReplyBuilder*
return str_vec; return str_vec;
}; };
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
OpResult<StringVec> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<StringVec> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
if ((result->size() == 1) && (args.size() == 1)) if ((result->size() == 1) && (args.size() == 1))
rb->SendBulkString(result->front()); rb->SendBulkString(result->front());
@ -1227,7 +1226,7 @@ void HSetFamily::HRandField(CmdArgList args, Transaction* tx, SinkReplyBuilder*
else else
rb->SendEmptyArray(); rb->SendEmptyArray();
} else { } else {
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} }

View file

@ -7,14 +7,12 @@
#include <optional> #include <optional>
#include "facade/op_status.h" #include "facade/op_status.h"
#include "server/command_registry.h"
#include "server/common.h" #include "server/common.h"
#include "server/table.h" #include "server/table.h"
namespace dfly { namespace dfly {
class CommandRegistry;
class StringMap; class StringMap;
class Transaction;
using facade::OpResult; using facade::OpResult;
using facade::OpStatus; using facade::OpStatus;
@ -36,23 +34,22 @@ class HSetFamily {
private: private:
using SinkReplyBuilder = facade::SinkReplyBuilder; using SinkReplyBuilder = facade::SinkReplyBuilder;
static void HExpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void HExpire(CmdArgList args, const CommandContext& cmd_cntx);
static void HDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void HDel(CmdArgList args, const CommandContext& cmd_cntx);
static void HLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void HLen(CmdArgList args, const CommandContext& cmd_cntx);
static void HExists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void HExists(CmdArgList args, const CommandContext& cmd_cntx);
static void HGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void HGet(CmdArgList args, const CommandContext& cmd_cntx);
static void HMGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void HMGet(CmdArgList args, const CommandContext& cmd_cntx);
static void HIncrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void HIncrBy(CmdArgList args, const CommandContext& cmd_cntx);
static void HKeys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void HKeys(CmdArgList args, const CommandContext& cmd_cntx);
static void HVals(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void HVals(CmdArgList args, const CommandContext& cmd_cntx);
static void HGetAll(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void HGetAll(CmdArgList args, const CommandContext& cmd_cntx);
static void HIncrByFloat(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void HIncrByFloat(CmdArgList args, const CommandContext& cmd_cntx);
static void HScan(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void HScan(CmdArgList args, const CommandContext& cmd_cntx);
static void HSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, static void HSet(CmdArgList args, const CommandContext& cmd_cntx);
ConnectionContext* cntx); static void HSetNx(CmdArgList args, const CommandContext& cmd_cntx);
static void HSetNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void HStrLen(CmdArgList args, const CommandContext& cmd_cntx);
static void HStrLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); static void HRandField(CmdArgList args, const CommandContext& cmd_cntx);
static void HRandField(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
}; };
} // namespace dfly } // namespace dfly

View file

@ -984,7 +984,7 @@ struct SetReplies {
bool script; bool script;
}; };
void SAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void SAdd(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
auto values = args.subspan(1); auto values = args.subspan(1);
@ -992,15 +992,15 @@ void SAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return OpAdd(t->GetOpArgs(shard), key, values, false, false); return OpAdd(t->GetOpArgs(shard), key, values, false, false);
}; };
OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
return builder->SendLong(result.value()); return cmd_cntx.rb->SendLong(result.value());
} }
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
void SIsMember(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void SIsMember(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view val = ArgS(args, 1); string_view val = ArgS(args, 1);
@ -1015,11 +1015,11 @@ void SIsMember(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return find_res.status(); return find_res.status();
}; };
OpResult<void> result = tx->ScheduleSingleHop(std::move(cb)); OpResult<void> result = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
SendNumeric(result ? OpResult<uint32_t>(1) : result.status(), builder); SendNumeric(result ? OpResult<uint32_t>(1) : result.status(), cmd_cntx.rb);
} }
void SMIsMember(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void SMIsMember(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
auto members = args.subspan(1); auto members = args.subspan(1);
@ -1040,35 +1040,35 @@ void SMIsMember(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return find_res.status(); return find_res.status();
}; };
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
OpResult<void> result = tx->ScheduleSingleHop(std::move(cb)); OpResult<void> result = cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
if (result || result == OpStatus::KEY_NOTFOUND) { if (result || result == OpStatus::KEY_NOTFOUND) {
rb->StartArray(memberships.size()); rb->StartArray(memberships.size());
for (bool b : memberships) for (bool b : memberships)
rb->SendLong(int(b)); rb->SendLong(int(b));
return; return;
} }
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
void SMove(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void SMove(CmdArgList args, const CommandContext& cmd_cntx) {
string_view src = ArgS(args, 0); string_view src = ArgS(args, 0);
string_view dest = ArgS(args, 1); string_view dest = ArgS(args, 1);
string_view member = ArgS(args, 2); string_view member = ArgS(args, 2);
Mover mover{src, dest, member, true}; Mover mover{src, dest, member, true};
mover.Find(tx); mover.Find(cmd_cntx.tx);
OpResult<unsigned> result = mover.Commit(tx); OpResult<unsigned> result = mover.Commit(cmd_cntx.tx);
if (!result) { if (!result) {
return builder->SendError(result.status()); return cmd_cntx.rb->SendError(result.status());
return; return;
} }
builder->SendLong(result.value()); cmd_cntx.rb->SendLong(result.value());
} }
void SRem(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void SRem(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
auto vals = args.subspan(1); auto vals = args.subspan(1);
@ -1076,11 +1076,11 @@ void SRem(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return OpRem(t->GetOpArgs(shard), key, vals, false); return OpRem(t->GetOpArgs(shard), key, vals, false);
}; };
OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
SendNumeric(result, builder); SendNumeric(result, cmd_cntx.rb);
} }
void SCard(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void SCard(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<uint32_t> { auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<uint32_t> {
@ -1092,17 +1092,17 @@ void SCard(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return find_res.value()->second.Size(); return find_res.value()->second.Size();
}; };
OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
SendNumeric(result, builder); SendNumeric(result, cmd_cntx.rb);
} }
void SPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void SPop(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
unsigned count = 1; unsigned count = 1;
if (args.size() > 1) { if (args.size() > 1) {
string_view arg = ArgS(args, 1); string_view arg = ArgS(args, 1);
if (!absl::SimpleAtoi(arg, &count)) { if (!absl::SimpleAtoi(arg, &count)) {
builder->SendError(kInvalidIntErr); cmd_cntx.rb->SendError(kInvalidIntErr);
return; return;
} }
} }
@ -1111,8 +1111,8 @@ void SPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return OpPop(t->GetOpArgs(shard), key, count); return OpPop(t->GetOpArgs(shard), key, count);
}; };
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
OpResult<StringVec> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<StringVec> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result || result.status() == OpStatus::KEY_NOTFOUND) { if (result || result.status() == OpStatus::KEY_NOTFOUND) {
if (args.size() == 1) { // SPOP key if (args.size() == 1) { // SPOP key
if (result.status() == OpStatus::KEY_NOTFOUND) { if (result.status() == OpStatus::KEY_NOTFOUND) {
@ -1127,10 +1127,10 @@ void SPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return; return;
} }
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
void SDiff(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) { void SDiff(CmdArgList args, const CommandContext& cmd_cntx) {
ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED); ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED);
string_view src_key = ArgS(args, 0); string_view src_key = ArgS(args, 0);
ShardId src_shard = Shard(src_key, result_set.size()); ShardId src_shard = Shard(src_key, result_set.size());
@ -1147,12 +1147,12 @@ void SDiff(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, Connecti
return OpStatus::OK; return OpStatus::OK;
}; };
tx->ScheduleSingleHop(std::move(cb)); cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
ResultSetView rsv = DiffResultVec(result_set, src_shard); ResultSetView rsv = DiffResultVec(result_set, src_shard);
SetReplies{builder, bool(cntx->conn_state.script_info)}.Send(rsv); SetReplies{cmd_cntx.rb, bool(cmd_cntx.conn_cntx->conn_state.script_info)}.Send(rsv);
} }
void SDiffStore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void SDiffStore(CmdArgList args, const CommandContext& cmd_cntx) {
ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED); ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED);
string_view dest_key = ArgS(args, 0); string_view dest_key = ArgS(args, 0);
ShardId dest_shard = Shard(dest_key, result_set.size()); ShardId dest_shard = Shard(dest_key, result_set.size());
@ -1185,11 +1185,11 @@ void SDiffStore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return OpStatus::OK; return OpStatus::OK;
}; };
tx->Execute(std::move(diff_cb), false); cmd_cntx.tx->Execute(std::move(diff_cb), false);
ResultSetView rsv = DiffResultVec(result_set, src_shard); ResultSetView rsv = DiffResultVec(result_set, src_shard);
if (!rsv) { if (!rsv) {
tx->Conclude(); cmd_cntx.tx->Conclude();
builder->SendError(rsv.status()); cmd_cntx.rb->SendError(rsv.status());
return; return;
} }
@ -1202,24 +1202,23 @@ void SDiffStore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return OpStatus::OK; return OpStatus::OK;
}; };
tx->Execute(std::move(store_cb), true); cmd_cntx.tx->Execute(std::move(store_cb), true);
builder->SendLong(result_size); cmd_cntx.rb->SendLong(result_size);
} }
void SMembers(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, void SMembers(CmdArgList args, const CommandContext& cmd_cntx) {
ConnectionContext* cntx) {
auto cb = [](Transaction* t, EngineShard* shard) { return OpInter(t, shard, false); }; auto cb = [](Transaction* t, EngineShard* shard) { return OpInter(t, shard, false); };
OpResult<StringVec> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<StringVec> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result || result.status() == OpStatus::KEY_NOTFOUND) { if (result || result.status() == OpStatus::KEY_NOTFOUND) {
SetReplies{builder, bool(cntx->conn_state.script_info)}.Send(&result.value()); SetReplies{cmd_cntx.rb, bool(cmd_cntx.conn_cntx->conn_state.script_info)}.Send(&result.value());
} else { } else {
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} }
void SRandMember(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void SRandMember(CmdArgList args, const CommandContext& cmd_cntx) {
CmdArgParser parser{args}; CmdArgParser parser{args};
string_view key = parser.Next(); string_view key = parser.Next();
@ -1227,17 +1226,17 @@ void SRandMember(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
int count = is_count ? parser.Next<int>() : 1; int count = is_count ? parser.Next<int>() : 1;
if (parser.HasNext()) if (parser.HasNext())
return builder->SendError(WrongNumArgsError("SRANDMEMBER")); return cmd_cntx.rb->SendError(WrongNumArgsError("SRANDMEMBER"));
if (auto err = parser.Error(); err) if (auto err = parser.Error(); err)
return builder->SendError(err->MakeReply()); return cmd_cntx.rb->SendError(err->MakeReply());
const auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<StringVec> { const auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<StringVec> {
return OpRandMember(t->GetOpArgs(shard), key, count); return OpRandMember(t->GetOpArgs(shard), key, count);
}; };
OpResult<StringVec> result = tx->ScheduleSingleHopT(cb); OpResult<StringVec> result = cmd_cntx.tx->ScheduleSingleHopT(cb);
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
if (result || result == OpStatus::KEY_NOTFOUND) { if (result || result == OpStatus::KEY_NOTFOUND) {
if (is_count) { if (is_count) {
rb->SendBulkStrArr(*result, RedisReplyBuilder::SET); rb->SendBulkStrArr(*result, RedisReplyBuilder::SET);
@ -1248,10 +1247,10 @@ void SRandMember(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
} }
return; return;
} }
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
void SInter(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) { void SInter(CmdArgList args, const CommandContext& cmd_cntx) {
ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED); ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED);
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
@ -1260,16 +1259,16 @@ void SInter(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, Connect
return OpStatus::OK; return OpStatus::OK;
}; };
tx->ScheduleSingleHop(std::move(cb)); cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
OpResult<SvArray> result = InterResultVec(result_set, tx->GetUniqueShardCnt()); OpResult<SvArray> result = InterResultVec(result_set, cmd_cntx.tx->GetUniqueShardCnt());
if (result) { if (result) {
SetReplies{builder, bool(cntx->conn_state.script_info)}.Send(&*result); SetReplies{cmd_cntx.rb, bool(cmd_cntx.conn_cntx->conn_state.script_info)}.Send(&*result);
} else { } else {
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} }
void SInterStore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void SInterStore(CmdArgList args, const CommandContext& cmd_cntx) {
ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED); ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED);
string_view dest_key = ArgS(args, 0); string_view dest_key = ArgS(args, 0);
ShardId dest_shard = Shard(dest_key, result_set.size()); ShardId dest_shard = Shard(dest_key, result_set.size());
@ -1287,12 +1286,12 @@ void SInterStore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return OpStatus::OK; return OpStatus::OK;
}; };
tx->Execute(std::move(inter_cb), false); cmd_cntx.tx->Execute(std::move(inter_cb), false);
OpResult<SvArray> result = InterResultVec(result_set, inter_shard_cnt.load(memory_order_relaxed)); OpResult<SvArray> result = InterResultVec(result_set, inter_shard_cnt.load(memory_order_relaxed));
if (!result) { if (!result) {
tx->Conclude(); cmd_cntx.tx->Conclude();
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
return; return;
} }
@ -1304,21 +1303,21 @@ void SInterStore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return OpStatus::OK; return OpStatus::OK;
}; };
tx->Execute(std::move(store_cb), true); cmd_cntx.tx->Execute(std::move(store_cb), true);
builder->SendLong(result->size()); cmd_cntx.rb->SendLong(result->size());
} }
void SInterCard(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void SInterCard(CmdArgList args, const CommandContext& cmd_cntx) {
unsigned num_keys; unsigned num_keys;
if (!absl::SimpleAtoi(ArgS(args, 0), &num_keys)) if (!absl::SimpleAtoi(ArgS(args, 0), &num_keys))
return builder->SendError(kSyntaxErr); return cmd_cntx.rb->SendError(kSyntaxErr);
unsigned limit = 0; unsigned limit = 0;
if (args.size() == (num_keys + 3) && ArgS(args, 1 + num_keys) == "LIMIT") { if (args.size() == (num_keys + 3) && ArgS(args, 1 + num_keys) == "LIMIT") {
if (!absl::SimpleAtoi(ArgS(args, num_keys + 2), &limit)) if (!absl::SimpleAtoi(ArgS(args, num_keys + 2), &limit))
return builder->SendError("limit can't be negative"); return cmd_cntx.rb->SendError("limit can't be negative");
} else if (args.size() > (num_keys + 1)) } else if (args.size() > (num_keys + 1))
return builder->SendError(kSyntaxErr); return cmd_cntx.rb->SendError(kSyntaxErr);
ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED); ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED);
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
@ -1326,16 +1325,16 @@ void SInterCard(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return OpStatus::OK; return OpStatus::OK;
}; };
tx->ScheduleSingleHop(std::move(cb)); cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
OpResult<SvArray> result = InterResultVec(result_set, tx->GetUniqueShardCnt(), limit); OpResult<SvArray> result = InterResultVec(result_set, cmd_cntx.tx->GetUniqueShardCnt(), limit);
if (result) { if (result) {
return builder->SendLong(result->size()); return cmd_cntx.rb->SendLong(result->size());
} }
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
void SUnion(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) { void SUnion(CmdArgList args, const CommandContext& cmd_cntx) {
ResultStringVec result_set(shard_set->size()); ResultStringVec result_set(shard_set->size());
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
@ -1344,13 +1343,13 @@ void SUnion(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, Connect
return OpStatus::OK; return OpStatus::OK;
}; };
tx->ScheduleSingleHop(std::move(cb)); cmd_cntx.tx->ScheduleSingleHop(std::move(cb));
ResultSetView unionset = UnionResultVec(result_set); ResultSetView unionset = UnionResultVec(result_set);
SetReplies{builder, bool(cntx->conn_state.script_info)}.Send(unionset); SetReplies{cmd_cntx.rb, bool(cmd_cntx.conn_cntx->conn_state.script_info)}.Send(unionset);
} }
void SUnionStore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void SUnionStore(CmdArgList args, const CommandContext& cmd_cntx) {
ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED); ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED);
string_view dest_key = ArgS(args, 0); string_view dest_key = ArgS(args, 0);
ShardId dest_shard = Shard(dest_key, result_set.size()); ShardId dest_shard = Shard(dest_key, result_set.size());
@ -1368,12 +1367,12 @@ void SUnionStore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return OpStatus::OK; return OpStatus::OK;
}; };
tx->Execute(std::move(union_cb), false); cmd_cntx.tx->Execute(std::move(union_cb), false);
ResultSetView unionset = UnionResultVec(result_set); ResultSetView unionset = UnionResultVec(result_set);
if (!unionset) { if (!unionset) {
tx->Conclude(); cmd_cntx.tx->Conclude();
builder->SendError(unionset.status()); cmd_cntx.rb->SendError(unionset.status());
return; return;
} }
@ -1386,30 +1385,30 @@ void SUnionStore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return OpStatus::OK; return OpStatus::OK;
}; };
tx->Execute(std::move(store_cb), true); cmd_cntx.tx->Execute(std::move(store_cb), true);
builder->SendLong(result_size); cmd_cntx.rb->SendLong(result_size);
} }
void SScan(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void SScan(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view token = ArgS(args, 1); string_view token = ArgS(args, 1);
uint64_t cursor = 0; uint64_t cursor = 0;
if (!absl::SimpleAtoi(token, &cursor)) { if (!absl::SimpleAtoi(token, &cursor)) {
return builder->SendError("invalid cursor"); return cmd_cntx.rb->SendError("invalid cursor");
} }
// SSCAN key cursor [MATCH pattern] [COUNT count] // SSCAN key cursor [MATCH pattern] [COUNT count]
if (args.size() > 6) { if (args.size() > 6) {
DVLOG(1) << "got " << args.size() << " this is more than it should be"; DVLOG(1) << "got " << args.size() << " this is more than it should be";
return builder->SendError(kSyntaxErr); return cmd_cntx.rb->SendError(kSyntaxErr);
} }
OpResult<ScanOpts> ops = ScanOpts::TryFrom(args.subspan(2)); OpResult<ScanOpts> ops = ScanOpts::TryFrom(args.subspan(2));
if (!ops) { if (!ops) {
DVLOG(1) << "SScan invalid args - return " << ops << " to the user"; DVLOG(1) << "SScan invalid args - return " << ops << " to the user";
return builder->SendError(ops.status()); return cmd_cntx.rb->SendError(ops.status());
} }
ScanOpts scan_op = ops.value(); ScanOpts scan_op = ops.value();
@ -1418,8 +1417,8 @@ void SScan(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return OpScan(t->GetOpArgs(shard), key, &cursor, scan_op); return OpScan(t->GetOpArgs(shard), key, &cursor, scan_op);
}; };
auto* rb = static_cast<RedisReplyBuilder*>(builder); auto* rb = static_cast<RedisReplyBuilder*>(cmd_cntx.rb);
OpResult<StringVec> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<StringVec> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result.status() != OpStatus::WRONG_TYPE) { if (result.status() != OpStatus::WRONG_TYPE) {
rb->StartArray(2); rb->StartArray(2);
rb->SendBulkString(absl::StrCat(cursor)); rb->SendBulkString(absl::StrCat(cursor));
@ -1428,19 +1427,19 @@ void SScan(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
rb->SendBulkString(k); rb->SendBulkString(k);
} }
} else { } else {
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} }
// Syntax: saddex key ttl_sec member [member...] // Syntax: saddex key ttl_sec member [member...]
void SAddEx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { void SAddEx(CmdArgList args, const CommandContext& cmd_cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view ttl_str = ArgS(args, 1); string_view ttl_str = ArgS(args, 1);
uint32_t ttl_sec; uint32_t ttl_sec;
constexpr uint32_t kMaxTtl = (1UL << 26); constexpr uint32_t kMaxTtl = (1UL << 26);
if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) { if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) {
return builder->SendError(kInvalidIntErr); return cmd_cntx.rb->SendError(kInvalidIntErr);
} }
auto vals = args.subspan(2); auto vals = args.subspan(2);
@ -1448,12 +1447,12 @@ void SAddEx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
return OpAddEx(t->GetOpArgs(shard), key, ttl_sec, vals); return OpAddEx(t->GetOpArgs(shard), key, ttl_sec, vals);
}; };
OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb)); OpResult<uint32_t> result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
return builder->SendLong(result.value()); return cmd_cntx.rb->SendLong(result.value());
} }
builder->SendError(result.status()); cmd_cntx.rb->SendError(result.status());
} }
} // namespace } // namespace

View file

@ -33,8 +33,6 @@ class SetFamily {
static std::vector<long> SetFieldsExpireTime(const OpArgs& op_args, uint32_t ttl_sec, static std::vector<long> SetFieldsExpireTime(const OpArgs& op_args, uint32_t ttl_sec,
CmdArgList values, PrimeValue* pv); CmdArgList values, PrimeValue* pv);
private:
}; };
} // namespace dfly } // namespace dfly