fix: skip empty objects on load and replication (#3514)

* skip empty objects in rdb save
* skip empty objects in rdb load
* delete empty keys in FindReadOnly

---------

Signed-off-by: kostas <kostas@dragonflydb.io>
This commit is contained in:
Kostas Kyrimis 2024-08-20 09:44:41 +03:00 committed by GitHub
parent 84a697dd75
commit b979994025
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 185 additions and 12 deletions

View file

@ -574,6 +574,12 @@ size_t CompactObj::Size() const {
case ROBJ_TAG:
raw_size = u_.r_obj.Size();
break;
case JSON_TAG:
raw_size = u_.json_obj.json_len;
break;
case SBF_TAG:
raw_size = u_.sbf->current_size();
break;
default:
LOG(DFATAL) << "Should not reach " << int(taglen_);
}
@ -684,9 +690,11 @@ void CompactObj::SetJson(JsonType&& j) {
if (taglen_ == JSON_TAG && u_.json_obj.encoding == kEncodingJsonCons) {
// already json
DCHECK(u_.json_obj.json_ptr != nullptr); // must be allocated
u_.json_obj.json_len = j.size();
u_.json_obj.json_ptr->swap(j);
} else {
SetMeta(JSON_TAG);
u_.json_obj.json_len = j.size();
u_.json_obj.json_ptr = AllocateMR<JsonType>(std::move(j));
u_.json_obj.encoding = kEncodingJsonCons;
}
@ -842,6 +850,11 @@ bool CompactObj::HasAllocated() const {
return true;
}
// Whether this object's type is permitted to exist with a zero-sized value.
// Types not listed here must be deleted when they become empty.
bool CompactObj::TagAllowsEmptyValue() const {
  switch (ObjType()) {
    case OBJ_JSON:
    case OBJ_STREAM:
    case OBJ_STRING:
    case OBJ_SBF:
      return true;
    default:
      return false;
  }
}
void __attribute__((noinline)) CompactObj::GetString(string* res) const {
res->resize(Size());
GetString(res->data());

View file

@ -398,6 +398,12 @@ class CompactObj {
bool HasAllocated() const;
bool TagAllowsEmptyValue() const;
// Raw tag byte identifying the internal encoding/representation of this object.
uint8_t Tag() const {
return taglen_;
}
private:
void EncodeString(std::string_view str);
size_t DecodedLen(size_t sz) const;

View file

@ -34,7 +34,7 @@ struct ShardFFResult {
// Returns (iterator, args-index) if found, KEY_NOTFOUND otherwise.
// If multiple keys are found, returns the first index in the ArgSlice.
OpResult<std::pair<DbSlice::ConstIterator, unsigned>> FindFirstReadOnly(const DbSlice& db_slice,
OpResult<std::pair<DbSlice::ConstIterator, unsigned>> FindFirstReadOnly(DbSlice& db_slice,
const DbContext& cntx,
const ShardArgs& args,
int req_obj_type) {

View file

@ -426,14 +426,25 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutableInternal(const Context& cntx
}
}
DbSlice::ItAndExpConst DbSlice::FindReadOnly(const Context& cntx, std::string_view key) const {
bool DbSlice::DelEmptyPrimeValue(const Context& cntx, Iterator it) {
auto& pv = it->second;
if (!pv.TagAllowsEmptyValue() && pv.Size() == 0) {
auto key = it.key();
LOG(ERROR) << "Found empty key: " << key << " with obj type " << pv.ObjType();
Del(cntx, it);
return true;
}
return false;
}
// Read-only lookup of `key`; returns const iterators over the prime and expire tables.
// NOTE(review): `res` is dereferenced without checking ok() — presumably FindInternal
// yields usable (possibly end) iterators on KEY_NOTFOUND; confirm against OpResult semantics.
DbSlice::ItAndExpConst DbSlice::FindReadOnly(const Context& cntx, std::string_view key) {
  auto res = FindInternal(cntx, key, std::nullopt, UpdateStatsMode::kReadStats);
  ConstIterator prime_it(res->it, StringOrView::FromView(key));
  ExpConstIterator exp_it(res->exp_it, StringOrView::FromView(key));
  return {prime_it, exp_it};
}
OpResult<DbSlice::ConstIterator> DbSlice::FindReadOnly(const Context& cntx, string_view key,
unsigned req_obj_type) const {
unsigned req_obj_type) {
auto res = FindInternal(cntx, key, req_obj_type, UpdateStatsMode::kReadStats);
if (res.ok()) {
return ConstIterator(res->it, StringOrView::FromView(key));
@ -443,7 +454,7 @@ OpResult<DbSlice::ConstIterator> DbSlice::FindReadOnly(const Context& cntx, stri
OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std::string_view key,
std::optional<unsigned> req_obj_type,
UpdateStatsMode stats_mode) const {
UpdateStatsMode stats_mode) {
if (!IsDbValid(cntx.db_index)) {
return OpStatus::KEY_NOTFOUND;
}
@ -537,6 +548,9 @@ OpResult<DbSlice::PrimeItAndExp> DbSlice::FindInternal(const Context& cntx, std:
// We do not use TopKey feature, so disable it until we redesign it.
// db.top_keys.Touch(key);
if (DelEmptyPrimeValue(cntx, Iterator(res.it, StringOrView::FromView(key)))) {
return OpStatus::KEY_NOTFOUND;
}
return res;
}

View file

@ -292,9 +292,9 @@ class DbSlice {
ExpConstIterator exp_it;
};
ItAndExpConst FindReadOnly(const Context& cntx, std::string_view key) const;
ItAndExpConst FindReadOnly(const Context& cntx, std::string_view key);
OpResult<ConstIterator> FindReadOnly(const Context& cntx, std::string_view key,
unsigned req_obj_type) const;
unsigned req_obj_type);
struct AddOrFindResult {
Iterator it;
@ -515,6 +515,8 @@ class DbSlice {
void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key);
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);
bool DelEmptyPrimeValue(const Context& cntx, Iterator it);
OpResult<AddOrFindResult> AddOrUpdateInternal(const Context& cntx, std::string_view key,
PrimeValue obj, uint64_t expire_at_ms,
bool force_update);
@ -555,7 +557,7 @@ class DbSlice {
OpResult<PrimeItAndExp> FindInternal(const Context& cntx, std::string_view key,
std::optional<unsigned> req_obj_type,
UpdateStatsMode stats_mode) const;
UpdateStatsMode stats_mode);
OpResult<ItAndUpdater> FindMutableInternal(const Context& cntx, std::string_view key,
std::optional<unsigned> req_obj_type);

View file

@ -2465,10 +2465,21 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
for (const auto* item : ib) {
PrimeValue pv;
if (ec_ = FromOpaque(item->val, &pv); ec_) {
if ((*ec_).value() == errc::empty_key) {
LOG(ERROR) << "Found empty key: " << item->key << " in DB " << db_ind << " rdb_type "
<< item->val.rdb_type;
continue;
}
LOG(ERROR) << "Could not load value for key '" << item->key << "' in DB " << db_ind;
stop_early_ = true;
break;
}
// We need this extra check because we don't return empty_key
if (!pv.TagAllowsEmptyValue() && pv.Size() == 0) {
LOG(ERROR) << "Found empty key: " << item->key << " in DB " << db_ind << " rdb_type "
<< item->val.rdb_type;
continue;
}
if (item->expire_ms > 0 && db_cntx.time_now_ms >= item->expire_ms)
continue;

View file

@ -59,7 +59,7 @@ class RdbLoaderBase {
struct OpaqueObj {
RdbVariant obj;
int rdb_type;
int rdb_type{0};
};
struct LoadBlob {

View file

@ -337,6 +337,13 @@ error_code RdbSerializer::SelectDb(uint32_t dbid) {
// Called by snapshot
io::Result<uint8_t> RdbSerializer::SaveEntry(const PrimeKey& pk, const PrimeValue& pv,
uint64_t expire_ms, DbIndex dbid) {
if (!pv.TagAllowsEmptyValue() && pv.Size() == 0) {
string_view key = pk.GetSlice(&tmp_str_);
LOG(ERROR) << "SaveEntry skipped empty PrimeValue with key: " << key << " with tag "
<< pv.Tag();
return 0;
}
DVLOG(3) << "Selecting " << dbid << " previous: " << last_entry_db_index_;
SelectDb(dbid);
@ -357,9 +364,9 @@ io::Result<uint8_t> RdbSerializer::SaveEntry(const PrimeKey& pk, const PrimeValu
return make_unexpected(ec);
}
string_view key = pk.GetSlice(&tmp_str_);
uint8_t rdb_type = RdbObjectType(pv);
string_view key = pk.GetSlice(&tmp_str_);
DVLOG(3) << ((void*)this) << ": Saving key/val start " << key << " in dbid=" << dbid;
if (auto ec = WriteOpcode(rdb_type); ec)

View file

@ -0,0 +1,19 @@
import pytest
import asyncio
from .utility import *
@pytest.mark.asyncio
async def test_empty_hash_as_zipmap_bug(async_client):
    """Regression test: a hash emptied via expiry + HDEL must be removed entirely.

    Builds a two-field hash, lets one field expire (HSETEX) and deletes the
    other, then verifies the key no longer exists instead of lingering as an
    empty (zipmap-encoded) object.
    """
    await async_client.execute_command("HSET foo a_field a_value")
    await async_client.execute_command("HSETEX foo 1 b_field b_value")
    await async_client.execute_command("HDEL foo a_field")

    @assert_eventually
    async def check_if_empty():
        assert await async_client.execute_command("HGETALL foo") == []

    await check_if_empty()
    # Key does not exist and it's empty
    assert await async_client.execute_command("EXISTS foo") == 0

View file

@ -349,7 +349,7 @@ class DflyInstanceFactory:
# Add 1 byte limit for big values
args.setdefault("serialization_max_chunk_size", 1)
if version >= 1.21:
if version > 1.21:
args.setdefault("use_new_io")
for k, v in args.items():

View file

@ -1,4 +1,5 @@
import random
from itertools import chain, repeat
import re
import pytest
@ -28,12 +29,12 @@ M_SLOW = [pytest.mark.slow]
M_STRESS = [pytest.mark.slow, pytest.mark.opt_only]
async def wait_for_replicas_state(*clients, state="online", timeout=0.05):
async def wait_for_replicas_state(*clients, state="online", node_role="slave", timeout=0.05):
"""Wait until all clients (replicas) reach passed state"""
while len(clients) > 0:
await asyncio.sleep(timeout)
roles = await asyncio.gather(*(c.role() for c in clients))
clients = [c for c, role in zip(clients, roles) if role[0] != "slave" or role[3] != state]
clients = [c for c, role in zip(clients, roles) if role[0] != node_role or role[3] != state]
"""
@ -2410,3 +2411,103 @@ async def test_replicate_old_master(
assert await c_replica.execute_command("get", "k1") == "v1"
await disconnect_clients(c_master, c_replica)
# This test was introduced in response to a bug when replicating empty hashmaps (encoded as
# ziplists) created with HSET, HSETEX, HDEL and then replicated 2 times.
# For more information please refer to the issue on gh:
# https://github.com/dragonflydb/dragonfly/issues/3504
@dfly_args({"proactor_threads": 1})
@pytest.mark.asyncio
async def test_empty_hash_map_replicate_old_master(df_factory):
    cpu = platform.processor()
    if cpu != "x86_64":
        pytest.skip(f"Supported only on x64, running on {cpu}")

    dfly_version = "v1.21.2"
    released_dfly_path = download_dragonfly_release(dfly_version)
    # Chain of three old-version instances followed by one new-version instance.
    instances = [df_factory.create(path=released_dfly_path, version=1.21) for _ in range(3)]
    instances.append(df_factory.create())

    df_factory.start_all(instances)

    old_c_master = instances[0].client()
    # Create an empty hashmap: one field expires via HSETEX, the other is HDEL'ed.
    await old_c_master.execute_command("HSET foo a_field a_value")
    await old_c_master.execute_command("HSETEX foo 2 b_field b_value")
    await old_c_master.execute_command("HDEL foo a_field")

    @assert_eventually
    async def check_if_empty():
        assert await old_c_master.execute_command("HGETALL foo") == []

    await check_if_empty()
    assert await old_c_master.execute_command("EXISTS foo") == 1
    await old_c_master.close()

    async def assert_body(client, result=1, state="online", node_role="slave"):
        """Wait for the replica to reach `state`, check foo's existence, then take over."""
        async with async_timeout.timeout(10):
            await wait_for_replicas_state(client, state=state, node_role=node_role)

        assert await client.execute_command("EXISTS foo") == result
        assert await client.execute_command("REPLTAKEOVER 1") == "OK"

    index = 0
    last_old_replica = 2

    # Replicate each instance from its predecessor (adjacent pairs); only the final,
    # new-version replica is expected to drop the empty key (result=0).
    for a, b in zip(instances, instances[1:]):
        logging.debug(index)
        client_b = b.client()
        assert await client_b.execute_command(f"REPLICAOF localhost {a.port}") == "OK"
        if index != last_old_replica:
            await assert_body(client_b, state="stable_sync", node_role="replica")
        else:
            await assert_body(client_b, result=0)
        index += 1
        await client_b.close()
# This test was introduced in response to a bug when replicating empty hash maps created with
# HSET, HSETEX, HDEL and then loaded via replication.
# For more information please refer to the issue on gh:
# https://github.com/dragonflydb/dragonfly/issues/3504
@dfly_args({"proactor_threads": 1})
@pytest.mark.asyncio
async def test_empty_hashmap_loading_bug(df_factory: DflyInstanceFactory):
    cpu = platform.processor()
    if cpu != "x86_64":
        pytest.skip(f"Supported only on x64, running on {cpu}")

    dfly_version = "v1.21.2"
    released_dfly_path = download_dragonfly_release(dfly_version)

    master = df_factory.create(path=released_dfly_path, version=1.21)
    master.start()
    c_master = master.client()
    # Create an empty hashmap on the old-version master: one field expires via
    # HSETEX, the other is HDEL'ed, leaving a zero-sized hash behind.
    await c_master.execute_command("HSET foo a_field a_value")
    await c_master.execute_command("HSETEX foo 2 b_field b_value")
    await c_master.execute_command("HDEL foo a_field")

    @assert_eventually
    async def check_if_empty():
        assert await c_master.execute_command("HGETALL foo") == []

    await check_if_empty()
    assert await c_master.execute_command("EXISTS foo") == 1

    replica = df_factory.create()
    replica.start()
    c_replica = replica.client()
    await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
    await wait_for_replicas_state(c_replica)

    # The new-version replica must skip the empty hash while loading, so its DB stays empty.
    assert await c_replica.execute_command("dbsize") == 0
    await close_clients(c_master, c_replica)