feat: introduce user timeout (#2361)

* feat: introduce user timeout

* feat: introduce tcp_user_timeout flag.

See TCP_USER_TIMEOUT flag in tcp(7) man page.
This linux-only setting allows fail faster during the send operation
if for some reason the remote socket is unresponsive and does not send ACKs for
the transmitted segments.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

* Update src/facade/dragonfly_listener.cc

Co-authored-by: Shahar Mike <chakaz@users.noreply.github.com>
Signed-off-by: Roman Gershman <romange@gmail.com>

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Signed-off-by: Roman Gershman <romange@gmail.com>
Co-authored-by: Shahar Mike <chakaz@users.noreply.github.com>
This commit is contained in:
Roman Gershman 2024-01-03 08:06:25 +02:00 committed by GitHub
parent 6f9107291e
commit cb3e366459
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 19 additions and 8 deletions

View file

@ -95,7 +95,7 @@ void UpdateIoBufCapacity(const base::IoBuf& io_buf, ConnectionStats* stats,
f();
const size_t capacity = io_buf.Capacity();
if (stats != nullptr && prev_capacity != capacity) {
VLOG(1) << "Grown io_buf to " << capacity;
VLOG(2) << "Grown io_buf to " << capacity;
stats->read_buf_capacity += capacity - prev_capacity;
}
}
@ -294,7 +294,7 @@ void Connection::DispatchOperations::operator()(const MigrationRequestMessage& m
}
void Connection::DispatchOperations::operator()(CheckpointMessage msg) {
VLOG(1) << "Decremented checkpoint at " << self->DebugInfo();
VLOG(2) << "Decremented checkpoint at " << self->DebugInfo();
msg.bc.Dec();
}
@ -638,9 +638,9 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) {
evc_.notify();
phase_ = SHUTTING_DOWN;
VLOG(1) << "Before dispatch_fb.join()";
VLOG(2) << "Before dispatch_fb.join()";
dispatch_fb_.JoinIfNeeded();
VLOG(1) << "After dispatch_fb.join()";
VLOG(2) << "After dispatch_fb.join()";
phase_ = PRECLOSE;
@ -823,13 +823,14 @@ void Connection::OnBreakCb(int32_t mask) {
if (mask <= 0)
return; // we cancelled the poller, which means we do not need to break from anything.
VLOG(1) << "Got event " << mask;
if (!cc_) {
LOG(ERROR) << "Unexpected event " << mask;
return;
}
VLOG(1) << "[" << id_ << "] Got event " << mask << " " << phase_ << " "
<< cc_->reply_builder()->IsSendActive() << " " << cc_->reply_builder()->GetError();
cc_->conn_closing = true;
break_cb_engaged_ = false; // do not attempt to cancel it.
@ -1258,7 +1259,7 @@ void Connection::SendCheckpoint(fb2::BlockingCounter bc, bool ignore_paused, boo
if (cc_->blocked && ignore_blocked)
return;
VLOG(1) << "Sent checkpoint to " << DebugInfo();
VLOG(2) << "Sent checkpoint to " << DebugInfo();
bc.Add(1);
SendAsync({CheckpointMessage{bc}});

View file

@ -36,6 +36,9 @@ ABSL_FLAG(string, tls_ca_cert_dir, "", "ca signed certificates directory");
ABSL_FLAG(uint32_t, tcp_keepalive, 300,
"the period in seconds of inactivity after which keep-alives are triggerred,"
"the duration until an inactive connection is terminated is twice the specified time");
ABSL_FLAG(uint32_t, tcp_user_timeout, 0,
"the maximum period in milliseconds that transimitted data may stay unacknowledged "
"before TCP aborts the connection. 0 means OS default timeout");
ABSL_DECLARE_FLAG(bool, primary_port_http_enabled);
@ -192,6 +195,13 @@ error_code Listener::ConfigureServerSocket(int fd) {
}
bool success = ConfigureKeepAlive(fd);
#ifdef __linux__
int user_timeout = absl::GetFlag(FLAGS_tcp_user_timeout);
if (setsockopt(fd, IPPROTO_TCP, TCP_USER_TIMEOUT, &user_timeout, sizeof(int)) < 0) {
LOG(WARNING) << "Could not set user timeout on socket " << SafeErrorMessage(errno);
}
#endif
if (!success) {
#ifndef __APPLE__
int myerr = errno;

View file

@ -1129,7 +1129,7 @@ pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
", contended locks: ", info.contended_locks, "\n");
absl::StrAppend(&msg, "max contention score: ", info.max_contention_score,
", lock: ", info.max_contention_lock_name,
"poll_executions:", shard->stats().poll_execution_total);
", poll_executions:", shard->stats().poll_execution_total);
const Transaction* cont_tx = shard->GetContTx();
if (cont_tx) {
absl::StrAppend(&msg, " continuation_tx: ", cont_tx->DebugId(), " ",