diff --git a/src/server/error.h b/src/server/error.h index a6dd48fa2..1c5f7d59b 100644 --- a/src/server/error.h +++ b/src/server/error.h @@ -22,8 +22,10 @@ using facade::kDbIndOutOfRangeErr; #define RETURN_ON_ERR(x) \ do { \ auto __ec = (x); \ - if (__ec) \ + if (__ec) { \ + VLOG(1) << "Error " << __ec << " while calling " #x; \ return __ec; \ + } \ } while (0) #endif // RETURN_ON_ERR diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 8f446428a..ac9c72fa1 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -157,11 +157,12 @@ int ziplistEntryConvertAndValidate(unsigned char* p, unsigned int head_count, vo * listpack and storing it at 'lp'. * The function is safe to call on non-validated ziplists, it returns 0 * when encounter an integrity validation issue. */ -int ziplistPairsConvertAndValidateIntegrity(unsigned char* zl, size_t size, unsigned char** lp) { +int ziplistPairsConvertAndValidateIntegrity(const uint8_t* zl, size_t size, unsigned char** lp) { /* Keep track of the field names to locate duplicate ones */ ZiplistCbArgs data = {0, NULL, lp}; - int ret = ziplistValidateIntegrity(zl, size, 1, ziplistPairsEntryConvertAndValidate, &data); + int ret = ziplistValidateIntegrity(const_cast(zl), size, 1, + ziplistPairsEntryConvertAndValidate, &data); /* make sure we have an even number of records. */ if (data.count & 1) @@ -174,6 +175,487 @@ int ziplistPairsConvertAndValidateIntegrity(unsigned char* zl, size_t size, unsi } // namespace +class RdbLoader::OpaqueObjLoader { + public: + OpaqueObjLoader(int rdb_type, PrimeValue* pv) : rdb_type_(rdb_type), pv_(pv) { + } + + void operator()(robj* o) { + pv_->ImportRObj(o); + } + + void operator()(long long val) { + pv_->SetInt(val); + } + + void operator()(const base::PODArray& str); + + void operator()(const LzfString& lzfstr); + void operator()(const unique_ptr& ptr); + + std::error_code ec() const { + return ec_; + } + + private: + void CreateSet(const LoadTrace* ltrace); + void CreateHMap(const LoadTrace* ltrace); + void CreateList(const LoadTrace* ltrace); + void CreateZSet(const LoadTrace* ltrace); + + void HandleBlob(string_view blob); + + sds ToSds(const RdbVariant& obj); + string_view ToSV(const RdbVariant& obj); + + std::error_code ec_; + int rdb_type_; + base::PODArray tset_blob_; + PrimeValue* pv_; +}; + +void RdbLoader::OpaqueObjLoader::operator()(const base::PODArray& str) { + string_view sv(str.data(), str.size()); + HandleBlob(sv); +} + +void RdbLoader::OpaqueObjLoader::operator()(const LzfString& lzfstr) { + string tmp(lzfstr.uncompressed_len, '\0'); + if (lzf_decompress(lzfstr.compressed_blob.data(), lzfstr.compressed_blob.size(), tmp.data(), + tmp.size()) == 0) { + LOG(ERROR) << "Invalid LZF compressed string"; + ec_ = RdbError(errc::rdb_file_corrupted); + return; + } + HandleBlob(tmp); +} + +void RdbLoader::OpaqueObjLoader::operator()(const unique_ptr& ptr) { + switch (rdb_type_) { + case RDB_TYPE_SET: + CreateSet(ptr.get()); + break; + case RDB_TYPE_HASH: + CreateHMap(ptr.get()); + break; + case RDB_TYPE_LIST_QUICKLIST: + case RDB_TYPE_LIST_QUICKLIST_2: + CreateList(ptr.get()); + break; + case RDB_TYPE_ZSET: + case RDB_TYPE_ZSET_2: + CreateZSet(ptr.get()); + break; + + default: + LOG(FATAL) << "Unsupported rdb type " << rdb_type_; + } +} + +void RdbLoader::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) { + size_t len = ltrace->arr.size(); + + bool is_intset = true; + if (len <= SetFamily::MaxIntsetEntries()) { + for (size_t i = 0; i < len; i++) { + if (!holds_alternative(ltrace->arr[i].rdb_var)) { + is_intset = false; + break; + } + } + } else { + /* Use a regular set when there are too many entries. */ + + is_intset = false; + } + + robj* res = nullptr; + sds sdsele = nullptr; + + auto cleanup = absl::MakeCleanup([&] { + if (sdsele) + sdsfree(sdsele); + decrRefCount(res); + }); + + if (is_intset) { + res = createIntsetObject(); + long long llval; + for (size_t i = 0; i < len; i++) { + llval = get(ltrace->arr[i].rdb_var); + uint8_t success; + res->ptr = intsetAdd((intset*)res->ptr, llval, &success); + if (!success) { + LOG(ERROR) << "Duplicate set members detected"; + ec_ = RdbError(errc::duplicate_key); + return; + } + } + } else { + res = createSetObject(); + + /* It's faster to expand the dict to the right size asap in order + * to avoid rehashing */ + if (len > DICT_HT_INITIAL_SIZE && dictTryExpand((dict*)res->ptr, len) != DICT_OK) { + LOG(ERROR) << "OOM in dictTryExpand " << len; + ec_ = RdbError(errc::out_of_memory); + return; + } + + for (size_t i = 0; i < len; i++) { + sdsele = ToSds(ltrace->arr[i].rdb_var); + if (!sdsele) + return; + + if (dictAdd((dict*)res->ptr, sdsele, NULL) != DICT_OK) { + LOG(ERROR) << "Duplicate set members detected"; + ec_ = RdbError(errc::duplicate_key); + return; + } + } + } + + pv_->ImportRObj(res); + std::move(cleanup).Cancel(); +} + +void RdbLoader::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) { + size_t len = ltrace->arr.size() / 2; + + /* Too many entries? Use a hash table right from the start. */ + bool keep_lp = (len <= server.hash_max_listpack_entries); + + size_t lp_size = 0; + if (keep_lp) { + for (const auto& str : ltrace->arr) { + size_t str_len = StrLen(str.rdb_var); + lp_size += str_len; + + if (str_len > server.hash_max_listpack_value) { + keep_lp = false; + break; + } + } + } + + robj* res = nullptr; + + if (keep_lp) { + uint8_t* lp = lpNew(lp_size); + + for (size_t i = 0; i < len; ++i) { + /* Add pair to listpack */ + string_view sv = ToSV(ltrace->arr[i * 2].rdb_var); + lp = lpAppend(lp, reinterpret_cast(sv.data()), sv.size()); + + sv = ToSV(ltrace->arr[i * 2 + 1].rdb_var); + lp = lpAppend(lp, reinterpret_cast(sv.data()), sv.size()); + } + + if (ec_) { + lpFree(lp); + return; + } + + lp = lpShrinkToFit(lp); + robj* o = createObject(OBJ_HASH, lp); + o->encoding = OBJ_ENCODING_LISTPACK; + } else { + dict* hmap = dictCreate(&hashDictType); + + auto cleanup = absl::MakeCleanup([&] { + dictRelease(hmap); + }); + + if (len > DICT_HT_INITIAL_SIZE) { + if (dictTryExpand(hmap, len) != DICT_OK) { + LOG(ERROR) << "OOM in dictTryExpand " << len; + ec_ = RdbError(errc::out_of_memory); + return; + } + } + + for (size_t i = 0; i < len; ++i) { + sds key = ToSds(ltrace->arr[i * 2].rdb_var); + sds val = ToSds(ltrace->arr[i * 2 + 1].rdb_var); + + if (!key || !val) + return; + + /* Add pair to hash table */ + int ret = dictAdd(hmap, key, val); + if (ret == DICT_ERR) { + LOG(ERROR) << "Duplicate hash fields detected"; + ec_ = RdbError(errc::rdb_file_corrupted); + return; + } + } + + res = createObject(OBJ_HASH, hmap); + res->encoding = OBJ_ENCODING_HT; + std::move(cleanup).Cancel(); + } + + DCHECK(res); + pv_->ImportRObj(res); +} + +void RdbLoader::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) { + quicklist* ql = + quicklistNew(GetFlag(FLAGS_list_max_listpack_size), GetFlag(FLAGS_list_compress_depth)); + auto cleanup = absl::Cleanup([&] { quicklistRelease(ql); }); + + for (size_t i = 0; i < ltrace->arr.size(); ++i) { + unsigned container = ltrace->arr[i].encoding; + string_view sv = ToSV(ltrace->arr[i].rdb_var); + + if (ec_) + return; + + if (container == QUICKLIST_NODE_CONTAINER_PLAIN) { + quicklistAppendPlainNode(ql, (uint8_t*)sv.data(), sv.size()); + continue; + } + + uint8_t* lp = nullptr; + + if (rdb_type_ == RDB_TYPE_LIST_QUICKLIST_2) { + uint8_t* src = (uint8_t*)sv.data(); + if (!lpValidateIntegrity(src, sv.size(), 0, NULL, NULL)) { + LOG(ERROR) << "Listpack integrity check failed."; + ec_ = RdbError(errc::rdb_file_corrupted); + return; + } + + if (lpLength(src) == 0) { + continue; + } + + lp = (uint8_t*)zmalloc(sv.size()); + memcpy(lp, src, sv.size()); + } else { + lp = lpNew(sv.size()); + if (!ziplistValidateIntegrity((uint8_t*)sv.data(), sv.size(), 1, + ziplistEntryConvertAndValidate, &lp)) { + LOG(ERROR) << "Ziplist integrity check failed."; + zfree(lp); + ec_ = RdbError(errc::rdb_file_corrupted); + return; + } + + /* Silently skip empty ziplists, if we'll end up with empty quicklist we'll fail later. */ + if (lpLength(lp) == 0) { + zfree(lp); + continue; + } + + lp = lpShrinkToFit(lp); + } + + quicklistAppendListpack(ql, lp); + } + + if (quicklistCount(ql) == 0) { + ec_ = RdbError(errc::empty_key); + return; + } + + robj* res = createObject(OBJ_LIST, ql); + res->encoding = OBJ_ENCODING_QUICKLIST; + std::move(cleanup).Cancel(); + + pv_->ImportRObj(res); +} + +void RdbLoader::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) { + robj* res = createZsetObject(); + zset* zs = (zset*)res->ptr; + + auto cleanup = absl::Cleanup([&] { decrRefCount(res); }); + + size_t zsetlen = ltrace->arr.size(); + if (zsetlen > DICT_HT_INITIAL_SIZE && dictTryExpand(zs->dict, zsetlen) != DICT_OK) { + LOG(ERROR) << "OOM in dictTryExpand " << zsetlen; + ec_ = RdbError(errc::out_of_memory); + return; + } + + size_t maxelelen = 0, totelelen = 0; + + for (size_t i = 0; i < zsetlen; ++i) { + sds sdsele = ToSds(ltrace->arr[i].rdb_var); + if (!sdsele) + return; + + double score = ltrace->arr[i].score; + + /* Don't care about integer-encoded strings. */ + if (sdslen(sdsele) > maxelelen) + maxelelen = sdslen(sdsele); + totelelen += sdslen(sdsele); + + zskiplistNode* znode = zslInsert(zs->zsl, score, sdsele); + int ret = dictAdd(zs->dict, sdsele, &znode->score); + if (ret != DICT_OK) { + LOG(ERROR) << "Duplicate zset fields detected"; + sdsfree(sdsele); + ec_ = RdbError(errc::rdb_file_corrupted); + return; + } + } + + /* Convert *after* loading, since sorted sets are not stored ordered. */ + if (zsetLength(res) <= server.zset_max_listpack_entries && + maxelelen <= server.zset_max_listpack_value && lpSafeToAdd(NULL, totelelen)) { + zsetConvert(res, OBJ_ENCODING_LISTPACK); + } + + std::move(cleanup).Cancel(); + + pv_->ImportRObj(res); +} + +void RdbLoader::OpaqueObjLoader::HandleBlob(string_view blob) { + if (rdb_type_ == RDB_TYPE_STRING) { + pv_->SetString(blob); + return; + } + + robj* res = nullptr; + if (rdb_type_ == RDB_TYPE_SET_INTSET) { + if (!intsetValidateIntegrity((const uint8_t*)blob.data(), blob.size(), 0)) { + LOG(ERROR) << "Intset integrity check failed."; + ec_ = RdbError(errc::rdb_file_corrupted); + return; + } + + const intset* is = (const intset*)blob.data(); + + unsigned len = intsetLen(is); + if (len > SetFamily::MaxIntsetEntries()) { + res = createSetObject(); + if (len > DICT_HT_INITIAL_SIZE && dictTryExpand((dict*)res->ptr, len) != DICT_OK) { + LOG(ERROR) << "OOM in dictTryExpand " << len; + decrRefCount(res); + ec_ = RdbError(errc::out_of_memory); + return; + } + + SetFamily::ConvertTo(is, (dict*)res->ptr); + } else { + intset* mine = (intset*)zmalloc(blob.size()); + memcpy(mine, blob.data(), blob.size()); + res = createObject(OBJ_SET, mine); + res->encoding = OBJ_ENCODING_INTSET; + } + } else if (rdb_type_ == RDB_TYPE_HASH_ZIPLIST) { + unsigned char* lp = lpNew(blob.size()); + if (!ziplistPairsConvertAndValidateIntegrity((const uint8_t*)blob.data(), blob.size(), &lp)) { + LOG(ERROR) << "Zset ziplist integrity check failed."; + zfree(lp); + ec_ = RdbError(errc::rdb_file_corrupted); + return; + } + + if (lpLength(lp) == 0) { + lpFree(lp); + ec_ = RdbError(errc::empty_key); + return; + } + + res = createObject(OBJ_HASH, lp); + res->encoding = OBJ_ENCODING_LISTPACK; + + if (lpBytes(lp) > HSetFamily::MaxListPackLen()) + hashTypeConvert(res, OBJ_ENCODING_HT); + else + res->ptr = lpShrinkToFit((uint8_t*)res->ptr); + } else if (rdb_type_ == RDB_TYPE_ZSET_ZIPLIST) { + unsigned char* lp = lpNew(blob.size()); + if (!ziplistPairsConvertAndValidateIntegrity((uint8_t*)blob.data(), blob.size(), &lp)) { + LOG(ERROR) << "Zset ziplist integrity check failed."; + zfree(lp); + ec_ = RdbError(errc::rdb_file_corrupted); + return; + } + + if (lpLength(lp) == 0) { + lpFree(lp); + ec_ = RdbError(errc::empty_key); + return; + } + + res = createObject(OBJ_ZSET, lp); + res->encoding = OBJ_ENCODING_LISTPACK; + + if (lpBytes(lp) > server.zset_max_listpack_entries) + zsetConvert(res, OBJ_ENCODING_SKIPLIST); + else + res->ptr = lpShrinkToFit(lp); + } else { + LOG(FATAL) << "Unsupported rdb type " << rdb_type_; + } + + pv_->ImportRObj(res); +} + +sds RdbLoader::OpaqueObjLoader::ToSds(const RdbVariant& obj) { + if (holds_alternative(obj)) { + return sdsfromlonglong(get(obj)); + } + + const base::PODArray* ch_arr = get_if>(&obj); + if (ch_arr) { + return sdsnewlen(ch_arr->data(), ch_arr->size()); + } + + const LzfString* lzf = get_if(&obj); + if (lzf) { + sds res = sdsnewlen(NULL, lzf->uncompressed_len); + if (lzf_decompress(lzf->compressed_blob.data(), lzf->compressed_blob.size(), res, + lzf->uncompressed_len) == 0) { + LOG(ERROR) << "Invalid LZF compressed string"; + ec_ = RdbError(errc::rdb_file_corrupted); + sdsfree(res); + + return nullptr; + } + return res; + } + + LOG(FATAL) << "Unexpected variant"; + return nullptr; +} + +string_view RdbLoader::OpaqueObjLoader::ToSV(const RdbVariant& obj) { + if (holds_alternative(obj)) { + tset_blob_.resize(32); + auto val = get(obj); + char* next = absl::numbers_internal::FastIntToBuffer(val, tset_blob_.data()); + return string_view{tset_blob_.data(), size_t(next - tset_blob_.data())}; + } + + const base::PODArray* ch_arr = get_if>(&obj); + if (ch_arr) { + return string_view(ch_arr->data(), ch_arr->size()); + } + + const LzfString* lzf = get_if(&obj); + if (lzf) { + tset_blob_.resize(lzf->uncompressed_len); + if (lzf_decompress(lzf->compressed_blob.data(), lzf->compressed_blob.size(), tset_blob_.data(), + lzf->uncompressed_len) == 0) { + LOG(ERROR) << "Invalid LZF compressed string"; + ec_ = RdbError(errc::rdb_file_corrupted); + return string_view{}; + } + return string_view{tset_blob_.data(), tset_blob_.size()}; + } + + LOG(FATAL) << "Unexpected variant"; + return string_view{}; +} + struct RdbLoader::ObjSettings { long long now; // current epoch time in ms. int64_t expiretime = 0; // expire epoch time in ms @@ -213,7 +695,7 @@ RdbLoader::~RdbLoader() { auto exp_res = (expr); \ if (!exp_res) \ return make_unexpected(exp_res.error()); \ - dest = exp_res.value(); \ + dest = std::move(exp_res.value()); \ } error_code RdbLoader::Load(io::Source* src) { @@ -259,7 +741,7 @@ error_code RdbLoader::Load(io::Source* src) { settings.now = mstime(); size_t keys_loaded = 0; - while (1) { + while (!stop_early_.load(memory_order_relaxed)) { /* Read type. */ SET_OR_RETURN(FetchType(), type); @@ -353,6 +835,11 @@ error_code RdbLoader::Load(io::Source* src) { settings.Reset(); } // main load loop + if (stop_early_) { + lock_guard lk(mu_); + return ec_; + } + /* Verify the checksum if RDB version is >= 5 */ RETURN_ON_ERR(VerifyChecksum()); @@ -593,7 +1080,12 @@ void RdbLoader::FlushShardAsync(ShardId sid) { if (out_buf.empty()) return; - auto cb = [indx = this->cur_db_index_, vec = std::move(out_buf)] { LoadItemsBuffer(indx, vec); }; + ItemsBuf* ib = new ItemsBuf{std::move(out_buf)}; + auto cb = [indx = this->cur_db_index_, ib, this] { + this->LoadItemsBuffer(indx, *ib); + delete ib; + }; + shard_set->Add(sid, std::move(cb)); } @@ -601,7 +1093,18 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) { DbSlice& db_slice = EngineShard::tlocal()->db_slice(); for (const auto& item : ib) { std::string_view key{item.key, sdslen(item.key)}; - auto [it, added] = db_slice.AddOrFind(db_ind, key, PrimeValue{item.val}, item.expire_ms); + PrimeValue pv; + OpaqueObjLoader visitor(item.val.rdb_type, &pv); + std::visit(visitor, item.val.obj); + + if (visitor.ec()) { + lock_guard lk(mu_); + ec_ = visitor.ec(); + stop_early_ = true; + break; + } + + auto [it, added] = db_slice.AddOrFind(db_ind, key, std::move(pv), item.expire_ms); if (!added) { LOG(WARNING) << "RDB has duplicated key '" << key << "' in DB " << db_ind; @@ -611,6 +1114,26 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) { } } +size_t RdbLoader::StrLen(const RdbVariant& tset) { + const base::PODArray* arr = get_if>(&tset); + if (arr) + return arr->size(); + + if (holds_alternative(tset)) { + auto val = get(tset); + char buf[32]; + char* next = absl::numbers_internal::FastIntToBuffer(val, buf); + return (next - buf); + } + + const LzfString* lzf = get_if(&tset); + if (lzf) + return lzf->uncompressed_len; + + LOG(DFATAL) << "should not reach"; + return 0; +} + auto RdbLoader::FetchGenericString(int flags) -> io::Result { bool isencoded; size_t len; @@ -622,7 +1145,7 @@ auto RdbLoader::FetchGenericString(int flags) -> io::Result { case RDB_ENC_INT8: case RDB_ENC_INT16: case RDB_ENC_INT32: - return FetchIntegerObject(len, flags, NULL); + return FetchIntegerObject(len, flags); case RDB_ENC_LZF: return FetchLzfStringObject(flags); default: @@ -717,8 +1240,7 @@ auto RdbLoader::FetchLzfStringObject(int flags) -> io::Result { return make_pair(createObject(OBJ_STRING, val), len); } -auto RdbLoader::FetchIntegerObject(int enctype, int flags, size_t* lenptr) - -> io::Result { +auto RdbLoader::FetchIntegerObject(int enctype, int flags) -> io::Result { bool plain = (flags & RDB_LOAD_PLAIN) != 0; bool sds = (flags & RDB_LOAD_SDS) != 0; bool encode = (flags & RDB_LOAD_ENC) != 0; @@ -737,8 +1259,6 @@ auto RdbLoader::FetchIntegerObject(int enctype, int flags, size_t* lenptr) if (plain || sds) { char buf[LONG_STR_SIZE], *p; int len = ll2string(buf, sizeof(buf), val); - if (lenptr) - *lenptr = len; p = plain ? (char*)zmalloc(len) : sdsnewlen(SDS_NOINIT, len); memcpy(p, buf, len); return make_pair(p, len); @@ -801,40 +1321,33 @@ auto RdbLoader::ReadKey() -> io::Result { return res.get_unexpected(); } -io::Result RdbLoader::ReadObj(int rdbtype) { +auto RdbLoader::ReadObj(int rdbtype) -> io::Result { io::Result res_obj = nullptr; - io::Result fetch_res; switch (rdbtype) { - case RDB_TYPE_STRING: + case RDB_TYPE_STRING: { /* Read string value */ - fetch_res = FetchGenericString(RDB_LOAD_NONE); - if (!fetch_res) - return fetch_res.get_unexpected(); - res_obj = (robj*)fetch_res->first; - break; + auto fetch = ReadStringObj(); + if (!fetch) + return make_unexpected(fetch.error()); + return OpaqueObj{std::move(*fetch), RDB_TYPE_STRING}; + } case RDB_TYPE_SET: - res_obj = ReadSet(); - break; + return ReadSet(); case RDB_TYPE_SET_INTSET: - res_obj = ReadIntSet(); - break; + return ReadIntSet(); case RDB_TYPE_HASH_ZIPLIST: - res_obj = ReadHZiplist(); - break; + return ReadHZiplist(); case RDB_TYPE_HASH: - res_obj = ReadHSet(); - break; + return ReadHMap(); case RDB_TYPE_ZSET: case RDB_TYPE_ZSET_2: - res_obj = ReadZSet(rdbtype); - break; + return ReadZSet(rdbtype); case RDB_TYPE_ZSET_ZIPLIST: - res_obj = ReadZSetZL(); - break; + return ReadZSetZL(); case RDB_TYPE_LIST_QUICKLIST: - res_obj = ReadListQuicklist(rdbtype); - break; + case RDB_TYPE_LIST_QUICKLIST_2: + return ReadListQuicklist(rdbtype); case RDB_TYPE_STREAM_LISTPACKS: res_obj = ReadStreams(); break; @@ -843,247 +1356,150 @@ io::Result RdbLoader::ReadObj(int rdbtype) { return Unexpected(errc::invalid_encoding); } - return res_obj; + if (!res_obj) + return make_unexpected(res_obj.error()); + + return OpaqueObj{*res_obj, rdbtype}; } -io::Result RdbLoader::ReadSet() { +auto RdbLoader::ReadStringObj() -> io::Result { + bool isencoded; + size_t len; + + SET_OR_UNEXPECT(LoadLen(&isencoded), len); + + if (isencoded) { + switch (len) { + case RDB_ENC_INT8: + case RDB_ENC_INT16: + case RDB_ENC_INT32: + return ReadIntObj(len); + case RDB_ENC_LZF: + return ReadLzf(); + default: + LOG(ERROR) << "Unknown RDB string encoding " << len; + return Unexpected(errc::rdb_file_corrupted); + } + } + + base::PODArray blob; + blob.resize(len); + error_code ec = FetchBuf(len, blob.data()); + if (ec) { + return make_unexpected(ec); + } + + return blob; +} + +io::Result RdbLoader::ReadIntObj(int enctype) { + long long val; + + if (enctype == RDB_ENC_INT8) { + SET_OR_UNEXPECT(FetchInt(), val); + } else if (enctype == RDB_ENC_INT16) { + SET_OR_UNEXPECT(FetchInt(), val); + } else if (enctype == RDB_ENC_INT32) { + SET_OR_UNEXPECT(FetchInt(), val); + } else { + return Unexpected(errc::invalid_encoding); + } + return val; +} + +auto RdbLoader::ReadLzf() -> io::Result { + uint64_t clen; + LzfString res; + + SET_OR_UNEXPECT(LoadLen(NULL), clen); + SET_OR_UNEXPECT(LoadLen(NULL), res.uncompressed_len); + + if (res.uncompressed_len > 1ULL << 29) { + LOG(ERROR) << "Uncompressed length is too big " << res.uncompressed_len; + return Unexpected(errc::rdb_file_corrupted); + } + + res.compressed_blob.resize(clen); + /* Load the compressed representation and uncompress it to target. */ + error_code ec = FetchBuf(clen, res.compressed_blob.data()); + if (ec) { + return make_unexpected(ec); + } + + return res; +} + +auto RdbLoader::ReadSet() -> io::Result { size_t len; SET_OR_UNEXPECT(LoadLen(NULL), len); if (len == 0) return Unexpected(errc::empty_key); - robj* res = nullptr; - sds sdsele = nullptr; - - auto cleanup = absl::MakeCleanup([&] { - if (sdsele) - sdsfree(sdsele); - decrRefCount(res); - }); - - /* Use a regular set when there are too many entries. */ - if (len > SetFamily::MaxIntsetEntries()) { - res = createSetObject(); - /* It's faster to expand the dict to the right size asap in order - * to avoid rehashing */ - if (len > DICT_HT_INITIAL_SIZE && dictTryExpand((dict*)res->ptr, len) != DICT_OK) { - LOG(ERROR) << "OOM in dictTryExpand " << len; - return Unexpected(errc::out_of_memory); - } - } else { - // TODO: why do we bother creating intset if it was recorded as non intset? - res = createIntsetObject(); - } - - /* Load every single element of the set */ + unique_ptr res(new LoadTrace); + res->arr.resize(len); for (size_t i = 0; i < len; i++) { - long long llval; - io::Result fetch = FetchGenericString(RDB_LOAD_SDS); + io::Result fetch = ReadStringObj(); if (!fetch) { return make_unexpected(fetch.error()); } - sdsele = (sds)fetch->first; - - if (res->encoding == OBJ_ENCODING_INTSET) { - /* Fetch integer value from element. */ - if (isSdsRepresentableAsLongLong(sdsele, &llval) == C_OK) { - uint8_t success; - res->ptr = intsetAdd((intset*)res->ptr, llval, &success); - if (!success) { - LOG(ERROR) << "Duplicate set members detected"; - return Unexpected(errc::duplicate_key); - } - } else { - dict* ds = dictCreate(&setDictType); - if (dictTryExpand((dict*)res->ptr, len) != DICT_OK) { - dictRelease(ds); - LOG(ERROR) << "OOM in dictTryExpand " << len; - return Unexpected(errc::out_of_memory); - } - SetFamily::ConvertTo((intset*)res->ptr, ds); - zfree(res->ptr); - res->ptr = ds; - res->encoding = OBJ_ENCODING_HT; - } - } - - /* This will also be called when the set was just converted - * to a regular hash table encoded set. */ - if (res->encoding == OBJ_ENCODING_HT) { - if (dictAdd((dict*)res->ptr, sdsele, NULL) != DICT_OK) { - LOG(ERROR) << "Duplicate set members detected"; - return Unexpected(errc::duplicate_key); - } - } else { - sdsfree(sdsele); - } + res->arr[i].rdb_var = std::move(fetch.value()); } - std::move(cleanup).Cancel(); - - return res; + return OpaqueObj{std::move(res), RDB_TYPE_SET}; } -::io::Result RdbLoader::ReadIntSet() { - OpaqueBuf fetch; - SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_PLAIN), fetch); - - if (fetch.second == 0) { - return Unexpected(errc::rdb_file_corrupted); - } - DCHECK(fetch.first); - - if (!intsetValidateIntegrity((uint8_t*)fetch.first, fetch.second, 0)) { - LOG(ERROR) << "Intset integrity check failed."; - zfree(fetch.first); - return Unexpected(errc::rdb_file_corrupted); +auto RdbLoader::ReadIntSet() -> io::Result { + io::Result fetch = ReadStringObj(); + if (!fetch) { + return make_unexpected(fetch.error()); } - intset* is = (intset*)fetch.first; - robj* res; - unsigned len = intsetLen(is); - if (len > SetFamily::MaxIntsetEntries()) { - res = createSetObject(); - if (len > DICT_HT_INITIAL_SIZE && dictTryExpand((dict*)res->ptr, len) != DICT_OK) { - LOG(ERROR) << "OOM in dictTryExpand " << len; - decrRefCount(res); - return Unexpected(errc::out_of_memory); - } - SetFamily::ConvertTo(is, (dict*)res->ptr); - zfree(is); + const LzfString* lzf = get_if(&fetch.value()); + const base::PODArray* arr = get_if>(&fetch.value()); + + if (lzf) { + if (lzf->uncompressed_len == 0 || lzf->compressed_blob.empty()) + return Unexpected(errc::rdb_file_corrupted); + } else if (arr) { + if (arr->empty()) + return Unexpected(errc::rdb_file_corrupted); } else { - res = createObject(OBJ_SET, is); - res->encoding = OBJ_ENCODING_INTSET; - } - return res; -} - -io::Result RdbLoader::ReadHZiplist() { - OpaqueBuf fetch; - SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_PLAIN), fetch); - - if (fetch.second == 0) { - return Unexpected(errc::rdb_file_corrupted); - } - DCHECK(fetch.first); - - unsigned char* lp = lpNew(fetch.second); - if (!ziplistPairsConvertAndValidateIntegrity((uint8_t*)fetch.first, fetch.second, &lp)) { - LOG(ERROR) << "Zset ziplist integrity check failed."; - zfree(lp); - zfree(fetch.first); return Unexpected(errc::rdb_file_corrupted); } - zfree(fetch.first); - - if (lpLength(lp) == 0) { - lpFree(lp); - - return Unexpected(errc::empty_key); - } - - robj* res = createObject(OBJ_HASH, lp); - res->encoding = OBJ_ENCODING_LISTPACK; - - if (lpBytes(lp) > HSetFamily::MaxListPackLen()) - hashTypeConvert(res, OBJ_ENCODING_HT); - else - res->ptr = lpShrinkToFit((uint8_t*)res->ptr); - - return res; + return OpaqueObj{std::move(*fetch), RDB_TYPE_SET_INTSET}; } -io::Result RdbLoader::ReadHSet() { +auto RdbLoader::ReadHZiplist() -> io::Result { + RdbVariant str_obj; + SET_OR_UNEXPECT(ReadStringObj(), str_obj); + + if (StrLen(str_obj) == 0) { + return Unexpected(errc::rdb_file_corrupted); + } + + return OpaqueObj{std::move(str_obj), RDB_TYPE_HASH_ZIPLIST}; +} + +auto RdbLoader::ReadHMap() -> io::Result { uint64_t len; SET_OR_UNEXPECT(LoadLen(nullptr), len); if (len == 0) return Unexpected(errc::empty_key); - sds field = nullptr; - sds value = nullptr; - robj* res = createHashObject(); + unique_ptr load_trace(new LoadTrace); + load_trace->arr.resize(len * 2); - /* Too many entries? Use a hash table right from the start. */ - if (len > server.hash_max_listpack_entries) - hashTypeConvert(res, OBJ_ENCODING_HT); - - auto cleanup = absl::Cleanup([&] { - decrRefCount(res); - if (field) - sdsfree(field); - if (value) - sdsfree(value); - }); - - /* Load every field and value into the ziplist */ - while (res->encoding == OBJ_ENCODING_LISTPACK && len > 0) { - len--; - - OpaqueBuf ofield, ovalue; - SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_SDS), ofield); - field = (sds)ofield.first; - - SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_SDS), ovalue); - value = (sds)ovalue.first; - - /* Convert to hash table if size threshold is exceeded */ - if (sdslen(field) > server.hash_max_listpack_value || - sdslen(value) > server.hash_max_listpack_value || - !lpSafeToAdd((uint8_t*)res->ptr, sdslen(field) + sdslen(value))) { - hashTypeConvert(res, OBJ_ENCODING_HT); - int ret = dictAdd((dict*)res->ptr, field, value); - if (ret == DICT_ERR) { - return Unexpected(errc::rdb_file_corrupted); - } - break; - } - - /* Add pair to listpack */ - res->ptr = lpAppend((uint8_t*)res->ptr, (uint8_t*)field, sdslen(field)); - res->ptr = lpAppend((uint8_t*)res->ptr, (uint8_t*)value, sdslen(value)); - - sdsfree(field); - sdsfree(value); - field = value = nullptr; + for (size_t i = 0; i < load_trace->arr.size(); ++i) { + SET_OR_UNEXPECT(ReadStringObj(), load_trace->arr[i].rdb_var); } - if (res->encoding == OBJ_ENCODING_HT) { - if (len > DICT_HT_INITIAL_SIZE) { - if (dictTryExpand((dict*)res->ptr, len) != DICT_OK) { - LOG(ERROR) << "OOM in dictTryExpand " << len; - return Unexpected(errc::out_of_memory); - } - } - - /* Load remaining fields and values into the hash table */ - while (len > 0) { - len--; - - OpaqueBuf ofield, ovalue; - SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_SDS), ofield); - field = (sds)ofield.first; - - SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_SDS), ovalue); - value = (sds)ovalue.first; - - /* Add pair to hash table */ - int ret = dictAdd((dict*)res->ptr, field, value); - if (ret == DICT_ERR) { - LOG(ERROR) << "Duplicate hash fields detected"; - return Unexpected(errc::rdb_file_corrupted); - } - } - } - DCHECK_EQ(0u, len); - - std::move(cleanup).Cancel(); - return res; + return OpaqueObj{std::move(load_trace), RDB_TYPE_HASH}; } -io::Result RdbLoader::ReadZSet(int rdbtype) { +auto RdbLoader::ReadZSet(int rdbtype) -> io::Result { /* Read sorted set value. */ uint64_t zsetlen; SET_OR_UNEXPECT(LoadLen(nullptr), zsetlen); @@ -1091,33 +1507,13 @@ io::Result RdbLoader::ReadZSet(int rdbtype) { if (zsetlen == 0) return Unexpected(errc::empty_key); - robj* res = createZsetObject(); - zset* zs = (zset*)res->ptr; - sds sdsele = nullptr; + unique_ptr load_trace(new LoadTrace); + load_trace->arr.resize(zsetlen); - auto cleanup = absl::Cleanup([&] { - decrRefCount(res); - if (sdsele) - sdsfree(sdsele); - }); - - if (zsetlen > DICT_HT_INITIAL_SIZE && dictTryExpand(zs->dict, zsetlen) != DICT_OK) { - LOG(ERROR) << "OOM in dictTryExpand " << zsetlen; - return Unexpected(errc::out_of_memory); - } - - size_t maxelelen = 0, totelelen = 0; - - /* Load every single element of the sorted set. */ - while (zsetlen--) { - double score; - zskiplistNode* znode; - - OpaqueBuf fetch; - SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_SDS), fetch); - - sdsele = (sds)fetch.first; + double score; + for (size_t i = 0; i < load_trace->arr.size(); ++i) { + SET_OR_UNEXPECT(ReadStringObj(), load_trace->arr[i].rdb_var); if (rdbtype == RDB_TYPE_ZSET_2) { SET_OR_UNEXPECT(FetchBinaryDouble(), score); } else { @@ -1128,85 +1524,35 @@ io::Result RdbLoader::ReadZSet(int rdbtype) { LOG(ERROR) << "Zset with NAN score detected"; return Unexpected(errc::rdb_file_corrupted); } - - /* Don't care about integer-encoded strings. */ - if (sdslen(sdsele) > maxelelen) - maxelelen = sdslen(sdsele); - totelelen += sdslen(sdsele); - - znode = zslInsert(zs->zsl, score, sdsele); - int ret = dictAdd(zs->dict, sdsele, &znode->score); - sdsele = nullptr; - - if (ret != DICT_OK) { - LOG(ERROR) << "Duplicate zset fields detected"; - return Unexpected(errc::rdb_file_corrupted); - } + load_trace->arr[i].score = score; } - /* Convert *after* loading, since sorted sets are not stored ordered. */ - if (zsetLength(res) <= server.zset_max_listpack_entries && - maxelelen <= server.zset_max_listpack_value && lpSafeToAdd(NULL, totelelen)) { - zsetConvert(res, OBJ_ENCODING_LISTPACK); - } - - std::move(cleanup).Cancel(); - - return res; + return OpaqueObj{std::move(load_trace), rdbtype}; } -io::Result RdbLoader::ReadZSetZL() { - OpaqueBuf fetch; - SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_PLAIN), fetch); +auto RdbLoader::ReadZSetZL() -> io::Result { + RdbVariant str_obj; + SET_OR_UNEXPECT(ReadStringObj(), str_obj); - if (fetch.second == 0) { - return Unexpected(errc::rdb_file_corrupted); - } - DCHECK(fetch.first); - - unsigned char* lp = lpNew(fetch.second); - if (!ziplistPairsConvertAndValidateIntegrity((uint8_t*)fetch.first, fetch.second, &lp)) { - LOG(ERROR) << "Zset ziplist integrity check failed."; - zfree(lp); - zfree(fetch.first); + if (StrLen(str_obj) == 0) { return Unexpected(errc::rdb_file_corrupted); } - zfree(fetch.first); - - if (lpLength(lp) == 0) { - lpFree(lp); - - return Unexpected(errc::empty_key); - } - - robj* res = createObject(OBJ_ZSET, lp); - res->encoding = OBJ_ENCODING_LISTPACK; - - if (lpBytes(lp) > server.zset_max_listpack_entries) - zsetConvert(res, OBJ_ENCODING_SKIPLIST); - else - res->ptr = lpShrinkToFit(lp); - - return res; + return OpaqueObj{std::move(str_obj), RDB_TYPE_ZSET_ZIPLIST}; } -io::Result RdbLoader::ReadListQuicklist(int rdbtype) { +auto RdbLoader::ReadListQuicklist(int rdbtype) -> io::Result { uint64_t len; SET_OR_UNEXPECT(LoadLen(nullptr), len); if (len == 0) return Unexpected(errc::empty_key); - quicklist* ql = - quicklistNew(GetFlag(FLAGS_list_max_listpack_size), GetFlag(FLAGS_list_compress_depth)); - uint64_t container = QUICKLIST_NODE_CONTAINER_PACKED; - - auto cleanup = absl::Cleanup([&] { quicklistRelease(ql); }); - - while (len--) { - uint8_t* lp = nullptr; + unique_ptr load_trace(new LoadTrace); + load_trace->arr.resize(len); + for (size_t i = 0; i < len; ++i) { + uint64_t container = QUICKLIST_NODE_CONTAINER_PACKED; if (rdbtype == RDB_TYPE_LIST_QUICKLIST_2) { SET_OR_UNEXPECT(LoadLen(nullptr), container); @@ -1217,58 +1563,19 @@ io::Result RdbLoader::ReadListQuicklist(int rdbtype) { } } - OpaqueBuf data; - SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_PLAIN), data); - if (data.second == 0) { + RdbVariant var; + SET_OR_UNEXPECT(ReadStringObj(), var); + if (StrLen(var) == 0) { return Unexpected(errc::rdb_file_corrupted); } - - if (container == QUICKLIST_NODE_CONTAINER_PLAIN) { - quicklistAppendPlainNode(ql, (uint8_t*)data.first, data.second); - continue; - } - - if (rdbtype == RDB_TYPE_LIST_QUICKLIST_2) { - lp = (uint8_t*)data.first; - if (!lpValidateIntegrity(lp, data.second, 0, NULL, NULL)) { - LOG(ERROR) << "Listpack integrity check failed."; - zfree(lp); - return Unexpected(errc::rdb_file_corrupted); - } - } else { - lp = lpNew(data.second); - if (!ziplistValidateIntegrity((uint8_t*)data.first, data.second, 1, - ziplistEntryConvertAndValidate, &lp)) { - LOG(ERROR) << "Ziplist integrity check failed."; - zfree(data.first); - zfree(lp); - return Unexpected(errc::rdb_file_corrupted); - } - zfree(data.first); - lp = lpShrinkToFit(lp); - } - - /* Silently skip empty ziplists, if we'll end up with empty quicklist we'll fail later. */ - if (lpLength(lp) == 0) { - zfree(lp); - continue; - } - quicklistAppendListpack(ql, lp); - } // while - - if (quicklistCount(ql) == 0) { - return Unexpected(errc::empty_key); + load_trace->arr[i].rdb_var = std::move(var); + load_trace->arr[i].encoding = container; } - std::move(cleanup).Cancel(); - - robj* res = createObject(OBJ_LIST, ql); - res->encoding = OBJ_ENCODING_QUICKLIST; - - return res; + return OpaqueObj{std::move(load_trace), rdbtype}; } -::io::Result RdbLoader::ReadStreams() { +auto RdbLoader::ReadStreams() -> io::Result { uint64_t listpacks; SET_OR_UNEXPECT(LoadLen(nullptr), listpacks); @@ -1449,13 +1756,19 @@ void RdbLoader::ResizeDb(size_t key_num, size_t expire_num) { error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) { /* Read key */ sds key; - robj* val; + OpaqueObj val; + // We free key in LoadItemsBuffer. SET_OR_RETURN(ReadKey(), key); auto key_cleanup = absl::MakeCleanup([key] { sdsfree(key); }); + io::Result io_res = ReadObj(type); - SET_OR_RETURN(ReadObj(type), val); + if (!io_res) { + VLOG(1) << "ReadObj error " << io_res.error() << " for key " << key; + return io_res.error(); + } + val = std::move(io_res.value()); /* Check if the key already expired. This function is used when loading * an RDB file from disk, either at startup, or when an RDB was @@ -1468,7 +1781,7 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) { // TODO: check rdbflags&RDBFLAGS_AOF_PREAMBLE logic in rdb.c bool should_expire = settings->has_expired; // TODO: to implement if (should_expire) { - decrRefCount(val); + // decrRefCount(val); } else { std::move(key_cleanup).Cancel(); @@ -1477,7 +1790,7 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) { uint64_t expire_at_ms = settings->expiretime; auto& out_buf = shard_buf_[sid]; - out_buf.emplace_back(Item{key, val, expire_at_ms}); + out_buf.emplace_back(Item{key, std::move(val), expire_at_ms}); constexpr size_t kBufSize = 128; if (out_buf.size() >= kBufSize) { diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index 3e4b64534..55b1bf495 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -1,6 +1,9 @@ // Copyright 2022, Roman Gershman. All rights reserved. // See LICENSE for licensing terms. // +#pragma once + +#include #include extern "C" { @@ -24,18 +27,53 @@ class RdbLoader { ~RdbLoader(); std::error_code Load(::io::Source* src); - void set_source_limit(size_t n) { source_limit_ = n;} + void set_source_limit(size_t n) { + source_limit_ = n; + } - ::io::Bytes Leftover() const { return mem_buf_.InputBuffer(); } - size_t bytes_read() const { return bytes_read_; } + ::io::Bytes Leftover() const { + return mem_buf_.InputBuffer(); + } + size_t bytes_read() const { + return bytes_read_; + } private: using MutableBytes = ::io::MutableBytes; struct ObjSettings; + using OpaqueBuf = std::pair; + struct LzfString { + base::PODArray compressed_blob; + uint64_t uncompressed_len; + }; + + struct LoadTrace; + + using RdbVariant = std::variant, LzfString, + std::unique_ptr>; + struct OpaqueObj { + RdbVariant obj; + int rdb_type; + }; + + struct LoadBlob { + RdbVariant rdb_var; + union { + unsigned encoding; + double score; + }; + }; + + struct LoadTrace { + std::vector arr; + }; + + class OpaqueObjLoader; + struct Item { sds key; - robj* val; + OpaqueObj val; uint64_t expire_ms; }; using ItemsBuf = std::vector; @@ -55,23 +93,27 @@ class RdbLoader { // FetchGenericString may return various types. I basically copied the code // from rdb.c and tried not to shoot myself on the way. // flags are RDB_LOAD_XXX masks. - using OpaqueBuf = std::pair; io::Result FetchGenericString(int flags); io::Result FetchLzfStringObject(int flags); - io::Result FetchIntegerObject(int enctype, int flags, size_t* lenptr); + io::Result FetchIntegerObject(int enctype, int flags); io::Result FetchBinaryDouble(); io::Result FetchDouble(); ::io::Result ReadKey(); - ::io::Result ReadObj(int rdbtype); - ::io::Result ReadSet(); - ::io::Result ReadIntSet(); - ::io::Result ReadHZiplist(); - ::io::Result ReadHSet(); - ::io::Result ReadZSet(int rdbtype); - ::io::Result ReadZSetZL(); - ::io::Result ReadListQuicklist(int rdbtype); + + ::io::Result ReadObj(int rdbtype); + ::io::Result ReadStringObj(); + ::io::Result ReadIntObj(int encoding); + ::io::Result ReadLzf(); + + ::io::Result ReadSet(); + ::io::Result ReadIntSet(); + ::io::Result ReadHZiplist(); + ::io::Result ReadHMap(); + ::io::Result ReadZSet(int rdbtype); + ::io::Result ReadZSetZL(); + ::io::Result ReadListQuicklist(int rdbtype); ::io::Result ReadStreams(); std::error_code EnsureRead(size_t min_sz) { @@ -86,7 +128,8 @@ class RdbLoader { std::error_code VerifyChecksum(); void FlushShardAsync(ShardId sid); - static void LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib); + void LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib); + static size_t StrLen(const RdbVariant& tset); ScriptMgr* script_mgr_; base::IoBuf mem_buf_; @@ -97,6 +140,10 @@ class RdbLoader { size_t bytes_read_ = 0; size_t source_limit_ = SIZE_MAX; DbIndex cur_db_index_ = 0; + + ::boost::fibers::mutex mu_; + std::error_code ec_; // guarded by mu_ + std::atomic_bool stop_early_{false}; }; } // namespace dfly diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index 00a035c83..1466d04fe 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -97,6 +97,9 @@ TEST_F(RdbTest, LoadSmall6) { EXPECT_THAT(StrArray(resp.GetVec()[1]), UnorderedElementsAre("list1", "hset_zl", "list2", "zset_sl", "intset", "set1", "zset_zl", "hset_ht", "intkey", "strkey")); + EXPECT_THAT(Run({"get", "intkey"}), "1234567"); + EXPECT_THAT(Run({"get", "strkey"}), "abcdefghjjjjjjjjjj"); + resp = Run({"smembers", "intset"}); ASSERT_THAT(resp, ArgType(RespExpr::ARRAY)); EXPECT_THAT(resp.GetVec(), diff --git a/src/server/set_family.cc b/src/server/set_family.cc index 3c990f158..0cca3357e 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -1182,13 +1182,13 @@ uint32_t SetFamily::MaxIntsetEntries() { return kMaxIntSetEntries; } -void SetFamily::ConvertTo(intset* src, dict* dest) { +void SetFamily::ConvertTo(const intset* src, dict* dest) { int64_t intele; char buf[32]; /* To add the elements we extract integers and create redis objects */ int ii = 0; - while (intsetGet(src, ii++, &intele)) { + while (intsetGet(const_cast(src), ii++, &intele)) { char* next = absl::numbers_internal::FastIntToBuffer(intele, buf); sds s = sdsnewlen(buf, next - buf); CHECK(dictAddRaw(dest, s, NULL)); diff --git a/src/server/set_family.h b/src/server/set_family.h index c48a21107..9f15ff95c 100644 --- a/src/server/set_family.h +++ b/src/server/set_family.h @@ -25,7 +25,7 @@ class SetFamily { static uint32_t MaxIntsetEntries(); - static void ConvertTo(intset* src, dict* dest); + static void ConvertTo(const intset* src, dict* dest); private: static void SAdd(CmdArgList args, ConnectionContext* cntx);