chore: fix shutdown sequence in Dragonfly server (#4168)

1. Better logging in regtests
2. Release resources in dfly_main in a more controlled manner.
3. Switch to ignoring signals when unregistering signal handlers during shutdown.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-11-24 10:35:00 +02:00 committed by GitHub
parent cfca3e798d
commit 91caa940b9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 54 additions and 42 deletions

2
helio

@ -1 +1 @@
Subproject commit 944be564ecd44865a9a057c09b5d4bbf7f6db772 Subproject commit ff9b6cd35bf082a9d48cf0904b0e8557cf31b6d2

View file

@ -762,44 +762,42 @@ Usage: dragonfly [FLAGS]
fb2::SetDefaultStackResource(&fb2::std_malloc_resource, kFiberDefaultStackSize); fb2::SetDefaultStackResource(&fb2::std_malloc_resource, kFiberDefaultStackSize);
unique_ptr<util::ProactorPool> pool; {
unique_ptr<util::ProactorPool> pool;
#ifdef __linux__ #ifdef __linux__
base::sys::KernelVersion kver; base::sys::KernelVersion kver;
base::sys::GetKernelVersion(&kver); base::sys::GetKernelVersion(&kver);
CHECK_LT(kver.major, 99u); CHECK_LT(kver.major, 99u);
dfly::kernel_version = kver.kernel * 100 + kver.major; dfly::kernel_version = kver.kernel * 100 + kver.major;
bool use_epoll = ShouldUseEpollAPI(kver); bool use_epoll = ShouldUseEpollAPI(kver);
if (use_epoll) { if (use_epoll) {
pool.reset(fb2::Pool::Epoll(max_available_threads)); pool.reset(fb2::Pool::Epoll(max_available_threads));
} else { } else {
pool.reset(fb2::Pool::IOUring(1024, max_available_threads)); // 1024 - iouring queue size. pool.reset(fb2::Pool::IOUring(1024, max_available_threads)); // 1024 - iouring queue size.
} }
#else #else
pool.reset(fb2::Pool::Epoll(max_available_threads)); pool.reset(fb2::Pool::Epoll(max_available_threads));
#endif #endif
pool->Run(); pool->Run();
SetupAllocationTracker(pool.get()); SetupAllocationTracker(pool.get());
AcceptServer acceptor(pool.get(), &fb2::std_malloc_resource, true); AcceptServer acceptor(pool.get(), &fb2::std_malloc_resource, true);
acceptor.set_back_log(absl::GetFlag(FLAGS_tcp_backlog)); acceptor.set_back_log(absl::GetFlag(FLAGS_tcp_backlog));
dfly::RunEngine(pool.get(), &acceptor); dfly::RunEngine(pool.get(), &acceptor);
pool->Stop(); pool->Stop();
if (!pidfile_path.empty()) { if (!pidfile_path.empty()) {
unlink(pidfile_path.c_str()); unlink(pidfile_path.c_str());
}
} }
// Returns memory to OS.
// This is a workaround for a bug in mi_malloc that may cause a crash on exit.
mi_collect(true);
return 0; return 0;
} }

View file

@ -771,7 +771,7 @@ Service::Service(ProactorPool* pp)
Service::~Service() { Service::~Service() {
#ifdef PRINT_STACKTRACES_ON_SIGNAL #ifdef PRINT_STACKTRACES_ON_SIGNAL
ProactorBase::ClearSignal({SIGUSR1}); ProactorBase::ClearSignal({SIGUSR1}, true);
#endif #endif
delete shard_set; delete shard_set;

View file

@ -131,6 +131,7 @@ async def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory:
path=path, path=path,
cwd=tmp_dir, cwd=tmp_dir,
gdb=request.config.getoption("--gdb"), gdb=request.config.getoption("--gdb"),
direct_output=request.config.getoption("--direct-out"),
buffered_out=request.config.getoption("--buffered-output"), buffered_out=request.config.getoption("--buffered-output"),
args=parse_args(request.config.getoption("--df")), args=parse_args(request.config.getoption("--df")),
existing_port=int(existing) if existing else None, existing_port=int(existing) if existing else None,
@ -259,6 +260,13 @@ def pytest_addoption(parser):
default=None, default=None,
help="Provide a port to the existing memcached process for the test", help="Provide a port to the existing memcached process for the test",
) )
parser.addoption(
"--direct-out",
action="store_true",
default=False,
help="If true, does not post process dragonfly output",
)
parser.addoption("--repeat", action="store", help="Number of times to repeat each test") parser.addoption("--repeat", action="store", help="Number of times to repeat each test")

View file

@ -26,6 +26,7 @@ class DflyParams:
path: str path: str
cwd: str cwd: str
gdb: bool gdb: bool
direct_output: bool
buffered_out: bool buffered_out: bool
args: Dict[str, Union[str, None]] args: Dict[str, Union[str, None]]
existing_port: int existing_port: int
@ -186,7 +187,7 @@ class DflyInstance:
try: try:
self.get_port_from_psutil() self.get_port_from_psutil()
logging.debug( logging.debug(
f"Process started after {time.time() - s:.2f} seconds. port={self.port}" f"Process {self.proc.pid} started after {time.time() - s:.2f} seconds. port={self.port}"
) )
break break
except RuntimeError: except RuntimeError:
@ -202,18 +203,19 @@ class DflyInstance:
sed_cmd = ["sed", "-u", "-e", sed_format] sed_cmd = ["sed", "-u", "-e", sed_format]
if self.params.buffered_out: if self.params.buffered_out:
sed_cmd.remove("-u") sed_cmd.remove("-u")
self.sed_proc = subprocess.Popen( if not self.params.direct_output:
sed_cmd, self.sed_proc = subprocess.Popen(
stdin=self.proc.stdout, sed_cmd,
stdout=subprocess.PIPE, stdin=self.proc.stdout,
bufsize=1, stdout=subprocess.PIPE,
universal_newlines=True, bufsize=1,
) universal_newlines=True,
self.stacktrace = [] )
self.sed_thread = threading.Thread( self.stacktrace = []
target=read_sedout, args=(self.sed_proc.stdout, self.stacktrace), daemon=True self.sed_thread = threading.Thread(
) target=read_sedout, args=(self.sed_proc.stdout, self.stacktrace), daemon=True
self.sed_thread.start() )
self.sed_thread.start()
def set_proc_to_none(self): def set_proc_to_none(self):
self.proc = None self.proc = None
@ -235,7 +237,8 @@ class DflyInstance:
# if the return code is positive it means abnormal exit # if the return code is positive it means abnormal exit
if proc.returncode != 0: if proc.returncode != 0:
raise Exception( raise Exception(
f"Dragonfly did not terminate gracefully, exit code {proc.returncode}" f"Dragonfly did not terminate gracefully, exit code {proc.returncode}, "
f"pid: {proc.pid}"
) )
except subprocess.TimeoutExpired: except subprocess.TimeoutExpired:
@ -268,15 +271,18 @@ class DflyInstance:
all_args = self.format_args(self.args) all_args = self.format_args(self.args)
real_path = os.path.realpath(self.params.path) real_path = os.path.realpath(self.params.path)
logging.debug(f"Starting instance with arguments {' '.join(all_args)} from {real_path}")
run_cmd = [self.params.path, *all_args] run_cmd = [self.params.path, *all_args]
if self.params.gdb: if self.params.gdb:
run_cmd = ["gdb", "--ex", "r", "--args"] + run_cmd run_cmd = ["gdb", "--ex", "r", "--args"] + run_cmd
self.proc = subprocess.Popen( self.proc = subprocess.Popen(
run_cmd, cwd=self.params.cwd, stdout=subprocess.PIPE, stderr=subprocess.STDOUT run_cmd,
cwd=self.params.cwd,
stdout=None if self.params.direct_output else subprocess.PIPE,
stderr=subprocess.STDOUT,
) )
logging.debug(f"Starting {real_path} {' '.join(all_args)}, pid {self.proc.pid}")
def _check_status(self): def _check_status(self):
if not self.params.existing_port: if not self.params.existing_port: