fix(server): Save element expirations for hash sets & sets (#2223)

* fix(server): Save element expirations for hash sets

* Add support for sets

* Fixes

* Fixes
This commit is contained in:
Shahar Mike 2023-11-30 14:08:04 +02:00 committed by GitHub
parent fd3fcdcbb4
commit d15bcf8392
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 216 additions and 52 deletions

View file

@ -439,9 +439,11 @@ void RdbLoaderBase::OpaqueObjLoader::operator()(const LzfString& lzfstr) {
void RdbLoaderBase::OpaqueObjLoader::operator()(const unique_ptr<LoadTrace>& ptr) {
switch (rdb_type_) {
case RDB_TYPE_SET:
case RDB_TYPE_SET_WITH_EXPIRY:
CreateSet(ptr.get());
break;
case RDB_TYPE_HASH:
case RDB_TYPE_HASH_WITH_EXPIRY:
CreateHMap(ptr.get());
break;
case RDB_TYPE_LIST_QUICKLIST:
@ -468,7 +470,7 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
size_t len = ltrace->blob_count();
bool is_intset = true;
if (len <= SetFamily::MaxIntsetEntries()) {
if (rdb_type_ == RDB_TYPE_HASH && ltrace->blob_count() <= SetFamily::MaxIntsetEntries()) {
Iterate(*ltrace, [&](const LoadBlob& blob) {
if (!holds_alternative<long long>(blob.rdb_var)) {
is_intset = false;
@ -506,12 +508,22 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
});
} else {
bool use_set2 = GetFlag(FLAGS_use_set2);
if (use_set2) {
StringSet* set = new StringSet{CompactObj::memory_resource()};
set->set_time(MemberTimeSeconds(GetCurrentTimeMs()));
res = createObject(OBJ_SET, set);
res->encoding = OBJ_ENCODING_HT;
} else {
res = createSetObject();
if (rdb_type_ == RDB_TYPE_SET_WITH_EXPIRY) {
LOG(ERROR) << "Detected set with key expiration, but use_set2 is disabled. Unable to load "
"set - key will be ignored.";
pv_->ImportRObj(res);
std::move(cleanup).Cancel();
return;
}
}
// TODO: to move this logic to set_family similarly to ConvertToStrSet.
@ -525,18 +537,41 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
}
if (use_set2) {
Iterate(*ltrace, [&](const LoadBlob& blob) {
sdsele = ToSds(blob.rdb_var);
if (!sdsele)
return false;
size_t increment = 1;
if (rdb_type_ == RDB_TYPE_SET_WITH_EXPIRY) {
increment = 2;
}
if (!((StringSet*)res->ptr)->AddSds(sdsele)) {
LOG(ERROR) << "Duplicate set members detected";
ec_ = RdbError(errc::duplicate_key);
return false;
auto set = (StringSet*)res->ptr;
for (const auto& seg : ltrace->arr) {
for (size_t i = 0; i < seg.size(); i += increment) {
string_view element = ToSV(seg[i].rdb_var);
uint32_t ttl_sec = UINT32_MAX;
if (increment == 2) {
int64_t ttl_time = -1;
string_view ttl_str = ToSV(seg[i + 1].rdb_var);
if (!absl::SimpleAtoi(ttl_str, &ttl_time)) {
LOG(ERROR) << "Can't parse set TTL " << ttl_str;
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
if (ttl_time != -1) {
if (ttl_time < set->time_now()) {
continue;
}
ttl_sec = ttl_time - set->time_now();
}
}
if (!set->Add(element, ttl_sec)) {
LOG(ERROR) << "Duplicate set members detected";
ec_ = RdbError(errc::duplicate_key);
return;
}
}
return true;
});
}
} else {
Iterate(*ltrace, [&](const LoadBlob& blob) {
sdsele = ToSds(blob.rdb_var);
@ -560,10 +595,14 @@ void RdbLoaderBase::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
}
void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
size_t len = ltrace->blob_count() / 2;
size_t increment = 2;
if (rdb_type_ == RDB_TYPE_HASH_WITH_EXPIRY)
increment = 3;
size_t len = ltrace->blob_count() / increment;
/* Too many entries? Use a hash table right from the start. */
bool keep_lp = (len <= 64);
bool keep_lp = (len <= 64) && (rdb_type_ != RDB_TYPE_HASH_WITH_EXPIRY);
size_t lp_size = 0;
if (keep_lp) {
@ -603,12 +642,13 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
pv_->InitRobj(OBJ_HASH, kEncodingListPack, lp);
} else {
StringMap* string_map = new StringMap;
string_map->set_time(MemberTimeSeconds(GetCurrentTimeMs()));
auto cleanup = absl::MakeCleanup([&] { delete string_map; });
std::string key;
string_map->Reserve(len);
for (const auto& seg : ltrace->arr) {
for (size_t i = 0; i < seg.size(); i += 2) {
for (size_t i = 0; i < seg.size(); i += increment) {
// ToSV may reference an internal buffer, therefore we can use only before the
// next call to ToSV. To workaround, copy the key locally.
key = ToSV(seg[i].rdb_var);
@ -617,7 +657,27 @@ void RdbLoaderBase::OpaqueObjLoader::CreateHMap(const LoadTrace* ltrace) {
if (ec_)
return;
if (!string_map->AddOrSkip(key, val)) {
uint32_t ttl_sec = UINT32_MAX;
if (increment == 3) {
int64_t ttl_time = -1;
string_view ttl_str = ToSV(seg[i + 2].rdb_var);
if (!absl::SimpleAtoi(ttl_str, &ttl_time)) {
LOG(ERROR) << "Can't parse hashmap TTL for " << key << ", val=" << val
<< ", ttl=" << ttl_str;
ec_ = RdbError(errc::rdb_file_corrupted);
return;
}
if (ttl_time != -1) {
if (ttl_time < string_map->time_now()) {
continue;
}
ttl_sec = ttl_time - string_map->time_now();
}
}
if (!string_map->AddOrSkip(key, val, ttl_sec)) {
LOG(ERROR) << "Duplicate hash fields detected for field " << key;
ec_ = RdbError(errc::rdb_file_corrupted);
return;
@ -1310,7 +1370,8 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
return ReadStringObj(&dest->obj);
}
case RDB_TYPE_SET:
iores = ReadSet();
case RDB_TYPE_SET_WITH_EXPIRY:
iores = ReadSet(rdbtype);
break;
case RDB_TYPE_SET_INTSET:
iores = ReadIntSet();
@ -1321,7 +1382,8 @@ error_code RdbLoaderBase::ReadObj(int rdbtype, OpaqueObj* dest) {
iores = ReadGeneric(rdbtype);
break;
case RDB_TYPE_HASH:
iores = ReadHMap();
case RDB_TYPE_HASH_WITH_EXPIRY:
iores = ReadHMap(rdbtype);
break;
case RDB_TYPE_ZSET:
case RDB_TYPE_ZSET_2:
@ -1439,12 +1501,13 @@ auto RdbLoaderBase::ReadLzf() -> io::Result<LzfString> {
return res;
}
auto RdbLoaderBase::ReadSet() -> io::Result<OpaqueObj> {
auto RdbLoaderBase::ReadSet(int rdbtype) -> io::Result<OpaqueObj> {
size_t len;
SET_OR_UNEXPECT(LoadLen(NULL), len);
if (len == 0)
return Unexpected(errc::empty_key);
if (rdbtype == RDB_TYPE_SET_WITH_EXPIRY) {
len *= 2;
}
unique_ptr<LoadTrace> load_trace(new LoadTrace);
load_trace->arr.resize((len + kMaxBlobLen - 1) / kMaxBlobLen);
@ -1459,7 +1522,7 @@ auto RdbLoaderBase::ReadSet() -> io::Result<OpaqueObj> {
}
}
return OpaqueObj{std::move(load_trace), RDB_TYPE_SET};
return OpaqueObj{std::move(load_trace), rdbtype};
}
auto RdbLoaderBase::ReadIntSet() -> io::Result<OpaqueObj> {
@ -1498,16 +1561,19 @@ auto RdbLoaderBase::ReadGeneric(int rdbtype) -> io::Result<OpaqueObj> {
return OpaqueObj{std::move(str_obj), rdbtype};
}
auto RdbLoaderBase::ReadHMap() -> io::Result<OpaqueObj> {
auto RdbLoaderBase::ReadHMap(int rdbtype) -> io::Result<OpaqueObj> {
size_t len;
SET_OR_UNEXPECT(LoadLen(nullptr), len);
if (len == 0)
return Unexpected(errc::empty_key);
unique_ptr<LoadTrace> load_trace(new LoadTrace);
len *= 2;
if (rdbtype == RDB_TYPE_HASH) {
len *= 2;
} else {
DCHECK_EQ(rdbtype, RDB_TYPE_HASH_WITH_EXPIRY);
len *= 3;
}
load_trace->arr.resize((len + kMaxBlobLen - 1) / kMaxBlobLen);
for (size_t i = 0; i < load_trace->arr.size(); ++i) {
size_t n = std::min<size_t>(len, kMaxBlobLen);
@ -1520,7 +1586,7 @@ auto RdbLoaderBase::ReadHMap() -> io::Result<OpaqueObj> {
len -= n;
}
return OpaqueObj{std::move(load_trace), RDB_TYPE_HASH};
return OpaqueObj{std::move(load_trace), rdbtype};
}
auto RdbLoaderBase::ReadZSet(int rdbtype) -> io::Result<OpaqueObj> {