diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index db8eb758b..234d42840 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -489,7 +489,7 @@ void DflyCmd::TakeOver(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext VLOG(1) << "Takeover accepted, shutting down."; std::string save_arg = "NOSAVE"; MutableSlice sargs(save_arg); - return sf_->ShutdownCmd(CmdArgList(&sargs, 1), cntx); + return sf_->ShutdownCmd(CmdArgList(&sargs, 1), nullptr, rb, cntx); } void DflyCmd::Expire(CmdArgList args, RedisReplyBuilder* rb, ConnectionContext* cntx) { diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 69ec162c5..e418c4efd 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1619,7 +1619,7 @@ void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view va strcpy(cmd_name, "QUIT"); break; case MemcacheParser::STATS: - server_family_.StatsMC(cmd.key, cntx); + server_family_.StatsMC(cmd.key, mc_builder); return; case MemcacheParser::VERSION: mc_builder->SendSimpleString("VERSION 1.5.0 DF"); diff --git a/src/server/replica.cc b/src/server/replica.cc index df22b1e67..9d66ce4ce 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -90,18 +90,18 @@ Replica::~Replica() { static const char kConnErr[] = "could not connect to master: "; -error_code Replica::Start(ConnectionContext* cntx) { +error_code Replica::Start(facade::SinkReplyBuilder* builder) { VLOG(1) << "Starting replication"; ProactorBase* mythread = ProactorBase::me(); CHECK(mythread); - auto check_connection_error = [this, &cntx](error_code ec, const char* msg) -> error_code { + auto check_connection_error = [this, builder](error_code ec, const char* msg) -> error_code { if (cntx_.IsCancelled()) { - cntx->SendError("replication cancelled"); + builder->SendError("replication cancelled"); return std::make_error_code(errc::operation_canceled); } if (ec) { - cntx->SendError(absl::StrCat(msg, ec.message())); + builder->SendError(absl::StrCat(msg, ec.message())); cntx_.Cancel(); } return ec; @@ -131,17 +131,17 @@ error_code Replica::Start(ConnectionContext* cntx) { // 4. Spawn main coordination fiber. sync_fb_ = fb2::Fiber("main_replication", &Replica::MainReplicationFb, this); - cntx->SendOk(); + builder->SendOk(); return {}; } -void Replica::EnableReplication(ConnectionContext* cntx) { +void Replica::EnableReplication(facade::SinkReplyBuilder* builder) { VLOG(1) << "Enabling replication"; state_mask_.store(R_ENABLED); // set replica state to enabled sync_fb_ = MakeFiber(&Replica::MainReplicationFb, this); // call replication fiber - cntx->SendOk(); + builder->SendOk(); } void Replica::Stop() { diff --git a/src/server/replica.h b/src/server/replica.h index 3dbcd8d2f..ef293e0f1 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -58,12 +58,12 @@ class Replica : ProtocolClient { // Spawns a fiber that runs until link with master is broken or the replication is stopped. // Returns true if initial link with master has been established or // false if it has failed. - std::error_code Start(ConnectionContext* cntx); + std::error_code Start(facade::SinkReplyBuilder* builder); // Sets the server state to have replication enabled. // It is like Start(), but does not attempt to establish // a connection right-away, but instead lets MainReplicationFb do the work. - void EnableReplication(ConnectionContext* cntx); + void EnableReplication(facade::SinkReplyBuilder* builder); void Stop(); // thread-safe diff --git a/src/server/search/search_family.cc b/src/server/search/search_family.cc index 900557929..80bf40cca 100644 --- a/src/server/search/search_family.cc +++ b/src/server/search/search_family.cc @@ -98,7 +98,7 @@ search::SchemaField::TagParams ParseTagParams(CmdArgParser* parser) { #endif optional ParseSchemaOrReply(DocIndex::DataType type, CmdArgParser parser, - ConnectionContext* cntx) { + SinkReplyBuilder* builder) { search::Schema schema; while (parser.HasNext()) { @@ -107,7 +107,7 @@ optional ParseSchemaOrReply(DocIndex::DataType type, CmdArgParse // Verify json path is correct if (type == DocIndex::JSON && !IsValidJsonPath(field)) { - cntx->SendError("Bad json path: " + string{field}); + builder->SendError("Bad json path: " + string{field}); return nullopt; } @@ -119,7 +119,7 @@ optional ParseSchemaOrReply(DocIndex::DataType type, CmdArgParse auto type = parser.MapNext("TAG", SchemaField::TAG, "TEXT", SchemaField::TEXT, "NUMERIC", SchemaField::NUMERIC, "VECTOR", SchemaField::VECTOR); if (auto err = parser.Error(); err) { - cntx->SendError(err->MakeReply()); + builder->SendError(err->MakeReply()); return nullopt; } @@ -133,12 +133,12 @@ optional ParseSchemaOrReply(DocIndex::DataType type, CmdArgParse if (parser.HasError()) { auto err = *parser.Error(); VLOG(1) << "Could not parse vector param " << err.index; - cntx->SendError("Parse error of vector parameters", kSyntaxErrType); + builder->SendError("Parse error of vector parameters", kSyntaxErrType); return nullopt; } if (vector_params.dim == 0) { - cntx->SendError("Knn vector dimension cannot be zero", kSyntaxErrType); + builder->SendError("Knn vector dimension cannot be zero", kSyntaxErrType); return nullopt; } params = vector_params; @@ -172,7 +172,7 @@ optional ParseSchemaOrReply(DocIndex::DataType type, CmdArgParse schema.field_names[field_info.short_name] = field_ident; if (auto err = parser.Error(); err) { - cntx->SendError(err->MakeReply()); + builder->SendError(err->MakeReply()); return nullopt; } @@ -226,7 +226,7 @@ search::QueryParams ParseQueryParams(CmdArgParser* parser) { return params; } -optional ParseSearchParamsOrReply(CmdArgParser parser, ConnectionContext* cntx) { +optional ParseSearchParamsOrReply(CmdArgParser parser, SinkReplyBuilder* builder) { SearchParams params; while (parser.HasNext()) { @@ -269,7 +269,7 @@ optional ParseSearchParamsOrReply(CmdArgParser parser, ConnectionC } if (auto err = parser.Error(); err) { - cntx->SendError(err->MakeReply()); + builder->SendError(err->MakeReply()); return nullopt; } @@ -277,7 +277,7 @@ optional ParseSearchParamsOrReply(CmdArgParser parser, ConnectionC } optional ParseAggregatorParamsOrReply(CmdArgParser parser, - ConnectionContext* cntx) { + SinkReplyBuilder* builder) { AggregateParams params; tie(params.index, params.query) = parser.Next(); @@ -298,7 +298,7 @@ optional ParseAggregatorParamsOrReply(CmdArgParser parser, TODO: Throw an error if the field has no '@' sign at the beginning if (!parsed_field) { - cntx->SendError(absl::StrCat("bad arguments for GROUPBY: Unknown property '", field, + builder->SendError(absl::StrCat("bad arguments for GROUPBY: Unknown property '", field, "'. Did you mean '@", field, "`?")); return nullopt; } */ @@ -314,7 +314,7 @@ optional ParseAggregatorParamsOrReply(CmdArgParser parser, RF::SUM, "AVG", RF::AVG, "MAX", RF::MAX, "MIN", RF::MIN); if (!func_name) { - cntx->SendError(absl::StrCat("reducer function ", parser.Next(), " not found")); + builder->SendError(absl::StrCat("reducer function ", parser.Next(), " not found")); return nullopt; } @@ -361,16 +361,16 @@ optional ParseAggregatorParamsOrReply(CmdArgParser parser, } if (parser.Check("LOAD")) { - cntx->SendError("LOAD cannot be applied after projectors or reducers"); + builder->SendError("LOAD cannot be applied after projectors or reducers"); return nullopt; } - cntx->SendError(absl::StrCat("Unknown clause: ", parser.Peek())); + builder->SendError(absl::StrCat("Unknown clause: ", parser.Peek())); return nullopt; } if (auto err = parser.Error(); err) { - cntx->SendError(err->MakeReply()); + builder->SendError(err->MakeReply()); return nullopt; } @@ -385,8 +385,8 @@ auto SortableValueSender(RedisReplyBuilder* rb) { }; } -void SendSerializedDoc(const SerializedSearchDoc& doc, ConnectionContext* cntx) { - auto* rb = static_cast(cntx->reply_builder()); +void SendSerializedDoc(const SerializedSearchDoc& doc, SinkReplyBuilder* builder) { + auto* rb = static_cast(builder); auto sortable_value_sender = SortableValueSender(rb); rb->SendBulkString(doc.key); @@ -398,7 +398,7 @@ void SendSerializedDoc(const SerializedSearchDoc& doc, ConnectionContext* cntx) } void ReplyWithResults(const SearchParams& params, absl::Span results, - ConnectionContext* cntx) { + SinkReplyBuilder* builder) { size_t total_count = 0; for (const auto& shard_docs : results) total_count += shard_docs.total_hits; @@ -406,12 +406,12 @@ void ReplyWithResults(const SearchParams& params, absl::Span resul size_t result_count = min(total_count - min(total_count, params.limit_offset), params.limit_total); - facade::SinkReplyBuilder::ReplyAggregator agg{cntx->reply_builder()}; + facade::SinkReplyBuilder::ReplyAggregator agg{builder}; bool ids_only = params.IdsOnly(); size_t reply_size = ids_only ? (result_count + 1) : (result_count * 2 + 1); - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); rb->StartArray(reply_size); rb->SendLong(total_count); @@ -431,13 +431,13 @@ void ReplyWithResults(const SearchParams& params, absl::Span resul if (ids_only) rb->SendBulkString(serialized_doc.key); else - SendSerializedDoc(serialized_doc, cntx); + SendSerializedDoc(serialized_doc, builder); } } } void ReplySorted(search::AggregationInfo agg, const SearchParams& params, - absl::Span results, ConnectionContext* cntx) { + absl::Span results, SinkReplyBuilder* builder) { size_t total = 0; vector docs; for (auto& shard_results : results) { @@ -466,8 +466,8 @@ void ReplySorted(search::AggregationInfo agg, const SearchParams& params, if (!params.ShouldReturnField(agg.alias)) agg.alias = ""; - facade::SinkReplyBuilder::ReplyAggregator agg_reply{cntx->reply_builder()}; - auto* rb = static_cast(cntx->reply_builder()); + facade::SinkReplyBuilder::ReplyAggregator agg_reply{builder}; + auto* rb = static_cast(builder); rb->StartArray(reply_size); rb->SendLong(min(total, agg_limit)); for (auto* doc : absl::MakeSpan(docs).subspan(start_idx, result_count)) { @@ -479,15 +479,16 @@ void ReplySorted(search::AggregationInfo agg, const SearchParams& params, if (!agg.alias.empty() && holds_alternative(doc->score)) doc->values[agg.alias] = absl::StrCat(get(doc->score)); - SendSerializedDoc(*doc, cntx); + SendSerializedDoc(*doc, builder); } } } // namespace -void SearchFamily::FtCreate(CmdArgList args, ConnectionContext* cntx) { +void SearchFamily::FtCreate(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { if (cntx->conn_state.db_index != 0) { - return cntx->SendError("Cannot create index on db != 0"sv); + return builder->SendError("Cannot create index on db != 0"sv); } DocIndex index{}; @@ -505,7 +506,7 @@ void SearchFamily::FtCreate(CmdArgList args, ConnectionContext* cntx) { // PREFIX count prefix [prefix ...] if (parser.Check("PREFIX")) { if (size_t num = parser.Next(); num != 1) - return cntx->SendError("Multiple prefixes are not supported"); + return builder->SendError("Multiple prefixes are not supported"); index.prefix = string(parser.Next()); continue; } @@ -520,7 +521,7 @@ void SearchFamily::FtCreate(CmdArgList args, ConnectionContext* cntx) { // SCHEMA if (parser.Check("SCHEMA")) { - auto schema = ParseSchemaOrReply(index.type, parser.Tail(), cntx); + auto schema = ParseSchemaOrReply(index.type, parser.Tail(), builder); if (!schema) return; index.schema = std::move(*schema); @@ -532,11 +533,11 @@ void SearchFamily::FtCreate(CmdArgList args, ConnectionContext* cntx) { } if (auto err = parser.Error(); err) - return cntx->SendError(err->MakeReply()); + return builder->SendError(err->MakeReply()); // Check if index already exists atomic_uint exists_cnt = 0; - cntx->transaction->Execute( + tx->Execute( [idx_name, &exists_cnt](auto* tx, auto* es) { if (es->search_indices()->GetIndex(idx_name) != nullptr) exists_cnt.fetch_add(1, std::memory_order_relaxed); @@ -547,29 +548,29 @@ void SearchFamily::FtCreate(CmdArgList args, ConnectionContext* cntx) { DCHECK(exists_cnt == 0u || exists_cnt == shard_set->size()); if (exists_cnt.load(memory_order_relaxed) > 0) { - cntx->transaction->Conclude(); - return cntx->SendError("Index already exists"); + tx->Conclude(); + return builder->SendError("Index already exists"); } auto idx_ptr = make_shared(std::move(index)); - cntx->transaction->Execute( + tx->Execute( [idx_name, idx_ptr](auto* tx, auto* es) { es->search_indices()->InitIndex(tx->GetOpArgs(es), idx_name, idx_ptr); return OpStatus::OK; }, true); - cntx->SendOk(); + builder->SendOk(); } -void SearchFamily::FtAlter(CmdArgList args, ConnectionContext* cntx) { +void SearchFamily::FtAlter(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { CmdArgParser parser{args}; string_view idx_name = parser.Next(); parser.ExpectTag("SCHEMA"); parser.ExpectTag("ADD"); if (auto err = parser.Error(); err) - return cntx->SendError(err->MakeReply()); + return builder->SendError(err->MakeReply()); // First, extract existing index info shared_ptr index_info; @@ -581,17 +582,17 @@ void SearchFamily::FtAlter(CmdArgList args, ConnectionContext* cntx) { index_info = make_shared(idx->GetInfo().base_index); return OpStatus::OK; }; - cntx->transaction->Execute(idx_cb, false); + tx->Execute(idx_cb, false); if (!index_info) { - cntx->transaction->Conclude(); - return cntx->SendError("Index not found"); + tx->Conclude(); + return builder->SendError("Index not found"); } // Parse additional schema - optional new_fields = ParseSchemaOrReply(index_info->type, parser, cntx); + optional new_fields = ParseSchemaOrReply(index_info->type, parser, builder); if (!new_fields) { - cntx->transaction->Conclude(); + tx->Conclude(); return; } @@ -610,17 +611,17 @@ void SearchFamily::FtAlter(CmdArgList args, ConnectionContext* cntx) { es->search_indices()->InitIndex(tx->GetOpArgs(es), idx_name, index_info); return OpStatus::OK; }; - cntx->transaction->Execute(upd_cb, true); + tx->Execute(upd_cb, true); - cntx->SendOk(); + builder->SendOk(); } -void SearchFamily::FtDropIndex(CmdArgList args, ConnectionContext* cntx) { +void SearchFamily::FtDropIndex(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view idx_name = ArgS(args, 0); // TODO: Handle optional DD param atomic_uint num_deleted{0}; - cntx->transaction->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { + tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { if (es->search_indices()->DropIndex(idx_name)) num_deleted.fetch_add(1); return OpStatus::OK; @@ -628,17 +629,17 @@ void SearchFamily::FtDropIndex(CmdArgList args, ConnectionContext* cntx) { DCHECK(num_deleted == 0u || num_deleted == shard_set->size()); if (num_deleted == 0u) - return cntx->SendError("-Unknown Index name"); - return cntx->SendOk(); + return builder->SendError("-Unknown Index name"); + return builder->SendOk(); } -void SearchFamily::FtInfo(CmdArgList args, ConnectionContext* cntx) { +void SearchFamily::FtInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view idx_name = ArgS(args, 0); atomic_uint num_notfound{0}; vector infos(shard_set->size()); - cntx->transaction->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { + tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { auto* index = es->search_indices()->GetIndex(idx_name); if (index == nullptr) num_notfound.fetch_add(1); @@ -650,7 +651,7 @@ void SearchFamily::FtInfo(CmdArgList args, ConnectionContext* cntx) { DCHECK(num_notfound == 0u || num_notfound == shard_set->size()); if (num_notfound > 0u) - return cntx->SendError("Unknown Index name"); + return builder->SendError("Unknown Index name"); DCHECK(infos.front().base_index.schema.fields.size() == infos.back().base_index.schema.fields.size()); @@ -662,7 +663,7 @@ void SearchFamily::FtInfo(CmdArgList args, ConnectionContext* cntx) { const auto& info = infos.front(); const auto& schema = info.base_index.schema; - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); rb->StartCollection(4, RedisReplyBuilder::MAP); rb->SendSimpleString("index_name"); @@ -700,38 +701,38 @@ void SearchFamily::FtInfo(CmdArgList args, ConnectionContext* cntx) { rb->SendLong(total_num_docs); } -void SearchFamily::FtList(CmdArgList args, ConnectionContext* cntx) { +void SearchFamily::FtList(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { atomic_int first{0}; vector names; - cntx->transaction->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { + tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { // Using `first` to assign `names` only once without a race if (first.fetch_add(1) == 0) names = es->search_indices()->GetIndexNames(); return OpStatus::OK; }); - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); rb->SendStringArr(names); } -void SearchFamily::FtSearch(CmdArgList args, ConnectionContext* cntx) { +void SearchFamily::FtSearch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view index_name = ArgS(args, 0); string_view query_str = ArgS(args, 1); - auto params = ParseSearchParamsOrReply(args.subspan(2), cntx); + auto params = ParseSearchParamsOrReply(args.subspan(2), builder); if (!params.has_value()) return; search::SearchAlgorithm search_algo; search::SortOption* sort_opt = params->sort_option.has_value() ? &*params->sort_option : nullptr; if (!search_algo.Init(query_str, ¶ms->query_params, sort_opt)) - return cntx->SendError("Query syntax error"); + return builder->SendError("Query syntax error"); // Because our coordinator thread may not have a shard, we can't check ahead if the index exists. atomic index_not_found{false}; vector docs(shard_set->size()); - cntx->transaction->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { + tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { if (auto* index = es->search_indices()->GetIndex(index_name); index) docs[es->shard_id()] = index->Search(t->GetOpArgs(es), *params, &search_algo); else @@ -740,31 +741,31 @@ void SearchFamily::FtSearch(CmdArgList args, ConnectionContext* cntx) { }); if (index_not_found.load()) - return cntx->SendError(string{index_name} + ": no such index"); + return builder->SendError(string{index_name} + ": no such index"); for (const auto& res : docs) { if (res.error) - return cntx->SendError(*res.error); + return builder->SendError(*res.error); } if (auto agg = search_algo.HasAggregation(); agg) - ReplySorted(std::move(*agg), *params, absl::MakeSpan(docs), cntx); + ReplySorted(std::move(*agg), *params, absl::MakeSpan(docs), builder); else - ReplyWithResults(*params, absl::MakeSpan(docs), cntx); + ReplyWithResults(*params, absl::MakeSpan(docs), builder); } -void SearchFamily::FtProfile(CmdArgList args, ConnectionContext* cntx) { +void SearchFamily::FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view index_name = ArgS(args, 0); string_view query_str = ArgS(args, 3); - optional params = ParseSearchParamsOrReply(args.subspan(4), cntx); + optional params = ParseSearchParamsOrReply(args.subspan(4), builder); if (!params.has_value()) return; search::SearchAlgorithm search_algo; search::SortOption* sort_opt = params->sort_option.has_value() ? &*params->sort_option : nullptr; if (!search_algo.Init(query_str, ¶ms->query_params, sort_opt)) - return cntx->SendError("Query syntax error"); + return builder->SendError("Query syntax error"); search_algo.EnableProfiling(); @@ -774,7 +775,7 @@ void SearchFamily::FtProfile(CmdArgList args, ConnectionContext* cntx) { vector> results(shard_set->size()); - cntx->transaction->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { + tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { auto* index = es->search_indices()->GetIndex(index_name); if (!index) return OpStatus::OK; @@ -792,7 +793,7 @@ void SearchFamily::FtProfile(CmdArgList args, ConnectionContext* cntx) { }); auto took = absl::Now() - start; - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); rb->StartArray(results.size() + 1); // General stats @@ -834,14 +835,14 @@ void SearchFamily::FtProfile(CmdArgList args, ConnectionContext* cntx) { } } -void SearchFamily::FtTagVals(CmdArgList args, ConnectionContext* cntx) { +void SearchFamily::FtTagVals(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { string_view index_name = ArgS(args, 0); string_view field_name = ArgS(args, 1); VLOG(1) << "FtTagVals: " << index_name << " " << field_name; vector> shard_results(shard_set->size(), StringVec{}); - cntx->transaction->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { + tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { if (auto* index = es->search_indices()->GetIndex(index_name); index) shard_results[es->shard_id()] = index->GetTagVals(field_name); else @@ -858,31 +859,31 @@ void SearchFamily::FtTagVals(CmdArgList args, ConnectionContext* cntx) { result_set.insert(make_move_iterator(res->begin()), make_move_iterator(res->end())); } else { res.error().kind = facade::kSearchErrType; - return cntx->SendError(res.error()); + return builder->SendError(res.error()); } } shard_results.clear(); vector vec(result_set.begin(), result_set.end()); - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); rb->SendStringArr(vec, RedisReplyBuilder::SET); } -void SearchFamily::FtAggregate(CmdArgList args, ConnectionContext* cntx) { - const auto params = ParseAggregatorParamsOrReply(args, cntx); +void SearchFamily::FtAggregate(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) { + const auto params = ParseAggregatorParamsOrReply(args, builder); if (!params) return; search::SearchAlgorithm search_algo; if (!search_algo.Init(params->query, ¶ms->params, nullptr)) - return cntx->SendError("Query syntax error"); + return builder->SendError("Query syntax error"); using ResultContainer = decltype(declval().SearchForAggregator( declval(), params.value(), &search_algo)); vector query_results(shard_set->size()); - cntx->transaction->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { + tx->ScheduleSingleHop([&](Transaction* t, EngineShard* es) { if (auto* index = es->search_indices()->GetIndex(params->index); index) { query_results[es->shard_id()] = index->SearchForAggregator(t->GetOpArgs(es), params.value(), &search_algo); @@ -898,10 +899,10 @@ void SearchFamily::FtAggregate(CmdArgList args, ConnectionContext* cntx) { auto agg_results = aggregate::Process(std::move(values), params->steps); if (!agg_results.has_value()) - return cntx->SendError(agg_results.error()); + return builder->SendError(agg_results.error()); size_t result_size = agg_results->size(); - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); auto sortable_value_sender = SortableValueSender(rb); rb->StartArray(result_size + 1); diff --git a/src/server/search/search_family.h b/src/server/search/search_family.h index f3b453eb9..db5445eeb 100644 --- a/src/server/search/search_family.h +++ b/src/server/search/search_family.h @@ -9,20 +9,27 @@ #include "base/mutex.h" #include "server/common.h" +namespace facade { +class SinkReplyBuilder; +} // namespace facade + namespace dfly { class CommandRegistry; class ConnectionContext; class SearchFamily { - static void FtCreate(CmdArgList args, ConnectionContext* cntx); - static void FtAlter(CmdArgList args, ConnectionContext* cntx); - static void FtDropIndex(CmdArgList args, ConnectionContext* cntx); - static void FtInfo(CmdArgList args, ConnectionContext* cntx); - static void FtList(CmdArgList args, ConnectionContext* cntx); - static void FtSearch(CmdArgList args, ConnectionContext* cntx); - static void FtProfile(CmdArgList args, ConnectionContext* cntx); - static void FtAggregate(CmdArgList args, ConnectionContext* cntx); - static void FtTagVals(CmdArgList args, ConnectionContext* cntx); + using SinkReplyBuilder = facade::SinkReplyBuilder; + + static void FtCreate(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx); + static void FtAlter(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void FtDropIndex(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void FtInfo(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void FtList(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void FtSearch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void FtProfile(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void FtAggregate(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); + static void FtTagVals(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder); public: static void Register(CommandRegistry* registry); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index f781e880a..ea7659e37 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -230,10 +230,13 @@ namespace { const auto kRedisVersion = "6.2.11"; constexpr string_view kS3Prefix = "s3://"sv; -using EngineFunc = void (ServerFamily::*)(CmdArgList args, ConnectionContext* cntx); +using EngineFunc = void (ServerFamily::*)(CmdArgList args, Transaction* tx, + SinkReplyBuilder* builder, ConnectionContext* cntx); -inline CommandId::Handler HandlerFunc(ServerFamily* se, EngineFunc f) { - return [=](CmdArgList args, ConnectionContext* cntx) { return (se->*f)(args, cntx); }; +inline CommandId::Handler3 HandlerFunc(ServerFamily* se, EngineFunc f) { + return [=](CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) { + return (se->*f)(args, tx, builder, cntx); + }; } using CI = CommandId; @@ -306,20 +309,20 @@ std::optional InferSnapshotCronExpr() { return std::nullopt; } -void ClientSetName(CmdArgList args, ConnectionContext* cntx) { +void ClientSetName(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx) { if (args.size() == 1) { cntx->conn()->SetName(string{ArgS(args, 0)}); - return cntx->SendOk(); + return builder->SendOk(); } else { - return cntx->SendError(facade::kSyntaxErr); + return builder->SendError(facade::kSyntaxErr); } } -void ClientGetName(CmdArgList args, ConnectionContext* cntx) { +void ClientGetName(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx) { if (!args.empty()) { - return cntx->SendError(facade::kSyntaxErr); + return builder->SendError(facade::kSyntaxErr); } - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); if (auto name = cntx->conn()->GetName(); !name.empty()) { return rb->SendBulkString(name); } else { @@ -327,9 +330,10 @@ void ClientGetName(CmdArgList args, ConnectionContext* cntx) { } } -void ClientList(CmdArgList args, absl::Span listeners, ConnectionContext* cntx) { +void ClientList(CmdArgList args, absl::Span listeners, SinkReplyBuilder* builder, + ConnectionContext* cntx) { if (!args.empty()) { - return cntx->SendError(facade::kSyntaxErr); + return builder->SendError(facade::kSyntaxErr); } vector client_info; @@ -350,11 +354,12 @@ void ClientList(CmdArgList args, absl::Span listeners, Connec string result = absl::StrJoin(client_info, "\n"); result.append("\n"); - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); return rb->SendVerbatimString(result); } -void ClientPauseCmd(CmdArgList args, vector listeners, ConnectionContext* cntx) { +void ClientPauseCmd(CmdArgList args, vector listeners, SinkReplyBuilder* builder, + ConnectionContext* cntx) { CmdArgParser parser(args); auto timeout = parser.Next(); @@ -363,7 +368,7 @@ void ClientPauseCmd(CmdArgList args, vector listeners, Connec pause_state = parser.MapNext("WRITE", ClientPause::WRITE, "ALL", ClientPause::ALL); } if (auto err = parser.Error(); err) { - return cntx->SendError(err->MakeReply()); + return builder->SendError(err->MakeReply()); } const auto timeout_ms = timeout * 1ms; @@ -376,21 +381,21 @@ void ClientPauseCmd(CmdArgList args, vector listeners, Connec Pause(listeners, cntx->ns, cntx->conn(), pause_state, std::move(is_pause_in_progress)); pause_fb_opt) { pause_fb_opt->Detach(); - cntx->SendOk(); + builder->SendOk(); } else { - cntx->SendError("Failed to pause all running clients"); + builder->SendError("Failed to pause all running clients"); } } -void ClientTracking(CmdArgList args, ConnectionContext* cntx) { - auto* rb = static_cast(cntx->reply_builder()); +void ClientTracking(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx) { + auto* rb = static_cast(builder); if (!rb->IsResp3()) - return cntx->SendError( + return builder->SendError( "Client tracking is currently not supported for RESP2. Please use RESP3."); CmdArgParser parser{args}; if (!parser.HasAtLeast(1) || args.size() > 3) - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); bool is_on = false; using Tracking = ConnectionState::ClientTracking; @@ -398,7 +403,7 @@ void ClientTracking(CmdArgList args, ConnectionContext* cntx) { if (parser.Check("ON")) { is_on = true; } else if (!parser.Check("OFF")) { - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); } bool noloop = false; @@ -411,7 +416,7 @@ void ClientTracking(CmdArgList args, ConnectionContext* cntx) { } else if (parser.Check("NOLOOP")) { noloop = true; } else { - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); } } @@ -419,7 +424,7 @@ void ClientTracking(CmdArgList args, ConnectionContext* cntx) { if (!noloop && parser.Check("NOLOOP")) { noloop = true; } else { - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); } } @@ -430,50 +435,51 @@ void ClientTracking(CmdArgList args, ConnectionContext* cntx) { cntx->conn_state.tracking_info_.SetClientTracking(is_on); cntx->conn_state.tracking_info_.SetOption(option); cntx->conn_state.tracking_info_.SetNoLoop(noloop); - return cntx->SendOk(); + return builder->SendOk(); } -void ClientCaching(CmdArgList args, ConnectionContext* cntx) { - auto* rb = static_cast(cntx->reply_builder()); +void ClientCaching(CmdArgList args, SinkReplyBuilder* builder, Transaction* tx, + ConnectionContext* cntx) { + auto* rb = static_cast(builder); if (!rb->IsResp3()) - return cntx->SendError( + return builder->SendError( "Client caching is currently not supported for RESP2. Please use RESP3."); if (args.size() != 1) { - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); } using Tracking = ConnectionState::ClientTracking; CmdArgParser parser{args}; if (parser.Check("YES")) { if (!cntx->conn_state.tracking_info_.HasOption(Tracking::OPTIN)) { - return cntx->SendError( + return builder->SendError( "ERR CLIENT CACHING YES is only valid when tracking is enabled in OPTIN mode"); } } else if (parser.Check("NO")) { if (!cntx->conn_state.tracking_info_.HasOption(Tracking::OPTOUT)) { - return cntx->SendError( + return builder->SendError( "ERR CLIENT CACHING NO is only valid when tracking is enabled in OPTOUT mode"); } cntx->conn_state.tracking_info_.ResetCachingSequenceNumber(); } else { - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); } - bool is_multi = cntx->transaction && cntx->transaction->IsMulti(); + bool is_multi = tx && tx->IsMulti(); cntx->conn_state.tracking_info_.SetCachingSequenceNumber(is_multi); - cntx->SendOk(); + builder->SendOk(); } -void ClientSetInfo(CmdArgList args, ConnectionContext* cntx) { +void ClientSetInfo(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx) { if (args.size() != 2) { - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); } auto* conn = cntx->conn(); if (conn == nullptr) { - return cntx->SendError("No connection"); + return builder->SendError("No connection"); } string type = absl::AsciiStrToUpper(ArgS(args, 0)); @@ -484,13 +490,13 @@ void ClientSetInfo(CmdArgList args, ConnectionContext* cntx) { } else if (type == "LIB-VER") { conn->SetLibVersion(string(val)); } else { - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); } - cntx->SendOk(); + builder->SendOk(); } -void ClientId(CmdArgList args, ConnectionContext* cntx) { +void ClientId(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx) { if (args.size() != 0) { return cntx->SendError(kSyntaxErr); } @@ -498,7 +504,8 @@ void ClientId(CmdArgList args, ConnectionContext* cntx) { return cntx->SendLong(cntx->conn()->GetClientId()); } -void ClientKill(CmdArgList args, absl::Span listeners, ConnectionContext* cntx) { +void ClientKill(CmdArgList args, absl::Span listeners, SinkReplyBuilder* builder, + ConnectionContext* cntx) { std::function evaluator; if (args.size() == 1) { @@ -580,7 +587,7 @@ struct ReplicaOfArgs { string host; uint16_t port; std::optional slot_range; - static optional FromCmdArgs(CmdArgList args, ConnectionContext* cntx); + static optional FromCmdArgs(CmdArgList args, SinkReplyBuilder* builder); bool IsReplicaOfNoOne() const { return port == 0; } @@ -597,7 +604,7 @@ struct ReplicaOfArgs { } }; -optional ReplicaOfArgs::FromCmdArgs(CmdArgList args, ConnectionContext* cntx) { +optional ReplicaOfArgs::FromCmdArgs(CmdArgList args, SinkReplyBuilder* builder) { ReplicaOfArgs replicaof_args; CmdArgParser parser(args); @@ -608,21 +615,21 @@ optional ReplicaOfArgs::FromCmdArgs(CmdArgList args, ConnectionCo replicaof_args.host = parser.Next(); replicaof_args.port = parser.Next(); if (auto err = parser.Error(); err || replicaof_args.port < 1) { - cntx->SendError("port is out of range"); + builder->SendError("port is out of range"); return nullopt; } if (parser.HasNext()) { auto [slot_start, slot_end] = parser.Next(); replicaof_args.slot_range = cluster::SlotRange{slot_start, slot_end}; if (auto err = parser.Error(); err || !replicaof_args.slot_range->IsValid()) { - cntx->SendError("Invalid slot range"); + builder->SendError("Invalid slot range"); return nullopt; } } } if (auto err = parser.Error(); err) { - cntx->SendError(err->MakeReply()); + builder->SendError(err->MakeReply()); return nullopt; } return replicaof_args; @@ -630,18 +637,18 @@ optional ReplicaOfArgs::FromCmdArgs(CmdArgList args, ConnectionCo } // namespace -void SlowLogGet(dfly::CmdArgList args, dfly::ConnectionContext* cntx, std::string_view sub_cmd, - util::ProactorPool* pp) { +void SlowLogGet(dfly::CmdArgList args, std::string_view sub_cmd, util::ProactorPool* pp, + SinkReplyBuilder* builder) { size_t requested_slow_log_length = UINT32_MAX; size_t argc = args.size(); if (argc >= 3) { - cntx->SendError(facade::UnknownSubCmd(sub_cmd, "SLOWLOG"), facade::kSyntaxErrType); + builder->SendError(facade::UnknownSubCmd(sub_cmd, "SLOWLOG"), facade::kSyntaxErrType); return; } else if (argc == 2) { string_view length = facade::ArgS(args, 1); int64_t num; if ((!absl::SimpleAtoi(length, &num)) || (num < -1)) { - cntx->SendError("count should be greater than or equal to -1"); + builder->SendError("count should be greater than or equal to -1"); return; } if (num >= 0) { @@ -669,7 +676,7 @@ void SlowLogGet(dfly::CmdArgList args, dfly::ConnectionContext* cntx, std::strin requested_slow_log_length = std::min(merged_slow_log.size(), requested_slow_log_length); - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); rb->StartArray(requested_slow_log_length); for (size_t i = 0; i < requested_slow_log_length; ++i) { const auto& entry = merged_slow_log[i].first; @@ -1560,9 +1567,9 @@ void ServerFamily::OnClose(ConnectionContext* cntx) { dfly_cmd_->OnClose(cntx->conn_state.replication_info.repl_session_id); } -void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext* cntx) { +void ServerFamily::StatsMC(std::string_view section, SinkReplyBuilder* builder) { if (!section.empty()) { - return cntx->SendError(""); + return builder->SendError(""); } string info; @@ -1599,8 +1606,8 @@ void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext* absl::StrAppend(&info, "END\r\n"); - MCReplyBuilder* builder = static_cast(cntx->reply_builder()); - builder->SendRaw(info); + MCReplyBuilder* mc_builder = static_cast(builder); + mc_builder->SendRaw(info); #undef ADD_LINE } @@ -1702,7 +1709,8 @@ LastSaveInfo ServerFamily::GetLastSaveInfo() const { return last_save_info_; } -void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) { +void ServerFamily::DbSize(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { atomic_ulong num_keys{0}; shard_set->RunBriefInParallel( @@ -1712,7 +1720,7 @@ void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) { }, [](ShardId) { return true; }); - return cntx->SendLong(num_keys.load(memory_order_relaxed)); + return builder->SendLong(num_keys.load(memory_order_relaxed)); } void ServerFamily::CancelBlockingOnThread(std::function status_cb) { @@ -1762,27 +1770,29 @@ void ServerFamily::SendInvalidationMessages() const { } } -void ServerFamily::FlushDb(CmdArgList args, ConnectionContext* cntx) { - DCHECK(cntx->transaction); - Drakarys(cntx->transaction, cntx->transaction->GetDbIndex()); +void ServerFamily::FlushDb(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { + DCHECK(tx); + Drakarys(tx, tx->GetDbIndex()); SendInvalidationMessages(); - cntx->reply_builder()->SendOk(); + builder->SendOk(); } -void ServerFamily::FlushAll(CmdArgList args, ConnectionContext* cntx) { +void ServerFamily::FlushAll(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { if (args.size() > 1) { - cntx->SendError(kSyntaxErr); + builder->SendError(kSyntaxErr); return; } - DCHECK(cntx->transaction); - Drakarys(cntx->transaction, DbSlice::kDbAll); + DCHECK(tx); + Drakarys(tx, DbSlice::kDbAll); SendInvalidationMessages(); - cntx->SendOk(); + builder->SendOk(); } bool ServerFamily::DoAuth(ConnectionContext* cntx, std::string_view username, - std::string_view password) const { + std::string_view password) { const auto* registry = ServerState::tlocal()->user_registry; CHECK(registry); const bool is_authorized = registry->AuthUser(username, password); @@ -1798,7 +1808,8 @@ bool ServerFamily::DoAuth(ConnectionContext* cntx, std::string_view username, return is_authorized; } -void ServerFamily::Auth(CmdArgList args, ConnectionContext* cntx) { +void ServerFamily::Auth(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { if (args.size() > 2) { return cntx->SendError(kSyntaxErr); } @@ -1833,35 +1844,37 @@ void ServerFamily::Auth(CmdArgList args, ConnectionContext* cntx) { } } -void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) { +void ServerFamily::Client(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); CmdArgList sub_args = args.subspan(1); if (sub_cmd == "SETNAME") { - return ClientSetName(sub_args, cntx); + return ClientSetName(sub_args, builder, cntx); } else if (sub_cmd == "GETNAME") { - return ClientGetName(sub_args, cntx); + return ClientGetName(sub_args, builder, cntx); } else if (sub_cmd == "LIST") { - return ClientList(sub_args, absl::MakeSpan(listeners_), cntx); + return ClientList(sub_args, absl::MakeSpan(listeners_), builder, cntx); } else if (sub_cmd == "PAUSE") { - return ClientPauseCmd(sub_args, GetNonPriviligedListeners(), cntx); + return ClientPauseCmd(sub_args, GetNonPriviligedListeners(), builder, cntx); } else if (sub_cmd == "TRACKING") { - return ClientTracking(sub_args, cntx); + return ClientTracking(sub_args, builder, cntx); } else if (sub_cmd == "KILL") { - return ClientKill(sub_args, absl::MakeSpan(listeners_), cntx); + return ClientKill(sub_args, absl::MakeSpan(listeners_), builder, cntx); } else if (sub_cmd == "CACHING") { - return ClientCaching(sub_args, cntx); + return ClientCaching(sub_args, builder, tx, cntx); } else if (sub_cmd == "SETINFO") { - return ClientSetInfo(sub_args, cntx); + return ClientSetInfo(sub_args, builder, cntx); } else if (sub_cmd == "ID") { - return ClientId(sub_args, cntx); + return ClientId(sub_args, builder, cntx); } LOG_FIRST_N(ERROR, 10) << "Subcommand " << sub_cmd << " not supported"; - return cntx->SendError(UnknownSubCmd(sub_cmd, "CLIENT"), kSyntaxErrType); + return builder->SendError(UnknownSubCmd(sub_cmd, "CLIENT"), kSyntaxErrType); } -void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) { +void ServerFamily::Config(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); if (sub_cmd == "HELP") { @@ -1877,13 +1890,13 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) { " Prints this help.", }; - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); return rb->SendSimpleStrArr(help_arr); } if (sub_cmd == "SET") { if (args.size() != 3) { - return cntx->SendError(WrongNumArgsError("config|set")); + return builder->SendError(WrongNumArgsError("config|set")); } string param = absl::AsciiStrToLower(ArgS(args, 1)); @@ -1893,19 +1906,19 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) { const char kErrPrefix[] = "CONFIG SET failed (possibly related to argument '"; switch (result) { case ConfigRegistry::SetResult::OK: - return cntx->SendOk(); + return builder->SendOk(); case ConfigRegistry::SetResult::UNKNOWN: - return cntx->SendError( + return builder->SendError( absl::StrCat("Unknown option or number of arguments for CONFIG SET - '", param, "'"), kConfigErrType); case ConfigRegistry::SetResult::READONLY: - return cntx->SendError(absl::StrCat(kErrPrefix, param, "') - can't set immutable config"), - kConfigErrType); + return builder->SendError( + absl::StrCat(kErrPrefix, param, "') - can't set immutable config"), kConfigErrType); case ConfigRegistry::SetResult::INVALID: - return cntx->SendError(absl::StrCat(kErrPrefix, param, "') - argument can not be set"), - kConfigErrType); + return builder->SendError(absl::StrCat(kErrPrefix, param, "') - argument can not be set"), + kConfigErrType); } ABSL_UNREACHABLE(); } @@ -1930,25 +1943,27 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) { } } } - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); return rb->SendStringArr(res, RedisReplyBuilder::MAP); } if (sub_cmd == "RESETSTAT") { ResetStat(cntx->ns); - return cntx->SendOk(); + return builder->SendOk(); } else { - return cntx->SendError(UnknownSubCmd(sub_cmd, "CONFIG"), kSyntaxErrType); + return builder->SendError(UnknownSubCmd(sub_cmd, "CONFIG"), kSyntaxErrType); } } -void ServerFamily::Debug(CmdArgList args, ConnectionContext* cntx) { +void ServerFamily::Debug(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { DebugCmd dbg_cmd{this, cntx}; return dbg_cmd.Run(args); } -void ServerFamily::Memory(CmdArgList args, ConnectionContext* cntx) { +void ServerFamily::Memory(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { MemoryCmd mem_cmd{this, cntx}; return mem_cmd.Run(args); @@ -1962,9 +1977,9 @@ void ServerFamily::BgSaveFb(boost::intrusive_ptr trans) { } std::optional ServerFamily::GetVersionAndBasename( - CmdArgList args, ConnectionContext* cntx) { + CmdArgList args, SinkReplyBuilder* builder) { if (args.size() > 2) { - cntx->SendError(kSyntaxErr); + builder->SendError(kSyntaxErr); return {}; } @@ -1977,7 +1992,7 @@ std::optional ServerFamily::GetVersionAndBasename } else if (sub_cmd == "RDB") { new_version = false; } else { - cntx->SendError(UnknownSubCmd(sub_cmd, "SAVE"), kSyntaxErrType); + builder->SendError(UnknownSubCmd(sub_cmd, "SAVE"), kSyntaxErrType); return {}; } } @@ -1992,40 +2007,42 @@ std::optional ServerFamily::GetVersionAndBasename // BGSAVE [DF|RDB] [basename] // TODO add missing [SCHEDULE] -void ServerFamily::BgSave(CmdArgList args, ConnectionContext* cntx) { - auto maybe_res = GetVersionAndBasename(args, cntx); +void ServerFamily::BgSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { + auto maybe_res = GetVersionAndBasename(args, builder); if (!maybe_res) { return; } const auto [version, basename] = *maybe_res; - if (auto ec = DoSaveCheckAndStart(version, basename, cntx->transaction); ec) { - cntx->SendError(ec.Format()); + if (auto ec = DoSaveCheckAndStart(version, basename, tx); ec) { + builder->SendError(ec.Format()); return; } bg_save_fb_.JoinIfNeeded(); bg_save_fb_ = fb2::Fiber("bg_save_fiber", &ServerFamily::BgSaveFb, this, - boost::intrusive_ptr(cntx->transaction)); - cntx->SendOk(); + boost::intrusive_ptr(tx)); + builder->SendOk(); } // SAVE [DF|RDB] [basename] // Allows saving the snapshot of the dataset on disk, potentially overriding the format // and the snapshot name. -void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) { - auto maybe_res = GetVersionAndBasename(args, cntx); +void ServerFamily::Save(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { + auto maybe_res = GetVersionAndBasename(args, builder); if (!maybe_res) { return; } const auto [version, basename] = *maybe_res; - GenericError ec = DoSave(version, basename, cntx->transaction); + GenericError ec = DoSave(version, basename, tx); if (ec) { - cntx->SendError(ec.Format()); + builder->SendError(ec.Format()); } else { - cntx->SendOk(); + builder->SendOk(); } } @@ -2149,9 +2166,10 @@ Metrics ServerFamily::GetMetrics(Namespace* ns) const { return result; } -void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { +void ServerFamily::Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { if (args.size() > 1) { - return cntx->SendError(kSyntaxErr); + return builder->SendError(kSyntaxErr); } string section; @@ -2547,11 +2565,12 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { if (should_enter("CLUSTER")) { append("cluster_enabled", cluster::IsClusterEnabledOrEmulated()); } - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); rb->SendVerbatimString(info); } -void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) { +void ServerFamily::Hello(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { // If no arguments are provided default to RESP2. bool is_resp3 = false; bool has_auth = false; @@ -2565,7 +2584,7 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) { is_resp3 = proto_version == "3"; bool valid_proto_version = proto_version == "2" || is_resp3; if (!valid_proto_version) { - cntx->SendError(UnknownCmd("HELLO", args)); + builder->SendError(UnknownCmd("HELLO", args)); return; } @@ -2582,18 +2601,18 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) { clientname = ArgS(args, i + 1); i += 1; } else { - cntx->SendError(kSyntaxErr); + builder->SendError(kSyntaxErr); return; } } } if (has_auth && !DoAuth(cntx, username, password)) { - return cntx->SendError(facade::kAuthRejected); + return builder->SendError(facade::kAuthRejected); } if (cntx->req_auth && !cntx->authenticated) { - cntx->SendError( + builder->SendError( "-NOAUTH HELLO must be called with the client already " "authenticated, otherwise the HELLO AUTH " "option can be used to authenticate the client and " @@ -2605,7 +2624,7 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) { cntx->conn()->SetName(string{clientname}); } - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); int proto_version = 2; if (is_resp3) { proto_version = 3; @@ -2633,32 +2652,33 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) { rb->SendBulkString((*ServerState::tlocal()).is_master ? "master" : "slave"); } -void ServerFamily::AddReplicaOf(CmdArgList args, ConnectionContext* cntx) { +void ServerFamily::AddReplicaOf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { util::fb2::LockGuard lk(replicaof_mu_); if (ServerState::tlocal()->is_master) { - cntx->SendError("Calling ADDREPLICAOFF allowed only after server is already a replica"); + builder->SendError("Calling ADDREPLICAOFF allowed only after server is already a replica"); return; } CHECK(replica_); - auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, cntx); + auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, builder); if (!replicaof_args.has_value()) { return; } if (replicaof_args->IsReplicaOfNoOne()) { - return cntx->SendError("ADDREPLICAOF does not support no one"); + return builder->SendError("ADDREPLICAOF does not support no one"); } LOG(INFO) << "Add Replica " << *replicaof_args; auto add_replica = make_unique(replicaof_args->host, replicaof_args->port, &service_, master_replid(), replicaof_args->slot_range); - error_code ec = add_replica->Start(cntx); + error_code ec = add_replica->Start(builder); if (!ec) { cluster_replicas_.push_back(std::move(add_replica)); } } -void ServerFamily::ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx, +void ServerFamily::ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ActionOnConnectionFail on_err) { std::shared_ptr new_replica; { @@ -2666,11 +2686,11 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx, // We should not execute replica of command while loading from snapshot. if (ServerState::tlocal()->is_master && service_.GetGlobalState() == GlobalState::LOADING) { - cntx->SendError(kLoadingErr); + builder->SendError(kLoadingErr); return; } - auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, cntx); + auto replicaof_args = ReplicaOfArgs::FromCmdArgs(args, builder); if (!replicaof_args.has_value()) { return; } @@ -2692,7 +2712,7 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx, CHECK_EQ(service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE), GlobalState::ACTIVE) << "Server is set to replica no one, yet state is not active!"; - return cntx->SendOk(); + return builder->SendOk(); } // If any replication is in progress, stop it, cancellation should kick in immediately @@ -2704,14 +2724,14 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx, if (auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING); new_state != GlobalState::LOADING) { LOG(WARNING) << new_state << " in progress, ignored"; - cntx->SendError("Invalid state"); + builder->SendError("Invalid state"); return; } - // If we are called by "Replicate", cntx->transaction will be null but we do not need + // If we are called by "Replicate", tx will be null but we do not need // to flush anything. - if (cntx->transaction) { - Drakarys(cntx->transaction, DbSlice::kDbAll); + if (tx) { + Drakarys(tx, DbSlice::kDbAll); } // Create a new replica and assing it @@ -2730,10 +2750,10 @@ void ServerFamily::ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx, error_code ec{}; switch (on_err) { case ActionOnConnectionFail::kReturnOnError: - ec = new_replica->Start(cntx); + ec = new_replica->Start(builder); break; case ActionOnConnectionFail::kContinueReplication: // set DF to replicate, and forget about it - new_replica->EnableReplication(cntx); + new_replica->EnableReplication(builder); break; }; @@ -2756,16 +2776,12 @@ void ServerFamily::StopAllClusterReplicas() { cluster_replicas_.clear(); } -void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) { - ReplicaOfInternal(args, cntx, ActionOnConnectionFail::kReturnOnError); +void ServerFamily::ReplicaOf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { + ReplicaOfInternal(args, tx, builder, ActionOnConnectionFail::kReturnOnError); } void ServerFamily::Replicate(string_view host, string_view port) { - io::NullSink sink; - ConnectionContext cntx{&sink, nullptr, {}}; - cntx.ns = &namespaces.GetDefaultNamespace(); - cntx.skip_acl_validation = true; - StringVec replicaof_params{string(host), string(port)}; CmdArgVec args_vec; @@ -2773,12 +2789,15 @@ void ServerFamily::Replicate(string_view host, string_view port) { args_vec.emplace_back(MutableSlice{s.data(), s.size()}); } CmdArgList args_list = absl::MakeSpan(args_vec); - ReplicaOfInternal(args_list, &cntx, ActionOnConnectionFail::kContinueReplication); + io::NullSink sink; + facade::RedisReplyBuilder rb(&sink); + ReplicaOfInternal(args_list, nullptr, &rb, ActionOnConnectionFail::kContinueReplication); } // REPLTAKEOVER [SAVE] // SAVE is used only by tests. -void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) { +void ServerFamily::ReplTakeOver(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { VLOG(1) << "ReplTakeOver start"; CmdArgParser parser{args}; @@ -2787,19 +2806,19 @@ void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) { bool save_flag = static_cast(parser.Check("SAVE")); if (parser.HasNext()) - return cntx->SendError(absl::StrCat("Unsupported option:", string_view(parser.Next()))); + return builder->SendError(absl::StrCat("Unsupported option:", string_view(parser.Next()))); if (auto err = parser.Error(); err) - return cntx->SendError(err->MakeReply()); + return builder->SendError(err->MakeReply()); // We allow zero timeouts for tests. if (timeout_sec < 0) { - return cntx->SendError("timeout is negative"); + return builder->SendError("timeout is negative"); } // We return OK, to support idempotency semantics. if (ServerState::tlocal()->is_master) - return cntx->SendOk(); + return builder->SendOk(); util::fb2::LockGuard lk(replicaof_mu_); @@ -2808,31 +2827,32 @@ void ServerFamily::ReplTakeOver(CmdArgList args, ConnectionContext* cntx) { auto info = replica_->GetSummary(); if (!info.full_sync_done) { - return cntx->SendError("Full sync not done"); + return builder->SendError("Full sync not done"); } std::error_code ec = replica_->TakeOver(ArgS(args, 0), save_flag); if (ec) - return cntx->SendError("Couldn't execute takeover"); + return builder->SendError("Couldn't execute takeover"); LOG(INFO) << "Takeover successful, promoting this instance to master."; SetMasterFlagOnAllThreads(true); replica_->Stop(); replica_.reset(); - return cntx->SendOk(); + return builder->SendOk(); } -void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) { +void ServerFamily::ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { { util::fb2::LockGuard lk(replicaof_mu_); if (!ServerState::tlocal()->is_master) { - return cntx->SendError("Replicating a replica is unsupported"); + return builder->SendError("Replicating a replica is unsupported"); } } auto err_cb = [&]() mutable { LOG(ERROR) << "Error in receiving command: " << args; - cntx->SendError(kSyntaxErr); + builder->SendError(kSyntaxErr); }; if (args.size() % 2 == 1) @@ -2857,7 +2877,7 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) { cntx->replica_conn = true; // The response for 'capa dragonfly' is: - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); rb->StartArray(4); rb->SendSimpleString(master_replid_); rb->SendSimpleString(sync_id); @@ -2921,8 +2941,9 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) { return cntx->SendOk(); } -void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) { - auto* rb = static_cast(cntx->reply_builder()); +void ServerFamily::Role(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { + auto* rb = static_cast(builder); util::fb2::LockGuard lk(replicaof_mu_); // Thread local var is_master is updated under mutex replicaof_mu_ together with replica_, // ensuring eventual consistency of is_master. When determining if the server is a replica and @@ -2965,11 +2986,13 @@ void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) { } } -void ServerFamily::Script(CmdArgList args, ConnectionContext* cntx) { - script_mgr_->Run(std::move(args), cntx->transaction, cntx->reply_builder()); +void ServerFamily::Script(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { + script_mgr_->Run(std::move(args), tx, builder); } -void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) { +void ServerFamily::LastSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { time_t save_time; { util::fb2::LockGuard lk(save_mu_); @@ -2978,8 +3001,9 @@ void ServerFamily::LastSave(CmdArgList args, ConnectionContext* cntx) { cntx->SendLong(save_time); } -void ServerFamily::Latency(CmdArgList args, ConnectionContext* cntx) { - auto* rb = static_cast(cntx->reply_builder()); +void ServerFamily::Latency(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { + auto* rb = static_cast(builder); string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); if (sub_cmd == "LATEST") { @@ -2990,7 +3014,8 @@ void ServerFamily::Latency(CmdArgList args, ConnectionContext* cntx) { cntx->SendError(kSyntaxErr); } -void ServerFamily::ShutdownCmd(CmdArgList args, ConnectionContext* cntx) { +void ServerFamily::ShutdownCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { if (args.size() > 1) { cntx->SendError(kSyntaxErr); return; @@ -3014,11 +3039,13 @@ void ServerFamily::ShutdownCmd(CmdArgList args, ConnectionContext* cntx) { cntx->SendOk(); } -void ServerFamily::Dfly(CmdArgList args, ConnectionContext* cntx) { - dfly_cmd_->Run(args, static_cast(cntx->reply_builder()), cntx); +void ServerFamily::Dfly(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { + dfly_cmd_->Run(args, static_cast(builder), cntx); } -void ServerFamily::SlowLog(CmdArgList args, ConnectionContext* cntx) { +void ServerFamily::SlowLog(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); if (sub_cmd == "HELP") { @@ -3036,7 +3063,7 @@ void ServerFamily::SlowLog(CmdArgList args, ConnectionContext* cntx) { "HELP", " Prints this help.", }; - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); rb->SendSimpleStrArr(help); return; } @@ -3057,17 +3084,18 @@ void ServerFamily::SlowLog(CmdArgList args, ConnectionContext* cntx) { } if (sub_cmd == "GET") { - return SlowLogGet(args, cntx, sub_cmd, &service_.proactor_pool()); + return SlowLogGet(args, sub_cmd, &service_.proactor_pool(), builder); } cntx->SendError(UnknownSubCmd(sub_cmd, "SLOWLOG"), kSyntaxErrType); } -void ServerFamily::Module(CmdArgList args, ConnectionContext* cntx) { +void ServerFamily::Module(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) { string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0)); if (sub_cmd != "LIST") return cntx->SendError(kSyntaxErr); - auto* rb = static_cast(cntx->reply_builder()); + auto* rb = static_cast(builder); rb->StartArray(2); // Json diff --git a/src/server/server_family.h b/src/server/server_family.h index 85a4c0408..d022697dd 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -161,6 +161,8 @@ struct ReplicaOffsetInfo { }; class ServerFamily { + using SinkReplyBuilder = facade::SinkReplyBuilder; + public: explicit ServerFamily(Service* service); ~ServerFamily(); @@ -169,7 +171,9 @@ class ServerFamily { void Register(CommandRegistry* registry); void Shutdown() ABSL_LOCKS_EXCLUDED(replicaof_mu_); - void ShutdownCmd(CmdArgList args, ConnectionContext* cntx); + // Public because is used by DflyCmd. + void ShutdownCmd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx); Service& service() { return service_; @@ -187,7 +191,7 @@ class ServerFamily { return script_mgr_.get(); } - void StatsMC(std::string_view section, facade::ConnectionContext* cntx); + void StatsMC(std::string_view section, SinkReplyBuilder* builder); // if new_version is true, saves DF specific, non redis compatible snapshot. // if basename is not empty it will override dbfilename flag. @@ -265,29 +269,40 @@ class ServerFamily { return shard_set->size(); } - void Auth(CmdArgList args, ConnectionContext* cntx); - void Client(CmdArgList args, ConnectionContext* cntx); - void Config(CmdArgList args, ConnectionContext* cntx); - void DbSize(CmdArgList args, ConnectionContext* cntx); - void Debug(CmdArgList args, ConnectionContext* cntx); - void Dfly(CmdArgList args, ConnectionContext* cntx); - void Memory(CmdArgList args, ConnectionContext* cntx); - void FlushDb(CmdArgList args, ConnectionContext* cntx); - void FlushAll(CmdArgList args, ConnectionContext* cntx); - void Info(CmdArgList args, ConnectionContext* cntx) ABSL_LOCKS_EXCLUDED(save_mu_, replicaof_mu_); - void Hello(CmdArgList args, ConnectionContext* cntx); - void LastSave(CmdArgList args, ConnectionContext* cntx) ABSL_LOCKS_EXCLUDED(save_mu_); - void Latency(CmdArgList args, ConnectionContext* cntx); - void ReplicaOf(CmdArgList args, ConnectionContext* cntx); - void AddReplicaOf(CmdArgList args, ConnectionContext* cntx); - void ReplTakeOver(CmdArgList args, ConnectionContext* cntx) ABSL_LOCKS_EXCLUDED(replicaof_mu_); - void ReplConf(CmdArgList args, ConnectionContext* cntx); - void Role(CmdArgList args, ConnectionContext* cntx) ABSL_LOCKS_EXCLUDED(replicaof_mu_); - void Save(CmdArgList args, ConnectionContext* cntx); - void BgSave(CmdArgList args, ConnectionContext* cntx); - void Script(CmdArgList args, ConnectionContext* cntx); - void SlowLog(CmdArgList args, ConnectionContext* cntx); - void Module(CmdArgList args, ConnectionContext* cntx); + void Auth(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); + void Client(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); + void Config(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); + void DbSize(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); + void Debug(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); + void Dfly(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); + void Memory(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); + void FlushDb(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx); + void FlushAll(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx); + void Info(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) + ABSL_LOCKS_EXCLUDED(save_mu_, replicaof_mu_); + void Hello(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); + void LastSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) ABSL_LOCKS_EXCLUDED(save_mu_); + void Latency(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx); + void ReplicaOf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx); + void AddReplicaOf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx); + void ReplTakeOver(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx) ABSL_LOCKS_EXCLUDED(replicaof_mu_); + void ReplConf(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx); + void Role(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) + ABSL_LOCKS_EXCLUDED(replicaof_mu_); + void Save(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); + void BgSave(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); + void Script(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); + void SlowLog(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ConnectionContext* cntx); + void Module(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx); void SyncGeneric(std::string_view repl_master_id, uint64_t offs, ConnectionContext* cntx); @@ -298,8 +313,8 @@ class ServerFamily { }; // REPLICAOF implementation. See arguments above - void ReplicaOfInternal(CmdArgList args, ConnectionContext* cntx, ActionOnConnectionFail on_error) - ABSL_LOCKS_EXCLUDED(replicaof_mu_); + void ReplicaOfInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, + ActionOnConnectionFail on_error) ABSL_LOCKS_EXCLUDED(replicaof_mu_); // Returns the number of loaded keys if successful. io::Result LoadRdb(const std::string& rdb_file, LoadExistingKeys existing_keys); @@ -311,7 +326,7 @@ class ServerFamily { // Helper function to retrieve version(true if format is dfs rdb), and basename from args. // In case of an error an empty optional is returned. using VersionBasename = std::pair; - std::optional GetVersionAndBasename(CmdArgList args, ConnectionContext* cntx); + std::optional GetVersionAndBasename(CmdArgList args, SinkReplyBuilder* builder); void BgSaveFb(boost::intrusive_ptr trans); @@ -322,7 +337,7 @@ class ServerFamily { bool ignore_state = false) ABSL_NO_THREAD_SAFETY_ANALYSIS; void StopAllClusterReplicas() ABSL_EXCLUSIVE_LOCKS_REQUIRED(replicaof_mu_); - bool DoAuth(ConnectionContext* cntx, std::string_view username, std::string_view password) const; + static bool DoAuth(ConnectionContext* cntx, std::string_view username, std::string_view password); util::fb2::Fiber snapshot_schedule_fb_; util::fb2::Fiber load_fiber_; @@ -373,7 +388,4 @@ std::optional Pause(std::vector listeners, facade::Connection* conn, ClientPause pause_state, std::function is_pause_in_progress); -void SlowLogGet(CmdArgList args, ConnectionContext* cntx, std::string_view sub_cmd, - util::ProactorPool* pp); - } // namespace dfly