chore: Refactor RdbLoad (#564)

Specifically, pass an OpaqueObj to the parsing routines so that they fill it in place instead of creating and returning a new one.
In addition, this change improves the interface between generic_family and the RdbLoad code:
it removes the reliance on the internal Item class.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2022-12-16 16:41:47 +02:00 committed by GitHub
parent 08803e664c
commit d4cad11f86
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
3 changed files with 152 additions and 101 deletions

View file

@ -134,13 +134,15 @@ class RdbRestoreValue : protected RdbLoaderBase {
std::optional<RdbLoaderBase::OpaqueObj> RdbRestoreValue::Parse(std::string_view payload) {
InMemSource source(payload);
src_ = &source;
if (auto type_id = FetchType(); type_id && rdbIsObjectType(type_id.value())) {
io::Result<OpaqueObj> io_res = ReadObj(type_id.value()); // load the type from the input stream
if (!io_res) {
if (io::Result<uint8_t> type_id = FetchType(); type_id && rdbIsObjectType(type_id.value())) {
OpaqueObj obj;
error_code ec = ReadObj(type_id.value(), &obj); // load the type from the input stream
if (ec) {
LOG(ERROR) << "failed to load data for type id " << (unsigned int)type_id.value();
return std::nullopt;
}
return std::optional<OpaqueObj>(std::move(io_res.value()));
return std::optional<OpaqueObj>(std::move(obj));
} else {
LOG(ERROR) << "failed to load type id from the input stream or type id is invalid";
return std::nullopt;
@ -149,20 +151,20 @@ std::optional<RdbLoaderBase::OpaqueObj> RdbRestoreValue::Parse(std::string_view
bool RdbRestoreValue::Add(std::string_view data, std::string_view key, DbSlice& db_slice,
DbIndex index, uint64_t expire_ms) {
auto value_to_load = Parse(data);
if (!value_to_load) {
auto opaque_res = Parse(data);
if (!opaque_res) {
return false;
}
Item item{
.key = std::string(key), .val = std::move(value_to_load.value()), .expire_ms = expire_ms};
PrimeValue pv;
if (auto ec = Visit(item, &pv); ec) {
if (auto ec = FromOpaque(*opaque_res, &pv); ec) {
// we failed - report and exit
LOG(WARNING) << "error while trying to save data: " << ec;
return false;
}
DbContext context{.db_index = index, .time_now_ms = GetCurrentTimeMs()};
auto [it, added] = db_slice.AddOrSkip(context, key, std::move(pv), item.expire_ms);
auto [it, added] = db_slice.AddOrSkip(context, key, std::move(pv), expire_ms);
return added;
}

View file

@ -1183,69 +1183,90 @@ auto RdbLoaderBase::ReadKey() -> io::Result<string> {
return FetchGenericString();
}
auto RdbLoaderBase::ReadObj(int rdbtype) -> io::Result<OpaqueObj> {
error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
io::Result<OpaqueObj> iores;
switch (rdbtype) {
case RDB_TYPE_STRING: {
dest->rdb_type = RDB_TYPE_STRING;
/* Read string value */
auto fetch = ReadStringObj();
if (!fetch)
return make_unexpected(fetch.error());
return OpaqueObj{std::move(*fetch), RDB_TYPE_STRING};
return ReadStringObj(&dest->obj);
}
case RDB_TYPE_SET:
return ReadSet();
iores = ReadSet();
break;
case RDB_TYPE_SET_INTSET:
return ReadIntSet();
iores = ReadIntSet();
break;
case RDB_TYPE_HASH_ZIPLIST:
return ReadHZiplist();
iores = ReadHZiplist();
break;
case RDB_TYPE_HASH:
return ReadHMap();
iores = ReadHMap();
break;
case RDB_TYPE_ZSET:
case RDB_TYPE_ZSET_2:
return ReadZSet(rdbtype);
iores = ReadZSet(rdbtype);
break;
case RDB_TYPE_ZSET_ZIPLIST:
return ReadZSetZL();
iores = ReadZSetZL();
break;
case RDB_TYPE_LIST_QUICKLIST:
case RDB_TYPE_LIST_QUICKLIST_2:
return ReadListQuicklist(rdbtype);
case RDB_TYPE_STREAM_LISTPACKS:
return ReadStreams();
iores = ReadListQuicklist(rdbtype);
break;
case RDB_TYPE_STREAM_LISTPACKS:
iores = ReadStreams();
break;
break;
default:
LOG(ERROR) << "Unsupported rdb type " << rdbtype;
return RdbError(errc::invalid_encoding);
}
LOG(ERROR) << "Unsupported rdb type " << rdbtype;
return Unexpected(errc::invalid_encoding);
if (!iores)
return iores.error();
*dest = std::move(*iores);
return error_code{};
}
auto RdbLoaderBase::ReadStringObj() -> io::Result<RdbVariant> {
error_code RdbLoaderBase::ReadStringObj(RdbVariant* dest) {
bool isencoded;
size_t len;
SET_OR_UNEXPECT(LoadLen(&isencoded), len);
SET_OR_RETURN(LoadLen(&isencoded), len);
if (isencoded) {
switch (len) {
case RDB_ENC_INT8:
case RDB_ENC_INT16:
case RDB_ENC_INT32:
return ReadIntObj(len);
case RDB_ENC_LZF:
return ReadLzf();
case RDB_ENC_INT32: {
io::Result<long long> io_int = ReadIntObj(len);
if (!io_int)
return io_int.error();
dest->emplace<long long>(*io_int);
return error_code{};
}
case RDB_ENC_LZF: {
io::Result<LzfString> lzf = ReadLzf();
if (!lzf)
return lzf.error();
dest->emplace<LzfString>(std::move(lzf.value()));
return error_code{};
}
default:
LOG(ERROR) << "Unknown RDB string encoding " << len;
return Unexpected(errc::rdb_file_corrupted);
return RdbError(errc::rdb_file_corrupted);
}
}
base::PODArray<char> blob;
auto& blob = dest->emplace<base::PODArray<char>>();
blob.resize(len);
error_code ec = FetchBuf(len, blob.data());
if (ec) {
return make_unexpected(ec);
}
return blob;
return ec;
}
io::Result<long long> RdbLoaderBase::ReadIntObj(int enctype) {
@ -1295,24 +1316,24 @@ auto RdbLoaderBase::ReadSet() -> io::Result<OpaqueObj> {
unique_ptr<LoadTrace> res(new LoadTrace);
res->arr.resize(len);
for (size_t i = 0; i < len; i++) {
io::Result<RdbVariant> fetch = ReadStringObj();
if (!fetch) {
return make_unexpected(fetch.error());
error_code ec = ReadStringObj(&res->arr[i].rdb_var);
if (ec) {
return make_unexpected(ec);
}
res->arr[i].rdb_var = std::move(fetch.value());
}
return OpaqueObj{std::move(res), RDB_TYPE_SET};
}
auto RdbLoaderBase::ReadIntSet() -> io::Result<OpaqueObj> {
io::Result<RdbVariant> fetch = ReadStringObj();
if (!fetch) {
return make_unexpected(fetch.error());
RdbVariant obj;
error_code ec = ReadStringObj(&obj);
if (ec) {
return make_unexpected(ec);
}
const LzfString* lzf = get_if<LzfString>(&fetch.value());
const base::PODArray<char>* arr = get_if<base::PODArray<char>>(&fetch.value());
const LzfString* lzf = get_if<LzfString>(&obj);
const base::PODArray<char>* arr = get_if<base::PODArray<char>>(&obj);
if (lzf) {
if (lzf->uncompressed_len == 0 || lzf->compressed_blob.empty())
@ -1324,12 +1345,14 @@ auto RdbLoaderBase::ReadIntSet() -> io::Result<OpaqueObj> {
return Unexpected(errc::rdb_file_corrupted);
}
return OpaqueObj{std::move(*fetch), RDB_TYPE_SET_INTSET};
return OpaqueObj{std::move(obj), RDB_TYPE_SET_INTSET};
}
auto RdbLoaderBase::ReadHZiplist() -> io::Result<OpaqueObj> {
RdbVariant str_obj;
SET_OR_UNEXPECT(ReadStringObj(), str_obj);
error_code ec = ReadStringObj(&str_obj);
if (ec)
return make_unexpected(ec);
if (StrLen(str_obj) == 0) {
return Unexpected(errc::rdb_file_corrupted);
@ -1349,7 +1372,9 @@ auto RdbLoaderBase::ReadHMap() -> io::Result<OpaqueObj> {
load_trace->arr.resize(len * 2);
for (size_t i = 0; i < load_trace->arr.size(); ++i) {
SET_OR_UNEXPECT(ReadStringObj(), load_trace->arr[i].rdb_var);
error_code ec = ReadStringObj(&load_trace->arr[i].rdb_var);
if (ec)
return make_unexpected(ec);
}
return OpaqueObj{std::move(load_trace), RDB_TYPE_HASH};
@ -1369,7 +1394,9 @@ auto RdbLoaderBase::ReadZSet(int rdbtype) -> io::Result<OpaqueObj> {
double score;
for (size_t i = 0; i < load_trace->arr.size(); ++i) {
SET_OR_UNEXPECT(ReadStringObj(), load_trace->arr[i].rdb_var);
error_code ec = ReadStringObj(&load_trace->arr[i].rdb_var);
if (ec)
return make_unexpected(ec);
if (rdbtype == RDB_TYPE_ZSET_2) {
SET_OR_UNEXPECT(FetchBinaryDouble(), score);
} else {
@ -1388,7 +1415,9 @@ auto RdbLoaderBase::ReadZSet(int rdbtype) -> io::Result<OpaqueObj> {
auto RdbLoaderBase::ReadZSetZL() -> io::Result<OpaqueObj> {
RdbVariant str_obj;
SET_OR_UNEXPECT(ReadStringObj(), str_obj);
error_code ec = ReadStringObj(&str_obj);
if (ec)
return make_unexpected(ec);
if (StrLen(str_obj) == 0) {
return Unexpected(errc::rdb_file_corrupted);
@ -1420,7 +1449,10 @@ auto RdbLoaderBase::ReadListQuicklist(int rdbtype) -> io::Result<OpaqueObj> {
}
RdbVariant var;
SET_OR_UNEXPECT(ReadStringObj(), var);
error_code ec = ReadStringObj(&var);
if (ec)
return make_unexpected(ec);
if (StrLen(var) == 0) {
return Unexpected(errc::rdb_file_corrupted);
}
@ -1438,21 +1470,24 @@ auto RdbLoaderBase::ReadStreams() -> io::Result<OpaqueObj> {
unique_ptr<LoadTrace> load_trace(new LoadTrace);
load_trace->arr.resize(listpacks * 2);
error_code ec;
for (size_t i = 0; i < listpacks; ++i) {
/* Get the master ID, the one we'll use as key of the radix tree
* node: the entries inside the listpack itself are delta-encoded
* relatively to this ID. */
RdbVariant stream_id, blob;
SET_OR_UNEXPECT(ReadStringObj(), stream_id);
ec = ReadStringObj(&stream_id);
if (ec)
return make_unexpected(ec);
if (StrLen(stream_id) != sizeof(streamID)) {
LOG(ERROR) << "Stream node key entry is not the size of a stream ID";
return Unexpected(errc::rdb_file_corrupted);
}
SET_OR_UNEXPECT(ReadStringObj(), blob);
ec = ReadStringObj(&blob);
if (ec)
return make_unexpected(ec);
if (StrLen(blob) == 0) {
LOG(ERROR) << "Stream listpacks loading failed";
return Unexpected(errc::rdb_file_corrupted);
@ -1484,7 +1519,9 @@ auto RdbLoaderBase::ReadStreams() -> io::Result<OpaqueObj> {
// sds cgname;
RdbVariant cgname;
SET_OR_UNEXPECT(ReadStringObj(), cgname);
ec = ReadStringObj(&cgname);
if (ec)
return make_unexpected(ec);
cgroup.name = std::move(cgname);
SET_OR_UNEXPECT(LoadLen(nullptr), cgroup.ms);
@ -1529,7 +1566,9 @@ auto RdbLoaderBase::ReadStreams() -> io::Result<OpaqueObj> {
for (size_t j = 0; j < consumers_num; ++j) {
auto& consumer = cgroup.cons_arr[j];
SET_OR_UNEXPECT(ReadStringObj(), consumer.name);
ec = ReadStringObj(&consumer.name);
if (ec)
return make_unexpected(ec);
SET_OR_UNEXPECT(FetchInt<int64_t>(), consumer.seen_time);
@ -1788,10 +1827,10 @@ error_code RdbLoader::Load(io::Source* src) {
std::error_code RdbLoaderBase::EnsureRead(size_t min_sz) {
// In the flow of reading compressed data, we store the uncompressed data in the uncompressed
// buffer. When parsing entries we call EnsureRead with 9 bytes to read the length of key/value.
// If the key/value is very small (less than 9 bytes) the remaining data in the uncompressed buffer
// might contain less than 9 bytes. We need to make sure that we don't read from the sink into the
// uncompressed buffer, and therefore in this flow we return here.
// buffer. When parsing entries we call EnsureRead with 9 bytes to read the length of
// key/value. If the key/value is very small (less than 9 bytes) the remaining data in the
// uncompressed buffer might contain less than 9 bytes. We need to make sure that we don't read
// from the sink into the uncompressed buffer, and therefore in this flow we return here.
if (mem_buf_ != &origin_mem_buf_)
return std::error_code{};
if (mem_buf_->InputLen() >= min_sz)
@ -1984,18 +2023,17 @@ void RdbLoader::FlushShardAsync(ShardId sid) {
if (out_buf.empty())
return;
ItemsBuf* ib = new ItemsBuf{std::move(out_buf)};
auto cb = [indx = this->cur_db_index_, ib, this] {
this->LoadItemsBuffer(indx, *ib);
delete ib;
auto cb = [indx = this->cur_db_index_, this, ib = std::move(out_buf)] {
this->LoadItemsBuffer(indx, ib);
};
shard_set->Add(sid, std::move(cb));
}
std::error_code RdbLoaderBase::Visit(const Item& item, CompactObj* pv) {
OpaqueObjLoader visitor(item.val.rdb_type, pv);
std::visit(visitor, item.val.obj);
std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, CompactObj* pv) {
OpaqueObjLoader visitor(opaque.rdb_type, pv);
std::visit(visitor, opaque.obj);
return visitor.ec();
}
@ -2003,21 +2041,25 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
DbSlice& db_slice = EngineShard::tlocal()->db_slice();
DbContext db_cntx{.db_index = db_ind, .time_now_ms = GetCurrentTimeMs()};
for (const auto& item : ib) {
for (const auto* item : ib) {
PrimeValue pv;
if (ec_ = Visit(item, &pv); ec_) {
if (ec_ = FromOpaque(item->val, &pv); ec_) {
stop_early_ = true;
break;
}
if (item.expire_ms > 0 && db_cntx.time_now_ms >= item.expire_ms)
if (item->expire_ms > 0 && db_cntx.time_now_ms >= item->expire_ms)
continue;
auto [it, added] = db_slice.AddOrUpdate(db_cntx, item.key, std::move(pv), item.expire_ms);
auto [it, added] = db_slice.AddOrUpdate(db_cntx, item->key, std::move(pv), item->expire_ms);
if (!added) {
LOG(WARNING) << "RDB has duplicated key '" << item.key << "' in DB " << db_ind;
LOG(WARNING) << "RDB has duplicated key '" << item->key << "' in DB " << db_ind;
}
}
for (auto* item : ib) {
delete item;
}
}
void RdbLoader::ResizeDb(size_t key_num, size_t expire_num) {
@ -2026,22 +2068,19 @@ void RdbLoader::ResizeDb(size_t key_num, size_t expire_num) {
}
error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
/* Read key */
string key;
OpaqueObj val;
Item* item = new Item;
// We free key in LoadItemsBuffer.
SET_OR_RETURN(ReadKey(), key);
// Read key
// We free item in LoadItemsBuffer.
SET_OR_RETURN(ReadKey(), item->key);
io::Result<OpaqueObj> io_res = ReadObj(type);
error_code ec = ReadObj(type, &item->val);
if (!io_res) {
VLOG(1) << "ReadObj error " << io_res.error() << " for key " << key;
return io_res.error();
if (ec) {
VLOG(1) << "ReadObj error " << ec << " for key " << item->key;
return ec;
}
val = std::move(io_res.value());
/* Check if the key already expired. This function is used when loading
* an RDB file from disk, either at startup, or when an RDB was
* received from the master. In the latter case, the master is
@ -2055,12 +2094,12 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
if (should_expire) {
// decrRefCount(val);
} else {
ShardId sid = Shard(key, shard_set->size());
uint64_t expire_at_ms = settings->expiretime;
ShardId sid = Shard(item->key, shard_set->size());
item->expire_ms = settings->expiretime;
auto& out_buf = shard_buf_[sid];
out_buf.emplace_back(Item{std::move(key), std::move(val), expire_at_ms});
out_buf.emplace_back(item);
constexpr size_t kBufSize = 128;
if (out_buf.size() >= kBufSize) {

View file

@ -87,20 +87,13 @@ class RdbLoaderBase {
class OpaqueObjLoader;
struct Item {
std::string key;
OpaqueObj val;
uint64_t expire_ms;
};
using ItemsBuf = std::vector<Item>;
// Reads the next uint8_t from the input via FetchInt and returns it (or the read error)
// as the type byte.
::io::Result<uint8_t> FetchType() {
  ::io::Result<uint8_t> type_byte = FetchInt<uint8_t>();
  return type_byte;
}
template <typename T> io::Result<T> FetchInt();
std::error_code Visit(const Item& item, CompactObj* pv);
static std::error_code FromOpaque(const OpaqueObj& opaque, CompactObj* pv);
io::Result<uint64_t> LoadLen(bool* is_encoded);
std::error_code FetchBuf(size_t size, void* dest);
@ -114,8 +107,8 @@ class RdbLoaderBase {
::io::Result<std::string> ReadKey();
::io::Result<OpaqueObj> ReadObj(int rdbtype);
::io::Result<RdbVariant> ReadStringObj();
std::error_code ReadObj(int rdbtype, OpaqueObj* dest);
std::error_code ReadStringObj(RdbVariant* rdb_variant);
::io::Result<long long> ReadIntObj(int encoding);
::io::Result<LzfString> ReadLzf();
@ -184,6 +177,23 @@ class RdbLoader : protected RdbLoaderBase {
}
private:
// One key/value entry produced while loading. Items are stored by pointer in ItemsBuf
// (std::vector<Item*>).
struct Item {
  std::string key;
  OpaqueObj val;

  // Expiry timestamp in milliseconds. Zero-initialized so that an Item whose expiry is
  // never assigned is not read as an indeterminate value; LoadItemsBuffer only applies
  // its expiry check when this is > 0.
  uint64_t expire_ms = 0;

  // Intrusive link to the next Item. Explicitly null-initialized so a freshly allocated
  // Item never carries a garbage pointer, regardless of the language standard in use.
  // NOTE(review): presumably consumed by an intrusive MPSC queue through the two friend
  // hooks below - the queue itself is not visible here, confirm.
  std::atomic<Item*> next{nullptr};

  // Publishes `nxt` as the successor of `dest` with release ordering.
  friend void MPSC_intrusive_store_next(Item* dest, Item* nxt) {
    dest->next.store(nxt, std::memory_order_release);
  }

  // Loads the successor of `src` with acquire ordering (pairs with the release store above).
  friend Item* MPSC_intrusive_load_next(const Item& src) {
    return src.next.load(std::memory_order_acquire);
  }
};
using ItemsBuf = std::vector<Item*>;
struct ObjSettings;
std::error_code LoadKeyValPair(int type, ObjSettings* settings);
void ResizeDb(size_t key_num, size_t expire_num);