diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 1a212af7d..03f9a2c68 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -95,7 +95,7 @@ void UpdateIoBufCapacity(const base::IoBuf& io_buf, ConnectionStats* stats, f(); const size_t capacity = io_buf.Capacity(); if (stats != nullptr && prev_capacity != capacity) { - VLOG(1) << "Grown io_buf to " << capacity; + VLOG(2) << "Grown io_buf to " << capacity; stats->read_buf_capacity += capacity - prev_capacity; } } @@ -294,7 +294,7 @@ void Connection::DispatchOperations::operator()(const MigrationRequestMessage& m } void Connection::DispatchOperations::operator()(CheckpointMessage msg) { - VLOG(1) << "Decremented checkpoint at " << self->DebugInfo(); + VLOG(2) << "Decremented checkpoint at " << self->DebugInfo(); msg.bc.Dec(); } @@ -638,9 +638,9 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { evc_.notify(); phase_ = SHUTTING_DOWN; - VLOG(1) << "Before dispatch_fb.join()"; + VLOG(2) << "Before dispatch_fb.join()"; dispatch_fb_.JoinIfNeeded(); - VLOG(1) << "After dispatch_fb.join()"; + VLOG(2) << "After dispatch_fb.join()"; phase_ = PRECLOSE; @@ -823,13 +823,14 @@ void Connection::OnBreakCb(int32_t mask) { if (mask <= 0) return; // we cancelled the poller, which means we do not need to break from anything. - VLOG(1) << "Got event " << mask; - if (!cc_) { LOG(ERROR) << "Unexpected event " << mask; return; } + VLOG(1) << "[" << id_ << "] Got event " << mask << " " << phase_ << " " + << cc_->reply_builder()->IsSendActive() << " " << cc_->reply_builder()->GetError(); + cc_->conn_closing = true; break_cb_engaged_ = false; // do not attempt to cancel it. @@ -1258,7 +1259,7 @@ void Connection::SendCheckpoint(fb2::BlockingCounter bc, bool ignore_paused, boo if (cc_->blocked && ignore_blocked) return; - VLOG(1) << "Sent checkpoint to " << DebugInfo(); + VLOG(2) << "Sent checkpoint to " << DebugInfo(); bc.Add(1); SendAsync({CheckpointMessage{bc}}); diff --git a/src/facade/dragonfly_listener.cc b/src/facade/dragonfly_listener.cc index d239ad536..cbf912289 100644 --- a/src/facade/dragonfly_listener.cc +++ b/src/facade/dragonfly_listener.cc @@ -36,6 +36,9 @@ ABSL_FLAG(string, tls_ca_cert_dir, "", "ca signed certificates directory"); ABSL_FLAG(uint32_t, tcp_keepalive, 300, "the period in seconds of inactivity after which keep-alives are triggerred," "the duration until an inactive connection is terminated is twice the specified time"); +ABSL_FLAG(uint32_t, tcp_user_timeout, 0, + "the maximum period in milliseconds that transimitted data may stay unacknowledged " + "before TCP aborts the connection. 0 means OS default timeout"); ABSL_DECLARE_FLAG(bool, primary_port_http_enabled); @@ -192,6 +195,13 @@ error_code Listener::ConfigureServerSocket(int fd) { } bool success = ConfigureKeepAlive(fd); +#ifdef __linux__ + int user_timeout = absl::GetFlag(FLAGS_tcp_user_timeout); + if (setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &user_timeout, sizeof(int)) < 0) { + LOG(WARNING) << "Could not set user timeout on socket " << SafeErrorMessage(errno); + } +#endif + if (!success) { #ifndef __APPLE__ int myerr = errno; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 7d62c716c..7a5dc96c9 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1129,7 +1129,7 @@ pair Transaction::ScheduleInShard(EngineShard* shard) { ", contended locks: ", info.contended_locks, "\n"); absl::StrAppend(&msg, "max contention score: ", info.max_contention_score, ", lock: ", info.max_contention_lock_name, - "poll_executions:", shard->stats().poll_execution_total); + ", poll_executions:", shard->stats().poll_execution_total); const Transaction* cont_tx = shard->GetContTx(); if (cont_tx) { absl::StrAppend(&msg, " continuation_tx: ", cont_tx->DebugId(), " ",