diff --git a/README.md b/README.md index e53cf28f5..2e6f74841 100644 --- a/README.md +++ b/README.md @@ -26,5 +26,3 @@ cd build-opt && ninja midi-redis ``` for more options, run `./midi-redis --help` - - diff --git a/helio b/helio index 29e247c00..dab0fc768 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 29e247c00c2f10798c78044607436953fb22b384 +Subproject commit dab0fc768494d8e7cc42d77b08fbee1a81415072 diff --git a/server/dragonfly_connection.cc b/server/dragonfly_connection.cc index 56a7467d9..72dbdffad 100644 --- a/server/dragonfly_connection.cc +++ b/server/dragonfly_connection.cc @@ -26,8 +26,6 @@ namespace fibers = boost::fibers; namespace dfly { namespace { -using CmdArgVec = std::vector; - void SendProtocolError(RedisParser::Result pres, FiberSocketBase* peer) { string res("-ERR Protocol error: "); if (pres == RedisParser::BAD_BULKLEN) { @@ -43,10 +41,6 @@ void SendProtocolError(RedisParser::Result pres, FiberSocketBase* peer) { } } -inline MutableStrSpan ToMSS(absl::Span span) { - return MutableStrSpan{reinterpret_cast(span.data()), span.size()}; -} - void RespToArgList(const RespVec& src, CmdArgVec* dest) { dest->resize(src.size()); for (size_t i = 0; i < src.size(); ++i) { @@ -311,6 +305,7 @@ void Connection::DispatchFiber(util::FiberSocketBase* peer) { std::unique_ptr req{dispatch_q_.front()}; dispatch_q_.pop_front(); + cc_->SetBatchMode(!dispatch_q_.empty()); cc_->conn_state.mask |= ConnectionState::ASYNC_DISPATCH; service_->DispatchCommand(CmdArgList{req->args.data(), req->args.size()}, cc_.get()); cc_->conn_state.mask &= ~ConnectionState::ASYNC_DISPATCH; diff --git a/server/dragonfly_listener.cc b/server/dragonfly_listener.cc index d5fa73672..dd1e80b2a 100644 --- a/server/dragonfly_listener.cc +++ b/server/dragonfly_listener.cc @@ -119,11 +119,12 @@ ProactorBase* Listener::PickConnectionProactor(LinuxSocketBase* sock) { if (FLAGS_conn_use_incoming_cpu) { int fd = sock->native_handle(); - int cpu; + int cpu, napi_id; socklen_t len = sizeof(cpu); CHECK_EQ(0, getsockopt(fd, SOL_SOCKET, SO_INCOMING_CPU, &cpu, &len)); - VLOG(1) << "CPU for connection " << fd << " is " << cpu; + CHECK_EQ(0, getsockopt(fd, SOL_SOCKET, SO_INCOMING_NAPI_ID, &napi_id, &len)); + VLOG(1) << "CPU/NAPI for connection " << fd << " is " << cpu << "/" << napi_id; vector ids = pool()->MapCpuToThreads(cpu); if (!ids.empty()) { diff --git a/server/engine_shard_set.cc b/server/engine_shard_set.cc index 501cb5bfc..b8add1194 100644 --- a/server/engine_shard_set.cc +++ b/server/engine_shard_set.cc @@ -43,7 +43,7 @@ void EngineShard::DestroyThreadLocal() { delete shard_; shard_ = nullptr; - DVLOG(1) << "Shard reset " << index; + VLOG(1) << "Shard reset " << index; } void EngineShardSet::Init(uint32_t sz) { diff --git a/server/main_service.cc b/server/main_service.cc index a54943b4d..f7ba55c1a 100644 --- a/server/main_service.cc +++ b/server/main_service.cc @@ -65,6 +65,8 @@ void Service::Init(util::AcceptServer* acceptor) { } void Service::Shutdown() { + VLOG(1) << "Service::Shutdown"; + engine_varz.reset(); request_latency_usec.Shutdown(); ping_qps.Shutdown(); diff --git a/server/reply_builder.cc b/server/reply_builder.cc index 68af563d7..c4e56862c 100644 --- a/server/reply_builder.cc +++ b/server/reply_builder.cc @@ -30,7 +30,30 @@ BaseSerializer::BaseSerializer(io::Sink* sink) : sink_(sink) { } void BaseSerializer::Send(const iovec* v, uint32_t len) { - error_code ec = sink_->Write(v, len); + if (should_batch_) { + // TODO: to introduce flushing when too much data is batched. + for (unsigned i = 0; i < len; ++i) { + std::string_view src((char*)v[i].iov_base, v[i].iov_len); + DVLOG(2) << "Appending to stream " << sink_ << " " << src; + batch_.append(src.data(), src.size()); + } + return; + } + + error_code ec; + if (batch_.empty()) { + ec = sink_->Write(v, len); + } else { + DVLOG(1) << "Sending batch to stream " << sink_ << "\n" << batch_; + + iovec tmp[len + 1]; + tmp[0].iov_base = batch_.data(); + tmp[0].iov_len = batch_.size(); + copy(v, v + len, tmp + 1); + ec = sink_->Write(tmp, len + 1); + batch_.clear(); + } + if (ec) { ec_ = ec; } diff --git a/server/reply_builder.h b/server/reply_builder.h index 667f8b835..06837fe62 100644 --- a/server/reply_builder.h +++ b/server/reply_builder.h @@ -22,18 +22,24 @@ class BaseSerializer { ec_ = std::make_error_code(std::errc::connection_aborted); } + // In order to reduce interrupt rate we allow coalescing responses together using + // Batch mode. It is controlled by Connection state machine because it makes sense only + // when pipelined requests are arriving. + void SetBatchMode(bool batch) { + should_batch_ = batch; + } + //! Sends a string as is without any formatting. raw should be encoded according to the protocol. void SendDirect(std::string_view str); - ::io::Sink* sink() { - return sink_; - } - void Send(const iovec* v, uint32_t len); private: ::io::Sink* sink_; std::error_code ec_; + std::string batch_; + + bool should_batch_ = false; }; class RespSerializer : public BaseSerializer { @@ -89,6 +95,10 @@ class ReplyBuilder { void SendGetReply(std::string_view key, uint32_t flags, std::string_view value); void SendGetNotFound(); + void SetBatchMode(bool mode) { + serializer_->SetBatchMode(mode); + } + private: RespSerializer* as_resp() { return static_cast(serializer_.get());