mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
feat(core): Add Ttl semantics to string_map (#813)
Now string_map respects the optional ttl argument that accepts ttl time in seconds. Upon traversing and accessing expirerd elements they will be transparently removed and deleted from the map. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
18ce8c3a5b
commit
f92403fb09
5 changed files with 108 additions and 49 deletions
|
@ -86,7 +86,7 @@ size_t DenseSet::PushFront(DenseSet::ChainVectorIterator it, void* data, bool ha
|
|||
}
|
||||
|
||||
if (has_ttl)
|
||||
it->SetTtl();
|
||||
it->SetTtl(true);
|
||||
return ObjectAllocSize(data);
|
||||
}
|
||||
|
||||
|
@ -97,7 +97,7 @@ void DenseSet::PushFront(DenseSet::ChainVectorIterator it, DenseSet::DensePtr pt
|
|||
if (it->IsEmpty()) {
|
||||
it->SetObject(ptr.GetObject());
|
||||
if (ptr.HasTtl())
|
||||
it->SetTtl();
|
||||
it->SetTtl(true);
|
||||
if (ptr.IsLink()) {
|
||||
FreeLink(ptr.AsLink());
|
||||
}
|
||||
|
@ -112,7 +112,7 @@ void DenseSet::PushFront(DenseSet::ChainVectorIterator it, DenseSet::DensePtr pt
|
|||
// allocate a new link if needed and copy the pointer to the new link
|
||||
it->SetLink(NewLink(ptr.Raw(), *it));
|
||||
if (ptr.HasTtl())
|
||||
it->SetTtl();
|
||||
it->SetTtl(true);
|
||||
DCHECK(!it->AsLink()->next.IsEmpty());
|
||||
}
|
||||
}
|
||||
|
@ -313,7 +313,7 @@ void DenseSet::Grow() {
|
|||
}
|
||||
}
|
||||
|
||||
void* DenseSet::AddOrFind(void* ptr, bool has_ttl) {
|
||||
auto DenseSet::AddOrFindDense(void* ptr, bool has_ttl) -> DensePtr* {
|
||||
uint64_t hc = Hash(ptr, 0);
|
||||
|
||||
if (entries_.empty()) {
|
||||
|
@ -332,7 +332,7 @@ void* DenseSet::AddOrFind(void* ptr, bool has_ttl) {
|
|||
uint32_t bucket_id = BucketId(hc);
|
||||
DensePtr* dptr = Find(ptr, bucket_id, 0).second;
|
||||
if (dptr != nullptr) {
|
||||
return dptr->GetObject();
|
||||
return dptr;
|
||||
}
|
||||
|
||||
DCHECK_LT(bucket_id, entries_.size());
|
||||
|
@ -373,7 +373,7 @@ void* DenseSet::AddOrFind(void* ptr, bool has_ttl) {
|
|||
|
||||
DensePtr to_insert(ptr);
|
||||
if (has_ttl)
|
||||
to_insert.SetTtl();
|
||||
to_insert.SetTtl(true);
|
||||
|
||||
while (!entries_[bucket_id].IsEmpty() && entries_[bucket_id].IsDisplaced()) {
|
||||
DensePtr unlinked = PopPtrFront(entries_.begin() + bucket_id);
|
||||
|
@ -497,6 +497,25 @@ void* DenseSet::PopInternal() {
|
|||
return ret;
|
||||
}
|
||||
|
||||
void* DenseSet::AddOrReplaceObj(void* obj, bool has_ttl) {
|
||||
DensePtr* ptr = AddOrFindDense(obj, has_ttl);
|
||||
if (!ptr)
|
||||
return nullptr;
|
||||
|
||||
if (ptr->IsLink()) {
|
||||
ptr = ptr->AsLink();
|
||||
}
|
||||
|
||||
void* res = ptr->Raw();
|
||||
obj_malloc_used_ -= ObjectAllocSize(res);
|
||||
obj_malloc_used_ += ObjectAllocSize(obj);
|
||||
|
||||
ptr->SetObject(obj);
|
||||
ptr->SetTtl(has_ttl);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
/**
|
||||
* stable scanning api. has the same guarantees as redis scan command.
|
||||
* we avoid doing bit-reverse by using a different function to derive a bucket id
|
||||
|
|
|
@ -106,8 +106,11 @@ class DenseSet {
|
|||
return (uptr() & kDisplaceDirectionBit) == kDisplaceDirectionBit ? 1 : -1;
|
||||
}
|
||||
|
||||
void SetTtl() {
|
||||
ptr_ = (void*)(uptr() | kTtlBit);
|
||||
void SetTtl(bool b) {
|
||||
if (b)
|
||||
ptr_ = (void*)(uptr() | kTtlBit);
|
||||
else
|
||||
ptr_ = (void*)(uptr() & (~kTtlBit));
|
||||
}
|
||||
|
||||
void Reset() {
|
||||
|
@ -245,10 +248,6 @@ class DenseSet {
|
|||
return false;
|
||||
}
|
||||
|
||||
// Returns previous object if the object with such key already exists,
|
||||
// Returns null if obj was added.
|
||||
void* AddOrFind(void* obj, bool has_ttl);
|
||||
|
||||
void* FindInternal(const void* obj, uint32_t cookie) const;
|
||||
void* PopInternal();
|
||||
|
||||
|
@ -265,6 +264,17 @@ class DenseSet {
|
|||
obj_malloc_used_ -= delta;
|
||||
}
|
||||
|
||||
// Returns previous if the equivalent object already exists,
|
||||
// Returns nullptr if obj was added.
|
||||
void* AddOrFindObj(void* obj, bool has_ttl) {
|
||||
DensePtr* ptr = AddOrFindDense(obj, has_ttl);
|
||||
return ptr ? ptr->GetObject() : nullptr;
|
||||
}
|
||||
|
||||
// Returns the previous object if it has been replaced.
|
||||
// nullptr, if obj was added.
|
||||
void* AddOrReplaceObj(void* obj, bool has_ttl);
|
||||
|
||||
private:
|
||||
DenseSet(const DenseSet&) = delete;
|
||||
DenseSet& operator=(DenseSet&) = delete;
|
||||
|
@ -297,6 +307,10 @@ class DenseSet {
|
|||
void* PopDataFront(ChainVectorIterator);
|
||||
DensePtr PopPtrFront(ChainVectorIterator);
|
||||
|
||||
// Returns DensePtr if the object with such key already exists,
|
||||
// Returns null if obj was added.
|
||||
DensePtr* AddOrFindDense(void* obj, bool has_ttl);
|
||||
|
||||
// ============ Pseudo Linked List in DenseSet end ==================
|
||||
|
||||
// returns (prev, item) pair. If item is root, then prev is null.
|
||||
|
|
|
@ -19,9 +19,13 @@ namespace dfly {
|
|||
|
||||
namespace {
|
||||
|
||||
constexpr uint64_t kValTtlBit = 1ULL << 63;
|
||||
constexpr uint64_t kValMask = ~kValTtlBit;
|
||||
|
||||
inline sds GetValue(sds key) {
|
||||
char* valptr = key + sdslen(key) + 1;
|
||||
return (char*)absl::little_endian::Load64(valptr);
|
||||
uint64_t val = absl::little_endian::Load64(valptr);
|
||||
return (sds)(kValMask & val);
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
@ -31,29 +35,36 @@ StringMap::~StringMap() {
|
|||
}
|
||||
|
||||
bool StringMap::AddOrUpdate(string_view field, string_view value, uint32_t ttl_sec) {
|
||||
CHECK_EQ(ttl_sec, UINT32_MAX); // TBD
|
||||
|
||||
// 8 additional bytes for a pointer to value.
|
||||
sds newkey = AllocSdsWithSpace(field.size(), 8);
|
||||
sds newkey;
|
||||
size_t meta_offset = field.size() + 1;
|
||||
sds sdsval = sdsnewlen(value.data(), value.size());
|
||||
uint64_t sdsval_tag = uint64_t(sdsval);
|
||||
|
||||
if (ttl_sec == UINT32_MAX) {
|
||||
// The layout is:
|
||||
// key, '\0', 8-byte pointer to value
|
||||
newkey = AllocSdsWithSpace(field.size(), 8);
|
||||
} else {
|
||||
// The layout is:
|
||||
// key, '\0', 8-byte pointer to value, 4-byte absolute time.
|
||||
// the value pointer it tagged.
|
||||
newkey = AllocSdsWithSpace(field.size(), 8 + 4);
|
||||
uint32_t at = time_now() + ttl_sec;
|
||||
absl::little_endian::Store32(newkey + meta_offset + 8, at); // skip the value pointer.
|
||||
sdsval_tag |= kValTtlBit;
|
||||
}
|
||||
|
||||
if (!field.empty()) {
|
||||
memcpy(newkey, field.data(), field.size());
|
||||
}
|
||||
|
||||
sds val = sdsnewlen(value.data(), value.size());
|
||||
absl::little_endian::Store64(newkey + field.size() + 1, uint64_t(val));
|
||||
absl::little_endian::Store64(newkey + meta_offset, sdsval_tag);
|
||||
|
||||
bool has_ttl = false;
|
||||
sds prev_entry = (sds)AddOrFind(newkey, has_ttl);
|
||||
// Replace the whole entry.
|
||||
sds prev_entry = (sds)AddOrReplaceObj(newkey, sdsval_tag & kValTtlBit);
|
||||
if (prev_entry) {
|
||||
sdsfree(newkey);
|
||||
char* valptr = prev_entry + sdslen(prev_entry) + 1;
|
||||
sds prev_val = (sds)absl::little_endian::Load64(valptr);
|
||||
DecreaseMallocUsed(zmalloc_usable_size(sdsAllocPtr(prev_val)));
|
||||
sdsfree(prev_val);
|
||||
|
||||
absl::little_endian::Store64(valptr, uint64_t(val));
|
||||
IncreaseMallocUsed(zmalloc_usable_size(sdsAllocPtr(val)));
|
||||
|
||||
ObjDelete(prev_entry, false);
|
||||
return false;
|
||||
}
|
||||
|
||||
|
@ -61,27 +72,12 @@ bool StringMap::AddOrUpdate(string_view field, string_view value, uint32_t ttl_s
|
|||
}
|
||||
|
||||
bool StringMap::AddOrSkip(std::string_view field, std::string_view value, uint32_t ttl_sec) {
|
||||
CHECK_EQ(ttl_sec, UINT32_MAX); // TBD
|
||||
|
||||
void* obj = FindInternal(&field, 1); // 1 - string_view
|
||||
|
||||
if (obj)
|
||||
return false;
|
||||
|
||||
// 8 additional bytes for a pointer to value.
|
||||
sds newkey = AllocSdsWithSpace(field.size(), 8);
|
||||
if (!field.empty()) {
|
||||
memcpy(newkey, field.data(), field.size());
|
||||
}
|
||||
|
||||
sds val = sdsnewlen(value.data(), value.size());
|
||||
absl::little_endian::Store64(newkey + field.size() + 1, uint64_t(val));
|
||||
|
||||
bool has_ttl = false;
|
||||
sds prev_entry = (sds)AddOrFind(newkey, has_ttl);
|
||||
DCHECK(!prev_entry);
|
||||
|
||||
return true;
|
||||
return AddOrUpdate(field, value, ttl_sec);
|
||||
}
|
||||
|
||||
bool StringMap::Erase(string_view key) {
|
||||
|
@ -146,8 +142,17 @@ size_t StringMap::ObjectAllocSize(const void* obj) const {
|
|||
}
|
||||
|
||||
uint32_t StringMap::ObjExpireTime(const void* obj) const {
|
||||
LOG(FATAL) << "TBD";
|
||||
return 0;
|
||||
sds str = (sds)obj;
|
||||
const char* valptr = str + sdslen(str) + 1;
|
||||
|
||||
uint64_t val = absl::little_endian::Load64(valptr);
|
||||
DCHECK(val & kValTtlBit);
|
||||
if (val & kValTtlBit) {
|
||||
return absl::little_endian::Load32(valptr + 8);
|
||||
}
|
||||
|
||||
// Should not reach.
|
||||
return UINT32_MAX;
|
||||
}
|
||||
|
||||
void StringMap::ObjDelete(void* obj, bool has_ttl) const {
|
||||
|
|
|
@ -90,10 +90,31 @@ TEST_F(StringMapTest, Basic) {
|
|||
EXPECT_GT(sm_->ObjMallocUsed(), sz);
|
||||
it = sm_->begin();
|
||||
EXPECT_STREQ("baraaaaaaaaaaaa2", it->second);
|
||||
|
||||
EXPECT_FALSE(sm_->AddOrSkip("foo", "bar2"));
|
||||
EXPECT_STREQ("baraaaaaaaaaaaa2", it->second);
|
||||
}
|
||||
|
||||
TEST_F(StringMapTest, EmptyFind) {
|
||||
sm_->Find("bar");
|
||||
}
|
||||
|
||||
TEST_F(StringMapTest, Ttl) {
|
||||
EXPECT_TRUE(sm_->AddOrUpdate("bla", "val1", 1));
|
||||
EXPECT_FALSE(sm_->AddOrUpdate("bla", "val2", 1));
|
||||
sm_->set_time(1);
|
||||
EXPECT_TRUE(sm_->AddOrUpdate("bla", "val2", 1));
|
||||
EXPECT_EQ(1u, sm_->Size());
|
||||
|
||||
EXPECT_FALSE(sm_->AddOrSkip("bla", "val3", 2));
|
||||
|
||||
// set ttl to 2, meaning that the key will expire at time 3.
|
||||
EXPECT_TRUE(sm_->AddOrSkip("bla2", "val3", 2));
|
||||
EXPECT_TRUE(sm_->Contains("bla2"));
|
||||
|
||||
sm_->set_time(3);
|
||||
auto it = sm_->begin();
|
||||
EXPECT_TRUE(it == sm_->end());
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -39,7 +39,7 @@ StringSet::~StringSet() {
|
|||
}
|
||||
|
||||
bool StringSet::AddSds(sds s1) {
|
||||
return AddOrFind(s1, false) == nullptr;
|
||||
return AddOrFindObj(s1, false) == nullptr;
|
||||
}
|
||||
|
||||
bool StringSet::Add(string_view src, uint32_t ttl_sec) {
|
||||
|
@ -60,7 +60,7 @@ bool StringSet::Add(string_view src, uint32_t ttl_sec) {
|
|||
has_ttl = true;
|
||||
}
|
||||
|
||||
if (AddOrFind(newsds, has_ttl) != nullptr) {
|
||||
if (AddOrFindObj(newsds, has_ttl) != nullptr) {
|
||||
sdsfree(newsds);
|
||||
return false;
|
||||
}
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue