fix: do not migrate during connection close (#2570)

* fix: do not migrate during connection close

Fixes #2569
Before the change we had a corner case where Dragonfly would call
OnPreMigrateThread but would not call CancelOnErrorCb because OnBreakCb had already been called
(it resets break_cb_engaged_).

On the other hand in OnPostMigrateThread we called RegisterOnErrorCb if breaker_cb_ which resulted in double registration.
This change simplifies the logic by removing the break_cb_engaged_ flag, since CancelOnErrorCb is safe to call when nothing is registered.
Moreover, we now skip the Migrate flow if a socket is being closed.

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-02-12 16:03:34 +02:00 committed by GitHub
parent 6d11f86091
commit 4000adf57f
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 46 additions and 33 deletions

2
helio

@ -1 +1 @@
Subproject commit d1d06c486f2a6f79e1eb6249c921cc6ecf06d30e
Subproject commit 432ffbae89d909f3fbbc25fdeb77689c3f09ea75

View file

@ -502,20 +502,15 @@ void Connection::OnShutdown() {
}
void Connection::OnPreMigrateThread() {
// If we are migrating to another io_uring, we should cancel any pending requests we have.
if (break_cb_engaged_) {
socket_->CancelOnErrorCb();
break_cb_engaged_ = false;
}
CHECK(!cc_->conn_closing);
socket_->CancelOnErrorCb();
}
void Connection::OnPostMigrateThread() {
// Once we have migrated, we should rearm the OnBreakCb callback.
if (breaker_cb_) {
DCHECK(!break_cb_engaged_);
socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); });
break_cb_engaged_ = true;
}
// Update tl variables
@ -594,14 +589,11 @@ void Connection::HandleRequests() {
cc_.reset(service_->CreateContext(peer, this));
if (breaker_cb_) {
socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); });
break_cb_engaged_ = true;
}
ConnectionFlow(peer);
if (break_cb_engaged_) {
socket_->CancelOnErrorCb();
}
socket_->CancelOnErrorCb(); // noop if nothing is registered.
cc_.reset();
}
@ -975,14 +967,13 @@ void Connection::OnBreakCb(int32_t mask) {
<< cc_->reply_builder()->IsSendActive() << " " << cc_->reply_builder()->GetError();
cc_->conn_closing = true;
break_cb_engaged_ = false; // do not attempt to cancel it.
breaker_cb_(mask);
evc_.notify(); // Notify dispatch fiber.
}
void Connection::HandleMigrateRequest() {
if (!migration_request_) {
if (cc_->conn_closing || !migration_request_) {
return;
}
@ -996,6 +987,7 @@ void Connection::HandleMigrateRequest() {
// We don't support migrating with subscriptions as it would require moving thread local
// handles. We can't check above, as the queue might have contained a subscribe request.
if (cc_->subscriptions == 0) {
stats_->num_migrations++;
migration_request_ = nullptr;
DecreaseStatsOnClose();

View file

@ -401,7 +401,6 @@ class Connection : public util::Connection {
std::string name_;
unsigned parser_error_ = 0;
bool break_cb_engaged_ = false;
BreakerCb breaker_cb_;
std::unique_ptr<Shutdown> shutdown_cb_;

View file

@ -20,7 +20,7 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats);
ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
// To break this code deliberately if we add/remove a field to this struct.
static_assert(kSizeConnStats == 104u);
static_assert(kSizeConnStats == 112u);
ADD(read_buf_capacity);
ADD(dispatch_queue_entries);
@ -36,6 +36,7 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) {
ADD(num_conns);
ADD(num_replicas);
ADD(num_blocked_clients);
ADD(num_migrations);
return *this;
}

View file

@ -61,7 +61,7 @@ struct ConnectionStats {
uint32_t num_conns = 0;
uint32_t num_replicas = 0;
uint32_t num_blocked_clients = 0;
uint64_t num_migrations = 0;
ConnectionStats& operator+=(const ConnectionStats& o);
};

View file

@ -1441,6 +1441,9 @@ void DbSlice::TrackKeys(const facade::Connection::WeakRef& conn, const ArgSlice&
}
void DbSlice::SendInvalidationTrackingMessage(std::string_view key) {
if (client_tracking_map_.empty())
return;
auto it = client_tracking_map_.find(key);
if (it != client_tracking_map_.end()) {
// notify all the clients.

View file

@ -1859,6 +1859,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("total_pipelined_commands", conn_stats.pipelined_cmd_cnt);
append("pipelined_latency_usec", conn_stats.pipelined_cmd_latency);
append("total_net_input_bytes", conn_stats.io_read_bytes);
append("connection_migrations", conn_stats.num_migrations);
append("total_net_output_bytes", reply_stats.io_write_bytes);
append("instantaneous_input_kbps", -1);
append("instantaneous_output_kbps", -1);

View file

@ -3,11 +3,15 @@ import async_timeout
from redis import asyncio as aioredis
import time
import json
import logging
import pytest
import random
import itertools
import random
import string
from .instance import DflyInstance
from . import dfly_args, dfly_multi_test_args
DJANGO_CACHEOPS_SCRIPT = """
@ -310,3 +314,28 @@ async def test_one_interpreter(async_client: aioredis.Redis):
# At least some connection was seen blocked
# Flaky: release build is too fast and never blocks
# assert max_blocked > 0
"""
Tests migrate/close interaction for the connection
Reproduces #2569
"""
@dfly_args({"proactor_threads": "4", "pipeline_squash": 0})
async def test_migrate_close_connection(async_client: aioredis.Redis, df_server: DflyInstance):
sha = await async_client.script_load("return redis.call('GET', KEYS[1])")
async def run():
reader, writer = await asyncio.open_connection("localhost", df_server.port)
# write an EVALSHA that will ask for migration (75% chance it's on the wrong shard)
writer.write((f"EVALSHA {sha} 1 a\r\n").encode())
await writer.drain()
# disconnect the client connection
writer.close()
await writer.wait_closed()
tasks = [asyncio.create_task(run()) for _ in range(50)]
await asyncio.gather(*tasks)

View file

@ -5,7 +5,6 @@ import aiohttp
import logging
from dataclasses import dataclass
from typing import Dict, Optional, List, Union
import os
import re
import psutil
import itertools
@ -163,22 +162,13 @@ class DflyInstance:
if proc is None:
return
# if we have log files, it means that we started a process.
# if it died before we could stop it, we should raise an exception
if self.log_files:
exitcode = proc.poll()
if exitcode is not None:
if exitcode != 0:
raise Exception(f"Process exited with code {exitcode}")
return
logging.debug(f"Stopping instance on {self._port}")
try:
if kill:
proc.kill()
else:
proc.terminate()
proc.communicate(timeout=30)
proc.communicate(timeout=15)
except subprocess.TimeoutExpired:
# We need to send SIGUSR1 to DF such that it prints the stacktrace
proc.send_signal(signal.SIGUSR1)
@ -203,11 +193,9 @@ class DflyInstance:
self._port = None
all_args = self.format_args(self.args)
arg_str = " ".join(all_args)
bin_path = os.path.realpath(self.params.path)
logging.debug(f"Starting {bin_path} with arguments: {arg_str}")
logging.debug(f"Starting instance with arguments {all_args} from {self.params.path}")
run_cmd = [bin_path, *all_args]
run_cmd = [self.params.path, *all_args]
if self.params.gdb:
run_cmd = ["gdb", "--ex", "r", "--args"] + run_cmd