Fix socket code in replication (#1622)

This commit is contained in:
Roy Jacobson 2023-08-03 17:01:56 +02:00 committed by GitHub
parent da17a39410
commit 8040bed10f
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 14 additions and 3 deletions

View file

@ -195,8 +195,11 @@ ProtocolClient::ProtocolClient(ServerContext context) : server_context_(std::mov
}
ProtocolClient::~ProtocolClient() {
// FIXME: We should close the socket explictly outside of the destructor. This currently
// breaks test_cancel_replication_immediately.
if (sock_) {
auto ec = sock_->Close();
std::error_code ec;
sock_->proactor()->Await([this, &ec]() { ec = sock_->Close(); });
LOG_IF(ERROR, ec) << "Error closing socket " << ec;
}
#ifdef DFLY_USE_SSL
@ -229,14 +232,19 @@ error_code ProtocolClient::ConnectAndAuth(std::chrono::milliseconds connect_time
// run we must not create a new socket. sock_mu_ syncs between the two
// functions.
if (!cntx->IsCancelled()) {
if (sock_) {
LOG_IF(WARNING, sock_->Close()) << "Error closing socket";
sock_.reset(nullptr);
}
if (ssl_ctx_) {
auto tls_sock = std::make_unique<tls::TlsSocket>(mythread->CreateSocket());
tls_sock->InitSSL(ssl_ctx_);
sock_.reset(tls_sock.release());
sock_ = std::move(tls_sock);
} else {
sock_.reset(mythread->CreateSocket());
}
serializer_.reset(new ReqSerializer(sock_.get()));
serializer_ = std::make_unique<ReqSerializer>(sock_.get());
} else {
return cntx->GetError();
}

View file

@ -2052,11 +2052,14 @@ void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) {
if (replica_ == new_replica) {
if (ec) {
service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
replica_->Stop();
replica_.reset();
}
bool is_master = !replica_;
pool.AwaitFiberOnAll(
[&](util::ProactorBase* pb) { ServerState::tlocal()->is_master = is_master; });
} else {
new_replica->Stop();
}
}