diff --git a/src/redis/rdb.h b/src/redis/rdb.h index df0c4fdc1..d0de8a29c 100644 --- a/src/redis/rdb.h +++ b/src/redis/rdb.h @@ -155,6 +155,6 @@ // Currently moved here from server.h #define LONG_STR_SIZE 21 /* Bytes needed for long -> str + '\0' */ -#define REDIS_VERSION "999.999.999" +#define REDIS_VERSION "6.2.11" #endif diff --git a/src/server/detail/snapshot_storage.cc b/src/server/detail/snapshot_storage.cc index e85695e13..93e534f8c 100644 --- a/src/server/detail/snapshot_storage.cc +++ b/src/server/detail/snapshot_storage.cc @@ -90,7 +90,8 @@ string SnapshotStorage::FindMatchingFile(string_view prefix, string_view dbfilen return {}; } -io::Result, GenericError> SnapshotStorage::ExpandSnapshot(const string& load_path) { +io::Result SnapshotStorage::ExpandSnapshot( + const string& load_path) { if (!(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, "summary.dfs"))) { return nonstd::make_unexpected( GenericError(std::make_error_code(std::errc::invalid_argument), "Bad filename extension")); @@ -101,17 +102,20 @@ io::Result, GenericError> SnapshotStorage::ExpandSnapshot(const s return nonstd::make_unexpected(GenericError(ec, "File not found")); } - vector paths{{load_path}}; + ExpandResult result; // Collect all other files in case we're loading dfs. if (absl::EndsWith(load_path, "summary.dfs")) { auto res = ExpandFromPath(load_path); if (!res) { - return res; + return nonstd::make_unexpected(res.error()); } - paths.insert(paths.end(), res->begin(), res->end()); + result = std::move(*res); + result.push_back(load_path); + } else { + result.push_back(load_path); } - return paths; + return result; } FileSnapshotStorage::FileSnapshotStorage(fb2::FiberQueueThreadPool* fq_threadpool) diff --git a/src/server/detail/snapshot_storage.h b/src/server/detail/snapshot_storage.h index 5154cb4c3..6f217a545 100644 --- a/src/server/detail/snapshot_storage.h +++ b/src/server/detail/snapshot_storage.h @@ -51,8 +51,9 @@ class SnapshotStorage { virtual io::Result LoadPath(std::string_view dir, std::string_view dbfilename) = 0; + using ExpandResult = std::vector; // Searches for all the relevant snapshot files given the RDB file or DFS summary file path. - io::Result, GenericError> ExpandSnapshot(const std::string& load_path); + io::Result ExpandSnapshot(const std::string& load_path); virtual bool IsCloud() const { return false; diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 0b889fc4f..a6873a14c 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -102,6 +102,8 @@ string error_category::message(int ev) const { switch (ev) { case errc::wrong_signature: return "Wrong signature while trying to load from rdb file"; + case errc::out_of_memory: + return "Out of memory, or used memory is too high"; default: return absl::StrCat("Internal error when loading RDB file ", ev); break; @@ -2596,7 +2598,9 @@ error_code RdbLoader::HandleAux() { } else if (auxkey == "lua") { LoadScriptFromAux(std::move(auxval)); } else if (auxkey == "redis-ver") { - VLOG(1) << "Loading RDB produced by version " << auxval; + VLOG(1) << "Loading RDB produced by Redis version " << auxval; + } else if (auxkey == "df-ver") { + VLOG(1) << "Loading RDB produced by Dragonfly version " << auxval; } else if (auxkey == "ctime") { int64_t ctime; if (absl::SimpleAtoi(auxval, &ctime)) { @@ -2606,9 +2610,14 @@ error_code RdbLoader::HandleAux() { VLOG(1) << "RDB age " << strings::HumanReadableElapsedTime(age); } } else if (auxkey == "used-mem") { - long long usedmem; + int64_t usedmem; if (absl::SimpleAtoi(auxval, &usedmem)) { VLOG(1) << "RDB memory usage when created " << strings::HumanReadableNumBytes(usedmem); + if (usedmem > ssize_t(max_memory_limit)) { + LOG(WARNING) << "Could not load snapshot - its used memory is " << usedmem + << " but the limit is " << max_memory_limit; + return RdbError(errc::out_of_memory); + } } } else if (auxkey == "aof-preamble") { long long haspreamble; diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 9bab44b8f..940bace26 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -1561,16 +1561,18 @@ void RdbSaver::FillFreqMap(RdbTypeFreqMap* freq_map) { error_code RdbSaver::SaveAux(const GlobalData& glob_state) { static_assert(sizeof(void*) == 8, ""); - int aof_preamble = false; error_code ec; /* Add a few fields about the state when the RDB was created. */ RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("redis-ver", REDIS_VERSION)); + RETURN_ON_ERR(impl_->SaveAuxFieldStrStr("df-ver", GetVersion())); RETURN_ON_ERR(SaveAuxFieldStrInt("redis-bits", 64)); RETURN_ON_ERR(SaveAuxFieldStrInt("ctime", time(NULL))); - RETURN_ON_ERR(SaveAuxFieldStrInt("used-mem", used_mem_current.load(memory_order_relaxed))); - RETURN_ON_ERR(SaveAuxFieldStrInt("aof-preamble", aof_preamble)); + auto used_mem = used_mem_current.load(memory_order_relaxed); + VLOG(1) << "Used memory during save: " << used_mem; + RETURN_ON_ERR(SaveAuxFieldStrInt("used-mem", used_mem)); + RETURN_ON_ERR(SaveAuxFieldStrInt("aof-preamble", 0)); // Save lua scripts only in rdb or summary file DCHECK(save_mode_ != SaveMode::SINGLE_SHARD || glob_state.lua_scripts.empty()); diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index cd45662c0..890946fd5 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -94,7 +94,7 @@ TEST_F(RdbTest, Crc) { TEST_F(RdbTest, LoadEmpty) { auto ec = LoadRdb("empty.rdb"); - CHECK(!ec); + ASSERT_FALSE(ec) << ec; } TEST_F(RdbTest, LoadSmall6) { @@ -646,4 +646,13 @@ TEST_F(RdbTest, LoadHugeStream) { ASSERT_EQ(2000, CheckedInt({"xlen", "test:0"})); } +TEST_F(RdbTest, SnapshotTooBig) { + // Run({"debug", "populate", "10000", "foo", "1000"}); + // usleep(5000); // let the stats to sync + max_memory_limit = 100000; + used_mem_current = 1000000; + auto resp = Run({"debug", "reload"}); + ASSERT_THAT(resp, ErrArg("Out of memory")); +} + } // namespace dfly diff --git a/src/server/server_family.cc b/src/server/server_family.cc index e78bd74d1..dd01f41e6 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1083,25 +1083,24 @@ std::optional> ServerFamily::Load(string_view load_pat DCHECK_GT(shard_count(), 0u); + // TODO: to move it to helio. + auto immediate = [](auto val) { + fb2::Future future; + future.Resolve(val); + return future; + }; + if (ServerState::tlocal() && !ServerState::tlocal()->is_master) { - fb2::Future future; - future.Resolve(string("Replica cannot load data")); - return future; + return immediate(string("Replica cannot load data")); } - auto paths_result = snapshot_storage_->ExpandSnapshot(path); - if (!paths_result) { - LOG(ERROR) << "Failed to load snapshot: " << paths_result.error().Format(); + auto expand_result = snapshot_storage_->ExpandSnapshot(path); + if (!expand_result) { + LOG(ERROR) << "Failed to load snapshot: " << expand_result.error().Format(); - fb2::Future future; - future.Resolve(paths_result.error()); - return future; + return immediate(expand_result.error()); } - std::vector paths = *paths_result; - - LOG(INFO) << "Loading " << path; - auto new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::LOADING); if (new_state != GlobalState::LOADING) { LOG(WARNING) << new_state << " in progress, ignored"; @@ -1110,6 +1109,10 @@ std::optional> ServerFamily::Load(string_view load_pat auto& pool = service_.proactor_pool(); + const vector& paths = *expand_result; + + LOG(INFO) << "Loading " << path; + vector load_fibers; load_fibers.reserve(paths.size()); @@ -1125,39 +1128,36 @@ std::optional> ServerFamily::Load(string_view load_pat proactor = pool.GetNextProactor(); } - auto load_fiber = [this, aggregated_result, existing_keys, path = std::move(path)]() { + auto load_func = [this, aggregated_result, existing_keys, path = std::move(path)]() { auto load_result = LoadRdb(path, existing_keys); if (load_result.has_value()) aggregated_result->keys_read.fetch_add(*load_result); else aggregated_result->first_error = load_result.error(); }; - load_fibers.push_back(proactor->LaunchFiber(std::move(load_fiber))); + load_fibers.push_back(proactor->LaunchFiber(std::move(load_func))); } fb2::Future future; // Run fiber that empties the channel and sets ec_promise. - auto load_join_fiber = [this, aggregated_result, load_fibers = std::move(load_fibers), - future]() mutable { + auto load_join_func = [this, aggregated_result, load_fibers = std::move(load_fibers), + future]() mutable { for (auto& fiber : load_fibers) { fiber.Join(); } if (aggregated_result->first_error) { - LOG(ERROR) << "Rdb load failed. " << (*aggregated_result->first_error).message(); - service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE); - future.Resolve(*aggregated_result->first_error); - return; + LOG(ERROR) << "Rdb load failed: " << (*aggregated_result->first_error).message(); + } else { + RdbLoader::PerformPostLoad(&service_); + LOG(INFO) << "Load finished, num keys read: " << aggregated_result->keys_read; } - RdbLoader::PerformPostLoad(&service_); - - LOG(INFO) << "Load finished, num keys read: " << aggregated_result->keys_read; service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE); future.Resolve(*(aggregated_result->first_error)); }; - pool.GetNextProactor()->Dispatch(std::move(load_join_fiber)); + pool.GetNextProactor()->Dispatch(std::move(load_join_func)); return future; } @@ -1196,6 +1196,7 @@ void ServerFamily::SnapshotScheduling() { io::Result ServerFamily::LoadRdb(const std::string& rdb_file, LoadExistingKeys existing_keys) { VLOG(1) << "Loading data from " << rdb_file; + CHECK(fb2::ProactorBase::IsProactorThread()) << "must be called from proactor thread"; error_code ec; io::ReadonlyFileOrError res = snapshot_storage_->OpenReadFile(rdb_file); diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index de57627e6..8d160f4e8 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -195,7 +195,6 @@ disconnect_cases = [ ] -@pytest.mark.asyncio @pytest.mark.parametrize("t_master, t_crash_fs, t_crash_ss, t_disonnect, n_keys", disconnect_cases) async def test_disconnect_replica( df_factory: DflyInstanceFactory, @@ -327,7 +326,6 @@ master_crash_cases = [ ] -@pytest.mark.asyncio @pytest.mark.slow @pytest.mark.parametrize("t_master, t_replicas, n_random_crashes, n_keys", master_crash_cases) async def test_disconnect_master( @@ -397,7 +395,6 @@ Test re-connecting replica to different masters. rotating_master_cases = [(4, [4, 4, 4, 4], dict(keys=2_000, dbcount=4))] -@pytest.mark.asyncio @pytest.mark.slow @pytest.mark.parametrize("t_replica, t_masters, seeder_config", rotating_master_cases) async def test_rotating_masters(df_factory, df_seeder_factory, t_replica, t_masters, seeder_config): @@ -433,7 +430,6 @@ async def test_rotating_masters(df_factory, df_seeder_factory, t_replica, t_mast fill_task.cancel() -@pytest.mark.asyncio @pytest.mark.slow async def test_cancel_replication_immediately(df_factory, df_seeder_factory: DflySeederFactory): """ @@ -491,7 +487,6 @@ Check replica keys at the end. """ -@pytest.mark.asyncio async def test_flushall(df_factory): master = df_factory.create(proactor_threads=4) replica = df_factory.create(proactor_threads=2) @@ -542,7 +537,6 @@ Test journal rewrites. @dfly_args({"proactor_threads": 4}) -@pytest.mark.asyncio async def test_rewrites(df_factory): CLOSE_TIMESTAMP = int(time.time()) + 100 CLOSE_TIMESTAMP_MS = CLOSE_TIMESTAMP * 1000 @@ -727,7 +721,6 @@ Test automatic replication of expiry. @dfly_args({"proactor_threads": 4}) -@pytest.mark.asyncio async def test_expiry(df_factory: DflyInstanceFactory, n_keys=1000): master = df_factory.create() replica = df_factory.create() @@ -866,7 +859,6 @@ return 'OK' """ -@pytest.mark.asyncio @pytest.mark.parametrize("t_master, t_replicas, num_ops, num_keys, num_par, flags", script_cases) async def test_scripts(df_factory, t_master, t_replicas, num_ops, num_keys, num_par, flags): master = df_factory.create(proactor_threads=t_master) @@ -900,7 +892,6 @@ async def test_scripts(df_factory, t_master, t_replicas, num_ops, num_keys, num_ @dfly_args({"proactor_threads": 4}) -@pytest.mark.asyncio async def test_auth_master(df_factory, n_keys=20): masterpass = "requirepass" replicapass = "replicapass" @@ -966,7 +957,6 @@ async def test_script_transfer(df_factory): @dfly_args({"proactor_threads": 4}) -@pytest.mark.asyncio async def test_role_command(df_factory, n_keys=20): master = df_factory.create() replica = df_factory.create() @@ -1064,7 +1054,6 @@ async def assert_replica_reconnections(replica_inst, initial_reconnects_count): @dfly_args({"proactor_threads": 2}) -@pytest.mark.asyncio async def test_replication_info(df_factory: DflyInstanceFactory, df_seeder_factory, n_keys=2000): master = df_factory.create() replica = df_factory.create(replication_acks_interval=100) @@ -1096,7 +1085,6 @@ More details in https://github.com/dragonflydb/dragonfly/issues/1231 """ -@pytest.mark.asyncio @pytest.mark.slow async def test_flushall_in_full_sync(df_factory): master = df_factory.create(proactor_threads=4) @@ -1155,7 +1143,6 @@ redis.call('SET', 'A', 'ErrroR') """ -@pytest.mark.asyncio async def test_readonly_script(df_factory): master = df_factory.create(proactor_threads=2) replica = df_factory.create(proactor_threads=2) @@ -1188,7 +1175,6 @@ take_over_cases = [ @pytest.mark.parametrize("master_threads, replica_threads", take_over_cases) -@pytest.mark.asyncio async def test_take_over_counters(df_factory, master_threads, replica_threads): master = df_factory.create(proactor_threads=master_threads) replica1 = df_factory.create(proactor_threads=replica_threads) @@ -1243,7 +1229,6 @@ async def test_take_over_counters(df_factory, master_threads, replica_threads): @pytest.mark.parametrize("master_threads, replica_threads", take_over_cases) -@pytest.mark.asyncio async def test_take_over_seeder( request, df_factory, df_seeder_factory, master_threads, replica_threads ): @@ -1299,7 +1284,6 @@ async def test_take_over_seeder( @pytest.mark.parametrize("master_threads, replica_threads", [[4, 4]]) -@pytest.mark.asyncio async def test_take_over_read_commands(df_factory, master_threads, replica_threads): master = df_factory.create(proactor_threads=master_threads) replica = df_factory.create(proactor_threads=replica_threads) @@ -1333,7 +1317,6 @@ async def test_take_over_read_commands(df_factory, master_threads, replica_threa await promt_task -@pytest.mark.asyncio async def test_take_over_timeout(df_factory, df_seeder_factory): master = df_factory.create(proactor_threads=2) replica = df_factory.create(proactor_threads=2) @@ -1379,7 +1362,6 @@ async def test_take_over_timeout(df_factory, df_seeder_factory): replication_cases = [(8, 8)] -@pytest.mark.asyncio @pytest.mark.parametrize("t_master, t_replica", replication_cases) async def test_no_tls_on_admin_port( df_factory: DflyInstanceFactory, @@ -1428,7 +1410,6 @@ async def test_no_tls_on_admin_port( replication_cases = [(8, 8, False), (8, 8, True)] -@pytest.mark.asyncio @pytest.mark.parametrize("t_master, t_replica, test_admin_port", replication_cases) async def test_tls_replication( df_factory, @@ -1521,7 +1502,6 @@ async def wait_for_replica_status( raise RuntimeError("Client did not become available in time!") -@pytest.mark.asyncio async def test_replicaof_flag(df_factory): # tests --replicaof works under normal conditions master = df_factory.create( @@ -1555,7 +1535,6 @@ async def test_replicaof_flag(df_factory): assert "VALUE" == val -@pytest.mark.asyncio async def test_replicaof_flag_replication_waits(df_factory): # tests --replicaof works when we launch replication before the master BASE_PORT = 1111 @@ -1599,7 +1578,6 @@ async def test_replicaof_flag_replication_waits(df_factory): assert "VALUE" == val -@pytest.mark.asyncio async def test_replicaof_flag_disconnect(df_factory): # test stopping replication when started using --replicaof master = df_factory.create( @@ -1639,7 +1617,6 @@ async def test_replicaof_flag_disconnect(df_factory): assert role[0] == "master" -@pytest.mark.asyncio async def test_df_crash_on_memcached_error(df_factory): master = df_factory.create( memcached_port=11211, @@ -1667,7 +1644,6 @@ async def test_df_crash_on_memcached_error(df_factory): memcached_client.set("key", "data", noreply=False) -@pytest.mark.asyncio async def test_df_crash_on_replicaof_flag(df_factory): master = df_factory.create( proactor_threads=2, @@ -1931,7 +1907,6 @@ async def test_search_with_stream(df_factory: DflyInstanceFactory): # @pytest.mark.slow -@pytest.mark.asyncio async def test_client_pause_with_replica(df_factory, df_seeder_factory): master = df_factory.create(proactor_threads=4) replica = df_factory.create(proactor_threads=4) @@ -2003,7 +1978,6 @@ async def test_replicaof_reject_on_load(df_factory, df_seeder_factory): await c_replica.execute_command(f"REPLICAOF localhost {master.port}") -@pytest.mark.asyncio async def test_heartbeat_eviction_propagation(df_factory): master = df_factory.create( proactor_threads=1, cache_mode="true", maxmemory="256mb", enable_heartbeat_eviction="false" @@ -2039,7 +2013,6 @@ async def test_heartbeat_eviction_propagation(df_factory): assert set(keys_master) == set(keys_replica) -@pytest.mark.asyncio async def test_policy_based_eviction_propagation(df_factory, df_seeder_factory): master = df_factory.create( proactor_threads=2, @@ -2076,7 +2049,6 @@ async def test_policy_based_eviction_propagation(df_factory, df_seeder_factory): assert set(keys_master).difference(keys_replica) == set() -@pytest.mark.asyncio async def test_journal_doesnt_yield_issue_2500(df_factory, df_seeder_factory): """ Issues many SETEX commands through a Lua script so that no yields are done between them. @@ -2129,7 +2101,6 @@ async def test_journal_doesnt_yield_issue_2500(df_factory, df_seeder_factory): assert set(keys_master) == set(keys_replica) -@pytest.mark.asyncio async def test_saving_replica(df_factory): master = df_factory.create(proactor_threads=1) replica = df_factory.create(proactor_threads=1, dbfilename=f"dump_{tmp_file_name()}") @@ -2157,7 +2128,6 @@ async def test_saving_replica(df_factory): assert not await is_saving(c_replica) -@pytest.mark.asyncio async def test_start_replicating_while_save(df_factory): master = df_factory.create(proactor_threads=4) replica = df_factory.create(proactor_threads=4, dbfilename=f"dump_{tmp_file_name()}") @@ -2183,7 +2153,6 @@ async def test_start_replicating_while_save(df_factory): assert not await is_saving(c_replica) -@pytest.mark.asyncio async def test_user_acl_replication(df_factory): master = df_factory.create(proactor_threads=4) replica = df_factory.create(proactor_threads=4) @@ -2217,7 +2186,6 @@ async def test_user_acl_replication(df_factory): @pytest.mark.parametrize("break_conn", [False, True]) -@pytest.mark.asyncio async def test_replica_reconnect(df_factory, break_conn): """ Test replica does not connect to master if master restarted @@ -2270,7 +2238,6 @@ async def test_replica_reconnect(df_factory, break_conn): assert await c_replica.execute_command("get k") == "6789" -@pytest.mark.asyncio async def test_announce_ip_port(df_factory): master = df_factory.create() replica = df_factory.create(replica_announce_ip="overrode-host", announce_port="1337") @@ -2291,7 +2258,6 @@ async def test_announce_ip_port(df_factory): assert port == "1337" -@pytest.mark.asyncio async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory, df_seeder_factory): # setting replication_timeout to a very small value to force the replica to timeout master = df_factory.create(replication_timeout=100, vmodule="replica=2,dflycmd=2") @@ -2435,7 +2401,6 @@ async def test_replicate_old_master( # For more information plz refer to the issue on gh: # https://github.com/dragonflydb/dragonfly/issues/3504 @dfly_args({"proactor_threads": 1}) -@pytest.mark.asyncio async def test_empty_hash_map_replicate_old_master(df_factory): cpu = platform.processor() if cpu != "x86_64": @@ -2494,7 +2459,6 @@ async def test_empty_hash_map_replicate_old_master(df_factory): # For more information plz refer to the issue on gh: # https://github.com/dragonflydb/dragonfly/issues/3504 @dfly_args({"proactor_threads": 1}) -@pytest.mark.asyncio async def test_empty_hashmap_loading_bug(df_factory: DflyInstanceFactory): cpu = platform.processor() if cpu != "x86_64": @@ -2565,7 +2529,6 @@ async def test_replicating_mc_flags(df_factory): assert c_mc_replica.get(f"key{i}") == str.encode(f"value{i}") -@pytest.mark.asyncio async def test_double_take_over(df_factory, df_seeder_factory): master = df_factory.create(proactor_threads=4, dbfilename="", admin_port=ADMIN_PORT) replica = df_factory.create(proactor_threads=4, dbfilename="", admin_port=ADMIN_PORT + 1) @@ -2607,7 +2570,6 @@ async def test_double_take_over(df_factory, df_seeder_factory): assert await seeder.compare(capture, port=master.port) -@pytest.mark.asyncio async def test_replica_of_replica(df_factory): # Can't connect a replica to a replica, but OK to connect 2 replicas to the same master master = df_factory.create(proactor_threads=2) @@ -2627,7 +2589,6 @@ async def test_replica_of_replica(df_factory): assert await c_replica2.execute_command(f"REPLICAOF localhost {master.port}") == "OK" -@pytest.mark.asyncio async def test_replication_timeout_on_full_sync_heartbeat_expiry( df_factory: DflyInstanceFactory, df_seeder_factory ): @@ -2680,7 +2641,6 @@ async def test_replication_timeout_on_full_sync_heartbeat_expiry( "element_size, elements_number", [(16, 20000), (20000, 16)], ) -@pytest.mark.asyncio async def test_big_containers(df_factory, element_size, elements_number): master = df_factory.create(proactor_threads=4) replica = df_factory.create(proactor_threads=4) @@ -2708,3 +2668,18 @@ async def test_big_containers(df_factory, element_size, elements_number): replica_data = await StaticSeeder.capture(c_replica) master_data = await StaticSeeder.capture(c_master) assert master_data == replica_data + + +async def test_master_too_big(df_factory): + master = df_factory.create(proactor_threads=4) + replica = df_factory.create(proactor_threads=2, maxmemory="600mb") + + df_factory.start_all([master, replica]) + c_master = master.client() + c_replica = replica.client() + await c_master.execute_command("DEBUG POPULATE 1000000 key 1000 RAND") + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + + # We should never sync due to used memory too high during full sync + with pytest.raises(TimeoutError): + await wait_available_async(c_replica, timeout=10) diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py index b644e538d..8916221db 100644 --- a/tests/dragonfly/utility.py +++ b/tests/dragonfly/utility.py @@ -83,8 +83,8 @@ async def tick_timer(func, timeout=5, step=0.1): await asyncio.sleep(step) if last_error: - raise RuntimeError("Timed out!") from last_error - raise RuntimeError("Timed out!") + raise TimeoutError("Timed out!") from last_error + raise TimeoutError("Timed out!") async def info_tick_timer(client: aioredis.Redis, section=None, **kwargs): @@ -113,7 +113,7 @@ async def wait_available_async( assert "Dragonfly is loading the dataset in memory" in str(e) timeout -= time.time() - start if timeout <= 0: - raise RuntimeError("Timed out!") + raise TimeoutError("Timed out!") # Secondly for replicas, we make sure they reached stable state replicaton async for info, breaker in info_tick_timer(clients, "REPLICATION", timeout=timeout):