mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
Serve http requests from redis port
This commit is contained in:
parent
e3d73eb912
commit
05e03f67f4
5 changed files with 56 additions and 49 deletions
2
helio
2
helio
|
@ -1 +1 @@
|
|||
Subproject commit f7c9d00016ea48866632583918b042b744f261b9
|
||||
Subproject commit bf4af91fad4ccd2fa1a2f64d01547a0701e4978a
|
|
@ -3,28 +3,25 @@
|
|||
//
|
||||
|
||||
#include "base/init.h"
|
||||
#include "server/main_service.h"
|
||||
#include "server/dragonfly_listener.h"
|
||||
#include "server/main_service.h"
|
||||
#include "util/accept_server.h"
|
||||
#include "util/uring/uring_pool.h"
|
||||
#include "util/varz.h"
|
||||
|
||||
DEFINE_int32(http_port, 8080, "Http port.");
|
||||
DECLARE_uint32(port);
|
||||
DECLARE_uint32(memcache_port);
|
||||
|
||||
using namespace util;
|
||||
using namespace std;
|
||||
|
||||
namespace dfly {
|
||||
|
||||
void RunEngine(ProactorPool* pool, AcceptServer* acceptor, HttpListener<>* http) {
|
||||
Service service(pool);
|
||||
|
||||
service.RegisterHttp(http);
|
||||
service.Init(acceptor);
|
||||
|
||||
if (http) {
|
||||
service.RegisterHttp(http);
|
||||
}
|
||||
|
||||
acceptor->AddListener(FLAGS_port, new Listener{Protocol::REDIS, &service});
|
||||
if (FLAGS_memcache_port > 0) {
|
||||
acceptor->AddListener(FLAGS_memcache_port, new Listener{Protocol::MEMCACHE, &service});
|
||||
|
@ -38,7 +35,6 @@ void RunEngine(ProactorPool* pool, AcceptServer* acceptor, HttpListener<>* http)
|
|||
|
||||
} // namespace dfly
|
||||
|
||||
|
||||
int main(int argc, char* argv[]) {
|
||||
MainInitGuard guard(&argc, &argv);
|
||||
|
||||
|
@ -48,19 +44,11 @@ int main(int argc, char* argv[]) {
|
|||
pp.Run();
|
||||
|
||||
AcceptServer acceptor(&pp);
|
||||
HttpListener<>* http_listener = nullptr;
|
||||
unique_ptr<HttpListener<>> http_listener(new HttpListener<>);
|
||||
|
||||
if (FLAGS_http_port >= 0) {
|
||||
http_listener = new HttpListener<>;
|
||||
http_listener->enable_metrics();
|
||||
http_listener->enable_metrics();
|
||||
|
||||
// Ownership over http_listener is moved to the acceptor.
|
||||
uint16_t port = acceptor.AddListener(FLAGS_http_port, http_listener);
|
||||
|
||||
LOG(INFO) << "Started http service on port " << port;
|
||||
}
|
||||
|
||||
dfly::RunEngine(&pp, &acceptor, http_listener);
|
||||
dfly::RunEngine(&pp, &acceptor, http_listener.get());
|
||||
|
||||
pp.Stop();
|
||||
|
||||
|
|
|
@ -124,8 +124,10 @@ void Connection::HandleRequests() {
|
|||
this_fiber::properties<FiberProps>().set_name("DflyConnection");
|
||||
|
||||
int val = 1;
|
||||
CHECK_EQ(0, setsockopt(socket_->native_handle(), SOL_TCP, TCP_NODELAY, &val, sizeof(val)));
|
||||
auto remote_ep = socket_->RemoteEndpoint();
|
||||
LinuxSocketBase* lsb = static_cast<LinuxSocketBase*>(socket_.get());
|
||||
CHECK_EQ(0, setsockopt(lsb->native_handle(), SOL_TCP, TCP_NODELAY, &val, sizeof(val)));
|
||||
|
||||
auto remote_ep = lsb->RemoteEndpoint();
|
||||
|
||||
std::unique_ptr<tls::TlsSocket> tls_sock;
|
||||
if (ctx_) {
|
||||
|
@ -141,36 +143,46 @@ void Connection::HandleRequests() {
|
|||
}
|
||||
|
||||
FiberSocketBase* peer = tls_sock ? (FiberSocketBase*)tls_sock.get() : socket_.get();
|
||||
cc_.reset(new ConnectionContext(peer, this));
|
||||
cc_->shard_set = &service_->shard_set();
|
||||
io::Result<bool> http_res = CheckForHttpProto(peer);
|
||||
|
||||
// TODO: to move this interface to LinuxSocketBase so we won't need to cast.
|
||||
uring::UringSocket* us = static_cast<uring::UringSocket*>(socket_.get());
|
||||
if (http_res) {
|
||||
if (*http_res) {
|
||||
VLOG(1) << "HTTP1.1 identified";
|
||||
HttpConnection http_conn{service_->http_listener()};
|
||||
http_conn.SetSocket(peer);
|
||||
auto ec = http_conn.ParseFromBuffer(io_buf_.InputBuffer());
|
||||
io_buf_.ConsumeInput(io_buf_.InputLen());
|
||||
if (!ec) {
|
||||
http_conn.HandleRequests();
|
||||
}
|
||||
http_conn.ReleaseSocket();
|
||||
} else {
|
||||
cc_.reset(new ConnectionContext(peer, this));
|
||||
cc_->shard_set = &service_->shard_set();
|
||||
|
||||
bool poll_armed = true;
|
||||
uint32_t poll_id = us->PollEvent(POLLERR | POLLHUP, [&](uint32_t mask) {
|
||||
VLOG(1) << "Got event " << mask;
|
||||
cc_->conn_state.mask |= ConnectionState::CONN_CLOSING;
|
||||
if (cc_->transaction) {
|
||||
cc_->transaction->BreakOnClose();
|
||||
// TODO: to move this interface to LinuxSocketBase so we won't need to cast.
|
||||
uring::UringSocket* us = static_cast<uring::UringSocket*>(socket_.get());
|
||||
|
||||
bool poll_armed = true;
|
||||
uint32_t poll_id = us->PollEvent(POLLERR | POLLHUP, [&](uint32_t mask) {
|
||||
VLOG(1) << "Got event " << mask;
|
||||
cc_->conn_state.mask |= ConnectionState::CONN_CLOSING;
|
||||
if (cc_->transaction) {
|
||||
cc_->transaction->BreakOnClose();
|
||||
}
|
||||
|
||||
evc_.notify(); // Notify dispatch fiber.
|
||||
poll_armed = false;
|
||||
});
|
||||
|
||||
InputLoop(peer);
|
||||
|
||||
if (poll_armed) {
|
||||
us->CancelPoll(poll_id);
|
||||
}
|
||||
}
|
||||
|
||||
evc_.notify(); // Notify dispatch fiber.
|
||||
poll_armed = false;
|
||||
});
|
||||
|
||||
io::Result<bool> check_res = CheckForHttpProto(peer);
|
||||
if (!check_res)
|
||||
return;
|
||||
if (*check_res) {
|
||||
LOG(INFO) << "HTTP1.1 identified";
|
||||
}
|
||||
|
||||
InputLoop(peer);
|
||||
|
||||
if (poll_armed) {
|
||||
us->CancelPoll(poll_id);
|
||||
}
|
||||
VLOG(1) << "Closed connection for peer " << remote_ep;
|
||||
}
|
||||
|
||||
|
@ -195,6 +207,7 @@ io::Result<bool> Connection::CheckForHttpProto(util::FiberSocketBase* peer) {
|
|||
}
|
||||
last_len = io_buf_.InputLen();
|
||||
} while (last_len < 1024);
|
||||
|
||||
return false;
|
||||
}
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ constexpr size_t kMaxThreadSize = 1024;
|
|||
|
||||
} // namespace
|
||||
|
||||
Service::Service(ProactorPool* pp) : shard_set_(pp), pp_(*pp), server_family_(this) {
|
||||
Service::Service(ProactorPool* pp) : pp_(*pp), shard_set_(pp), server_family_(this) {
|
||||
CHECK(pp);
|
||||
|
||||
// We support less than 1024 threads and we support less than 1024 shards.
|
||||
|
@ -255,6 +255,7 @@ bool Service::IsShardSetLocked() const {
|
|||
|
||||
void Service::RegisterHttp(HttpListenerBase* listener) {
|
||||
CHECK_NOTNULL(listener);
|
||||
http_listener_ = listener;
|
||||
}
|
||||
|
||||
void Service::Quit(CmdArgList args, ConnectionContext* cntx) {
|
||||
|
|
|
@ -57,6 +57,8 @@ class Service {
|
|||
return pp_;
|
||||
}
|
||||
|
||||
util::HttpListenerBase* http_listener() { return http_listener_; }
|
||||
|
||||
private:
|
||||
|
||||
static void Quit(CmdArgList args, ConnectionContext* cntx);
|
||||
|
@ -67,10 +69,13 @@ class Service {
|
|||
|
||||
base::VarzValue::Map GetVarzStats();
|
||||
|
||||
util::ProactorPool& pp_;
|
||||
|
||||
CommandRegistry registry_;
|
||||
EngineShardSet shard_set_;
|
||||
util::ProactorPool& pp_;
|
||||
|
||||
ServerFamily server_family_;
|
||||
util::HttpListenerBase* http_listener_ = nullptr;
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue