mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
bug(server): log evicted keys in journal in PrimeEvictionPolicy. (#2302)
fixes #2296 added a regression test that tests both policy based eviction as well as heart beat eviction. --------- Signed-off-by: Yue Li <61070669+theyueli@users.noreply.github.com>
This commit is contained in:
parent
9fe7d038b7
commit
8d09478474
3 changed files with 99 additions and 1 deletions
|
@ -249,6 +249,20 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
|
||||||
}
|
}
|
||||||
|
|
||||||
DbTable* table = db_slice_->GetDBTable(cntx_.db_index);
|
DbTable* table = db_slice_->GetDBTable(cntx_.db_index);
|
||||||
|
auto& lt = table->trans_locks;
|
||||||
|
string tmp;
|
||||||
|
string_view key = last_slot_it->first.GetSlice(&tmp);
|
||||||
|
// do not evict locked keys
|
||||||
|
if (lt.find(KeyLockArgs::GetLockKey(key)) != lt.end())
|
||||||
|
return 0;
|
||||||
|
|
||||||
|
// log the evicted keys to journal.
|
||||||
|
if (auto journal = db_slice_->shard_owner()->journal(); journal) {
|
||||||
|
ArgSlice delete_args{key};
|
||||||
|
journal->RecordEntry(0, journal::Op::EXPIRED, cntx_.db_index, 1, ClusterConfig::KeySlot(key),
|
||||||
|
make_pair("DEL", delete_args), false);
|
||||||
|
}
|
||||||
|
|
||||||
db_slice_->PerformDeletion(last_slot_it, db_slice_->shard_owner(), table);
|
db_slice_->PerformDeletion(last_slot_it, db_slice_->shard_owner(), table);
|
||||||
++evicted_;
|
++evicted_;
|
||||||
}
|
}
|
||||||
|
@ -1260,7 +1274,7 @@ void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
if (auto journal = owner_->journal(); journal) {
|
if (auto journal = owner_->journal(); journal) {
|
||||||
keys_to_journal.push_back(tmp);
|
keys_to_journal.push_back(string(key));
|
||||||
}
|
}
|
||||||
|
|
||||||
PerformDeletion(evict_it, shard_owner(), db_table.get());
|
PerformDeletion(evict_it, shard_owner(), db_table.get());
|
||||||
|
|
|
@ -781,6 +781,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
|
||||||
config_registry.RegisterMutable("replica_partial_sync");
|
config_registry.RegisterMutable("replica_partial_sync");
|
||||||
config_registry.RegisterMutable("max_eviction_per_heartbeat");
|
config_registry.RegisterMutable("max_eviction_per_heartbeat");
|
||||||
config_registry.RegisterMutable("max_segment_to_consider");
|
config_registry.RegisterMutable("max_segment_to_consider");
|
||||||
|
config_registry.RegisterMutable("enable_heartbeat_eviction");
|
||||||
|
|
||||||
uint32_t shard_num = GetFlag(FLAGS_num_shards);
|
uint32_t shard_num = GetFlag(FLAGS_num_shards);
|
||||||
if (shard_num == 0 || shard_num > pp_.size()) {
|
if (shard_num == 0 || shard_num > pp_.size()) {
|
||||||
|
|
|
@ -1836,3 +1836,86 @@ async def test_replicaof_reject_on_load(df_local_factory, df_seeder_factory):
|
||||||
await c_replica.close()
|
await c_replica.close()
|
||||||
master.stop()
|
master.stop()
|
||||||
replica.stop()
|
replica.stop()
|
||||||
|
|
||||||
|
|
||||||
|
# note: please be careful if you want to change any of the parameters used in this test.
|
||||||
|
# changing parameters without extensive testing may easily lead to weak testing case assertion
|
||||||
|
# which means eviction may not get triggered.
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
@pytest.mark.skip(reason="Failing due to bug in replication on command errors")
|
||||||
|
async def test_policy_based_eviction_propagation(df_local_factory, df_seeder_factory):
|
||||||
|
master = df_local_factory.create(
|
||||||
|
proactor_threads=1, cache_mode="true", maxmemory="256mb", enable_heartbeat_eviction="false"
|
||||||
|
)
|
||||||
|
replica = df_local_factory.create(proactor_threads=1)
|
||||||
|
df_local_factory.start_all([master, replica])
|
||||||
|
|
||||||
|
c_master = master.client()
|
||||||
|
c_replica = replica.client()
|
||||||
|
|
||||||
|
await c_master.execute_command("DEBUG POPULATE 6000 size 44000")
|
||||||
|
|
||||||
|
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
|
||||||
|
await wait_available_async(c_replica)
|
||||||
|
|
||||||
|
seeder = df_seeder_factory.create(
|
||||||
|
port=master.port,
|
||||||
|
keys=500,
|
||||||
|
val_size=200,
|
||||||
|
stop_on_failure=False,
|
||||||
|
unsupported_types=[
|
||||||
|
ValueType.JSON,
|
||||||
|
ValueType.LIST,
|
||||||
|
ValueType.SET,
|
||||||
|
ValueType.HSET,
|
||||||
|
ValueType.ZSET,
|
||||||
|
],
|
||||||
|
)
|
||||||
|
await seeder.run(target_deviation=0.1)
|
||||||
|
|
||||||
|
info = await c_master.info("stats")
|
||||||
|
assert info["evicted_keys"] > 0, "Weak testcase: policy based eviction was not triggered."
|
||||||
|
|
||||||
|
await check_all_replicas_finished([c_replica], c_master)
|
||||||
|
keys_master = await c_master.execute_command("keys *")
|
||||||
|
keys_replica = await c_replica.execute_command("keys *")
|
||||||
|
|
||||||
|
assert set(keys_master) == set(keys_replica)
|
||||||
|
await disconnect_clients(c_master, *[c_replica])
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_heartbeat_eviction_propagation(df_local_factory):
|
||||||
|
master = df_local_factory.create(
|
||||||
|
proactor_threads=1, cache_mode="true", maxmemory="256mb", enable_heartbeat_eviction="false"
|
||||||
|
)
|
||||||
|
replica = df_local_factory.create(proactor_threads=1)
|
||||||
|
df_local_factory.start_all([master, replica])
|
||||||
|
|
||||||
|
c_master = master.client()
|
||||||
|
c_replica = replica.client()
|
||||||
|
|
||||||
|
# fill the master to use about 233mb > 256mb * 0.9, which will trigger heartbeat eviction.
|
||||||
|
await c_master.execute_command("DEBUG POPULATE 233 size 1048576")
|
||||||
|
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
|
||||||
|
await wait_available_async(c_replica)
|
||||||
|
|
||||||
|
# now enable heart beat eviction
|
||||||
|
await c_master.execute_command("CONFIG SET enable_heartbeat_eviction true")
|
||||||
|
|
||||||
|
while True:
|
||||||
|
info = await c_master.info("stats")
|
||||||
|
evicted_1 = info["evicted_keys"]
|
||||||
|
time.sleep(2)
|
||||||
|
info = await c_master.info("stats")
|
||||||
|
evicted_2 = info["evicted_keys"]
|
||||||
|
if evicted_2 == evicted_1:
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
print("waiting for eviction to finish...", end="\r", flush=True)
|
||||||
|
|
||||||
|
await check_all_replicas_finished([c_replica], c_master)
|
||||||
|
keys_master = await c_master.execute_command("keys *")
|
||||||
|
keys_replica = await c_replica.execute_command("keys *")
|
||||||
|
assert set(keys_master) == set(keys_replica)
|
||||||
|
await disconnect_clients(c_master, *[c_replica])
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue