fix: connections logging was updated

This commit is contained in:
Volodymyr Yavdoshenko 2025-05-05 18:26:14 +03:00
parent 6a84ad0208
commit 9f7ee25b66
No known key found for this signature in database
GPG key ID: 24BC74845F4F4064
12 changed files with 255 additions and 21 deletions

2
helio

@ -1 +1 @@
Subproject commit e1e3934b656a258c58125c18c7524dd6438c5585
Subproject commit ec5495235d3001fe4233f88a528a1432e23d5985

View file

@ -3,7 +3,7 @@ cxx_link(dfly_parser_lib base strings_lib)
add_library(dfly_facade conn_context.cc dragonfly_listener.cc dragonfly_connection.cc facade.cc
memcache_parser.cc reply_builder.cc op_status.cc service_interface.cc
reply_capture.cc cmd_arg_parser.cc tls_helpers.cc)
reply_capture.cc cmd_arg_parser.cc tls_helpers.cc socket_utils.cc)
if (DF_USE_SSL)
set(TLS_LIB tls_lib)

View file

@ -24,6 +24,7 @@
#include "facade/memcache_parser.h"
#include "facade/redis_parser.h"
#include "facade/service_interface.h"
#include "facade/socket_utils.h"
#include "glog/logging.h"
#include "io/file.h"
#include "util/fibers/fibers.h"
@ -740,7 +741,8 @@ void Connection::HandleRequests() {
uint8_t buf[2];
auto read_sz = socket_->Read(io::MutableBytes(buf));
if (!read_sz || *read_sz < sizeof(buf)) {
VLOG(1) << "Error reading from peer " << remote_ep << " " << read_sz.error().message();
VLOG(1) << "Error reading from peer " << remote_ep << " " << read_sz.error().message()
<< ", socket state: " + dfly::GetSocketInfo(socket_->native_handle());
return;
}
if (buf[0] != 0x16 || buf[1] != 0x03) {
@ -761,7 +763,8 @@ void Connection::HandleRequests() {
FiberSocketBase::AcceptResult aresult = socket_->Accept();
if (!aresult) {
LOG(INFO) << "Error handshaking " << aresult.error().message();
LOG(INFO) << "Error handshaking " << aresult.error().message()
<< ", socket state: " + dfly::GetSocketInfo(socket_->native_handle());
return;
}
is_tls_ = 1;
@ -1110,7 +1113,8 @@ void Connection::ConnectionFlow() {
if (ec && !FiberSocketBase::IsConnClosed(ec)) {
string conn_info = service_->GetContextInfo(cc_.get()).Format();
LOG(WARNING) << "Socket error for connection " << conn_info << " " << GetName()
<< " during phase " << kPhaseName[phase_] << " : " << ec << " " << ec.message();
<< " during phase " << kPhaseName[phase_] << " : " << ec << " " << ec.message()
<< ", socket state: " + dfly::GetSocketInfo(socket_->native_handle());
}
}

View file

@ -0,0 +1,65 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#include "socket_utils.h"
#ifdef __linux__
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/stat.h>
#include <unistd.h>
#include "absl/strings/str_cat.h"
#include "io/proc_reader.h"
#endif
namespace dfly {
// Returns information about the TCP socket state by its descriptor
std::string GetSocketInfo(int socket_fd) {
if (socket_fd < 0)
return "invalid socket";
#ifdef __linux__
struct stat sock_stat;
if (fstat(socket_fd, &sock_stat) != 0) {
return "could not stat socket";
}
auto tcp_info = io::ReadTcpInfo(sock_stat.st_ino);
if (!tcp_info) {
auto tcp6_info = io::ReadTcp6Info(sock_stat.st_ino);
if (!tcp6_info) {
return "socket not found in /proc/net/tcp or /proc/net/tcp6";
}
tcp_info = std::move(tcp6_info);
}
std::string state_str = io::TcpStateToString(tcp_info->state);
if (tcp_info->is_ipv6) {
char local_ip[INET6_ADDRSTRLEN], remote_ip[INET6_ADDRSTRLEN];
inet_ntop(AF_INET6, &tcp_info->local_addr6, local_ip, sizeof(local_ip));
inet_ntop(AF_INET6, &tcp_info->remote_addr6, remote_ip, sizeof(remote_ip));
return absl::StrCat("State: ", state_str, ", Local: [", local_ip, "]:", tcp_info->local_port,
", Remote: [", remote_ip, "]:", tcp_info->remote_port,
", Inode: ", tcp_info->inode);
} else {
char local_ip[INET_ADDRSTRLEN], remote_ip[INET_ADDRSTRLEN];
struct in_addr addr;
addr.s_addr = htonl(tcp_info->local_addr);
inet_ntop(AF_INET, &addr, local_ip, sizeof(local_ip));
addr.s_addr = htonl(tcp_info->remote_addr);
inet_ntop(AF_INET, &addr, remote_ip, sizeof(remote_ip));
return absl::StrCat("State: ", state_str, ", Local: ", local_ip, ":", tcp_info->local_port,
", Remote: ", remote_ip, ":", tcp_info->remote_port,
", Inode: ", tcp_info->inode);
}
#else
return "socket info not available on this platform";
#endif
}
} // namespace dfly

14
src/facade/socket_utils.h Normal file
View file

@ -0,0 +1,14 @@
// Copyright 2022, DragonflyDB authors. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <string>
namespace dfly {
// Returns information about the TCP socket state by its descriptor
std::string GetSocketInfo(int socket_fd);
} // namespace dfly

View file

@ -10,6 +10,7 @@
#include "base/flags.h"
#include "base/logging.h"
#include "cluster_utility.h"
#include "facade/socket_utils.h"
#include "server/error.h"
#include "server/journal/executor.h"
#include "server/journal/tx_executor.h"
@ -67,6 +68,11 @@ class ClusterShardMigration {
auto tx_data = tx_reader.NextTxData(&reader, cntx);
if (!tx_data) {
if (cntx->GetError()) {
LOG(WARNING) << "Error reading from migration socket for shard " << source_shard_id_
<< ": " << cntx->GetError().Format()
<< ", socket state: " << GetSocketInfo(source->native_handle());
}
break;
}
@ -105,7 +111,12 @@ class ClusterShardMigration {
if (socket_ != nullptr) {
return socket_->proactor()->Await([s = socket_]() {
if (s->IsOpen()) {
return s->Shutdown(SHUT_RDWR); // Does not Close(), only forbids further I/O.
auto ec = s->Shutdown(SHUT_RDWR); // Does not Close(), only forbids further I/O.
if (ec) {
LOG(WARNING) << "Error shutting down socket for shard migration: " << ec.message()
<< ", socket state: " << GetSocketInfo(s->native_handle());
}
return ec;
}
return std::error_code();
});

View file

@ -12,6 +12,7 @@
#include "base/logging.h"
#include "cluster_family.h"
#include "cluster_utility.h"
#include "facade/socket_utils.h"
#include "server/db_slice.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
@ -51,6 +52,9 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
VLOG(1) << "Connecting to source node_id " << node_id << " shard_id " << shard_id;
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
if (auto ec = ConnectAndAuth(timeout, &exec_st_); ec) {
LOG(WARNING) << "Couldn't connect to source node_id " << node_id << " shard_id " << shard_id
<< ": " << ec.message()
<< ", socket state: " + GetSocketInfo(Sock()->native_handle());
exec_st_.ReportError(GenericError(ec, "Couldn't connect to source."));
return;
}
@ -203,7 +207,10 @@ void OutgoingMigration::SyncFb() {
VLOG(1) << "Connecting to target node";
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
if (auto ec = ConnectAndAuth(timeout, &exec_st_); ec) {
LOG(WARNING) << "Can't connect to target node";
std::string socket_info;
LOG(WARNING) << "Can't connect to target node " << server().Description()
<< " for migration: " << ec.message()
<< ", socket state: " + GetSocketInfo(Sock()->native_handle());
exec_st_.ReportError(GenericError(ec, "Couldn't connect to source."));
continue;
}
@ -216,7 +223,10 @@ void OutgoingMigration::SyncFb() {
}
if (auto ec = SendCommandAndReadResponse(cmd); ec) {
LOG(WARNING) << "Can't connect to target node";
std::string socket_info;
LOG(WARNING) << "Could not send INIT command to " << server().Description()
<< " for migration: " << ec.message()
<< ", socket state: " + GetSocketInfo(Sock()->native_handle());
exec_st_.ReportError(GenericError(ec, "Could not send INIT command."));
continue;
}
@ -296,8 +306,9 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
}
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
if (auto ec = ConnectAndAuth(timeout, &exec_st_); ec) {
LOG(WARNING) << "Couldn't connect " << cf_->MyID() << " : " << migration_info_.node_info.id
<< " attempt " << attempt;
LOG(WARNING) << "Couldn't connect to " << cf_->MyID() << " : " << migration_info_.node_info.id
<< " attempt " << attempt << ": " << ec.message()
<< ", socket state: " + GetSocketInfo(Sock()->native_handle());
return false;
}
}
@ -334,7 +345,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
VLOG(1) << "send " << cmd;
if (auto err = SendCommand(cmd); err) {
LOG(WARNING) << "Error during sending DFLYMIGRATE ACK: " << err.message();
LOG(WARNING) << "Error during sending DFLYMIGRATE ACK to " << server().Description() << ": "
<< err.message() << ", socket state: " + GetSocketInfo(Sock()->native_handle());
return false;
}
@ -351,7 +363,9 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
}
if (auto resp = ReadRespReply(absl::ToInt64Milliseconds(passed - timeout)); !resp) {
LOG(WARNING) << resp.error();
LOG(WARNING) << "Error reading response to ACK command from " << server().Description()
<< ": " << resp.error()
<< ", socket state: " + GetSocketInfo(Sock()->native_handle());
return false;
}
@ -382,6 +396,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
void OutgoingMigration::Start() {
VLOG(1) << "Resolving host DNS for outgoing migration";
if (error_code ec = ResolveHostDns(); ec) {
LOG(WARNING) << "Could not resolve host DNS for outgoing migration to "
<< server().Description() << ": " << ec.message();
exec_st_.ReportError(GenericError(ec, "Could not resolve host dns."));
return;
}

View file

@ -363,7 +363,16 @@ GenericError ExecutionState::GetError() const {
}
void ExecutionState::ReportCancelError() {
ReportError(std::make_error_code(errc::operation_canceled), "ExecutionState cancelled");
std::string cancel_reason = "ExecutionState cancelled";
// Add additional information about the reason for cancellation, if possible
// Possible to extract from system errors or context
std::error_code sys_err = std::error_code(errno, std::system_category());
if (sys_err && sys_err != std::errc::operation_canceled) {
absl::StrAppend(&cancel_reason, " due to system error: ", sys_err.message());
}
ReportError(std::make_error_code(errc::operation_canceled), cancel_reason);
}
void ExecutionState::Reset(ErrHandler handler) {

View file

@ -22,6 +22,7 @@ extern "C" {
#include "base/logging.h"
#include "facade/dragonfly_connection.h"
#include "facade/redis_parser.h"
#include "facade/socket_utils.h"
#include "server/error.h"
#include "server/journal/executor.h"
#include "server/journal/serializer.h"
@ -234,6 +235,8 @@ void ProtocolClient::CloseSocket() {
}
void ProtocolClient::DefaultErrorHandler(const GenericError& err) {
LOG(WARNING) << "Socket error: " << err.Format() << " in " << server_context_.Description()
<< ", socket info: " << GetSocketInfo(sock_->native_handle());
CloseSocket();
}

View file

@ -24,6 +24,7 @@ extern "C" {
#include "base/logging.h"
#include "facade/redis_parser.h"
#include "facade/socket_utils.h"
#include "server/error.h"
#include "server/journal/executor.h"
#include "server/journal/serializer.h"
@ -206,7 +207,8 @@ void Replica::MainReplicationFb(std::optional<LastMasterSyncData> last_master_sy
ec = ResolveHostDns();
if (ec) {
LOG(ERROR) << "Error resolving dns to " << server().host << " " << ec;
LOG(ERROR) << "Error resolving dns to " << server().host << " (phase: " << GetCurrentPhase()
<< "): " << ec;
continue;
}
@ -214,7 +216,9 @@ void Replica::MainReplicationFb(std::optional<LastMasterSyncData> last_master_sy
reconnect_count_++;
ec = ConnectAndAuth(absl::GetFlag(FLAGS_master_reconnect_timeout_ms) * 1ms, &exec_st_);
if (ec) {
LOG(WARNING) << "Error connecting to " << server().Description() << " " << ec;
LOG(WARNING) << "Error connecting to " << server().Description()
<< " (phase: " << GetCurrentPhase() << "): " << ec
<< ", reason: " << ec.message();
continue;
}
VLOG(1) << "Replica socket connected";
@ -226,8 +230,9 @@ void Replica::MainReplicationFb(std::optional<LastMasterSyncData> last_master_sy
if ((state_mask_.load() & R_GREETED) == 0) {
ec = Greet();
if (ec) {
LOG(INFO) << "Error greeting " << server().Description() << " " << ec << " "
<< ec.message();
LOG(WARNING) << "Error greeting " << server().Description()
<< " (phase: " << GetCurrentPhase() << "): " << ec << " " << ec.message()
<< ", socket state: " + GetSocketInfo(Sock()->native_handle());
state_mask_.fetch_and(R_ENABLED);
continue;
}
@ -243,8 +248,9 @@ void Replica::MainReplicationFb(std::optional<LastMasterSyncData> last_master_sy
ec = InitiatePSync();
if (ec) {
LOG(WARNING) << "Error syncing with " << server().Description() << " " << ec << " "
<< ec.message();
LOG(WARNING) << "Error syncing with " << server().Description()
<< " (phase: " << GetCurrentPhase() << "): " << ec << " " << ec.message()
<< ", socket state: " + GetSocketInfo(Sock()->native_handle());
state_mask_.fetch_and(R_ENABLED); // reset all flags besides R_ENABLED
continue;
}
@ -262,8 +268,9 @@ void Replica::MainReplicationFb(std::optional<LastMasterSyncData> last_master_sy
auto state = state_mask_.fetch_and(R_ENABLED);
if (state & R_ENABLED) { // replication was not stopped.
LOG(WARNING) << "Error stable sync with " << server().Description() << " " << ec << " "
<< ec.message();
LOG(WARNING) << "Error stable sync with " << server().Description()
<< " (phase: " << GetCurrentPhase() << "): " << ec << " " << ec.message()
<< ", socket state: " + GetSocketInfo(Sock()->native_handle());
}
}
@ -642,6 +649,9 @@ error_code Replica::ConsumeRedisStream() {
auto response = ReadRespReply(&io_buf, /*copy_msg=*/false);
if (!response.has_value()) {
VLOG(1) << "ConsumeRedisStream finished";
LOG(ERROR) << "Error in Redis Stream at phase " << GetCurrentPhase() << " with "
<< server().Description() << ", error: " << response.error()
<< ", socket state: " + GetSocketInfo(Sock()->native_handle());
exec_st_.ReportError(response.error());
acks_fb_.JoinIfNeeded();
return response.error();
@ -679,6 +689,11 @@ error_code Replica::ConsumeDflyStream() {
auto err_handler = [this](const auto& ge) {
// Make sure the flows are not in a state transition
lock_guard lk{flows_op_mu_};
LOG(ERROR) << "DflyStream error in phase " << GetCurrentPhase() << " with "
<< server().Description() << ", error: " << ge.Format()
<< ", socket state: " + GetSocketInfo(Sock()->native_handle());
DefaultErrorHandler(ge);
for (auto& flow : shard_flows_) {
flow->Cancel();
@ -1171,6 +1186,23 @@ std::string Replica::GetSyncId() const {
return master_context_.dfly_session_id;
}
std::string Replica::GetCurrentPhase() const {
uint32_t state = state_mask_.load();
if (!(state & R_ENABLED))
return "DISABLED";
if (!(state & R_TCP_CONNECTED))
return "TCP_CONNECTING";
if (!(state & R_GREETED))
return "GREETING";
if (!(state & R_SYNC_OK))
return "INITIAL_SYNC";
if (state & R_SYNCING)
return "FULL_SYNC_IN_PROGRESS";
return "STABLE_SYNC";
}
uint32_t DflyShardReplica::FlowId() const {
return flow_id_;
}

View file

@ -140,6 +140,9 @@ class Replica : ProtocolClient {
std::vector<uint64_t> GetReplicaOffset() const;
std::string GetSyncId() const;
// Get the current replication phase based on state_mask_
std::string GetCurrentPhase() const;
private:
util::fb2::ProactorBase* proactor_ = nullptr;
Service& service_;

View file

@ -11,6 +11,7 @@
#include "base/gtest.h"
#include "base/logging.h"
#include "facade/facade_test.h"
#include "facade/socket_utils.h"
#include "server/test_utils.h"
using namespace testing;
@ -26,6 +27,82 @@ class ServerFamilyTest : public BaseFamilyTest {
protected:
};
#ifdef __linux__
TEST_F(ServerFamilyTest, ReadTcpInfo) {
// Create a TCP socket
int sockfd = socket(AF_INET, SOCK_STREAM, 0);
ASSERT_GT(sockfd, 0) << "Failed to create socket";
// We'll create a socket in LISTEN state
struct sockaddr_in server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin_family = AF_INET;
server_addr.sin_addr.s_addr = INADDR_ANY;
server_addr.sin_port = 0; // Let the system choose a free port
// Bind to the port
ASSERT_EQ(bind(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)), 0)
<< "Failed to bind socket: " << strerror(errno);
// Start listening
ASSERT_EQ(listen(sockfd, 1), 0) << "Failed to listen on socket: " << strerror(errno);
// Get socket info
std::string socket_info = GetSocketInfo(sockfd);
std::cout << "Socket info for valid socket: " << socket_info << std::endl;
EXPECT_FALSE(socket_info.empty()) << "Socket info should not be empty";
// The socket info should contain some recognizable patterns
// For a listening socket, it should contain information about the local address
EXPECT_NE(socket_info.find("State: LISTEN"), std::string::npos)
<< "Socket info doesn't contain expected local address pattern";
// Close the socket
close(sockfd);
// Test invalid socket
socket_info = GetSocketInfo(-1);
EXPECT_EQ(socket_info, "invalid socket");
}
TEST_F(ServerFamilyTest, GetTcpSocketInfoIPv6) {
// Create an IPv6 TCP socket
int sockfd = socket(AF_INET6, SOCK_STREAM, 0);
ASSERT_GT(sockfd, 0) << "Failed to create IPv6 socket";
// We'll create a socket in LISTEN state
struct sockaddr_in6 server_addr;
memset(&server_addr, 0, sizeof(server_addr));
server_addr.sin6_family = AF_INET6;
server_addr.sin6_addr = in6addr_any;
server_addr.sin6_port = 0; // Let the system choose a free port
// Bind to the port
ASSERT_EQ(bind(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)), 0)
<< "Failed to bind IPv6 socket: " << strerror(errno);
// Start listening
ASSERT_EQ(listen(sockfd, 1), 0) << "Failed to listen on IPv6 socket: " << strerror(errno);
// Get socket info
std::string socket_info = GetSocketInfo(sockfd);
std::cout << "Socket info for valid IPv6 socket: " << socket_info << std::endl;
EXPECT_FALSE(socket_info.empty()) << "IPv6 socket info should not be empty";
// The socket info should contain some recognizable patterns
// For a listening IPv6 socket, it should contain information about the local address
EXPECT_NE(socket_info.find("State: LISTEN"), std::string::npos)
<< "IPv6 socket info doesn't contain expected LISTEN state";
// If IPv6 support works correctly, the socket info should indicate an IPv6 address format
EXPECT_NE(socket_info.find("Local: ["), std::string::npos)
<< "IPv6 socket info doesn't use IPv6 address format";
// Close the socket
close(sockfd);
}
#endif
TEST_F(ServerFamilyTest, SlowLogArgsCountTruncation) {
auto resp = Run({"config", "set", "slowlog_max_len", "3"});
EXPECT_THAT(resp.GetString(), "OK");