From 48589604fc99dfbb35ac78a101e1efc021e89b95 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Tue, 16 Nov 2021 15:04:32 +0200 Subject: [PATCH] Bind redis parser to dragonfly connection --- server/dragonfly_connection.cc | 45 +++++++++++++++++++++++++++++++--- server/dragonfly_connection.h | 11 +++++++-- server/resp_expr.cc | 6 ++--- server/resp_expr.h | 2 +- server/test_utils.cc | 2 +- server/test_utils.h | 2 +- 6 files changed, 57 insertions(+), 11 deletions(-) diff --git a/server/dragonfly_connection.cc b/server/dragonfly_connection.cc index 956f17ac0..1e4f56618 100644 --- a/server/dragonfly_connection.cc +++ b/server/dragonfly_connection.cc @@ -9,6 +9,7 @@ #include "base/io_buf.h" #include "base/logging.h" #include "server/main_service.h" +#include "server/redis_parser.h" #include "util/fiber_sched_algo.h" using namespace util; @@ -41,6 +42,7 @@ struct Connection::Shutdown { Connection::Connection(Service* service) : service_(service) { + redis_parser_.reset(new RedisParser); } Connection::~Connection() { @@ -84,7 +86,7 @@ void Connection::HandleRequests() { void Connection::InputLoop(FiberSocketBase* peer) { base::IoBuf io_buf{kMinReadSize}; - + ParserStatus status = OK; std::error_code ec; do { @@ -93,13 +95,17 @@ void Connection::InputLoop(FiberSocketBase* peer) { if (!recv_sz) { ec = recv_sz.error(); + status = OK; break; } io_buf.CommitWrite(*recv_sz); - ec = peer->Write(io_buf.InputBuffer()); - if (ec) + status = ParseRedis(&io_buf, peer); + if (status == NEED_MORE) { + status = OK; + } else if (status != OK) { break; + } } while (peer->IsOpen()); if (ec && !FiberSocketBase::IsConnClosed(ec)) { @@ -107,4 +113,37 @@ void Connection::InputLoop(FiberSocketBase* peer) { } } +auto Connection::ParseRedis(base::IoBuf* io_buf, util::FiberSocketBase* peer) -> ParserStatus { + RespVec args; + uint32_t consumed = 0; + + RedisParser::Result result = RedisParser::OK; + error_code ec; + do { + result = redis_parser_->Parse(io_buf->InputBuffer(), &consumed, &args); + + if (result == RedisParser::OK && !args.empty()) { + RespExpr& first = args.front(); + if (first.type == RespExpr::STRING) { + DVLOG(2) << "Got Args with first token " << ToSV(first.GetBuf()); + } + + CHECK_EQ(RespExpr::STRING, first.type); // TODO + string_view sv = ToSV(first.GetBuf()); + if (sv == "PING") { + ec = peer->Write(io::Buffer("PONG\r\n")); + } + } + io_buf->ConsumeInput(consumed); + } while (RedisParser::OK == result && !ec); + + parser_error_ = result; + if (result == RedisParser::OK) + return OK; + + if (result == RedisParser::INPUT_PENDING) + return NEED_MORE; + + return ERROR; +} } // namespace dfly diff --git a/server/dragonfly_connection.h b/server/dragonfly_connection.h index 9aafba74d..e541a1b3e 100644 --- a/server/dragonfly_connection.h +++ b/server/dragonfly_connection.h @@ -6,9 +6,12 @@ #include "util/connection.h" +#include "base/io_buf.h" + namespace dfly { class Service; +class RedisParser; class Connection : public util::Connection { public: @@ -26,13 +29,17 @@ class Connection : public util::Connection { void OnShutdown() override; private: + enum ParserStatus { OK, NEED_MORE, ERROR }; + void HandleRequests() final; void InputLoop(util::FiberSocketBase* peer); - void DispatchFiber(util::FiberSocketBase* peer); + ParserStatus ParseRedis(base::IoBuf* buf, util::FiberSocketBase* peer); + + std::unique_ptr redis_parser_; Service* service_; - + unsigned parser_error_ = 0; struct Shutdown; std::unique_ptr shutdown_; }; diff --git a/server/resp_expr.cc b/server/resp_expr.cc index b37da0bdb..fa3fe71ca 100644 --- a/server/resp_expr.cc +++ b/server/resp_expr.cc @@ -32,14 +32,14 @@ namespace std { ostream& operator<<(ostream& os, const dfly::RespExpr& e) { using dfly::RespExpr; - using dfly::ToAbsl; + using dfly::ToSV; switch (e.type) { case RespExpr::INT64: os << "i" << get(e.u); break; case RespExpr::STRING: - os << "'" << ToAbsl(get(e.u)) << "'"; + os << "'" << ToSV(e.GetBuf()) << "'"; break; case RespExpr::NIL: os << "nil"; @@ -51,7 +51,7 @@ ostream& operator<<(ostream& os, const dfly::RespExpr& e) { os << dfly::RespSpan{*get(e.u)}; break; case RespExpr::ERROR: - os << "e(" << ToAbsl(get(e.u)) << ")"; + os << "e(" << ToSV(e.GetBuf()) << ")"; break; } diff --git a/server/resp_expr.h b/server/resp_expr.h index afc95cabf..7975d3a6d 100644 --- a/server/resp_expr.h +++ b/server/resp_expr.h @@ -39,7 +39,7 @@ class RespExpr { using RespVec = RespExpr::Vec; using RespSpan = absl::Span; -inline std::string_view ToAbsl(const absl::Span& s) { +inline std::string_view ToSV(const absl::Span& s) { return std::string_view{reinterpret_cast(s.data()), s.size()}; } diff --git a/server/test_utils.cc b/server/test_utils.cc index 91bc956fa..7a608a258 100644 --- a/server/test_utils.cc +++ b/server/test_utils.cc @@ -103,7 +103,7 @@ vector ToIntArr(const RespVec& vec) { vector res; for (auto a : vec) { int64_t val; - std::string_view s = ToAbsl(a.GetBuf()); + std::string_view s = ToSV(a.GetBuf()); CHECK(absl::SimpleAtoi(s, &val)) << s; res.push_back(val); } diff --git a/server/test_utils.h b/server/test_utils.h index 0fa7307b4..6410d28b3 100644 --- a/server/test_utils.h +++ b/server/test_utils.h @@ -74,7 +74,7 @@ inline ::testing::PolymorphicMatcher ArgType(RespExpr::Type t) } inline bool operator==(const RespExpr& left, const char* s) { - return left.type == RespExpr::STRING && ToAbsl(left.GetBuf()) == s; + return left.type == RespExpr::STRING && ToSV(left.GetBuf()) == s; } void PrintTo(const RespExpr::Vec& vec, std::ostream* os);