chore: further extend the compression analysis (#5065)

Allow export/import of huffman tables via
`DEBUG COMPRESSION EXPORT` or `DEBUG COMPRESSION IMPORT <bintable>`

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2025-05-06 13:15:54 +03:00 committed by GitHub
parent 4d07d7d053
commit b3e0bcfb31
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 92 additions and 24 deletions

View file

@ -1524,13 +1524,13 @@ std::string_view ObjTypeToString(CompactObjType type) {
return "Invalid type"sv;
}
std::optional<CompactObjType> ObjTypeFromString(std::string_view sv) {
CompactObjType ObjTypeFromString(std::string_view sv) {
for (auto& p : kObjTypeToString) {
if (absl::EqualsIgnoreCase(sv, p.second)) {
return p.first;
}
}
return std::nullopt;
return kInvalidCompactObjType;
}
} // namespace dfly

View file

@ -543,7 +543,8 @@ inline bool CompactObj::operator==(std::string_view sv) const {
std::string_view ObjTypeToString(CompactObjType type);
std::optional<CompactObjType> ObjTypeFromString(std::string_view sv);
// Returns kInvalidCompactObjType if sv is not a valid type.
CompactObjType ObjTypeFromString(std::string_view sv);
namespace detail {

View file

@ -299,8 +299,8 @@ OpResult<ScanOpts> ScanOpts::TryFrom(CmdArgList args) {
if (pattern != "*")
scan_opts.matcher.reset(new GlobMatcher{pattern, true});
} else if (opt == "TYPE") {
auto obj_type = ObjTypeFromString(ArgS(args, i + 1));
if (!obj_type) {
CompactObjType obj_type = ObjTypeFromString(ArgS(args, i + 1));
if (obj_type == kInvalidCompactObjType) {
return facade::OpStatus::SYNTAX_ERR;
}
scan_opts.type_filter = obj_type;

View file

@ -279,7 +279,7 @@ struct HufHist {
}
};
void DoComputeHist(optional<CompactObjType> type, EngineShard* shard, ConnectionContext* cntx,
void DoComputeHist(CompactObjType type, EngineShard* shard, ConnectionContext* cntx,
HufHist* dest) {
auto& db_slice = cntx->ns->GetDbSlice(shard->shard_id());
DbTable* dbt = db_slice.GetDBTable(cntx->db_index());
@ -294,11 +294,11 @@ void DoComputeHist(optional<CompactObjType> type, EngineShard* shard, Connection
do {
cursor = table.Traverse(cursor, [&](PrimeIterator it) {
scratch.clear();
if (!type) {
if (type == kInvalidCompactObjType) { // KEYSPACE
it->first.GetString(&scratch);
} else if (*type == OBJ_STRING && it->second.ObjType() == OBJ_STRING) {
} else if (type == OBJ_STRING && it->second.ObjType() == OBJ_STRING) {
it->second.GetString(&scratch);
} else if (*type == OBJ_ZSET && it->second.ObjType() == OBJ_ZSET) {
} else if (type == OBJ_ZSET && it->second.ObjType() == OBJ_ZSET) {
container_utils::IterateSortedSet(
it->second.GetRobjWrapper(), [&](container_utils::ContainerEntry entry, double) {
if (entry.value) {
@ -306,14 +306,14 @@ void DoComputeHist(optional<CompactObjType> type, EngineShard* shard, Connection
}
return true;
});
} else if (*type == OBJ_LIST && it->second.ObjType() == OBJ_LIST) {
} else if (type == OBJ_LIST && it->second.ObjType() == OBJ_LIST) {
container_utils::IterateList(it->second, [&](container_utils::ContainerEntry entry) {
if (entry.value) {
HIST_add(dest->hist.data(), entry.value, entry.length);
}
return true;
});
} else if (*type == OBJ_HASH && it->second.ObjType() == OBJ_HASH) {
} else if (type == OBJ_HASH && it->second.ObjType() == OBJ_HASH) {
container_utils::IterateMap(it->second, [&](container_utils::ContainerEntry key,
container_utils::ContainerEntry value) {
if (key.value) {
@ -596,9 +596,11 @@ void DebugCmd::Run(CmdArgList args, facade::SinkReplyBuilder* builder) {
" traffic logging is stopped.",
"RECVSIZE [<tid> | ENABLE | DISABLE]",
" Prints the histogram of the received request sizes on the given thread",
"COMPRESSION [type]"
"COMPRESSION [IMPORT <bintable> | EXPORT] [type]",
" Estimate the compressibility of values of the given type. if no type is given, ",
" checks compressibility of keys",
" checks compressibility of keys. If IN is specified, then the provided ",
" bintable is used to check compressibility. If OUT is specified, then ",
" the serialized table is printed as well",
"IOSTATS [PS]",
" Prints IO stats per thread. If PS is specified, prints thread-level stats ",
" per second.",
@ -1281,14 +1283,29 @@ void DebugCmd::Keys(CmdArgList args, facade::SinkReplyBuilder* builder) {
}
void DebugCmd::Compression(CmdArgList args, facade::SinkReplyBuilder* builder) {
optional<CompactObjType> type;
if (args.size() > 0) {
string_view type_str = ArgS(args, 0);
CompactObjType type = kInvalidCompactObjType;
CmdArgParser parser(args);
string bintable;
bool print_bintable = false;
if (parser.Check("EXPORT")) {
print_bintable = true;
} else {
parser.Check("IMPORT", &bintable);
}
if (parser.HasNext()) {
string_view type_str = parser.Next();
type = ObjTypeFromString(type_str);
if (!type) {
if (type == kInvalidCompactObjType) {
return builder->SendError(kSyntaxErr);
}
}
if (parser.HasError()) {
return builder->SendError(parser.Error()->MakeReply());
}
auto* rb = static_cast<RedisReplyBuilder*>(builder);
fb2::Mutex mu;
@ -1300,26 +1317,72 @@ void DebugCmd::Compression(CmdArgList args, facade::SinkReplyBuilder* builder) {
hist.Merge(local);
});
HUF_CREATE_STATIC_CTABLE(huf_ctable, HufHist::kMaxSymbol);
size_t num_bits = 0, compressed_size = 0, raw_size = 0;
unsigned table_max_symbol = 255;
if (hist.max_symbol) {
HUF_CREATE_STATIC_CTABLE(huf_ctable, HufHist::kMaxSymbol);
unique_ptr<uint32_t[]> wrkspace(new uint32_t[HUF_CTABLE_WORKSPACE_SIZE_U32]);
constexpr size_t kWspSize = HUF_CTABLE_WORKSPACE_SIZE;
num_bits = HUF_buildCTable_wksp(huf_ctable, hist.hist.data(), hist.max_symbol, 0,
wrkspace.get(), kWspSize);
compressed_size = HUF_estimateCompressedSize(huf_ctable, hist.hist.data(), hist.max_symbol);
if (bintable.empty()) {
table_max_symbol = hist.max_symbol;
num_bits = HUF_buildCTable_wksp(huf_ctable, hist.hist.data(), table_max_symbol, 0,
wrkspace.get(), kWspSize);
if (HUF_isError(num_bits)) {
return rb->SendError(StrCat("Internal error: ", HUF_getErrorName(num_bits)));
}
} else {
// Try to read the bintable and create a ctable from it.
unsigned has_zero_weights = 1;
size_t read_size = HUF_readCTable(huf_ctable, &table_max_symbol, bintable.data(),
bintable.size(), &has_zero_weights);
if (HUF_isError(read_size)) {
return rb->SendError(StrCat("Internal error: ", HUF_getErrorName(read_size)));
}
if (read_size != bintable.size()) {
return rb->SendError("Invalid bintable");
}
}
compressed_size = HUF_estimateCompressedSize(huf_ctable, hist.hist.data(), table_max_symbol);
for (unsigned i = table_max_symbol + 1; i <= hist.max_symbol; i++) {
compressed_size += hist.hist[i];
}
raw_size = 0;
for (unsigned i = 0; i < hist.max_symbol; i++) {
for (unsigned i = 0; i <= hist.max_symbol; i++) {
raw_size += hist.hist[i];
}
if (print_bintable) {
// Reverse engineered: (maxSymbolValue + 1) / 2 + 1.
constexpr unsigned kMaxTableSize = 130;
bintable.resize(kMaxTableSize);
// Seems we can reuse the same workspace, its capacity is enough.
size_t res = HUF_writeCTable_wksp(bintable.data(), kMaxTableSize, huf_ctable,
table_max_symbol, num_bits, wrkspace.get(), kWspSize);
if (HUF_isError(res)) {
return rb->SendError(StrCat("Internal error: ", HUF_getErrorName(res)));
}
bintable.resize(res);
} else {
bintable.clear();
}
}
rb->StartCollection(5, RedisReplyBuilder::CollectionType::MAP);
unsigned map_len = print_bintable ? 7 : 6;
rb->StartCollection(map_len, RedisReplyBuilder::CollectionType::MAP);
rb->SendSimpleString("max_symbol");
rb->SendLong(hist.max_symbol);
// in case we load a bintable, table_max_symbol may be different from max_symbol.
// if it's smaller, it means our table can not encode all symbols.
rb->SendSimpleString("table_max_symbol");
rb->SendLong(table_max_symbol);
rb->SendSimpleString("max_bits");
rb->SendLong(num_bits);
rb->SendSimpleString("raw_size");
@ -1329,6 +1392,10 @@ void DebugCmd::Compression(CmdArgList args, facade::SinkReplyBuilder* builder) {
rb->SendSimpleString("ratio");
double ratio = raw_size > 0 ? static_cast<double>(compressed_size) / raw_size : 0;
rb->SendDouble(ratio);
if (print_bintable) {
rb->SendSimpleString("bintable");
rb->SendBulkString(bintable);
}
}
void DebugCmd::IOStats(CmdArgList args, facade::SinkReplyBuilder* builder) {