chore: pass SinkReplyBuilder and Transaction explicitly. Part9 (#3996)

This commit is contained in:
Roman Gershman 2024-10-25 16:51:37 +03:00 committed by GitHub
parent ef09052482
commit 408a8a33f2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 314 additions and 255 deletions

View file

@ -138,7 +138,7 @@ ClusterShardInfo ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) co
return info;
}
void ClusterFamily::ClusterHelp(ConnectionContext* cntx) {
void ClusterFamily::ClusterHelp(SinkReplyBuilder* builder) {
string_view help_arr[] = {
"CLUSTER <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
"SLOTS",
@ -152,15 +152,15 @@ void ClusterFamily::ClusterHelp(ConnectionContext* cntx) {
"HELP",
" Prints this help.",
};
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
return rb->SendSimpleStrArr(help_arr);
}
namespace {
void ClusterShardsImpl(const ClusterShardInfos& config, ConnectionContext* cntx) {
void ClusterShardsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder) {
// For more details https://redis.io/commands/cluster-shards/
constexpr unsigned int kEntrySize = 4;
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
auto WriteNode = [&](const ClusterNodeInfo& node, string_view role) {
constexpr unsigned int kNodeSize = 14;
@ -202,20 +202,21 @@ void ClusterShardsImpl(const ClusterShardInfos& config, ConnectionContext* cntx)
}
} // namespace
void ClusterFamily::ClusterShards(ConnectionContext* cntx) {
void ClusterFamily::ClusterShards(SinkReplyBuilder* builder, ConnectionContext* cntx) {
if (IsClusterEmulated()) {
return ClusterShardsImpl({GetEmulatedShardInfo(cntx)}, cntx);
return ClusterShardsImpl({GetEmulatedShardInfo(cntx)}, builder);
} else if (tl_cluster_config != nullptr) {
return ClusterShardsImpl(tl_cluster_config->GetConfig(), cntx);
return ClusterShardsImpl(tl_cluster_config->GetConfig(), builder);
} else {
return cntx->SendError(kClusterNotConfigured);
return builder->SendError(kClusterNotConfigured);
}
}
namespace {
void ClusterSlotsImpl(const ClusterShardInfos& config, ConnectionContext* cntx) {
void ClusterSlotsImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder) {
// For more details https://redis.io/commands/cluster-slots/
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
auto WriteNode = [&](const ClusterNodeInfo& node) {
constexpr unsigned int kNodeSize = 3;
rb->StartArray(kNodeSize);
@ -246,18 +247,19 @@ void ClusterSlotsImpl(const ClusterShardInfos& config, ConnectionContext* cntx)
}
} // namespace
void ClusterFamily::ClusterSlots(ConnectionContext* cntx) {
void ClusterFamily::ClusterSlots(SinkReplyBuilder* builder, ConnectionContext* cntx) {
if (IsClusterEmulated()) {
return ClusterSlotsImpl({GetEmulatedShardInfo(cntx)}, cntx);
return ClusterSlotsImpl({GetEmulatedShardInfo(cntx)}, builder);
} else if (tl_cluster_config != nullptr) {
return ClusterSlotsImpl(tl_cluster_config->GetConfig(), cntx);
return ClusterSlotsImpl(tl_cluster_config->GetConfig(), builder);
} else {
return cntx->SendError(kClusterNotConfigured);
return builder->SendError(kClusterNotConfigured);
}
}
namespace {
void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id, ConnectionContext* cntx) {
void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id,
SinkReplyBuilder* builder) {
// For more details https://redis.io/commands/cluster-nodes/
string result;
@ -296,23 +298,23 @@ void ClusterNodesImpl(const ClusterShardInfos& config, string_view my_id, Connec
}
}
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
return rb->SendBulkString(result);
}
} // namespace
void ClusterFamily::ClusterNodes(ConnectionContext* cntx) {
void ClusterFamily::ClusterNodes(SinkReplyBuilder* builder, ConnectionContext* cntx) {
if (IsClusterEmulated()) {
return ClusterNodesImpl({GetEmulatedShardInfo(cntx)}, id_, cntx);
return ClusterNodesImpl({GetEmulatedShardInfo(cntx)}, id_, builder);
} else if (tl_cluster_config != nullptr) {
return ClusterNodesImpl(tl_cluster_config->GetConfig(), id_, cntx);
return ClusterNodesImpl(tl_cluster_config->GetConfig(), id_, builder);
} else {
return cntx->SendError(kClusterNotConfigured);
return builder->SendError(kClusterNotConfigured);
}
}
namespace {
void ClusterInfoImpl(const ClusterShardInfos& config, ConnectionContext* cntx) {
void ClusterInfoImpl(const ClusterShardInfos& config, SinkReplyBuilder* builder) {
std::string msg;
auto append = [&msg](absl::AlphaNum a1, absl::AlphaNum a2) {
// Separate lines with \r\n, not \n, see #2726
@ -360,81 +362,82 @@ void ClusterInfoImpl(const ClusterShardInfos& config, ConnectionContext* cntx) {
append("cluster_stats_messages_pong_received", 1);
append("cluster_stats_messages_meet_received", 0);
append("cluster_stats_messages_received", 1);
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendBulkString(msg);
}
} // namespace
void ClusterFamily::ClusterInfo(ConnectionContext* cntx) {
void ClusterFamily::ClusterInfo(SinkReplyBuilder* builder, ConnectionContext* cntx) {
if (IsClusterEmulated()) {
return ClusterInfoImpl({GetEmulatedShardInfo(cntx)}, cntx);
return ClusterInfoImpl({GetEmulatedShardInfo(cntx)}, builder);
} else if (tl_cluster_config != nullptr) {
return ClusterInfoImpl(tl_cluster_config->GetConfig(), cntx);
return ClusterInfoImpl(tl_cluster_config->GetConfig(), builder);
} else {
return ClusterInfoImpl({}, cntx);
return ClusterInfoImpl({}, builder);
}
}
void ClusterFamily::KeySlot(CmdArgList args, ConnectionContext* cntx) {
void ClusterFamily::KeySlot(CmdArgList args, SinkReplyBuilder* builder) {
if (args.size() != 2) {
return cntx->SendError(WrongNumArgsError("CLUSTER KEYSLOT"));
return builder->SendError(WrongNumArgsError("CLUSTER KEYSLOT"));
}
SlotId id = cluster::KeySlot(ArgS(args, 1));
return cntx->SendLong(id);
return builder->SendLong(id);
}
void ClusterFamily::Cluster(CmdArgList args, ConnectionContext* cntx) {
void ClusterFamily::Cluster(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx) {
// In emulated cluster mode, all slots are mapped to the same host, and number of cluster
// instances is thus 1.
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
if (!IsClusterEnabledOrEmulated()) {
return cntx->SendError(kClusterDisabled);
return builder->SendError(kClusterDisabled);
}
if (sub_cmd == "KEYSLOT") {
return KeySlot(args, cntx);
return KeySlot(args, builder);
}
if (args.size() > 1) {
return cntx->SendError(WrongNumArgsError(absl::StrCat("CLUSTER ", sub_cmd)));
return builder->SendError(WrongNumArgsError(absl::StrCat("CLUSTER ", sub_cmd)));
}
if (sub_cmd == "HELP") {
return ClusterHelp(cntx);
return ClusterHelp(builder);
} else if (sub_cmd == "MYID") {
return ClusterMyId(cntx);
return ClusterMyId(builder);
} else if (sub_cmd == "SHARDS") {
return ClusterShards(cntx);
return ClusterShards(builder, cntx);
} else if (sub_cmd == "SLOTS") {
return ClusterSlots(cntx);
return ClusterSlots(builder, cntx);
} else if (sub_cmd == "NODES") {
return ClusterNodes(cntx);
return ClusterNodes(builder, cntx);
} else if (sub_cmd == "INFO") {
return ClusterInfo(cntx);
return ClusterInfo(builder, cntx);
} else {
return cntx->SendError(facade::UnknownSubCmd(sub_cmd, "CLUSTER"), facade::kSyntaxErrType);
return builder->SendError(facade::UnknownSubCmd(sub_cmd, "CLUSTER"), facade::kSyntaxErrType);
}
}
void ClusterFamily::ReadOnly(CmdArgList args, ConnectionContext* cntx) {
void ClusterFamily::ReadOnly(CmdArgList args, SinkReplyBuilder* builder) {
if (!IsClusterEmulated()) {
return cntx->SendError(kClusterDisabled);
return builder->SendError(kClusterDisabled);
}
cntx->SendOk();
builder->SendOk();
}
void ClusterFamily::ReadWrite(CmdArgList args, ConnectionContext* cntx) {
void ClusterFamily::ReadWrite(CmdArgList args, SinkReplyBuilder* builder) {
if (!IsClusterEmulated()) {
return cntx->SendError(kClusterDisabled);
return builder->SendError(kClusterDisabled);
}
cntx->SendOk();
builder->SendOk();
}
void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
void ClusterFamily::DflyCluster(CmdArgList args, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (!(IsClusterEnabled() || (IsClusterEmulated() && cntx->journal_emulated))) {
return cntx->SendError("Cluster is disabled. Use --cluster_mode=yes to enable.");
return builder->SendError("Cluster is disabled. Use --cluster_mode=yes to enable.");
}
if (cntx->conn()) {
@ -446,20 +449,20 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
args.remove_prefix(1); // remove subcommand name
if (sub_cmd == "GETSLOTINFO") {
return DflyClusterGetSlotInfo(args, cntx);
return DflyClusterGetSlotInfo(args, builder);
} else if (sub_cmd == "CONFIG") {
return DflyClusterConfig(args, cntx);
return DflyClusterConfig(args, builder, cntx);
} else if (sub_cmd == "FLUSHSLOTS") {
return DflyClusterFlushSlots(args, cntx);
return DflyClusterFlushSlots(args, builder);
} else if (sub_cmd == "SLOT-MIGRATION-STATUS") {
return DflySlotMigrationStatus(args, cntx);
return DflySlotMigrationStatus(args, builder);
}
return cntx->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType);
return builder->SendError(UnknownSubCmd(sub_cmd, "DFLYCLUSTER"), kSyntaxErrType);
}
void ClusterFamily::ClusterMyId(ConnectionContext* cntx) {
cntx->SendSimpleString(id_);
void ClusterFamily::ClusterMyId(SinkReplyBuilder* builder) {
builder->SendSimpleString(id_);
}
namespace {
@ -520,18 +523,19 @@ void WriteFlushSlotsToJournal(const SlotRanges& slot_ranges) {
}
} // namespace
void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx) {
void ClusterFamily::DflyClusterConfig(CmdArgList args, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (args.size() != 1) {
return cntx->SendError(WrongNumArgsError("DFLYCLUSTER CONFIG"));
return builder->SendError(WrongNumArgsError("DFLYCLUSTER CONFIG"));
}
string_view json_str = ArgS(args, 0);
shared_ptr<ClusterConfig> new_config = ClusterConfig::CreateFromConfig(id_, json_str);
if (new_config == nullptr) {
LOG(WARNING) << "Can't set cluster config";
return cntx->SendError("Invalid cluster configuration.");
return builder->SendError("Invalid cluster configuration.");
} else if (tl_cluster_config && tl_cluster_config->GetConfig() == new_config->GetConfig()) {
return cntx->SendOk();
return builder->SendOk();
}
PreparedToRemoveOutgoingMigrations outgoing_migrations; // should be removed without mutex lock
@ -601,24 +605,24 @@ void ClusterFamily::DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
}
}
return cntx->SendOk();
return builder->SendOk();
}
void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx) {
void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, SinkReplyBuilder* builder) {
CmdArgParser parser(args);
parser.ExpectTag("SLOTS");
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
vector<std::pair<SlotId, SlotStats>> slots_stats;
do {
auto sid = parser.Next<uint32_t>();
if (sid > kMaxSlotNum)
return cntx->SendError("Invalid slot id");
return builder->SendError("Invalid slot id");
slots_stats.emplace_back(sid, SlotStats{});
} while (parser.HasNext());
if (auto err = parser.Error(); err)
return cntx->SendError(err->MakeReply());
return builder->SendError(err->MakeReply());
fb2::Mutex mu;
@ -651,7 +655,7 @@ void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* c
}
}
void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx) {
void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, SinkReplyBuilder* builder) {
std::vector<SlotRange> slot_ranges;
CmdArgParser parser(args);
@ -661,11 +665,11 @@ void ClusterFamily::DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cn
} while (parser.HasNext());
if (auto err = parser.Error(); err)
return cntx->SendError(err->MakeReply());
return builder->SendError(err->MakeReply());
DeleteSlots(SlotRanges(std::move(slot_ranges)));
return cntx->SendOk();
return builder->SendOk();
}
void ClusterFamily::StartSlotMigrations(std::vector<MigrationInfo> migrations) {
@ -691,8 +695,8 @@ static string_view StateToStr(MigrationState state) {
return "UNDEFINED_STATE"sv;
}
void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, SinkReplyBuilder* builder) {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
CmdArgParser parser(args);
util::fb2::LockGuard lk(migration_mu_);
@ -701,7 +705,7 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext*
if (parser.HasNext()) {
node_id = parser.Next<std::string_view>();
if (auto err = parser.Error(); err) {
return cntx->SendError(err->MakeReply());
return builder->SendError(err->MakeReply());
}
}
@ -745,18 +749,19 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext*
}
}
void ClusterFamily::DflyMigrate(CmdArgList args, ConnectionContext* cntx) {
void ClusterFamily::DflyMigrate(CmdArgList args, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
args.remove_prefix(1);
if (sub_cmd == "INIT") {
InitMigration(args, cntx);
InitMigration(args, builder);
} else if (sub_cmd == "FLOW") {
DflyMigrateFlow(args, cntx);
DflyMigrateFlow(args, builder, cntx);
} else if (sub_cmd == "ACK") {
DflyMigrateAck(args, cntx);
DflyMigrateAck(args, builder);
} else {
cntx->SendError(facade::UnknownSubCmd(sub_cmd, "DFLYMIGRATE"), facade::kSyntaxErrType);
builder->SendError(facade::UnknownSubCmd(sub_cmd, "DFLYMIGRATE"), facade::kSyntaxErrType);
}
}
@ -850,7 +855,7 @@ void ClusterFamily::RemoveIncomingMigrations(const std::vector<MigrationInfo>& m
}
}
void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) {
void ClusterFamily::InitMigration(CmdArgList args, SinkReplyBuilder* builder) {
VLOG(1) << "Create incoming migration, args: " << args;
CmdArgParser parser{args};
@ -863,7 +868,7 @@ void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) {
} while (parser.HasNext());
if (auto err = parser.Error(); err)
return cntx->SendError(err->MakeReply());
return builder->SendError(err->MakeReply());
const auto& incoming_migrations = cluster_config()->GetIncomingMigrations();
bool found = any_of(incoming_migrations.begin(), incoming_migrations.end(),
@ -873,7 +878,7 @@ void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) {
});
if (!found) {
VLOG(1) << "Unrecognized incoming migration from " << source_id;
return cntx->SendSimpleString(OutgoingMigration::kUnknownMigration);
return builder->SendSimpleString(OutgoingMigration::kUnknownMigration);
}
VLOG(1) << "Init migration " << source_id;
@ -885,7 +890,7 @@ void ClusterFamily::InitMigration(CmdArgList args, ConnectionContext* cntx) {
incoming_migrations_jobs_.emplace_back(make_shared<IncomingSlotMigration>(
string(source_id), &server_family_->service(), SlotRanges(std::move(slots)), flows_num));
return cntx->SendOk();
return builder->SendOk();
}
std::shared_ptr<OutgoingMigration> ClusterFamily::CreateOutgoingMigration(MigrationInfo info) {
@ -895,12 +900,13 @@ std::shared_ptr<OutgoingMigration> ClusterFamily::CreateOutgoingMigration(Migrat
return migration;
}
void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
void ClusterFamily::DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
CmdArgParser parser{args};
auto [source_id, shard_id] = parser.Next<std::string_view, uint32_t>();
if (auto err = parser.Error(); err) {
return cntx->SendError(err->MakeReply());
return builder->SendError(err->MakeReply());
}
auto host_ip = cntx->conn()->RemoteEndpointAddress();
@ -911,7 +917,7 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
auto migration = GetIncomingMigration(source_id);
if (!migration) {
return cntx->SendError(kIdNotFound);
return builder->SendError(kIdNotFound);
}
DCHECK(cntx->sync_dispatch);
@ -919,7 +925,7 @@ void ClusterFamily::DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx) {
// TODO provide a more clear approach
cntx->sync_dispatch = false;
cntx->SendOk();
builder->SendOk();
migration->StartFlow(shard_id, cntx->conn()->socket());
}
@ -965,12 +971,12 @@ void ClusterFamily::ApplyMigrationSlotRangeToConfig(std::string_view node_id,
<< " : " << node_id;
}
void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
void ClusterFamily::DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder) {
CmdArgParser parser{args};
auto [source_id, attempt] = parser.Next<std::string_view, long>();
if (auto err = parser.Error(); err) {
return cntx->SendError(err->MakeReply());
return builder->SendError(err->MakeReply());
}
VLOG(1) << "DFLYMIGRATE ACK" << args;
@ -980,26 +986,37 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
[source_id = source_id](const auto& m) { return m.node_info.id == source_id; });
if (m_it == in_migrations.end()) {
LOG(WARNING) << "migration isn't in config";
return cntx->SendLong(OutgoingMigration::kInvalidAttempt);
return builder->SendLong(OutgoingMigration::kInvalidAttempt);
}
auto migration = GetIncomingMigration(source_id);
if (!migration)
return cntx->SendError(kIdNotFound);
return builder->SendError(kIdNotFound);
if (!migration->Join(attempt)) {
return cntx->SendError("Join timeout happened");
return builder->SendError("Join timeout happened");
}
ApplyMigrationSlotRangeToConfig(migration->GetSourceID(), migration->GetSlots(), true);
return cntx->SendLong(attempt);
return builder->SendLong(attempt);
}
using EngineFunc = void (ClusterFamily::*)(CmdArgList args, ConnectionContext* cntx);
using EngineFunc = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* builder,
ConnectionContext* cntx);
inline CommandId::Handler HandlerFunc(ClusterFamily* se, EngineFunc f) {
return [=](CmdArgList args, ConnectionContext* cntx) { return (se->*f)(args, cntx); };
using EngineFunc2 = void (ClusterFamily::*)(CmdArgList args, SinkReplyBuilder* builder);
inline CommandId::Handler3 HandlerFunc(ClusterFamily* se, EngineFunc f) {
return [=](CmdArgList args, Transaction*, SinkReplyBuilder* builder, ConnectionContext* cntx) {
return (se->*f)(args, builder, cntx);
};
}
inline CommandId::Handler2 HandlerFunc(ClusterFamily* se, EngineFunc2 f) {
return [=](CmdArgList args, Transaction*, SinkReplyBuilder* builder) {
return (se->*f)(args, builder);
};
}
#define HFUNC(x) SetHandler(HandlerFunc(this, &ClusterFamily::x))

View file

@ -14,6 +14,10 @@
#include "server/cluster/outgoing_slot_migration.h"
#include "server/common.h"
namespace facade {
class SinkReplyBuilder;
} // namespace facade
namespace dfly {
class ServerFamily;
class CommandRegistry;
@ -41,45 +45,47 @@ class ClusterFamily {
}
private:
using SinkReplyBuilder = facade::SinkReplyBuilder;
// Cluster commands compatible with Redis
void Cluster(CmdArgList args, ConnectionContext* cntx);
void ClusterHelp(ConnectionContext* cntx);
void ClusterShards(ConnectionContext* cntx);
void ClusterSlots(ConnectionContext* cntx);
void ClusterNodes(ConnectionContext* cntx);
void ClusterInfo(ConnectionContext* cntx);
void ClusterMyId(ConnectionContext* cntx);
void Cluster(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
void ClusterHelp(SinkReplyBuilder* builder);
void ClusterShards(SinkReplyBuilder* builder, ConnectionContext* cntx);
void ClusterSlots(SinkReplyBuilder* builder, ConnectionContext* cntx);
void ClusterNodes(SinkReplyBuilder* builder, ConnectionContext* cntx);
void ClusterInfo(SinkReplyBuilder* builder, ConnectionContext* cntx);
void ClusterMyId(SinkReplyBuilder* builder);
void KeySlot(CmdArgList args, ConnectionContext* cntx);
void KeySlot(CmdArgList args, SinkReplyBuilder* builder);
void ReadOnly(CmdArgList args, ConnectionContext* cntx);
void ReadWrite(CmdArgList args, ConnectionContext* cntx);
void ReadOnly(CmdArgList args, SinkReplyBuilder* builder);
void ReadWrite(CmdArgList args, SinkReplyBuilder* builder);
// Custom Dragonfly commands for cluster management
void DflyCluster(CmdArgList args, ConnectionContext* cntx);
void DflyClusterConfig(CmdArgList args, ConnectionContext* cntx)
ABSL_LOCKS_EXCLUDED(set_config_mu, migration_mu_);
void DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* cntx)
void DflyCluster(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
void DflyClusterConfig(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
ABSL_LOCKS_EXCLUDED(set_config_mu, migration_mu_);
void DflyClusterGetSlotInfo(CmdArgList args, SinkReplyBuilder* builder)
ABSL_LOCKS_EXCLUDED(migration_mu_);
void DflyClusterFlushSlots(CmdArgList args, ConnectionContext* cntx);
void DflyClusterFlushSlots(CmdArgList args, SinkReplyBuilder* builder);
private: // Slots migration section
void DflySlotMigrationStatus(CmdArgList args, ConnectionContext* cntx)
void DflySlotMigrationStatus(CmdArgList args, SinkReplyBuilder* builder)
ABSL_LOCKS_EXCLUDED(migration_mu_);
// DFLYMIGRATE is internal command defines several steps in slots migrations process
void DflyMigrate(CmdArgList args, ConnectionContext* cntx);
void DflyMigrate(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
// DFLYMIGRATE INIT is internal command to create incoming migration object
void InitMigration(CmdArgList args, ConnectionContext* cntx) ABSL_LOCKS_EXCLUDED(migration_mu_);
void InitMigration(CmdArgList args, SinkReplyBuilder* builder) ABSL_LOCKS_EXCLUDED(migration_mu_);
// DFLYMIGRATE FLOW initiate second step in slots migration procedure
// this request should be done for every shard on the target node
// this method assocciate connection and shard that will be the data
// source for migration
void DflyMigrateFlow(CmdArgList args, ConnectionContext* cntx);
void DflyMigrateFlow(CmdArgList args, SinkReplyBuilder* builder, ConnectionContext* cntx);
void DflyMigrateAck(CmdArgList args, ConnectionContext* cntx);
void DflyMigrateAck(CmdArgList args, SinkReplyBuilder* builder);
std::shared_ptr<IncomingSlotMigration> GetIncomingMigration(std::string_view source_id)
ABSL_LOCKS_EXCLUDED(migration_mu_);

View file

@ -768,7 +768,7 @@ string CreateExecDescriptor(const std::vector<StoredCmd>& stored_cmds, unsigned
// Ensures availability of an interpreter for EVAL-like commands and it's automatic release.
// If it's part of MULTI, the preborrowed interpreter is returned, otherwise a new is acquired.
struct BorrowedInterpreter {
explicit BorrowedInterpreter(ConnectionContext* cntx) {
BorrowedInterpreter(Transaction* tx, ConnectionContext* cntx) {
// Ensure squashing ignores EVAL. We can't run on a stub context, because it doesn't have our
// preborrowed interpreter (which can't be shared on multiple threads).
CHECK(!cntx->conn_state.squashing_info);
@ -781,7 +781,7 @@ struct BorrowedInterpreter {
} else {
// A scheduled transaction occupies a place in the transaction queue and holds locks,
// preventing other transactions from progressing. Blocking below can deadlock!
CHECK(!cntx->transaction->IsScheduled());
CHECK(!tx->IsScheduled());
interpreter_ = ServerState::tlocal()->BorrowInterpreter();
owned_ = true;
@ -1239,8 +1239,8 @@ std::optional<ErrorReply> Service::VerifyCommandState(const CommandId* cid, CmdA
}
void Service::DispatchCommand(ArgSlice args, facade::ConnectionContext* cntx) {
absl::Cleanup clear_last_error(
[cntx]() { std::ignore = cntx->reply_builder()->ConsumeLastError(); });
auto* builder = cntx->reply_builder();
absl::Cleanup clear_last_error([builder]() { std::ignore = builder->ConsumeLastError(); });
DCHECK(!args.empty());
DCHECK_NE(0u, shard_set->size()) << "Init was not called";
@ -1250,7 +1250,7 @@ void Service::DispatchCommand(ArgSlice args, facade::ConnectionContext* cntx) {
const auto [cid, args_no_cmd] = registry_.FindExtended(cmd, args.subspan(1));
if (cid == nullptr) {
return cntx->SendError(ReportUnknownCmd(cmd));
return builder->SendError(ReportUnknownCmd(cmd));
}
ConnectionContext* dfly_cntx = static_cast<ConnectionContext*>(cntx);
@ -1285,7 +1285,7 @@ void Service::DispatchCommand(ArgSlice args, facade::ConnectionContext* cntx) {
server_family_.GetDflyCmd()->OnClose(dfly_cntx->conn_state.replication_info.repl_session_id);
return;
}
dfly_cntx->SendError(std::move(*err));
builder->SendError(std::move(*err));
return;
}
@ -1301,7 +1301,7 @@ void Service::DispatchCommand(ArgSlice args, facade::ConnectionContext* cntx) {
if (stored_cmd.Cid()->IsWriteOnly()) {
dfly_cntx->conn_state.exec_info.is_write = true;
}
return cntx->SendSimpleString("QUEUED");
return builder->SendSimpleString("QUEUED");
}
// Create command transaction
@ -1315,7 +1315,7 @@ void Service::DispatchCommand(ArgSlice args, facade::ConnectionContext* cntx) {
dfly_cntx->ns, dfly_cntx->conn_state.db_index, args_no_cmd);
if (status != OpStatus::OK)
return cntx->SendError(status);
return builder->SendError(status);
}
} else {
DCHECK(dfly_cntx->transaction == nullptr);
@ -1328,7 +1328,7 @@ void Service::DispatchCommand(ArgSlice args, facade::ConnectionContext* cntx) {
if (auto st =
dist_trans->InitByArgs(dfly_cntx->ns, dfly_cntx->conn_state.db_index, args_no_cmd);
st != OpStatus::OK)
return cntx->SendError(st);
return builder->SendError(st);
}
dfly_cntx->transaction = dist_trans.get();
@ -1341,8 +1341,8 @@ void Service::DispatchCommand(ArgSlice args, facade::ConnectionContext* cntx) {
dfly_cntx->cid = cid;
if (!InvokeCmd(cid, args_no_cmd, dfly_cntx)) {
dfly_cntx->SendError("Internal Error");
dfly_cntx->reply_builder()->CloseConnection();
builder->SendError("Internal Error");
builder->CloseConnection();
}
if (!dispatching_in_multi) {
@ -1352,16 +1352,16 @@ void Service::DispatchCommand(ArgSlice args, facade::ConnectionContext* cntx) {
class ReplyGuard {
public:
ReplyGuard(ConnectionContext* cntx, std::string_view cid_name) {
ReplyGuard(std::string_view cid_name, SinkReplyBuilder* builder, ConnectionContext* cntx) {
const bool is_script = bool(cntx->conn_state.script_info);
const bool is_one_of =
absl::flat_hash_set<std::string_view>({"REPLCONF", "DFLY"}).contains(cid_name);
auto* maybe_mcache = dynamic_cast<MCReplyBuilder*>(cntx->reply_builder());
bool is_mcache = builder->type() == SinkReplyBuilder::MC;
const bool is_no_reply_memcache =
maybe_mcache && (maybe_mcache->NoReply() || cid_name == "QUIT");
is_mcache && (static_cast<MCReplyBuilder*>(builder)->NoReply() || cid_name == "QUIT");
const bool should_dcheck = !is_one_of && !is_script && !is_no_reply_memcache;
if (should_dcheck) {
builder_ = cntx->reply_builder();
builder_ = builder;
builder_->ExpectReply();
}
}
@ -1400,6 +1400,7 @@ OpResult<void> OpTrackKeys(const OpArgs slice_args, const facade::Connection::We
bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionContext* cntx) {
DCHECK(cid);
DCHECK(!cid->Validate(tail_args));
auto* builder = cntx->reply_builder();
if (auto err = VerifyCommandExecution(cid, cntx, tail_args); err) {
// We need to skip this because ACK's should not be replied to
@ -1408,8 +1409,8 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
if (cid->name() == "REPLCONF" && absl::EqualsIgnoreCase(ArgS(tail_args, 0), "ACK")) {
return true;
}
cntx->SendError(std::move(*err));
std::ignore = cntx->reply_builder()->ConsumeLastError();
builder->SendError(std::move(*err));
std::ignore = builder->ConsumeLastError();
return true; // return false only for internal error aborts
}
@ -1421,18 +1422,17 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
}
ServerState::tlocal()->RecordCmd();
Transaction* tx = cntx->transaction;
auto& info = cntx->conn_state.tracking_info_;
auto* trans = cntx->transaction;
const bool is_read_only = cid->opt_mask() & CO::READONLY;
if (trans) {
if (tx) {
// Reset it, because in multi/exec the transaction pointer is the same and
// we will end up triggerring the callback on the following commands. To avoid this
// we reset it.
trans->SetTrackingCallback({});
tx->SetTrackingCallback({});
if (is_read_only && info.ShouldTrackKeys()) {
auto conn = cntx->conn()->Borrow();
trans->SetTrackingCallback([conn](Transaction* trans) {
tx->SetTrackingCallback([conn](Transaction* trans) {
auto* shard = EngineShard::tlocal();
OpTrackKeys(trans->GetOpArgs(shard), conn, trans->GetShardArgs(shard->shard_id()));
});
@ -1441,10 +1441,10 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
#ifndef NDEBUG
// Verifies that we reply to the client when needed.
ReplyGuard reply_guard(cntx, cid->name());
ReplyGuard reply_guard(cid->name(), builder, cntx);
#endif
uint64_t invoke_time_usec = 0;
auto last_error = cntx->reply_builder()->ConsumeLastError();
auto last_error = builder->ConsumeLastError();
DCHECK(last_error.empty());
try {
invoke_time_usec = cid->Invoke(tail_args, cntx);
@ -1453,7 +1453,7 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
return false;
}
std::string reason = cntx->reply_builder()->ConsumeLastError();
std::string reason = builder->ConsumeLastError();
if (!reason.empty()) {
VLOG(2) << FailedCommandToString(cid->name(), tail_args, reason);
@ -1461,7 +1461,7 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
}
auto cid_name = cid->name();
if ((!trans && cid_name != "MULTI") || (trans && !trans->IsMulti())) {
if ((!tx && cid_name != "MULTI") || (tx && !tx->IsMulti())) {
// Each time we execute a command we need to increase the sequence number in
// order to properly track clients when OPTIN is used.
// We don't do this for `multi/exec` because it would break the
@ -1490,9 +1490,8 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo
absl::GetCurrentTimeNanos() / 1000);
}
if (cntx->transaction && !cntx->conn_state.exec_info.IsRunning() &&
cntx->conn_state.script_info == nullptr) {
cntx->last_command_debug.clock = cntx->transaction->txid();
if (tx && !cntx->conn_state.exec_info.IsRunning() && cntx->conn_state.script_info == nullptr) {
cntx->last_command_debug.clock = tx->txid();
}
return true;
@ -1749,32 +1748,34 @@ absl::flat_hash_map<std::string, unsigned> Service::UknownCmdMap() const {
return unknown_cmds_;
}
void Service::Quit(CmdArgList args, ConnectionContext* cntx) {
if (cntx->protocol() == facade::Protocol::REDIS)
cntx->SendOk();
void Service::Quit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (builder->type() == SinkReplyBuilder::REDIS)
builder->SendOk();
using facade::SinkReplyBuilder;
SinkReplyBuilder* builder = cntx->reply_builder();
builder->CloseConnection();
DeactivateMonitoring(static_cast<ConnectionContext*>(cntx));
cntx->conn()->ShutdownSelf();
}
void Service::Multi(CmdArgList args, ConnectionContext* cntx) {
void Service::Multi(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (cntx->conn_state.exec_info.IsCollecting()) {
return cntx->SendError("MULTI calls can not be nested");
return builder->SendError("MULTI calls can not be nested");
}
cntx->conn_state.exec_info.state = ConnectionState::ExecInfo::EXEC_COLLECT;
// TODO: to protect against huge exec transactions.
return cntx->SendOk();
return builder->SendOk();
}
void Service::Watch(CmdArgList args, ConnectionContext* cntx) {
void Service::Watch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
auto& exec_info = cntx->conn_state.exec_info;
// Skip if EXEC will already fail due previous WATCH.
if (exec_info.watched_dirty.load(memory_order_relaxed)) {
return cntx->SendOk();
return builder->SendOk();
}
atomic_uint32_t keys_existed = 0;
@ -1789,7 +1790,7 @@ void Service::Watch(CmdArgList args, ConnectionContext* cntx) {
keys_existed.fetch_add(res.value_or(0), memory_order_relaxed);
return OpStatus::OK;
};
cntx->transaction->ScheduleSingleHop(std::move(cb));
tx->ScheduleSingleHop(std::move(cb));
// Duplicate keys are stored to keep correct count.
exec_info.watched_existed += keys_existed.load(memory_order_relaxed);
@ -1797,12 +1798,13 @@ void Service::Watch(CmdArgList args, ConnectionContext* cntx) {
exec_info.watched_keys.emplace_back(cntx->db_index(), key);
}
return cntx->SendOk();
return builder->SendOk();
}
void Service::Unwatch(CmdArgList args, ConnectionContext* cntx) {
void Service::Unwatch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
UnwatchAllKeys(cntx->ns, &cntx->conn_state.exec_info);
return cntx->SendOk();
return builder->SendOk();
}
template <typename F> void WithReplies(CapturingReplyBuilder* crb, ConnectionContext* cntx, F&& f) {
@ -1815,7 +1817,7 @@ template <typename F> void WithReplies(CapturingReplyBuilder* crb, ConnectionCon
optional<CapturingReplyBuilder::Payload> Service::FlushEvalAsyncCmds(ConnectionContext* cntx,
bool force) {
auto& info = cntx->conn_state.script_info;
auto* tx = cntx->transaction;
size_t used_mem = info->async_cmds_heap_mem + info->async_cmds.size() * sizeof(StoredCmd);
if ((info->async_cmds.empty() || !force) && used_mem < info->async_cmds_heap_limit)
return nullopt;
@ -1824,7 +1826,7 @@ optional<CapturingReplyBuilder::Payload> Service::FlushEvalAsyncCmds(ConnectionC
auto* eval_cid = registry_.Find("EVAL");
DCHECK(eval_cid);
cntx->transaction->MultiSwitchCmd(eval_cid);
tx->MultiSwitchCmd(eval_cid);
CapturingReplyBuilder crb{ReplyMode::ONLY_ERR};
WithReplies(&crb, cntx, [&] {
@ -1839,7 +1841,8 @@ optional<CapturingReplyBuilder::Payload> Service::FlushEvalAsyncCmds(ConnectionC
}
void Service::CallFromScript(ConnectionContext* cntx, Interpreter::CallArgs& ca) {
DCHECK(cntx->transaction);
auto* tx = cntx->transaction;
DCHECK(tx);
DVLOG(2) << "CallFromScript " << ca.args[0];
InterpreterReplier replier(ca.translator);
@ -1879,28 +1882,30 @@ void Service::CallFromScript(ConnectionContext* cntx, Interpreter::CallArgs& ca)
DispatchCommand(ca.args, cntx);
}
void Service::Eval(CmdArgList args, ConnectionContext* cntx) {
void Service::Eval(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
string_view body = ArgS(args, 0);
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (body.empty()) {
return rb->SendNull();
}
BorrowedInterpreter interpreter{cntx};
BorrowedInterpreter interpreter{tx, cntx};
auto res = server_family_.script_mgr()->Insert(body, interpreter);
if (!res)
return cntx->SendError(res.error().Format(), facade::kScriptErrType);
return builder->SendError(res.error().Format(), facade::kScriptErrType);
string sha{std::move(res.value())};
CallSHA(args, sha, interpreter, cntx);
}
void Service::EvalSha(CmdArgList args, ConnectionContext* cntx) {
void Service::EvalSha(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
string sha = absl::AsciiStrToLower(ArgS(args, 0));
BorrowedInterpreter interpreter{cntx};
BorrowedInterpreter interpreter{cntx->transaction, cntx};
CallSHA(args, sha, interpreter, cntx);
}
@ -1958,17 +1963,18 @@ Transaction::MultiMode DetermineMultiMode(ScriptMgr::ScriptParams params) {
// Return nullopt if eval runs inside multi and conflicts with multi mode
optional<bool> StartMultiEval(DbIndex dbid, CmdArgList keys, ScriptMgr::ScriptParams params,
ConnectionContext* cntx) {
Transaction* trans = cntx->transaction;
Transaction* tx = cntx->transaction;
Namespace* ns = cntx->ns;
SinkReplyBuilder* builder = cntx->reply_builder();
Transaction::MultiMode script_mode = DetermineMultiMode(params);
Transaction::MultiMode multi_mode = trans->GetMultiMode();
Transaction::MultiMode multi_mode = tx->GetMultiMode();
// Check if eval is already part of a running multi transaction
if (multi_mode != Transaction::NOT_DETERMINED) {
if (multi_mode > script_mode) {
string err = StrCat(
"Multi mode conflict when running eval in multi transaction. Multi mode is: ", multi_mode,
" eval mode is: ", script_mode);
cntx->SendError(err);
builder->SendError(err);
return nullopt;
}
return false;
@ -1979,13 +1985,13 @@ optional<bool> StartMultiEval(DbIndex dbid, CmdArgList keys, ScriptMgr::ScriptPa
switch (script_mode) {
case Transaction::GLOBAL:
trans->StartMultiGlobal(ns, dbid);
tx->StartMultiGlobal(ns, dbid);
return true;
case Transaction::LOCK_AHEAD:
trans->StartMultiLockedAhead(ns, dbid, keys);
tx->StartMultiLockedAhead(ns, dbid, keys);
return true;
case Transaction::NON_ATOMIC:
trans->StartMultiNonAtomic();
tx->StartMultiNonAtomic();
return true;
default:
CHECK(false) << "Invalid mode";
@ -2017,15 +2023,16 @@ static bool CanRunSingleShardMulti(optional<ShardId> sid, const ScriptMgr::Scrip
void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpreter* interpreter,
ConnectionContext* cntx) {
DCHECK(!eval_args.sha.empty());
auto* builder = cntx->reply_builder();
// Sanitizing the input to avoid code injection.
if (eval_args.sha.size() != 40 || !IsSHA(eval_args.sha)) {
return cntx->SendError(facade::kScriptNotFound);
return builder->SendError(facade::kScriptNotFound);
}
auto params = LoadScript(eval_args.sha, server_family_.script_mgr(), interpreter);
if (!params)
return cntx->SendError(facade::kScriptNotFound);
return builder->SendError(facade::kScriptNotFound);
string error;
@ -2115,31 +2122,32 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
// Conclude the transaction.
if (*scheduled)
cntx->transaction->UnlockMulti();
tx->UnlockMulti();
}
if (result == Interpreter::RUN_ERR) {
string resp = StrCat("Error running script (call to ", eval_args.sha, "): ", error);
server_family_.script_mgr()->OnScriptError(eval_args.sha, error);
return cntx->SendError(resp, facade::kScriptErrType);
return builder->SendError(resp, facade::kScriptErrType);
}
CHECK(result == Interpreter::RUN_OK);
SinkReplyBuilder::ReplyAggregator agg(cntx->reply_builder());
EvalSerializer ser{static_cast<RedisReplyBuilder*>(cntx->reply_builder())};
SinkReplyBuilder::ReplyAggregator agg(builder);
EvalSerializer ser{static_cast<RedisReplyBuilder*>(builder)};
if (!interpreter->IsResultSafe()) {
cntx->SendError("reached lua stack limit");
builder->SendError("reached lua stack limit");
} else {
interpreter->SerializeResult(&ser);
}
}
void Service::Discard(CmdArgList args, ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
void Service::Discard(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
if (!cntx->conn_state.exec_info.IsCollecting()) {
return cntx->SendError("DISCARD without MULTI");
return builder->SendError("DISCARD without MULTI");
}
MultiCleanup(cntx);
@ -2150,6 +2158,7 @@ void Service::Discard(CmdArgList args, ConnectionContext* cntx) {
bool CheckWatchedKeyExpiry(ConnectionContext* cntx, const CommandId* exists_cid,
const CommandId* exec_cid) {
auto& exec_info = cntx->conn_state.exec_info;
auto* tx = cntx->transaction;
CmdArgVec str_list(exec_info.watched_keys.size());
for (size_t i = 0; i < str_list.size(); i++) {
@ -2166,13 +2175,13 @@ bool CheckWatchedKeyExpiry(ConnectionContext* cntx, const CommandId* exists_cid,
return OpStatus::OK;
};
cntx->transaction->MultiSwitchCmd(exists_cid);
cntx->transaction->InitByArgs(cntx->ns, cntx->conn_state.db_index, CmdArgList{str_list});
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
tx->MultiSwitchCmd(exists_cid);
tx->InitByArgs(cntx->ns, cntx->conn_state.db_index, CmdArgList{str_list});
OpStatus status = tx->ScheduleSingleHop(std::move(cb));
CHECK_EQ(OpStatus::OK, status);
// Reset cid to EXEC as it was before
cntx->transaction->MultiSwitchCmd(exec_cid);
tx->MultiSwitchCmd(exec_cid);
// The comparison can still be true even if a key expired due to another one being created.
// So we have to check the watched_dirty flag, which is set if a key expired.
@ -2223,42 +2232,43 @@ CmdArgVec CollectAllKeys(ConnectionState::ExecInfo* exec_info) {
// Return true if transaction was scheduled, false if scheduling was not required.
void StartMultiExec(ConnectionContext* cntx, ConnectionState::ExecInfo* exec_info,
Transaction::MultiMode multi_mode) {
auto trans = cntx->transaction;
auto* tx = cntx->transaction;
auto dbid = cntx->db_index();
switch (multi_mode) {
case Transaction::GLOBAL:
trans->StartMultiGlobal(cntx->ns, dbid);
tx->StartMultiGlobal(cntx->ns, dbid);
break;
case Transaction::LOCK_AHEAD: {
auto vec = CollectAllKeys(exec_info);
trans->StartMultiLockedAhead(cntx->ns, dbid, absl::MakeSpan(vec));
tx->StartMultiLockedAhead(cntx->ns, dbid, absl::MakeSpan(vec));
} break;
case Transaction::NON_ATOMIC:
trans->StartMultiNonAtomic();
tx->StartMultiNonAtomic();
break;
case Transaction::NOT_DETERMINED:
LOG(FATAL) << "should not reach";
};
}
void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
void Service::Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
auto& exec_info = cntx->conn_state.exec_info;
// Clean the context no matter the outcome
absl::Cleanup exec_clear = [&cntx] { MultiCleanup(cntx); };
if (exec_info.state == ConnectionState::ExecInfo::EXEC_ERROR) {
return cntx->SendError("-EXECABORT Transaction discarded because of previous errors");
return builder->SendError("-EXECABORT Transaction discarded because of previous errors");
}
// Check basic invariants
if (!exec_info.IsCollecting()) {
return cntx->SendError("EXEC without MULTI");
return builder->SendError("EXEC without MULTI");
}
if (IsWatchingOtherDbs(cntx->db_index(), exec_info)) {
return cntx->SendError("Dragonfly does not allow WATCH and EXEC on different databases");
return builder->SendError("Dragonfly does not allow WATCH and EXEC on different databases");
}
if (exec_info.watched_dirty.load(memory_order_relaxed)) {
@ -2272,7 +2282,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
// We borrow a single interpreter for all the EVALs inside. Returned by MultiCleanup
if (state != ExecEvalState::NONE) {
exec_info.preborrowed_interpreter = BorrowedInterpreter(cntx).Release();
exec_info.preborrowed_interpreter = BorrowedInterpreter(tx, cntx).Release();
}
// Determine according multi mode, not only only flag, but based on presence of global commands
@ -2288,7 +2298,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
// EXEC should not run if any of the watched keys expired.
if (!exec_info.watched_keys.empty() &&
!CheckWatchedKeyExpiry(cntx, registry_.Find("EXISTS"), exec_cid_)) {
cntx->transaction->UnlockMulti();
tx->UnlockMulti();
return rb->SendNull();
}
@ -2302,7 +2312,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
if (!exec_info.body.empty()) {
if (GetFlag(FLAGS_track_exec_frequencies)) {
string descr = CreateExecDescriptor(exec_info.body, cntx->transaction->GetUniqueShardCnt());
string descr = CreateExecDescriptor(exec_info.body, tx->GetUniqueShardCnt());
ServerState::tlocal()->exec_freq_count[descr]++;
}
@ -2314,7 +2324,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
for (auto& scmd : exec_info.body) {
VLOG(2) << "TX CMD " << scmd.Cid()->name() << " " << scmd.NumArgs();
cntx->transaction->MultiSwitchCmd(scmd.Cid());
tx->MultiSwitchCmd(scmd.Cid());
cntx->cid = scmd.Cid();
arg_vec.resize(scmd.NumArgs());
@ -2323,9 +2333,9 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
CmdArgList args = absl::MakeSpan(arg_vec);
if (scmd.Cid()->IsTransactional()) {
OpStatus st = cntx->transaction->InitByArgs(cntx->ns, cntx->conn_state.db_index, args);
OpStatus st = tx->InitByArgs(cntx->ns, cntx->conn_state.db_index, args);
if (st != OpStatus::OK) {
cntx->SendError(st);
builder->SendError(st);
break;
}
}
@ -2339,34 +2349,37 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
if (scheduled) {
VLOG(2) << "Exec unlocking " << exec_info.body.size() << " commands";
cntx->transaction->UnlockMulti();
tx->UnlockMulti();
}
cntx->cid = exec_cid_;
VLOG(2) << "Exec completed";
}
void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
void Service::Publish(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (cluster::IsClusterEnabled()) {
return cntx->SendError("PUBLISH is not supported in cluster mode yet");
return builder->SendError("PUBLISH is not supported in cluster mode yet");
}
string_view channel = ArgS(args, 0);
string_view messages[] = {ArgS(args, 1)};
auto* cs = ServerState::tlocal()->channel_store();
cntx->SendLong(cs->SendMessages(channel, messages));
builder->SendLong(cs->SendMessages(channel, messages));
}
void Service::Subscribe(CmdArgList args, ConnectionContext* cntx) {
void Service::Subscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (cluster::IsClusterEnabled()) {
return cntx->SendError("SUBSCRIBE is not supported in cluster mode yet");
return builder->SendError("SUBSCRIBE is not supported in cluster mode yet");
}
cntx->ChangeSubscription(true /*add*/, true /* reply*/, std::move(args));
}
void Service::Unsubscribe(CmdArgList args, ConnectionContext* cntx) {
void Service::Unsubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (cluster::IsClusterEnabled()) {
return cntx->SendError("UNSUBSCRIBE is not supported in cluster mode yet");
return builder->SendError("UNSUBSCRIBE is not supported in cluster mode yet");
}
if (args.size() == 0) {
cntx->UnsubscribeAll(true);
@ -2375,16 +2388,18 @@ void Service::Unsubscribe(CmdArgList args, ConnectionContext* cntx) {
}
}
void Service::PSubscribe(CmdArgList args, ConnectionContext* cntx) {
void Service::PSubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (cluster::IsClusterEnabled()) {
return cntx->SendError("PSUBSCRIBE is not supported in cluster mode yet");
return builder->SendError("PSUBSCRIBE is not supported in cluster mode yet");
}
cntx->ChangePSubscription(true, true, args);
}
void Service::PUnsubscribe(CmdArgList args, ConnectionContext* cntx) {
void Service::PUnsubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (cluster::IsClusterEnabled()) {
return cntx->SendError("PUNSUBSCRIBE is not supported in cluster mode yet");
return builder->SendError("PUNSUBSCRIBE is not supported in cluster mode yet");
}
if (args.size() == 0) {
cntx->PUnsubscribeAll(true);
@ -2395,30 +2410,31 @@ void Service::PUnsubscribe(CmdArgList args, ConnectionContext* cntx) {
// Not a real implementation. Serves as a decorator to accept some function commands
// for testing.
void Service::Function(CmdArgList args, ConnectionContext* cntx) {
void Service::Function(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
string sub_cmd = absl::AsciiStrToUpper(ArgS(args, 0));
if (sub_cmd == "FLUSH") {
return cntx->SendOk();
return builder->SendOk();
}
string err = UnknownSubCmd(sub_cmd, "FUNCTION");
return cntx->SendError(err, kSyntaxErrType);
return builder->SendError(err, kSyntaxErrType);
}
void Service::PubsubChannels(string_view pattern, ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
void Service::PubsubChannels(string_view pattern, SinkReplyBuilder* builder) {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendStringArr(ServerState::tlocal()->channel_store()->ListChannels(pattern));
}
void Service::PubsubPatterns(ConnectionContext* cntx) {
void Service::PubsubPatterns(SinkReplyBuilder* builder) {
size_t pattern_count = ServerState::tlocal()->channel_store()->PatternCount();
cntx->SendLong(pattern_count);
builder->SendLong(pattern_count);
}
void Service::PubsubNumSub(CmdArgList args, ConnectionContext* cntx) {
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
void Service::PubsubNumSub(CmdArgList args, SinkReplyBuilder* builder) {
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->StartArray(args.size() * 2);
for (string_view channel : args) {
@ -2427,20 +2443,22 @@ void Service::PubsubNumSub(CmdArgList args, ConnectionContext* cntx) {
}
}
void Service::Monitor(CmdArgList args, ConnectionContext* cntx) {
void Service::Monitor(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
VLOG(1) << "starting monitor on this connection: " << cntx->conn()->GetClientId();
// we are registering the current connection for all threads so they will be aware of
// this connection, to send to it any command
cntx->SendOk();
builder->SendOk();
cntx->ChangeMonitor(true /* start */);
}
void Service::Pubsub(CmdArgList args, ConnectionContext* cntx) {
void Service::Pubsub(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
if (cluster::IsClusterEnabled()) {
return cntx->SendError("PUBSUB is not supported in cluster mode yet");
return builder->SendError("PUBSUB is not supported in cluster mode yet");
}
if (args.size() < 1) {
cntx->SendError(WrongNumArgsError(cntx->cid->name()));
builder->SendError(WrongNumArgsError(cntx->cid->name()));
return;
}
@ -2459,7 +2477,7 @@ void Service::Pubsub(CmdArgList args, ConnectionContext* cntx) {
"HELP",
"\tPrints this help."};
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
rb->SendSimpleStrArr(help_arr);
return;
}
@ -2470,18 +2488,19 @@ void Service::Pubsub(CmdArgList args, ConnectionContext* cntx) {
pattern = ArgS(args, 1);
}
PubsubChannels(pattern, cntx);
PubsubChannels(pattern, builder);
} else if (subcmd == "NUMPAT") {
PubsubPatterns(cntx);
PubsubPatterns(builder);
} else if (subcmd == "NUMSUB") {
args.remove_prefix(1);
PubsubNumSub(args, cntx);
PubsubNumSub(args, builder);
} else {
cntx->SendError(UnknownSubCmd(subcmd, "PUBSUB"));
builder->SendError(UnknownSubCmd(subcmd, "PUBSUB"));
}
}
void Service::Command(CmdArgList args, ConnectionContext* cntx) {
void Service::Command(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx) {
unsigned cmd_cnt = 0;
registry_.Traverse([&](string_view name, const CommandId& cd) {
if ((cd.opt_mask() & CO::HIDDEN) == 0) {
@ -2489,7 +2508,7 @@ void Service::Command(CmdArgList args, ConnectionContext* cntx) {
}
});
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
auto* rb = static_cast<RedisReplyBuilder*>(builder);
auto serialize_command = [&rb](string_view name, const CommandId& cid) {
rb->StartArray(6);
rb->SendSimpleString(cid.name());
@ -2524,7 +2543,7 @@ void Service::Command(CmdArgList args, ConnectionContext* cntx) {
// COUNT
if (subcmd == "COUNT") {
return cntx->SendLong(cmd_cnt);
return builder->SendLong(cmd_cnt);
}
// INFO [cmd]
@ -2541,7 +2560,7 @@ void Service::Command(CmdArgList args, ConnectionContext* cntx) {
return;
}
return cntx->SendError(kSyntaxErr, kSyntaxErrType);
return builder->SendError(kSyntaxErr, kSyntaxErrType);
}
VarzValue::Map Service::GetVarzStats() {
@ -2675,8 +2694,9 @@ Service::ContextInfo Service::GetContextInfo(facade::ConnectionContext* cntx) co
using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx);
#define HFUNC(x) SetHandler(&Service::x)
#define MFUNC(x) \
SetHandler([this](CmdArgList sp, ConnectionContext* cntx) { this->x(std::move(sp), cntx); })
#define MFUNC(x) \
SetHandler([this](CmdArgList sp, Transaction* tx, SinkReplyBuilder* builder, \
ConnectionContext* cntx) { this->x(std::move(sp), tx, builder, cntx); })
namespace acl {
constexpr uint32_t kQuit = FAST | CONNECTION;

View file

@ -119,29 +119,45 @@ class Service : public facade::ServiceInterface {
const acl::AclFamily* TestInit();
private:
static void Quit(CmdArgList args, ConnectionContext* cntx);
static void Multi(CmdArgList args, ConnectionContext* cntx);
using SinkReplyBuilder = facade::SinkReplyBuilder;
static void Watch(CmdArgList args, ConnectionContext* cntx);
static void Unwatch(CmdArgList args, ConnectionContext* cntx);
static void Quit(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
static void Multi(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void Discard(CmdArgList args, ConnectionContext* cntx);
void Eval(CmdArgList args, ConnectionContext* cntx);
void EvalSha(CmdArgList args, ConnectionContext* cntx);
void Exec(CmdArgList args, ConnectionContext* cntx);
void Publish(CmdArgList args, ConnectionContext* cntx);
void Subscribe(CmdArgList args, ConnectionContext* cntx);
void Unsubscribe(CmdArgList args, ConnectionContext* cntx);
void PSubscribe(CmdArgList args, ConnectionContext* cntx);
void PUnsubscribe(CmdArgList args, ConnectionContext* cntx);
void Function(CmdArgList args, ConnectionContext* cntx);
void Monitor(CmdArgList args, ConnectionContext* cntx);
void Pubsub(CmdArgList args, ConnectionContext* cntx);
void Command(CmdArgList args, ConnectionContext* cntx);
static void Watch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
static void Unwatch(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void PubsubChannels(std::string_view pattern, ConnectionContext* cntx);
void PubsubPatterns(ConnectionContext* cntx);
void PubsubNumSub(CmdArgList channels, ConnectionContext* cntx);
void Discard(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void Eval(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
void EvalSha(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void Exec(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
void Publish(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void Subscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void Unsubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void PSubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void PUnsubscribe(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void Function(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void Monitor(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void Pubsub(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx);
void Command(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
ConnectionContext* cntx);
void PubsubChannels(std::string_view pattern, SinkReplyBuilder* builder);
void PubsubPatterns(SinkReplyBuilder* builder);
void PubsubNumSub(CmdArgList channels, SinkReplyBuilder* builder);
struct EvalArgs {
std::string_view sha; // only one of them is defined.