From 9f7ee25b661ab396ffcd6a613ba63314ae39106b Mon Sep 17 00:00:00 2001 From: Volodymyr Yavdoshenko Date: Mon, 5 May 2025 18:26:14 +0300 Subject: [PATCH] fix: connections logging was updated --- helio | 2 +- src/facade/CMakeLists.txt | 2 +- src/facade/dragonfly_connection.cc | 10 ++- src/facade/socket_utils.cc | 65 ++++++++++++++++ src/facade/socket_utils.h | 14 ++++ src/server/cluster/incoming_slot_migration.cc | 13 +++- src/server/cluster/outgoing_slot_migration.cc | 28 +++++-- src/server/common.cc | 11 ++- src/server/protocol_client.cc | 3 + src/server/replica.cc | 48 ++++++++++-- src/server/replica.h | 3 + src/server/server_family_test.cc | 77 +++++++++++++++++++ 12 files changed, 255 insertions(+), 21 deletions(-) create mode 100644 src/facade/socket_utils.cc create mode 100644 src/facade/socket_utils.h diff --git a/helio b/helio index e1e3934b6..ec5495235 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit e1e3934b656a258c58125c18c7524dd6438c5585 +Subproject commit ec5495235d3001fe4233f88a528a1432e23d5985 diff --git a/src/facade/CMakeLists.txt b/src/facade/CMakeLists.txt index 6e8d65fa9..1919f484b 100644 --- a/src/facade/CMakeLists.txt +++ b/src/facade/CMakeLists.txt @@ -3,7 +3,7 @@ cxx_link(dfly_parser_lib base strings_lib) add_library(dfly_facade conn_context.cc dragonfly_listener.cc dragonfly_connection.cc facade.cc memcache_parser.cc reply_builder.cc op_status.cc service_interface.cc - reply_capture.cc cmd_arg_parser.cc tls_helpers.cc) + reply_capture.cc cmd_arg_parser.cc tls_helpers.cc socket_utils.cc) if (DF_USE_SSL) set(TLS_LIB tls_lib) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index c3e0acf7f..44a5c30f7 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -24,6 +24,7 @@ #include "facade/memcache_parser.h" #include "facade/redis_parser.h" #include "facade/service_interface.h" +#include "facade/socket_utils.h" #include "glog/logging.h" #include "io/file.h" #include "util/fibers/fibers.h" @@ -740,7 +741,8 @@ void Connection::HandleRequests() { uint8_t buf[2]; auto read_sz = socket_->Read(io::MutableBytes(buf)); if (!read_sz || *read_sz < sizeof(buf)) { - VLOG(1) << "Error reading from peer " << remote_ep << " " << read_sz.error().message(); + VLOG(1) << "Error reading from peer " << remote_ep << " " << read_sz.error().message() + << ", socket state: " + dfly::GetSocketInfo(socket_->native_handle()); return; } if (buf[0] != 0x16 || buf[1] != 0x03) { @@ -761,7 +763,8 @@ void Connection::HandleRequests() { FiberSocketBase::AcceptResult aresult = socket_->Accept(); if (!aresult) { - LOG(INFO) << "Error handshaking " << aresult.error().message(); + LOG(INFO) << "Error handshaking " << aresult.error().message() + << ", socket state: " + dfly::GetSocketInfo(socket_->native_handle()); return; } is_tls_ = 1; @@ -1110,7 +1113,8 @@ void Connection::ConnectionFlow() { if (ec && !FiberSocketBase::IsConnClosed(ec)) { string conn_info = service_->GetContextInfo(cc_.get()).Format(); LOG(WARNING) << "Socket error for connection " << conn_info << " " << GetName() - << " during phase " << kPhaseName[phase_] << " : " << ec << " " << ec.message(); + << " during phase " << kPhaseName[phase_] << " : " << ec << " " << ec.message() + << ", socket state: " + dfly::GetSocketInfo(socket_->native_handle()); } } diff --git a/src/facade/socket_utils.cc b/src/facade/socket_utils.cc new file mode 100644 index 000000000..e842cf59f --- /dev/null +++ b/src/facade/socket_utils.cc @@ -0,0 +1,65 @@ +// Copyright 2022, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "socket_utils.h" + +#ifdef __linux__ +#include +#include +#include +#include + +#include "absl/strings/str_cat.h" +#include "io/proc_reader.h" + +#endif + +namespace dfly { + +// Returns information about the TCP socket state by its descriptor +std::string GetSocketInfo(int socket_fd) { + if (socket_fd < 0) + return "invalid socket"; + +#ifdef __linux__ + struct stat sock_stat; + if (fstat(socket_fd, &sock_stat) != 0) { + return "could not stat socket"; + } + + auto tcp_info = io::ReadTcpInfo(sock_stat.st_ino); + if (!tcp_info) { + auto tcp6_info = io::ReadTcp6Info(sock_stat.st_ino); + if (!tcp6_info) { + return "socket not found in /proc/net/tcp or /proc/net/tcp6"; + } + tcp_info = std::move(tcp6_info); + } + + std::string state_str = io::TcpStateToString(tcp_info->state); + + if (tcp_info->is_ipv6) { + char local_ip[INET6_ADDRSTRLEN], remote_ip[INET6_ADDRSTRLEN]; + inet_ntop(AF_INET6, &tcp_info->local_addr6, local_ip, sizeof(local_ip)); + inet_ntop(AF_INET6, &tcp_info->remote_addr6, remote_ip, sizeof(remote_ip)); + return absl::StrCat("State: ", state_str, ", Local: [", local_ip, "]:", tcp_info->local_port, + ", Remote: [", remote_ip, "]:", tcp_info->remote_port, + ", Inode: ", tcp_info->inode); + } else { + char local_ip[INET_ADDRSTRLEN], remote_ip[INET_ADDRSTRLEN]; + struct in_addr addr; + addr.s_addr = htonl(tcp_info->local_addr); + inet_ntop(AF_INET, &addr, local_ip, sizeof(local_ip)); + addr.s_addr = htonl(tcp_info->remote_addr); + inet_ntop(AF_INET, &addr, remote_ip, sizeof(remote_ip)); + return absl::StrCat("State: ", state_str, ", Local: ", local_ip, ":", tcp_info->local_port, + ", Remote: ", remote_ip, ":", tcp_info->remote_port, + ", Inode: ", tcp_info->inode); + } +#else + return "socket info not available on this platform"; +#endif +} + +} // namespace dfly diff --git a/src/facade/socket_utils.h b/src/facade/socket_utils.h new file mode 100644 index 000000000..91d90cf4a --- /dev/null +++ b/src/facade/socket_utils.h @@ -0,0 +1,14 @@ +// Copyright 2022, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include + +namespace dfly { + +// Returns information about the TCP socket state by its descriptor +std::string GetSocketInfo(int socket_fd); + +} // namespace dfly diff --git a/src/server/cluster/incoming_slot_migration.cc b/src/server/cluster/incoming_slot_migration.cc index 2a6c8057a..f40dcc3e2 100644 --- a/src/server/cluster/incoming_slot_migration.cc +++ b/src/server/cluster/incoming_slot_migration.cc @@ -10,6 +10,7 @@ #include "base/flags.h" #include "base/logging.h" #include "cluster_utility.h" +#include "facade/socket_utils.h" #include "server/error.h" #include "server/journal/executor.h" #include "server/journal/tx_executor.h" @@ -67,6 +68,11 @@ class ClusterShardMigration { auto tx_data = tx_reader.NextTxData(&reader, cntx); if (!tx_data) { + if (cntx->GetError()) { + LOG(WARNING) << "Error reading from migration socket for shard " << source_shard_id_ + << ": " << cntx->GetError().Format() + << ", socket state: " << GetSocketInfo(source->native_handle()); + } break; } @@ -105,7 +111,12 @@ class ClusterShardMigration { if (socket_ != nullptr) { return socket_->proactor()->Await([s = socket_]() { if (s->IsOpen()) { - return s->Shutdown(SHUT_RDWR); // Does not Close(), only forbids further I/O. + auto ec = s->Shutdown(SHUT_RDWR); // Does not Close(), only forbids further I/O. + if (ec) { + LOG(WARNING) << "Error shutting down socket for shard migration: " << ec.message() + << ", socket state: " << GetSocketInfo(s->native_handle()); + } + return ec; } return std::error_code(); }); diff --git a/src/server/cluster/outgoing_slot_migration.cc b/src/server/cluster/outgoing_slot_migration.cc index 52d47b07b..87d027489 100644 --- a/src/server/cluster/outgoing_slot_migration.cc +++ b/src/server/cluster/outgoing_slot_migration.cc @@ -12,6 +12,7 @@ #include "base/logging.h" #include "cluster_family.h" #include "cluster_utility.h" +#include "facade/socket_utils.h" #include "server/db_slice.h" #include "server/engine_shard_set.h" #include "server/error.h" @@ -51,6 +52,9 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient { VLOG(1) << "Connecting to source node_id " << node_id << " shard_id " << shard_id; auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms; if (auto ec = ConnectAndAuth(timeout, &exec_st_); ec) { + LOG(WARNING) << "Couldn't connect to source node_id " << node_id << " shard_id " << shard_id + << ": " << ec.message() + << ", socket state: " + GetSocketInfo(Sock()->native_handle()); exec_st_.ReportError(GenericError(ec, "Couldn't connect to source.")); return; } @@ -203,7 +207,10 @@ void OutgoingMigration::SyncFb() { VLOG(1) << "Connecting to target node"; auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms; if (auto ec = ConnectAndAuth(timeout, &exec_st_); ec) { - LOG(WARNING) << "Can't connect to target node"; + std::string socket_info; + LOG(WARNING) << "Can't connect to target node " << server().Description() + << " for migration: " << ec.message() + << ", socket state: " + GetSocketInfo(Sock()->native_handle()); exec_st_.ReportError(GenericError(ec, "Couldn't connect to source.")); continue; } @@ -216,7 +223,10 @@ void OutgoingMigration::SyncFb() { } if (auto ec = SendCommandAndReadResponse(cmd); ec) { - LOG(WARNING) << "Can't connect to target node"; + std::string socket_info; + LOG(WARNING) << "Could not send INIT command to " << server().Description() + << " for migration: " << ec.message() + << ", socket state: " + GetSocketInfo(Sock()->native_handle()); exec_st_.ReportError(GenericError(ec, "Could not send INIT command.")); continue; } @@ -296,8 +306,9 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { } auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms; if (auto ec = ConnectAndAuth(timeout, &exec_st_); ec) { - LOG(WARNING) << "Couldn't connect " << cf_->MyID() << " : " << migration_info_.node_info.id - << " attempt " << attempt; + LOG(WARNING) << "Couldn't connect to " << cf_->MyID() << " : " << migration_info_.node_info.id + << " attempt " << attempt << ": " << ec.message() + << ", socket state: " + GetSocketInfo(Sock()->native_handle()); return false; } } @@ -334,7 +345,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { VLOG(1) << "send " << cmd; if (auto err = SendCommand(cmd); err) { - LOG(WARNING) << "Error during sending DFLYMIGRATE ACK: " << err.message(); + LOG(WARNING) << "Error during sending DFLYMIGRATE ACK to " << server().Description() << ": " + << err.message() << ", socket state: " + GetSocketInfo(Sock()->native_handle()); return false; } @@ -351,7 +363,9 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { } if (auto resp = ReadRespReply(absl::ToInt64Milliseconds(passed - timeout)); !resp) { - LOG(WARNING) << resp.error(); + LOG(WARNING) << "Error reading response to ACK command from " << server().Description() + << ": " << resp.error() + << ", socket state: " + GetSocketInfo(Sock()->native_handle()); return false; } @@ -382,6 +396,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) { void OutgoingMigration::Start() { VLOG(1) << "Resolving host DNS for outgoing migration"; if (error_code ec = ResolveHostDns(); ec) { + LOG(WARNING) << "Could not resolve host DNS for outgoing migration to " + << server().Description() << ": " << ec.message(); exec_st_.ReportError(GenericError(ec, "Could not resolve host dns.")); return; } diff --git a/src/server/common.cc b/src/server/common.cc index 93f5a72b0..d5644c4e7 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -363,7 +363,16 @@ GenericError ExecutionState::GetError() const { } void ExecutionState::ReportCancelError() { - ReportError(std::make_error_code(errc::operation_canceled), "ExecutionState cancelled"); + std::string cancel_reason = "ExecutionState cancelled"; + + // Add additional information about the reason for cancellation, if possible + // Possible to extract from system errors or context + std::error_code sys_err = std::error_code(errno, std::system_category()); + if (sys_err && sys_err != std::errc::operation_canceled) { + absl::StrAppend(&cancel_reason, " due to system error: ", sys_err.message()); + } + + ReportError(std::make_error_code(errc::operation_canceled), cancel_reason); } void ExecutionState::Reset(ErrHandler handler) { diff --git a/src/server/protocol_client.cc b/src/server/protocol_client.cc index 8ecd6f65d..3cf61fdad 100644 --- a/src/server/protocol_client.cc +++ b/src/server/protocol_client.cc @@ -22,6 +22,7 @@ extern "C" { #include "base/logging.h" #include "facade/dragonfly_connection.h" #include "facade/redis_parser.h" +#include "facade/socket_utils.h" #include "server/error.h" #include "server/journal/executor.h" #include "server/journal/serializer.h" @@ -234,6 +235,8 @@ void ProtocolClient::CloseSocket() { } void ProtocolClient::DefaultErrorHandler(const GenericError& err) { + LOG(WARNING) << "Socket error: " << err.Format() << " in " << server_context_.Description() + << ", socket info: " << GetSocketInfo(sock_->native_handle()); CloseSocket(); } diff --git a/src/server/replica.cc b/src/server/replica.cc index 32879b6e6..92ad7c4ac 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -24,6 +24,7 @@ extern "C" { #include "base/logging.h" #include "facade/redis_parser.h" +#include "facade/socket_utils.h" #include "server/error.h" #include "server/journal/executor.h" #include "server/journal/serializer.h" @@ -206,7 +207,8 @@ void Replica::MainReplicationFb(std::optional last_master_sy ec = ResolveHostDns(); if (ec) { - LOG(ERROR) << "Error resolving dns to " << server().host << " " << ec; + LOG(ERROR) << "Error resolving dns to " << server().host << " (phase: " << GetCurrentPhase() + << "): " << ec; continue; } @@ -214,7 +216,9 @@ void Replica::MainReplicationFb(std::optional last_master_sy reconnect_count_++; ec = ConnectAndAuth(absl::GetFlag(FLAGS_master_reconnect_timeout_ms) * 1ms, &exec_st_); if (ec) { - LOG(WARNING) << "Error connecting to " << server().Description() << " " << ec; + LOG(WARNING) << "Error connecting to " << server().Description() + << " (phase: " << GetCurrentPhase() << "): " << ec + << ", reason: " << ec.message(); continue; } VLOG(1) << "Replica socket connected"; @@ -226,8 +230,9 @@ void Replica::MainReplicationFb(std::optional last_master_sy if ((state_mask_.load() & R_GREETED) == 0) { ec = Greet(); if (ec) { - LOG(INFO) << "Error greeting " << server().Description() << " " << ec << " " - << ec.message(); + LOG(WARNING) << "Error greeting " << server().Description() + << " (phase: " << GetCurrentPhase() << "): " << ec << " " << ec.message() + << ", socket state: " + GetSocketInfo(Sock()->native_handle()); state_mask_.fetch_and(R_ENABLED); continue; } @@ -243,8 +248,9 @@ void Replica::MainReplicationFb(std::optional last_master_sy ec = InitiatePSync(); if (ec) { - LOG(WARNING) << "Error syncing with " << server().Description() << " " << ec << " " - << ec.message(); + LOG(WARNING) << "Error syncing with " << server().Description() + << " (phase: " << GetCurrentPhase() << "): " << ec << " " << ec.message() + << ", socket state: " + GetSocketInfo(Sock()->native_handle()); state_mask_.fetch_and(R_ENABLED); // reset all flags besides R_ENABLED continue; } @@ -262,8 +268,9 @@ void Replica::MainReplicationFb(std::optional last_master_sy auto state = state_mask_.fetch_and(R_ENABLED); if (state & R_ENABLED) { // replication was not stopped. - LOG(WARNING) << "Error stable sync with " << server().Description() << " " << ec << " " - << ec.message(); + LOG(WARNING) << "Error stable sync with " << server().Description() + << " (phase: " << GetCurrentPhase() << "): " << ec << " " << ec.message() + << ", socket state: " + GetSocketInfo(Sock()->native_handle()); } } @@ -642,6 +649,9 @@ error_code Replica::ConsumeRedisStream() { auto response = ReadRespReply(&io_buf, /*copy_msg=*/false); if (!response.has_value()) { VLOG(1) << "ConsumeRedisStream finished"; + LOG(ERROR) << "Error in Redis Stream at phase " << GetCurrentPhase() << " with " + << server().Description() << ", error: " << response.error() + << ", socket state: " + GetSocketInfo(Sock()->native_handle()); exec_st_.ReportError(response.error()); acks_fb_.JoinIfNeeded(); return response.error(); @@ -679,6 +689,11 @@ error_code Replica::ConsumeDflyStream() { auto err_handler = [this](const auto& ge) { // Make sure the flows are not in a state transition lock_guard lk{flows_op_mu_}; + + LOG(ERROR) << "DflyStream error in phase " << GetCurrentPhase() << " with " + << server().Description() << ", error: " << ge.Format() + << ", socket state: " + GetSocketInfo(Sock()->native_handle()); + DefaultErrorHandler(ge); for (auto& flow : shard_flows_) { flow->Cancel(); @@ -1171,6 +1186,23 @@ std::string Replica::GetSyncId() const { return master_context_.dfly_session_id; } +std::string Replica::GetCurrentPhase() const { + uint32_t state = state_mask_.load(); + + if (!(state & R_ENABLED)) + return "DISABLED"; + if (!(state & R_TCP_CONNECTED)) + return "TCP_CONNECTING"; + if (!(state & R_GREETED)) + return "GREETING"; + if (!(state & R_SYNC_OK)) + return "INITIAL_SYNC"; + if (state & R_SYNCING) + return "FULL_SYNC_IN_PROGRESS"; + + return "STABLE_SYNC"; +} + uint32_t DflyShardReplica::FlowId() const { return flow_id_; } diff --git a/src/server/replica.h b/src/server/replica.h index c2f7aaefc..4127d5299 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -140,6 +140,9 @@ class Replica : ProtocolClient { std::vector GetReplicaOffset() const; std::string GetSyncId() const; + // Get the current replication phase based on state_mask_ + std::string GetCurrentPhase() const; + private: util::fb2::ProactorBase* proactor_ = nullptr; Service& service_; diff --git a/src/server/server_family_test.cc b/src/server/server_family_test.cc index efdc0eb04..5a2c85b6b 100644 --- a/src/server/server_family_test.cc +++ b/src/server/server_family_test.cc @@ -11,6 +11,7 @@ #include "base/gtest.h" #include "base/logging.h" #include "facade/facade_test.h" +#include "facade/socket_utils.h" #include "server/test_utils.h" using namespace testing; @@ -26,6 +27,82 @@ class ServerFamilyTest : public BaseFamilyTest { protected: }; +#ifdef __linux__ +TEST_F(ServerFamilyTest, ReadTcpInfo) { + // Create a TCP socket + int sockfd = socket(AF_INET, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0) << "Failed to create socket"; + + // We'll create a socket in LISTEN state + struct sockaddr_in server_addr; + memset(&server_addr, 0, sizeof(server_addr)); + server_addr.sin_family = AF_INET; + server_addr.sin_addr.s_addr = INADDR_ANY; + server_addr.sin_port = 0; // Let the system choose a free port + + // Bind to the port + ASSERT_EQ(bind(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)), 0) + << "Failed to bind socket: " << strerror(errno); + + // Start listening + ASSERT_EQ(listen(sockfd, 1), 0) << "Failed to listen on socket: " << strerror(errno); + + // Get socket info + std::string socket_info = GetSocketInfo(sockfd); + std::cout << "Socket info for valid socket: " << socket_info << std::endl; + EXPECT_FALSE(socket_info.empty()) << "Socket info should not be empty"; + + // The socket info should contain some recognizable patterns + // For a listening socket, it should contain information about the local address + EXPECT_NE(socket_info.find("State: LISTEN"), std::string::npos) + << "Socket info doesn't contain expected local address pattern"; + + // Close the socket + close(sockfd); + + // Test invalid socket + socket_info = GetSocketInfo(-1); + EXPECT_EQ(socket_info, "invalid socket"); +} + +TEST_F(ServerFamilyTest, GetTcpSocketInfoIPv6) { + // Create an IPv6 TCP socket + int sockfd = socket(AF_INET6, SOCK_STREAM, 0); + ASSERT_GT(sockfd, 0) << "Failed to create IPv6 socket"; + + // We'll create a socket in LISTEN state + struct sockaddr_in6 server_addr; + memset(&server_addr, 0, sizeof(server_addr)); + server_addr.sin6_family = AF_INET6; + server_addr.sin6_addr = in6addr_any; + server_addr.sin6_port = 0; // Let the system choose a free port + + // Bind to the port + ASSERT_EQ(bind(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)), 0) + << "Failed to bind IPv6 socket: " << strerror(errno); + + // Start listening + ASSERT_EQ(listen(sockfd, 1), 0) << "Failed to listen on IPv6 socket: " << strerror(errno); + + // Get socket info + std::string socket_info = GetSocketInfo(sockfd); + std::cout << "Socket info for valid IPv6 socket: " << socket_info << std::endl; + EXPECT_FALSE(socket_info.empty()) << "IPv6 socket info should not be empty"; + + // The socket info should contain some recognizable patterns + // For a listening IPv6 socket, it should contain information about the local address + EXPECT_NE(socket_info.find("State: LISTEN"), std::string::npos) + << "IPv6 socket info doesn't contain expected LISTEN state"; + + // If IPv6 support works correctly, the socket info should indicate an IPv6 address format + EXPECT_NE(socket_info.find("Local: ["), std::string::npos) + << "IPv6 socket info doesn't use IPv6 address format"; + + // Close the socket + close(sockfd); +} +#endif + TEST_F(ServerFamilyTest, SlowLogArgsCountTruncation) { auto resp = Run({"config", "set", "slowlog_max_len", "3"}); EXPECT_THAT(resp.GetString(), "OK");