fix: localize timeout in test_cancel_replication_immediately test (#2048)

Roman Gershman 2023-10-22 08:44:00 +03:00 committed by GitHub
parent 9e8d886ce9
commit 83d5b849a3
3 changed files with 18 additions and 14 deletions
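
For orientation before the diff: the test-level change is to stop awaiting one big `asyncio.gather(...)` call and instead schedule each REPLICAOF attempt as a task and drain them through `asyncio.as_completed(..., timeout=...)`, so the timeout is enforced inside the test itself rather than (presumably) by an outer, suite-wide limit. Below is a minimal standalone sketch of that pattern; the `replicate()` stub and the numbers are illustrative, not taken from the repository.

```python
import asyncio
import logging
import random


async def replicate() -> bool:
    # Stand-in for the test's replicate() coroutine: pretend to issue a
    # REPLICAOF command and report whether it completed or was cancelled.
    await asyncio.sleep(random.uniform(0.01, 0.1))
    return random.random() < 0.5


async def main() -> None:
    # Schedule every command up front, as the updated test now does.
    tasks = [asyncio.create_task(replicate()) for _ in range(100)]

    # asyncio.as_completed() accepts a timeout covering the whole batch; if
    # the futures do not all finish in time, the iteration raises
    # asyncio.TimeoutError right here, inside the test, instead of relying on
    # an external timeout to kill the run. asyncio.gather() has no such knob.
    num_successes = 0
    for fut in asyncio.as_completed(tasks, timeout=30):
        num_successes += await fut

    logging.info("successes: %d", num_successes)


if __name__ == "__main__":
    asyncio.run(main())
```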


@@ -81,7 +81,8 @@ error_code Replica::Start(ConnectionContext* cntx) {
   ProactorBase* mythread = ProactorBase::me();
   CHECK(mythread);
-  RETURN_ON_ERR(cntx_.SwitchErrorHandler(absl::bind_front(&Replica::DefaultErrorHandler, this)));
+  RETURN_ON_ERR(
+      cntx_.SwitchErrorHandler([this](const GenericError& ge) { this->DefaultErrorHandler(ge); }));
   auto check_connection_error = [this, &cntx](const error_code& ec, const char* msg) -> error_code {
     if (cntx_.IsCancelled()) {
@@ -117,7 +118,7 @@ error_code Replica::Start(ConnectionContext* cntx) {
   (*cntx)->SendOk();
   return {};
-}  // namespace dfly
+}
 void Replica::EnableReplication(ConnectionContext* cntx) {
   VLOG(1) << "Enabling replication";
@@ -166,7 +167,7 @@ void Replica::MainReplicationFb() {
   error_code ec;
   while (state_mask_.load() & R_ENABLED) {
     // Discard all previous errors and set default error handler.
-    cntx_.Reset(absl::bind_front(&Replica::DefaultErrorHandler, this));
+    cntx_.Reset([this](const GenericError& ge) { this->DefaultErrorHandler(ge); });
     // 1. Connect socket.
     if ((state_mask_.load() & R_TCP_CONNECTED) == 0) {
       ThisFiber::SleepFor(500ms);
@@ -639,10 +640,6 @@ void Replica::SetShardStates(bool replica) {
   shard_set->RunBriefInParallel([replica](EngineShard* shard) { shard->SetReplica(replica); });
 }
-void Replica::DefaultErrorHandler(const GenericError& err) {
-  CloseSocket();
-}
 error_code Replica::SendNextPhaseRequest(string_view kind) {
   // Ask master to start sending replication stream
   string request = StrCat("DFLY ", kind, " ", master_context_.dfly_session_id);


@@ -118,8 +118,6 @@ class Replica : ProtocolClient {
   // Send DFLY ${kind} to the master instance.
   std::error_code SendNextPhaseRequest(std::string_view kind);
-  void DefaultErrorHandler(const GenericError& err);
  private: /* Utility */
   struct PSyncResponse {
     // string - end of sync token (diskless)


@@ -408,7 +408,9 @@ async def test_rotating_masters(
 @pytest.mark.asyncio
 @pytest.mark.slow
-async def test_cancel_replication_immediately(df_local_factory, df_seeder_factory):
+async def test_cancel_replication_immediately(
+    df_local_factory, df_seeder_factory: DflySeederFactory
+):
     """
     Issue 100 replication commands. This checks that the replication state
     machine can handle cancellation well.
@@ -423,7 +425,8 @@ async def test_cancel_replication_immediately(df_local_factory, df_seeder_factory):
     df_local_factory.start_all([replica, master])
     seeder = df_seeder_factory.create(port=master.port)
-    c_replica = aioredis.Redis(port=replica.port)
+    c_replica = aioredis.Redis(port=replica.port, socket_timeout=20)
     await seeder.run(target_deviation=0.1)
     replication_commands = []
@@ -437,14 +440,20 @@ async def test_cancel_replication_immediately(df_local_factory, df_seeder_factory):
         return False
     for i in range(COMMANDS_TO_ISSUE):
-        replication_commands.append(replicate())
-    results = await asyncio.gather(*replication_commands)
-    num_successes = sum(results)
+        replication_commands.append(asyncio.create_task(replicate()))
+    num_successes = 0
+    for result in asyncio.as_completed(replication_commands, timeout=30):
+        r = await result
+        num_successes += r
+    logging.info(f"successes: {num_successes}")
     assert COMMANDS_TO_ISSUE > num_successes, "At least one REPLICAOF must be cancelled"
     await wait_available_async(c_replica)
     capture = await seeder.capture()
+    logging.info(f"number of items captured {len(capture)}")
     assert await seeder.compare(capture, replica.port)
     await c_replica.close()
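
A side note on the `socket_timeout=20` argument added above: with redis-py's asyncio client (which this test suite refers to as `aioredis`), `socket_timeout` bounds each socket read/write on the connection, so follow-up calls against a replica stuck mid-cancellation fail after roughly 20 seconds instead of hanging the whole run. A rough sketch under those assumptions; the import alias, helper name, and port are illustrative only.

```python
from redis import asyncio as aioredis


async def check_replica(port: int) -> None:
    # socket_timeout applies per socket operation, so a PING against a
    # wedged replica raises a timeout error instead of blocking forever.
    client = aioredis.Redis(port=port, socket_timeout=20)
    try:
        await client.ping()
    finally:
        await client.close()
```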
"""