Mirror of https://github.com/dragonflydb/dragonfly.git (synced 2025-05-11 18:35:46 +02:00)
When an hset is loaded from RDB and the user has specified a TTL, the code recreates a string_view on top of the same memory location, tset_blob_, that was used for val earlier. As a result the val string_view points to the same bytes as the TTL. For example, if the val string_view was 'x' earlier (pointing into tset_blob_ with size 1) and the TTL is 7777777, val now becomes '7'. To fix this, val is now given its own string. The TTL is kept as a string_view because the pointer is not reused anywhere in the following loop. Signed-off-by: Abhijat Malviya <abhijat@dragonflydb.io>
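To make the aliasing concrete, here is a minimal, self-contained sketch (hypothetical code, not part of Dragonfly; the ToSV helper and the 'scratch' buffer only stand in for OpaqueObjLoader::ToSV() and tset_blob_):

#include <cstdio>
#include <cstring>
#include <string_view>

// Serializes every value into one shared, reusable buffer and returns a view into it,
// mimicking how ToSV() writes into tset_blob_.
std::string_view ToSV(const char* value, char (&scratch)[32]) {
  std::strncpy(scratch, value, sizeof(scratch) - 1);  // overwrites the shared buffer in place
  return std::string_view{scratch, std::strlen(scratch)};
}

int main() {
  char scratch[32] = {};
  std::string_view val = ToSV("x", scratch);        // val -> "x", points into scratch, size 1
  std::string_view ttl = ToSV("7777777", scratch);  // scratch is rewritten in place...
  // ...so val keeps size 1 but now reads '7' instead of 'x'.
  std::printf("val=%.*s ttl=%.*s\n", int(val.size()), val.data(), int(ttl.size()), ttl.data());
  // Prints: val=7 ttl=7777777. The fix copies val into its own std::string
  // before the buffer is reused for the TTL.
  return 0;
}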
2823 lines
84 KiB
C++
// Copyright 2022, DragonflyDB authors. All rights reserved.
|
|
// See LICENSE for licensing terms.
|
|
//
|
|
|
|
#include "server/rdb_load.h"
|
|
|
|
#include "absl/strings/escaping.h"
|
|
#include "server/tiered_storage.h"
|
|
|
|
extern "C" {
|
|
#include "redis/intset.h"
|
|
#include "redis/listpack.h"
|
|
#include "redis/lzfP.h" /* LZF compression library */
|
|
#include "redis/quicklist.h"
|
|
#include "redis/stream.h"
|
|
#include "redis/util.h"
|
|
#include "redis/ziplist.h"
|
|
#include "redis/zmalloc.h"
|
|
#include "redis/zset.h"
|
|
}
|
|
#include <absl/cleanup/cleanup.h>
|
|
#include <absl/strings/match.h>
|
|
#include <absl/strings/str_cat.h>
|
|
#include <absl/strings/str_split.h>
|
|
|
|
#include <cstring>
|
|
|
|
#include "base/endian.h"
|
|
#include "base/flags.h"
|
|
#include "base/logging.h"
|
|
#include "core/bloom.h"
|
|
#include "core/json/json_object.h"
|
|
#include "core/qlist.h"
|
|
#include "core/sorted_map.h"
|
|
#include "core/string_map.h"
|
|
#include "core/string_set.h"
|
|
#include "server/cluster/cluster_config.h"
|
|
#include "server/container_utils.h"
|
|
#include "server/engine_shard_set.h"
|
|
#include "server/error.h"
|
|
#include "server/family_utils.h"
|
|
#include "server/hset_family.h"
|
|
#include "server/journal/executor.h"
|
|
#include "server/journal/serializer.h"
|
|
#include "server/main_service.h"
|
|
#include "server/rdb_extensions.h"
|
|
#include "server/script_mgr.h"
|
|
#include "server/search/doc_index.h"
|
|
#include "server/serializer_commons.h"
|
|
#include "server/server_state.h"
|
|
#include "server/set_family.h"
|
|
#include "server/stream_family.h"
|
|
#include "server/transaction.h"
|
|
#include "strings/human_readable.h"
|
|
|
|
ABSL_DECLARE_FLAG(int32_t, list_max_listpack_size);
|
|
ABSL_DECLARE_FLAG(int32_t, list_compress_depth);
|
|
ABSL_DECLARE_FLAG(uint32_t, dbnum);
|
|
ABSL_DECLARE_FLAG(bool, list_experimental_v2);
|
|
ABSL_FLAG(bool, rdb_load_dry_run, false, "Dry run RDB load without applying changes");
|
|
ABSL_FLAG(bool, rdb_ignore_expiry, false, "Ignore key expiry when loading from RDB snapshot");
|
|
|
|
namespace dfly {
|
|
|
|
using namespace std;
|
|
using base::IoBuf;
|
|
using nonstd::make_unexpected;
|
|
using namespace util;
|
|
using absl::GetFlag;
|
|
using rdb::errc;
|
|
using namespace tiering::literals;
|
|
|
|
namespace {
|
|
|
|
// Maximum length of each LoadTrace segment.
|
|
//
|
|
// Note kMaxBlobLen must be a multiple of 6 to avoid truncating elements
|
|
// containing 2 or 3 items.
|
|
constexpr size_t kMaxBlobLen = 4092;
|
|
|
|
inline auto Unexpected(errc ev) {
|
|
return make_unexpected(RdbError(ev));
|
|
}
|
|
|
|
static const error_code kOk;
|
|
|
|
struct ZiplistCbArgs {
|
|
long count = 0;
|
|
absl::flat_hash_set<string_view> fields;
|
|
unsigned char** lp;
|
|
};
|
|
|
|
/* callback for hashZiplistConvertAndValidateIntegrity.
|
|
* Check that the ziplist doesn't have duplicate hash field names.
|
|
* The ziplist element pointed by 'p' will be converted and stored into listpack. */
|
|
int ziplistPairsEntryConvertAndValidate(unsigned char* p, unsigned int head_count, void* userdata) {
|
|
unsigned char* str;
|
|
unsigned int slen;
|
|
long long vll;
|
|
|
|
ZiplistCbArgs* data = (ZiplistCbArgs*)userdata;
|
|
|
|
if (data->fields.empty()) {
|
|
data->fields.reserve(head_count / 2);
|
|
}
|
|
|
|
if (!ziplistGet(p, &str, &slen, &vll))
|
|
return 0;
|
|
|
|
/* Even records are field names, add to dict and check that's not a dup */
|
|
if (((data->count) & 1) == 0) {
|
|
sds field = str ? sdsnewlen(str, slen) : sdsfromlonglong(vll);
|
|
auto [_, inserted] = data->fields.emplace(field, sdslen(field));
|
|
if (!inserted) {
|
|
/* Duplicate, return an error */
|
|
sdsfree(field);
|
|
return 0;
|
|
}
|
|
}
|
|
|
|
if (str) {
|
|
*(data->lp) = lpAppend(*(data->lp), (unsigned char*)str, slen);
|
|
} else {
|
|
*(data->lp) = lpAppendInteger(*(data->lp), vll);
|
|
}
|
|
|
|
(data->count)++;
|
|
return 1;
|
|
}
|
|
|
|
/* callback for ziplistValidateIntegrity.
|
|
* The ziplist element pointed by 'p' will be converted and stored into listpack. */
|
|
int ziplistEntryConvertAndValidate(unsigned char* p, unsigned int head_count, void* userdata) {
|
|
unsigned char* str;
|
|
unsigned int slen;
|
|
long long vll;
|
|
unsigned char** lp = (unsigned char**)userdata;
|
|
|
|
if (!ziplistGet(p, &str, &slen, &vll))
|
|
return 0;
|
|
|
|
if (str)
|
|
*lp = lpAppend(*lp, (unsigned char*)str, slen);
|
|
else
|
|
*lp = lpAppendInteger(*lp, vll);
|
|
|
|
return 1;
|
|
}
|
|
|
|
/* Validate the integrity of the data structure while converting it to
|
|
* listpack and storing it at 'lp'.
|
|
* The function is safe to call on non-validated ziplists, it returns 0
|
|
 * when it encounters an integrity validation issue. */
|
|
int ziplistPairsConvertAndValidateIntegrity(const uint8_t* zl, size_t size, unsigned char** lp) {
|
|
/* Keep track of the field names to locate duplicate ones */
|
|
ZiplistCbArgs data;
|
|
data.lp = lp;
|
|
|
|
int ret = ziplistValidateIntegrity(const_cast<uint8_t*>(zl), size, 1,
|
|
ziplistPairsEntryConvertAndValidate, &data);
|
|
|
|
/* make sure we have an even number of records. */
|
|
if (data.count & 1)
|
|
ret = 0;
|
|
|
|
for (auto field : data.fields) {
|
|
sdsfree((sds)field.data());
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
string ModuleTypeName(uint64_t module_id) {
|
|
static const char ModuleNameSet[] =
|
|
"ABCDEFGHIJKLMNOPQRSTUVWXYZ"
|
|
"abcdefghijklmnopqrstuvwxyz"
|
|
"0123456789-_";
|
|
|
|
char name[10];
|
|
|
|
name[9] = '\0';
|
|
char* p = name + 8;
|
|
module_id >>= 10;
|
|
for (int j = 0; j < 9; j++) {
|
|
*p-- = ModuleNameSet[module_id & 63];
|
|
module_id >>= 6;
|
|
}
|
|
|
|
return string{name};
|
|
}
|
|
|
|
bool RdbTypeAllowedEmpty(int type) {
|
|
return type == RDB_TYPE_STRING || type == RDB_TYPE_JSON || type == RDB_TYPE_SBF ||
|
|
type == RDB_TYPE_STREAM_LISTPACKS || type == RDB_TYPE_SET_WITH_EXPIRY ||
|
|
type == RDB_TYPE_HASH_WITH_EXPIRY;
|
|
}
|
|
|
|
} // namespace
|
|
|
|
class RdbLoaderBase::OpaqueObjLoader {
|
|
public:
|
|
OpaqueObjLoader(int rdb_type, PrimeValue* pv, LoadConfig config)
|
|
: rdb_type_(rdb_type), pv_(pv), config_(config) {
|
|
}
|
|
|
|
void operator()(long long val) {
|
|
pv_->SetInt(val);
|
|
}
|
|
|
|
void operator()(const base::PODArray<char>& str);
|
|
void operator()(const LzfString& lzfstr);
|
|
void operator()(const unique_ptr<LoadTrace>& ptr);
|
|
void operator()(const RdbSBF& src);
|
|
|
|
std::error_code ec() const {
|
|
return ec_;
|
|
}
|
|
|
|
private:
|
|
void CreateSet(const LoadTrace* ltrace);
|
|
void CreateHMap(const LoadTrace* ltrace);
|
|
void CreateList(const LoadTrace* ltrace);
|
|
void CreateZSet(const LoadTrace* ltrace);
|
|
void CreateStream(const LoadTrace* ltrace);
|
|
|
|
void HandleBlob(string_view blob);
|
|
|
|
string_view ToSV(const RdbVariant& obj);
|
|
|
|
// Returns whether pv_ has the given object type and encoding. If not ec_
|
|
// is set to the error.
|
|
bool EnsureObjEncoding(CompactObjType type, unsigned encoding);
|
|
|
|
template <typename F> static void Iterate(const LoadTrace& ltrace, F&& f) {
|
|
for (const auto& blob : ltrace.arr) {
|
|
if (!f(blob)) {
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
std::error_code ec_;
|
|
int rdb_type_;
|
|
base::PODArray<char> tset_blob_;
|
|
PrimeValue* pv_;
|
|
LoadConfig config_;
|
|
};
|
|
|
|
RdbLoaderBase::RdbLoaderBase() : origin_mem_buf_{16_KB} {
|
|
mem_buf_ = &origin_mem_buf_;
|
|
}
|
|
|
|
RdbLoaderBase::~RdbLoaderBase() {
|
|
}
|
|
|
|
void RdbLoaderBase::OpaqueObjLoader::operator()(const base::PODArray<char>& str) {
|
|
string_view sv(str.data(), str.size());
|
|
HandleBlob(sv);
|
|
}
|
|
|
|
void RdbLoaderBase::OpaqueObjLoader::operator()(const LzfString& lzfstr) {
|
|
string tmp(lzfstr.uncompressed_len, '\0');
|
|
if (lzf_decompress(lzfstr.compressed_blob.data(), lzfstr.compressed_blob.size(), tmp.data(),
|
|
tmp.size()) == 0) {
|
|
LOG(ERROR) << "Invalid LZF compressed string";
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return;
|
|
}
|
|
HandleBlob(tmp);
|
|
}
|
|
|
|
void RdbLoaderBase::OpaqueObjLoader::operator()(const unique_ptr<LoadTrace>& ptr) {
|
|
switch (rdb_type_) {
|
|
case RDB_TYPE_SET:
|
|
case RDB_TYPE_SET_WITH_EXPIRY:
|
|
CreateSet(ptr.get());
|
|
break;
|
|
case RDB_TYPE_HASH:
|
|
case RDB_TYPE_HASH_WITH_EXPIRY:
|
|
CreateHMap(ptr.get());
|
|
break;
|
|
case RDB_TYPE_LIST_QUICKLIST:
|
|
case RDB_TYPE_LIST_QUICKLIST_2:
|
|
CreateList(ptr.get());
|
|
break;
|
|
case RDB_TYPE_ZSET:
|
|
case RDB_TYPE_ZSET_2:
|
|
CreateZSet(ptr.get());
|
|
break;
|
|
case RDB_TYPE_STREAM_LISTPACKS:
|
|
case RDB_TYPE_STREAM_LISTPACKS_2:
|
|
case RDB_TYPE_STREAM_LISTPACKS_3:
|
|
CreateStream(ptr.get());
|
|
break;
|
|
default:
|
|
LOG(FATAL) << "Unsupported rdb type " << rdb_type_;
|
|
}
|
|
}
|
|
|
|
void RdbLoaderBase::OpaqueObjLoader::operator()(const RdbSBF& src) {
|
|
SBF* sbf =
|
|
CompactObj::AllocateMR<SBF>(src.grow_factor, src.fp_prob, src.max_capacity, src.prev_size,
|
|
src.current_size, CompactObj::memory_resource());
|
|
for (unsigned i = 0; i < src.filters.size(); ++i) {
|
|
sbf->AddFilter(src.filters[i].blob, src.filters[i].hash_cnt);
|
|
}
|
|
pv_->SetSBF(sbf);
|
|
}
|
|
|
|
void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
|
|
size_t len = ltrace->arr.size();
|
|
|
|
bool is_intset = true;
|
|
if (!config_.streamed && rdb_type_ == RDB_TYPE_SET &&
|
|
ltrace->arr.size() <= SetFamily::MaxIntsetEntries()) {
|
|
Iterate(*ltrace, [&](const LoadBlob& blob) {
|
|
if (!holds_alternative<long long>(blob.rdb_var)) {
|
|
is_intset = false;
|
|
return false;
|
|
}
|
|
return true;
|
|
});
|
|
} else {
|
|
/* Use a regular set when there are too many entries, or when the
|
|
* set is being streamed. */
|
|
is_intset = false;
|
|
}
|
|
|
|
sds sdsele = nullptr;
|
|
void* inner_obj = nullptr;
|
|
|
|
auto cleanup = absl::MakeCleanup([&] {
|
|
if (sdsele)
|
|
sdsfree(sdsele);
|
|
if (inner_obj) {
|
|
if (is_intset) {
|
|
zfree(inner_obj);
|
|
} else {
|
|
CompactObj::DeleteMR<StringSet>(inner_obj);
|
|
}
|
|
}
|
|
});
|
|
|
|
if (is_intset) {
|
|
inner_obj = intsetNew();
|
|
|
|
long long llval;
|
|
Iterate(*ltrace, [&](const LoadBlob& blob) {
|
|
llval = get<long long>(blob.rdb_var);
|
|
uint8_t success;
|
|
inner_obj = intsetAdd((intset*)inner_obj, llval, &success);
|
|
if (!success) {
|
|
LOG(ERROR) << "Duplicate set members detected";
|
|
ec_ = RdbError(errc::duplicate_key);
|
|
return false;
|
|
}
|
|
return true;
|
|
});
|
|
} else {
|
|
StringSet* set;
|
|
if (config_.append) {
|
|
// Note we always use StringSet when the object is being streamed.
|
|
if (!EnsureObjEncoding(OBJ_SET, kEncodingStrMap2)) {
|
|
return;
|
|
}
|
|
set = static_cast<StringSet*>(pv_->RObjPtr());
|
|
} else {
|
|
set = CompactObj::AllocateMR<StringSet>();
|
|
set->set_time(MemberTimeSeconds(GetCurrentTimeMs()));
|
|
inner_obj = set;
|
|
|
|
// Expand the set up front to avoid rehashing.
|
|
set->Reserve((config_.reserve > len) ? config_.reserve : len);
|
|
}
|
|
|
|
size_t increment = 1;
|
|
if (rdb_type_ == RDB_TYPE_SET_WITH_EXPIRY) {
|
|
increment = 2;
|
|
}
|
|
|
|
for (size_t i = 0; i < ltrace->arr.size(); i += increment) {
|
|
string_view element = ToSV(ltrace->arr[i].rdb_var);
|
|
|
|
uint32_t ttl_sec = UINT32_MAX;
|
|
if (increment == 2) {
|
|
int64_t ttl_time = -1;
|
|
string_view ttl_str = ToSV(ltrace->arr[i + 1].rdb_var);
|
|
if (!absl::SimpleAtoi(ttl_str, &ttl_time)) {
|
|
LOG(ERROR) << "Can't parse set TTL " << ttl_str;
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return;
|
|
}
|
|
|
|
if (ttl_time != -1) {
|
|
if (ttl_time < set->time_now()) {
|
|
continue;
|
|
}
|
|
|
|
ttl_sec = ttl_time - set->time_now();
|
|
}
|
|
}
|
|
if (!set->Add(element, ttl_sec)) {
|
|
LOG(ERROR) << "Duplicate set members detected";
|
|
ec_ = RdbError(errc::duplicate_key);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
|
|
if (ec_)
|
|
return;
|
|
|
|
if (!config_.append) {
|
|
pv_->InitRobj(OBJ_SET, is_intset ? kEncodingIntSet : kEncodingStrMap2, inner_obj);
|
|
}
|
|
std::move(cleanup).Cancel();
|
|
}
|
|
|
|
void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
|
|
size_t increment = 2;
|
|
if (rdb_type_ == RDB_TYPE_HASH_WITH_EXPIRY)
|
|
increment = 3;
|
|
|
|
size_t len = ltrace->arr.size() / increment;
|
|
|
|
/* Too many entries? Use a hash table right from the start. */
|
|
bool keep_lp = !config_.streamed && (len <= 64) && (rdb_type_ != RDB_TYPE_HASH_WITH_EXPIRY);
|
|
|
|
size_t lp_size = 0;
|
|
if (keep_lp) {
|
|
Iterate(*ltrace, [&](const LoadBlob& blob) {
|
|
size_t str_len = StrLen(blob.rdb_var);
|
|
lp_size += str_len;
|
|
|
|
if (str_len > server.max_map_field_len) {
|
|
keep_lp = false;
|
|
return false;
|
|
}
|
|
return true;
|
|
});
|
|
}
|
|
|
|
if (keep_lp) {
|
|
uint8_t* lp = lpNew(lp_size);
|
|
|
|
CHECK(ltrace->arr.size() % 2 == 0);
|
|
for (size_t i = 0; i < ltrace->arr.size(); i += 2) {
|
|
/* Add pair to listpack */
|
|
string_view sv = ToSV(ltrace->arr[i].rdb_var);
|
|
lp = lpAppend(lp, reinterpret_cast<const uint8_t*>(sv.data()), sv.size());
|
|
|
|
sv = ToSV(ltrace->arr[i + 1].rdb_var);
|
|
lp = lpAppend(lp, reinterpret_cast<const uint8_t*>(sv.data()), sv.size());
|
|
}
|
|
|
|
if (ec_) {
|
|
lpFree(lp);
|
|
return;
|
|
}
|
|
|
|
lp = lpShrinkToFit(lp);
|
|
pv_->InitRobj(OBJ_HASH, kEncodingListPack, lp);
|
|
} else {
|
|
StringMap* string_map;
|
|
if (config_.append) {
|
|
// Note we always use StringMap when the object is being streamed.
|
|
if (!EnsureObjEncoding(OBJ_HASH, kEncodingStrMap2)) {
|
|
return;
|
|
}
|
|
|
|
string_map = static_cast<StringMap*>(pv_->RObjPtr());
|
|
} else {
|
|
string_map = CompactObj::AllocateMR<StringMap>();
|
|
string_map->set_time(MemberTimeSeconds(GetCurrentTimeMs()));
|
|
|
|
// Expand the map up front to avoid rehashing.
|
|
string_map->Reserve((config_.reserve > len) ? config_.reserve : len);
|
|
}
|
|
|
|
auto cleanup = absl::MakeCleanup([&] {
|
|
if (!config_.append) {
|
|
CompactObj::DeleteMR<StringMap>(string_map);
|
|
}
|
|
});
|
|
std::string key;
|
|
std::string val;
|
|
for (size_t i = 0; i < ltrace->arr.size(); i += increment) {
|
|
// ToSV may reference an internal buffer, therefore we can use it only before the
|
|
// next call to ToSV. As a workaround, copy the key locally.
|
|
key = ToSV(ltrace->arr[i].rdb_var);
|
|
val = ToSV(ltrace->arr[i + 1].rdb_var);
|
|
|
|
if (ec_)
|
|
return;
|
|
|
|
uint32_t ttl_sec = UINT32_MAX;
|
|
if (increment == 3) {
|
|
int64_t ttl_time = -1;
|
|
string_view ttl_str = ToSV(ltrace->arr[i + 2].rdb_var);
|
|
if (!absl::SimpleAtoi(ttl_str, &ttl_time)) {
|
|
LOG(ERROR) << "Can't parse hashmap TTL for " << key << ", ttl='" << ttl_str
|
|
<< "', val=" << val;
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return;
|
|
}
|
|
|
|
if (ttl_time != -1) {
|
|
if (ttl_time < string_map->time_now()) {
|
|
continue;
|
|
}
|
|
|
|
ttl_sec = ttl_time - string_map->time_now();
|
|
}
|
|
}
|
|
|
|
if (!string_map->AddOrSkip(key, val, ttl_sec)) {
|
|
LOG(ERROR) << "Duplicate hash fields detected for field " << key;
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return;
|
|
}
|
|
}
|
|
if (!config_.append) {
|
|
pv_->InitRobj(OBJ_HASH, kEncodingStrMap2, string_map);
|
|
}
|
|
std::move(cleanup).Cancel();
|
|
}
|
|
}
|
|
|
|
void RdbLoaderBase::OpaqueObjLoader::CreateList(const LoadTrace* ltrace) {
|
|
quicklist* ql = nullptr;
|
|
QList* qlv2 = nullptr;
|
|
if (config_.append) {
|
|
if (pv_->ObjType() != OBJ_LIST) {
|
|
ec_ = RdbError(errc::invalid_rdb_type);
|
|
return;
|
|
}
|
|
if (pv_->Encoding() == OBJ_ENCODING_QUICKLIST) {
|
|
ql = static_cast<quicklist*>(pv_->RObjPtr());
|
|
} else {
|
|
DCHECK_EQ(pv_->Encoding(), kEncodingQL2);
|
|
qlv2 = static_cast<QList*>(pv_->RObjPtr());
|
|
}
|
|
} else {
|
|
if (absl::GetFlag(FLAGS_list_experimental_v2)) {
|
|
qlv2 = CompactObj::AllocateMR<QList>(GetFlag(FLAGS_list_max_listpack_size),
|
|
GetFlag(FLAGS_list_compress_depth));
|
|
} else {
|
|
ql = quicklistNew(GetFlag(FLAGS_list_max_listpack_size), GetFlag(FLAGS_list_compress_depth));
|
|
}
|
|
}
|
|
|
|
auto cleanup = absl::Cleanup([&] {
|
|
if (!config_.append) {
|
|
if (ql)
|
|
quicklistRelease(ql);
|
|
else
|
|
CompactObj::DeleteMR<QList>(qlv2);
|
|
}
|
|
});
|
|
|
|
Iterate(*ltrace, [&](const LoadBlob& blob) {
|
|
unsigned container = blob.encoding;
|
|
string_view sv = ToSV(blob.rdb_var);
|
|
|
|
if (ec_)
|
|
return false;
|
|
|
|
uint8_t* lp = nullptr;
|
|
if (container == QUICKLIST_NODE_CONTAINER_PLAIN) {
|
|
lp = (uint8_t*)zmalloc(sv.size());
|
|
::memcpy(lp, (uint8_t*)sv.data(), sv.size());
|
|
if (ql)
|
|
quicklistAppendPlainNode(ql, lp, sv.size());
|
|
else
|
|
qlv2->AppendPlain(lp, sv.size());
|
|
|
|
return true;
|
|
}
|
|
|
|
if (rdb_type_ == RDB_TYPE_LIST_QUICKLIST_2) {
|
|
uint8_t* src = (uint8_t*)sv.data();
|
|
if (!lpValidateIntegrity(src, sv.size(), 0, nullptr, nullptr)) {
|
|
LOG(ERROR) << "Listpack integrity check failed.";
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return false;
|
|
}
|
|
|
|
if (lpLength(src) == 0) {
|
|
return true;
|
|
}
|
|
|
|
lp = (uint8_t*)zmalloc(sv.size());
|
|
::memcpy(lp, src, sv.size());
|
|
} else {
|
|
lp = lpNew(sv.size());
|
|
if (!ziplistValidateIntegrity((uint8_t*)sv.data(), sv.size(), 1,
|
|
ziplistEntryConvertAndValidate, &lp)) {
|
|
LOG(ERROR) << "Ziplist integrity check failed: " << sv.size();
|
|
zfree(lp);
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return false;
|
|
}
|
|
|
|
/* Silently skip empty ziplists, if we'll end up with empty quicklist we'll fail later. */
|
|
if (lpLength(lp) == 0) {
|
|
zfree(lp);
|
|
return true;
|
|
}
|
|
|
|
lp = lpShrinkToFit(lp);
|
|
}
|
|
|
|
if (ql)
|
|
quicklistAppendListpack(ql, lp);
|
|
else
|
|
qlv2->AppendListpack(lp);
|
|
return true;
|
|
});
|
|
|
|
if (ec_)
|
|
return;
|
|
if ((ql && quicklistCount(ql) == 0) || (qlv2 && qlv2->Size() == 0)) {
|
|
ec_ = RdbError(errc::empty_key);
|
|
return;
|
|
}
|
|
|
|
std::move(cleanup).Cancel();
|
|
|
|
if (!config_.append) {
|
|
if (ql)
|
|
pv_->InitRobj(OBJ_LIST, OBJ_ENCODING_QUICKLIST, ql);
|
|
else
|
|
pv_->InitRobj(OBJ_LIST, kEncodingQL2, qlv2);
|
|
}
|
|
}
|
|
|
|
void RdbLoaderBase::OpaqueObjLoader::CreateZSet(const LoadTrace* ltrace) {
|
|
size_t zsetlen = ltrace->arr.size();
|
|
|
|
unsigned encoding = OBJ_ENCODING_SKIPLIST;
|
|
detail::SortedMap* zs;
|
|
if (config_.append) {
|
|
// Note we always use SortedMap when the object is being streamed.
|
|
if (!EnsureObjEncoding(OBJ_ZSET, OBJ_ENCODING_SKIPLIST)) {
|
|
return;
|
|
}
|
|
|
|
zs = static_cast<detail::SortedMap*>(pv_->RObjPtr());
|
|
} else {
|
|
zs = CompactObj::AllocateMR<detail::SortedMap>();
|
|
|
|
size_t reserve = (config_.reserve > zsetlen) ? config_.reserve : zsetlen;
|
|
if (reserve > 2 && !zs->Reserve(reserve)) {
|
|
LOG(ERROR) << "OOM in dictTryExpand " << zsetlen;
|
|
ec_ = RdbError(errc::out_of_memory);
|
|
return;
|
|
}
|
|
}
|
|
|
|
auto cleanup = absl::MakeCleanup([&] {
|
|
if (!config_.append) {
|
|
CompactObj::DeleteMR<detail::SortedMap>(zs);
|
|
}
|
|
});
|
|
|
|
size_t maxelelen = 0, totelelen = 0;
|
|
|
|
Iterate(*ltrace, [&](const LoadBlob& blob) {
|
|
string_view sv = ToSV(blob.rdb_var);
|
|
|
|
double score = blob.score;
|
|
|
|
/* Don't care about integer-encoded strings. */
|
|
if (sv.size() > maxelelen)
|
|
maxelelen = sv.size();
|
|
totelelen += sv.size();
|
|
|
|
if (!zs->InsertNew(score, sv)) {
|
|
LOG(ERROR) << "Duplicate zset fields detected";
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
});
|
|
|
|
if (ec_)
|
|
return;
|
|
|
|
void* inner = zs;
|
|
if (!config_.streamed && zs->Size() <= server.zset_max_listpack_entries &&
|
|
maxelelen <= server.zset_max_listpack_value && lpSafeToAdd(NULL, totelelen)) {
|
|
encoding = OBJ_ENCODING_LISTPACK;
|
|
inner = zs->ToListPack();
|
|
CompactObj::DeleteMR<detail::SortedMap>(zs);
|
|
}
|
|
|
|
std::move(cleanup).Cancel();
|
|
|
|
if (!config_.append) {
|
|
pv_->InitRobj(OBJ_ZSET, encoding, inner);
|
|
}
|
|
}
|
|
|
|
void RdbLoaderBase::OpaqueObjLoader::CreateStream(const LoadTrace* ltrace) {
|
|
stream* s;
|
|
StreamMemTracker mem_tracker;
|
|
if (config_.append) {
|
|
if (!EnsureObjEncoding(OBJ_STREAM, OBJ_ENCODING_STREAM)) {
|
|
return;
|
|
}
|
|
|
|
s = static_cast<stream*>(pv_->RObjPtr());
|
|
} else {
|
|
s = streamNew();
|
|
}
|
|
|
|
auto cleanup = absl::Cleanup([&] {
|
|
if (!config_.append) {
|
|
freeStream(s);
|
|
}
|
|
});
|
|
|
|
for (size_t i = 0; i < ltrace->arr.size(); i += 2) {
|
|
string_view nodekey = ToSV(ltrace->arr[i].rdb_var);
|
|
string_view data = ToSV(ltrace->arr[i + 1].rdb_var);
|
|
|
|
uint8_t* lp = (uint8_t*)data.data();
|
|
|
|
if (!streamValidateListpackIntegrity(lp, data.size(), 0)) {
|
|
LOG(ERROR) << "Stream listpack integrity check failed.";
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return;
|
|
}
|
|
unsigned char* first = lpFirst(lp);
|
|
if (first == NULL) {
|
|
/* Serialized listpacks should never be empty, since on
|
|
* deletion we should remove the radix tree key if the
|
|
* resulting listpack is empty. */
|
|
LOG(ERROR) << "Empty listpack inside stream";
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return;
|
|
}
|
|
uint8_t* copy_lp = (uint8_t*)zmalloc(data.size());
|
|
::memcpy(copy_lp, lp, data.size());
|
|
/* Insert the key in the radix tree. */
|
|
int retval =
|
|
raxTryInsert(s->rax_tree, (unsigned char*)nodekey.data(), nodekey.size(), copy_lp, NULL);
|
|
if (!retval) {
|
|
zfree(copy_lp);
|
|
LOG(ERROR) << "Listpack re-added with existing key";
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return;
|
|
}
|
|
}
|
|
|
|
// We only load the stream metadata and consumer groups (stream_trace) on
|
|
// the final read (when reading the stream in increments). Therefore if
|
|
// stream_trace is null add the partial stream, then stream_trace will be
|
|
// loaded later.
|
|
if (!ltrace->stream_trace) {
|
|
if (!config_.append) {
|
|
pv_->InitRobj(OBJ_STREAM, OBJ_ENCODING_STREAM, s);
|
|
}
|
|
std::move(cleanup).Cancel();
|
|
return;
|
|
}
|
|
|
|
s->length = ltrace->stream_trace->stream_len;
|
|
CopyStreamId(ltrace->stream_trace->last_id, &s->last_id);
|
|
CopyStreamId(ltrace->stream_trace->first_id, &s->first_id);
|
|
CopyStreamId(ltrace->stream_trace->max_deleted_entry_id, &s->max_deleted_entry_id);
|
|
s->entries_added = ltrace->stream_trace->entries_added;
|
|
|
|
if (rdb_type_ == RDB_TYPE_STREAM_LISTPACKS) {
|
|
/* Since the rax is already loaded, we can find the first entry's
|
|
* ID. */
|
|
streamGetEdgeID(s, 1, 1, &s->first_id);
|
|
}
|
|
|
|
for (const auto& cg : ltrace->stream_trace->cgroup) {
|
|
string_view cgname = ToSV(cg.name);
|
|
streamID cg_id;
|
|
cg_id.ms = cg.ms;
|
|
cg_id.seq = cg.seq;
|
|
|
|
uint64_t entries_read = cg.entries_read;
|
|
if (rdb_type_ == RDB_TYPE_STREAM_LISTPACKS) {
|
|
entries_read = streamEstimateDistanceFromFirstEverEntry(s, &cg_id);
|
|
}
|
|
|
|
streamCG* cgroup = streamCreateCG(s, cgname.data(), cgname.size(), &cg_id, entries_read);
|
|
if (cgroup == NULL) {
|
|
LOG(ERROR) << "Duplicated consumer group name " << cgname;
|
|
ec_ = RdbError(errc::duplicate_key);
|
|
return;
|
|
}
|
|
|
|
for (const auto& pel : cg.pel_arr) {
|
|
streamNACK* nack = reinterpret_cast<streamNACK*>(zmalloc(sizeof(*nack)));
|
|
nack->delivery_time = pel.delivery_time;
|
|
nack->delivery_count = pel.delivery_count;
|
|
nack->consumer = nullptr;
|
|
|
|
if (!raxTryInsert(cgroup->pel, const_cast<uint8_t*>(pel.rawid.data()), pel.rawid.size(), nack,
|
|
NULL)) {
|
|
LOG(ERROR) << "Duplicated global PEL entry loading stream consumer group";
|
|
ec_ = RdbError(errc::duplicate_key);
|
|
streamFreeNACK(nack);
|
|
return;
|
|
}
|
|
}
|
|
|
|
for (const auto& cons : cg.cons_arr) {
|
|
streamConsumer* consumer = StreamCreateConsumer(cgroup, ToSV(cons.name), cons.seen_time,
|
|
SCC_NO_NOTIFY | SCC_NO_DIRTIFY);
|
|
if (!consumer) {
|
|
LOG(ERROR) << "Duplicate stream consumer detected.";
|
|
ec_ = RdbError(errc::duplicate_key);
|
|
return;
|
|
}
|
|
|
|
consumer->active_time = cons.active_time;
|
|
/* Create the PEL (pending entries list) about entries owned by this specific
|
|
* consumer. */
|
|
for (const auto& rawid : cons.nack_arr) {
|
|
uint8_t* ptr = const_cast<uint8_t*>(rawid.data());
|
|
streamNACK* nack = (streamNACK*)raxFind(cgroup->pel, ptr, rawid.size());
|
|
if (nack == raxNotFound) {
|
|
LOG(ERROR) << "Consumer entry not found in group global PEL";
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return;
|
|
}
|
|
|
|
/* Set the NACK consumer, that was left to NULL when
|
|
* loading the global PEL. Then set the same shared
|
|
* NACK structure also in the consumer-specific PEL. */
|
|
nack->consumer = consumer;
|
|
if (!raxTryInsert(consumer->pel, ptr, rawid.size(), nack, NULL)) {
|
|
LOG(ERROR) << "Duplicated consumer PEL entry loading a stream consumer group";
|
|
streamFreeNACK(nack);
|
|
ec_ = RdbError(errc::duplicate_key);
|
|
return;
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
std::move(cleanup).Cancel();
|
|
if (!config_.append) {
|
|
pv_->InitRobj(OBJ_STREAM, OBJ_ENCODING_STREAM, s);
|
|
}
|
|
mem_tracker.UpdateStreamSize(*pv_);
|
|
}
|
|
|
|
void RdbLoaderBase::OpaqueObjLoader::HandleBlob(string_view blob) {
|
|
if (rdb_type_ == RDB_TYPE_STRING) {
|
|
if (config_.append) {
|
|
pv_->AppendString(blob);
|
|
} else if (config_.reserve) {
|
|
pv_->ReserveString(config_.reserve);
|
|
pv_->AppendString(blob);
|
|
} else {
|
|
pv_->SetString(blob);
|
|
}
|
|
return;
|
|
}
|
|
|
|
if (rdb_type_ == RDB_TYPE_SET_INTSET) {
|
|
if (!intsetValidateIntegrity((const uint8_t*)blob.data(), blob.size(), 0)) {
|
|
LOG(ERROR) << "Intset integrity check failed.";
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return;
|
|
}
|
|
|
|
const intset* is = (const intset*)blob.data();
|
|
|
|
unsigned len = intsetLen(is);
|
|
|
|
if (len > SetFamily::MaxIntsetEntries()) {
|
|
StringSet* set = SetFamily::ConvertToStrSet(is, len);
|
|
|
|
if (!set) {
|
|
LOG(ERROR) << "OOM in ConvertToStrSet " << len;
|
|
ec_ = RdbError(errc::out_of_memory);
|
|
return;
|
|
}
|
|
pv_->InitRobj(OBJ_SET, kEncodingStrMap2, set);
|
|
} else {
|
|
intset* mine = (intset*)zmalloc(blob.size());
|
|
::memcpy(mine, blob.data(), blob.size());
|
|
pv_->InitRobj(OBJ_SET, kEncodingIntSet, mine);
|
|
}
|
|
} else if (rdb_type_ == RDB_TYPE_SET_LISTPACK) {
|
|
if (!lpValidateIntegrity((uint8_t*)blob.data(), blob.size(), 0, nullptr, nullptr)) {
|
|
LOG(ERROR) << "ListPack integrity check failed.";
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return;
|
|
}
|
|
|
|
unsigned char* lp = (unsigned char*)blob.data();
|
|
StringSet* set = CompactObj::AllocateMR<StringSet>();
|
|
for (unsigned char* cur = lpFirst(lp); cur != nullptr; cur = lpNext(lp, cur)) {
|
|
unsigned char field_buf[LP_INTBUF_SIZE];
|
|
string_view elem = container_utils::LpGetView(cur, field_buf);
|
|
if (!set->Add(elem)) {
|
|
LOG(ERROR) << "Duplicate member " << elem;
|
|
ec_ = RdbError(errc::duplicate_key);
|
|
break;
|
|
}
|
|
}
|
|
if (ec_) {
|
|
CompactObj::DeleteMR<StringSet>(set);
|
|
return;
|
|
}
|
|
pv_->InitRobj(OBJ_SET, kEncodingStrMap2, set);
|
|
} else if (rdb_type_ == RDB_TYPE_HASH_ZIPLIST || rdb_type_ == RDB_TYPE_HASH_LISTPACK) {
|
|
unsigned char* lp = lpNew(blob.size());
|
|
switch (rdb_type_) {
|
|
case RDB_TYPE_HASH_ZIPLIST:
|
|
if (!ziplistPairsConvertAndValidateIntegrity((const uint8_t*)blob.data(), blob.size(),
|
|
&lp)) {
|
|
LOG(ERROR) << "Zset ziplist integrity check failed.";
|
|
zfree(lp);
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return;
|
|
}
|
|
break;
|
|
case RDB_TYPE_HASH_LISTPACK:
|
|
if (!lpValidateIntegrity((uint8_t*)blob.data(), blob.size(), 0, nullptr, nullptr)) {
|
|
LOG(ERROR) << "ListPack integrity check failed.";
|
|
zfree(lp);
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return;
|
|
}
|
|
std::memcpy(lp, blob.data(), blob.size());
|
|
break;
|
|
}
|
|
|
|
if (lpLength(lp) == 0) {
|
|
lpFree(lp);
|
|
ec_ = RdbError(errc::empty_key);
|
|
return;
|
|
}
|
|
|
|
if (lpBytes(lp) > server.max_listpack_map_bytes) {
|
|
StringMap* sm = HSetFamily::ConvertToStrMap(lp);
|
|
lpFree(lp);
|
|
pv_->InitRobj(OBJ_HASH, kEncodingStrMap2, sm);
|
|
} else {
|
|
lp = lpShrinkToFit(lp);
|
|
pv_->InitRobj(OBJ_HASH, kEncodingListPack, lp);
|
|
}
|
|
return;
|
|
} else if (rdb_type_ == RDB_TYPE_ZSET_ZIPLIST) {
|
|
unsigned char* lp = lpNew(blob.size());
|
|
if (!ziplistPairsConvertAndValidateIntegrity((uint8_t*)blob.data(), blob.size(), &lp)) {
|
|
LOG(ERROR) << "Zset ziplist integrity check failed.";
|
|
zfree(lp);
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return;
|
|
}
|
|
|
|
if (lpLength(lp) == 0) {
|
|
lpFree(lp);
|
|
ec_ = RdbError(errc::empty_key);
|
|
return;
|
|
}
|
|
|
|
unsigned encoding = OBJ_ENCODING_LISTPACK;
|
|
void* inner;
|
|
if (lpBytes(lp) >= server.max_listpack_map_bytes) {
|
|
inner = detail::SortedMap::FromListPack(CompactObj::memory_resource(), lp);
|
|
lpFree(lp);
|
|
encoding = OBJ_ENCODING_SKIPLIST;
|
|
} else {
|
|
lp = lpShrinkToFit(lp);
|
|
inner = lp;
|
|
}
|
|
pv_->InitRobj(OBJ_ZSET, encoding, inner);
|
|
return;
|
|
} else if (rdb_type_ == RDB_TYPE_ZSET_LISTPACK) {
|
|
if (!lpValidateIntegrity((uint8_t*)blob.data(), blob.size(), 0, nullptr, nullptr)) {
|
|
LOG(ERROR) << "ListPack integrity check failed.";
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return;
|
|
}
|
|
unsigned char* src_lp = (unsigned char*)blob.data();
|
|
unsigned long long bytes = lpBytes(src_lp);
|
|
unsigned char* lp = (uint8_t*)zmalloc(bytes);
|
|
std::memcpy(lp, src_lp, bytes);
|
|
pv_->InitRobj(OBJ_ZSET, OBJ_ENCODING_LISTPACK, lp);
|
|
} else if (rdb_type_ == RDB_TYPE_JSON) {
|
|
size_t start_size = static_cast<MiMemoryResource*>(CompactObj::memory_resource())->used();
|
|
{
|
|
auto json = JsonFromString(blob, CompactObj::memory_resource());
|
|
if (json) {
|
|
pv_->SetJson(std::move(*json));
|
|
} else {
|
|
LOG(INFO) << "Invalid JSON string during rdb load of JSON object: " << blob;
|
|
ec_ = RdbError(errc::bad_json_string);
|
|
return;
|
|
}
|
|
}
|
|
size_t end_size = static_cast<MiMemoryResource*>(CompactObj::memory_resource())->used();
|
|
DCHECK(end_size > start_size);
|
|
pv_->SetJsonSize(end_size - start_size);
|
|
} else {
|
|
LOG(FATAL) << "Unsupported rdb type " << rdb_type_;
|
|
}
|
|
}
|
|
|
|
string_view RdbLoaderBase::OpaqueObjLoader::ToSV(const RdbVariant& obj) {
|
|
if (holds_alternative<long long>(obj)) {
|
|
tset_blob_.resize(32);
|
|
auto val = get<long long>(obj);
|
|
char* next = absl::numbers_internal::FastIntToBuffer(val, tset_blob_.data());
|
|
return string_view{tset_blob_.data(), size_t(next - tset_blob_.data())};
|
|
}
|
|
|
|
const base::PODArray<char>* ch_arr = get_if<base::PODArray<char>>(&obj);
|
|
if (ch_arr) {
|
|
// pass non-null pointer to avoid UB with lp API.
|
|
return ch_arr->empty() ? ""sv : string_view{ch_arr->data(), ch_arr->size()};
|
|
}
|
|
|
|
const LzfString* lzf = get_if<LzfString>(&obj);
|
|
if (lzf) {
|
|
tset_blob_.resize(lzf->uncompressed_len);
|
|
if (lzf_decompress(lzf->compressed_blob.data(), lzf->compressed_blob.size(), tset_blob_.data(),
|
|
lzf->uncompressed_len) == 0) {
|
|
LOG(ERROR) << "Invalid LZF compressed string";
|
|
ec_ = RdbError(errc::rdb_file_corrupted);
|
|
return string_view{tset_blob_.data(), 0};
|
|
}
|
|
return string_view{tset_blob_.data(), tset_blob_.size()};
|
|
}
|
|
|
|
LOG(FATAL) << "Unexpected variant";
|
|
return string_view{};
|
|
}
|
|
|
|
bool RdbLoaderBase::OpaqueObjLoader::EnsureObjEncoding(CompactObjType type, unsigned encoding) {
|
|
if (pv_->ObjType() != type) {
|
|
LOG(DFATAL) << "Invalid RDB type " << pv_->ObjType() << "; expected " << type;
|
|
ec_ = RdbError(errc::invalid_rdb_type);
|
|
return false;
|
|
}
|
|
if (pv_->Encoding() != encoding) {
|
|
LOG(DFATAL) << "Invalid encoding " << pv_->Encoding() << "; expected " << encoding;
|
|
ec_ = RdbError(errc::invalid_encoding);
|
|
return false;
|
|
}
|
|
|
|
return true;
|
|
}
|
|
|
|
std::error_code RdbLoaderBase::FetchBuf(size_t size, void* dest) {
|
|
if (size == 0)
|
|
return kOk;
|
|
|
|
uint8_t* next = (uint8_t*)dest;
|
|
size_t bytes_read;
|
|
|
|
size_t to_copy = std::min(mem_buf_->InputLen(), size);
|
|
DVLOG(3) << "Copying " << to_copy << " bytes";
|
|
|
|
::memcpy(next, mem_buf_->InputBuffer().data(), to_copy);
|
|
mem_buf_->ConsumeInput(to_copy);
|
|
size -= to_copy;
|
|
if (size == 0)
|
|
return kOk;
|
|
|
|
next += to_copy;
|
|
|
|
if (size + bytes_read_ > source_limit_) {
|
|
LOG(ERROR) << "Out of bound read " << size + bytes_read_ << " vs " << source_limit_;
|
|
|
|
return RdbError(errc::rdb_file_corrupted);
|
|
}
|
|
|
|
if (size > 512) { // Worth reading directly into next.
|
|
io::MutableBytes mb{next, size};
|
|
|
|
SET_OR_RETURN(src_->Read(mb), bytes_read);
|
|
if (bytes_read < size)
|
|
return RdbError(errc::rdb_file_corrupted);
|
|
|
|
bytes_read_ += bytes_read;
|
|
DCHECK_LE(bytes_read_, source_limit_);
|
|
|
|
return kOk;
|
|
}
|
|
|
|
io::MutableBytes mb = mem_buf_->AppendBuffer();
|
|
|
|
// This must hold because mem_buf_ is empty.
|
|
DCHECK_GT(mb.size(), size);
|
|
|
|
if (bytes_read_ + mb.size() > source_limit_) {
|
|
mb = mb.subspan(0, source_limit_ - bytes_read_);
|
|
}
|
|
|
|
SET_OR_RETURN(src_->ReadAtLeast(mb, size), bytes_read);
|
|
|
|
if (bytes_read < size)
|
|
return RdbError(errc::rdb_file_corrupted);
|
|
bytes_read_ += bytes_read;
|
|
|
|
DCHECK_LE(bytes_read_, source_limit_);
|
|
|
|
mem_buf_->CommitWrite(bytes_read);
|
|
::memcpy(next, mem_buf_->InputBuffer().data(), size);
|
|
mem_buf_->ConsumeInput(size);
|
|
|
|
return kOk;
|
|
}
|
|
|
|
size_t RdbLoaderBase::StrLen(const RdbVariant& tset) {
|
|
const base::PODArray<char>* arr = get_if<base::PODArray<char>>(&tset);
|
|
if (arr)
|
|
return arr->size();
|
|
|
|
if (holds_alternative<long long>(tset)) {
|
|
auto val = get<long long>(tset);
|
|
char buf[32];
|
|
char* next = absl::numbers_internal::FastIntToBuffer(val, buf);
|
|
return (next - buf);
|
|
}
|
|
|
|
const LzfString* lzf = get_if<LzfString>(&tset);
|
|
if (lzf)
|
|
return lzf->uncompressed_len;
|
|
|
|
LOG(DFATAL) << "should not reach";
|
|
return 0;
|
|
}
|
|
|
|
auto RdbLoaderBase::FetchGenericString() -> io::Result<string> {
|
|
bool isencoded;
|
|
size_t len;
|
|
|
|
SET_OR_UNEXPECT(LoadLen(&isencoded), len);
|
|
|
|
if (isencoded) {
|
|
switch (len) {
|
|
case RDB_ENC_INT8:
|
|
case RDB_ENC_INT16:
|
|
case RDB_ENC_INT32:
|
|
return FetchIntegerObject(len);
|
|
case RDB_ENC_LZF:
|
|
return FetchLzfStringObject();
|
|
default:
|
|
LOG(ERROR) << "Unknown RDB string encoding len " << len;
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
}
|
|
}
|
|
|
|
string res;
|
|
|
|
if (len > 0) {
|
|
res.resize(len);
|
|
error_code ec = FetchBuf(len, res.data());
|
|
if (ec) {
|
|
return make_unexpected(ec);
|
|
}
|
|
}
|
|
|
|
return res;
|
|
}
|
|
|
|
auto RdbLoaderBase::FetchLzfStringObject() -> io::Result<string> {
|
|
bool zerocopy_decompress = true;
|
|
|
|
const uint8_t* cbuf = NULL;
|
|
uint64_t clen, len;
|
|
|
|
SET_OR_UNEXPECT(LoadLen(NULL), clen);
|
|
SET_OR_UNEXPECT(LoadLen(NULL), len);
|
|
|
|
if (len > 1ULL << 29 || len <= clen || clen == 0) {
|
|
LOG(ERROR) << "Bad compressed string";
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
}
|
|
|
|
if (mem_buf_->InputLen() >= clen) {
|
|
cbuf = mem_buf_->InputBuffer().data();
|
|
} else {
|
|
compr_buf_.resize(clen);
|
|
zerocopy_decompress = false;
|
|
|
|
/* Load the compressed representation and uncompress it to target. */
|
|
error_code ec = FetchBuf(clen, compr_buf_.data());
|
|
if (ec) {
|
|
return make_unexpected(ec);
|
|
}
|
|
cbuf = compr_buf_.data();
|
|
}
|
|
|
|
string res(len, 0);
|
|
|
|
if (lzf_decompress(cbuf, clen, res.data(), len) == 0) {
|
|
LOG(ERROR) << "Invalid LZF compressed string";
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
}
|
|
|
|
// FetchBuf consumes the input, but if we have not gone through that path
|
|
// we need to consume now.
|
|
if (zerocopy_decompress)
|
|
mem_buf_->ConsumeInput(clen);
|
|
|
|
return res;
|
|
}
|
|
|
|
auto RdbLoaderBase::FetchIntegerObject(int enctype) -> io::Result<string> {
|
|
io::Result<long long> val = ReadIntObj(enctype);
|
|
|
|
if (!val.has_value()) {
|
|
return val.get_unexpected();
|
|
}
|
|
|
|
char buf[32];
|
|
absl::numbers_internal::FastIntToBuffer(*val, buf);
|
|
|
|
return string(buf);
|
|
}
|
|
|
|
io::Result<double> RdbLoaderBase::FetchBinaryDouble() {
|
|
union {
|
|
uint64_t val;
|
|
double d;
|
|
} u;
|
|
|
|
static_assert(sizeof(u) == sizeof(uint64_t));
|
|
auto ec = EnsureRead(8);
|
|
if (ec)
|
|
return make_unexpected(ec);
|
|
|
|
uint8_t buf[8];
|
|
mem_buf_->ReadAndConsume(8, buf);
|
|
u.val = base::LE::LoadT<uint64_t>(buf);
|
|
return u.d;
|
|
}
|
|
|
|
io::Result<double> RdbLoaderBase::FetchDouble() {
|
|
uint8_t len;
|
|
|
|
SET_OR_UNEXPECT(FetchInt<uint8_t>(), len);
|
|
constexpr double kInf = std::numeric_limits<double>::infinity();
|
|
switch (len) {
|
|
case 255:
|
|
return -kInf;
|
|
case 254:
|
|
return kInf;
|
|
case 253:
|
|
return std::numeric_limits<double>::quiet_NaN();
|
|
default:;
|
|
}
|
|
char buf[256];
|
|
error_code ec = FetchBuf(len, buf);
|
|
if (ec)
|
|
return make_unexpected(ec);
|
|
buf[len] = '\0';
|
|
double val;
|
|
if (sscanf(buf, "%lg", &val) != 1)
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
return val;
|
|
}
|
|
|
|
auto RdbLoaderBase::ReadKey() -> io::Result<string> {
|
|
return FetchGenericString();
|
|
}
|
|
|
|
error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
|
|
io::Result<OpaqueObj> iores;
|
|
|
|
switch (rdbtype) {
|
|
case RDB_TYPE_SET:
|
|
case RDB_TYPE_SET_WITH_EXPIRY:
|
|
iores = ReadSet(rdbtype);
|
|
break;
|
|
case RDB_TYPE_SET_INTSET:
|
|
iores = ReadIntSet();
|
|
break;
|
|
case RDB_TYPE_HASH_ZIPLIST:
|
|
case RDB_TYPE_HASH_LISTPACK:
|
|
case RDB_TYPE_ZSET_LISTPACK:
|
|
case RDB_TYPE_ZSET_ZIPLIST:
|
|
case RDB_TYPE_STRING:
|
|
case RDB_TYPE_JSON:
|
|
iores = ReadGeneric(rdbtype);
|
|
break;
|
|
case RDB_TYPE_HASH:
|
|
case RDB_TYPE_HASH_WITH_EXPIRY:
|
|
iores = ReadHMap(rdbtype);
|
|
break;
|
|
case RDB_TYPE_ZSET:
|
|
case RDB_TYPE_ZSET_2:
|
|
iores = ReadZSet(rdbtype);
|
|
break;
|
|
case RDB_TYPE_LIST_QUICKLIST:
|
|
case RDB_TYPE_LIST_QUICKLIST_2:
|
|
iores = ReadListQuicklist(rdbtype);
|
|
break;
|
|
case RDB_TYPE_STREAM_LISTPACKS:
|
|
case RDB_TYPE_STREAM_LISTPACKS_2:
|
|
case RDB_TYPE_STREAM_LISTPACKS_3:
|
|
iores = ReadStreams(rdbtype);
|
|
break;
|
|
case RDB_TYPE_SET_LISTPACK:
|
|
// We need to deal with protocol versions 9 and older because in these
|
|
// RDB_TYPE_JSON == 20. On newer versions > 9 we bumped up RDB_TYPE_JSON to 30
|
|
// because it overlapped with the new type RDB_TYPE_SET_LISTPACK
|
|
if (rdb_version_ < 10) {
|
|
// consider it RDB_TYPE_JSON_OLD (20)
|
|
iores = ReadGeneric(RDB_TYPE_JSON);
|
|
} else {
|
|
iores = ReadGeneric(rdbtype);
|
|
}
|
|
break;
|
|
case RDB_TYPE_MODULE_2:
|
|
iores = ReadRedisJson();
|
|
break;
|
|
case RDB_TYPE_SBF:
|
|
iores = ReadSBF();
|
|
break;
|
|
default:
|
|
LOG(ERROR) << "Unsupported rdb type " << rdbtype;
|
|
|
|
return RdbError(errc::invalid_encoding);
|
|
}
|
|
|
|
if (!iores)
|
|
return iores.error();
|
|
*dest = std::move(*iores);
|
|
return error_code{};
|
|
}
|
|
|
|
static const size_t kMaxStringSize = 200_KB;
|
|
|
|
error_code RdbLoaderBase::ReadStringObj(RdbVariant* dest, bool big_string_split) {
|
|
bool isencoded = false;
|
|
size_t len;
|
|
SET_OR_RETURN(LoadLen(&isencoded), len);
|
|
|
|
if (isencoded) {
|
|
switch (len) {
|
|
case RDB_ENC_INT8:
|
|
case RDB_ENC_INT16:
|
|
case RDB_ENC_INT32: {
|
|
io::Result<long long> io_int = ReadIntObj(len);
|
|
if (!io_int)
|
|
return io_int.error();
|
|
dest->emplace<long long>(*io_int);
|
|
return error_code{};
|
|
}
|
|
case RDB_ENC_LZF: {
|
|
io::Result<LzfString> lzf = ReadLzf();
|
|
if (!lzf)
|
|
return lzf.error();
|
|
|
|
dest->emplace<LzfString>(std::move(lzf.value()));
|
|
return error_code{};
|
|
}
|
|
default:
|
|
LOG(ERROR) << "Unknown RDB string encoding " << len;
|
|
return RdbError(errc::rdb_file_corrupted);
|
|
}
|
|
}
|
|
|
|
if (big_string_split && len > kMaxStringSize) {
|
|
pending_read_.remaining = len - kMaxStringSize;
|
|
pending_read_.reserve = len;
|
|
len = kMaxStringSize;
|
|
}
|
|
|
|
auto& blob = dest->emplace<base::PODArray<char>>();
|
|
blob.resize(len);
|
|
return FetchBuf(len, blob.data());
|
|
}
|
|
|
|
error_code RdbLoaderBase::ReadRemainingString(RdbVariant* dest) {
|
|
size_t read_len = std::min(pending_read_.remaining, kMaxStringSize);
|
|
pending_read_.remaining = pending_read_.remaining - read_len;
|
|
|
|
auto& blob = dest->emplace<base::PODArray<char>>();
|
|
blob.resize(read_len);
|
|
return FetchBuf(read_len, blob.data());
|
|
}
|
|
|
|
io::Result<long long> RdbLoaderBase::ReadIntObj(int enctype) {
|
|
long long val;
|
|
|
|
if (enctype == RDB_ENC_INT8) {
|
|
SET_OR_UNEXPECT(FetchInt<int8_t>(), val);
|
|
} else if (enctype == RDB_ENC_INT16) {
|
|
SET_OR_UNEXPECT(FetchInt<int16_t>(), val);
|
|
} else if (enctype == RDB_ENC_INT32) {
|
|
SET_OR_UNEXPECT(FetchInt<int32_t>(), val);
|
|
} else {
|
|
return Unexpected(errc::invalid_encoding);
|
|
}
|
|
return val;
|
|
}
|
|
|
|
auto RdbLoaderBase::ReadLzf() -> io::Result<LzfString> {
|
|
uint64_t clen;
|
|
LzfString res;
|
|
|
|
SET_OR_UNEXPECT(LoadLen(NULL), clen);
|
|
SET_OR_UNEXPECT(LoadLen(NULL), res.uncompressed_len);
|
|
|
|
if (res.uncompressed_len > 1ULL << 29) {
|
|
LOG(ERROR) << "Uncompressed length is too big " << res.uncompressed_len;
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
}
|
|
|
|
res.compressed_blob.resize(clen);
|
|
/* Load the compressed representation and uncompress it to target. */
|
|
error_code ec = FetchBuf(clen, res.compressed_blob.data());
|
|
if (ec) {
|
|
return make_unexpected(ec);
|
|
}
|
|
|
|
return res;
|
|
}
|
|
|
|
auto RdbLoaderBase::ReadSet(int rdbtype) -> io::Result<OpaqueObj> {
|
|
size_t len;
|
|
if (pending_read_.remaining > 0) {
|
|
len = pending_read_.remaining;
|
|
} else {
|
|
SET_OR_UNEXPECT(LoadLen(NULL), len);
|
|
if (rdbtype == RDB_TYPE_SET_WITH_EXPIRY) {
|
|
len *= 2;
|
|
}
|
|
pending_read_.reserve = len;
|
|
}
|
|
|
|
// Limit each read to kMaxBlobLen elements.
|
|
unique_ptr<LoadTrace> load_trace(new LoadTrace);
|
|
size_t n = std::min(len, kMaxBlobLen);
|
|
load_trace->arr.resize(n);
|
|
for (size_t i = 0; i < n; i++) {
|
|
error_code ec = ReadStringObj(&load_trace->arr[i].rdb_var);
|
|
if (ec) {
|
|
return make_unexpected(ec);
|
|
}
|
|
}
|
|
|
|
// If there are still unread elements, cache the number of remaining
|
|
// elements, or clear if the full object has been read.
|
|
if (len > n) {
|
|
pending_read_.remaining = len - n;
|
|
} else if (pending_read_.remaining > 0) {
|
|
pending_read_.remaining = 0;
|
|
}
|
|
|
|
return OpaqueObj{std::move(load_trace), rdbtype};
|
|
}
|
|
|
|
auto RdbLoaderBase::ReadIntSet() -> io::Result<OpaqueObj> {
|
|
RdbVariant obj;
|
|
error_code ec = ReadStringObj(&obj);
|
|
if (ec) {
|
|
return make_unexpected(ec);
|
|
}
|
|
|
|
const LzfString* lzf = get_if<LzfString>(&obj);
|
|
const base::PODArray<char>* arr = get_if<base::PODArray<char>>(&obj);
|
|
|
|
if (lzf) {
|
|
if (lzf->uncompressed_len == 0 || lzf->compressed_blob.empty())
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
} else if (arr) {
|
|
if (arr->empty())
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
} else {
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
}
|
|
|
|
return OpaqueObj{std::move(obj), RDB_TYPE_SET_INTSET};
|
|
}
|
|
|
|
auto RdbLoaderBase::ReadGeneric(int rdbtype) -> io::Result<OpaqueObj> {
|
|
bool is_string_type = RDB_TYPE_STRING == rdbtype;
|
|
RdbVariant str_obj;
|
|
error_code ec;
|
|
if (pending_read_.remaining) {
|
|
ec = ReadRemainingString(&str_obj);
|
|
} else {
|
|
ec = ReadStringObj(&str_obj, is_string_type);
|
|
}
|
|
if (ec)
|
|
return make_unexpected(ec);
|
|
|
|
if (!is_string_type && StrLen(str_obj) == 0) {
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
}
|
|
|
|
return OpaqueObj{std::move(str_obj), rdbtype};
|
|
}
|
|
|
|
auto RdbLoaderBase::ReadHMap(int rdbtype) -> io::Result<OpaqueObj> {
|
|
size_t len;
|
|
if (pending_read_.remaining > 0) {
|
|
len = pending_read_.remaining;
|
|
} else {
|
|
SET_OR_UNEXPECT(LoadLen(NULL), len);
|
|
|
|
if (rdbtype == RDB_TYPE_HASH) {
|
|
len *= 2;
|
|
} else {
|
|
DCHECK_EQ(rdbtype, RDB_TYPE_HASH_WITH_EXPIRY);
|
|
len *= 3;
|
|
}
|
|
|
|
pending_read_.reserve = len;
|
|
}
|
|
|
|
// Limit each read to kMaxBlobLen elements.
|
|
unique_ptr<LoadTrace> load_trace(new LoadTrace);
|
|
size_t n = std::min<size_t>(len, kMaxBlobLen);
|
|
load_trace->arr.resize(n);
|
|
for (size_t i = 0; i < n; ++i) {
|
|
error_code ec = ReadStringObj(&load_trace->arr[i].rdb_var);
|
|
if (ec)
|
|
return make_unexpected(ec);
|
|
}
|
|
|
|
// If there are still unread elements, cache the number of remaining
|
|
// elements, or clear if the full object has been read.
|
|
if (len > n) {
|
|
pending_read_.remaining = len - n;
|
|
} else if (pending_read_.remaining > 0) {
|
|
pending_read_.remaining = 0;
|
|
}
|
|
|
|
return OpaqueObj{std::move(load_trace), rdbtype};
|
|
}
|
|
|
|
auto RdbLoaderBase::ReadZSet(int rdbtype) -> io::Result<OpaqueObj> {
|
|
uint64_t zsetlen;
|
|
if (pending_read_.remaining > 0) {
|
|
zsetlen = pending_read_.remaining;
|
|
} else {
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), zsetlen);
|
|
pending_read_.reserve = zsetlen;
|
|
}
|
|
|
|
if (zsetlen == 0)
|
|
return Unexpected(errc::empty_key);
|
|
|
|
double score;
|
|
|
|
// Limit each read to kMaxBlobLen elements.
|
|
unique_ptr<LoadTrace> load_trace(new LoadTrace);
|
|
size_t n = std::min<size_t>(zsetlen, kMaxBlobLen);
|
|
load_trace->arr.resize(n);
|
|
for (size_t i = 0; i < n; ++i) {
|
|
error_code ec = ReadStringObj(&load_trace->arr[i].rdb_var);
|
|
if (ec)
|
|
return make_unexpected(ec);
|
|
if (rdbtype == RDB_TYPE_ZSET_2) {
|
|
SET_OR_UNEXPECT(FetchBinaryDouble(), score);
|
|
} else {
|
|
SET_OR_UNEXPECT(FetchDouble(), score);
|
|
}
|
|
if (isnan(score)) {
|
|
LOG(ERROR) << "Zset with NAN score detected";
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
}
|
|
load_trace->arr[i].score = score;
|
|
}
|
|
|
|
// If there are still unread elements, cache the number of remaining
|
|
// elements, or clear if the full object has been read.
|
|
if (zsetlen > n) {
|
|
pending_read_.remaining = zsetlen - n;
|
|
} else if (pending_read_.remaining > 0) {
|
|
pending_read_.remaining = 0;
|
|
}
|
|
|
|
return OpaqueObj{std::move(load_trace), rdbtype};
|
|
}
|
|
|
|
auto RdbLoaderBase::ReadListQuicklist(int rdbtype) -> io::Result<OpaqueObj> {
|
|
size_t len;
|
|
if (pending_read_.remaining > 0) {
|
|
len = pending_read_.remaining;
|
|
} else {
|
|
SET_OR_UNEXPECT(LoadLen(NULL), len);
|
|
pending_read_.reserve = len;
|
|
}
|
|
|
|
if (len == 0)
|
|
return Unexpected(errc::empty_key);
|
|
|
|
unique_ptr<LoadTrace> load_trace(new LoadTrace);
|
|
// Lists pack multiple entries into each list node (8Kb by default),
|
|
// therefore using a smaller segment length than kMaxBlobLen.
|
|
size_t n = std::min<size_t>(len, 512);
|
|
load_trace->arr.resize(n);
|
|
for (size_t i = 0; i < n; ++i) {
|
|
uint64_t container = QUICKLIST_NODE_CONTAINER_PACKED;
|
|
if (rdbtype == RDB_TYPE_LIST_QUICKLIST_2) {
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), container);
|
|
|
|
if (container != QUICKLIST_NODE_CONTAINER_PACKED &&
|
|
container != QUICKLIST_NODE_CONTAINER_PLAIN) {
|
|
LOG(ERROR) << "Quicklist integrity check failed.";
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
}
|
|
}
|
|
|
|
RdbVariant var;
|
|
error_code ec = ReadStringObj(&var);
|
|
if (ec)
|
|
return make_unexpected(ec);
|
|
|
|
if (StrLen(var) == 0) {
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
}
|
|
load_trace->arr[i].rdb_var = std::move(var);
|
|
load_trace->arr[i].encoding = container;
|
|
}
|
|
|
|
// If there are still unread elements, cache the number of remaining
|
|
// elements, or clear if the full object has been read.
|
|
if (len > n) {
|
|
pending_read_.remaining = len - n;
|
|
} else if (pending_read_.remaining > 0) {
|
|
pending_read_.remaining = 0;
|
|
}
|
|
|
|
return OpaqueObj{std::move(load_trace), rdbtype};
|
|
}
|
|
|
|
auto RdbLoaderBase::ReadStreams(int rdbtype) -> io::Result<OpaqueObj> {
|
|
size_t listpacks;
|
|
if (pending_read_.remaining > 0) {
|
|
listpacks = pending_read_.remaining;
|
|
} else {
|
|
SET_OR_UNEXPECT(LoadLen(NULL), listpacks);
|
|
}
|
|
|
|
unique_ptr<LoadTrace> load_trace(new LoadTrace);
|
|
// Streams pack multiple entries into each stream node (4Kb or 100
|
|
// entries), therefore using a smaller segment length than kMaxBlobLen.
|
|
size_t n = std::min<size_t>(listpacks, 512);
|
|
load_trace->arr.resize(n * 2);
|
|
|
|
error_code ec;
|
|
for (size_t i = 0; i < n; ++i) {
|
|
/* Get the master ID, the one we'll use as key of the radix tree
|
|
* node: the entries inside the listpack itself are delta-encoded
|
|
* relatively to this ID. */
|
|
RdbVariant stream_id, blob;
|
|
ec = ReadStringObj(&stream_id);
|
|
if (ec)
|
|
return make_unexpected(ec);
|
|
if (StrLen(stream_id) != sizeof(streamID)) {
|
|
LOG(ERROR) << "Stream node key entry is not the size of a stream ID";
|
|
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
}
|
|
|
|
ec = ReadStringObj(&blob);
|
|
if (ec)
|
|
return make_unexpected(ec);
|
|
if (StrLen(blob) == 0) {
|
|
LOG(ERROR) << "Stream listpacks loading failed";
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
}
|
|
|
|
load_trace->arr[2 * i].rdb_var = std::move(stream_id);
|
|
load_trace->arr[2 * i + 1].rdb_var = std::move(blob);
|
|
}
|
|
|
|
// If there are still unread elements, cache the number of remaining
|
|
// elements, or clear if the full object has been read.
|
|
//
|
|
// We only load the stream metadata and consumer groups in the final read,
|
|
// so if there are still unread elements return the partial stream.
|
|
if (listpacks > n) {
|
|
pending_read_.remaining = listpacks - n;
|
|
return OpaqueObj{std::move(load_trace), rdbtype};
|
|
}
|
|
|
|
pending_read_.remaining = 0;
|
|
|
|
// Load stream metadata.
|
|
load_trace->stream_trace.reset(new StreamTrace);
|
|
|
|
/* Load total number of items inside the stream. */
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->stream_len);
|
|
|
|
/* Load the last entry ID. */
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->last_id.ms);
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->last_id.seq);
|
|
|
|
if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
|
|
/* Load the first entry ID. */
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->first_id.ms);
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->first_id.seq);
|
|
|
|
/* Load the maximal deleted entry ID. */
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->max_deleted_entry_id.ms);
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->max_deleted_entry_id.seq);
|
|
|
|
/* Load the offset. */
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), load_trace->stream_trace->entries_added);
|
|
} else {
|
|
/* During migration the offset can be initialized to the stream's
|
|
* length. At this point, we also don't care about tombstones
|
|
* because CG offsets will be later initialized as well. */
|
|
load_trace->stream_trace->entries_added = load_trace->stream_trace->stream_len;
|
|
}
|
|
|
|
/* Consumer groups loading */
|
|
uint64_t cgroups_count;
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), cgroups_count);
|
|
load_trace->stream_trace->cgroup.resize(cgroups_count);
|
|
|
|
for (size_t i = 0; i < cgroups_count; ++i) {
|
|
auto& cgroup = load_trace->stream_trace->cgroup[i];
|
|
/* Get the consumer group name and ID. We can then create the
|
|
* consumer group ASAP and populate its structure as
|
|
* we read more data. */
|
|
|
|
// sds cgname;
|
|
RdbVariant cgname;
|
|
ec = ReadStringObj(&cgname);
|
|
if (ec)
|
|
return make_unexpected(ec);
|
|
cgroup.name = std::move(cgname);
|
|
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.ms);
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.seq);
|
|
|
|
cgroup.entries_read = 0;
|
|
if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_2) {
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.entries_read);
|
|
}
|
|
|
|
/* Load the global PEL for this consumer group, however we'll
|
|
* not yet populate the NACK structures with the message
|
|
* owner, since consumers for this group and their messages will
|
|
* be read as a next step. So for now leave them not resolved
|
|
* and later populate it. */
|
|
uint64_t pel_size;
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), pel_size);
|
|
|
|
cgroup.pel_arr.resize(pel_size);
|
|
|
|
for (size_t j = 0; j < pel_size; ++j) {
|
|
auto& pel = cgroup.pel_arr[j];
|
|
error_code ec = FetchBuf(pel.rawid.size(), pel.rawid.data());
|
|
if (ec) {
|
|
LOG(ERROR) << "Stream PEL ID loading failed.";
|
|
return make_unexpected(ec);
|
|
}
|
|
|
|
SET_OR_UNEXPECT(FetchInt<int64_t>(), pel.delivery_time);
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), pel.delivery_count);
|
|
}
|
|
|
|
/* Now that we loaded our global PEL, we need to load the
|
|
* consumers and their local PELs. */
|
|
uint64_t consumers_num;
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), consumers_num);
|
|
cgroup.cons_arr.resize(consumers_num);
|
|
|
|
for (size_t j = 0; j < consumers_num; ++j) {
|
|
auto& consumer = cgroup.cons_arr[j];
|
|
ec = ReadStringObj(&consumer.name);
|
|
if (ec)
|
|
return make_unexpected(ec);
|
|
|
|
SET_OR_UNEXPECT(FetchInt<int64_t>(), consumer.seen_time);
|
|
|
|
if (rdbtype >= RDB_TYPE_STREAM_LISTPACKS_3) {
|
|
SET_OR_UNEXPECT(FetchInt<int64_t>(), consumer.active_time);
|
|
} else {
|
|
/* That's the best estimate we got */
|
|
consumer.active_time = consumer.seen_time;
|
|
}
|
|
|
|
/* Load the PEL about entries owned by this specific
|
|
* consumer. */
|
|
SET_OR_UNEXPECT(LoadLen(nullptr), pel_size);
|
|
consumer.nack_arr.resize(pel_size);
|
|
for (size_t k = 0; k < pel_size; ++k) {
|
|
auto& nack = consumer.nack_arr[k];
|
|
// unsigned char rawid[sizeof(streamID)];
|
|
error_code ec = FetchBuf(nack.size(), nack.data());
|
|
if (ec) {
|
|
LOG(ERROR) << "Stream PEL ID loading failed.";
|
|
return make_unexpected(ec);
|
|
}
|
|
/*streamNACK* nack = (streamNACK*)raxFind(cgroup->pel, rawid, sizeof(rawid));
|
|
if (nack == raxNotFound) {
|
|
LOG(ERROR) << "Consumer entry not found in group global PEL";
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
}*/
|
|
|
|
/* Set the NACK consumer, that was left to NULL when
|
|
* loading the global PEL. Then set the same shared
|
|
* NACK structure also in the consumer-specific PEL. */
|
|
/*
|
|
nack->consumer = consumer;
|
|
if (!raxTryInsert(consumer->pel, rawid, sizeof(rawid), nack, NULL)) {
|
|
LOG(ERROR) << "Duplicated consumer PEL entry loading a stream consumer group";
|
|
streamFreeNACK(nack);
|
|
return Unexpected(errc::duplicate_key);
|
|
}*/
|
|
}
|
|
} // while (consumers_num)
|
|
} // while (cgroup_num)
|
|
|
|
return OpaqueObj{std::move(load_trace), RDB_TYPE_STREAM_LISTPACKS};
|
|
}
|
|
|
|
auto RdbLoaderBase::ReadRedisJson() -> io::Result<OpaqueObj> {
|
|
auto json_magic_number = LoadLen(nullptr);
|
|
if (!json_magic_number) {
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
}
|
|
|
|
constexpr string_view kJsonModule = "ReJSON-RL"sv;
|
|
string module_name = ModuleTypeName(*json_magic_number);
|
|
if (module_name != kJsonModule) {
|
|
LOG(ERROR) << "Unsupported module: " << module_name;
|
|
return Unexpected(errc::unsupported_operation);
|
|
}
|
|
|
|
int encver = *json_magic_number & 1023;
|
|
if (encver != 3) {
|
|
LOG(ERROR) << "Unsupported ReJSON version: " << encver;
|
|
return Unexpected(errc::unsupported_operation);
|
|
}
|
|
|
|
auto opcode = FetchInt<uint8_t>();
|
|
if (!opcode || *opcode != RDB_MODULE_OPCODE_STRING) {
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
}
|
|
|
|
RdbVariant dest;
|
|
error_code ec = ReadStringObj(&dest);
|
|
if (ec) {
|
|
return make_unexpected(ec);
|
|
}
|
|
|
|
opcode = FetchInt<uint8_t>();
|
|
if (!opcode || *opcode != RDB_MODULE_OPCODE_EOF) {
|
|
return Unexpected(errc::rdb_file_corrupted);
|
|
}
|
|
|
|
return OpaqueObj{std::move(dest), RDB_TYPE_JSON};
|
|
}
|
|
|
|
auto RdbLoaderBase::ReadSBF() -> io::Result<OpaqueObj> {
  RdbSBF res;
  uint64_t options;
  SET_OR_UNEXPECT(LoadLen(nullptr), options);
  if (options != 0)
    return Unexpected(errc::rdb_file_corrupted);
  SET_OR_UNEXPECT(FetchBinaryDouble(), res.grow_factor);
  SET_OR_UNEXPECT(FetchBinaryDouble(), res.fp_prob);
  if (res.fp_prob <= 0 || res.fp_prob > 0.5) {
    return Unexpected(errc::rdb_file_corrupted);
  }
  SET_OR_UNEXPECT(LoadLen(nullptr), res.prev_size);
  SET_OR_UNEXPECT(LoadLen(nullptr), res.current_size);
  SET_OR_UNEXPECT(LoadLen(nullptr), res.max_capacity);

  unsigned num_filters = 0;
  SET_OR_UNEXPECT(LoadLen(nullptr), num_filters);
  auto is_power2 = [](size_t n) { return (n & (n - 1)) == 0; };

  for (unsigned i = 0; i < num_filters; ++i) {
    unsigned hash_cnt;
    string filter_data;
    SET_OR_UNEXPECT(LoadLen(nullptr), hash_cnt);
    SET_OR_UNEXPECT(FetchGenericString(), filter_data);
    size_t bit_len = filter_data.size() * 8;
    if (!is_power2(bit_len)) {  // must be power of two
      return Unexpected(errc::rdb_file_corrupted);
    }
    res.filters.emplace_back(hash_cnt, std::move(filter_data));
  }
  return OpaqueObj{std::move(res), RDB_TYPE_SBF};
}

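// Reads a fixed-width little-endian integer of type T from the buffer, e.g.
// FetchInt<int64_t>() consumes exactly 8 bytes.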
template <typename T> io::Result<T> RdbLoaderBase::FetchInt() {
  auto ec = EnsureRead(sizeof(T));
  if (ec)
    return make_unexpected(ec);

  char buf[16];
  mem_buf_->ReadAndConsume(sizeof(T), buf);

  return base::LE::LoadT<std::make_unsigned_t<T>>(buf);
}

io::Result<uint8_t> RdbLoaderBase::FetchType() {
  return FetchInt<uint8_t>();
}

// -------------- RdbLoader ----------------------------

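// Per-key attributes accumulated from the opcodes that precede a key/value record
// (expiry, stickiness, memcached flags). They are applied to the next loaded pair
// and cleared via Reset() afterwards.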
struct RdbLoader::ObjSettings {
  long long now;           // current epoch time in ms.
  int64_t expiretime = 0;  // expire epoch time in ms
  uint32_t mc_flags = 0;

  bool has_expired = false;

  bool is_sticky = false;
  bool has_mc_flags = false;

  void Reset() {
    mc_flags = expiretime = 0;
    has_expired = false;
    is_sticky = false;
    has_mc_flags = false;
  }

  void SetExpire(int64_t val) {
    expiretime = val;
    has_expired = (val <= now);
  }

  void SetMCFlags(uint32_t flags) {
    has_mc_flags = true;
    mc_flags = flags;
  }

  ObjSettings() = default;
};

RdbLoader::RdbLoader(Service* service)
    : service_{service},
      rdb_ignore_expiry_{GetFlag(FLAGS_rdb_ignore_expiry)},
      script_mgr_{service == nullptr ? nullptr : service->script_mgr()},
      shard_buf_{shard_set->size()} {
}

RdbLoader::~RdbLoader() {
  while (true) {
    Item* item = item_queue_.Pop();
    if (item == nullptr)
      break;
    delete item;
  }

  // Decommit local memory.
  // We create an RdbLoader for each thread, so each one will Decommit for itself after
  // full sync ends (since we explicitly reset the RdbLoader).
  auto* tlocal = ServerState::tlocal();
  tlocal->DecommitMemory(ServerState::kAllMemory);
}

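// Top-level entry point: validates the "REDIS<nnnn>" header (e.g. "REDIS0011" yields
// rdb_version_ == 11), then repeatedly reads an opcode and either handles it inline
// (expiry, aux fields, db selection, compressed/journal blobs, etc.) or treats it as an
// object type and loads the next key/value pair.
//
// A minimal usage sketch (assuming a configured Service and an io::Source over an RDB stream):
//   RdbLoader loader(&service);
//   error_code ec = loader.Load(&source);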
error_code RdbLoader::Load(io::Source* src) {
  CHECK(!src_ && src);

  is_tiered_enabled_ =
      shard_set->Await(0, [] { return EngineShard::tlocal()->tiered_storage() != nullptr; });

  absl::Time start = absl::Now();
  src_ = src;

  IoBuf::Bytes bytes = mem_buf_->AppendBuffer();
  io::Result<size_t> read_sz = src_->ReadAtLeast(bytes, 9);
  if (!read_sz)
    return read_sz.error();

  bytes_read_ = *read_sz;
  if (bytes_read_ < 9) {
    return RdbError(errc::wrong_signature);
  }

  mem_buf_->CommitWrite(bytes_read_);

  {
    auto cb = mem_buf_->InputBuffer();

    if (memcmp(cb.data(), "REDIS", 5) != 0) {
      VLOG(1) << "Bad header: " << absl::CHexEscape(facade::ToSV(cb));
      return RdbError(errc::wrong_signature);
    }

    char buf[64] = {0};
    ::memcpy(buf, cb.data() + 5, 4);

    rdb_version_ = atoi(buf);
    if (rdb_version_ < 5 || rdb_version_ > RDB_VERSION) {  // We accept starting from 5.
      LOG(ERROR) << "RDB Version " << rdb_version_ << " is not supported";
      return RdbError(errc::bad_version);
    }

    mem_buf_->ConsumeInput(9);
  }

  int type;

  /* Key-specific attributes, set by opcodes before the key type. */
  ObjSettings settings;
  settings.now = GetCurrentTimeMs();
  size_t keys_loaded = 0;

  auto cleanup = absl::Cleanup([&] { FinishLoad(start, &keys_loaded); });

  // Increment the local shard's load-in-progress counter if it exists.
  if (EngineShard* es = EngineShard::tlocal(); es) {
    namespaces->GetDefaultNamespace().GetCurrentDbSlice().IncrLoadInProgress();
  }

  while (!stop_early_.load(memory_order_relaxed)) {
    if (pause_) {
      ThisFiber::SleepFor(100ms);
      continue;
    }

    /* Read type. */
    SET_OR_RETURN(FetchType(), type);

    DVLOG(3) << "Opcode type: " << type;

    /* Handle special types. */
    if (type == RDB_OPCODE_EXPIRETIME) {
      LOG(ERROR) << "opcode RDB_OPCODE_EXPIRETIME not supported";

      return RdbError(errc::invalid_encoding);
    }

    if (type == RDB_OPCODE_EXPIRETIME_MS) {
      int64_t val;
      /* EXPIRETIME_MS: millisecond-precision expire times introduced
       * with RDB v3. Like EXPIRETIME but with more precision. */
      SET_OR_RETURN(FetchInt<int64_t>(), val);
      if (!rdb_ignore_expiry_) {
        settings.SetExpire(val);
      }
      continue; /* Read next opcode. */
    }

    if (type == RDB_OPCODE_DF_MASK) {
      uint32_t mask;
      SET_OR_RETURN(FetchInt<uint32_t>(), mask);
      settings.is_sticky = mask & DF_MASK_FLAG_STICKY;
      settings.has_mc_flags = mask & DF_MASK_FLAG_MC_FLAGS;
      if (settings.has_mc_flags) {
        SET_OR_RETURN(FetchInt<uint32_t>(), settings.mc_flags);
      }
      continue; /* Read next opcode. */
    }

    if (type == RDB_OPCODE_FREQ) {
      /* FREQ: LFU frequency. */
      FetchInt<uint8_t>();  // IGNORE
      continue;             /* Read next opcode. */
    }

    if (type == RDB_OPCODE_IDLE) {
      /* IDLE: LRU idle time. */
      uint64_t idle;
      SET_OR_RETURN(LoadLen(nullptr), idle);  // ignore
      (void)idle;
      continue; /* Read next opcode. */
    }

    if (type == RDB_OPCODE_EOF) {
      /* EOF: End of file, exit the main loop. */
      break;
    }

    if (type == RDB_OPCODE_FULLSYNC_END) {
      VLOG(1) << "Read RDB_OPCODE_FULLSYNC_END";
      RETURN_ON_ERR(EnsureRead(8));
      mem_buf_->ConsumeInput(8);  // ignore 8 bytes

      if (full_sync_cut_cb) {
        FlushAllShards();  // Flush, since the handler awakens post-load handlers.
        full_sync_cut_cb();
      }
      continue;
    }

    if (type == RDB_OPCODE_JOURNAL_OFFSET) {
      VLOG(1) << "Read RDB_OPCODE_JOURNAL_OFFSET";
      uint64_t journal_offset;
      SET_OR_RETURN(FetchInt<uint64_t>(), journal_offset);
      VLOG(1) << "Got offset " << journal_offset;
      journal_offset_ = journal_offset;
      continue;
    }

    if (type == RDB_OPCODE_SELECTDB) {
      unsigned dbid = 0;

      /* SELECTDB: Select the specified database. */
      SET_OR_RETURN(LoadLen(nullptr), dbid);

      if (dbid > GetFlag(FLAGS_dbnum)) {
        LOG(WARNING) << "database id " << dbid << " exceeds dbnum limit. Try increasing the flag.";

        return RdbError(errc::bad_db_index);
      }

      DVLOG(2) << "Select DB: " << dbid;
      for (unsigned i = 0; i < shard_set->size(); ++i) {
        // We should flush pending items before switching dbid.
        FlushShardAsync(i);

        // Activate the database if it did not exist before.
        shard_set->Add(
            i, [dbid] { namespaces->GetDefaultNamespace().GetCurrentDbSlice().ActivateDb(dbid); });
      }

      cur_db_index_ = dbid;
      if (EngineShard::tlocal()) {  // because we sometimes create entries inline.
        namespaces->GetDefaultNamespace().GetCurrentDbSlice().ActivateDb(dbid);
      }
      continue; /* Read next opcode. */
    }

    if (type == RDB_OPCODE_RESIZEDB) {
      /* RESIZEDB: Hint about the size of the keys in the currently
       * selected database, in order to avoid useless rehashing. */
      uint64_t db_size, expires_size;
      SET_OR_RETURN(LoadLen(nullptr), db_size);
      SET_OR_RETURN(LoadLen(nullptr), expires_size);

      ResizeDb(db_size, expires_size);
      continue; /* Read next opcode. */
    }

    if (type == RDB_OPCODE_AUX) {
      RETURN_ON_ERR(HandleAux());
      continue; /* Read type again. */
    }

    if (type == RDB_OPCODE_MODULE_AUX) {
      uint64_t module_id;
      SET_OR_RETURN(LoadLen(nullptr), module_id);
      string module_name = ModuleTypeName(module_id);

      LOG(WARNING) << "WARNING: Skipping data for module " << module_name;
      RETURN_ON_ERR(SkipModuleData());
      continue;
    }

    if (type == RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START ||
        type == RDB_OPCODE_COMPRESSED_LZ4_BLOB_START) {
      RETURN_ON_ERR(HandleCompressedBlob(type));
      continue;
    }

    if (type == RDB_OPCODE_COMPRESSED_BLOB_END) {
      RETURN_ON_ERR(HandleCompressedBlobFinish());
      continue;
    }

    if (type == RDB_OPCODE_JOURNAL_BLOB) {
      FlushAllShards();  // Always flush before applying incremental changes on top.
      RETURN_ON_ERR(HandleJournalBlob(service_));
      continue;
    }

    if (type == RDB_OPCODE_SLOT_INFO) {
      [[maybe_unused]] uint64_t slot_id;
      SET_OR_RETURN(LoadLen(nullptr), slot_id);
      [[maybe_unused]] uint64_t slot_size;
      SET_OR_RETURN(LoadLen(nullptr), slot_size);
      [[maybe_unused]] uint64_t expires_slot_size;
      SET_OR_RETURN(LoadLen(nullptr), expires_slot_size);
      continue;
    }

    if (!rdbIsObjectTypeDF(type)) {
      return RdbError(errc::invalid_rdb_type);
    }

    ++keys_loaded;
    RETURN_ON_ERR(LoadKeyValPair(type, &settings));
    settings.Reset();
  }  // main load loop

  DVLOG(1) << "RdbLoad loop finished";

  if (stop_early_) {
    return *ec_;
  }

  /* Verify the checksum if RDB version is >= 5 */
  RETURN_ON_ERR(VerifyChecksum());

  return kOk;
}

void RdbLoader::FinishLoad(absl::Time start_time, size_t* keys_loaded) {
  BlockingCounter bc(shard_set->size());
  for (unsigned i = 0; i < shard_set->size(); ++i) {
    // Flush the remaining items.
    FlushShardAsync(i);

    // Send sentinel callbacks to ensure that all previous messages have been processed.
    shard_set->Add(i, [bc]() mutable { bc->Dec(); });
  }
  bc->Wait();  // wait for sentinels to report.
  // Decrement the local shard's load-in-progress counter if it exists.
  if (EngineShard* es = EngineShard::tlocal(); es) {
    namespaces->GetDefaultNamespace().GetCurrentDbSlice().DecrLoadInProgress();
  }

  absl::Duration dur = absl::Now() - start_time;
  load_time_ = double(absl::ToInt64Milliseconds(dur)) / 1000;
  keys_loaded_ = *keys_loaded;
}

std::error_code RdbLoaderBase::EnsureRead(size_t min_sz) {
  // In the flow of reading compressed data, we store the uncompressed data in the uncompressed
  // buffer. When parsing entries we call EnsureRead with 9 bytes to read the length of a
  // key/value. If the key/value is very small (less than 9 bytes), the remaining data in the
  // uncompressed buffer might contain fewer than 9 bytes. We need to make sure that we don't
  // read from the sink into the uncompressed buffer, therefore in this flow we return here.
  if (mem_buf_ != &origin_mem_buf_)
    return std::error_code{};
  if (mem_buf_->InputLen() >= min_sz)
    return std::error_code{};
  return EnsureReadInternal(min_sz);
}

error_code RdbLoaderBase::EnsureReadInternal(size_t min_to_read) {
  // We need to include what we already have inside the input buffer. Otherwise we might expect
  // to read more than the minimum.
  const size_t min_sz = min_to_read - mem_buf_->InputLen();

  auto out_buf = mem_buf_->AppendBuffer();
  CHECK_GT(out_buf.size(), min_sz);

  // If a limit was applied, we do not want to read more than needed;
  // important when reading from sockets.
  if (bytes_read_ + out_buf.size() > source_limit_) {
    out_buf = out_buf.subspan(0, source_limit_ - bytes_read_);
  }

  io::Result<size_t> res = src_->ReadAtLeast(out_buf, min_sz);
  if (!res) {
    VLOG(1) << "Error reading from source: " << res.error() << " " << min_sz << " bytes";
    return res.error();
  }
  if (*res < min_sz)
    return RdbError(errc::rdb_file_corrupted);
  DVLOG(2) << "EnsureRead " << *res << " bytes";
  bytes_read_ += *res;

  DCHECK_LE(bytes_read_, source_limit_);
  mem_buf_->CommitWrite(*res);

  return kOk;
}

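// Reads an RDB length (or length-encoding marker). The first byte determines the format;
// roughly, in the standard RDB encoding the two top bits mean: 00 - 6-bit length
// (e.g. a first byte of 0x0A directly encodes the length 10), 01 - 14-bit length with one
// extra byte, 10 - a 32/64-bit length follows, 11 - special string encoding (RDB_ENCVAL),
// in which case *is_encoded is set. The exact decoding is delegated to
// PackedUIntMeta/ReadPackedUInt.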
io::Result<uint64_t> RdbLoaderBase::LoadLen(bool* is_encoded) {
  if (is_encoded)
    *is_encoded = false;

  // Every RDB file with rdbver >= 5 has an 8-byte checksum at the end,
  // so we can ensure we have 9 bytes to read up until that point.
  error_code ec = EnsureRead(9);
  if (ec)
    return make_unexpected(ec);

  // Read integer meta info.
  auto bytes = mem_buf_->InputBuffer();
  PackedUIntMeta meta{bytes[0]};
  bytes.remove_prefix(1);

  // Read integer.
  uint64_t res;
  SET_OR_UNEXPECT(ReadPackedUInt(meta, bytes), res);

  if (meta.Type() == RDB_ENCVAL && is_encoded)
    *is_encoded = true;

  mem_buf_->ConsumeInput(1 + meta.ByteSize());

  return res;
}

error_code RdbLoaderBase::AllocateDecompressOnce(int op_type) {
  if (decompress_impl_) {
    return {};
  }

  if (op_type == RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START) {
    decompress_impl_ = detail::DecompressImpl::CreateZstd();
  } else if (op_type == RDB_OPCODE_COMPRESSED_LZ4_BLOB_START) {
    decompress_impl_ = detail::DecompressImpl::CreateLZ4();
  } else {
    return RdbError(errc::unsupported_operation);
  }
  return {};
}

error_code RdbLoaderBase::SkipModuleData() {
  uint64_t opcode;
  SET_OR_RETURN(LoadLen(nullptr), opcode);  // ignore field 'when_opcode'
  if (opcode != RDB_MODULE_OPCODE_UINT)
    return RdbError(errc::rdb_file_corrupted);
  SET_OR_RETURN(LoadLen(nullptr), opcode);  // ignore field 'when'

  while (true) {
    SET_OR_RETURN(LoadLen(nullptr), opcode);

    switch (opcode) {
      case RDB_MODULE_OPCODE_EOF:
        return kOk;  // Module data end

      case RDB_MODULE_OPCODE_SINT:
      case RDB_MODULE_OPCODE_UINT: {
        [[maybe_unused]] uint64_t _;
        SET_OR_RETURN(LoadLen(nullptr), _);
        break;
      }

      case RDB_MODULE_OPCODE_STRING: {
        RdbVariant dest;
        error_code ec = ReadStringObj(&dest);
        if (ec) {
          return ec;
        }
        break;
      }

      case RDB_MODULE_OPCODE_DOUBLE: {
        [[maybe_unused]] double _;
        SET_OR_RETURN(FetchBinaryDouble(), _);
        break;
      }

      default:
        // TODO: handle RDB_MODULE_OPCODE_FLOAT
        LOG(ERROR) << "Unsupported module section: " << opcode;
        return RdbError(errc::rdb_file_corrupted);
    }
  }
}

error_code RdbLoaderBase::HandleCompressedBlob(int op_type) {
  DVLOG(2) << "HandleCompressedBlob: " << op_type;
  RETURN_ON_ERR(AllocateDecompressOnce(op_type));

  // Fetch the compressed blob.
  string res;
  SET_OR_RETURN(FetchGenericString(), res);

  // Decompress the blob and switch the membuf pointer.
  // The last opcode in the compressed blob is RDB_OPCODE_COMPRESSED_BLOB_END,
  // at which point we switch back to the original membuf (HandleCompressedBlobFinish).
  SET_OR_RETURN(decompress_impl_->Decompress(res), mem_buf_);

  return kOk;
}

error_code RdbLoaderBase::HandleCompressedBlobFinish() {
  DVLOG(2) << "HandleCompressedBlobFinish";

  CHECK_NE(&origin_mem_buf_, mem_buf_);
  CHECK_EQ(mem_buf_->InputLen(), size_t(0));
  mem_buf_ = &origin_mem_buf_;
  return kOk;
}

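// Reads an embedded journal blob: an entry count followed by a string containing the
// serialized journal entries, which are parsed and executed one by one. FLUSHALL/FLUSHDB
// entries are rejected, since applying them mid-load could delete data that should be kept
// (see the comment inside the loop).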
error_code RdbLoaderBase::HandleJournalBlob(Service* service) {
  // Read the number of entries in the journal blob.
  size_t num_entries;
  bool _encoded;
  SET_OR_RETURN(LoadLen(&_encoded), num_entries);

  // Read the journal blob.
  string journal_blob;
  SET_OR_RETURN(FetchGenericString(), journal_blob);

  io::BytesSource bs{io::Buffer(journal_blob)};
  journal_reader_.SetSource(&bs);

  // Parse and execute in a loop.
  size_t done = 0;
  JournalExecutor ex{service};
  while (done < num_entries) {
    journal::ParsedEntry entry{};
    SET_OR_RETURN(journal_reader_.ReadEntry(), entry);
    done++;

    if (entry.cmd.cmd_args.empty())
      return RdbError(errc::rdb_file_corrupted);

    if (absl::EqualsIgnoreCase(facade::ToSV(entry.cmd.cmd_args[0]), "FLUSHALL") ||
        absl::EqualsIgnoreCase(facade::ToSV(entry.cmd.cmd_args[0]), "FLUSHDB")) {
      // Applying a flush* operation in the middle of a load can cause out-of-sync deletions of
      // data that should not be deleted, see https://github.com/dragonflydb/dragonfly/issues/1231
      // By returning an error we are effectively restarting the replication.
      return RdbError(errc::unsupported_operation);
    }

    DVLOG(2) << "Executing item: " << entry.ToString();
    ex.Execute(entry.dbid, entry.cmd);
  }

  return std::error_code{};
}

error_code RdbLoader::HandleAux() {
  /* AUX: generic string-string fields. Used to add state to the RDB
   * which is backward compatible. Implementations of RDB loading
   * are required to skip AUX fields they don't understand.
   *
   * An AUX field is composed of two strings: key and value. */
  string auxkey, auxval;

  SET_OR_RETURN(FetchGenericString(), auxkey);
  SET_OR_RETURN(FetchGenericString(), auxval);

  if (!auxkey.empty() && auxkey[0] == '%') {
    /* All the fields with a name starting with '%' are considered
     * information fields and are logged at startup with a log
     * level of NOTICE. */
    LOG(INFO) << "RDB '" << auxkey << "': " << auxval;
  } else if (auxkey == "repl-stream-db") {
    // TODO
  } else if (auxkey == "repl-id") {
    // TODO
  } else if (auxkey == "repl-offset") {
    // TODO
  } else if (auxkey == "lua") {
    LoadScriptFromAux(std::move(auxval));
  } else if (auxkey == "redis-ver") {
    VLOG(1) << "Loading RDB produced by Redis version " << auxval;
  } else if (auxkey == "df-ver") {
    VLOG(1) << "Loading RDB produced by Dragonfly version " << auxval;
  } else if (auxkey == "ctime") {
    int64_t ctime;
    if (absl::SimpleAtoi(auxval, &ctime)) {
      time_t age = time(NULL) - ctime;
      if (age < 0)
        age = 0;
      VLOG(1) << "RDB age " << strings::HumanReadableElapsedTime(age);
    }
  } else if (auxkey == "used-mem") {
    int64_t usedmem;
    if (absl::SimpleAtoi(auxval, &usedmem)) {
      VLOG(1) << "RDB memory usage when created " << strings::HumanReadableNumBytes(usedmem);
      if (usedmem > ssize_t(max_memory_limit)) {
        if (IsClusterEnabled()) {
          LOG(INFO) << "Allowing to load a snapshot of size " << usedmem
                    << ", despite memory limit of " << max_memory_limit << " due to cluster mode";
        } else if (is_tiered_enabled_) {
          LOG(INFO) << "Allowing to load a snapshot of size " << usedmem
                    << ", despite memory limit of " << max_memory_limit << " due to tiered storage";
        } else {
          LOG(WARNING) << "Could not load snapshot - its used memory is " << usedmem
                       << " but the limit is " << max_memory_limit;
          return RdbError(errc::out_of_memory);
        }
      }
    }
  } else if (auxkey == "aof-preamble") {
    long long haspreamble;
    if (absl::SimpleAtoi(auxval, &haspreamble) && haspreamble) {
      VLOG(1) << "RDB has an AOF tail";
    }
  } else if (auxkey == "redis-bits") {
    /* Just ignored. */
  } else if (auxkey == "search-index") {
    LoadSearchIndexDefFromAux(std::move(auxval));
  } else {
    /* We ignore fields we don't understand, as by the AUX field
     * contract. */
    LOG(WARNING) << "Unrecognized RDB AUX field: '" << auxkey << "'";
  }

  return kOk;
}

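// Note: despite the name, this currently only reads the 8-byte trailer and logs it
// together with the remaining input length; no CRC comparison is performed here.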
error_code RdbLoader::VerifyChecksum() {
  uint64_t expected;

  SET_OR_RETURN(FetchInt<uint64_t>(), expected);

  io::Bytes cur_buf = mem_buf_->InputBuffer();

  VLOG(1) << "VerifyChecksum: input buffer len " << cur_buf.size() << ", expected " << expected;

  return kOk;
}

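// Moves the items buffered for shard `sid` into a callback and dispatches it to that
// shard's queue, where LoadItemsBuffer materializes them. Despite the "Async" name, the
// call may preempt (and effectively throttle the loader) when the shard queue is full.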
void RdbLoader::FlushShardAsync(ShardId sid) {
  auto& out_buf = shard_buf_[sid];
  if (out_buf.empty())
    return;

  auto cb = [indx = this->cur_db_index_, this, ib = std::move(out_buf)] {
    auto& db_slice = namespaces->GetDefaultNamespace().GetCurrentDbSlice();

    // Before we start loading, increment LoadInProgress.
    // This is required because FlushShardAsync dispatches to multiple shards, and those shards
    // might not yet have had their state (load in progress) incremented.
    db_slice.IncrLoadInProgress();
    this->LoadItemsBuffer(indx, ib);
    db_slice.DecrLoadInProgress();

    // Block if tiered storage is active but can't keep up.
    while (db_slice.shard_owner()->ShouldThrottleForTiering()) {
      ThisFiber::SleepFor(100us);
    }
  };

  bool preempted = shard_set->Add(sid, std::move(cb));
  VLOG_IF(2, preempted) << "FlushShardAsync was throttled";
}

void RdbLoader::FlushAllShards() {
  for (ShardId i = 0; i < shard_set->size(); i++)
    FlushShardAsync(i);
}

std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, LoadConfig config,
                                          CompactObj* pv) {
  OpaqueObjLoader visitor(opaque.rdb_type, pv, config);
  std::visit(visitor, opaque.obj);

  return visitor.ec();
}

void RdbLoaderBase::CopyStreamId(const StreamID& src, struct streamID* dest) {
  dest->ms = src.ms;
  dest->seq = src.seq;
}

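// Materializes a loaded Item into the shard's DbSlice: converts the opaque RDB object into
// a PrimeValue (appending to an existing value when the item continues a partially loaded
// huge object), drops values that already expired, inserts via AddOrUpdate, applies
// sticky/memcached flags, and optionally offloads the value to tiered storage.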
void RdbLoader::CreateObjectOnShard(const DbContext& db_cntx, const Item* item, DbSlice* db_slice) {
  PrimeValue pv;
  PrimeValue* pv_ptr = &pv;
  DbIndex db_ind = db_cntx.db_index;

  auto error_msg = [](const auto* item, auto db_ind) {
    return absl::StrCat("Found empty key: ", item->key, " in DB ", db_ind, " rdb_type ",
                        item->val.rdb_type);
  };

  // The scope is important here, as we need to ensure that the object memory is properly
  // accounted for.
  DbSlice::ItAndUpdater append_res;

  // If we're appending the item to an existing key, first load the
  // object.
  if (item->load_config.append) {
    append_res = db_slice->FindMutable(db_cntx, item->key);
    if (!IsValid(append_res.it)) {
      // If the item has expired we may not find the key. Note that if the key
      // is found, but expired since we started loading, we still append to
      // avoid an inconsistent state where only part of the key is loaded.
      if (item->expire_ms == 0 || db_cntx.time_now_ms < item->expire_ms) {
        LOG(ERROR) << "Could not find append key '" << item->key << "' in DB " << db_ind;
      }
      return;
    }
    pv_ptr = &append_res.it->second;
  }

  if (ec_ = FromOpaque(item->val, item->load_config, pv_ptr); ec_) {
    if ((*ec_).value() == errc::empty_key) {
      auto error = error_msg(item, db_ind);
      if (RdbTypeAllowedEmpty(item->val.rdb_type)) {
        LOG(WARNING) << error;
      } else {
        LOG(ERROR) << error;
      }
      return;
    }
    LOG(ERROR) << "Could not load value for key '" << item->key << "' in DB " << db_ind;
    stop_early_ = true;
    return;
  }

  if (item->load_config.append) {
    return;
  }

  // We need this extra check because we don't return empty_key in all cases.
  if (!pv.TagAllowsEmptyValue() && pv.Size() == 0) {
    LOG(WARNING) << error_msg(item, db_ind);
    return;
  }

  if (item->expire_ms > 0 && db_cntx.time_now_ms >= item->expire_ms) {
    VLOG(2) << "Expire key on load: " << item->key;
    return;
  }

  auto op_res = db_slice->AddOrUpdate(db_cntx, item->key, std::move(pv), item->expire_ms);
  if (!op_res) {
    LOG(ERROR) << "OOM failed to add key '" << item->key << "' in DB " << db_ind;
    ec_ = RdbError(errc::out_of_memory);
    stop_early_ = true;
    return;
  }

  DbSlice::ItAndUpdater& res = *op_res;
  res.it->first.SetSticky(item->is_sticky);
  if (item->has_mc_flags) {
    res.it->second.SetFlag(true);
    db_slice->SetMCFlag(db_cntx.db_index, res.it->first.AsRef(), item->mc_flags);
  }

  if (!override_existing_keys_ && !res.is_new) {
    LOG(WARNING) << "RDB has duplicated key '" << item->key << "' in DB " << db_ind;
  }

  if (auto* ts = db_slice->shard_owner()->tiered_storage(); ts)
    ts->TryStash(db_cntx.db_index, item->key, &res.it->second);
}

void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
  EngineShard* es = EngineShard::tlocal();
  DbContext db_cntx{&namespaces->GetDefaultNamespace(), db_ind, GetCurrentTimeMs()};
  DbSlice& db_slice = db_cntx.GetDbSlice(es->shard_id());

  DCHECK(!db_slice.IsCacheMode());

  bool dry_run = absl::GetFlag(FLAGS_rdb_load_dry_run);

  for (const auto* item : ib) {
    if (dry_run) {
      continue;
    }

    CreateObjectOnShard(db_cntx, item, &db_slice);
    if (stop_early_) {
      return;
    }
  }

  for (auto* item : ib) {
    item_queue_.Push(item);
  }
}

void RdbLoader::ResizeDb(size_t key_num, size_t expire_num) {
  DCHECK_LT(key_num, 1U << 31);
  DCHECK_LT(expire_num, 1U << 31);
  // Note: To reserve space, it's necessary to allocate space at the shard level. We might
  // load with a different number of shards, which makes database resizing unfeasible.
}

// Loads the next key/val pair.
//
// Huge objects may be loaded in parts, where only a subset of elements are
// loaded at a time. This reduces the memory required to load huge objects and
// prevents LoadItemsBuffer blocking. (Note so far only RDB_TYPE_SET and
// RDB_TYPE_SET_WITH_EXPIRY support partial reads).
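//
// Roughly, a huge set may therefore arrive as several Items with the same key: the first
// one carries load_config.reserve (a size hint) and creates the value, while each
// following Item has load_config.append set and merges additional elements into the
// already-inserted value on the owning shard.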
error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
  std::string key;
  int64_t start = absl::GetCurrentTimeNanos();

  SET_OR_RETURN(ReadKey(), key);

  bool streamed = false;
  do {
    // If there is a cached Item in the free pool, take it, otherwise allocate
    // a new Item (LoadItemsBuffer returns free items).
    Item* item = item_queue_.Pop();
    if (item == nullptr) {
      item = new Item;
    }
    // Delete the item if we fail to load the key/val pair.
    auto cleanup = absl::Cleanup([item] { delete item; });

    item->load_config.append = pending_read_.remaining > 0;

    error_code ec = ReadObj(type, &item->val);
    if (ec) {
      VLOG(2) << "ReadObj error " << ec << " for key " << key;
      return ec;
    }

    // If the key can be discarded, we must still continue to read the
    // object from the RDB so we can read the next key.
    if (ShouldDiscardKey(key, settings)) {
      pending_read_.reserve = 0;
      continue;
    }

    if (pending_read_.remaining > 0) {
      item->key = key;
      streamed = true;
    } else {
      // Avoid copying the key if this is the last read of the object.
      item->key = std::move(key);
    }

    item->load_config.streamed = streamed;
    item->load_config.reserve = pending_read_.reserve;
    // Clear 'reserve' as we must only set it when the object is first
    // initialized.
    pending_read_.reserve = 0;

    item->is_sticky = settings->is_sticky;
    item->has_mc_flags = settings->has_mc_flags;
    item->mc_flags = settings->mc_flags;
    item->expire_ms = settings->expiretime;

    std::move(cleanup).Cancel();
    ShardId sid = Shard(item->key, shard_set->size());
    EngineShard* es = EngineShard::tlocal();

    if (es && es->shard_id() == sid) {
      DbContext db_cntx{&namespaces->GetDefaultNamespace(), cur_db_index_, GetCurrentTimeMs()};
      CreateObjectOnShard(db_cntx, item, &db_cntx.GetDbSlice(sid));
      item_queue_.Push(item);
    } else {
      auto& out_buf = shard_buf_[sid];

      out_buf.emplace_back(std::move(item));

      constexpr size_t kBufSize = 64;
      if (out_buf.size() >= kBufSize) {
        // Despite being async, this function can block if the shard queue is full.
        FlushShardAsync(sid);
      }
    }
  } while (pending_read_.remaining > 0);

  int delta_ms = (absl::GetCurrentTimeNanos() - start) / 1000'000;
  LOG_IF(INFO, delta_ms > 1000) << "Took " << delta_ms << " ms to load rdb_type " << type;

  return kOk;
}

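// Returns true when the key should not be loaded at all: either it hashes to a slot this
// node does not own (cluster mode, unless load_unowned_slots_ is set), or it has already
// expired and we are a master (replicas keep expired keys and let the master drive expiry,
// as explained below).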
bool RdbLoader::ShouldDiscardKey(std::string_view key, ObjSettings* settings) const {
  if (!load_unowned_slots_ && IsClusterEnabled()) {
    const auto cluster_config = cluster::ClusterConfig::Current();
    if (cluster_config && !cluster_config->IsMySlot(key)) {
      return true;
    }
  }

  /* Check if the key already expired. This function is used when loading
   * an RDB file from disk, either at startup, or when an RDB was
   * received from the master. In the latter case, the master is
   * responsible for key expiry. If we would expire keys here, the
   * snapshot taken by the master may not be reflected on the slave.
   * Similarly, if the RDB is the preamble of an AOF file, we want to
   * load all the keys as they are, since the log of operations that follows
   * assumes an exact keyspace state. */
  if (ServerState::tlocal()->is_master && settings->has_expired) {
    VLOG(3) << "Expire key on read: " << key;
    return true;
  }

  return false;
}

void RdbLoader::LoadScriptFromAux(string&& body) {
  ServerState* ss = ServerState::tlocal();
  auto interpreter = ss->BorrowInterpreter();
  absl::Cleanup clean = [ss, interpreter]() { ss->ReturnInterpreter(interpreter); };

  if (script_mgr_) {
    auto res = script_mgr_->Insert(body, interpreter);
    if (!res)
      LOG(ERROR) << "Error compiling script";
  }
}

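// The "search-index" aux value is assumed to hold the arguments of an FT.CREATE call
// (without the command name) in RESP form. We parse it, prepend "FT.CREATE" and dispatch
// it through the service with a capturing reply builder so that errors can be logged; the
// actual index contents are rebuilt later in PerformPostLoad.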
void RdbLoader::LoadSearchIndexDefFromAux(string&& def) {
  facade::CapturingReplyBuilder crb{};
  ConnectionContext cntx{nullptr, nullptr};
  cntx.is_replicating = true;
  cntx.journal_emulated = true;
  cntx.skip_acl_validation = true;
  cntx.ns = &namespaces->GetDefaultNamespace();

  uint32_t consumed = 0;
  facade::RespVec resp_vec;
  facade::RedisParser parser;

  def += "\r\n";  // RESP terminator
  io::MutableBytes buffer{reinterpret_cast<uint8_t*>(def.data()), def.size()};
  auto res = parser.Parse(buffer, &consumed, &resp_vec);

  if (res != facade::RedisParser::Result::OK) {
    LOG(ERROR) << "Bad index definition: " << def;
    return;
  }

  // Prepend FT.CREATE to the index definition.
  CmdArgVec arg_vec;
  facade::RespExpr::VecToArgList(resp_vec, &arg_vec);
  string ft_create = "FT.CREATE";
  arg_vec.insert(arg_vec.begin(), MutableSlice{ft_create.data(), ft_create.size()});

  service_->DispatchCommand(absl::MakeSpan(arg_vec), &crb, &cntx);

  auto response = crb.Take();
  if (auto err = facade::CapturingReplyBuilder::TryExtractError(response); err) {
    LOG(ERROR) << "Bad index definition: " << def << " " << err->first;
  }
}

void RdbLoader::PerformPostLoad(Service* service) {
  const CommandId* cmd = service->FindCmd("FT.CREATE");
  if (cmd == nullptr)  // On MacOS we don't include search, so FT.CREATE won't exist.
    return;

  // Rebuild all search indices, as only their definitions are extracted from the snapshot.
  shard_set->AwaitRunningOnShardQueue([](EngineShard* es) {
    es->search_indices()->RebuildAllIndices(
        OpArgs{es, nullptr, DbContext{&namespaces->GetDefaultNamespace(), 0, GetCurrentTimeMs()}});
  });
}

}  // namespace dfly