chore: pass SinkReplyBuilder and Transaction explicitly. Part10 (#3998)

This commit is contained in:
Roman Gershman 2024-10-28 16:18:52 +02:00 committed by GitHub
parent 41d8c66a15
commit 1bdd56c973
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 342 additions and 294 deletions

View file

@ -230,10 +230,13 @@ namespace {
const auto kRedisVersion = "6.2.11";
constexpr string_view kS3Prefix = "s3://"sv;
using EngineFunc = void (ServerFamily::*)(CmdArgList args, ConnectionContext* cntx);
using EngineFunc = void (ServerFamily::*)(CmdArgList args, Transaction* tx,
SinkReplyBuilder* builder, ConnectionContext* cntx);
inline CommandId::Handler HandlerFunc(ServerFamily* se, EngineFunc f) {
return [=](CmdArgList args, ConnectionContext* cntx) { return (se->*f)(args, cntx); };
inline CommandId::Handler3 HandlerFunc(ServerFamily* se, EngineFunc f) {
return [=](CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) {
return (se->*f)(args, tx, builder, cntx);
};
}
using CI = CommandId;
@ -306,20 +309,20 @@ std::optional<cron::cronexpr> InferSnapshotCronExpr() {
return std::nullopt;
}
void ClientSetName(CmdArgList args, ConnectionContext* cntx) {
void ClientSetName(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx) {
if (args.size() == 1) {
cntx->conn()->SetName(string{ArgS(args, 0)});
return cntx->SendOk();
return builder->SendOk();
} else {
return cntx->SendError(facade::kSyntaxErr);
return builder->SendError(facade::kSyntaxErr);
}
}
void ClientGetName(CmdArgList args, ConnectionContext* cntx) {
void ClientGetName(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx) {
if (!args.empty()) {
return cntx->SendError(facade::kSyntaxErr);
return builder->SendError(facade::kSyntaxErr);
}
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (auto name = cntx->conn()->GetName(); !name.empty()) {
return rb->SendBulkString(name);
} else {
@ -327,9 +330,10 @@ void ClientGetName(CmdArgList args, ConnectionContext* cntx) {
}
}
void ClientList(CmdArgList args, absl::Span<facade::Listener*> listeners, ConnectionContext* cntx) {
void ClientList(CmdArgList args, absl::Span<facade::Listener*> listeners, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (!args.empty()) {
return cntx->SendError(facade::kSyntaxErr);
return builder->SendError(facade::kSyntaxErr);
}
vector<string> client_info;
@ -350,11 +354,12 @@ void ClientList(CmdArgList args, absl::Span<facade::Listener*> listeners, Connec
string result = absl::StrJoin(client_info, "\n");
result.append("\n");
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
return rb->SendVerbatimString(result);
}
void ClientPauseCmd(CmdArgList args, vector<facade::Listener*> listeners, ConnectionContext* cntx) {
void ClientPauseCmd(CmdArgList args, vector<facade::Listener*> listeners, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
CmdArgParser parser(args);
auto timeout = parser.Next<uint64_t>();
@ -363,7 +368,7 @@ void ClientPauseCmd(CmdArgList args, vector<facade::Listener*> listeners, Connec
pause_state = parser.MapNext("WRITE", ClientPause::WRITE, "ALL", ClientPause::ALL);
}
if (auto err = parser.Error(); err) {
return cntx->SendError(err->MakeReply());
return builder->SendError(err->MakeReply());
}
const auto timeout_ms = timeout * 1ms;
@ -376,21 +381,21 @@ void ClientPauseCmd(CmdArgList args, vector<facade::Listener*> listeners, Connec
Pause(listeners, cntx->ns, cntx->conn(), pause_state, std::move(is_pause_in_progress));
pause_fb_opt) {
pause_fb_opt->Detach();
cntx->SendOk();
builder->SendOk();
} else {
cntx->SendError("Failed to pause all running clients");
builder->SendError("Failed to pause all running clients");
}
}
void ClientTracking(CmdArgList args, ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
void ClientTracking(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (!rb->IsResp3())
return cntx->SendError(
return builder->SendError(
"Client tracking is currently not supported for RESP2. Please use RESP3.");
CmdArgParser parser{args};
if (!parser.HasAtLeast(1) || args.size() > 3)
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
bool is_on = false;
using Tracking = ConnectionState::ClientTracking;
@ -398,7 +403,7 @@ void ClientTracking(CmdArgList args, ConnectionContext* cntx) {
if (parser.Check("ON")) {
is_on = true;
} else if (!parser.Check("OFF")) {
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
}
bool noloop = false;
@ -411,7 +416,7 @@ void ClientTracking(CmdArgList args, ConnectionContext* cntx) {
} else if (parser.Check("NOLOOP")) {
noloop = true;
} else {
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
}
}
@ -419,7 +424,7 @@ void ClientTracking(CmdArgList args, ConnectionContext* cntx) {
if (!noloop && parser.Check("NOLOOP")) {
noloop = true;
} else {
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
}
}
@ -430,50 +435,51 @@ void ClientTracking(CmdArgList args, ConnectionContext* cntx) {
cntx->conn_state.tracking_info_.SetClientTracking(is_on);
cntx->conn_state.tracking_info_.SetOption(option);
cntx->conn_state.tracking_info_.SetNoLoop(noloop);
return cntx->SendOk();
return builder->SendOk();
}
void ClientCaching(CmdArgList args, ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
void ClientCaching(CmdArgList args, SinkReplyBuilder* builder, Transaction* tx,
ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (!rb->IsResp3())
return cntx->SendError(
return builder->SendError(
"Client caching is currently not supported for RESP2. Please use RESP3.");
if (args.size() != 1) {
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
}
using Tracking = ConnectionState::ClientTracking;
CmdArgParser parser{args};
if (parser.Check("YES")) {
if (!cntx->conn_state.tracking_info_.HasOption(Tracking::OPTIN)) {
return cntx->SendError(
return builder->SendError(
"ERR CLIENT CACHING YES is only valid when tracking is enabled in OPTIN mode");
}
} else if (parser.Check("NO")) {
if (!cntx->conn_state.tracking_info_.HasOption(Tracking::OPTOUT)) {
return cntx->SendError(
return builder->SendError(
"ERR CLIENT CACHING NO is only valid when tracking is enabled in OPTOUT mode");
}
cntx->conn_state.tracking_info_.ResetCachingSequenceNumber();
} else {
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
}
bool is_multi = cntx->transaction && cntx->transaction->IsMulti();
bool is_multi = tx && tx->IsMulti();
cntx->conn_state.tracking_info_.SetCachingSequenceNumber(is_multi);
cntx->SendOk();
builder->SendOk();
}
void ClientSetInfo(CmdArgList args, ConnectionContext* cntx) {
void ClientSetInfo(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx) {
if (args.size() != 2) {
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
}
auto* conn = cntx->conn();
if (conn == nullptr) {
return cntx->SendError("No connection");
return builder->SendError("No connection");
}
string type = absl::AsciiStrToUpper(ArgS(args, 0));
@ -484,13 +490,13 @@ void ClientSetInfo(CmdArgList args, ConnectionContext* cntx) {
} else if (type == "LIB-VER") {
conn->SetLibVersion(string(val));
} else {
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
}
cntx->SendOk();
builder->SendOk();
}
void ClientId(CmdArgList args, ConnectionContext* cntx) {
void ClientId(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx) {
if (args.size() != 0) {
return cntx->SendError(kSyntaxErr);
}
@ -498,7 +504,8 @@ void ClientId(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendLong(cntx->conn()->GetClientId());
}
void ClientKill(CmdArgList args, absl::Span<facade::Listener*> listeners, ConnectionContext* cntx) {
void ClientKill(CmdArgList args, absl::Span<facade::Listener*> listeners, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
std::function<bool(facade::Connection * conn)> evaluator;
if (args.size() == 1) {
@ -580,7 +587,7 @@ struct ReplicaOfArgs {
string host;
uint16_t port;
std::optional<cluster::SlotRange> slot_range;
static optional<ReplicaOfArgs> FromCmdArgs(CmdArgList args, ConnectionContext* cntx);
static optional<ReplicaOfArgs> FromCmdArgs(CmdArgList args, SinkReplyBuilder* builder);
bool IsReplicaOfNoOne() const {
return port == 0;
}
@ -597,7 +604,7 @@ struct ReplicaOfArgs {
}
};
optional<ReplicaOfArgs> ReplicaOfArgs::FromCmdArgs(CmdArgList args, ConnectionContext* cntx) {
optional<ReplicaOfArgs> ReplicaOfArgs::FromCmdArgs(CmdArgList args, SinkReplyBuilder* builder) {
ReplicaOfArgs replicaof_args;
CmdArgParser parser(args);
@ -608,21 +615,21 @@ optional<ReplicaOfArgs> ReplicaOfArgs::FromCmdArgs(CmdArgList args, ConnectionCo
replicaof_args.host = parser.Next<string>();
replicaof_args.port = parser.Next<uint16_t>();
if (auto err = parser.Error(); err || replicaof_args.port < 1) {
cntx->SendError("port is out of range");
builder->SendError("port is out of range");
return nullopt;
}
if (parser.HasNext()) {
auto [slot_start, slot_end] = parser.Next<cluster::SlotId, cluster::SlotId>();
replicaof_args.slot_range = cluster::SlotRange{slot_start, slot_end};
if (auto err = parser.Error(); err || !replicaof_args.slot_range->IsValid()) {
cntx->SendError("Invalid slot range");
builder->SendError("Invalid slot range");
return nullopt;
}
}
}
if (auto err = parser.Error(); err) {
cntx->SendError(err->MakeReply());
builder->SendError(err->MakeReply());
return nullopt;
}
return replicaof_args;
@ -630,18 +637,18 @@ optional<ReplicaOfArgs> ReplicaOfArgs::FromCmdArgs(CmdArgList args, ConnectionCo
} // namespace
void SlowLogGet(dfly::CmdArgList args, dfly::ConnectionContext* cntx, std::string_view sub_cmd,
util::ProactorPool* pp) {
void SlowLogGet(dfly::CmdArgList args, std::string_view sub_cmd, util::ProactorPool* pp,
SinkReplyBuilder* builder) {
size_t requested_slow_log_length = UINT32_MAX;
size_t argc = args.size();
if (argc >= 3) {
cntx->SendError(facade::UnknownSubCmd(sub_cmd, "SLOWLOG"), facade::kSyntaxErrType);
builder->SendError(facade::UnknownSubCmd(sub_cmd, "SLOWLOG"), facade::kSyntaxErrType);
return;
} else if (argc == 2) {
string_view length = facade::ArgS(args, 1);
int64_t num;
if ((!absl::SimpleAtoi(length, &num)) || (num < -1)) {
cntx->SendError("count should be greater than or equal to -1");
builder->SendError("count should be greater than or equal to -1");
return;
}
if (num >= 0) {
@ -669,7 +676,7 @@ void SlowLogGet(dfly::CmdArgList args, dfly::ConnectionContext* cntx, std::strin
requested_slow_log_length = std::min(merged_slow_log.size(), requested_slow_log_length);
auto* rb = static_cast<facade::RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<facade::RedisReplyBuilder*>(builder);
rb->StartArray(requested_slow_log_length);
for (size_t i = 0; i < requested_slow_log_length; ++i) {
const auto& entry = merged_slow_log[i].first;
@ -1560,9 +1567,9 @@ void ServerFamily::OnClose(ConnectionContext* cntx) {
dfly_cmd_->OnClose(cntx->conn_state.replication_info.repl_session_id);
}
void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext* cntx) {
void ServerFamily::StatsMC(std::string_view section, SinkReplyBuilder* builder) {
if (!section.empty()) {
return cntx->SendError("");
return builder->SendError("");
}
string info;
@ -1599,8 +1606,8 @@ void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext*
absl::StrAppend(&info, "END\r\n");
MCReplyBuilder* builder = static_cast<MCReplyBuilder*>(cntx->reply_builder());
builder->SendRaw(info);
MCReplyBuilder* mc_builder = static_cast<MCReplyBuilder*>(builder);
mc_builder->SendRaw(info);
#undef ADD_LINE
}
@ -1702,7 +1709,8 @@ LastSaveInfo ServerFamily::GetLastSaveInfo() const {
return last_save_info_;
}
void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::DbSize(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
atomic_ulong num_keys{0};
shard_set->RunBriefInParallel(
@ -1712,7 +1720,7 @@ void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) {
},
[](ShardId) { return true; });
return cntx->SendLong(num_keys.load(memory_order_relaxed));
return builder->SendLong(num_keys.load(memory_order_relaxed));
}
void ServerFamily::CancelBlockingOnThread(std::function<OpStatus(ArgSlice)> status_cb) {
@ -1762,27 +1770,29 @@ void ServerFamily::SendInvalidationMessages() const {
}
}
void ServerFamily::FlushDb(CmdArgList args, ConnectionContext* cntx) {
DCHECK(cntx->transaction);
Drakarys(cntx->transaction, cntx->transaction->GetDbIndex());
void ServerFamily::FlushDb(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
DCHECK(tx);
Drakarys(tx, tx->GetDbIndex());
SendInvalidationMessages();
cntx->reply_builder()->SendOk();
builder->SendOk();
}
void ServerFamily::FlushAll(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::FlushAll(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (args.size() > 1) {
cntx->SendError(kSyntaxErr);
builder->SendError(kSyntaxErr);
return;
}
DCHECK(cntx->transaction);
Drakarys(cntx->transaction, DbSlice::kDbAll);
DCHECK(tx);
Drakarys(tx, DbSlice::kDbAll);
SendInvalidationMessages();
cntx->SendOk();
builder->SendOk();
}
bool ServerFamily::DoAuth(ConnectionContext* cntx, std::string_view username,
std::string_view password) const {
std::string_view password) {
const auto* registry = ServerState::tlocal()->user_registry;
CHECK(registry);
const bool is_authorized = registry->AuthUser(username, password);
@ -1798,7 +1808,8 @@ bool ServerFamily::DoAuth(ConnectionContext* cntx, std::string_view username,
return is_authorized;
}
void ServerFamily::Auth(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::Auth(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (args.size() > 2) {
return cntx->SendError(kSyntaxErr);
}
@ -1833,35 +1844,37 @@ void ServerFamily::Auth(CmdArgList args, ConnectionContext* cntx) {
}
}
void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::Client(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
CmdArgList sub_args = args.subspan(1);
if (sub_cmd == "SETNAME") {
return ClientSetName(sub_args, cntx);
return ClientSetName(sub_args, builder, cntx);
} else if (sub_cmd == "GETNAME") {
return ClientGetName(sub_args, cntx);
return ClientGetName(sub_args, builder, cntx);
} else if (sub_cmd == "LIST") {
return ClientList(sub_args, absl::MakeSpan(listeners_), cntx);
return ClientList(sub_args, absl::MakeSpan(listeners_), builder, cntx);
} else if (sub_cmd == "PAUSE") {
return ClientPauseCmd(sub_args, GetNonPriviligedListeners(), cntx);
return ClientPauseCmd(sub_args, GetNonPriviligedListeners(), builder, cntx);
} else if (sub_cmd == "TRACKING") {
return ClientTracking(sub_args, cntx);
return ClientTracking(sub_args, builder, cntx);
} else if (sub_cmd == "KILL") {
return ClientKill(sub_args, absl::MakeSpan(listeners_), cntx);
return ClientKill(sub_args, absl::MakeSpan(listeners_), builder, cntx);
} else if (sub_cmd == "CACHING") {
return ClientCaching(sub_args, cntx);
return ClientCaching(sub_args, builder, tx, cntx);
} else if (sub_cmd == "SETINFO") {
return ClientSetInfo(sub_args, cntx);
return ClientSetInfo(sub_args, builder, cntx);
} else if (sub_cmd == "ID") {
return ClientId(sub_args, cntx);
return ClientId(sub_args, builder, cntx);
}
LOG_FIRST_N(ERROR, 10) << "Subcommand " << sub_cmd << " not supported";
return cntx->SendError(UnknownSubCmd(sub_cmd, "CLIENT"), kSyntaxErrType);
return builder->SendError(UnknownSubCmd(sub_cmd, "CLIENT"), kSyntaxErrType);
}
void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::Config(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
if (sub_cmd == "HELP") {
@ -1877,13 +1890,13 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
" Prints this help.",
};
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
return rb->SendSimpleStrArr(help_arr);
}
if (sub_cmd == "SET") {
if (args.size() != 3) {
return cntx->SendError(WrongNumArgsError("config|set"));
return builder->SendError(WrongNumArgsError("config|set"));
}
string param = absl::AsciiStrToLower(ArgS(args, 1));
@ -1893,19 +1906,19 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
const char kErrPrefix[] = "CONFIG SET failed (possibly related to argument '";
switch (result) {
case ConfigRegistry::SetResult::OK:
return cntx->SendOk();
return builder->SendOk();
case ConfigRegistry::SetResult::UNKNOWN:
return cntx->SendError(
return builder->SendError(
absl::StrCat("Unknown option or number of arguments for CONFIG SET - '", param, "'"),
kConfigErrType);
case ConfigRegistry::SetResult::READONLY:
return cntx->SendError(absl::StrCat(kErrPrefix, param, "') - can't set immutable config"),
kConfigErrType);
return builder->SendError(
absl::StrCat(kErrPrefix, param, "') - can't set immutable config"), kConfigErrType);
case ConfigRegistry::SetResult::INVALID:
return cntx->SendError(absl::StrCat(kErrPrefix, param, "') - argument can not be set"),
kConfigErrType);
return builder->SendError(absl::StrCat(kErrPrefix, param, "') - argument can not be set"),
kConfigErrType);
}
ABSL_UNREACHABLE();
}
@ -1930,25 +1943,27 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) {
}
}
}
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
return rb->SendStringArr(res, RedisReplyBuilder::MAP);
}
if (sub_cmd == "RESETSTAT") {
ResetStat(cntx->ns);
return cntx->SendOk();
return builder->SendOk();
} else {
return cntx->SendError(UnknownSubCmd(sub_cmd, "CONFIG"), kSyntaxErrType);
return builder->SendError(UnknownSubCmd(sub_cmd, "CONFIG"), kSyntaxErrType);
}
}
void ServerFamily::Debug(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::Debug(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
DebugCmd dbg_cmd{this, cntx};
return dbg_cmd.Run(args);
}
void ServerFamily::Memory(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::Memory(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
MemoryCmd mem_cmd{this, cntx};
return mem_cmd.Run(args);
@ -1962,9 +1977,9 @@ void ServerFamily::BgSaveFb(boost::intrusive_ptr<Transaction> trans) {
}
std::optional<ServerFamily::VersionBasename> ServerFamily::GetVersionAndBasename(
CmdArgList args, ConnectionContext* cntx) {
CmdArgList args, SinkReplyBuilder* builder) {
if (args.size() > 2) {
cntx->SendError(kSyntaxErr);
builder->SendError(kSyntaxErr);
return {};
}
@ -1977,7 +1992,7 @@ std::optional<ServerFamily::VersionBasename> ServerFamily::GetVersionAndBasename
} else if (sub_cmd == "RDB") {
new_version = false;
} else {
cntx->SendError(UnknownSubCmd(sub_cmd, "SAVE"), kSyntaxErrType);
builder->SendError(UnknownSubCmd(sub_cmd, "SAVE"), kSyntaxErrType);
return {};
}
}
@ -1992,40 +2007,42 @@ std::optional<ServerFamily::VersionBasename> ServerFamily::GetVersionAndBasename
// BGSAVE [DF|RDB] [basename]
// TODO add missing [SCHEDULE]
void ServerFamily::BgSave(CmdArgList args, ConnectionContext* cntx) {
auto maybe_res = GetVersionAndBasename(args, cntx);
void ServerFamily::BgSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
auto maybe_res = GetVersionAndBasename(args, builder);
if (!maybe_res) {
return;
}
const auto [version, basename] = *maybe_res;
if (auto ec = DoSaveCheckAndStart(version, basename, cntx->transaction); ec) {
cntx->SendError(ec.Format());
if (auto ec = DoSaveCheckAndStart(version, basename, tx); ec) {
builder->SendError(ec.Format());
return;
}
bg_save_fb_.JoinIfNeeded();
bg_save_fb_ = fb2::Fiber("bg_save_fiber", &ServerFamily::BgSaveFb, this,
boost::intrusive_ptr<Transaction>(cntx->transaction));
cntx->SendOk();
boost::intrusive_ptr<Transaction>(tx));
builder->SendOk();
}
// SAVE [DF|RDB] [basename]
// Allows saving the snapshot of the dataset on disk, potentially overriding the format
// and the snapshot name.
void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
auto maybe_res = GetVersionAndBasename(args, cntx);
void ServerFamily::Save(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
auto maybe_res = GetVersionAndBasename(args, builder);
if (!maybe_res) {
return;
}
const auto [version, basename] = *maybe_res;
GenericError ec = DoSave(version, basename, cntx->transaction);
GenericError ec = DoSave(version, basename, tx);
if (ec) {
cntx->SendError(ec.Format());
builder->SendError(ec.Format());
} else {
cntx->SendOk();
builder->SendOk();
}
}
@ -2149,9 +2166,10 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const {
return result;
}
void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (args.size() > 1) {
return cntx->SendError(kSyntaxErr);
return builder->SendError(kSyntaxErr);
}
string section;
@ -2547,11 +2565,12 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
if (should_enter("CLUSTER")) {
append("cluster_enabled", cluster::IsClusterEnabledOrEmulated());
}
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendVerbatimString(info);
}
void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::Hello(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
// If no arguments are provided default to RESP2.
bool is_resp3 = false;
bool has_auth = false;
@ -2565,7 +2584,7 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
is_resp3 = proto_version == "3";
bool valid_proto_version = proto_version == "2" || is_resp3;
if (!valid_proto_version) {
cntx->SendError(UnknownCmd("HELLO", args));
builder->SendError(UnknownCmd("HELLO", args));
return;
}
@ -2582,18 +2601,18 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
clientname = ArgS(args, i + 1);
i += 1;
} else {
cntx->SendError(kSyntaxErr);
builder->SendError(kSyntaxErr);
return;
}
}
}
if (has_auth && !DoAuth(cntx, username, password)) {
return cntx->SendError(facade::kAuthRejected);
return builder->SendError(facade::kAuthRejected);
}
if (cntx->req_auth && !cntx->authenticated) {
cntx->SendError(
builder->SendError(
"-NOAUTH HELLO must be called with the client already "
"authenticated, otherwise the HELLO <proto> AUTH <user> <pass> "
"option can be used to authenticate the client and "
@ -2605,7 +2624,7 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
cntx->conn()->SetName(string{clientname});
}
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
int proto_version = 2;
if (is_resp3) {
proto_version = 3;
@ -2633,32 +2652,33 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
rb->SendBulkString((*ServerState::tlocal()).is_master ? "master" : "slave");
}
void ServerFamily::AddReplicaOf(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::AddReplicaOf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
util::fb2::LockGuard lk(replicaof_mu_);
if (ServerState::tlocal()->is_master) {
cntx->SendError("Calling ADDREPLICAOFF allowed only after server is already a replica");
builder->SendError("Calling ADDREPLICAOFF allowed only after server is already a replica");
return;
}
CHECK(replica_);
auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, cntx);
auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, builder);
if (!replicaof_args.has_value()) {
return;
}
if (replicaof_args->IsReplicaOfNoOne()) {
return cntx->SendError("ADDREPLICAOF does not support no one");
return builder->SendError("ADDREPLICAOF does not support no one");
}
LOG(INFO) << "Add Replica " << *replicaof_args;
auto add_replica = make_unique<Replica>(replicaof_args->host, replicaof_args->port, &service_,
master_replid(), replicaof_args->slot_range);
error_code ec = add_replica->Start(cntx);
error_code ec = add_replica->Start(builder);
if (!ec) {
cluster_replicas_.push_back(std::move(add_replica));
}
}
void ServerFamily::ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx,
void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ActionOnConnectionFail on_err) {
std::shared_ptr<Replica> new_replica;
{
@ -2666,11 +2686,11 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx,
// We should not execute replica of command while loading from snapshot.
if (ServerState::tlocal()->is_master && service_.GetGlobalState() == GlobalState::LOADING) {
cntx->SendError(kLoadingErr);
builder->SendError(kLoadingErr);
return;
}
auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, cntx);
auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, builder);
if (!replicaof_args.has_value()) {
return;
}
@ -2692,7 +2712,7 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx,
CHECK_EQ(service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE), GlobalState::ACTIVE)
<< "Server is set to replica no one, yet state is not active!";
return cntx->SendOk();
return builder->SendOk();
}
// If any replication is in progress, stop it, cancellation should kick in immediately
@ -2704,14 +2724,14 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx,
if (auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING);
new_state != GlobalState::LOADING) {
LOG(WARNING) << new_state << " in progress, ignored";
cntx->SendError("Invalid state");
builder->SendError("Invalid state");
return;
}
// If we are called by "Replicate", cntx->transaction will be null but we do not need
// If we are called by "Replicate", tx will be null but we do not need
// to flush anything.
if (cntx->transaction) {
Drakarys(cntx->transaction, DbSlice::kDbAll);
if (tx) {
Drakarys(tx, DbSlice::kDbAll);
}
// Create a new replica and assing it
@ -2730,10 +2750,10 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx,
error_code ec{};
switch (on_err) {
case ActionOnConnectionFail::kReturnOnError:
ec = new_replica->Start(cntx);
ec = new_replica->Start(builder);
break;
case ActionOnConnectionFail::kContinueReplication: // set DF to replicate, and forget about it
new_replica->EnableReplication(cntx);
new_replica->EnableReplication(builder);
break;
};
@ -2756,16 +2776,12 @@ void ServerFamily::StopAllClusterReplicas() {
cluster_replicas_.clear();
}
void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
ReplicaOfInternal(args, cntx, ActionOnConnectionFail::kReturnOnError);
void ServerFamily::ReplicaOf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
ReplicaOfInternal(args, tx, builder, ActionOnConnectionFail::kReturnOnError);
}
void ServerFamily::Replicate(string_view host, string_view port) {
io::NullSink sink;
ConnectionContext cntx{&sink, nullptr, {}};
cntx.ns = &namespaces.GetDefaultNamespace();
cntx.skip_acl_validation = true;
StringVec replicaof_params{string(host), string(port)};
CmdArgVec args_vec;
@ -2773,12 +2789,15 @@ void ServerFamily::Replicate(string_view host, string_view port) {
args_vec.emplace_back(MutableSlice{s.data(), s.size()});
}
CmdArgList args_list = absl::MakeSpan(args_vec);
ReplicaOfInternal(args_list, &cntx, ActionOnConnectionFail::kContinueReplication);
io::NullSink sink;
facade::RedisReplyBuilder rb(&sink);
ReplicaOfInternal(args_list, nullptr, &rb, ActionOnConnectionFail::kContinueReplication);
}
// REPLTAKEOVER <seconds> [SAVE]
// SAVE is used only by tests.
void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::ReplTakeOver(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
VLOG(1) << "ReplTakeOver start";
CmdArgParser parser{args};
@ -2787,19 +2806,19 @@ void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) {
bool save_flag = static_cast<bool>(parser.Check("SAVE"));
if (parser.HasNext())
return cntx->SendError(absl::StrCat("Unsupported option:", string_view(parser.Next())));
return builder->SendError(absl::StrCat("Unsupported option:", string_view(parser.Next())));
if (auto err = parser.Error(); err)
return cntx->SendError(err->MakeReply());
return builder->SendError(err->MakeReply());
// We allow zero timeouts for tests.
if (timeout_sec < 0) {
return cntx->SendError("timeout is negative");
return builder->SendError("timeout is negative");
}
// We return OK, to support idempotency semantics.
if (ServerState::tlocal()->is_master)
return cntx->SendOk();
return builder->SendOk();
util::fb2::LockGuard lk(replicaof_mu_);
@ -2808,31 +2827,32 @@ void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) {
auto info = replica_->GetSummary();
if (!info.full_sync_done) {
return cntx->SendError("Full sync not done");
return builder->SendError("Full sync not done");
}
std::error_code ec = replica_->TakeOver(ArgS(args, 0), save_flag);
if (ec)
return cntx->SendError("Couldn't execute takeover");
return builder->SendError("Couldn't execute takeover");
LOG(INFO) << "Takeover successful, promoting this instance to master.";
SetMasterFlagOnAllThreads(true);
replica_->Stop();
replica_.reset();
return cntx->SendOk();
return builder->SendOk();
}
void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
{
util::fb2::LockGuard lk(replicaof_mu_);
if (!ServerState::tlocal()->is_master) {
return cntx->SendError("Replicating a replica is unsupported");
return builder->SendError("Replicating a replica is unsupported");
}
}
auto err_cb = [&]() mutable {
LOG(ERROR) << "Error in receiving command: " << args;
cntx->SendError(kSyntaxErr);
builder->SendError(kSyntaxErr);
};
if (args.size() % 2 == 1)
@ -2857,7 +2877,7 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
cntx->replica_conn = true;
// The response for 'capa dragonfly' is: <masterid> <syncid> <numthreads> <version>
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(4);
rb->SendSimpleString(master_replid_);
rb->SendSimpleString(sync_id);
@ -2921,8 +2941,9 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
return cntx->SendOk();
}
void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
void ServerFamily::Role(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
util::fb2::LockGuard lk(replicaof_mu_);
// Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
// ensuring eventual consistency of is_master. When determining if the server is a replica and
@ -2965,11 +2986,13 @@ void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) {
}
}
void ServerFamily::Script(CmdArgList args, ConnectionContext* cntx) {
script_mgr_->Run(std::move(args), cntx->transaction, cntx->reply_builder());
void ServerFamily::Script(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
script_mgr_->Run(std::move(args), tx, builder);
}
void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::LastSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
time_t save_time;
{
util::fb2::LockGuard lk(save_mu_);
@ -2978,8 +3001,9 @@ void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) {
cntx->SendLong(save_time);
}
void ServerFamily::Latency(CmdArgList args, ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
void ServerFamily::Latency(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
if (sub_cmd == "LATEST") {
@ -2990,7 +3014,8 @@ void ServerFamily::Latency(CmdArgList args, ConnectionContext* cntx) {
cntx->SendError(kSyntaxErr);
}
void ServerFamily::ShutdownCmd(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::ShutdownCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (args.size() > 1) {
cntx->SendError(kSyntaxErr);
return;
@ -3014,11 +3039,13 @@ void ServerFamily::ShutdownCmd(CmdArgList args, ConnectionContext* cntx) {
cntx->SendOk();
}
void ServerFamily::Dfly(CmdArgList args, ConnectionContext* cntx) {
dfly_cmd_->Run(args, static_cast<RedisReplyBuilder*>(cntx->reply_builder()), cntx);
void ServerFamily::Dfly(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
dfly_cmd_->Run(args, static_cast<RedisReplyBuilder*>(builder), cntx);
}
void ServerFamily::SlowLog(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::SlowLog(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
if (sub_cmd == "HELP") {
@ -3036,7 +3063,7 @@ void ServerFamily::SlowLog(CmdArgList args, ConnectionContext* cntx) {
"HELP",
" Prints this help.",
};
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendSimpleStrArr(help);
return;
}
@ -3057,17 +3084,18 @@ void ServerFamily::SlowLog(CmdArgList args, ConnectionContext* cntx) {
}
if (sub_cmd == "GET") {
return SlowLogGet(args, cntx, sub_cmd, &service_.proactor_pool());
return SlowLogGet(args, sub_cmd, &service_.proactor_pool(), builder);
}
cntx->SendError(UnknownSubCmd(sub_cmd, "SLOWLOG"), kSyntaxErrType);
}
void ServerFamily::Module(CmdArgList args, ConnectionContext* cntx) {
void ServerFamily::Module(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
if (sub_cmd != "LIST")
return cntx->SendError(kSyntaxErr);
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(2);
// Json