mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
fix: Fix squashing, pytest arg formatting (#1712)
Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
4fbd0e38dd
commit
c65b9cf63d
5 changed files with 16 additions and 8 deletions
|
@ -36,7 +36,8 @@ MultiCommandSquasher::MultiCommandSquasher(absl::Span<StoredCmd> cmds, Connectio
|
|||
: cmds_{cmds}, cntx_{cntx}, service_{service}, base_cid_{nullptr},
|
||||
verify_commands_{verify_commands}, error_abort_{error_abort} {
|
||||
auto mode = cntx->transaction->GetMultiMode();
|
||||
base_cid_ = mode == Transaction::NON_ATOMIC ? nullptr : cntx->transaction->GetCId();
|
||||
base_cid_ = cntx->transaction->GetCId();
|
||||
atomic_ = mode != Transaction::NON_ATOMIC;
|
||||
}
|
||||
|
||||
MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(ShardId sid) {
|
||||
|
@ -49,7 +50,7 @@ MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(Shar
|
|||
if (IsAtomic()) {
|
||||
sinfo.local_tx = new Transaction{cntx_->transaction};
|
||||
} else {
|
||||
sinfo.local_tx = new Transaction{cntx_->transaction->GetCId()};
|
||||
sinfo.local_tx = new Transaction{base_cid_};
|
||||
sinfo.local_tx->StartMultiNonAtomic();
|
||||
}
|
||||
}
|
||||
|
@ -174,7 +175,7 @@ bool MultiCommandSquasher::ExecuteSquashed() {
|
|||
DCHECK(!cntx_->conn_state.exec_info.IsCollecting());
|
||||
|
||||
if (order_.empty())
|
||||
return false;
|
||||
return true;
|
||||
|
||||
for (auto& sd : sharded_)
|
||||
sd.replies.reserve(sd.cmds.size());
|
||||
|
@ -254,7 +255,7 @@ void MultiCommandSquasher::Run() {
|
|||
}
|
||||
|
||||
bool MultiCommandSquasher::IsAtomic() const {
|
||||
return base_cid_ != nullptr;
|
||||
return atomic_;
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -74,8 +74,8 @@ class MultiCommandSquasher {
|
|||
ConnectionContext* cntx_; // Underlying context
|
||||
Service* service_;
|
||||
|
||||
// underlying cid (exec or eval) for executing batch hops, nullptr for non-atomic
|
||||
const CommandId* base_cid_;
|
||||
bool atomic_; // Wheter working in any of the atomic modes
|
||||
const CommandId* base_cid_; // underlying cid (exec or eval) for executing batch hops
|
||||
|
||||
bool verify_commands_ = false; // Whether commands need to be verified before execution
|
||||
bool error_abort_ = false; // Abort upon receiving error
|
||||
|
|
|
@ -272,7 +272,7 @@ def with_tls_server_args(tmp_dir, gen_ca_cert):
|
|||
tls_server_cert,
|
||||
)
|
||||
|
||||
args = {"tls": "", "tls_key_file": tls_server_key, "tls_cert_file": tls_server_cert}
|
||||
args = {"tls": None, "tls_key_file": tls_server_key, "tls_cert_file": tls_server_cert}
|
||||
return args
|
||||
|
||||
|
||||
|
|
|
@ -507,3 +507,10 @@ async def test_squashed_pipeline(async_client: aioredis.Redis):
|
|||
assert res[0:10] == [j + 1] * 10
|
||||
assert isinstance(res[10], aioredis.ResponseError)
|
||||
res = res[11:]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
@dfly_args({"proactor_threads": "4", "pipeline_squash": 10})
|
||||
async def test_squashed_pipeline_seeder(df_server, df_seeder_factory):
|
||||
seeder = df_seeder_factory.create(port=df_server.port, keys=10_000)
|
||||
await seeder.run(target_deviation=0.1)
|
||||
|
|
|
@ -132,7 +132,7 @@ class TestDflyAutoLoadSnapshot(SnapshotTestBase):
|
|||
async def test_snapshot(self, df_local_factory, save_type, dbfilename):
|
||||
df_args = {"dbfilename": dbfilename, **BASIC_ARGS, "port": 1111}
|
||||
if save_type == "rdb":
|
||||
df_args["nodf_snapshot_format"] = ""
|
||||
df_args["nodf_snapshot_format"] = None
|
||||
df_server = df_local_factory.create(**df_args)
|
||||
df_server.start()
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue