Switch PrimeTable to DashTable

This commit is contained in:
Roman Gershman 2022-01-20 12:26:10 +02:00
parent d519bccc55
commit 8d30cc8040
9 changed files with 135 additions and 91 deletions

View file

@ -67,6 +67,10 @@ template <typename V> class OpResult : public OpResultBase {
return &v_;
}
const V& operator*() const {
return v_;
}
private:
V v_;
};

View file

@ -30,6 +30,7 @@ DbSlice::DbSlice(uint32_t index, EngineShard* owner) : shard_id_(index), owner_(
DbSlice::~DbSlice() {
// we do not need this code but it's easier to debug in case we encounter
// memory allocation bugs during delete operations.
for (auto& db : db_arr_) {
if (!db)
continue;
@ -43,10 +44,10 @@ void DbSlice::Reserve(DbIndex db_ind, size_t key_size) {
auto& db = db_arr_[db_ind];
DCHECK(db);
db->main_table.reserve(key_size);
db->main_table.Reserve(key_size);
}
auto DbSlice::Find(DbIndex db_index, std::string_view key, unsigned req_obj_type) const
auto DbSlice::Find(DbIndex db_index, string_view key, unsigned req_obj_type) const
-> OpResult<MainIterator> {
auto it = FindExt(db_index, key).first;
@ -60,11 +61,11 @@ auto DbSlice::Find(DbIndex db_index, std::string_view key, unsigned req_obj_type
return it;
}
pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, std::string_view key) const {
pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, string_view key) const {
DCHECK(IsDbValid(db_ind));
auto& db = db_arr_[db_ind];
MainIterator it = db->main_table.find(key);
MainIterator it = db->main_table.Find(key);
if (!IsValid(it)) {
return make_pair(it, ExpireIterator{});
@ -72,14 +73,14 @@ pair<MainIterator, ExpireIterator> DbSlice::FindExt(DbIndex db_ind, std::string_
ExpireIterator expire_it;
if (it->second.HasExpire()) { // check expiry state
expire_it = db->expire_table.Find(CompactObj{it->first});
expire_it = db->expire_table.Find(it->first);
CHECK(IsValid(expire_it));
if (expire_it->second <= now_ms_) {
db->expire_table.Erase(expire_it);
db->stats.obj_memory_usage -= (it->first.capacity() + it->second.MallocUsed());
db->main_table.erase(it);
db->stats.obj_memory_usage -= (it->first.MallocUsed() + it->second.MallocUsed());
db->main_table.Erase(it);
return make_pair(MainIterator{}, ExpireIterator{});
}
}
@ -109,9 +110,10 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<MainIterator,
auto& db = db_arr_[db_index];
MainIterator existing;
pair<MainIterator, bool> res = db->main_table.emplace(key, MainValue{});
CompactObj co_key{key};
pair<MainIterator, bool> res = db->main_table.Insert(std::move(co_key), PrimeValue{});
if (res.second) { // new entry
db->stats.obj_memory_usage += res.first->first.capacity();
db->stats.obj_memory_usage += res.first->first.MallocUsed();
return make_pair(res.first, true);
}
@ -120,13 +122,13 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<MainIterator,
DCHECK(IsValid(existing));
if (existing->second.HasExpire()) {
auto expire_it = db->expire_table.Find(CompactObj{existing->first});
auto expire_it = db->expire_table.Find(existing->first);
CHECK(IsValid(expire_it));
if (expire_it->second <= now_ms_) {
db->expire_table.Erase(expire_it);
// Keep the entry but free the object.
// Keep the entry but reset the object.
db->stats.obj_memory_usage -= existing->second.MallocUsed();
existing->second.Reset();
@ -157,12 +159,11 @@ bool DbSlice::Del(DbIndex db_ind, const MainIterator& it) {
}
if (it->second.HasExpire()) {
CompactObj cobj{it->first};
CHECK_EQ(1u, db->expire_table.Erase(cobj));
CHECK_EQ(1u, db->expire_table.Erase(it->first));
}
db->stats.obj_memory_usage -= (it->first.capacity() + it->second.MallocUsed());
db->main_table.erase(it);
db->stats.obj_memory_usage -= (it->first.MallocUsed() + it->second.MallocUsed());
db->main_table.Erase(it);
return true;
}
@ -174,7 +175,7 @@ size_t DbSlice::FlushDb(DbIndex db_ind) {
CHECK(db);
size_t removed = db->main_table.size();
db->main_table.clear();
db->main_table.Clear();
db->expire_table.Clear();
db->stats.obj_memory_usage = 0;
@ -201,15 +202,14 @@ size_t DbSlice::FlushDb(DbIndex db_ind) {
bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) {
auto& db = db_arr_[db_ind];
if (at == 0 && it->second.HasExpire()) {
CompactObj cobj(it->first);
CHECK_EQ(1u, db->expire_table.Erase(cobj));
CHECK_EQ(1u, db->expire_table.Erase(it->first));
it->second.SetExpire(false);
return true;
}
if (!it->second.HasExpire() && at) {
CHECK(db->expire_table.Insert(CompactObj{it->first}, at).second);
CHECK(db->expire_table.Insert(it->first.AsRef(), at).second);
it->second.SetExpire(true);
return true;
@ -218,25 +218,27 @@ bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) {
return false;
}
void DbSlice::AddNew(DbIndex db_ind, string_view key, MainValue obj, uint64_t expire_at_ms) {
void DbSlice::AddNew(DbIndex db_ind, string_view key, PrimeValue obj, uint64_t expire_at_ms) {
CHECK(AddIfNotExist(db_ind, key, std::move(obj), expire_at_ms));
}
bool DbSlice::AddIfNotExist(DbIndex db_ind, string_view key, MainValue obj, uint64_t expire_at_ms) {
bool DbSlice::AddIfNotExist(DbIndex db_ind, string_view key, PrimeValue obj,
uint64_t expire_at_ms) {
DCHECK(!obj.IsRef());
auto& db = db_arr_[db_ind];
CompactObj co_key{key};
auto [new_entry, success] = db->main_table.emplace(key, std::move(obj));
auto [new_entry, success] = db->main_table.Insert(std::move(co_key), std::move(obj));
if (!success)
return false; // in this case obj won't be moved and will be destroyed during unwinding.
db->stats.obj_memory_usage += (new_entry->first.capacity() + new_entry->second.MallocUsed());
db->stats.obj_memory_usage += (new_entry->first.MallocUsed() + new_entry->second.MallocUsed());
if (expire_at_ms) {
new_entry->second.SetExpire(true);
CHECK(db->expire_table.Insert(CompactObj{new_entry->first}, expire_at_ms).second);
CHECK(db->expire_table.Insert(new_entry->first.AsRef(), expire_at_ms).second);
}
return true;

View file

@ -59,11 +59,11 @@ class DbSlice {
bool Expire(DbIndex db_ind, MainIterator main_it, uint64_t at);
// Adds a new entry. Requires: key does not exist in this slice.
void AddNew(DbIndex db_ind, std::string_view key, MainValue obj, uint64_t expire_at_ms);
void AddNew(DbIndex db_ind, std::string_view key, PrimeValue obj, uint64_t expire_at_ms);
// Adds a new entry if a key does not exists. Returns true if insertion took place,
// false otherwise. expire_at_ms equal to 0 - means no expiry.
bool AddIfNotExist(DbIndex db_ind, std::string_view key, MainValue obj, uint64_t expire_at_ms);
bool AddIfNotExist(DbIndex db_ind, std::string_view key, PrimeValue obj, uint64_t expire_at_ms);
// Creates a database with index `db_ind`. If such database exists does nothing.
void ActivateDb(DbIndex db_ind);
@ -114,7 +114,7 @@ class DbSlice {
using LockTable = absl::flat_hash_map<std::string, IntentLock>;
struct DbWrapper {
MainTable main_table;
PrimeTable main_table;
ExpireTable expire_table;
LockTable lock_table;

68
server/detail/table.h Normal file
View file

@ -0,0 +1,68 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include "core/compact_object.h"
#include "core/dash.h"
namespace dfly {
namespace detail {
using PrimeKey = CompactObj;
using PrimeValue = CompactObj;
struct PrimeTablePolicy {
enum { kSlotNum = 14, kBucketNum = 54, kStashBucketNum = 4 };
static constexpr bool kUseVersion = true;
static uint64_t HashFn(const PrimeKey& s) {
return s.HashCode();
}
static uint64_t HashFn(std::string_view u) {
return CompactObj::HashCode(u);
}
static void DestroyKey(PrimeKey& cs) {
cs.Reset();
}
static void DestroyValue(PrimeValue& o) {
o.Reset();
}
static bool Equal(const PrimeKey& s1, std::string_view s2) {
return s1 == s2;
}
static bool Equal(const PrimeKey& s1, const PrimeKey& s2) {
return s1 == s2;
}
};
struct ExpireTablePolicy {
enum { kSlotNum = 12, kBucketNum = 64, kStashBucketNum = 2 };
static constexpr bool kUseVersion = false;
static uint64_t HashFn(const PrimeKey& s) {
return s.HashCode();
}
static void DestroyKey(PrimeKey& cs) {
cs.Reset();
}
static void DestroyValue(uint64_t) {
}
static bool Equal(const PrimeKey& s1, const PrimeKey& s2) {
return s1 == s2;
}
};
} // namespace detail
} // namespace dfly

View file

@ -42,7 +42,7 @@ class Renamer {
struct FindResult {
string_view key;
MainValue val;
PrimeValue val;
uint64_t expire_ts;
bool found = false;
};

View file

@ -59,7 +59,7 @@ namespace dfly {
using namespace std;
namespace {
quicklist* GetQL(const MainValue& mv) {
quicklist* GetQL(const PrimeValue& mv) {
return mv.GetQL();
}
@ -67,17 +67,11 @@ void* listPopSaver(unsigned char* data, size_t sz) {
return createStringObject((char*)data, sz);
}
OpResult<string> ListPop(DbIndex db_ind, const MainValue& mv, ListDir dir) {
VLOG(1) << "Pop db_indx " << db_ind;
if (mv.ObjType() != OBJ_LIST)
return OpStatus::WRONG_TYPE;
string ListPop(ListDir dir, quicklist* ql) {
long long vlong;
robj* value = NULL;
int ql_where = (dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
quicklist* ql = GetQL(mv);
// Empty list automatically removes the key (see below).
CHECK_EQ(1,
@ -103,8 +97,8 @@ class BPopper {
// If OK is returned then use result() to fetch the value.
OpStatus Run(Transaction* t, unsigned msec);
const std::pair<std::string_view, std::string_view> result() const {
return std::pair<std::string_view, std::string_view>(key_, value_);
auto result() const {
return make_pair<string_view, string_view>(key_, value_);
}
bool found() const {
@ -118,7 +112,8 @@ class BPopper {
MainIterator find_it_;
ShardId find_sid_ = std::numeric_limits<ShardId>::max();
std::string key_, value_;
string key_;
string value_;
};
BPopper::BPopper() {
@ -151,6 +146,7 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
if (is_multi) {
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
t->Execute(std::move(cb), true);
return OpStatus::TIMED_OUT;
}
@ -176,13 +172,11 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) {
DCHECK(found());
if (shard->shard_id() == find_sid_) {
key_ = find_it_->first;
OpResult<string> res = ListPop(t->db_index(), find_it_->second, ListDir::LEFT);
CHECK(res.ok());
value_ = std::move(res.value());
find_it_->first.GetString(&key_);
quicklist* ql = GetQL(find_it_->second);
value_ = ListPop(ListDir::LEFT, ql);
if (quicklistCount(ql) == 0) {
CHECK(shard->db_slice().Del(t->db_index(), find_it_));
}
@ -348,23 +342,24 @@ OpResult<uint32_t> ListFamily::OpPush(const OpArgs& op_args, std::string_view ke
}
if (new_key) {
es->AwakeWatched(op_args.db_ind, it->first);
// TODO: to use PrimeKey for watched table.
string tmp;
string_view key = it->first.GetSlice(&tmp);
es->AwakeWatched(op_args.db_ind, key);
}
return quicklistCount(ql);
}
OpResult<string> ListFamily::OpPop(const OpArgs& op_args, string_view key, ListDir dir) {
auto& db_slice = op_args.shard->db_slice();
auto [it, expire] = db_slice.FindExt(op_args.db_ind, key);
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;
OpResult<MainIterator> it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST);
if (!it_res)
return it_res.status();
OpResult<string> res = ListPop(op_args.db_ind, it->second, dir);
if (res) {
quicklist* ql = GetQL(it->second);
if (quicklistCount(ql) == 0) {
CHECK(db_slice.Del(op_args.db_ind, it));
}
quicklist* ql = GetQL((*it_res)->second);
string res = ListPop(dir, ql);
if (quicklistCount(ql) == 0) {
CHECK(db_slice.Del(op_args.db_ind, *it_res));
}
return res;
}

View file

@ -91,10 +91,6 @@ class ReplyBuilder {
as_resp()->SendSimpleString(str);
}
void SendRespBlob(std::string_view str) {
as_resp()->SendDirect(str);
}
void SendGetReply(std::string_view key, uint32_t flags, std::string_view value);
void SendGetNotFound();
@ -122,6 +118,10 @@ class ReplyBuilder {
as_resp()->SendBulkString(str);
}
void SendRespBlob(std::string_view str) {
as_resp()->SendDirect(str);
}
void CloseConnection() {
serializer_->CloseConnection();
}

View file

@ -66,7 +66,7 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
if (params.how == SET_IF_EXISTS)
return OpStatus::SKIPPED;
db_slice_->AddNew(params.db_index, key, MainValue{value}, at_ms);
db_slice_->AddNew(params.db_index, key, PrimeValue{value}, at_ms);
return OpStatus::OK;
}

View file

@ -4,48 +4,23 @@
#pragma once
#include <absl/container/flat_hash_map.h>
#include "core/compact_object.h"
#include "core/dash.h"
#include "server/detail/table.h"
namespace dfly {
namespace detail {
using PrimeKey = detail::PrimeKey;
using PrimeValue = detail::PrimeValue;
struct ExpireTablePolicy {
enum { kSlotNum = 12, kBucketNum = 64, kStashBucketNum = 2 };
static constexpr bool kUseVersion = false;
static uint64_t HashFn(const CompactObj& s) {
return s.HashCode();
}
static void DestroyKey(CompactObj& cs) {
cs.Reset();
}
static void DestroyValue(uint64_t) {
}
static bool Equal(const CompactObj& s1, const CompactObj& s2) {
return s1 == s2;
}
};
} // namespace detail
using MainValue = CompactObj;
using MainTable = absl::flat_hash_map<std::string, MainValue>;
using ExpireTable = DashTable<CompactObj, uint64_t, detail::ExpireTablePolicy>;
using PrimeTable = DashTable<PrimeKey, PrimeValue, detail::PrimeTablePolicy>;
using ExpireTable = DashTable<PrimeKey, uint64_t, detail::ExpireTablePolicy>;
/// Iterators are invalidated when new keys are added to the table or some entries are deleted.
/// Iterators are still valid if a different entry in the table was mutated.
using MainIterator = MainTable::iterator;
using MainIterator = PrimeTable::iterator;
using ExpireIterator = ExpireTable::iterator;
inline bool IsValid(MainIterator it) {
return it != MainIterator{};
return !it.is_done();
}
inline bool IsValid(ExpireIterator it) {