mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
chore: get back on the decision to put a hard limit on command interface (#4203)
* chore: get back on the decision to put a hard limit on command interface Limiting commands to only Transaction* and SinkReplyBuilder does not hold. We need sometimes to access context fields for multitude of reasons. But I do not want to pass the huge ConnectionContext object because, it's hard then to track unusual access patterns. The compromise: to introduce CommandContext that currently has tx, rb and extended fields. It will be relatively easy to identify irregular access patterns by tracking the extended field. This commit is the first one in series of probably 10-15 commits. No functional changes here. --------- Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
3327e1a908
commit
45f8e8446f
4 changed files with 132 additions and 127 deletions
|
@ -135,11 +135,19 @@ optional<facade::ErrorReply> CommandId::Validate(CmdArgList tail_args) const {
|
|||
|
||||
CommandId&& CommandId::SetHandler(Handler2 f) && {
|
||||
handler_ = [f = std::move(f)](CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder,
|
||||
facade::ConnectionContext*) { f(args, tx, builder); };
|
||||
ConnectionContext*) { f(args, tx, builder); };
|
||||
|
||||
return std::move(*this);
|
||||
}
|
||||
|
||||
CommandId&& CommandId::SetHandler(Handler3 f) && {
|
||||
handler_ = [f = std::move(f)](CmdArgList args, Transaction* tx, facade::SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
f(std::move(args), CommandContext{tx, builder, cntx});
|
||||
};
|
||||
return std::move(*this);
|
||||
};
|
||||
|
||||
CommandRegistry::CommandRegistry() {
|
||||
vector<string> rename_command = GetFlag(FLAGS_rename_command);
|
||||
|
||||
|
|
|
@ -73,6 +73,16 @@ static_assert(!IsEvalKind(""));
|
|||
// Per thread vector of command stats. Each entry is {cmd_calls, cmd_latency_agg in usec}.
|
||||
using CmdCallStats = std::pair<uint64_t, uint64_t>;
|
||||
|
||||
struct CommandContext {
|
||||
CommandContext(Transaction* _tx, facade::SinkReplyBuilder* _rb, ConnectionContext* cntx)
|
||||
: tx(_tx), rb(_rb), conn_cntx(cntx) {
|
||||
}
|
||||
|
||||
Transaction* tx;
|
||||
facade::SinkReplyBuilder* rb;
|
||||
ConnectionContext* conn_cntx;
|
||||
};
|
||||
|
||||
class CommandId : public facade::CommandId {
|
||||
public:
|
||||
// NOTICE: name must be a literal string, otherwise metrics break! (see cmd_stats_map in
|
||||
|
@ -95,6 +105,8 @@ class CommandId : public facade::CommandId {
|
|||
fu2::function_base<true, true, fu2::capacity_default, false, false,
|
||||
void(CmdArgList, Transaction*, facade::SinkReplyBuilder*) const>;
|
||||
|
||||
using Handler3 = fu2::function_base<true, true, fu2::capacity_default, false, false,
|
||||
void(CmdArgList, const CommandContext&) const>;
|
||||
using ArgValidator = fu2::function_base<true, true, fu2::capacity_default, false, false,
|
||||
std::optional<facade::ErrorReply>(CmdArgList) const>;
|
||||
|
||||
|
@ -130,6 +142,8 @@ class CommandId : public facade::CommandId {
|
|||
|
||||
CommandId&& SetHandler(Handler2 f) &&;
|
||||
|
||||
CommandId&& SetHandler(Handler3 f) &&;
|
||||
|
||||
CommandId&& SetValidator(ArgValidator f) && {
|
||||
validator_ = std::move(f);
|
||||
return std::move(*this);
|
||||
|
|
|
@ -974,14 +974,14 @@ OpStatus SetCmd::CachePrevIfNeeded(const SetCmd::SetParams& params, DbSlice::Ite
|
|||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
void StringFamily::Set(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
void StringFamily::Set(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
facade::CmdArgParser parser{args};
|
||||
|
||||
auto [key, value] = parser.Next<string_view, string_view>();
|
||||
|
||||
SetCmd::SetParams sparams;
|
||||
sparams.memcache_flags = cntx->conn_state.memcache_flag;
|
||||
sparams.memcache_flags = cmnd_cntx.conn_cntx->conn_state.memcache_flag;
|
||||
auto* builder = cmnd_cntx.rb;
|
||||
|
||||
while (parser.HasNext()) {
|
||||
if (auto exp_type = parser.TryMapNext("EX", ExpT::EX, "PX", ExpT::PX, "EXAT", ExpT::EXAT,
|
||||
|
@ -1017,7 +1017,7 @@ void StringFamily::Set(CmdArgList args, Transaction* tx, SinkReplyBuilder* build
|
|||
|
||||
// Remove existed key if the key is expired already
|
||||
if (rel_ms < 0) {
|
||||
tx->ScheduleSingleHop([](const Transaction* tx, EngineShard* es) {
|
||||
cmnd_cntx.tx->ScheduleSingleHop([](const Transaction* tx, EngineShard* es) {
|
||||
ShardArgs args = tx->GetShardArgs(es->shard_id());
|
||||
GenericFamily::OpDel(tx->GetOpArgs(es), args);
|
||||
return OpStatus::OK;
|
||||
|
@ -1050,15 +1050,15 @@ void StringFamily::Set(CmdArgList args, Transaction* tx, SinkReplyBuilder* build
|
|||
StringValue prev;
|
||||
if (sparams.flags & SetCmd::SET_GET)
|
||||
sparams.prev_val = &prev;
|
||||
bool manual_journal = cntx->cid->opt_mask() & CO::NO_AUTOJOURNAL;
|
||||
OpStatus result = SetGeneric(sparams, key, value, manual_journal, tx);
|
||||
bool manual_journal = cmnd_cntx.conn_cntx->cid->opt_mask() & CO::NO_AUTOJOURNAL;
|
||||
OpStatus result = SetGeneric(sparams, key, value, manual_journal, cmnd_cntx.tx);
|
||||
|
||||
if (result == OpStatus::WRONG_TYPE) {
|
||||
return builder->SendError(kWrongTypeErr);
|
||||
}
|
||||
|
||||
if (sparams.flags & SetCmd::SET_GET) {
|
||||
return GetReplies{builder}.Send(std::move(prev));
|
||||
return GetReplies{cmnd_cntx.rb}.Send(std::move(prev));
|
||||
}
|
||||
|
||||
if (result == OpStatus::OK) {
|
||||
|
@ -1074,18 +1074,15 @@ void StringFamily::Set(CmdArgList args, Transaction* tx, SinkReplyBuilder* build
|
|||
builder->SendSetSkipped();
|
||||
}
|
||||
|
||||
void StringFamily::SetEx(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb,
|
||||
ConnectionContext* cntx) {
|
||||
SetExGeneric(true, std::move(args), cntx->cid, tx, rb);
|
||||
void StringFamily::SetEx(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
SetExGeneric(true, std::move(args), cmnd_cntx.conn_cntx->cid, cmnd_cntx.tx, cmnd_cntx.rb);
|
||||
}
|
||||
|
||||
void StringFamily::PSetEx(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb,
|
||||
ConnectionContext* cntx) {
|
||||
SetExGeneric(false, std::move(args), cntx->cid, tx, rb);
|
||||
void StringFamily::PSetEx(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
SetExGeneric(false, std::move(args), cmnd_cntx.conn_cntx->cid, cmnd_cntx.tx, cmnd_cntx.rb);
|
||||
}
|
||||
|
||||
void StringFamily::SetNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
void StringFamily::SetNx(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
// This is the same as calling the "Set" function, only in this case we are
|
||||
// change the value only if the key does not exist. Otherwise the function
|
||||
// will not modify it. in which case it would return 0
|
||||
|
@ -1096,10 +1093,10 @@ void StringFamily::SetNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
|
|||
|
||||
SetCmd::SetParams sparams;
|
||||
sparams.flags |= SetCmd::SET_IF_NOTEXIST;
|
||||
sparams.memcache_flags = cntx->conn_state.memcache_flag;
|
||||
bool manual_journal = cntx->cid->opt_mask() & CO::NO_AUTOJOURNAL;
|
||||
const auto results{SetGeneric(sparams, key, value, manual_journal, tx)};
|
||||
|
||||
sparams.memcache_flags = cmnd_cntx.conn_cntx->conn_state.memcache_flag;
|
||||
bool manual_journal = cmnd_cntx.conn_cntx->cid->opt_mask() & CO::NO_AUTOJOURNAL;
|
||||
const auto results{SetGeneric(sparams, key, value, manual_journal, cmnd_cntx.tx)};
|
||||
auto* builder = cmnd_cntx.rb;
|
||||
if (results == OpStatus::OK) {
|
||||
return builder->SendLong(1); // this means that we successfully set the value
|
||||
}
|
||||
|
@ -1111,7 +1108,7 @@ void StringFamily::SetNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
|
|||
return builder->SendLong(0); // value do exists, we need to report that we didn't change it
|
||||
}
|
||||
|
||||
void StringFamily::Get(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
void StringFamily::Get(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
auto cb = [key = ArgS(args, 0)](Transaction* tx, EngineShard* es) -> OpResult<StringValue> {
|
||||
auto it_res = tx->GetDbSlice(es->shard_id()).FindReadOnly(tx->GetDbContext(), key, OBJ_STRING);
|
||||
if (!it_res.ok())
|
||||
|
@ -1120,11 +1117,10 @@ void StringFamily::Get(CmdArgList args, Transaction* tx, SinkReplyBuilder* build
|
|||
return StringValue::Read(tx->GetDbIndex(), key, (*it_res)->second, es);
|
||||
};
|
||||
|
||||
GetReplies{builder}.Send(tx->ScheduleSingleHopT(cb));
|
||||
GetReplies{cmnd_cntx.rb}.Send(cmnd_cntx.tx->ScheduleSingleHopT(cb));
|
||||
}
|
||||
|
||||
void StringFamily::GetDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
void StringFamily::GetDel(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
auto cb = [key = ArgS(args, 0)](Transaction* tx, EngineShard* es) -> OpResult<StringValue> {
|
||||
auto& db_slice = tx->GetDbSlice(es->shard_id());
|
||||
auto it_res = db_slice.FindMutable(tx->GetDbContext(), key, OBJ_STRING);
|
||||
|
@ -1137,41 +1133,41 @@ void StringFamily::GetDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
|
|||
return value;
|
||||
};
|
||||
|
||||
GetReplies{builder}.Send(tx->ScheduleSingleHopT(cb));
|
||||
GetReplies{cmnd_cntx.rb}.Send(cmnd_cntx.tx->ScheduleSingleHopT(cb));
|
||||
}
|
||||
|
||||
void StringFamily::GetSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
void StringFamily::GetSet(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
string_view key = ArgS(args, 0);
|
||||
string_view value = ArgS(args, 1);
|
||||
|
||||
StringValue prev;
|
||||
SetCmd::SetParams sparams;
|
||||
sparams.prev_val = &prev;
|
||||
bool manual_journal = cntx->cid->opt_mask() & CO::NO_AUTOJOURNAL;
|
||||
if (OpStatus status = SetGeneric(sparams, key, value, manual_journal, cntx->transaction);
|
||||
bool manual_journal = cmnd_cntx.conn_cntx->cid->opt_mask() & CO::NO_AUTOJOURNAL;
|
||||
if (OpStatus status = SetGeneric(sparams, key, value, manual_journal, cmnd_cntx.tx);
|
||||
status != OpStatus::OK) {
|
||||
auto* builder = cmnd_cntx.rb;
|
||||
return builder->SendError(status);
|
||||
}
|
||||
|
||||
GetReplies{builder}.Send(std::move(prev));
|
||||
GetReplies{cmnd_cntx.rb}.Send(std::move(prev));
|
||||
}
|
||||
|
||||
void StringFamily::Append(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb) {
|
||||
ExtendGeneric(args, false, tx, rb);
|
||||
void StringFamily::Append(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
ExtendGeneric(args, false, cmnd_cntx.tx, cmnd_cntx.rb);
|
||||
}
|
||||
|
||||
void StringFamily::Prepend(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb) {
|
||||
ExtendGeneric(args, true, tx, rb);
|
||||
void StringFamily::Prepend(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
ExtendGeneric(args, true, cmnd_cntx.tx, cmnd_cntx.rb);
|
||||
}
|
||||
|
||||
void StringFamily::GetEx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
void StringFamily::GetEx(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
CmdArgParser parser{args};
|
||||
string_view key = parser.Next();
|
||||
|
||||
DbSlice::ExpireParams exp_params;
|
||||
bool defined = false;
|
||||
auto* builder = cmnd_cntx.rb;
|
||||
while (parser.HasNext()) {
|
||||
if (auto exp_type = parser.TryMapNext("EX", ExpT::EX, "PX", ExpT::PX, "EXAT", ExpT::EXAT,
|
||||
"PXAT", ExpT::PXAT);
|
||||
|
@ -1230,77 +1226,77 @@ void StringFamily::GetEx(CmdArgList args, Transaction* tx, SinkReplyBuilder* bui
|
|||
return value;
|
||||
};
|
||||
|
||||
GetReplies{builder}.Send(tx->ScheduleSingleHopT(cb));
|
||||
GetReplies{cmnd_cntx.rb}.Send(cmnd_cntx.tx->ScheduleSingleHopT(cb));
|
||||
}
|
||||
|
||||
void StringFamily::Incr(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
void StringFamily::Incr(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
string_view key = ArgS(args, 0);
|
||||
return IncrByGeneric(key, 1, tx, builder);
|
||||
return IncrByGeneric(key, 1, cmnd_cntx.tx, cmnd_cntx.rb);
|
||||
}
|
||||
|
||||
void StringFamily::IncrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
void StringFamily::IncrBy(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
string_view key = ArgS(args, 0);
|
||||
string_view sval = ArgS(args, 1);
|
||||
int64_t val;
|
||||
|
||||
if (!absl::SimpleAtoi(sval, &val)) {
|
||||
return builder->SendError(kInvalidIntErr);
|
||||
return cmnd_cntx.rb->SendError(kInvalidIntErr);
|
||||
}
|
||||
return IncrByGeneric(key, val, tx, builder);
|
||||
return IncrByGeneric(key, val, cmnd_cntx.tx, cmnd_cntx.rb);
|
||||
}
|
||||
|
||||
void StringFamily::IncrByFloat(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
void StringFamily::IncrByFloat(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
string_view key = ArgS(args, 0);
|
||||
string_view sval = ArgS(args, 1);
|
||||
double val;
|
||||
|
||||
if (!absl::SimpleAtod(sval, &val)) {
|
||||
return builder->SendError(kInvalidFloatErr);
|
||||
return cmnd_cntx.rb->SendError(kInvalidFloatErr);
|
||||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpIncrFloat(t->GetOpArgs(shard), key, val);
|
||||
};
|
||||
|
||||
OpResult<double> result = tx->ScheduleSingleHopT(std::move(cb));
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(builder);
|
||||
OpResult<double> result = cmnd_cntx.tx->ScheduleSingleHopT(std::move(cb));
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(cmnd_cntx.rb);
|
||||
|
||||
DVLOG(2) << "IncrByGeneric " << key << "/" << result.value();
|
||||
if (!result) {
|
||||
return builder->SendError(result.status());
|
||||
return rb->SendError(result.status());
|
||||
}
|
||||
|
||||
rb->SendDouble(result.value());
|
||||
}
|
||||
|
||||
void StringFamily::Decr(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
void StringFamily::Decr(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
string_view key = ArgS(args, 0);
|
||||
return IncrByGeneric(key, -1, tx, builder);
|
||||
return IncrByGeneric(key, -1, cmnd_cntx.tx, cmnd_cntx.rb);
|
||||
}
|
||||
|
||||
void StringFamily::DecrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
void StringFamily::DecrBy(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
string_view key = ArgS(args, 0);
|
||||
string_view sval = ArgS(args, 1);
|
||||
int64_t val;
|
||||
|
||||
if (!absl::SimpleAtoi(sval, &val)) {
|
||||
return builder->SendError(kInvalidIntErr);
|
||||
return cmnd_cntx.rb->SendError(kInvalidIntErr);
|
||||
}
|
||||
if (val == INT64_MIN) {
|
||||
return builder->SendError(kIncrOverflow);
|
||||
return cmnd_cntx.rb->SendError(kIncrOverflow);
|
||||
}
|
||||
|
||||
return IncrByGeneric(key, -val, tx, builder);
|
||||
return IncrByGeneric(key, -val, cmnd_cntx.tx, cmnd_cntx.rb);
|
||||
}
|
||||
|
||||
void StringFamily::MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
void StringFamily::MGet(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
DCHECK_GE(args.size(), 1U);
|
||||
|
||||
uint8_t fetch_mask = 0;
|
||||
auto* builder = cmnd_cntx.rb;
|
||||
if (builder->GetProtocol() == Protocol::MEMCACHE) {
|
||||
fetch_mask |= FETCH_MCFLAG;
|
||||
if (cntx->conn_state.memcache_flag & ConnectionState::FETCH_CAS_VER)
|
||||
if (cmnd_cntx.conn_cntx->conn_state.memcache_flag & ConnectionState::FETCH_CAS_VER)
|
||||
fetch_mask |= FETCH_MCVER;
|
||||
}
|
||||
|
||||
|
@ -1312,7 +1308,7 @@ void StringFamily::MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
|
|||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
OpStatus result = tx->ScheduleSingleHop(std::move(cb));
|
||||
OpStatus result = cmnd_cntx.tx->ScheduleSingleHop(std::move(cb));
|
||||
CHECK_EQ(OpStatus::OK, result);
|
||||
|
||||
// wait for all tiered reads to finish
|
||||
|
@ -1322,11 +1318,11 @@ void StringFamily::MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
|
|||
absl::FixedArray<optional<GetResp>, 8> res(args.size());
|
||||
|
||||
for (ShardId sid = 0; sid < mget_resp.size(); ++sid) {
|
||||
if (!tx->IsActive(sid))
|
||||
if (!cmnd_cntx.tx->IsActive(sid))
|
||||
continue;
|
||||
|
||||
auto& src = mget_resp[sid];
|
||||
ShardArgs shard_args = tx->GetShardArgs(sid);
|
||||
ShardArgs shard_args = cmnd_cntx.tx->GetShardArgs(sid);
|
||||
unsigned src_indx = 0;
|
||||
for (auto it = shard_args.begin(); it != shard_args.end(); ++it, ++src_indx) {
|
||||
if (!src.resp_arr[src_indx])
|
||||
|
@ -1381,14 +1377,13 @@ void StringFamily::MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
|
|||
}
|
||||
}
|
||||
|
||||
void StringFamily::MSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
void StringFamily::MSet(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
if (VLOG_IS_ON(2)) {
|
||||
string str;
|
||||
for (size_t i = 1; i < args.size(); ++i) {
|
||||
absl::StrAppend(&str, " ", ArgS(args, i));
|
||||
}
|
||||
LOG(INFO) << "MSET/" << tx->GetUniqueShardCnt() << str;
|
||||
LOG(INFO) << "MSET/" << cmnd_cntx.tx->GetUniqueShardCnt() << str;
|
||||
}
|
||||
|
||||
AggregateStatus result;
|
||||
|
@ -1399,18 +1394,17 @@ void StringFamily::MSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* buil
|
|||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
if (auto status = tx->ScheduleSingleHop(std::move(cb)); status != OpStatus::OK)
|
||||
if (auto status = cmnd_cntx.tx->ScheduleSingleHop(std::move(cb)); status != OpStatus::OK)
|
||||
result = status;
|
||||
|
||||
if (*result == OpStatus::OK) {
|
||||
builder->SendOk();
|
||||
cmnd_cntx.rb->SendOk();
|
||||
} else {
|
||||
builder->SendError(*result);
|
||||
cmnd_cntx.rb->SendError(*result);
|
||||
}
|
||||
}
|
||||
|
||||
void StringFamily::MSetNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
void StringFamily::MSetNx(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
atomic_bool exists{false};
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* es) {
|
||||
|
@ -1429,7 +1423,7 @@ void StringFamily::MSetNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
|
|||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
tx->Execute(std::move(cb), false);
|
||||
cmnd_cntx.tx->Execute(std::move(cb), false);
|
||||
const bool to_skip = exists.load(memory_order_relaxed);
|
||||
|
||||
AggregateStatus result;
|
||||
|
@ -1442,36 +1436,37 @@ void StringFamily::MSetNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* bu
|
|||
result = status;
|
||||
return OpStatus::OK;
|
||||
};
|
||||
tx->Execute(std::move(epilog_cb), true);
|
||||
cmnd_cntx.tx->Execute(std::move(epilog_cb), true);
|
||||
|
||||
builder->SendLong(to_skip || (*result != OpStatus::OK) ? 0 : 1);
|
||||
cmnd_cntx.rb->SendLong(to_skip || (*result != OpStatus::OK) ? 0 : 1);
|
||||
}
|
||||
|
||||
void StringFamily::StrLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
void StringFamily::StrLen(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
auto cb = [key = ArgS(args, 0)](Transaction* t, EngineShard* shard) {
|
||||
return OpStrLen(t->GetOpArgs(shard), key);
|
||||
};
|
||||
GetReplies{builder}.Send(tx->ScheduleSingleHopT(cb));
|
||||
GetReplies{cmnd_cntx.rb}.Send(cmnd_cntx.tx->ScheduleSingleHopT(cb));
|
||||
}
|
||||
|
||||
void StringFamily::GetRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
void StringFamily::GetRange(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
CmdArgParser parser(args);
|
||||
auto [key, start, end] = parser.Next<string_view, int32_t, int32_t>();
|
||||
|
||||
if (auto err = parser.Error(); err) {
|
||||
return builder->SendError(err->MakeReply());
|
||||
return cmnd_cntx.rb->SendError(err->MakeReply());
|
||||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpGetRange(t->GetOpArgs(shard), key, start, end);
|
||||
};
|
||||
|
||||
GetReplies{builder}.Send(tx->ScheduleSingleHopT(cb));
|
||||
GetReplies{cmnd_cntx.rb}.Send(cmnd_cntx.tx->ScheduleSingleHopT(cb));
|
||||
}
|
||||
|
||||
void StringFamily::SetRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
void StringFamily::SetRange(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
CmdArgParser parser(args);
|
||||
auto [key, start, value] = parser.Next<string_view, int32_t, string_view>();
|
||||
auto* builder = cmnd_cntx.rb;
|
||||
|
||||
if (auto err = parser.Error(); err) {
|
||||
return builder->SendError(err->MakeReply());
|
||||
|
@ -1488,7 +1483,7 @@ void StringFamily::SetRange(CmdArgList args, Transaction* tx, SinkReplyBuilder*
|
|||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpSetRange(t->GetOpArgs(shard), key, start, value);
|
||||
};
|
||||
auto res = tx->ScheduleSingleHopT(cb);
|
||||
auto res = cmnd_cntx.tx->ScheduleSingleHopT(cb);
|
||||
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(builder);
|
||||
if (res.ok())
|
||||
|
@ -1510,28 +1505,28 @@ void StringFamily::SetRange(CmdArgList args, Transaction* tx, SinkReplyBuilder*
|
|||
* 5. The number of seconds until the limit will reset to its maximum capacity.
|
||||
* Equivalent to X-RateLimit-Reset.
|
||||
*/
|
||||
void StringFamily::ClThrottle(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb) {
|
||||
void StringFamily::ClThrottle(CmdArgList args, const CommandContext& cmnd_cntx) {
|
||||
const string_view key = ArgS(args, 0);
|
||||
|
||||
// Allow max burst in number of tokens
|
||||
uint64_t max_burst;
|
||||
const string_view max_burst_str = ArgS(args, 1);
|
||||
if (!absl::SimpleAtoi(max_burst_str, &max_burst)) {
|
||||
return rb->SendError(kInvalidIntErr);
|
||||
return cmnd_cntx.rb->SendError(kInvalidIntErr);
|
||||
}
|
||||
|
||||
// Emit count of tokens per period
|
||||
uint64_t count;
|
||||
const string_view count_str = ArgS(args, 2);
|
||||
if (!absl::SimpleAtoi(count_str, &count)) {
|
||||
return rb->SendError(kInvalidIntErr);
|
||||
return cmnd_cntx.rb->SendError(kInvalidIntErr);
|
||||
}
|
||||
|
||||
// Period of emitting count of tokens
|
||||
uint64_t period;
|
||||
const string_view period_str = ArgS(args, 3);
|
||||
if (!absl::SimpleAtoi(period_str, &period)) {
|
||||
return rb->SendError(kInvalidIntErr);
|
||||
return cmnd_cntx.rb->SendError(kInvalidIntErr);
|
||||
}
|
||||
|
||||
// Apply quantity of tokens now
|
||||
|
@ -1540,33 +1535,33 @@ void StringFamily::ClThrottle(CmdArgList args, Transaction* tx, SinkReplyBuilder
|
|||
const string_view quantity_str = ArgS(args, 4);
|
||||
|
||||
if (!absl::SimpleAtoi(quantity_str, &quantity)) {
|
||||
return rb->SendError(kInvalidIntErr);
|
||||
return cmnd_cntx.rb->SendError(kInvalidIntErr);
|
||||
}
|
||||
}
|
||||
|
||||
if (max_burst > INT64_MAX - 1) {
|
||||
return rb->SendError(kInvalidIntErr);
|
||||
return cmnd_cntx.rb->SendError(kInvalidIntErr);
|
||||
}
|
||||
const int64_t limit = max_burst + 1;
|
||||
|
||||
if (period > UINT64_MAX / 1000 || count == 0 || period * 1000 / count > INT64_MAX) {
|
||||
return rb->SendError(kInvalidIntErr);
|
||||
return cmnd_cntx.rb->SendError(kInvalidIntErr);
|
||||
}
|
||||
const int64_t emission_interval_ms = period * 1000 / count;
|
||||
|
||||
if (emission_interval_ms == 0) {
|
||||
return rb->SendError("zero rates are not supported");
|
||||
return cmnd_cntx.rb->SendError("zero rates are not supported");
|
||||
}
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<array<int64_t, 5>> {
|
||||
return OpThrottle(t->GetOpArgs(shard), key, limit, emission_interval_ms, quantity);
|
||||
};
|
||||
|
||||
Transaction* trans = tx;
|
||||
Transaction* trans = cmnd_cntx.tx;
|
||||
OpResult<array<int64_t, 5>> result = trans->ScheduleSingleHopT(std::move(cb));
|
||||
|
||||
if (result) {
|
||||
RedisReplyBuilder* redis_builder = static_cast<RedisReplyBuilder*>(rb);
|
||||
RedisReplyBuilder* redis_builder = static_cast<RedisReplyBuilder*>(cmnd_cntx.rb);
|
||||
redis_builder->StartArray(result->size());
|
||||
auto& array = result.value();
|
||||
|
||||
|
@ -1588,17 +1583,17 @@ void StringFamily::ClThrottle(CmdArgList args, Transaction* tx, SinkReplyBuilder
|
|||
} else {
|
||||
switch (result.status()) {
|
||||
case OpStatus::WRONG_TYPE:
|
||||
rb->SendError(kWrongTypeErr);
|
||||
cmnd_cntx.rb->SendError(kWrongTypeErr);
|
||||
break;
|
||||
case OpStatus::INVALID_INT:
|
||||
case OpStatus::INVALID_VALUE:
|
||||
rb->SendError(kInvalidIntErr);
|
||||
cmnd_cntx.rb->SendError(kInvalidIntErr);
|
||||
break;
|
||||
case OpStatus::OUT_OF_MEMORY:
|
||||
rb->SendError(kOutOfMemory);
|
||||
cmnd_cntx.rb->SendError(kOutOfMemory);
|
||||
break;
|
||||
default:
|
||||
rb->SendError(result.status());
|
||||
cmnd_cntx.rb->SendError(result.status());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
|
@ -4,7 +4,7 @@
|
|||
|
||||
#pragma once
|
||||
|
||||
#include "facade/facade_types.h"
|
||||
#include "server/command_registry.h"
|
||||
|
||||
namespace facade {
|
||||
class SinkReplyBuilder;
|
||||
|
@ -12,50 +12,38 @@ class SinkReplyBuilder;
|
|||
|
||||
namespace dfly {
|
||||
|
||||
class ConnectionContext;
|
||||
class CommandRegistry;
|
||||
class Transaction;
|
||||
|
||||
using facade::CmdArgList;
|
||||
|
||||
class StringFamily {
|
||||
public:
|
||||
static void Register(CommandRegistry* registry);
|
||||
|
||||
private:
|
||||
using SinkReplyBuilder = facade::SinkReplyBuilder;
|
||||
using CmdArgList = facade::CmdArgList;
|
||||
|
||||
static void Append(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb);
|
||||
static void Decr(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb);
|
||||
static void DecrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb);
|
||||
static void Get(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb);
|
||||
static void GetDel(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb,
|
||||
ConnectionContext* cntx);
|
||||
static void GetRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb);
|
||||
static void GetSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb,
|
||||
ConnectionContext* cntx);
|
||||
static void GetEx(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb,
|
||||
ConnectionContext* cntx);
|
||||
static void Incr(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb);
|
||||
static void IncrBy(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb);
|
||||
static void IncrByFloat(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb);
|
||||
static void MGet(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb, ConnectionContext* cntx);
|
||||
static void MSet(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb, ConnectionContext* cntx);
|
||||
static void MSetNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb,
|
||||
ConnectionContext* cntx);
|
||||
static void Append(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void Decr(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void DecrBy(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void Get(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void GetDel(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void GetRange(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void GetSet(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void GetEx(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void Incr(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void IncrBy(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void IncrByFloat(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void MGet(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void MSet(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void MSetNx(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
|
||||
static void Set(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb, ConnectionContext* cntx);
|
||||
static void SetEx(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb,
|
||||
ConnectionContext* cntx);
|
||||
static void SetNx(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb,
|
||||
ConnectionContext* cntx);
|
||||
static void SetRange(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb);
|
||||
static void StrLen(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb);
|
||||
static void Prepend(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb);
|
||||
static void PSetEx(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb,
|
||||
ConnectionContext* cntx);
|
||||
static void Set(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void SetEx(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void SetNx(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void SetRange(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void StrLen(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void Prepend(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
static void PSetEx(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
|
||||
static void ClThrottle(CmdArgList args, Transaction* tx, SinkReplyBuilder* rb);
|
||||
static void ClThrottle(CmdArgList args, const CommandContext& cmnd_cntx);
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue