-sfix(rdb_load): replica load expired keys

This commit is contained in:
adi_holden 2023-03-15 12:34:13 +02:00 committed by Roman Gershman
parent bafad66fc3
commit dd03ce0cf0
4 changed files with 24 additions and 16 deletions

View file

@ -2129,6 +2129,7 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
if (item == nullptr) { if (item == nullptr) {
item = new Item; item = new Item;
} }
auto cleanup = absl::Cleanup([item] { delete item; });
// Read key // Read key
SET_OR_RETURN(ReadKey(), item->key); SET_OR_RETURN(ReadKey(), item->key);
@ -2148,22 +2149,23 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
* Similarly if the RDB is the preamble of an AOF file, we want to * Similarly if the RDB is the preamble of an AOF file, we want to
* load all the keys as they are, since the log of operations later * load all the keys as they are, since the log of operations later
* assume to work in an exact keyspace state. */ * assume to work in an exact keyspace state. */
// TODO: check rdbflags&RDBFLAGS_AOF_PREAMBLE logic in rdb.c
bool should_expire = settings->has_expired; // TODO: to implement
if (should_expire) {
// decrRefCount(val);
} else {
ShardId sid = Shard(item->key, shard_set->size());
item->expire_ms = settings->expiretime;
auto& out_buf = shard_buf_[sid]; if (ServerState::tlocal()->is_master && settings->has_expired) {
VLOG(1) << "Expire key: " << item->key;
return kOk;
}
out_buf.emplace_back(item); ShardId sid = Shard(item->key, shard_set->size());
item->expire_ms = settings->expiretime;
constexpr size_t kBufSize = 128; auto& out_buf = shard_buf_[sid];
if (out_buf.size() >= kBufSize) {
FlushShardAsync(sid); out_buf.emplace_back(item);
} std::move(cleanup).Cancel();
constexpr size_t kBufSize = 128;
if (out_buf.size() >= kBufSize) {
FlushShardAsync(sid);
} }
return kOk; return kOk;

View file

@ -186,7 +186,6 @@ class RdbSerializer {
std::unique_ptr<LZF_HSLOT[]> lzf_; std::unique_ptr<LZF_HSLOT[]> lzf_;
CompressionMode compression_mode_; CompressionMode compression_mode_;
// TODO : This compressor impl should support different compression algorithms zstd/lz4 etc.
std::unique_ptr<CompressorImpl> compressor_impl_; std::unique_ptr<CompressorImpl> compressor_impl_;
static constexpr size_t kMinStrSizeToCompress = 256; static constexpr size_t kMinStrSizeToCompress = 256;

View file

@ -231,6 +231,15 @@ TEST_F(RdbTest, ReloadTtl) {
EXPECT_LT(990, CheckedInt({"ttl", "key"})); EXPECT_LT(990, CheckedInt({"ttl", "key"}));
} }
TEST_F(RdbTest, ReloadExpired) {
Run({"set", "key", "val"});
Run({"expire", "key", "2"});
sleep(2);
Run({"debug", "reload"});
auto resp = Run({"get", "key"});
ASSERT_THAT(resp, ArgType(RespExpr::NIL));
}
TEST_F(RdbTest, SaveFlush) { TEST_F(RdbTest, SaveFlush) {
Run({"debug", "populate", "500000"}); Run({"debug", "populate", "500000"});

View file

@ -43,8 +43,6 @@ using absl::StrCat;
namespace { namespace {
// TODO: 2. Use time-out on socket-reads so that we would not deadlock on unresponsive master.
// 3. Support ipv6 at some point.
int ResolveDns(std::string_view host, char* dest) { int ResolveDns(std::string_view host, char* dest) {
struct addrinfo hints, *servinfo; struct addrinfo hints, *servinfo;