mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-12 19:05:47 +02:00
fix(server): Fix replication bug, add gdb opt to pytest (#513)
fix(server): Fix small bugs, add gdb opt to pytest Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
e43032842e
commit
2493434b23
7 changed files with 68 additions and 39 deletions
|
@ -884,6 +884,8 @@ void RdbSaver::Impl::Cancel() {
|
||||||
dfly::SliceSnapshot::DbRecord rec;
|
dfly::SliceSnapshot::DbRecord rec;
|
||||||
while (channel_.Pop(rec)) {
|
while (channel_.Pop(rec)) {
|
||||||
}
|
}
|
||||||
|
|
||||||
|
snapshot->Join();
|
||||||
}
|
}
|
||||||
|
|
||||||
void RdbSaver::Impl::FillFreqMap(RdbTypeFreqMap* dest) const {
|
void RdbSaver::Impl::FillFreqMap(RdbTypeFreqMap* dest) const {
|
||||||
|
|
|
@ -624,8 +624,8 @@ void Replica::FullSyncDflyFb(SyncBlock* sb, string eof_token) {
|
||||||
std::unique_lock lk(sb->mu_);
|
std::unique_lock lk(sb->mu_);
|
||||||
sb->flows_left--;
|
sb->flows_left--;
|
||||||
ran = true;
|
ran = true;
|
||||||
}
|
|
||||||
sb->cv_.notify_all();
|
sb->cv_.notify_all();
|
||||||
|
}
|
||||||
});
|
});
|
||||||
loader.Load(&ps);
|
loader.Load(&ps);
|
||||||
|
|
||||||
|
|
|
@ -782,7 +782,7 @@ static void RunStage(bool new_version, std::function<void(unsigned)> cb) {
|
||||||
};
|
};
|
||||||
|
|
||||||
using PartialSaveOpts =
|
using PartialSaveOpts =
|
||||||
tuple<const string& /*filename*/, const string& /*path*/, absl::Time /*start*/>;
|
tuple<const fs::path& /*filename*/, const fs::path& /*path*/, absl::Time /*start*/>;
|
||||||
|
|
||||||
// Start saving a single snapshot of a multi-file dfly snapshot.
|
// Start saving a single snapshot of a multi-file dfly snapshot.
|
||||||
// If shard is null, then this is the summary file.
|
// If shard is null, then this is the summary file.
|
||||||
|
@ -790,18 +790,17 @@ error_code DoPartialSave(PartialSaveOpts opts, const dfly::StringVec& scripts,
|
||||||
RdbSnapshot* snapshot, EngineShard* shard) {
|
RdbSnapshot* snapshot, EngineShard* shard) {
|
||||||
auto [filename, path, now] = opts;
|
auto [filename, path, now] = opts;
|
||||||
// Construct resulting filename.
|
// Construct resulting filename.
|
||||||
fs::path file = filename, abs_path = path;
|
fs::path full_filename = filename;
|
||||||
if (shard == nullptr) {
|
if (shard == nullptr) {
|
||||||
ExtendFilename(now, "summary", &file);
|
ExtendFilename(now, "summary", &full_filename);
|
||||||
} else {
|
} else {
|
||||||
ExtendFilenameWithShard(now, shard->shard_id(), &file);
|
ExtendFilenameWithShard(now, shard->shard_id(), &full_filename);
|
||||||
}
|
}
|
||||||
abs_path /= file; // use / operator to concatenate paths.
|
fs::path full_path = path / full_filename; // use / operator to concatenate paths.
|
||||||
VLOG(1) << "Saving partial file to " << abs_path;
|
|
||||||
|
|
||||||
// Start rdb saving.
|
// Start rdb saving.
|
||||||
SaveMode mode = shard == nullptr ? SaveMode::SUMMARY : SaveMode::SINGLE_SHARD;
|
SaveMode mode = shard == nullptr ? SaveMode::SUMMARY : SaveMode::SINGLE_SHARD;
|
||||||
std::error_code local_ec = snapshot->Start(mode, abs_path.generic_string(), scripts);
|
error_code local_ec = snapshot->Start(mode, full_path.generic_string(), scripts);
|
||||||
|
|
||||||
if (!local_ec && mode == SaveMode::SINGLE_SHARD) {
|
if (!local_ec && mode == SaveMode::SINGLE_SHARD) {
|
||||||
snapshot->StartInShard(shard);
|
snapshot->StartInShard(shard);
|
||||||
|
@ -897,7 +896,6 @@ GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) {
|
||||||
|
|
||||||
ExtendFilenameWithShard(start, -1, &filename);
|
ExtendFilenameWithShard(start, -1, &filename);
|
||||||
path /= filename; // use / operator to concatenate paths.
|
path /= filename; // use / operator to concatenate paths.
|
||||||
VLOG(1) << "Saving to " << path;
|
|
||||||
|
|
||||||
snapshots[0].reset(new RdbSnapshot(fq_threadpool_.get()));
|
snapshots[0].reset(new RdbSnapshot(fq_threadpool_.get()));
|
||||||
const auto lua_scripts = script_mgr_->GetLuaScripts();
|
const auto lua_scripts = script_mgr_->GetLuaScripts();
|
||||||
|
@ -933,13 +931,11 @@ GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) {
|
||||||
absl::Duration dur = absl::Now() - start;
|
absl::Duration dur = absl::Now() - start;
|
||||||
double seconds = double(absl::ToInt64Milliseconds(dur)) / 1000;
|
double seconds = double(absl::ToInt64Milliseconds(dur)) / 1000;
|
||||||
|
|
||||||
{
|
|
||||||
LOG(INFO) << "Saving " << path << " finished after "
|
|
||||||
<< strings::HumanReadableElapsedTime(seconds);
|
|
||||||
}
|
|
||||||
|
|
||||||
// Populate LastSaveInfo.
|
// Populate LastSaveInfo.
|
||||||
if (!ec) {
|
if (!ec) {
|
||||||
|
LOG(INFO) << "Saving " << path << " finished after "
|
||||||
|
<< strings::HumanReadableElapsedTime(seconds);
|
||||||
|
|
||||||
save_info = make_shared<LastSaveInfo>();
|
save_info = make_shared<LastSaveInfo>();
|
||||||
for (const auto& k_v : rdb_name_map) {
|
for (const auto& k_v : rdb_name_map) {
|
||||||
save_info->freq_map.emplace_back(k_v);
|
save_info->freq_map.emplace_back(k_v);
|
||||||
|
|
|
@ -5,6 +5,17 @@
|
||||||
|
|
||||||
The tests assume you have the "dragonfly" binary in `<root>/build-dbg` directory.
|
The tests assume you have the "dragonfly" binary in `<root>/build-dbg` directory.
|
||||||
You can override the location of the binary using `DRAGONFLY_PATH` environment var.
|
You can override the location of the binary using `DRAGONFLY_PATH` environment var.
|
||||||
|
|
||||||
|
### Important fixtures
|
||||||
|
|
||||||
|
- `df_server` is the default instance that is available for testing. Use the `dfly_args` decorator to change its default arguments.
|
||||||
|
- `client` and `async_client` are clients to the default instance. The default instance is re-used accross tests with the same arguments, but each new client flushes the instance.
|
||||||
|
- `pool` and `async_pool` are client pools that are connected to the default instance
|
||||||
|
|
||||||
|
### Custom arguments
|
||||||
|
|
||||||
|
- use `--gdb` to start all instances inside gdb.
|
||||||
|
|
||||||
### Before you start
|
### Before you start
|
||||||
Please make sure that you have python 3 installed on you local host.
|
Please make sure that you have python 3 installed on you local host.
|
||||||
If have more both python 2 and python 3 installed on you host, you can run the tests with the following command:
|
If have more both python 2 and python 3 installed on you host, you can run the tests with the following command:
|
||||||
|
@ -20,6 +31,7 @@ Then install all the required dependencies for the tests:
|
||||||
```
|
```
|
||||||
pip install -r dragonfly/requirements.txt
|
pip install -r dragonfly/requirements.txt
|
||||||
```
|
```
|
||||||
|
|
||||||
### Running the tests
|
### Running the tests
|
||||||
to run pytest, run:
|
to run pytest, run:
|
||||||
`pytest -xv dragonfly`
|
`pytest -xv dragonfly`
|
||||||
|
@ -33,15 +45,6 @@ Pytest will recursively search the `tests/dragonfly` directory for files matchin
|
||||||
|
|
||||||
**Note**: When making a new directory in `tests/dragonfly` be sure to create an `__init__.py` file to avoid [name conflicts](https://docs.pytest.org/en/7.1.x/explanation/goodpractices.html#tests-outside-application-code)
|
**Note**: When making a new directory in `tests/dragonfly` be sure to create an `__init__.py` file to avoid [name conflicts](https://docs.pytest.org/en/7.1.x/explanation/goodpractices.html#tests-outside-application-code)
|
||||||
|
|
||||||
### Interacting with Dragonfly
|
|
||||||
Pytest allows for parameters with a specific name to be automatically resolved through [fixtures](https://docs.pytest.org/en/7.1.x/explanation/fixtures.html) for any test function. The following fixtures are to be used to interact with Dragonfly when writing a test:
|
|
||||||
| Name | Type | [Scope](https://docs.pytest.org/en/7.1.x/how-to/fixtures.html?highlight=scope#scope-sharing-fixtures-across-classes-modules-packages-or-session) | Description
|
|
||||||
| ----- | ---- | ----- | ----------- |
|
|
||||||
| tmp_dir | [pathlib.Path](https://docs.python.org/3/library/pathlib.html) | Session | The temporary directory the Dragonfly binary will be running in. The environment variable `DRAGONFLY_TMP` is also set to this value |
|
|
||||||
| test_env | `dict` | Session | The environment variables used when running Dragonfly as a dictionary |
|
|
||||||
| client | [redis.Redis](https://redis-py.readthedocs.io/en/stable/connections.html#generic-client) | Class | The redis client to interact with the Dragonfly instance |
|
|
||||||
| async_client | [aioredis.Redis](https://aioredis.readthedocs.io/en/latest/api/high-level/#aioredis.client.Redis) | Class | The async redis client to interact with the Dragonfly instance |
|
|
||||||
|
|
||||||
### Passing CLI commands to Dragonfly
|
### Passing CLI commands to Dragonfly
|
||||||
To pass custom flags to the Dragonfly executable two class decorators have been created. `@dfly_args` allows you to pass a list of parameters to the Dragonfly executable, similarly `@dfly_multi_test_args` allows you to specify multiple parameter configurations to test with a given test class.
|
To pass custom flags to the Dragonfly executable two class decorators have been created. `@dfly_args` allows you to pass a list of parameters to the Dragonfly executable, similarly `@dfly_multi_test_args` allows you to specify multiple parameter configurations to test with a given test class.
|
||||||
|
|
||||||
|
|
|
@ -6,6 +6,16 @@ import subprocess
|
||||||
import time
|
import time
|
||||||
import subprocess
|
import subprocess
|
||||||
|
|
||||||
|
from dataclasses import dataclass
|
||||||
|
|
||||||
|
|
||||||
|
@dataclass
|
||||||
|
class DflyParams:
|
||||||
|
path: str
|
||||||
|
cwd: str
|
||||||
|
gdb: bool
|
||||||
|
env: any
|
||||||
|
|
||||||
|
|
||||||
class DflyInstance:
|
class DflyInstance:
|
||||||
"""
|
"""
|
||||||
|
@ -13,20 +23,25 @@ class DflyInstance:
|
||||||
with fixed arguments.
|
with fixed arguments.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, path, args, cwd):
|
def __init__(self, params: DflyParams, args):
|
||||||
self.path = path
|
|
||||||
self.args = args
|
self.args = args
|
||||||
self.cwd = cwd
|
self.params = params
|
||||||
self.proc = None
|
self.proc = None
|
||||||
|
|
||||||
def start(self):
|
def start(self):
|
||||||
arglist = DflyInstance.format_args(self.args)
|
arglist = DflyInstance.format_args(self.args)
|
||||||
|
|
||||||
print(f"Starting instance on {self.port} with arguments {arglist}")
|
print(f"Starting instance on {self.port} with arguments {arglist}")
|
||||||
self.proc = subprocess.Popen([self.path, *arglist], cwd=self.cwd)
|
|
||||||
|
args = [self.params.path, *arglist]
|
||||||
|
if self.params.gdb:
|
||||||
|
args = ["gdb", "--ex", "r", "--args"] + args
|
||||||
|
|
||||||
|
self.proc = subprocess.Popen(args, cwd=self.params.cwd)
|
||||||
|
|
||||||
# Give Dragonfly time to start and detect possible failure causes
|
# Give Dragonfly time to start and detect possible failure causes
|
||||||
time.sleep(0.3)
|
# Gdb starts slowly
|
||||||
|
time.sleep(0.4 if not self.params.gdb else 3.0)
|
||||||
|
|
||||||
return_code = self.proc.poll()
|
return_code = self.proc.poll()
|
||||||
if return_code is not None:
|
if return_code is not None:
|
||||||
|
@ -70,19 +85,17 @@ class DflyInstanceFactory:
|
||||||
A factory for creating dragonfly instances with pre-supplied arguments.
|
A factory for creating dragonfly instances with pre-supplied arguments.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
def __init__(self, env, cwd, path, args):
|
def __init__(self, params: DflyParams, args):
|
||||||
self.env = env
|
|
||||||
self.cwd = cwd
|
|
||||||
self.path = path
|
|
||||||
self.args = args
|
self.args = args
|
||||||
|
self.params = params
|
||||||
self.instances = []
|
self.instances = []
|
||||||
|
|
||||||
def create(self, **kwargs) -> DflyInstance:
|
def create(self, **kwargs) -> DflyInstance:
|
||||||
args = {**self.args, **kwargs}
|
args = {**self.args, **kwargs}
|
||||||
for k, v in args.items():
|
for k, v in args.items():
|
||||||
args[k] = v.format(**self.env) if isinstance(v, str) else v
|
args[k] = v.format(**self.params.env) if isinstance(v, str) else v
|
||||||
|
|
||||||
instance = DflyInstance(self.path, args, self.cwd)
|
instance = DflyInstance(self.params, args)
|
||||||
self.instances.append(instance)
|
self.instances.append(instance)
|
||||||
return instance
|
return instance
|
||||||
|
|
||||||
|
|
|
@ -12,7 +12,7 @@ import aioredis
|
||||||
from pathlib import Path
|
from pathlib import Path
|
||||||
from tempfile import TemporaryDirectory
|
from tempfile import TemporaryDirectory
|
||||||
|
|
||||||
from . import DflyInstance, DflyInstanceFactory
|
from . import DflyInstance, DflyInstanceFactory, DflyParams
|
||||||
|
|
||||||
DATABASE_INDEX = 1
|
DATABASE_INDEX = 1
|
||||||
|
|
||||||
|
@ -40,6 +40,12 @@ def test_env(tmp_dir: Path):
|
||||||
return env
|
return env
|
||||||
|
|
||||||
|
|
||||||
|
def pytest_addoption(parser):
|
||||||
|
parser.addoption(
|
||||||
|
'--gdb', action='store_true', default=False, help='Run instances in gdb'
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="session", params=[{}])
|
@pytest.fixture(scope="session", params=[{}])
|
||||||
def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory:
|
def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory:
|
||||||
"""
|
"""
|
||||||
|
@ -50,15 +56,22 @@ def df_factory(request, tmp_dir, test_env) -> DflyInstanceFactory:
|
||||||
scripts_dir, '../../build-dbg/dragonfly'))
|
scripts_dir, '../../build-dbg/dragonfly'))
|
||||||
|
|
||||||
args = request.param if request.param else {}
|
args = request.param if request.param else {}
|
||||||
factory = DflyInstanceFactory(test_env, tmp_dir, path=path, args=args)
|
|
||||||
|
params = DflyParams(
|
||||||
|
path=path,
|
||||||
|
cwd=tmp_dir,
|
||||||
|
gdb=request.config.getoption("--gdb"),
|
||||||
|
env=test_env
|
||||||
|
)
|
||||||
|
|
||||||
|
factory = DflyInstanceFactory(params, args)
|
||||||
yield factory
|
yield factory
|
||||||
factory.stop_all()
|
factory.stop_all()
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="function")
|
@pytest.fixture(scope="function")
|
||||||
def df_local_factory(df_factory: DflyInstanceFactory):
|
def df_local_factory(df_factory: DflyInstanceFactory):
|
||||||
factory = DflyInstanceFactory(
|
factory = DflyInstanceFactory(df_factory.params, df_factory.args)
|
||||||
df_factory.env, df_factory.cwd, df_factory.path, df_factory.args)
|
|
||||||
yield factory
|
yield factory
|
||||||
factory.stop_all()
|
factory.stop_all()
|
||||||
|
|
||||||
|
|
|
@ -205,7 +205,7 @@ async def test_disconnect(df_local_factory, t_master, t_crash_fs, t_crash_ss, t_
|
||||||
def check_gen(): return gen_test_data(n_keys//5, seed=0)
|
def check_gen(): return gen_test_data(n_keys//5, seed=0)
|
||||||
|
|
||||||
await batch_fill_data_async(c_master, check_gen())
|
await batch_fill_data_async(c_master, check_gen())
|
||||||
await asyncio.sleep(0.1)
|
await asyncio.sleep(1.0)
|
||||||
for _, c_replica, _ in replicas_of_type(lambda t: t > 1):
|
for _, c_replica, _ in replicas_of_type(lambda t: t > 1):
|
||||||
await batch_check_data_async(c_replica, check_gen())
|
await batch_check_data_async(c_replica, check_gen())
|
||||||
|
|
||||||
|
@ -217,6 +217,8 @@ async def test_disconnect(df_local_factory, t_master, t_crash_fs, t_crash_ss, t_
|
||||||
await asyncio.gather(*(disconnect(*args) for args
|
await asyncio.gather(*(disconnect(*args) for args
|
||||||
in replicas_of_type(lambda t: t == 2)))
|
in replicas_of_type(lambda t: t == 2)))
|
||||||
|
|
||||||
|
await asyncio.sleep(0.5)
|
||||||
|
|
||||||
# Check phase 3 replica survived
|
# Check phase 3 replica survived
|
||||||
for _, c_replica, _ in replicas_of_type(lambda t: t == 2):
|
for _, c_replica, _ in replicas_of_type(lambda t: t == 2):
|
||||||
assert await c_replica.ping()
|
assert await c_replica.ping()
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue