fix(server): use compression for non-big values (#4331)

* fix server: use compression for non-big values
---------

Signed-off-by: adi_holden <adi@dragonflydb.io>
adiholden 2024-12-18 22:03:45 +02:00 committed by GitHub
parent 904d21d666
commit e462fc0401
13 changed files with 92 additions and 109 deletions


@ -386,13 +386,26 @@ int DragonflyRandstrCommand(lua_State* state) {
lua_remove(state, 1);
std::string buf(dsize, ' ');
auto push_str = [dsize, state, &buf]() {
static const char alphanum[] =
"0123456789"
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
"abcdefghijklmnopqrstuvwxyz";
for (int i = 0; i < dsize; ++i)
buf[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
static const char pattern[] = "DRAGONFLY";
constexpr int pattern_len = sizeof(pattern) - 1;
constexpr int pattern_interval = 53;
for (int i = 0; i < dsize; ++i) {
if (i % pattern_interval == 0 && i + pattern_len <= dsize) {
// Insert the repeating pattern for better compression of the random string.
buf.replace(i, pattern_len, pattern, pattern_len);
i += pattern_len - 1; // Adjust index to skip the pattern
} else {
// Fill the rest with semi-random characters for variation
buf[i] = alphanum[rand() % (sizeof(alphanum) - 1)];
}
}
lua_pushlstring(state, buf.c_str(), buf.length());
};
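
The new generator trades a little entropy for compressibility: every 53 characters it stamps the literal DRAGONFLY, giving LZ4/ZSTD repeated matches to work with while the rest of the payload stays random. Below is a minimal Python sketch of the effect, using zlib only as a stand-in compressor; sizes and helper names are illustrative, not part of the patch.

import random
import string
import zlib

ALNUM = (string.digits + string.ascii_uppercase + string.ascii_lowercase).encode()
PATTERN = b"DRAGONFLY"
INTERVAL = 53  # same spacing as pattern_interval in the C++ change above


def gen(size: int, with_pattern: bool) -> bytes:
    # Random alphanumeric payload; optionally stamp the repeating pattern
    # at every INTERVAL offset, mimicking the generator above.
    buf = bytearray(random.choice(ALNUM) for _ in range(size))
    if with_pattern:
        for i in range(0, size - len(PATTERN) + 1, INTERVAL):
            buf[i:i + len(PATTERN)] = PATTERN
    return bytes(buf)


for with_pattern in (False, True):
    data = gen(100_000, with_pattern)
    ratio = len(zlib.compress(data)) / len(data)
    print(f"pattern={with_pattern}: compressed to {ratio:.2%} of original")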


@ -162,15 +162,7 @@ std::string AbslUnparseFlag(dfly::CompressionMode flag) {
}
dfly::CompressionMode GetDefaultCompressionMode() {
const auto flag = absl::GetFlag(FLAGS_compression_mode);
if (ServerState::tlocal()->serialization_max_chunk_size == 0) {
return flag;
}
static bool once = flag != dfly::CompressionMode::NONE;
LOG_IF(WARNING, once) << "Setting CompressionMode to NONE because big value serialization is on";
once = false;
return dfly::CompressionMode::NONE;
return absl::GetFlag(FLAGS_compression_mode);
}
uint8_t RdbObjectType(const PrimeValue& pv) {
@ -944,6 +936,7 @@ io::Bytes SerializerBase::PrepareFlush(SerializerBase::FlushState flush_state) {
return mem_buf_.InputBuffer();
bool is_last_chunk = flush_state == FlushState::kFlushEndEntry;
VLOG(2) << "PrepareFlush:" << is_last_chunk << " " << number_of_chunks_;
if (is_last_chunk && number_of_chunks_ == 0) {
if (compression_mode_ == CompressionMode::MULTI_ENTRY_ZSTD ||
compression_mode_ == CompressionMode::MULTI_ENTRY_LZ4) {
@ -1603,6 +1596,7 @@ void SerializerBase::CompressBlob() {
compression_stats_.emplace(CompressionStats{});
}
Bytes blob_to_compress = mem_buf_.InputBuffer();
VLOG(2) << "CompressBlob size " << blob_to_compress.size();
size_t blob_size = blob_to_compress.size();
if (blob_size < kMinStrSizeToCompress) {
++compression_stats_->small_str_count;
@ -1644,6 +1638,8 @@ void SerializerBase::CompressBlob() {
memcpy(dest.data(), compressed_blob.data(), compressed_blob.length());
mem_buf_.CommitWrite(compressed_blob.length());
++compression_stats_->compressed_blobs;
auto& stats = ServerState::tlocal()->stats;
++stats.compressed_blobs;
}
size_t RdbSerializer::GetTempBufferSize() const {


@ -172,6 +172,10 @@ TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) {
RespExpr resp = Run({"save", "df"});
ASSERT_EQ(resp, "OK");
if (mode == CompressionMode::MULTI_ENTRY_ZSTD || mode == CompressionMode::MULTI_ENTRY_LZ4) {
EXPECT_GE(GetMetrics().coordinator_stats.compressed_blobs, 1);
}
auto save_info = service_->server_family().GetLastSaveInfo();
resp = Run({"dfly", "load", save_info.file_name});
ASSERT_EQ(resp, "OK");


@ -2374,6 +2374,7 @@ void ServerFamily::Info(CmdArgList args, const CommandContext& cmd_cntx) {
append("rdb_save_usec", m.coordinator_stats.rdb_save_usec);
append("rdb_save_count", m.coordinator_stats.rdb_save_count);
append("big_value_preemptions", m.coordinator_stats.big_value_preemptions);
append("compressed_blobs", m.coordinator_stats.compressed_blobs);
append("instantaneous_input_kbps", -1);
append("instantaneous_output_kbps", -1);
append("rejected_connections", -1);


@ -27,7 +27,7 @@ ServerState::Stats::Stats(unsigned num_shards) : tx_width_freq_arr(num_shards) {
}
ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) {
static_assert(sizeof(Stats) == 18 * 8, "Stats size mismatch");
static_assert(sizeof(Stats) == 19 * 8, "Stats size mismatch");
#define ADD(x) this->x += (other.x)
@ -51,6 +51,7 @@ ServerState::Stats& ServerState::Stats::Add(const ServerState::Stats& other) {
ADD(rdb_save_count);
ADD(big_value_preemptions);
ADD(compressed_blobs);
ADD(oom_error_cmd_cnt);


@ -123,6 +123,7 @@ class ServerState { // public struct - to allow initialization.
uint64_t rdb_save_count = 0;
uint64_t big_value_preemptions = 0;
uint64_t compressed_blobs = 0;
// Number of times we rejected command dispatch due to OOM condition.
uint64_t oom_error_cmd_cnt = 0;


@ -364,7 +364,7 @@ bool SliceSnapshot::PushSerialized(bool force) {
return false;
// Flush any of the leftovers to avoid interleavings
size_t serialized = FlushSerialized(FlushState::kFlushMidEntry);
size_t serialized = FlushSerialized(FlushState::kFlushEndEntry);
if (!delayed_entries_.empty()) {
// Async bucket serialization might have accumulated some delayed values.
@ -377,7 +377,7 @@ bool SliceSnapshot::PushSerialized(bool force) {
} while (!delayed_entries_.empty());
// blocking point.
serialized += FlushSerialized(FlushState::kFlushMidEntry);
serialized += FlushSerialized(FlushState::kFlushEndEntry);
}
return serialized > 0;
}
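
Flushing with kFlushEndEntry is what lets small (non-big) values reach the compression branch in PrepareFlush shown earlier. A rough Python model of that decision, simplified from the C++ above rather than a literal translation:

def should_compress(flush_state: str, number_of_chunks: int, mode: str) -> bool:
    # Mirrors the guard in PrepareFlush: only the last chunk of a
    # non-chunked (small) entry is considered for multi-entry compression.
    is_last_chunk = flush_state == "kFlushEndEntry"
    return (
        is_last_chunk
        and number_of_chunks == 0
        and mode in ("MULTI_ENTRY_ZSTD", "MULTI_ENTRY_LZ4")
    )


# Before this patch PushSerialized flushed with kFlushMidEntry, so small
# values never took the compression path:
print(should_compress("kFlushMidEntry", 0, "MULTI_ENTRY_LZ4"))  # False
print(should_compress("kFlushEndEntry", 0, "MULTI_ENTRY_LZ4"))  # True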


@ -416,7 +416,7 @@ class DflyInstanceFactory:
args.setdefault("noversion_check", None)
# macOS does not set it automatically, so we need to set it manually
args.setdefault("maxmemory", "8G")
vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1,cluster_family=1,proactor_pool=1,dflycmd=1"
vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1,cluster_family=1,proactor_pool=1,dflycmd=1,snapshot=1"
args.setdefault("vmodule", vmod)
args.setdefault("jsonpathv2")
@ -426,7 +426,7 @@ class DflyInstanceFactory:
args.setdefault("log_dir", self.params.log_dir)
if version >= 1.21 and "serialization_max_chunk_size" not in args:
args.setdefault("serialization_max_chunk_size", 16384)
args.setdefault("serialization_max_chunk_size", 300000)
for k, v in args.items():
args[k] = v.format(**self.params.env) if isinstance(v, str) else v
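
With these defaults every test instance runs with big-value serialization on (serialization_max_chunk_size=300000) and snapshot vlog output. A hypothetical per-test override using the df_factory fixture seen in the replication tests below; argument names mirror the flags above, and the test itself is only a sketch:

import pytest


@pytest.mark.asyncio
async def test_without_big_value_serialization(df_factory):
    # Hypothetical test: override the new default for one instance.
    instance = df_factory.create(
        proactor_threads=2,
        serialization_max_chunk_size=0,  # 0 disables big-value serialization
    )
    instance.start()
    client = instance.client()
    assert await client.ping()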


@ -49,14 +49,16 @@ Test full replication pipeline. Test full sync with streaming changes and stable
# Quick general test that replication is working
(1, 3 * [1], dict(key_target=1_000), 500),
# A lot of huge values
(2, 2 * [1], dict(key_target=1_000, huge_value_percentage=2), 500),
(2, 2 * [1], dict(key_target=1_000, huge_value_target=30), 500),
(4, [4, 4], dict(key_target=10_000), 1_000),
pytest.param(6, [6, 6, 6], dict(key_target=100_000), 20_000, marks=M_OPT),
# Skewed tests with different thread ratio
pytest.param(8, 6 * [1], dict(key_target=5_000), 2_000, marks=M_SLOW),
pytest.param(2, [8, 8], dict(key_target=10_000), 2_000, marks=M_SLOW),
# Everything is big because data size is 10k
pytest.param(2, [2], dict(key_target=1_000, data_size=10_000), 100, marks=M_SLOW),
pytest.param(
2, [2], dict(key_target=1_000, data_size=10_000, huge_value_target=0), 100, marks=M_SLOW
),
# Stress test
pytest.param(8, [8, 8], dict(key_target=1_000_000, units=16), 50_000, marks=M_STRESS),
],
@ -128,22 +130,22 @@ async def test_replication_all(
info = await c_master.info()
preemptions = info["big_value_preemptions"]
key_target = seeder_config["key_target"]
# Rough estimate
estimated_preemptions = key_target * (0.01)
assert preemptions > estimated_preemptions
total_buckets = info["num_buckets"]
compressed_blobs = info["compressed_blobs"]
logging.debug(
f"Compressed blobs {compressed_blobs} .Buckets {total_buckets}. Preemptions {preemptions}"
)
assert preemptions >= seeder.huge_value_target * 0.5
assert compressed_blobs > 0
# Because the data size could be 10k, in which case there will be almost one preemption
# per bucket.
if "data_size" not in seeder_config.keys():
total_buckets = info["num_buckets"]
if seeder.data_size < 1000:
# We care that we preempt fewer times than the total number of buckets so that we can
# be sure that we test both flows (with and without preemptions). Preemptions on 30%
# of buckets seems like a big number, but that depends on a few parameters like the
# size of the huge value and the serialization max chunk size. For the test cases
# here, it's usually close to 10%, but there are some that are close to 30%.
total_buckets = info["num_buckets"]
logging.debug(f"Buckets {total_buckets}. Preemptions {preemptions}")
assert preemptions <= (total_buckets * 0.3)
@ -2282,7 +2284,7 @@ async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory,
c_master = master.client()
c_replica = replica.client()
await c_master.execute_command("debug", "populate", "200000", "foo", "5000")
await c_master.execute_command("debug", "populate", "200000", "foo", "5000", "RAND")
seeder = df_seeder_factory.create(port=master.port)
seeder_task = asyncio.create_task(seeder.run())
@ -2310,6 +2312,7 @@ async def test_replication_timeout_on_full_sync(df_factory: DflyInstanceFactory,
await assert_replica_reconnections(replica, 0)
@dfly_args({"proactor_threads": 1})
async def test_master_stalled_disconnect(df_factory: DflyInstanceFactory):
# disconnect after 1 second of being blocked
master = df_factory.create(replication_timeout=1000)
@ -2320,7 +2323,7 @@ async def test_master_stalled_disconnect(df_factory: DflyInstanceFactory):
c_master = master.client()
c_replica = replica.client()
await c_master.execute_command("debug", "populate", "200000", "foo", "500")
await c_master.execute_command("debug", "populate", "200000", "foo", "500", "RAND")
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
@assert_eventually
@ -2624,7 +2627,7 @@ async def test_replication_timeout_on_full_sync_heartbeat_expiry(
c_master = master.client()
c_replica = replica.client()
await c_master.execute_command("debug", "populate", "100000", "foo", "5000")
await c_master.execute_command("debug", "populate", "100000", "foo", "5000", "RAND")
c_master = master.client()
c_replica = replica.client()


@ -138,10 +138,8 @@ class Seeder(SeederBase):
data_size=100,
collection_size=None,
types: typing.Optional[typing.List[str]] = None,
huge_value_percentage=0,
huge_value_size=10000,
# 2 huge entries per container/key as default
huge_value_csize=2,
huge_value_target=5,
huge_value_size=100000,
):
SeederBase.__init__(self, types)
self.key_target = key_target
@ -151,9 +149,8 @@ class Seeder(SeederBase):
else:
self.collection_size = collection_size
self.huge_value_percentage = huge_value_percentage
self.huge_value_target = huge_value_target
self.huge_value_size = huge_value_size
self.huge_value_csize = huge_value_csize
self.units = [
Seeder.Unit(
@ -175,9 +172,8 @@ class Seeder(SeederBase):
target_deviation if target_deviation is not None else -1,
self.data_size,
self.collection_size,
self.huge_value_percentage,
self.huge_value_target / len(self.units),
self.huge_value_size,
self.huge_value_csize,
]
sha = await client.script_load(Seeder._load_script("generate"))
@ -211,11 +207,10 @@ class Seeder(SeederBase):
result = await client.evalsha(sha, 0, *args)
result = result.split()
unit.counter = int(result[0])
huge_keys = int(result[1])
huge_entries = int(result[2])
huge_entries = int(result[1])
msg = f"running unit {unit.prefix}/{unit.type} took {time.time() - s}, target {args[4+0]}"
if huge_keys > 0:
msg = f"{msg}. Total huge keys added {huge_keys} with {args[11]} elements each. Total huge entries {huge_entries}."
if huge_entries > 0:
msg = f"{msg}. Total huge entries {huge_entries} added."
logging.debug(msg)
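
For reference, a sketch of how the reworked knobs are used from a test; the import path and connection details are assumptions, while the constructor arguments match the new signature:

import asyncio

import redis.asyncio as aioredis

from seeder import Seeder  # hypothetical import path


async def main():
    client = aioredis.Redis(host="localhost", port=6379)
    # Ask for ~30 huge entries in total, spread across the seeder's units,
    # mirroring the replication test parameters above.
    s = Seeder(key_target=1_000, huge_value_target=30, huge_value_size=100_000)
    await s.run(client)


asyncio.run(main())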


@ -19,46 +19,24 @@ local min_dev = tonumber(ARGV[7])
local data_size = tonumber(ARGV[8])
local collection_size = tonumber(ARGV[9])
-- Probability of each key in key_target to be a big value
local huge_value_percentage = tonumber(ARGV[10])
local huge_value_target = tonumber(ARGV[10])
local huge_value_size = tonumber(ARGV[11])
local huge_value_csize = tonumber(ARGV[12])
-- collect all keys belonging to this script
-- assumes exclusive ownership
local keys = LU_collect_keys(prefix, type)
LG_funcs.init(data_size, collection_size, huge_value_percentage, huge_value_size, huge_value_csize)
LG_funcs.init(data_size, collection_size, huge_value_target, huge_value_size)
local addfunc = LG_funcs['add_' .. string.lower(type)]
local modfunc = LG_funcs['mod_' .. string.lower(type)]
local huge_entries = LG_funcs["get_huge_entries"]
local huge_keys = 0
local function huge_entry()
local ratio = LG_funcs.huge_value_percentage / 100
-- [0, 1]
local rand = math.random()
local huge_entry = (ratio > rand)
return huge_entry
end
local function action_add()
local key = prefix .. tostring(key_counter)
local op_type = string.lower(type)
local is_huge = false
-- `string` and `json` huge entries are not supported so
-- we don't roll a dice to decide if they are huge or not
if op_type ~= "string" and op_type ~= "json" then
is_huge = huge_entry()
end
key_counter = key_counter + 1
if is_huge then
huge_keys = huge_keys + 1
end
table.insert(keys, key)
keys[key] = is_huge
addfunc(key, keys)
end
@ -149,4 +127,4 @@ if stop_key ~= '' then
redis.call('DEL', stop_key)
end
return tostring(key_counter) .. " " .. tostring(huge_keys) .. " " .. tostring(huge_entries())
return tostring(key_counter) .. " " .. tostring(huge_entries())


@ -1,32 +1,42 @@
local LG_funcs = {}
function LG_funcs.init(dsize, csize, large_val_perc, large_val_sz, huge_value_csize)
function LG_funcs.init(dsize, csize, large_val_count, large_val_sz)
LG_funcs.dsize = dsize
LG_funcs.csize = csize
LG_funcs.esize = math.ceil(dsize / csize)
LG_funcs.huge_value_percentage = large_val_perc
LG_funcs.huge_value_target = large_val_count
LG_funcs.huge_value_size = large_val_sz
LG_funcs.huge_value_csize = huge_value_csize
end
local huge_entries = 0
local function randstr(huge_entry)
local str
if huge_entry then
str = dragonfly.randstr(LG_funcs.huge_value_size)
local function is_huge_entry()
if huge_entries >= LG_funcs.huge_value_target then
return false
else
huge_entries = huge_entries + 1
return true
end
end
local function randstr()
local str
local is_huge = is_huge_entry()
if is_huge then
str = dragonfly.randstr(LG_funcs.huge_value_size)
else
str = dragonfly.randstr(LG_funcs.esize)
end
return str
end
local function randstr_sequence(huge_entry)
local function randstr_sequence()
local strs
if huge_entry then
strs = dragonfly.randstr(LG_funcs.huge_value_size, LG_funcs.huge_value_csize)
huge_entries = huge_entries + 1
local is_huge = is_huge_entry()
if is_huge then
strs = dragonfly.randstr(LG_funcs.huge_value_size, LG_funcs.csize)
else
strs = dragonfly.randstr(LG_funcs.esize, LG_funcs.csize)
end
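
The helpers now draw huge entries from a fixed per-unit budget instead of rolling a probability per key, so the number of big values generated is deterministic. A small Python equivalent of the selection logic, for illustration only:

class HugeEntryPicker:
    """Counter-based selection mirroring is_huge_entry in the Lua above:
    emit exactly `target` huge entries, then fall back to regular sizes."""

    def __init__(self, target: int):
        self.target = target
        self.emitted = 0

    def next_is_huge(self) -> bool:
        if self.emitted >= self.target:
            return False
        self.emitted += 1
        return True


picker = HugeEntryPicker(target=5)
sizes = [100_000 if picker.next_is_huge() else 100 for _ in range(10)]
print(sizes)  # the first 5 entries are huge, the rest use the default size
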
@ -55,8 +65,7 @@ end
-- store list of random blobs of default container/element sizes
function LG_funcs.add_list(key, keys)
local is_huge = keys[key]
redis.apcall('LPUSH', key, unpack(randstr_sequence(is_huge)))
redis.apcall('LPUSH', key, unpack(randstr_sequence()))
end
function LG_funcs.mod_list(key, keys)
@ -68,9 +77,9 @@ function LG_funcs.mod_list(key, keys)
elseif action == 2 then
redis.apcall('LPOP', key)
elseif action == 3 then
redis.apcall('LPUSH', key, randstr(false))
redis.apcall('LPUSH', key, randstr())
else
redis.apcall('RPUSH', key, randstr(false))
redis.apcall('RPUSH', key, randstr())
end
end
@ -89,8 +98,7 @@ function LG_funcs.add_set(key, keys)
end
redis.apcall('SDIFFSTORE', key, keys[i1], keys[i2])
else
local is_huge = keys[key]
redis.apcall('SADD', key, unpack(randstr_sequence(is_huge)))
redis.apcall('SADD', key, unpack(randstr_sequence()))
end
end
@ -99,8 +107,7 @@ function LG_funcs.mod_set(key, keys)
if math.random() < 0.5 then
redis.apcall('SPOP', key)
else
local is_huge = keys[key]
redis.apcall('SADD', key, randstr(false))
redis.apcall('SADD', key, randstr())
end
end
@ -110,26 +117,15 @@ end
-- where `value` is a random string for even indices and a number for odd indices
function LG_funcs.add_hash(key, keys)
local blobs
local is_huge = keys[key]
local blobs = randstr_sequence()
local limit = LG_funcs.csize
if is_huge then
limit = LG_funcs.huge_value_csize
blobs = dragonfly.randstr(LG_funcs.huge_value_size, limit)
huge_entries = huge_entries + 1
else
blobs = dragonfly.randstr(LG_funcs.esize, LG_funcs.csize / 2)
end
local htable = {}
for i = 1, limit, 2 do
for i = 1, limit do
htable[i * 2 - 1] = tostring(i)
htable[i * 2] = math.random(0, 1000)
end
for i = 2, limit, 2 do
htable[i * 2 - 1] = tostring(i)
htable[i * 2] = blobs[i // 2]
htable[i * 2] = blobs[i]
end
redis.apcall('HSET', key, unpack(htable))
end
@ -138,7 +134,7 @@ function LG_funcs.mod_hash(key, keys)
if idx % 2 == 1 then
redis.apcall('HINCRBY', key, tostring(idx), 1)
else
redis.apcall('HSET', key, tostring(idx), randstr(false))
redis.apcall('HSET', key, tostring(idx), randstr())
end
end
@ -146,15 +142,10 @@ end
function LG_funcs.add_zset(key, keys)
-- TODO: We don't support ZDIFFSTORE
local is_huge = keys[key]
local blobs = randstr_sequence(is_huge)
local blobs = randstr_sequence()
local ztable = {}
local limit = LG_funcs.csize
if is_huge then
limit = LG_funcs.huge_value_csize
end
for i = 1, limit do
ztable[i * 2 - 1] = tostring(i)
@ -167,7 +158,7 @@ function LG_funcs.mod_zset(key, keys)
local action = math.random(1, 4)
if action <= 2 then
local size = LG_funcs.csize * 2
redis.apcall('ZADD', key, math.random(0, size), randstr(false))
redis.apcall('ZADD', key, math.random(0, size), randstr())
elseif action == 3 then
redis.apcall('ZPOPMAX', key)
else


@ -36,7 +36,7 @@ async def test_static_collection_size(async_client: aioredis.Redis):
data_size=10_000,
collection_size=1,
types=["LIST"],
huge_value_percentage=0,
huge_value_target=0,
huge_value_size=0,
)
await s.run(async_client)