refactor: conn_context and reply_builder refactoring (#2251)

* refactor: conn_context and reply_builder refactoring
This commit is contained in:
Borys 2023-12-06 08:23:32 +02:00 committed by GitHub
parent 1900e499ba
commit 33d0879416
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
26 changed files with 1424 additions and 1314 deletions

View file

@ -1115,7 +1115,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
dfly_cntx->transaction->InitByArgs(dfly_cntx->conn_state.db_index, args_no_cmd);
if (status != OpStatus::OK)
return (*cntx)->SendError(status);
return cntx->SendError(status);
}
} else {
DCHECK(dfly_cntx->transaction == nullptr);
@ -1126,7 +1126,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
if (!dist_trans->IsMulti()) { // Multi command initialize themself based on their mode.
if (auto st = dist_trans->InitByArgs(dfly_cntx->conn_state.db_index, args_no_cmd);
st != OpStatus::OK)
return (*cntx)->SendError(st);
return cntx->SendError(st);
}
dfly_cntx->transaction = dist_trans.get();
@ -1184,7 +1184,7 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
DCHECK(!cid->Validate(tail_args));
if (auto err = VerifyCommandExecution(cid, cntx); err) {
(*cntx)->SendError(std::move(*err));
cntx->SendError(std::move(*err));
return true; // return false only for internal error aborts
}
@ -1462,7 +1462,7 @@ absl::flat_hash_map<std::string, unsigned> Service::UknownCmdMap() const {
void Service::Quit(CmdArgList args, ConnectionContext* cntx) {
if (cntx->protocol() == facade::Protocol::REDIS)
(*cntx)->SendOk();
cntx->SendOk();
using facade::SinkReplyBuilder;
SinkReplyBuilder* builder = cntx->reply_builder();
@ -1474,11 +1474,11 @@ void Service::Quit(CmdArgList args, ConnectionContext* cntx) {
void Service::Multi(CmdArgList args, ConnectionContext* cntx) {
if (cntx->conn_state.exec_info.IsCollecting()) {
return (*cntx)->SendError("MULTI calls can not be nested");
return cntx->SendError("MULTI calls can not be nested");
}
cntx->conn_state.exec_info.state = ConnectionState::ExecInfo::EXEC_COLLECT;
// TODO: to protect against huge exec transactions.
return (*cntx)->SendOk();
return cntx->SendOk();
}
void Service::Watch(CmdArgList args, ConnectionContext* cntx) {
@ -1486,7 +1486,7 @@ void Service::Watch(CmdArgList args, ConnectionContext* cntx) {
// Skip if EXEC will already fail due previous WATCH.
if (exec_info.watched_dirty.load(memory_order_relaxed)) {
return (*cntx)->SendOk();
return cntx->SendOk();
}
atomic_uint32_t keys_existed = 0;
@ -1508,12 +1508,12 @@ void Service::Watch(CmdArgList args, ConnectionContext* cntx) {
exec_info.watched_keys.emplace_back(cntx->db_index(), ArgS(args, i));
}
return (*cntx)->SendOk();
return cntx->SendOk();
}
void Service::Unwatch(CmdArgList args, ConnectionContext* cntx) {
UnwatchAllKeys(cntx);
return (*cntx)->SendOk();
return cntx->SendOk();
}
template <typename F> void WithReplies(CapturingReplyBuilder* crb, ConnectionContext* cntx, F&& f) {
@ -1593,14 +1593,15 @@ void Service::CallFromScript(ConnectionContext* cntx, Interpreter::CallArgs& ca)
void Service::Eval(CmdArgList args, ConnectionContext* cntx) {
string_view body = ArgS(args, 0);
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
if (body.empty()) {
return (*cntx)->SendNull();
return rb->SendNull();
}
BorrowedInterpreter interpreter{cntx};
auto res = server_family_.script_mgr()->Insert(body, interpreter);
if (!res)
return (*cntx)->SendError(res.error().Format(), facade::kScriptErrType);
return rb->SendError(res.error().Format(), facade::kScriptErrType);
string sha{std::move(res.value())};
@ -1677,7 +1678,7 @@ optional<bool> StartMultiEval(DbIndex dbid, CmdArgList keys, ScriptMgr::ScriptPa
string err = StrCat(
"Multi mode conflict when running eval in multi transaction. Multi mode is: ", multi_mode,
" eval mode is: ", script_mode);
(*cntx)->SendError(err);
cntx->SendError(err);
return nullopt;
}
return false;
@ -1758,12 +1759,12 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
// Sanitizing the input to avoid code injection.
if (eval_args.sha.size() != 40 || !IsSHA(eval_args.sha)) {
return (*cntx)->SendError(facade::kScriptNotFound);
return cntx->SendError(facade::kScriptNotFound);
}
auto params = LoadScipt(eval_args.sha, server_family_.script_mgr(), interpreter);
if (!params)
return (*cntx)->SendError(facade::kScriptNotFound);
return cntx->SendError(facade::kScriptNotFound);
string error;
@ -1854,7 +1855,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
if (result == Interpreter::RUN_ERR) {
string resp = StrCat("Error running script (call to ", eval_args.sha, "): ", error);
return (*cntx)->SendError(resp, facade::kScriptErrType);
return cntx->SendError(resp, facade::kScriptErrType);
}
CHECK(result == Interpreter::RUN_OK);
@ -1862,14 +1863,14 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
SinkReplyBuilder::ReplyAggregator agg(cntx->reply_builder());
EvalSerializer ser{static_cast<RedisReplyBuilder*>(cntx->reply_builder())};
if (!interpreter->IsResultSafe()) {
(*cntx)->SendError("reached lua stack limit");
cntx->SendError("reached lua stack limit");
} else {
interpreter->SerializeResult(&ser);
}
}
void Service::Discard(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = (*cntx).operator->();
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
if (!cntx->conn_state.exec_info.IsCollecting()) {
return rb->SendError("DISCARD without MULTI");
@ -1976,7 +1977,7 @@ void StartMultiExec(DbIndex dbid, Transaction* trans, ConnectionState::ExecInfo*
}
void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = (*cntx).operator->();
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
absl::Cleanup exec_clear = [&cntx] { MultiCleanup(cntx); };
@ -2047,7 +2048,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
if (scmd.Cid()->IsTransactional()) {
OpStatus st = cntx->transaction->InitByArgs(cntx->conn_state.db_index, args);
if (st != OpStatus::OK) {
(*cntx)->SendError(st);
cntx->SendError(st);
break;
}
}
@ -2120,7 +2121,7 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
shard_set->pool()->DispatchBrief(std::move(cb));
}
(*cntx)->SendLong(num_published);
cntx->SendLong(num_published);
}
void Service::Subscribe(CmdArgList args, ConnectionContext* cntx) {
@ -2154,34 +2155,35 @@ void Service::Function(CmdArgList args, ConnectionContext* cntx) {
string_view sub_cmd = ArgS(args, 0);
if (sub_cmd == "FLUSH") {
return (*cntx)->SendOk();
return cntx->SendOk();
}
string err = UnknownSubCmd(sub_cmd, "FUNCTION");
return (*cntx)->SendError(err, kSyntaxErrType);
return cntx->SendError(err, kSyntaxErrType);
}
void Service::PubsubChannels(string_view pattern, ConnectionContext* cntx) {
(*cntx)->SendStringArr(ServerState::tlocal()->channel_store()->ListChannels(pattern));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
rb->SendStringArr(ServerState::tlocal()->channel_store()->ListChannels(pattern));
}
void Service::PubsubPatterns(ConnectionContext* cntx) {
size_t pattern_count = ServerState::tlocal()->channel_store()->PatternCount();
(*cntx)->SendLong(pattern_count);
cntx->SendLong(pattern_count);
}
void Service::Monitor(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "starting monitor on this connection: " << cntx->conn()->GetClientId();
// we are registering the current connection for all threads so they will be aware of
// this connection, to send to it any command
(*cntx)->SendOk();
cntx->SendOk();
cntx->ChangeMonitor(true /* start */);
}
void Service::Pubsub(CmdArgList args, ConnectionContext* cntx) {
if (args.size() < 1) {
(*cntx)->SendError(WrongNumArgsError(cntx->cid->name()));
cntx->SendError(WrongNumArgsError(cntx->cid->name()));
return;
}
@ -2198,7 +2200,8 @@ void Service::Pubsub(CmdArgList args, ConnectionContext* cntx) {
"HELP",
"\tPrints this help."};
(*cntx)->SendSimpleStrArr(help_arr);
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
rb->SendSimpleStrArr(help_arr);
return;
}
@ -2212,7 +2215,7 @@ void Service::Pubsub(CmdArgList args, ConnectionContext* cntx) {
} else if (subcmd == "NUMPAT") {
PubsubPatterns(cntx);
} else {
(*cntx)->SendError(UnknownSubCmd(subcmd, "PUBSUB"));
cntx->SendError(UnknownSubCmd(subcmd, "PUBSUB"));
}
}
@ -2224,28 +2227,29 @@ void Service::Command(CmdArgList args, ConnectionContext* cntx) {
}
});
auto serialize_command = [&cntx](string_view name, const CommandId& cid) {
(*cntx)->StartArray(6);
(*cntx)->SendSimpleString(cid.name());
(*cntx)->SendLong(cid.arity());
(*cntx)->StartArray(CommandId::OptCount(cid.opt_mask()));
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto serialize_command = [&rb](string_view name, const CommandId& cid) {
rb->StartArray(6);
rb->SendSimpleString(cid.name());
rb->SendLong(cid.arity());
rb->StartArray(CommandId::OptCount(cid.opt_mask()));
for (uint32_t i = 0; i < 32; ++i) {
unsigned obit = (1u << i);
if (cid.opt_mask() & obit) {
const char* name = CO::OptName(CO::CommandOpt{obit});
(*cntx)->SendSimpleString(name);
rb->SendSimpleString(name);
}
}
(*cntx)->SendLong(cid.first_key_pos());
(*cntx)->SendLong(cid.last_key_pos());
(*cntx)->SendLong(cid.opt_mask() & CO::INTERLEAVED_KEYS ? 2 : 1);
rb->SendLong(cid.first_key_pos());
rb->SendLong(cid.last_key_pos());
rb->SendLong(cid.opt_mask() & CO::INTERLEAVED_KEYS ? 2 : 1);
};
// If no arguments are specified, reply with all commands
if (args.empty()) {
(*cntx)->StartArray(cmd_cnt);
rb->StartArray(cmd_cnt);
registry_.Traverse([&](string_view name, const CommandId& cid) {
if (cid.opt_mask() & CO::HIDDEN)
return;
@ -2259,7 +2263,7 @@ void Service::Command(CmdArgList args, ConnectionContext* cntx) {
// COUNT
if (subcmd == "COUNT") {
return (*cntx)->SendLong(cmd_cnt);
return cntx->SendLong(cmd_cnt);
}
// INFO [cmd]
@ -2268,16 +2272,16 @@ void Service::Command(CmdArgList args, ConnectionContext* cntx) {
string_view cmd = ArgS(args, 1);
if (const auto* cid = registry_.Find(cmd); cid) {
(*cntx)->StartArray(1);
rb->StartArray(1);
serialize_command(cmd, *cid);
} else {
(*cntx)->SendNull();
rb->SendNull();
}
return;
}
return (*cntx)->SendError(kSyntaxErr, kSyntaxErrType);
return cntx->SendError(kSyntaxErr, kSyntaxErrType);
}
VarzValue::Map Service::GetVarzStats() {