Implement list serialization

This commit is contained in:
Roman Gershman 2022-04-05 12:04:03 +03:00
parent 19583ca7f2
commit ba1314201c
4 changed files with 68 additions and 16 deletions

View file

@ -231,10 +231,10 @@ REDIS_STATIC int __quicklistCompressNode(quicklistNode *node) {
if (node->sz < MIN_COMPRESS_BYTES)
return 0;
quicklistLZF *lzf = zmalloc(sizeof(*lzf) + node->sz);
// TODO: roman - to move out of stack.
LZF_STATE sdata;
// ROMAN: we allocate LZF_STATE on heap, piggy-backing on the existing allocation.
char* uptr = zmalloc(sizeof(quicklistLZF) + node->sz + sizeof(LZF_STATE));
quicklistLZF *lzf = (quicklistLZF*)uptr;
LZF_HSLOT* sdata = (LZF_HSLOT*)(uptr + sizeof(quicklistLZF) + node->sz);
/* Cancel if compression fails or doesn't compress small enough */
if (((lzf->sz = lzf_compress(node->entry, node->sz, lzf->compressed,

View file

@ -81,8 +81,8 @@ typedef struct ServerStub {
int lfu_decay_time; /* LFU counter decay factor. */
/* should not be used. Use FLAGS_list_max_ziplist_size and FLAGS_list_compress_depth instead. */
int list_compress_depth;
int list_max_ziplist_size;
// int list_compress_depth;
// int list_max_ziplist_size;
unsigned long long maxmemory; /* Max number of memory bytes to use */
int maxmemory_policy; /* Policy for key eviction */

View file

@ -11,9 +11,9 @@
extern "C" {
#include "redis/intset.h"
#include "redis/listpack.h"
#include "redis/ziplist.h"
#include "redis/rdb.h"
#include "redis/util.h"
#include "redis/ziplist.h"
#include "redis/zmalloc.h"
}
@ -239,12 +239,56 @@ error_code RdbSerializer::SaveListObject(const robj* obj) {
RETURN_ON_ERR(SaveLen(ql->len));
while (node) {
if (quicklistNodeIsCompressed(node)) {
void* data;
size_t compress_len = quicklistGetLzf(node, &data);
RETURN_ON_ERR(SaveLzfBlob(Bytes{reinterpret_cast<uint8_t*>(data), compress_len}, node->sz));
DVLOG(2) << "QL node (encoding/container/sz): " << node->encoding << "/" << node->container
<< "/" << node->sz;
if (QL_NODE_IS_PLAIN(node)) {
if (quicklistNodeIsCompressed(node)) {
void* data;
size_t compress_len = quicklistGetLzf(node, &data);
RETURN_ON_ERR(SaveLzfBlob(Bytes{reinterpret_cast<uint8_t*>(data), compress_len}, node->sz));
} else {
RETURN_ON_ERR(SaveString(node->entry, node->sz));
}
} else {
RETURN_ON_ERR(SaveString(node->entry, node->sz));
// listpack
uint8_t* lp = node->entry;
uint8_t* decompressed = NULL;
if (quicklistNodeIsCompressed(node)) {
void* data;
size_t compress_len = quicklistGetLzf(node, &data);
decompressed = (uint8_t*)zmalloc(node->sz);
if (lzf_decompress(data, compress_len, decompressed, node->sz) == 0) {
/* Someone requested decompress, but we can't decompress. Not good. */
zfree(decompressed);
return make_error_code(errc::illegal_byte_sequence);
}
lp = decompressed;
}
// listpack, convert to ziplist first.
uint8_t* lpfield = lpFirst(lp);
int64_t entry_len;
uint8_t* entry;
uint8_t buf[32];
uint8_t* zl = ziplistNew();
while (lpfield) {
entry = lpGet(lpfield, &entry_len, buf);
zl = ziplistPush(zl, entry, entry_len, ZIPLIST_TAIL);
lpfield = lpNext(lp, lpfield);
}
size_t ziplen = ziplistBlobLen(zl);
auto cleanup = absl::MakeCleanup([=] {
zfree(zl);
if (decompressed)
zfree(decompressed);
});
RETURN_ON_ERR(SaveString(string_view{reinterpret_cast<char*>(zl), ziplen}));
}
node = node->next;
}

View file

@ -22,6 +22,9 @@ using namespace testing;
using namespace std;
using namespace util;
DECLARE_int32(list_compress_depth);
DECLARE_int32(list_max_listpack_size);
namespace dfly {
class RdbTest : public BaseFamilyTest {
@ -81,13 +84,18 @@ TEST_F(RdbTest, LoadSmall) {
}
TEST_F(RdbTest, Save) {
gflags::FlagSaver fs;
FLAGS_list_compress_depth = 1;
FLAGS_list_max_listpack_size = 1; // limit listpack to a single element.
Run({"set", "string_key", "val"});
Run({"sadd", "set_key1", "val1", "val2"});
Run({"sadd", "set_key2", "1", "2", "3"});
Run({"sadd", "intset_key", "1", "2", "3"});
Run({"hset", "small_hset", "field1", "val1", "field2", "val2"});
Run({"rpush", "list_key1", "val", "val2"});
Run({"rpush", "list_key2", "head", string(512, 'a'), string(512, 'b'), "tail"});
// Run({"rpush", "list_key", "val"}); // TODO: invalid encoding when reading by redis 6.
// Run({"rpush", "list_key", "val"});
Run({"hset", "hset_key", "field1", "val1", "field2", "val2"});
Run({"save"});
}