fix(server): Initialize ServerFamily with all listeners. (#1485)

* fix(server): Initialize ServerFamily with all listeners.

- Add a test for CLIENT LIST which is the visible result of this.

* use std move
This commit is contained in:
Roy Jacobson 2023-07-02 10:01:54 +02:00 committed by GitHub
parent 1ee0e30255
commit 52192e0596
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
7 changed files with 35 additions and 14 deletions

View file

@ -314,12 +314,14 @@ bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
auto tcp_disabled = GetFlag(FLAGS_port) == 0u;
Listener* main_listener = nullptr;
if (!tcp_disabled)
std::vector<facade::Listener*> listeners;
if (!tcp_disabled) {
main_listener = new Listener{Protocol::REDIS, &service};
listeners.push_back(main_listener);
}
Service::InitOpts opts;
opts.disable_time_update = false;
service.Init(acceptor, main_listener, opts);
const auto& bind = GetFlag(FLAGS_bind);
const char* bind_addr = bind.empty() ? nullptr : bind.c_str();
auto port = GetFlag(FLAGS_port);
@ -357,6 +359,7 @@ bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
delete uds_listener;
} else {
LOG(INFO) << "Listening on unix socket " << unix_sock;
listeners.push_back(uds_listener);
unlink_uds = true;
}
} else if (tcp_disabled) {
@ -381,9 +384,12 @@ bool RunEngine(ProactorPool* pool, AcceptServer* acceptor) {
delete admin_listener;
} else {
LOG(INFO) << "Listening on " << printable_addr;
listeners.push_back(admin_listener);
}
}
service.Init(acceptor, listeners, opts);
if (!tcp_disabled) {
error_code ec = acceptor->AddListener(bind_addr, port, main_listener);

View file

@ -539,7 +539,7 @@ Service::~Service() {
shard_set = nullptr;
}
void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_interface,
void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners,
const InitOpts& opts) {
InitRedisTables();
@ -557,7 +557,7 @@ void Service::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_i
request_latency_usec.Init(&pp_);
StringFamily::Init(&pp_);
GenericFamily::Init(&pp_);
server_family_.Init(acceptor, main_interface, &cluster_family_);
server_family_.Init(acceptor, std::move(listeners), &cluster_family_);
ChannelStore* cs = new ChannelStore{};
pp_.Await(

View file

@ -35,7 +35,7 @@ class Service : public facade::ServiceInterface {
explicit Service(util::ProactorPool* pp);
~Service();
void Init(util::AcceptServer* acceptor, util::ListenerInterface* main_interface,
void Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners,
const InitOpts& opts = InitOpts{});
void Shutdown();

View file

@ -493,11 +493,11 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) {
ServerFamily::~ServerFamily() {
}
void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* main_listener,
void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners,
ClusterFamily* cluster_family) {
CHECK(acceptor_ == nullptr);
acceptor_ = acceptor;
main_listener_ = main_listener;
listeners_ = std::move(listeners);
dfly_cmd_ = make_unique<DflyCmd>(this);
cluster_family_ = cluster_family;
@ -1346,7 +1346,10 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) {
client_info.push_back(move(info));
};
main_listener_->TraverseConnections(cb);
for (auto* listener : listeners_) {
listener->TraverseConnections(cb);
}
string result = absl::StrJoin(move(client_info), "\n");
result.append("\n");
return (*cntx)->SendBulkString(result);

View file

@ -8,6 +8,7 @@
#include <string>
#include "facade/conn_context.h"
#include "facade/dragonfly_listener.h"
#include "facade/redis_parser.h"
#include "server/channel_store.h"
#include "server/engine_shard_set.h"
@ -88,7 +89,7 @@ class ServerFamily {
explicit ServerFamily(Service* service);
~ServerFamily();
void Init(util::AcceptServer* acceptor, util::ListenerInterface* main_listener,
void Init(util::AcceptServer* acceptor, std::vector<facade::Listener*> listeners,
ClusterFamily* cluster_family);
void Register(CommandRegistry* registry);
void Shutdown();
@ -195,7 +196,7 @@ class ServerFamily {
Service& service_;
util::AcceptServer* acceptor_ = nullptr;
util::ListenerInterface* main_listener_ = nullptr;
std::vector<facade::Listener*> listeners_;
util::ProactorBase* pb_task_ = nullptr;
mutable Mutex replicaof_mu_, save_mu_;

View file

@ -138,7 +138,7 @@ void BaseFamilyTest::SetUp() {
Service::InitOpts opts;
opts.disable_time_update = true;
service_->Init(nullptr, nullptr, opts);
service_->Init(nullptr, {}, opts);
TEST_current_time_ms = absl::GetCurrentTimeNanos() / 1000000;
auto cb = [&](EngineShard* s) { s->db_slice().UpdateExpireBase(TEST_current_time_ms - 1000, 0); };

View file

@ -62,10 +62,21 @@ async def test_connection_name(async_client: aioredis.Redis):
assert name == "test_conn_name"
'''
make sure that the scan command is working with python
'''
async def test_client_list(df_factory):
instance = df_factory.create(port=1111, admin_port=1112)
instance.start()
async with (aioredis.Redis(port=instance.port) as client, aioredis.Redis(port=instance.admin_port) as admin_client):
await client.ping()
await admin_client.ping()
assert len(await client.execute_command("CLIENT LIST")) == 2
assert len(await admin_client.execute_command("CLIENT LIST")) == 2
instance.stop()
async def test_scan(async_client: aioredis.Redis):
'''
make sure that the scan command is working with python
'''
def gen_test_data():
for i in range(10):
yield f"key-{i}", f"value-{i}"