chore: Add tiered_storage_test. (#613)

1. Support tiered deletion.
2. Add notion of tiered entity in "DEBUG OBJECT" output.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2022-12-28 10:37:55 +02:00 committed by GitHub
parent e39d266abe
commit 63b83c5b99
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
19 changed files with 225 additions and 62 deletions

View file

@ -876,13 +876,15 @@ void CompactObj::GetString(char* dest) const {
void CompactObj::SetExternal(size_t offset, size_t sz) { void CompactObj::SetExternal(size_t offset, size_t sz) {
SetMeta(EXTERNAL_TAG, mask_ & ~kEncMask); SetMeta(EXTERNAL_TAG, mask_ & ~kEncMask);
u_.ext_ptr.offset = offset; u_.ext_ptr.page_index = offset / 4096;
u_.ext_ptr.page_offset = offset % 4096;
u_.ext_ptr.size = sz; u_.ext_ptr.size = sz;
} }
std::pair<size_t, size_t> CompactObj::GetExternalPtr() const { std::pair<size_t, size_t> CompactObj::GetExternalSlice() const {
DCHECK_EQ(EXTERNAL_TAG, taglen_); DCHECK_EQ(EXTERNAL_TAG, taglen_);
return pair<size_t, size_t>(size_t(u_.ext_ptr.offset), size_t(u_.ext_ptr.size)); size_t offset = size_t(u_.ext_ptr.page_index) * 4096 + u_.ext_ptr.page_offset;
return pair<size_t, size_t>(offset, size_t(u_.ext_ptr.size));
} }
void CompactObj::Reset() { void CompactObj::Reset() {

View file

@ -268,7 +268,7 @@ class CompactObj {
} }
void SetExternal(size_t offset, size_t sz); void SetExternal(size_t offset, size_t sz);
std::pair<size_t, size_t> GetExternalPtr() const; std::pair<size_t, size_t> GetExternalSlice() const;
// In case this object a single blob, returns number of bytes allocated on heap // In case this object a single blob, returns number of bytes allocated on heap
// for that blob. Otherwise returns 0. // for that blob. Otherwise returns 0.
@ -317,9 +317,12 @@ class CompactObj {
} }
struct ExternalPtr { struct ExternalPtr {
size_t offset; uint32_t type : 8;
uint32_t reserved : 24;
uint32_t page_index;
uint16_t page_offset; // 0 for multi-page blobs. != 0 for small blobs.
uint16_t reserved2;
uint32_t size; uint32_t size;
uint32_t unneeded;
} __attribute__((packed)); } __attribute__((packed));
struct JsonWrapper { struct JsonWrapper {

View file

@ -42,7 +42,7 @@ cxx_test(blocking_controller_test dragonfly_lib LABELS DFLY)
cxx_test(snapshot_test dragonfly_lib LABELS DFLY) cxx_test(snapshot_test dragonfly_lib LABELS DFLY)
cxx_test(json_family_test dfly_test_lib LABELS DFLY) cxx_test(json_family_test dfly_test_lib LABELS DFLY)
cxx_test(journal_test dfly_test_lib LABELS DFLY) cxx_test(journal_test dfly_test_lib LABELS DFLY)
cxx_test(tiered_storage_test dfly_test_lib LABELS DFLY)
add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY) add_custom_target(check_dfly WORKING_DIRECTORY .. COMMAND ctest -L DFLY)
add_dependencies(check_dfly dragonfly_test json_family_test list_family_test add_dependencies(check_dfly dragonfly_test json_family_test list_family_test

View file

@ -631,7 +631,7 @@ std::string GetString(const PrimeValue& pv, EngineShard* shard) {
std::string res; std::string res;
if (pv.IsExternal()) { if (pv.IsExternal()) {
auto* tiered = shard->tiered_storage(); auto* tiered = shard->tiered_storage();
auto [offset, size] = pv.GetExternalPtr(); auto [offset, size] = pv.GetExternalSlice();
res.resize(size); res.resize(size);
std::error_code ec = tiered->Read(offset, size, res.data()); std::error_code ec = tiered->Read(offset, size, res.data());

View file

@ -193,13 +193,15 @@ bool ParseDouble(string_view src, double* value) {
#define ADD(x) (x) += o.x #define ADD(x) (x) += o.x
TieredStats& TieredStats::operator+=(const TieredStats& o) { TieredStats& TieredStats::operator+=(const TieredStats& o) {
static_assert(sizeof(TieredStats) == 40); static_assert(sizeof(TieredStats) == 48);
ADD(tiered_reads); ADD(tiered_reads);
ADD(tiered_writes); ADD(tiered_writes);
ADD(storage_capacity); ADD(storage_capacity);
ADD(storage_reserved); ADD(storage_reserved);
ADD(aborted_offloads); ADD(aborted_write_cnt);
ADD(flush_skip_cnt);
return *this; return *this;
} }

View file

@ -99,7 +99,8 @@ struct TieredStats {
// how much was reserved by actively stored items. // how much was reserved by actively stored items.
size_t storage_reserved = 0; size_t storage_reserved = 0;
size_t aborted_offloads = 0; size_t aborted_write_cnt = 0;
size_t flush_skip_cnt = 0;
TieredStats& operator+=(const TieredStats&); TieredStats& operator+=(const TieredStats&);
}; };

View file

@ -37,24 +37,40 @@ static_assert(kPrimeSegmentSize == 32288);
// 24576 // 24576
static_assert(kExpireSegmentSize == 23528); static_assert(kExpireSegmentSize == 23528);
void UpdateStatsOnDeletion(PrimeIterator it, DbTableStats* stats) { void PerformDeletion(PrimeIterator del_it, ExpireIterator exp_it, EngineShard* shard,
size_t value_heap_size = it->second.MallocUsed(); DbTable* table) {
stats->inline_keys -= it->first.IsInline(); if (!exp_it.is_done()) {
stats->obj_memory_usage -= (it->first.MallocUsed() + value_heap_size); table->expire.Erase(exp_it);
if (it->second.ObjType() == OBJ_STRING)
stats->strval_memory_usage -= value_heap_size;
}
void EvictItemFun(PrimeIterator del_it, DbTable* table) {
if (del_it->second.HasExpire()) {
CHECK_EQ(1u, table->expire.Erase(del_it->first));
} }
UpdateStatsOnDeletion(del_it, &table->stats); DbTableStats& stats = table->stats;
const PrimeValue& pv = del_it->second;
if (pv.IsExternal()) {
auto [offset, size] = pv.GetExternalSlice();
DVLOG(2) << "Evicted from bucket " << del_it.bucket_id() << " " << del_it->first.ToString(); stats.tiered_entries--;
stats.tiered_size -= size;
TieredStorage* tiered = shard->tiered_storage();
tiered->Free(offset, size);
}
size_t value_heap_size = pv.MallocUsed();
stats.inline_keys -= del_it->first.IsInline();
stats.obj_memory_usage -= (del_it->first.MallocUsed() + value_heap_size);
if (pv.ObjType() == OBJ_STRING)
stats.strval_memory_usage -= value_heap_size;
table->prime.Erase(del_it); table->prime.Erase(del_it);
}
inline void PerformDeletion(PrimeIterator del_it, EngineShard* shard, DbTable* table) {
ExpireIterator exp_it;
if (del_it->second.HasExpire()) {
exp_it = table->expire.Find(del_it->first);
DCHECK(!exp_it.is_done());
}
PerformDeletion(del_it, exp_it, shard, table);
}; };
class PrimeEvictionPolicy { class PrimeEvictionPolicy {
@ -167,7 +183,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT
} }
DbTable* table = db_slice_->GetDBTable(cntx_.db_index); DbTable* table = db_slice_->GetDBTable(cntx_.db_index);
EvictItemFun(last_slot_it, table); PerformDeletion(last_slot_it, db_slice_->shard_owner(), table);
++evicted_; ++evicted_;
} }
me->ShiftRight(bucket_it); me->ShiftRight(bucket_it);
@ -446,16 +462,11 @@ bool DbSlice::Del(DbIndex db_ind, PrimeIterator it) {
} }
auto& db = db_arr_[db_ind]; auto& db = db_arr_[db_ind];
if (it->second.HasExpire()) {
CHECK_EQ(1u, db->expire.Erase(it->first));
}
if (it->second.HasFlag()) { if (it->second.HasFlag()) {
CHECK_EQ(1u, db->mcflag.Erase(it->first)); CHECK_EQ(1u, db->mcflag.Erase(it->first));
} }
UpdateStatsOnDeletion(it, &db->stats); PerformDeletion(it, shard_owner(), db.get());
db->prime.Erase(it);
return true; return true;
} }
@ -474,7 +485,17 @@ void DbSlice::FlushDb(DbIndex db_ind) {
CreateDb(db_ind); CreateDb(db_ind);
db_arr_[db_ind]->trans_locks.swap(db_ptr->trans_locks); db_arr_[db_ind]->trans_locks.swap(db_ptr->trans_locks);
auto cb = [db_ptr = std::move(db_ptr)]() mutable { auto cb = [this, db_ptr = std::move(db_ptr)]() mutable {
if (db_ptr->stats.tiered_entries > 0) {
for (auto it = db_ptr->prime.begin(); it != db_ptr->prime.end(); ++it) {
if (it->second.IsExternal()) {
PerformDeletion(it, shard_owner(), db_ptr.get());
}
}
}
DCHECK_EQ(0u, db_ptr->stats.tiered_entries);
db_ptr.reset(); db_ptr.reset();
mi_heap_collect(ServerState::tlocal()->data_heap(), true); mi_heap_collect(ServerState::tlocal()->data_heap(), true);
}; };
@ -711,7 +732,7 @@ void DbSlice::PreUpdate(DbIndex db_ind, PrimeIterator it) {
// before calling to PreUpdate or it does not need to read it at all. // before calling to PreUpdate or it does not need to read it at all.
// After this code executes, the external blob is lost. // After this code executes, the external blob is lost.
TieredStorage* tiered = shard_owner()->tiered_storage(); TieredStorage* tiered = shard_owner()->tiered_storage();
auto [offset, size] = it->second.GetExternalPtr(); auto [offset, size] = it->second.GetExternalSlice();
tiered->Free(offset, size); tiered->Free(offset, size);
it->second.Reset(); it->second.Reset();
@ -764,9 +785,7 @@ pair<PrimeIterator, ExpireIterator> DbSlice::ExpireIfNeeded(const Context& cntx,
if (time_t(cntx.time_now_ms) < expire_time) if (time_t(cntx.time_now_ms) < expire_time)
return make_pair(it, expire_it); return make_pair(it, expire_it);
db->expire.Erase(expire_it); PerformDeletion(it, expire_it, shard_owner(), db.get());
UpdateStatsOnDeletion(it, &db->stats);
db->prime.Erase(it);
++events_.expired_keys; ++events_.expired_keys;
return make_pair(PrimeIterator{}, ExpireIterator{}); return make_pair(PrimeIterator{}, ExpireIterator{});
@ -892,7 +911,7 @@ size_t DbSlice::EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* t
if (evict_it == it || evict_it->first.IsSticky()) if (evict_it == it || evict_it->first.IsSticky())
continue; continue;
EvictItemFun(evict_it, table); PerformDeletion(evict_it, shard_owner(), table);
++evicted; ++evicted;
if (freed_memory_fun() > memory_to_free) { if (freed_memory_fun() > memory_to_free) {
evict_succeeded = true; evict_succeeded = true;
@ -918,7 +937,7 @@ size_t DbSlice::EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* t
if (evict_it == it || evict_it->first.IsSticky()) if (evict_it == it || evict_it->first.IsSticky())
continue; continue;
EvictItemFun(evict_it, table); PerformDeletion(evict_it, shard_owner(), table);
++evicted; ++evicted;
if (freed_memory_fun() > memory_to_free) { if (freed_memory_fun() > memory_to_free) {

View file

@ -202,7 +202,7 @@ class DbSlice {
*/ */
void FlushDb(DbIndex db_ind); void FlushDb(DbIndex db_ind);
EngineShard* shard_owner() { EngineShard* shard_owner() const {
return owner_; return owner_;
} }

View file

@ -55,6 +55,8 @@ struct ObjInfo {
enum LockStatus { NONE, S, X } lock_status = NONE; enum LockStatus { NONE, S, X } lock_status = NONE;
int64_t ttl = INT64_MAX; int64_t ttl = INT64_MAX;
optional<uint32_t> external_len;
bool has_sec_precision = false; bool has_sec_precision = false;
bool found = false; bool found = false;
}; };
@ -338,11 +340,17 @@ void DebugCmd::Inspect(string_view key) {
PrimeIterator it = pt->Find(key); PrimeIterator it = pt->Find(key);
ObjInfo oinfo; ObjInfo oinfo;
if (IsValid(it)) { if (IsValid(it)) {
const PrimeValue& pv = it->second;
oinfo.found = true; oinfo.found = true;
oinfo.encoding = it->second.Encoding(); oinfo.encoding = pv.Encoding();
oinfo.bucket_id = it.bucket_id(); oinfo.bucket_id = it.bucket_id();
oinfo.slot_id = it.slot_id(); oinfo.slot_id = it.slot_id();
if (it->second.HasExpire()) { if (pv.IsExternal()) {
oinfo.external_len.emplace(pv.GetExternalSlice().second);
}
if (pv.HasExpire()) {
ExpireIterator exp_it = exp_t->Find(it->first); ExpireIterator exp_it = exp_t->Find(it->first);
CHECK(!exp_it.is_done()); CHECK(!exp_it.is_done());
@ -366,13 +374,20 @@ void DebugCmd::Inspect(string_view key) {
ObjInfo res = ess.Await(sid, cb); ObjInfo res = ess.Await(sid, cb);
string resp; string resp;
if (res.found) { if (!res.found) {
StrAppend(&resp, "encoding:", strEncoding(res.encoding), " bucket_id:", res.bucket_id); (*cntx_)->SendError(kKeyNotFoundErr);
StrAppend(&resp, " slot:", res.slot_id, " shard:", sid); return;
}
if (res.ttl != INT64_MAX) { StrAppend(&resp, "encoding:", strEncoding(res.encoding), " bucket_id:", res.bucket_id);
StrAppend(&resp, " ttl:", res.ttl, res.has_sec_precision ? "s" : "ms"); StrAppend(&resp, " slot:", res.slot_id, " shard:", sid);
}
if (res.ttl != INT64_MAX) {
StrAppend(&resp, " ttl:", res.ttl, res.has_sec_precision ? "s" : "ms");
}
if (res.external_len) {
StrAppend(&resp, " spill_len:", *res.external_len);
} }
if (res.lock_status != ObjInfo::NONE) { if (res.lock_status != ObjInfo::NONE) {

View file

@ -52,9 +52,9 @@ class DflyEngineTest : public BaseFamilyTest {
} }
}; };
class DefragDflyEngineTest : public DflyEngineTest { class DefragDflyEngineTest : public BaseFamilyTest {
protected: protected:
DefragDflyEngineTest() : DflyEngineTest() { DefragDflyEngineTest() : BaseFamilyTest() {
num_threads_ = 1; num_threads_ = 1;
} }
}; };
@ -141,8 +141,8 @@ TEST_F(DflyEngineTest, HitMissStats) {
resp = Run({"get", "Key2"}); resp = Run({"get", "Key2"});
ASSERT_THAT(resp, ArgType(RespExpr::NIL)); ASSERT_THAT(resp, ArgType(RespExpr::NIL));
EXPECT_THAT(service_->server_family().GetMetrics().events.hits, 1); EXPECT_THAT(GetMetrics().events.hits, 1);
EXPECT_THAT(service_->server_family().GetMetrics().events.misses, 1); EXPECT_THAT(GetMetrics().events.misses, 1);
} }
TEST_F(DflyEngineTest, MultiEmpty) { TEST_F(DflyEngineTest, MultiEmpty) {

View file

@ -20,7 +20,7 @@ extern "C" {
using namespace std; using namespace std;
ABSL_FLAG(string, backing_prefix, "", ""); ABSL_FLAG(string, spill_file_prefix, "", "");
ABSL_FLAG(uint32_t, hz, 100, ABSL_FLAG(uint32_t, hz, 100,
"Base frequency at which the server performs other background tasks. " "Base frequency at which the server performs other background tasks. "
@ -227,7 +227,7 @@ void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
CompactObj::InitThreadLocal(shard_->memory_resource()); CompactObj::InitThreadLocal(shard_->memory_resource());
SmallString::InitThreadLocal(data_heap); SmallString::InitThreadLocal(data_heap);
string backing_prefix = GetFlag(FLAGS_backing_prefix); string backing_prefix = GetFlag(FLAGS_spill_file_prefix);
if (!backing_prefix.empty()) { if (!backing_prefix.empty()) {
if (pb->GetKind() != ProactorBase::IOURING) { if (pb->GetKind() != ProactorBase::IOURING) {
LOG(ERROR) << "Only ioring based backing storage is supported. Exiting..."; LOG(ERROR) << "Only ioring based backing storage is supported. Exiting...";

View file

@ -95,6 +95,9 @@ TEST_F(GenericFamilyTest, Del) {
exist_fb.Join(); exist_fb.Join();
del_fb.Join(); del_fb.Join();
Run({"setex", "k1", "10", "bar"});
Run({"del", "k1"});
} }
TEST_F(GenericFamilyTest, TTL) { TEST_F(GenericFamilyTest, TTL) {

View file

@ -17,6 +17,7 @@ extern "C" {
#include "base/logging.h" #include "base/logging.h"
#include "facade/facade_test.h" // needed to find operator== for RespExpr. #include "facade/facade_test.h" // needed to find operator== for RespExpr.
#include "io/file.h" #include "io/file.h"
#include "io/file_util.h"
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
#include "server/rdb_load.h" #include "server/rdb_load.h"
#include "server/test_utils.h" #include "server/test_utils.h"
@ -31,15 +32,32 @@ using absl::StrCat;
ABSL_DECLARE_FLAG(int32, list_compress_depth); ABSL_DECLARE_FLAG(int32, list_compress_depth);
ABSL_DECLARE_FLAG(int32, list_max_listpack_size); ABSL_DECLARE_FLAG(int32, list_max_listpack_size);
ABSL_DECLARE_FLAG(int, compression_mode); ABSL_DECLARE_FLAG(int, compression_mode);
ABSL_DECLARE_FLAG(string, dbfilename);
namespace dfly { namespace dfly {
class RdbTest : public BaseFamilyTest { class RdbTest : public BaseFamilyTest {
protected: protected:
protected: static void SetUpTestSuite();
void TearDown();
io::FileSource GetSource(string name); io::FileSource GetSource(string name);
}; };
void RdbTest::SetUpTestSuite() {
BaseFamilyTest::SetUpTestSuite();
SetFlag(&FLAGS_dbfilename, "rdbtestdump");
}
void RdbTest::TearDown() {
auto rdb_files = io::StatFiles("rdbtestdump*");
CHECK(rdb_files);
for (const auto& fl : *rdb_files) {
unlink(fl.name.c_str());
}
BaseFamilyTest::TearDown();
}
inline const uint8_t* to_byte(const void* s) { inline const uint8_t* to_byte(const void* s) {
return reinterpret_cast<const uint8_t*>(s); return reinterpret_cast<const uint8_t*>(s);
} }
@ -146,7 +164,7 @@ TEST_F(RdbTest, Stream) {
} }
TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) { TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) {
Run({"debug", "populate", "500000"}); Run({"debug", "populate", "50000"});
for (int i = 0; i <= 3; ++i) { for (int i = 0; i <= 3; ++i) {
SetFlag(&FLAGS_compression_mode, i); SetFlag(&FLAGS_compression_mode, i);
@ -156,7 +174,7 @@ TEST_F(RdbTest, ComressionModeSaveDragonflyAndReload) {
auto save_info = service_->server_family().GetLastSaveInfo(); auto save_info = service_->server_family().GetLastSaveInfo();
resp = Run({"debug", "load", save_info->file_name}); resp = Run({"debug", "load", save_info->file_name});
ASSERT_EQ(resp, "OK"); ASSERT_EQ(resp, "OK");
ASSERT_EQ(500000, CheckedInt({"dbsize"})); ASSERT_EQ(50000, CheckedInt({"dbsize"}));
} }
} }
@ -240,7 +258,8 @@ TEST_F(RdbTest, SaveManyDbs) {
Run({"select", "1"}); Run({"select", "1"});
Run({"debug", "populate", "10000"}); Run({"debug", "populate", "10000"});
}); });
auto metrics = service_->server_family().GetMetrics();
auto metrics = GetMetrics();
ASSERT_EQ(2, metrics.db.size()); ASSERT_EQ(2, metrics.db.size());
EXPECT_EQ(50000, metrics.db[0].key_count); EXPECT_EQ(50000, metrics.db[0].key_count);
EXPECT_EQ(10000, metrics.db[1].key_count); EXPECT_EQ(10000, metrics.db[1].key_count);
@ -272,7 +291,7 @@ TEST_F(RdbTest, SaveManyDbs) {
auto resp = Run({"debug", "reload", "NOSAVE"}); auto resp = Run({"debug", "reload", "NOSAVE"});
EXPECT_EQ(resp, "OK"); EXPECT_EQ(resp, "OK");
metrics = service_->server_family().GetMetrics(); metrics = GetMetrics();
ASSERT_EQ(2, metrics.db.size()); ASSERT_EQ(2, metrics.db.size());
EXPECT_EQ(50000, metrics.db[0].key_count); EXPECT_EQ(50000, metrics.db[0].key_count);
EXPECT_EQ(10000, metrics.db[1].key_count); EXPECT_EQ(10000, metrics.db[1].key_count);

View file

@ -1319,7 +1319,8 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
append("tiered_writes", m.tiered_stats.tiered_writes); append("tiered_writes", m.tiered_stats.tiered_writes);
append("tiered_reserved", m.tiered_stats.storage_reserved); append("tiered_reserved", m.tiered_stats.storage_reserved);
append("tiered_capacity", m.tiered_stats.storage_capacity); append("tiered_capacity", m.tiered_stats.storage_capacity);
append("tiered_aborted_writes", m.tiered_stats.aborted_offloads); append("tiered_aborted_write_total", m.tiered_stats.aborted_write_cnt);
append("tiered_flush_skip_total", m.tiered_stats.flush_skip_cnt);
} }
if (should_enter("PERSISTENCE", true)) { if (should_enter("PERSISTENCE", true)) {

View file

@ -44,7 +44,7 @@ string GetString(EngineShard* shard, const PrimeValue& pv) {
string res; string res;
if (pv.IsExternal()) { if (pv.IsExternal()) {
auto* tiered = shard->tiered_storage(); auto* tiered = shard->tiered_storage();
auto [offset, size] = pv.GetExternalPtr(); auto [offset, size] = pv.GetExternalSlice();
res.resize(size); res.resize(size);
error_code ec = tiered->Read(offset, size, res.data()); error_code ec = tiered->Read(offset, size, res.data());

View file

@ -179,6 +179,14 @@ RespExpr BaseFamilyTest::Run(ArgSlice list) {
return Run(GetId(), list); return Run(GetId(), list);
} }
RespExpr BaseFamilyTest::Run(absl::Span<std::string> span) {
vector<string_view> sv_vec(span.size());
for (unsigned i = 0; i < span.size(); ++i) {
sv_vec[i] = span[i];
}
return Run(sv_vec);
}
RespExpr BaseFamilyTest::Run(std::string_view id, ArgSlice slice) { RespExpr BaseFamilyTest::Run(std::string_view id, ArgSlice slice) {
TestConnWrapper* conn_wrapper = AddFindConn(Protocol::REDIS, id); TestConnWrapper* conn_wrapper = AddFindConn(Protocol::REDIS, id);
@ -292,6 +300,12 @@ int64_t BaseFamilyTest::CheckedInt(ArgSlice list) {
return res; return res;
} }
string BaseFamilyTest::CheckedString(ArgSlice list) {
RespExpr resp = Run(list);
CHECK_EQ(RespExpr::STRING, int(resp.type)) << list;
return string{ToSV(resp.GetBuf())};
}
CmdArgVec BaseFamilyTest::TestConnWrapper::Args(ArgSlice list) { CmdArgVec BaseFamilyTest::TestConnWrapper::Args(ArgSlice list) {
CHECK_NE(0u, list.size()); CHECK_NE(0u, list.size());

View file

@ -47,6 +47,7 @@ class BaseFamilyTest : public ::testing::Test {
} }
RespExpr Run(ArgSlice list); RespExpr Run(ArgSlice list);
RespExpr Run(absl::Span<std::string> list);
RespExpr Run(std::string_view id, ArgSlice list); RespExpr Run(std::string_view id, ArgSlice list);
@ -60,6 +61,7 @@ class BaseFamilyTest : public ::testing::Test {
return CheckedInt(ArgSlice{list.begin(), list.size()}); return CheckedInt(ArgSlice{list.begin(), list.size()});
} }
int64_t CheckedInt(ArgSlice list); int64_t CheckedInt(ArgSlice list);
std::string CheckedString(ArgSlice list);
bool IsLocked(DbIndex db_index, std::string_view key) const; bool IsLocked(DbIndex db_index, std::string_view key) const;
ConnectionContext::DebugInfo GetDebugInfo(const std::string& id) const; ConnectionContext::DebugInfo GetDebugInfo(const std::string& id) const;
@ -71,6 +73,10 @@ class BaseFamilyTest : public ::testing::Test {
TestConnWrapper* AddFindConn(Protocol proto, std::string_view id); TestConnWrapper* AddFindConn(Protocol proto, std::string_view id);
static std::vector<std::string> StrArray(const RespExpr& expr); static std::vector<std::string> StrArray(const RespExpr& expr);
Metrics GetMetrics() const {
return service_->server_family().GetMetrics();
}
void AdvanceTime(int64_t ms) { void AdvanceTime(int64_t ms) {
TEST_current_time_ms += ms; TEST_current_time_ms += ms;
} }

View file

@ -340,13 +340,13 @@ void TieredStorage::FinishIoRequest(int io_res, InflightWriteRequest* req) {
LOG(ERROR) << "Error writing into ssd file: " << util::detail::SafeErrorMessage(-io_res); LOG(ERROR) << "Error writing into ssd file: " << util::detail::SafeErrorMessage(-io_res);
alloc_.Free(req->page_index() * kBlockLen, kBlockLen); alloc_.Free(req->page_index() * kBlockLen, kBlockLen);
req->Undo(&bin_record, &db_slice_); req->Undo(&bin_record, &db_slice_);
++stats_.aborted_offloads; ++stats_.aborted_write_cnt;
} else { } else {
// Also removes the entries from bin_record. // Also removes the entries from bin_record.
uint16_t entries_serialized = req->ExternalizeEntries(&bin_record, &db_slice_); uint16_t entries_serialized = req->ExternalizeEntries(&bin_record, &db_slice_);
if (entries_serialized == 0) { // aborted if (entries_serialized == 0) { // aborted
++stats_.aborted_offloads; ++stats_.aborted_write_cnt;
alloc_.Free(req->page_index() * kBlockLen, kBlockLen); alloc_.Free(req->page_index() * kBlockLen, kBlockLen);
} else { // succeeded. } else { // succeeded.
VLOG(2) << "page_refcnt emplace " << req->page_index(); VLOG(2) << "page_refcnt emplace " << req->page_index();
@ -413,6 +413,7 @@ error_code TieredStorage::ScheduleOffload(DbIndex db_index, PrimeIterator it) {
if (!flush_succeeded) { if (!flush_succeeded) {
// we could not flush because I/O is saturated, so lets remove the last item. // we could not flush because I/O is saturated, so lets remove the last item.
bin_record.pending_entries.erase(it->first.AsRef()); bin_record.pending_entries.erase(it->first.AsRef());
++stats_.flush_skip_cnt;
} }
return error_code{}; return error_code{};

View file

@ -0,0 +1,77 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include <absl/strings/str_cat.h>
#include <gmock/gmock.h>
#include "base/flags.h"
#include "server/test_utils.h"
using namespace std;
using namespace testing;
using absl::SetFlag;
using absl::StrCat;
ABSL_DECLARE_FLAG(string, spill_file_prefix);
namespace dfly {
class TieredStorageTest : public BaseFamilyTest {
protected:
TieredStorageTest() : BaseFamilyTest() {
num_threads_ = 1;
}
void FillExternalKeys(unsigned count);
static void SetUpTestSuite();
};
void TieredStorageTest::SetUpTestSuite() {
BaseFamilyTest::SetUpTestSuite();
SetFlag(&FLAGS_spill_file_prefix, "/tmp/spill");
}
void TieredStorageTest::FillExternalKeys(unsigned count) {
string val(256, 'a');
unsigned batch_cnt = count / 50;
for (unsigned i = 0; i < batch_cnt; ++i) {
vector<string> cmd;
cmd.push_back("mset");
for (unsigned j = 0; j < 50; ++j) {
string key = StrCat("k", i * 100 + j);
cmd.push_back(key);
cmd.push_back(val);
}
Run(absl::Span<string>{cmd});
}
for (unsigned i = batch_cnt * 50; i < count; ++i) {
Run({"set", StrCat("k", i), val});
}
}
TEST_F(TieredStorageTest, Basic) {
FillExternalKeys(5000);
EXPECT_EQ(5000, CheckedInt({"dbsize"}));
Metrics m = GetMetrics();
EXPECT_GT(m.db[0].tiered_entries, 0u);
FillExternalKeys(5000);
m = GetMetrics();
unsigned tiered_entries = m.db[0].tiered_entries;
EXPECT_GT(tiered_entries, 0u);
string resp = CheckedString({"debug", "object", "k1"});
EXPECT_THAT(resp, HasSubstr("spill_len"));
Run({"del", "k1"});
m = GetMetrics();
EXPECT_EQ(m.db[0].tiered_entries, tiered_entries - 1);
}
} // namespace dfly