chore: Connection fixes (#1663)

* chore: Connection safety checks

Signed-off-by: Vladislav <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2023-08-09 17:57:41 +03:00 committed by GitHub
parent f9a3e2811c
commit b9e8a2c0da
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 39 additions and 6 deletions

View file

@ -14,6 +14,7 @@
#include "facade/memcache_parser.h"
#include "facade/redis_parser.h"
#include "facade/service_interface.h"
#include "util/fibers/proactor_base.h"
#ifdef DFLY_USE_SSL
#include "util/tls/tls_socket.h"
@ -440,9 +441,6 @@ io::Result<bool> Connection::CheckForHttpProto(FiberSocketBase* peer) {
void Connection::ConnectionFlow(FiberSocketBase* peer) {
stats_ = service_->GetThreadLocalConnectionStats();
auto dispatch_fb =
fb2::Fiber(dfly::Launch::dispatch, "connection_dispatch", [&] { DispatchFiber(peer); });
++stats_->num_conns;
++stats_->conn_received_cnt;
stats_->read_buf_capacity += io_buf_.Capacity();
@ -479,7 +477,8 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
cc_->conn_closing = true; // Signal dispatch to close.
evc_.notify();
VLOG(1) << "Before dispatch_fb.join()";
dispatch_fb.Join();
if (dispatch_fb_.IsJoinable())
dispatch_fb_.Join();
VLOG(1) << "After dispatch_fb.join()";
service_->OnClose(cc_.get());
@ -840,6 +839,12 @@ void Connection::ShutdownSelf() {
}
void Connection::Migrate(util::fb2::ProactorBase* dest) {
// Migrate is used only by replication, so it doesn't have properties of full-fledged
// connections
CHECK(!cc_->async_dispatch);
CHECK_EQ(cc_->subscriptions, 0); // are bound to thread local caches
CHECK(!dispatch_fb_.IsJoinable()); // can't move once it started
owner()->Migrate(this, dest);
}
@ -873,12 +878,20 @@ void Connection::SendMonitorMessageAsync(string msg) {
void Connection::SendAsync(MessageHandle msg) {
DCHECK(cc_);
DCHECK(owner());
DCHECK_EQ(ProactorBase::me(), owner()->socket()->proactor());
if (cc_->conn_closing)
return;
dispatch_q_bytes_.fetch_add(msg.UsedMemory(), memory_order_relaxed);
if (!dispatch_fb_.IsJoinable()) {
DCHECK_EQ(dispatch_q_.size(), 0u);
auto* peer = socket_.get();
dispatch_fb_ =
fb2::Fiber(dfly::Launch::post, "connection_dispatch", [&] { DispatchFiber(peer); });
}
dispatch_q_bytes_.fetch_add(msg.UsedMemory(), memory_order_relaxed);
dispatch_q_.push_back(move(msg));
// Don't notify if a sync dispatch is in progress, it will wake after finishing.

View file

@ -150,6 +150,7 @@ class Connection : public util::Connection {
std::string RemoteEndpointStr() const;
std::string RemoteEndpointAddress() const;
std::string LocalBindAddress() const;
uint32_t GetClientId() const;
// Virtual because behavior is overridden in test_utils.
virtual bool IsAdmin() const;
@ -219,6 +220,7 @@ class Connection : public util::Connection {
private:
std::deque<MessageHandle> dispatch_q_; // dispatch queue
dfly::EventCount evc_; // dispatch queue waker
util::fb2::Fiber dispatch_fb_; // dispatch fiber (if started)
std::atomic_uint64_t dispatch_q_bytes_ = 0; // memory usage of all entries
dfly::EventCount evc_bp_; // backpressure for memory limit

View file

@ -22,6 +22,13 @@ template <typename F> void IterateKeys(CmdArgList args, KeyIndex keys, F&& f) {
f(args[*keys.bonus]);
}
void CheckConnStateClean(const ConnectionState& state) {
DCHECK_EQ(state.exec_info.state, ConnectionState::ExecInfo::EXEC_INACTIVE);
DCHECK(state.exec_info.body.empty());
DCHECK(!state.script_info);
DCHECK(!state.subscribe_info);
}
} // namespace
MultiCommandSquasher::MultiCommandSquasher(absl::Span<StoredCmd> cmds, ConnectionContext* cntx,
@ -124,6 +131,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard
auto* local_tx = sinfo.local_tx.get();
facade::CapturingReplyBuilder crb;
ConnectionContext local_cntx{local_tx, &crb};
local_cntx.conn_state.db_index = cntx_->conn_state.db_index;
absl::InlinedVector<MutableSlice, 4> arg_vec;
@ -145,10 +153,15 @@ OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard
local_cntx.cid = cmd->Cid();
crb.SetReplyMode(cmd->ReplyMode());
local_tx->InitByArgs(parent_tx->GetDbIndex(), args);
local_tx->InitByArgs(local_cntx.conn_state.db_index, args);
service_->InvokeCmd(cmd->Cid(), args, &local_cntx);
sinfo.replies.emplace_back(crb.Take());
// Assert commands made no persistent state changes to stub context state
const auto& local_state = local_cntx.conn_state;
DCHECK_EQ(local_state.db_index, cntx_->conn_state.db_index);
CheckConnStateClean(local_state);
}
// ConnectionContext deletes the reply builder upon destruction, so
@ -160,6 +173,8 @@ OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard
}
bool MultiCommandSquasher::ExecuteSquashed() {
DCHECK(!cntx_->conn_state.exec_info.IsCollecting());
if (order_.empty())
return false;

View file

@ -66,6 +66,9 @@ Transaction::~Transaction() {
}
void Transaction::InitBase(DbIndex dbid, CmdArgList args) {
// Switching db index is only possible for non-atomic execution
DCHECK(!multi_ || (db_index_ == dbid || multi_->mode == NON_ATOMIC));
global_ = false;
db_index_ = dbid;
full_args_ = args;