chore: rename owner to conn (#1973)

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2023-10-01 10:56:18 +03:00 committed by GitHub
parent e6b8cd1d76
commit 43d55fa6d7
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 28 additions and 28 deletions

View file

@@ -24,11 +24,11 @@ class ConnectionContext {
virtual ~ConnectionContext() {
}
Connection* owner() {
Connection* conn() {
return owner_;
}
const Connection* owner() const {
const Connection* conn() const {
return owner_;
}

View file

@@ -38,7 +38,7 @@ void AclLog::Add(const ConnectionContext& cntx, std::string object, Reason reaso
username = std::move(tried_to_auth);
}
std::string client_info = cntx.owner()->GetClientInfo();
std::string client_info = cntx.conn()->GetClientInfo();
using clock = std::chrono::system_clock;
LogEntry entry = {std::move(username), std::move(client_info), std::move(object), reason,
clock::now()};

View file

@@ -70,7 +70,7 @@ ClusterShard ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const
DCHECK(etl.is_master);
std::string cluster_announce_ip = absl::GetFlag(FLAGS_cluster_announce_ip);
std::string preferred_endpoint =
cluster_announce_ip.empty() ? cntx->owner()->LocalBindAddress() : cluster_announce_ip;
cluster_announce_ip.empty() ? cntx->conn()->LocalBindAddress() : cluster_announce_ip;
info.master = {.id = server_family_->master_id(),
.ip = preferred_endpoint,
@@ -85,7 +85,7 @@ ClusterShard ClusterFamily::GetEmulatedShardInfo(ConnectionContext* cntx) const
info.master = {
.id = etl.remote_client_id_, .ip = replication_info->host, .port = replication_info->port};
info.replicas.push_back({.id = server_family_->master_id(),
.ip = cntx->owner()->LocalBindAddress(),
.ip = cntx->conn()->LocalBindAddress(),
.port = static_cast<uint16_t>(absl::GetFlag(FLAGS_port))});
}
@@ -378,7 +378,7 @@ void ClusterFamily::DflyCluster(CmdArgList args, ConnectionContext* cntx) {
return (*cntx)->SendError(kClusterDisabled);
}
if (cntx->owner() && !cntx->owner()->IsAdmin()) {
if (cntx->conn() && !cntx->conn()->IsAdmin()) {
return (*cntx)->SendError(kDflyClusterCmdPort);
}

View file

@@ -97,10 +97,10 @@ void ConnectionContext::ChangeMonitor(bool start) {
// then notify all other threads that there is a change in the number of monitors
auto& my_monitors = ServerState::tlocal()->Monitors();
if (start) {
my_monitors.Add(owner());
my_monitors.Add(conn());
} else {
VLOG(1) << "connection " << owner()->GetClientId() << " no longer needs to be monitored";
my_monitors.Remove(owner());
VLOG(1) << "connection " << conn()->GetClientId() << " no longer needs to be monitored";
my_monitors.Remove(conn());
}
// Tell other threads that about the change in the number of connection that we monitor
shard_set->pool()->Await(

View file

@@ -228,7 +228,7 @@ void DflyCmd::Thread(CmdArgList args, ConnectionContext* cntx) {
if (num_thread < pool->size()) {
if (int(num_thread) != ProactorBase::GetIndex()) {
cntx->owner()->Migrate(pool->at(num_thread));
cntx->conn()->Migrate(pool->at(num_thread));
}
return rb->SendOk();
@@ -272,7 +272,7 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) {
return rb->SendError(kInvalidState);
// Set meta info on connection.
cntx->owner()->SetName(absl::StrCat("repl_flow_", sync_id));
cntx->conn()->SetName(absl::StrCat("repl_flow_", sync_id));
cntx->conn_state.replication_info.repl_session_id = sync_id;
cntx->conn_state.replication_info.repl_flow_id = flow_id;
@@ -281,11 +281,11 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) {
auto& flow = replica_ptr->flows[flow_id];
cntx->replication_flow = &flow;
flow.conn = cntx->owner();
flow.conn = cntx->conn();
flow.eof_token = eof_token;
flow.version = replica_ptr->version;
cntx->owner()->Migrate(shard_set->pool()->at(flow_id));
cntx->conn()->Migrate(shard_set->pool()->at(flow_id));
sf_->journal()->StartInThread();
std::string_view sync_type = "FULL";
@@ -416,7 +416,7 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) {
// We need to await for all dispatches to finish: Otherwise a transaction might be scheduled
// after this function exits but before the actual shutdown.
sf_->CancelBlockingCommands();
if (!sf_->AwaitDispatches(timeout_dur, [self = cntx->owner()](util::Connection* conn) {
if (!sf_->AwaitDispatches(timeout_dur, [self = cntx->conn()](util::Connection* conn) {
// The only command that is currently dispatching should be the takeover command -
// so we wait until this is true.
return conn != self;
@@ -602,7 +602,7 @@ auto DflyCmd::CreateSyncSession(ConnectionContext* cntx)
fb2::Fiber("stop_replication", &DflyCmd::StopReplication, this, sync_id).Detach();
};
string address = cntx->owner()->RemoteEndpointAddress();
string address = cntx->conn()->RemoteEndpointAddress();
uint32_t port = cntx->conn_state.replication_info.repl_listening_port;
LOG(INFO) << "Registered replica " << address << ":" << port;

View file

@@ -230,7 +230,7 @@ std::string MakeMonitorMessage(const ConnectionContext* cntx, const CommandId* c
string endpoint;
if (cntx->conn_state.script_info) {
endpoint = "lua";
} else if (const auto* conn = cntx->owner(); conn != nullptr) {
} else if (const auto* conn = cntx->conn(); conn != nullptr) {
endpoint = conn->RemoteEndpointStr();
} else {
endpoint = "REPLICATION:0";
@@ -823,7 +823,7 @@ std::optional<ErrorReply> Service::VerifyCommandState(const CommandId* cid, CmdA
// If there is no connection owner, it means the command it being called
// from another command or used internally, therefore is always permitted.
if (dfly_cntx.owner() != nullptr && !dfly_cntx.owner()->IsAdmin() && cid->IsRestricted()) {
if (dfly_cntx.conn() != nullptr && !dfly_cntx.conn()->IsAdmin() && cid->IsRestricted()) {
return ErrorReply{"Cannot execute restricted command (admin only)"};
}
@@ -919,9 +919,9 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
bool under_multi = dfly_cntx->conn_state.exec_info.IsRunning();
if (VLOG_IS_ON(2) &&
cntx->owner()) { // owner may not exists in case of this being called from replica context
cntx->conn()) { // owner may not exists in case of this being called from replica context
const char* lua = under_script ? "LUA " : "";
LOG(INFO) << "Got (" << cntx->owner()->GetClientId() << "): " << lua << args
LOG(INFO) << "Got (" << cntx->conn()->GetClientId() << "): " << lua << args
<< " in dbid=" << dfly_cntx->conn_state.db_index;
}
@@ -1269,7 +1269,7 @@ void Service::Quit(CmdArgList args, ConnectionContext* cntx) {
builder->CloseConnection();
DeactivateMonitoring(static_cast<ConnectionContext*>(cntx));
cntx->owner()->ShutdownSelf();
cntx->conn()->ShutdownSelf();
}
void Service::Multi(CmdArgList args, ConnectionContext* cntx) {
@@ -1874,7 +1874,7 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
// Most importantly, this approach allows not blocking and not awaiting in the dispatch below,
// thus not adding any overhead to backpressure checks.
for (auto& sub : subscribers)
sub.conn_cntx->owner()->EnsureAsyncMemoryBudget();
sub.conn_cntx->conn()->EnsureAsyncMemoryBudget();
auto subscribers_ptr = make_shared<decltype(subscribers)>(std::move(subscribers));
auto buf = shared_ptr<char[]>{new char[channel.size() + msg.size()]};
@@ -1886,7 +1886,7 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) {
ChannelStore::Subscriber::ByThread);
while (it != subscribers_ptr->end() && it->thread_id == idx) {
facade::Connection* conn = it->conn_cntx->owner();
facade::Connection* conn = it->conn_cntx->conn();
DCHECK(conn);
conn->SendPubMessageAsync(
{std::move(it->pattern), std::move(buf), channel.size(), msg.size()});
@@ -1949,7 +1949,7 @@ void Service::PubsubPatterns(ConnectionContext* cntx) {
}
void Service::Monitor(CmdArgList args, ConnectionContext* cntx) {
VLOG(1) << "starting monitor on this connection: " << cntx->owner()->GetClientId();
VLOG(1) << "starting monitor on this connection: " << cntx->conn()->GetClientId();
// we are registering the current connection for all threads so they will be aware of
// this connection, to send to it any command
(*cntx)->SendOk();

View file

@@ -1044,12 +1044,12 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
string_view sub_cmd = ArgS(args, 0);
if (sub_cmd == "SETNAME" && args.size() == 2) {
cntx->owner()->SetName(string{ArgS(args, 1)});
cntx->conn()->SetName(string{ArgS(args, 1)});
return (*cntx)->SendOk();
}
if (sub_cmd == "GETNAME") {
auto name = cntx->owner()->GetName();
auto name = cntx->conn()->GetName();
if (!name.empty()) {
return (*cntx)->SendBulkString(name);
} else {
@@ -1603,7 +1603,7 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
}
if (has_setname) {
cntx->owner()->SetName(string{clientname});
cntx->conn()->SetName(string{clientname});
}
int proto_version = 2;
@@ -1625,7 +1625,7 @@ void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendBulkString("proto");
(*cntx)->SendLong(proto_version);
(*cntx)->SendBulkString("id");
(*cntx)->SendLong(cntx->owner()->GetClientId());
(*cntx)->SendLong(cntx->conn()->GetClientId());
(*cntx)->SendBulkString("mode");
(*cntx)->SendBulkString("standalone");
(*cntx)->SendBulkString("role");
@@ -1792,7 +1792,7 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
if (cmd == "CAPA") {
if (arg == "dragonfly" && args.size() == 2 && i == 0) {
auto [sid, replica_info] = dfly_cmd_->CreateSyncSession(cntx);
cntx->owner()->SetName(absl::StrCat("repl_ctrl_", sid));
cntx->conn()->SetName(absl::StrCat("repl_ctrl_", sid));
string sync_id = absl::StrCat("SYNC", sid);
cntx->conn_state.replication_info.repl_session_id = sid;