diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index f415b0969..3f79624cc 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -14,6 +14,7 @@ #include "facade/memcache_parser.h" #include "facade/redis_parser.h" #include "facade/service_interface.h" +#include "util/fibers/proactor_base.h" #ifdef DFLY_USE_SSL #include "util/tls/tls_socket.h" @@ -440,9 +441,6 @@ io::Result Connection::CheckForHttpProto(FiberSocketBase* peer) { void Connection::ConnectionFlow(FiberSocketBase* peer) { stats_ = service_->GetThreadLocalConnectionStats(); - auto dispatch_fb = - fb2::Fiber(dfly::Launch::dispatch, "connection_dispatch", [&] { DispatchFiber(peer); }); - ++stats_->num_conns; ++stats_->conn_received_cnt; stats_->read_buf_capacity += io_buf_.Capacity(); @@ -479,7 +477,8 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { cc_->conn_closing = true; // Signal dispatch to close. evc_.notify(); VLOG(1) << "Before dispatch_fb.join()"; - dispatch_fb.Join(); + if (dispatch_fb_.IsJoinable()) + dispatch_fb_.Join(); VLOG(1) << "After dispatch_fb.join()"; service_->OnClose(cc_.get()); @@ -840,6 +839,12 @@ void Connection::ShutdownSelf() { } void Connection::Migrate(util::fb2::ProactorBase* dest) { + // Migrate is used only by replication, so it doesn't have properties of full-fledged + // connections + CHECK(!cc_->async_dispatch); + CHECK_EQ(cc_->subscriptions, 0); // are bound to thread local caches + CHECK(!dispatch_fb_.IsJoinable()); // can't move once it started + owner()->Migrate(this, dest); } @@ -873,12 +878,20 @@ void Connection::SendMonitorMessageAsync(string msg) { void Connection::SendAsync(MessageHandle msg) { DCHECK(cc_); + DCHECK(owner()); + DCHECK_EQ(ProactorBase::me(), owner()->socket()->proactor()); if (cc_->conn_closing) return; - dispatch_q_bytes_.fetch_add(msg.UsedMemory(), memory_order_relaxed); + if (!dispatch_fb_.IsJoinable()) { + DCHECK_EQ(dispatch_q_.size(), 0u); + auto* peer = socket_.get(); + dispatch_fb_ = + fb2::Fiber(dfly::Launch::post, "connection_dispatch", [&] { DispatchFiber(peer); }); + } + dispatch_q_bytes_.fetch_add(msg.UsedMemory(), memory_order_relaxed); dispatch_q_.push_back(move(msg)); // Don't notify if a sync dispatch is in progress, it will wake after finishing. diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index ae12ad575..46240a6a5 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -150,6 +150,7 @@ class Connection : public util::Connection { std::string RemoteEndpointStr() const; std::string RemoteEndpointAddress() const; std::string LocalBindAddress() const; + uint32_t GetClientId() const; // Virtual because behavior is overridden in test_utils. virtual bool IsAdmin() const; @@ -219,6 +220,7 @@ class Connection : public util::Connection { private: std::deque dispatch_q_; // dispatch queue dfly::EventCount evc_; // dispatch queue waker + util::fb2::Fiber dispatch_fb_; // dispatch fiber (if started) std::atomic_uint64_t dispatch_q_bytes_ = 0; // memory usage of all entries dfly::EventCount evc_bp_; // backpressure for memory limit diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index 5a25f2319..4ed2e7f51 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -22,6 +22,13 @@ template void IterateKeys(CmdArgList args, KeyIndex keys, F&& f) { f(args[*keys.bonus]); } +void CheckConnStateClean(const ConnectionState& state) { + DCHECK_EQ(state.exec_info.state, ConnectionState::ExecInfo::EXEC_INACTIVE); + DCHECK(state.exec_info.body.empty()); + DCHECK(!state.script_info); + DCHECK(!state.subscribe_info); +} + } // namespace MultiCommandSquasher::MultiCommandSquasher(absl::Span cmds, ConnectionContext* cntx, @@ -124,6 +131,7 @@ OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard auto* local_tx = sinfo.local_tx.get(); facade::CapturingReplyBuilder crb; ConnectionContext local_cntx{local_tx, &crb}; + local_cntx.conn_state.db_index = cntx_->conn_state.db_index; absl::InlinedVector arg_vec; @@ -145,10 +153,15 @@ OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard local_cntx.cid = cmd->Cid(); crb.SetReplyMode(cmd->ReplyMode()); - local_tx->InitByArgs(parent_tx->GetDbIndex(), args); + local_tx->InitByArgs(local_cntx.conn_state.db_index, args); service_->InvokeCmd(cmd->Cid(), args, &local_cntx); sinfo.replies.emplace_back(crb.Take()); + + // Assert commands made no persistent state changes to stub context state + const auto& local_state = local_cntx.conn_state; + DCHECK_EQ(local_state.db_index, cntx_->conn_state.db_index); + CheckConnStateClean(local_state); } // ConnectionContext deletes the reply builder upon destruction, so @@ -160,6 +173,8 @@ OpStatus MultiCommandSquasher::SquashedHopCb(Transaction* parent_tx, EngineShard } bool MultiCommandSquasher::ExecuteSquashed() { + DCHECK(!cntx_->conn_state.exec_info.IsCollecting()); + if (order_.empty()) return false; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 0a940728d..96799fbab 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -66,6 +66,9 @@ Transaction::~Transaction() { } void Transaction::InitBase(DbIndex dbid, CmdArgList args) { + // Switching db index is only possible for non-atomic execution + DCHECK(!multi_ || (db_index_ == dbid || multi_->mode == NON_ATOMIC)); + global_ = false; db_index_ = dbid; full_args_ = args;