Implement PEXPIREAT and tune expire dictionary

This commit is contained in:
Roman Gershman 2022-03-12 21:51:35 +02:00
parent 81ffb189ef
commit cceb0d90ca
16 changed files with 166 additions and 44 deletions

View file

@ -172,6 +172,7 @@ API 2.0
- [ ] DISCARD
- [X] Generic Family
- [X] SCAN
- [X] PEXPIREAT
- [X] String Family
- [X] APPEND
- [X] PREPEND (dragonfly specific)
@ -202,12 +203,12 @@ TBD.
## Design decisions along the way
### Expiration deadlines with relative accuracy
I decided to limit the expiration range to 180 days. Moreover, expiration deadlines
I decided to limit the expiration range to 365 days. Moreover, expiration deadlines
with millisecond precision (PEXPIRE/PSETEX etc) will be rounded to closest second
**for deadlines greater than 16777215ms (approximately 280 minutes). In other words,
**for deadlines greater than 33554431ms (approximately 560 minutes). In other words,
expiries of `PEXPIRE key 10010` will expire exactly after 10 seconds and 10ms. However,
`PEXPIRE key 16777300` will expire after 16777 seconds (i.e. 300ms earlier). Similarly,
`PEXPIRE key 16777800` will expire after 16778 seconds, i.e. 200ms later.
`PEXPIRE key 34000300` will expire after 34000 seconds (i.e. 300ms earlier). Similarly,
`PEXPIRE key 34000800` will expire after 34001 seconds, i.e. 200ms later.
Such rounding has at most 0.006% error which I hope is acceptable for ranges so big.
If you it breaks your use-cases - talk to me or open an issue and explain your case.
Such rounding has at most 0.002% error which I hope is acceptable for large ranges.
If it breaks your use-cases - talk to me or open an issue and explain your case.

54
src/core/expire_period.h Normal file
View file

@ -0,0 +1,54 @@
// Copyright 2022, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#pragma once
#include <cstdint>
namespace dfly {
class ExpirePeriod {
public:
ExpirePeriod() : val_(0), gen_(0), precision_(0) {
static_assert(sizeof(ExpirePeriod) == 4);
}
explicit ExpirePeriod(uint64_t ms, unsigned gen = 0) : ExpirePeriod() {
Set(ms);
}
// in milliseconds
uint64_t duration() const {
return precision_ ? val_ * 1000 : val_;
}
unsigned generation() const {
return gen_;
}
void Set(uint64_t ms);
private:
uint32_t val_ : 25;
uint32_t gen_ : 6;
uint32_t precision_ : 1; // 0 - ms, 1 - sec.
};
inline void ExpirePeriod::Set(uint64_t ms) {
constexpr uint64_t kBarrier = (1ULL << 25);
if (ms < kBarrier) {
val_ = ms;
precision_ = 0;
return;
}
precision_ = 1;
if (ms < kBarrier << 10) {
ms = (ms + 500) / 1000;
}
val_ = ms >= kBarrier ? kBarrier - 1 : ms;
}
} // namespace dfly

View file

@ -16,7 +16,7 @@ namespace dfly {
enum class ListDir : uint8_t { LEFT, RIGHT };
constexpr uint64_t kMaxExpireDeadlineSec = (1u << 24) - 1;
constexpr uint64_t kMaxExpireDeadlineSec = (1u << 25) - 1;
using DbIndex = uint16_t;
using ShardId = uint16_t;

View file

@ -23,6 +23,15 @@ using namespace std;
using namespace util;
using facade::OpStatus;
constexpr auto kPrimeSegmentSize = PrimeTable::kSegBytes;
constexpr auto kExpireSegmentSize = ExpireTable::kSegBytes;
// mi_malloc good size is 32768 just 48 bytes more.
static_assert(kPrimeSegmentSize == 32720);
// 20480 is the next goodsize so we are loosing ~300 bytes or 1.5%.
static_assert(kExpireSegmentSize == 20168);
#define ADD(x) (x) += o.x
DbStats& DbStats::operator+=(const DbStats& o) {
@ -62,6 +71,7 @@ DbSlice::DbWrapper::DbWrapper(std::pmr::memory_resource* mr)
DbSlice::DbSlice(uint32_t index, EngineShard* owner) : shard_id_(index), owner_(owner) {
db_arr_.emplace_back();
CreateDb(0);
expire_base_[0] = expire_base_[1] = 0;
}
DbSlice::~DbSlice() {
@ -190,7 +200,11 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) -> pair<MainIterator,
auto expire_it = db->expire_table.Find(existing->first);
CHECK(IsValid(expire_it));
if (expire_it->second <= now_ms_) {
// TODO: to implement the incremental update of expiry values using multi-generation
// expire_base_ update. Right now we use only index 0.
uint32_t delta_ms = now_ms_ - expire_base_[0];
if (expire_it->second.duration() <= delta_ms) {
db->expire_table.Erase(expire_it);
if (existing->second.HasFlag()) {
@ -284,7 +298,9 @@ bool DbSlice::Expire(DbIndex db_ind, MainIterator it, uint64_t at) {
}
if (!it->second.HasExpire() && at) {
CHECK(db->expire_table.Insert(it->first.AsRef(), at).second);
uint64_t delta = at - expire_base_[0]; // TODO: employ multigen expire updates.
CHECK(db->expire_table.Insert(it->first.AsRef(), ExpirePeriod(delta)).second);
it->second.SetExpire(true);
return true;
@ -342,8 +358,8 @@ pair<MainIterator, bool> DbSlice::AddIfNotExist(DbIndex db_ind, string_view key,
if (expire_at_ms) {
new_entry->second.SetExpire(true);
CHECK(db.expire_table.Insert(new_entry->first.AsRef(), expire_at_ms).second);
uint64_t delta = expire_at_ms - expire_base_[0];
CHECK(db.expire_table.Insert(new_entry->first.AsRef(), ExpirePeriod(delta)).second);
}
return make_pair(new_entry, true);
@ -454,7 +470,11 @@ pair<MainIterator, ExpireIterator> DbSlice::ExpireIfNeeded(DbIndex db_ind, MainI
auto expire_it = db->expire_table.Find(it->first);
CHECK(IsValid(expire_it));
if (expire_it->second > now_ms_)
// TODO: to employ multi-generation update of expire-base and the underlying values.
uint32_t delta_ms = now_ms_ - expire_base_[0];
if (expire_it->second.duration() > delta_ms)
return make_pair(it, expire_it);
db->expire_table.Erase(expire_it);

View file

@ -92,6 +92,15 @@ class DbSlice {
now_ms_ = now_ms;
}
void UpdateExpireBase(uint64_t now, unsigned generation) {
expire_base_[generation & 1] = now;
}
uint64_t expire_base() const {
return expire_base_[0];
}
// returns wall clock in millis as it has been set via UpdateExpireClock.
uint64_t Now() const {
return now_ms_;
@ -212,6 +221,8 @@ class DbSlice {
EngineShard* owner_;
uint64_t now_ms_ = 0; // Used for expire logic, represents a real clock.
uint64_t expire_base_[2]; // Used for expire logic, represents a real clock.
uint64_t version_ = 1; // Used to version entries in the PrimeTable.
mutable SliceEvents events_; // we may change this even for const operations.

View file

@ -6,6 +6,7 @@
#include "core/compact_object.h"
#include "core/dash.h"
#include "core/expire_period.h"
namespace dfly {
@ -45,7 +46,7 @@ struct PrimeTablePolicy {
};
struct ExpireTablePolicy {
enum { kSlotNum = 12, kBucketNum = 64, kStashBucketNum = 2 };
enum { kSlotNum = 14, kBucketNum = 56, kStashBucketNum = 4 };
static constexpr bool kUseVersion = false;
static uint64_t HashFn(const PrimeKey& s) {
@ -56,7 +57,10 @@ struct ExpireTablePolicy {
cs.Reset();
}
static void DestroyValue(uint64_t) {
static void DestroyValue(ExpirePeriod e) {
}
static void DestroyValue(uint32_t val) {
}
static bool Equal(const PrimeKey& s1, const PrimeKey& s2) {

View file

@ -81,6 +81,7 @@ EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time, mi_heap_t*
tmp_str1 = sdsempty();
tmp_str2 = sdsempty();
db_slice_.UpdateExpireBase(absl::GetCurrentTimeNanos() / 1000000, 0);
}
EngineShard::~EngineShard() {

View file

@ -20,8 +20,8 @@ DEFINE_uint32(dbnum, 16, "Number of databases");
namespace dfly {
using namespace std;
using facade::Protocol;
using facade::kExpiryOutOfRange;
using facade::Protocol;
namespace {
@ -61,12 +61,13 @@ OpResult<void> Renamer::Find(ShardId shard_id, const ArgSlice& args) {
FindResult* res = (shard_id == src_sid_) ? &src_res_ : &dest_res_;
res->key = args.front();
auto [it, exp_it] = EngineShard::tlocal()->db_slice().FindExt(db_indx_, res->key);
auto& db_slice = EngineShard::tlocal()->db_slice();
auto [it, exp_it] = db_slice.FindExt(db_indx_, res->key);
res->found = IsValid(it);
if (IsValid(it)) {
res->val = it->second.AsRef();
res->expire_ts = IsValid(exp_it) ? exp_it->second : 0;
res->expire_ts = IsValid(exp_it) ? db_slice.expire_base() + exp_it->second.duration() : 0;
}
return OpStatus::OK;
@ -93,7 +94,6 @@ void Renamer::MoveValues(EngineShard* shard, const ArgSlice& args) {
shard->db_slice().Expire(db_indx_, dest_it, src_res_.expire_ts);
} else {
// we just add the key to destination with the source object.
shard->db_slice().AddNew(db_indx_, dest_key, std::move(src_res_.val), src_res_.expire_ts);
}
}
@ -275,6 +275,29 @@ void GenericFamily::ExpireAt(CmdArgList args, ConnectionContext* cntx) {
}
}
void GenericFamily::PexpireAt(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
string_view msec = ArgS(args, 2);
int64_t int_arg;
if (!absl::SimpleAtoi(msec, &int_arg)) {
return (*cntx)->SendError(kInvalidIntErr);
}
int_arg = std::max(int_arg, 0L);
ExpireParams params{.ts = int_arg, .absolute = true, .unit = MSEC};
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpExpire(OpArgs{shard, t->db_index()}, key, params);
};
OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb));
if (status == OpStatus::OUT_OF_RANGE) {
return (*cntx)->SendError(kExpiryOutOfRange);
} else {
(*cntx)->SendLong(status == OpStatus::OK);
}
}
void GenericFamily::Rename(CmdArgList args, ConnectionContext* cntx) {
OpResult<void> st = RenameGeneric(args, false, cntx);
(*cntx)->SendError(st.status());
@ -451,7 +474,7 @@ OpStatus GenericFamily::OpExpire(const OpArgs& op_args, string_view key,
if (rel_msec <= 0) {
CHECK(db_slice.Del(op_args.db_ind, it));
} else if (IsValid(expire_it)) {
expire_it->second = rel_msec + now_msec;
expire_it->second.Set(rel_msec + now_msec - db_slice.expire_base());
} else {
db_slice.Expire(op_args.db_ind, it, rel_msec + now_msec);
}
@ -468,7 +491,7 @@ OpResult<uint64_t> GenericFamily::OpTtl(Transaction* t, EngineShard* shard, stri
if (!IsValid(expire))
return OpStatus::SKIPPED;
int64_t ttl_ms = expire->second - db_slice.Now();
int64_t ttl_ms = db_slice.expire_base() + expire->second.duration() - db_slice.Now();
DCHECK_GT(ttl_ms, 0); // Otherwise FindExt would return null.
return ttl_ms;
}
@ -514,20 +537,21 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from, str
return OpStatus::KEY_EXISTS;
}
uint64_t exp_ts = IsValid(expire_it) ? expire_it->second : 0;
uint64_t exp_ts = IsValid(expire_it) ? expire_it->second.duration() : 0;
if (IsValid(to_it)) {
to_it->second = std::move(from_it->second);
from_it->second.SetExpire(IsValid(expire_it));
if (IsValid(to_expire)) {
to_it->second.SetExpire(true);
to_expire->second = exp_ts;
to_expire->second.Set(exp_ts);
} else {
to_it->second.SetExpire(false);
db_slice.Expire(op_args.db_ind, to_it, exp_ts);
db_slice.Expire(op_args.db_ind, to_it, exp_ts + db_slice.expire_base());
}
} else {
db_slice.AddNew(op_args.db_ind, to, std::move(from_it->second), exp_ts);
db_slice.AddNew(op_args.db_ind, to, std::move(from_it->second),
exp_ts + db_slice.expire_base());
// Need search again since the container might invalidate the iterators.
from_it = db_slice.FindExt(op_args.db_ind, from).first;
}
@ -578,6 +602,7 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"EXISTS", CO::READONLY | CO::FAST, -2, 1, -1, 1}.HFUNC(Exists)
<< CI{"EXPIRE", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Expire)
<< CI{"EXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(ExpireAt)
<< CI{"PEXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(PexpireAt)
<< CI{"RENAME", CO::WRITE, 3, 1, 2, 1}.HFUNC(Rename)
<< CI{"SELECT", kSelectOpts, 2, 0, 0, 0}.HFUNC(Select)
<< CI{"SCAN", CO::READONLY | CO::FAST, -2, 0, 0, 0}.HFUNC(Scan)

View file

@ -42,6 +42,7 @@ class GenericFamily {
static void Exists(CmdArgList args, ConnectionContext* cntx);
static void Expire(CmdArgList args, ConnectionContext* cntx);
static void ExpireAt(CmdArgList args, ConnectionContext* cntx);
static void PexpireAt(CmdArgList args, ConnectionContext* cntx);
static void Rename(CmdArgList args, ConnectionContext* cntx);
static void RenameNx(CmdArgList args, ConnectionContext* cntx);

View file

@ -27,27 +27,27 @@ class GenericFamilyTest : public BaseFamilyTest {
};
TEST_F(GenericFamilyTest, Expire) {
constexpr uint64_t kNow = 1636070340000;
UpdateTime(kNow);
Run({"set", "key", "val"});
auto resp = Run({"expire", "key", "1"});
EXPECT_THAT(resp[0], IntArg(1));
UpdateTime(kNow + 1000);
UpdateTime(expire_now_ + 1000);
resp = Run({"get", "key"});
EXPECT_THAT(resp, ElementsAre(ArgType(RespExpr::NIL)));
Run({"set", "key", "val"});
resp = Run({"expireat", "key", absl::StrCat((kNow + 2000) / 1000)});
EXPECT_THAT(resp[0], IntArg(1));
resp = Run({"expireat", "key", absl::StrCat((kNow + 3000) / 1000)});
resp = Run({"pexpireat", "key", absl::StrCat(expire_now_ + 2000)});
EXPECT_THAT(resp[0], IntArg(1));
UpdateTime(kNow + 2999);
// override
resp = Run({"pexpireat", "key", absl::StrCat(expire_now_ + 3000)});
EXPECT_THAT(resp[0], IntArg(1));
UpdateTime(expire_now_ + 2999);
resp = Run({"get", "key"});
EXPECT_THAT(resp[0], "val");
UpdateTime(kNow + 3000);
UpdateTime(expire_now_ + 3000);
resp = Run({"get", "key"});
EXPECT_THAT(resp[0], ArgType(RespExpr::NIL));
}

View file

@ -49,12 +49,10 @@ TEST_F(ListFamilyTest, Expire) {
auto resp = Run({"lpush", kKey1, "1"});
EXPECT_THAT(resp[0], IntArg(1));
constexpr uint64_t kNow = 232279092000;
UpdateTime(kNow);
resp = Run({"expire", kKey1, "1"});
EXPECT_THAT(resp[0], IntArg(1));
UpdateTime(kNow + 1000);
UpdateTime(expire_now_ + 1000);
resp = Run({"lpush", kKey1, "1"});
EXPECT_THAT(resp[0], IntArg(1));
}

View file

@ -61,7 +61,7 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
}
if (IsValid(expire_it) && at_ms) {
expire_it->second = at_ms;
expire_it->second.Set(at_ms - db_slice_->expire_base());
} else {
db_slice_->Expire(params.db_index, it, at_ms);
}

View file

@ -54,22 +54,19 @@ TEST_F(StringFamilyTest, Incr) {
}
TEST_F(StringFamilyTest, Expire) {
constexpr uint64_t kNow = 232279092000;
UpdateTime(kNow);
ASSERT_THAT(Run({"set", "key", "val", "PX", "20"}), RespEq("OK"));
UpdateTime(kNow + 10);
UpdateTime(expire_now_ + 10);
EXPECT_THAT(Run({"get", "key"}), RespEq("val"));
UpdateTime(kNow + 20);
UpdateTime(expire_now_ + 20);
EXPECT_THAT(Run({"get", "key"}), ElementsAre(ArgType(RespExpr::NIL)));
ASSERT_THAT(Run({"set", "i", "1", "PX", "10"}), RespEq("OK"));
ASSERT_THAT(Run({"incr", "i"}), ElementsAre(IntArg(2)));
UpdateTime(kNow + 30);
UpdateTime(expire_now_ + 30);
ASSERT_THAT(Run({"incr", "i"}), ElementsAre(IntArg(1)));
}

View file

@ -5,6 +5,7 @@
#pragma once
#include "server/detail/table.h"
#include "core/expire_period.h"
namespace dfly {
@ -12,7 +13,7 @@ using PrimeKey = detail::PrimeKey;
using PrimeValue = detail::PrimeValue;
using PrimeTable = DashTable<PrimeKey, PrimeValue, detail::PrimeTablePolicy>;
using ExpireTable = DashTable<PrimeKey, uint64_t, detail::ExpireTablePolicy>;
using ExpireTable = DashTable<PrimeKey, ExpirePeriod, detail::ExpireTablePolicy>;
/// Iterators are invalidated when new keys are added to the table or some entries are deleted.
/// Iterators are still valid if a different entry in the table was mutated.

View file

@ -74,6 +74,13 @@ void BaseFamilyTest::SetUp() {
service_->Init(nullptr, opts);
ess_ = &service_->shard_set();
expire_now_ = absl::GetCurrentTimeNanos() / 1000000;
auto cb = [&](EngineShard* s) {
s->db_slice().UpdateExpireBase(expire_now_ - 1000, 0);
s->db_slice().UpdateExpireClock(expire_now_);
};
ess_->RunBriefInParallel(cb);
const TestInfo* const test_info = UnitTest::GetInstance()->current_test_info();
LOG(INFO) << "Starting " << test_info->name();
}

View file

@ -69,6 +69,7 @@ class BaseFamilyTest : public ::testing::Test {
// ts is ms
void UpdateTime(uint64_t ms);
std::string GetId() const;
std::unique_ptr<util::ProactorPool> pp_;
@ -79,6 +80,7 @@ class BaseFamilyTest : public ::testing::Test {
absl::flat_hash_map<std::string, std::unique_ptr<TestConnWrapper>> connections_;
::boost::fibers::mutex mu_;
ConnectionContext::DebugInfo last_cmd_dbg_info_;
uint64_t expire_now_;
};
} // namespace dfly