chore: pass SinkReplyBuilder and Transaction explicitly. Part4 (#3967)

This commit is contained in:
Roman Gershman 2024-10-23 21:21:46 +03:00 committed by GitHub
parent 70614b8d40
commit 0ebc1a11e1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 163 additions and 165 deletions

View file

@ -39,13 +39,13 @@ using BitsStrVec = vector<string>;
// The following is the list of the functions that would handle the // The following is the list of the functions that would handle the
// commands that handle the bit operations // commands that handle the bit operations
void BitPos(CmdArgList args, ConnectionContext* cntx); void BitPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void BitCount(CmdArgList args, ConnectionContext* cntx); void BitCount(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void BitField(CmdArgList args, ConnectionContext* cntx); void BitField(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void BitFieldRo(CmdArgList args, ConnectionContext* cntx); void BitFieldRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void BitOp(CmdArgList args, ConnectionContext* cntx); void BitOp(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void GetBit(CmdArgList args, ConnectionContext* cntx); void GetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
void SetBit(CmdArgList args, ConnectionContext* cntx); void SetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
OpResult<string> ReadValue(const DbContext& context, string_view key, EngineShard* shard); OpResult<string> ReadValue(const DbContext& context, string_view key, EngineShard* shard);
OpResult<bool> ReadValueBitsetAt(const OpArgs& op_args, string_view key, uint32_t offset); OpResult<bool> ReadValueBitsetAt(const OpArgs& op_args, string_view key, uint32_t offset);
@ -489,22 +489,23 @@ OpResult<string> RunBitOpOnShard(string_view op, const OpArgs& op_args, ShardArg
return op_result; return op_result;
} }
template <typename T> void HandleOpValueResult(const OpResult<T>& result, ConnectionContext* cntx) { template <typename T>
void HandleOpValueResult(const OpResult<T>& result, SinkReplyBuilder* builder) {
static_assert(std::is_integral<T>::value, static_assert(std::is_integral<T>::value,
"we are only handling types that are integral types in the return types from " "we are only handling types that are integral types in the return types from "
"here"); "here");
if (result) { if (result) {
cntx->SendLong(result.value()); builder->SendLong(result.value());
} else { } else {
switch (result.status()) { switch (result.status()) {
case OpStatus::WRONG_TYPE: case OpStatus::WRONG_TYPE:
cntx->SendError(kWrongTypeErr); builder->SendError(kWrongTypeErr);
break; break;
case OpStatus::OUT_OF_MEMORY: case OpStatus::OUT_OF_MEMORY:
cntx->SendError(kOutOfMemory); builder->SendError(kOutOfMemory);
break; break;
default: default:
cntx->SendLong(0); // in case we don't have the value we should just send 0 builder->SendLong(0); // in case we don't have the value we should just send 0
break; break;
} }
} }
@ -512,12 +513,12 @@ template <typename T> void HandleOpValueResult(const OpResult<T>& result, Connec
// ------------------------------------------------------------------------- // // ------------------------------------------------------------------------- //
// Impl for the command functions // Impl for the command functions
void BitPos(CmdArgList args, ConnectionContext* cntx) { void BitPos(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
// Support for the command BITPOS // Support for the command BITPOS
// See details at https://redis.io/commands/bitpos/ // See details at https://redis.io/commands/bitpos/
if (args.size() < 1 || args.size() > 5) { if (args.size() < 1 || args.size() > 5) {
return cntx->SendError(kSyntaxErr); return builder->SendError(kSyntaxErr);
} }
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
@ -528,19 +529,19 @@ void BitPos(CmdArgList args, ConnectionContext* cntx) {
bool as_bit = false; bool as_bit = false;
if (!absl::SimpleAtoi(ArgS(args, 1), &value)) { if (!absl::SimpleAtoi(ArgS(args, 1), &value)) {
return cntx->SendError(kInvalidIntErr); return builder->SendError(kInvalidIntErr);
} else if (value != 0 && value != 1) { } else if (value != 0 && value != 1) {
return cntx->SendError("The bit argument must be 1 or 0"); return builder->SendError("The bit argument must be 1 or 0");
} }
if (args.size() >= 3) { if (args.size() >= 3) {
if (!absl::SimpleAtoi(ArgS(args, 2), &start)) { if (!absl::SimpleAtoi(ArgS(args, 2), &start)) {
return cntx->SendError(kInvalidIntErr); return builder->SendError(kInvalidIntErr);
} }
if (args.size() >= 4) { if (args.size() >= 4) {
if (!absl::SimpleAtoi(ArgS(args, 3), &end)) { if (!absl::SimpleAtoi(ArgS(args, 3), &end)) {
return cntx->SendError(kInvalidIntErr); return builder->SendError(kInvalidIntErr);
} }
if (args.size() >= 5) { if (args.size() >= 5) {
@ -550,7 +551,7 @@ void BitPos(CmdArgList args, ConnectionContext* cntx) {
} else if (arg == "BYTE") { } else if (arg == "BYTE") {
as_bit = false; as_bit = false;
} else { } else {
return cntx->SendError(kSyntaxErr); return builder->SendError(kSyntaxErr);
} }
} }
} }
@ -559,12 +560,11 @@ void BitPos(CmdArgList args, ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return FindFirstBitWithValue(t->GetOpArgs(shard), key, value, start, end, as_bit); return FindFirstBitWithValue(t->GetOpArgs(shard), key, value, start, end, as_bit);
}; };
Transaction* trans = cntx->transaction; OpResult<int64_t> res = tx->ScheduleSingleHopT(std::move(cb));
OpResult<int64_t> res = trans->ScheduleSingleHopT(std::move(cb)); HandleOpValueResult(res, builder);
HandleOpValueResult(res, cntx);
} }
void BitCount(CmdArgList args, ConnectionContext* cntx) { void BitCount(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
// Support for the command BITCOUNT // Support for the command BITCOUNT
// See details at https://redis.io/commands/bitcount/ // See details at https://redis.io/commands/bitcount/
// Please note that if the key don't exists, it would return 0 // Please note that if the key don't exists, it would return 0
@ -579,14 +579,13 @@ void BitCount(CmdArgList args, ConnectionContext* cntx) {
bool as_bit = parser.HasNext() ? parser.MapNext("BYTE", false, "BIT", true) : false; bool as_bit = parser.HasNext() ? parser.MapNext("BYTE", false, "BIT", true) : false;
if (!parser.Finalize()) { if (!parser.Finalize()) {
return cntx->SendError(parser.Error()->MakeReply()); return builder->SendError(parser.Error()->MakeReply());
} }
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return CountBitsForValue(t->GetOpArgs(shard), key, start, end, as_bit); return CountBitsForValue(t->GetOpArgs(shard), key, start, end, as_bit);
}; };
Transaction* trans = cntx->transaction; OpResult<std::size_t> res = tx->ScheduleSingleHopT(std::move(cb));
OpResult<std::size_t> res = trans->ScheduleSingleHopT(std::move(cb)); HandleOpValueResult(res, builder);
HandleOpValueResult(res, cntx);
} }
// GCC yields a wrong warning about uninitialized optional use // GCC yields a wrong warning about uninitialized optional use
@ -1063,8 +1062,8 @@ nonstd::expected<CommandList, string> ParseToCommandList(CmdArgList args, bool r
return result; return result;
} }
void SendResults(const vector<ResultType>& results, ConnectionContext* cntx) { void SendResults(const vector<ResultType>& results, SinkReplyBuilder* builder) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(builder);
const size_t total = results.size(); const size_t total = results.size();
if (total == 0) { if (total == 0) {
rb->SendNullArray(); rb->SendNullArray();
@ -1082,9 +1081,9 @@ void SendResults(const vector<ResultType>& results, ConnectionContext* cntx) {
} }
} }
void BitFieldGeneric(CmdArgList args, bool read_only, ConnectionContext* cntx) { void BitFieldGeneric(CmdArgList args, bool read_only, Transaction* tx, SinkReplyBuilder* builder) {
if (args.size() == 1) { if (args.size() == 1) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendNullArray(); rb->SendNullArray();
return; return;
} }
@ -1092,7 +1091,7 @@ void BitFieldGeneric(CmdArgList args, bool read_only, ConnectionContext* cntx) {
auto maybe_ops_list = ParseToCommandList(args.subspan(1), read_only); auto maybe_ops_list = ParseToCommandList(args.subspan(1), read_only);
if (!maybe_ops_list.has_value()) { if (!maybe_ops_list.has_value()) {
cntx->SendError(maybe_ops_list.error()); builder->SendError(maybe_ops_list.error());
return; return;
} }
CommandList cmd_list = std::move(maybe_ops_list.value()); CommandList cmd_list = std::move(maybe_ops_list.value());
@ -1102,30 +1101,29 @@ void BitFieldGeneric(CmdArgList args, bool read_only, ConnectionContext* cntx) {
return executor.Execute(cmd_list); return executor.Execute(cmd_list);
}; };
Transaction* trans = cntx->transaction; OpResult<vector<ResultType>> res = tx->ScheduleSingleHopT(std::move(cb));
OpResult<vector<ResultType>> res = trans->ScheduleSingleHopT(std::move(cb));
if (res == OpStatus::WRONG_TYPE) { if (res == OpStatus::WRONG_TYPE) {
cntx->SendError(kWrongTypeErr); builder->SendError(kWrongTypeErr);
return; return;
} }
SendResults(*res, cntx); SendResults(*res, builder);
} }
void BitField(CmdArgList args, ConnectionContext* cntx) { void BitField(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
BitFieldGeneric(args, false, cntx); BitFieldGeneric(args, false, tx, builder);
} }
void BitFieldRo(CmdArgList args, ConnectionContext* cntx) { void BitFieldRo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
BitFieldGeneric(args, true, cntx); BitFieldGeneric(args, true, tx, builder);
} }
#ifndef __clang__ #ifndef __clang__
#pragma GCC diagnostic pop #pragma GCC diagnostic pop
#endif #endif
void BitOp(CmdArgList args, ConnectionContext* cntx) { void BitOp(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
static const std::array<string_view, 4> BITOP_OP_NAMES{OR_OP_NAME, XOR_OP_NAME, AND_OP_NAME, static const std::array<string_view, 4> BITOP_OP_NAMES{OR_OP_NAME, XOR_OP_NAME, AND_OP_NAME,
NOT_OP_NAME}; NOT_OP_NAME};
string op = absl::AsciiStrToUpper(ArgS(args, 0)); string op = absl::AsciiStrToUpper(ArgS(args, 0));
@ -1134,7 +1132,7 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) {
[&op](auto val) { return op == val; }); [&op](auto val) { return op == val; });
if (illegal || (op == NOT_OP_NAME && args.size() > 3)) { if (illegal || (op == NOT_OP_NAME && args.size() > 3)) {
return cntx->SendError(kSyntaxErr); // too many arguments return builder->SendError(kSyntaxErr); // too many arguments
} }
// Multi shard access - read only // Multi shard access - read only
@ -1157,13 +1155,13 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) {
return OpStatus::OK; return OpStatus::OK;
}; };
cntx->transaction->Execute(std::move(shard_bitop), false); // we still have more work to do tx->Execute(std::move(shard_bitop), false); // we still have more work to do
// All result from each shard // All result from each shard
const auto joined_results = CombineResultOp(result_set, op); const auto joined_results = CombineResultOp(result_set, op);
// Second phase - save to target key if successful // Second phase - save to target key if successful
if (!joined_results) { if (!joined_results) {
cntx->transaction->Conclude(); tx->Conclude();
cntx->SendError(joined_results.status()); builder->SendError(joined_results.status());
return; return;
} else { } else {
auto op_result = joined_results.value(); auto op_result = joined_results.value();
@ -1193,12 +1191,12 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) {
return OpStatus::OK; return OpStatus::OK;
}; };
cntx->transaction->Execute(std::move(store_cb), true); tx->Execute(std::move(store_cb), true);
cntx->SendLong(op_result.size()); builder->SendLong(op_result.size());
} }
} }
void GetBit(CmdArgList args, ConnectionContext* cntx) { void GetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
// Support for the command "GETBIT key offset" // Support for the command "GETBIT key offset"
// see https://redis.io/commands/getbit/ // see https://redis.io/commands/getbit/
@ -1206,17 +1204,16 @@ void GetBit(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
if (!absl::SimpleAtoi(ArgS(args, 1), &offset)) { if (!absl::SimpleAtoi(ArgS(args, 1), &offset)) {
return cntx->SendError(kInvalidIntErr); return builder->SendError(kInvalidIntErr);
} }
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return ReadValueBitsetAt(t->GetOpArgs(shard), key, offset); return ReadValueBitsetAt(t->GetOpArgs(shard), key, offset);
}; };
Transaction* trans = cntx->transaction; OpResult<bool> res = tx->ScheduleSingleHopT(std::move(cb));
OpResult<bool> res = trans->ScheduleSingleHopT(std::move(cb)); HandleOpValueResult(res, builder);
HandleOpValueResult(res, cntx);
} }
void SetBit(CmdArgList args, ConnectionContext* cntx) { void SetBit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
// Support for the command "SETBIT key offset new_value" // Support for the command "SETBIT key offset new_value"
// see https://redis.io/commands/setbit/ // see https://redis.io/commands/setbit/
@ -1224,16 +1221,15 @@ void SetBit(CmdArgList args, ConnectionContext* cntx) {
auto [key, offset, value] = parser.Next<string_view, uint32_t, FInt<0, 1>>(); auto [key, offset, value] = parser.Next<string_view, uint32_t, FInt<0, 1>>();
if (auto err = parser.Error(); err) { if (auto err = parser.Error(); err) {
return cntx->SendError(err->MakeReply()); return builder->SendError(err->MakeReply());
} }
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return BitNewValue(t->GetOpArgs(shard), key, offset, value != 0); return BitNewValue(t->GetOpArgs(shard), key, offset, value != 0);
}; };
Transaction* trans = cntx->transaction; OpResult<bool> res = tx->ScheduleSingleHopT(std::move(cb));
OpResult<bool> res = trans->ScheduleSingleHopT(std::move(cb)); HandleOpValueResult(res, builder);
HandleOpValueResult(res, cntx);
} }
// ------------------------------------------------------------------------- // // ------------------------------------------------------------------------- //

View file

@ -708,22 +708,22 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
return created; return created;
} }
void HGetGeneric(CmdArgList args, ConnectionContext* cntx, uint8_t getall_mask) { void HGetGeneric(CmdArgList args, uint8_t getall_mask, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return OpGetAll(t->GetOpArgs(shard), key, getall_mask); return OpGetAll(t->GetOpArgs(shard), key, getall_mask);
}; };
OpResult<vector<string>> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<vector<string>> result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (result) { if (result) {
bool is_map = (getall_mask == (VALUES | FIELDS)); bool is_map = (getall_mask == (VALUES | FIELDS));
rb->SendStringArr(absl::Span<const string>{*result}, rb->SendStringArr(absl::Span<const string>{*result},
is_map ? RedisReplyBuilder::MAP : RedisReplyBuilder::ARRAY); is_map ? RedisReplyBuilder::MAP : RedisReplyBuilder::ARRAY);
} else { } else {
cntx->SendError(result.status()); builder->SendError(result.status());
} }
} }
@ -745,7 +745,7 @@ OpResult<vector<long>> OpHExpire(const OpArgs& op_args, string_view key, uint32_
} }
// HSETEX key [NX] tll_sec field value field value ... // HSETEX key [NX] tll_sec field value field value ...
void HSetEx(CmdArgList args, ConnectionContext* cntx) { void HSetEx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) {
CmdArgParser parser{args}; CmdArgParser parser{args};
string_view key = parser.Next(); string_view key = parser.Next();
@ -757,13 +757,13 @@ void HSetEx(CmdArgList args, ConnectionContext* cntx) {
constexpr uint32_t kMaxTtl = (1UL << 26); constexpr uint32_t kMaxTtl = (1UL << 26);
if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) { if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) {
return cntx->SendError(kInvalidIntErr); return builder->SendError(kInvalidIntErr);
} }
CmdArgList fields = parser.Tail(); CmdArgList fields = parser.Tail();
if (fields.size() % 2 != 0) { if (fields.size() % 2 != 0) {
return cntx->SendError(facade::WrongNumArgsError(cntx->cid->name()), kSyntaxErrType); return builder->SendError(facade::WrongNumArgsError(cntx->cid->name()), kSyntaxErrType);
} }
OpSetParams op_sp{skip_if_exists, ttl_sec}; OpSetParams op_sp{skip_if_exists, ttl_sec};
@ -772,17 +772,17 @@ void HSetEx(CmdArgList args, ConnectionContext* cntx) {
return OpSet(t->GetOpArgs(shard), key, fields, op_sp); return OpSet(t->GetOpArgs(shard), key, fields, op_sp);
}; };
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
cntx->SendLong(*result); builder->SendLong(*result);
} else { } else {
cntx->SendError(result.status()); builder->SendError(result.status());
} }
} }
} // namespace } // namespace
void HSetFamily::HDel(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
args.remove_prefix(1); args.remove_prefix(1);
@ -790,28 +790,28 @@ void HSetFamily::HDel(CmdArgList args, ConnectionContext* cntx) {
return OpDel(t->GetOpArgs(shard), key, args); return OpDel(t->GetOpArgs(shard), key, args);
}; };
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb));
if (result || result.status() == OpStatus::KEY_NOTFOUND) { if (result || result.status() == OpStatus::KEY_NOTFOUND) {
cntx->SendLong(*result); builder->SendLong(*result);
} else { } else {
cntx->SendError(result.status()); builder->SendError(result.status());
} }
} }
void HSetFamily::HLen(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
auto cb = [&](Transaction* t, EngineShard* shard) { return OpLen(t->GetOpArgs(shard), key); }; auto cb = [&](Transaction* t, EngineShard* shard) { return OpLen(t->GetOpArgs(shard), key); };
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
cntx->SendLong(*result); builder->SendLong(*result);
} else { } else {
cntx->SendError(result.status()); builder->SendError(result.status());
} }
} }
void HSetFamily::HExists(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HExists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view field = ArgS(args, 1); string_view field = ArgS(args, 1);
@ -819,46 +819,46 @@ void HSetFamily::HExists(CmdArgList args, ConnectionContext* cntx) {
return OpExist(t->GetOpArgs(shard), key, field); return OpExist(t->GetOpArgs(shard), key, field);
}; };
OpResult<int> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<int> result = tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
cntx->SendLong(*result); builder->SendLong(*result);
} else { } else {
cntx->SendError(result.status()); builder->SendError(result.status());
} }
} }
void HSetFamily::HExpire(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HExpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
CmdArgParser parser{args}; CmdArgParser parser{args};
string_view key = parser.Next(); string_view key = parser.Next();
string_view ttl_str = parser.Next(); string_view ttl_str = parser.Next();
uint32_t ttl_sec; uint32_t ttl_sec;
constexpr uint32_t kMaxTtl = (1UL << 26); constexpr uint32_t kMaxTtl = (1UL << 26);
if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) { if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) {
return cntx->SendError(kInvalidIntErr); return builder->SendError(kInvalidIntErr);
} }
if (!static_cast<bool>(parser.Check("FIELDS"sv))) { if (!static_cast<bool>(parser.Check("FIELDS"sv))) {
return cntx->SendError("Mandatory argument FIELDS is missing or not at the right position", return builder->SendError("Mandatory argument FIELDS is missing or not at the right position",
kSyntaxErrType); kSyntaxErrType);
} }
string_view numFieldsStr = parser.Next(); string_view numFieldsStr = parser.Next();
uint32_t numFields; uint32_t numFields;
if (!absl::SimpleAtoi(numFieldsStr, &numFields) || numFields == 0) { if (!absl::SimpleAtoi(numFieldsStr, &numFields) || numFields == 0) {
return cntx->SendError(kInvalidIntErr); return builder->SendError(kInvalidIntErr);
} }
CmdArgList fields = parser.Tail(); CmdArgList fields = parser.Tail();
if (fields.size() != numFields) { if (fields.size() != numFields) {
return cntx->SendError("The `numfields` parameter must match the number of arguments", return builder->SendError("The `numfields` parameter must match the number of arguments",
kSyntaxErrType); kSyntaxErrType);
} }
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return OpHExpire(t->GetOpArgs(shard), key, ttl_sec, fields); return OpHExpire(t->GetOpArgs(shard), key, ttl_sec, fields);
}; };
OpResult<vector<long>> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<vector<long>> result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (result) { if (result) {
rb->StartArray(result->size()); rb->StartArray(result->size());
const auto& array = result.value(); const auto& array = result.value();
@ -866,11 +866,11 @@ void HSetFamily::HExpire(CmdArgList args, ConnectionContext* cntx) {
rb->SendLong(v); rb->SendLong(v);
} }
} else { } else {
cntx->SendError(result.status()); builder->SendError(result.status());
} }
} }
void HSetFamily::HMGet(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HMGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
args.remove_prefix(1); args.remove_prefix(1);
@ -878,12 +878,12 @@ void HSetFamily::HMGet(CmdArgList args, ConnectionContext* cntx) {
return OpHMGet(t->GetOpArgs(shard), key, args); return OpHMGet(t->GetOpArgs(shard), key, args);
}; };
OpResult<vector<OptStr>> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<vector<OptStr>> result = tx->ScheduleSingleHopT(std::move(cb));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (result) { if (result) {
SinkReplyBuilder::ReplyAggregator agg(cntx->reply_builder()); SinkReplyBuilder::ReplyAggregator agg(builder);
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(result->size()); rb->StartArray(result->size());
for (const auto& val : *result) { for (const auto& val : *result) {
if (val) { if (val) {
@ -893,18 +893,18 @@ void HSetFamily::HMGet(CmdArgList args, ConnectionContext* cntx) {
} }
} }
} else if (result.status() == OpStatus::KEY_NOTFOUND) { } else if (result.status() == OpStatus::KEY_NOTFOUND) {
SinkReplyBuilder::ReplyAggregator agg(cntx->reply_builder()); SinkReplyBuilder::ReplyAggregator agg(builder);
rb->StartArray(args.size()); rb->StartArray(args.size());
for (unsigned i = 0; i < args.size(); ++i) { for (unsigned i = 0; i < args.size(); ++i) {
rb->SendNull(); rb->SendNull();
} }
} else { } else {
cntx->SendError(result.status()); builder->SendError(result.status());
} }
} }
void HSetFamily::HGet(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view field = ArgS(args, 1); string_view field = ArgS(args, 1);
@ -912,27 +912,27 @@ void HSetFamily::HGet(CmdArgList args, ConnectionContext* cntx) {
return OpGet(t->GetOpArgs(shard), key, field); return OpGet(t->GetOpArgs(shard), key, field);
}; };
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(builder);
OpResult<string> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<string> result = tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
rb->SendBulkString(*result); rb->SendBulkString(*result);
} else { } else {
if (result.status() == OpStatus::KEY_NOTFOUND) { if (result.status() == OpStatus::KEY_NOTFOUND) {
rb->SendNull(); rb->SendNull();
} else { } else {
cntx->SendError(result.status()); builder->SendError(result.status());
} }
} }
} }
void HSetFamily::HIncrBy(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HIncrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view field = ArgS(args, 1); string_view field = ArgS(args, 1);
string_view incrs = ArgS(args, 2); string_view incrs = ArgS(args, 2);
int64_t ival = 0; int64_t ival = 0;
if (!absl::SimpleAtoi(incrs, &ival)) { if (!absl::SimpleAtoi(incrs, &ival)) {
return cntx->SendError(kInvalidIntErr); return builder->SendError(kInvalidIntErr);
} }
IncrByParam param{ival}; IncrByParam param{ival};
@ -941,33 +941,33 @@ void HSetFamily::HIncrBy(CmdArgList args, ConnectionContext* cntx) {
return OpIncrBy(t->GetOpArgs(shard), key, field, &param); return OpIncrBy(t->GetOpArgs(shard), key, field, &param);
}; };
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); OpStatus status = tx->ScheduleSingleHop(std::move(cb));
if (status == OpStatus::OK) { if (status == OpStatus::OK) {
cntx->SendLong(get<int64_t>(param)); builder->SendLong(get<int64_t>(param));
} else { } else {
switch (status) { switch (status) {
case OpStatus::INVALID_VALUE: case OpStatus::INVALID_VALUE:
cntx->SendError("hash value is not an integer"); builder->SendError("hash value is not an integer");
break; break;
case OpStatus::OUT_OF_RANGE: case OpStatus::OUT_OF_RANGE:
cntx->SendError(kIncrOverflow); builder->SendError(kIncrOverflow);
break; break;
default: default:
cntx->SendError(status); builder->SendError(status);
break; break;
} }
} }
} }
void HSetFamily::HIncrByFloat(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HIncrByFloat(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view field = ArgS(args, 1); string_view field = ArgS(args, 1);
string_view incrs = ArgS(args, 2); string_view incrs = ArgS(args, 2);
double dval = 0; double dval = 0;
if (!absl::SimpleAtod(incrs, &dval)) { if (!absl::SimpleAtod(incrs, &dval)) {
return cntx->SendError(kInvalidFloatErr); return builder->SendError(kInvalidFloatErr);
} }
IncrByParam param{dval}; IncrByParam param{dval};
@ -976,55 +976,55 @@ void HSetFamily::HIncrByFloat(CmdArgList args, ConnectionContext* cntx) {
return OpIncrBy(t->GetOpArgs(shard), key, field, &param); return OpIncrBy(t->GetOpArgs(shard), key, field, &param);
}; };
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); OpStatus status = tx->ScheduleSingleHop(std::move(cb));
if (status == OpStatus::OK) { if (status == OpStatus::OK) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendDouble(get<double>(param)); rb->SendDouble(get<double>(param));
} else { } else {
switch (status) { switch (status) {
case OpStatus::INVALID_VALUE: case OpStatus::INVALID_VALUE:
cntx->SendError("hash value is not a float"); builder->SendError("hash value is not a float");
break; break;
default: default:
cntx->SendError(status); builder->SendError(status);
break; break;
} }
} }
} }
void HSetFamily::HKeys(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HKeys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
HGetGeneric(args, cntx, FIELDS); HGetGeneric(args, FIELDS, tx, builder);
} }
void HSetFamily::HVals(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HVals(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
HGetGeneric(args, cntx, VALUES); HGetGeneric(args, VALUES, tx, builder);
} }
void HSetFamily::HGetAll(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HGetAll(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
HGetGeneric(args, cntx, GetAllMode::FIELDS | GetAllMode::VALUES); HGetGeneric(args, GetAllMode::FIELDS | GetAllMode::VALUES, tx, builder);
} }
void HSetFamily::HScan(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HScan(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
std::string_view key = ArgS(args, 0); std::string_view key = ArgS(args, 0);
std::string_view token = ArgS(args, 1); std::string_view token = ArgS(args, 1);
uint64_t cursor = 0; uint64_t cursor = 0;
if (!absl::SimpleAtoi(token, &cursor)) { if (!absl::SimpleAtoi(token, &cursor)) {
return cntx->SendError("invalid cursor"); return builder->SendError("invalid cursor");
} }
// HSCAN key cursor [MATCH pattern] [COUNT count] // HSCAN key cursor [MATCH pattern] [COUNT count]
if (args.size() > 6) { if (args.size() > 6) {
DVLOG(1) << "got " << args.size() << " this is more than it should be"; DVLOG(1) << "got " << args.size() << " this is more than it should be";
return cntx->SendError(kSyntaxErr); return builder->SendError(kSyntaxErr);
} }
OpResult<ScanOpts> ops = ScanOpts::TryFrom(args.subspan(2)); OpResult<ScanOpts> ops = ScanOpts::TryFrom(args.subspan(2));
if (!ops) { if (!ops) {
DVLOG(1) << "HScan invalid args - return " << ops << " to the user"; DVLOG(1) << "HScan invalid args - return " << ops << " to the user";
return cntx->SendError(ops.status()); return builder->SendError(ops.status());
} }
ScanOpts scan_op = ops.value(); ScanOpts scan_op = ops.value();
@ -1033,8 +1033,8 @@ void HSetFamily::HScan(CmdArgList args, ConnectionContext* cntx) {
return OpScan(t->GetOpArgs(shard), key, &cursor, scan_op); return OpScan(t->GetOpArgs(shard), key, &cursor, scan_op);
}; };
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(builder);
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<StringVec> result = tx->ScheduleSingleHopT(std::move(cb));
if (result.status() != OpStatus::WRONG_TYPE) { if (result.status() != OpStatus::WRONG_TYPE) {
rb->StartArray(2); rb->StartArray(2);
rb->SendBulkString(absl::StrCat(cursor)); rb->SendBulkString(absl::StrCat(cursor));
@ -1043,17 +1043,18 @@ void HSetFamily::HScan(CmdArgList args, ConnectionContext* cntx) {
rb->SendBulkString(k); rb->SendBulkString(k);
} }
} else { } else {
cntx->SendError(result.status()); builder->SendError(result.status());
} }
} }
void HSetFamily::HSet(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view cmd{cntx->cid->name()}; string_view cmd{cntx->cid->name()};
if (args.size() % 2 != 1) { if (args.size() % 2 != 1) {
return cntx->SendError(facade::WrongNumArgsError(cmd), kSyntaxErrType); return builder->SendError(facade::WrongNumArgsError(cmd), kSyntaxErrType);
} }
args.remove_prefix(1); args.remove_prefix(1);
@ -1061,16 +1062,16 @@ void HSetFamily::HSet(CmdArgList args, ConnectionContext* cntx) {
return OpSet(t->GetOpArgs(shard), key, args); return OpSet(t->GetOpArgs(shard), key, args);
}; };
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb));
if (result && cmd == "HSET") { if (result && cmd == "HSET") {
cntx->SendLong(*result); builder->SendLong(*result);
} else { } else {
cntx->SendError(result.status()); builder->SendError(result.status());
} }
} }
void HSetFamily::HSetNx(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HSetNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
args.remove_prefix(1); args.remove_prefix(1);
@ -1078,15 +1079,15 @@ void HSetFamily::HSetNx(CmdArgList args, ConnectionContext* cntx) {
return OpSet(t->GetOpArgs(shard), key, args, OpSetParams{.skip_if_exists = true}); return OpSet(t->GetOpArgs(shard), key, args, OpSetParams{.skip_if_exists = true});
}; };
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
cntx->SendLong(*result); builder->SendLong(*result);
} else { } else {
cntx->SendError(result.status()); builder->SendError(result.status());
} }
} }
void HSetFamily::HStrLen(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HStrLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
string_view field = ArgS(args, 1); string_view field = ArgS(args, 1);
@ -1094,11 +1095,11 @@ void HSetFamily::HStrLen(CmdArgList args, ConnectionContext* cntx) {
return OpStrLen(t->GetOpArgs(shard), key, field); return OpStrLen(t->GetOpArgs(shard), key, field);
}; };
OpResult<size_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<size_t> result = tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
cntx->SendLong(*result); builder->SendLong(*result);
} else { } else {
cntx->SendError(result.status()); builder->SendError(result.status());
} }
} }
@ -1110,10 +1111,10 @@ void StrVecEmplaceBack(StringVec& str_vec, const listpackEntry& lp) {
str_vec.emplace_back(absl::StrCat(lp.lval)); str_vec.emplace_back(absl::StrCat(lp.lval));
} }
void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) { void HSetFamily::HRandField(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
if (args.size() > 3) { if (args.size() > 3) {
DVLOG(1) << "Wrong number of command arguments: " << args.size(); DVLOG(1) << "Wrong number of command arguments: " << args.size();
return cntx->SendError(kSyntaxErr); return builder->SendError(kSyntaxErr);
} }
string_view key = ArgS(args, 0); string_view key = ArgS(args, 0);
@ -1121,19 +1122,19 @@ void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) {
bool with_values = false; bool with_values = false;
if ((args.size() > 1) && (!SimpleAtoi(ArgS(args, 1), &count))) { if ((args.size() > 1) && (!SimpleAtoi(ArgS(args, 1), &count))) {
return cntx->SendError("count value is not an integer", kSyntaxErrType); return builder->SendError("count value is not an integer", kSyntaxErrType);
} }
if (args.size() == 3) { if (args.size() == 3) {
string arg = absl::AsciiStrToUpper(ArgS(args, 2)); string arg = absl::AsciiStrToUpper(ArgS(args, 2));
if (arg != "WITHVALUES") if (arg != "WITHVALUES")
return cntx->SendError(kSyntaxErr); return builder->SendError(kSyntaxErr);
else else
with_values = true; with_values = true;
} }
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<StringVec> { auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<StringVec> {
auto& db_slice = cntx->ns->GetDbSlice(shard->shard_id()); auto& db_slice = t->GetDbSlice(shard->shard_id());
DbContext db_context = t->GetDbContext(); DbContext db_context = t->GetDbContext();
auto it_res = db_slice.FindReadOnly(db_context, key, OBJ_HASH); auto it_res = db_slice.FindReadOnly(db_context, key, OBJ_HASH);
@ -1213,8 +1214,8 @@ void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) {
return str_vec; return str_vec;
}; };
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(builder);
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<StringVec> result = tx->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
if ((result->size() == 1) && (args.size() == 1)) if ((result->size() == 1) && (args.size() == 1))
rb->SendBulkString(result->front()); rb->SendBulkString(result->front());
@ -1226,7 +1227,7 @@ void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) {
else else
rb->SendEmptyArray(); rb->SendEmptyArray();
} else { } else {
cntx->SendError(result.status()); builder->SendError(result.status());
} }
} }

View file

@ -12,9 +12,9 @@
namespace dfly { namespace dfly {
class ConnectionContext;
class CommandRegistry; class CommandRegistry;
class StringMap; class StringMap;
class Transaction;
using facade::OpResult; using facade::OpResult;
using facade::OpStatus; using facade::OpStatus;
@ -34,24 +34,25 @@ class HSetFamily {
PrimeValue* pv); PrimeValue* pv);
private: private:
// TODO: to move it to anonymous namespace in cc file. using SinkReplyBuilder = facade::SinkReplyBuilder;
static void HExpire(CmdArgList args, ConnectionContext* cntx); static void HExpire(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void HDel(CmdArgList args, ConnectionContext* cntx); static void HDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void HLen(CmdArgList args, ConnectionContext* cntx); static void HLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void HExists(CmdArgList args, ConnectionContext* cntx); static void HExists(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void HGet(CmdArgList args, ConnectionContext* cntx); static void HGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void HMGet(CmdArgList args, ConnectionContext* cntx); static void HMGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void HIncrBy(CmdArgList args, ConnectionContext* cntx); static void HIncrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void HKeys(CmdArgList args, ConnectionContext* cntx); static void HKeys(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void HVals(CmdArgList args, ConnectionContext* cntx); static void HVals(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void HGetAll(CmdArgList args, ConnectionContext* cntx); static void HGetAll(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void HIncrByFloat(CmdArgList args, ConnectionContext* cntx); static void HIncrByFloat(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void HScan(CmdArgList args, ConnectionContext* cntx); static void HScan(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void HSet(CmdArgList args, ConnectionContext* cntx); static void HSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
static void HSetNx(CmdArgList args, ConnectionContext* cntx); ConnectionContext* cntx);
static void HStrLen(CmdArgList args, ConnectionContext* cntx); static void HSetNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void HRandField(CmdArgList args, ConnectionContext* cntx); static void HStrLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
static void HRandField(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder);
}; };
} // namespace dfly } // namespace dfly