diff --git a/helio b/helio index d1d06c486..432ffbae8 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit d1d06c486f2a6f79e1eb6249c921cc6ecf06d30e +Subproject commit 432ffbae89d909f3fbbc25fdeb77689c3f09ea75 diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 1817c5251..ce80188a0 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -502,20 +502,15 @@ void Connection::OnShutdown() { } void Connection::OnPreMigrateThread() { - // If we migrating to another io_uring we should cancel any pending requests we have. - if (break_cb_engaged_) { - socket_->CancelOnErrorCb(); - break_cb_engaged_ = false; - } + CHECK(!cc_->conn_closing); + + socket_->CancelOnErrorCb(); } void Connection::OnPostMigrateThread() { // Once we migrated, we should rearm OnBreakCb callback. if (breaker_cb_) { - DCHECK(!break_cb_engaged_); - socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); }); - break_cb_engaged_ = true; } // Update tl variables @@ -594,14 +589,11 @@ void Connection::HandleRequests() { cc_.reset(service_->CreateContext(peer, this)); if (breaker_cb_) { socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); }); - break_cb_engaged_ = true; } ConnectionFlow(peer); - if (break_cb_engaged_) { - socket_->CancelOnErrorCb(); - } + socket_->CancelOnErrorCb(); // noop if nothing is registered. cc_.reset(); } @@ -975,14 +967,13 @@ void Connection::OnBreakCb(int32_t mask) { << cc_->reply_builder()->IsSendActive() << " " << cc_->reply_builder()->GetError(); cc_->conn_closing = true; - break_cb_engaged_ = false; // do not attempt to cancel it. breaker_cb_(mask); evc_.notify(); // Notify dispatch fiber. } void Connection::HandleMigrateRequest() { - if (!migration_request_) { + if (cc_->conn_closing || !migration_request_) { return; } @@ -996,6 +987,7 @@ void Connection::HandleMigrateRequest() { // We don't support migrating with subscriptions as it would require moving thread local // handles. We can't check above, as the queue might have contained a subscribe request. if (cc_->subscriptions == 0) { + stats_->num_migrations++; migration_request_ = nullptr; DecreaseStatsOnClose(); diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 3122b5cc7..e730a91a7 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -401,7 +401,6 @@ class Connection : public util::Connection { std::string name_; unsigned parser_error_ = 0; - bool break_cb_engaged_ = false; BreakerCb breaker_cb_; std::unique_ptr shutdown_cb_; diff --git a/src/facade/facade.cc b/src/facade/facade.cc index 758179c32..c86fd50b1 100644 --- a/src/facade/facade.cc +++ b/src/facade/facade.cc @@ -20,7 +20,7 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats); ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { // To break this code deliberately if we add/remove a field to this struct. - static_assert(kSizeConnStats == 104u); + static_assert(kSizeConnStats == 112u); ADD(read_buf_capacity); ADD(dispatch_queue_entries); @@ -36,6 +36,7 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { ADD(num_conns); ADD(num_replicas); ADD(num_blocked_clients); + ADD(num_migrations); return *this; } diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h index 4846609d9..2cc876c3c 100644 --- a/src/facade/facade_types.h +++ b/src/facade/facade_types.h @@ -61,7 +61,7 @@ struct ConnectionStats { uint32_t num_conns = 0; uint32_t num_replicas = 0; uint32_t num_blocked_clients = 0; - + uint64_t num_migrations = 0; ConnectionStats& operator+=(const ConnectionStats& o); }; diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 5109574b4..4dcb56019 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -1441,6 +1441,9 @@ void DbSlice::TrackKeys(const facade::Connection::WeakRef& conn, const ArgSlice& } void DbSlice::SendInvalidationTrackingMessage(std::string_view key) { + if (client_tracking_map_.empty()) + return; + auto it = client_tracking_map_.find(key); if (it != client_tracking_map_.end()) { // notify all the clients. diff --git a/src/server/server_family.cc b/src/server/server_family.cc index b3343bf11..1857a1fad 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1859,6 +1859,7 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) { append("total_pipelined_commands", conn_stats.pipelined_cmd_cnt); append("pipelined_latency_usec", conn_stats.pipelined_cmd_latency); append("total_net_input_bytes", conn_stats.io_read_bytes); + append("connection_migrations", conn_stats.num_migrations); append("total_net_output_bytes", reply_stats.io_write_bytes); append("instantaneous_input_kbps", -1); append("instantaneous_output_kbps", -1); diff --git a/tests/dragonfly/eval_test.py b/tests/dragonfly/eval_test.py index 34a69e3a4..8e0b4463f 100644 --- a/tests/dragonfly/eval_test.py +++ b/tests/dragonfly/eval_test.py @@ -3,11 +3,15 @@ import async_timeout from redis import asyncio as aioredis import time import json +import logging import pytest import random import itertools import random import string + +from .instance import DflyInstance + from . import dfly_args, dfly_multi_test_args DJANGO_CACHEOPS_SCRIPT = """ @@ -310,3 +314,28 @@ async def test_one_interpreter(async_client: aioredis.Redis): # At least some connection was seen blocked # Flaky: release build is too fast and never blocks # assert max_blocked > 0 + + +""" +Tests migrate/close interaction for the connection +Reproduces #2569 +""" + + +@dfly_args({"proactor_threads": "4", "pipeline_squash": 0}) +async def test_migrate_close_connection(async_client: aioredis.Redis, df_server: DflyInstance): + sha = await async_client.script_load("return redis.call('GET', KEYS[1])") + + async def run(): + reader, writer = await asyncio.open_connection("localhost", df_server.port) + + # write a EVALSHA that will ask for migration (75% it's on the wrong shard) + writer.write((f"EVALSHA {sha} 1 a\r\n").encode()) + await writer.drain() + + # disconnect the client connection + writer.close() + await writer.wait_closed() + + tasks = [asyncio.create_task(run()) for _ in range(50)] + await asyncio.gather(*tasks) diff --git a/tests/dragonfly/instance.py b/tests/dragonfly/instance.py index a99c98c11..9c97235ed 100644 --- a/tests/dragonfly/instance.py +++ b/tests/dragonfly/instance.py @@ -5,7 +5,6 @@ import aiohttp import logging from dataclasses import dataclass from typing import Dict, Optional, List, Union -import os import re import psutil import itertools @@ -163,22 +162,13 @@ class DflyInstance: if proc is None: return - # if we have log files, it means that we started a process. - # if it died before we could stop it, we should raise an exception - if self.log_files: - exitcode = proc.poll() - if exitcode is not None: - if exitcode != 0: - raise Exception(f"Process exited with code {exitcode}") - return - logging.debug(f"Stopping instance on {self._port}") try: if kill: proc.kill() else: proc.terminate() - proc.communicate(timeout=30) + proc.communicate(timeout=15) except subprocess.TimeoutExpired: # We need to send SIGUSR1 to DF such that it prints the stacktrace proc.send_signal(signal.SIGUSR1) @@ -203,11 +193,9 @@ class DflyInstance: self._port = None all_args = self.format_args(self.args) - arg_str = " ".join(all_args) - bin_path = os.path.realpath(self.params.path) - logging.debug(f"Starting {bin_path} with arguments: {arg_str}") + logging.debug(f"Starting instance with arguments {all_args} from {self.params.path}") - run_cmd = [bin_path, *all_args] + run_cmd = [self.params.path, *all_args] if self.params.gdb: run_cmd = ["gdb", "--ex", "r", "--args"] + run_cmd