mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
Add expiry semantics to the set. (#340)
This commit is contained in:
parent
44f98de128
commit
08779b154b
5 changed files with 300 additions and 79 deletions
|
@ -25,33 +25,47 @@ constexpr size_t kMinSizeShift = 2;
|
|||
constexpr size_t kMinSize = 1 << kMinSizeShift;
|
||||
constexpr bool kAllowDisplacements = true;
|
||||
|
||||
DenseSet::IteratorBase::IteratorBase(const DenseSet* owner, ChainVectorIterator begin_list)
|
||||
: owner_(*owner), curr_list_(begin_list) {
|
||||
if (begin_list != owner->entries_.end()) {
|
||||
curr_entry_ = *begin_list;
|
||||
DenseSet::IteratorBase::IteratorBase(const DenseSet* owner, bool is_end)
|
||||
: owner_(const_cast<DenseSet&>(*owner)),
|
||||
curr_entry_(nullptr) {
|
||||
curr_list_ = is_end ? owner_.entries_.end() : owner_.entries_.begin();
|
||||
if (curr_list_ != owner->entries_.end()) {
|
||||
curr_entry_ = &(*curr_list_);
|
||||
owner->ExpireIfNeeded(nullptr, curr_entry_);
|
||||
|
||||
// find the first non null entry
|
||||
if (curr_entry_.IsEmpty()) {
|
||||
if (curr_entry_->IsEmpty()) {
|
||||
Advance();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
void DenseSet::IteratorBase::Advance() {
|
||||
if (curr_entry_.IsLink()) {
|
||||
curr_entry_ = curr_entry_.AsLink()->next;
|
||||
DCHECK(!curr_entry_.IsEmpty());
|
||||
} else {
|
||||
bool step_link = false;
|
||||
DCHECK(curr_entry_);
|
||||
|
||||
if (curr_entry_->IsLink()) {
|
||||
DenseLinkKey* plink = curr_entry_->AsLink();
|
||||
if (!owner_.ExpireIfNeeded(curr_entry_, &plink->next) || curr_entry_->IsLink()) {
|
||||
curr_entry_ = &plink->next;
|
||||
step_link = true;
|
||||
}
|
||||
}
|
||||
|
||||
if (!step_link) {
|
||||
DCHECK(curr_list_ != owner_.entries_.end());
|
||||
do {
|
||||
++curr_list_;
|
||||
if (curr_list_ == owner_.entries_.end()) {
|
||||
curr_entry_.Reset();
|
||||
curr_entry_ = nullptr;
|
||||
return;
|
||||
}
|
||||
owner_.ExpireIfNeeded(nullptr, &(*curr_list_));
|
||||
} while (curr_list_->IsEmpty());
|
||||
DCHECK(curr_list_ != owner_.entries_.end());
|
||||
curr_entry_ = *curr_list_;
|
||||
curr_entry_ = &(*curr_list_);
|
||||
}
|
||||
DCHECK(!curr_entry_->IsEmpty());
|
||||
}
|
||||
|
||||
DenseSet::DenseSet(pmr::memory_resource* mr) : entries_(mr) {
|
||||
|
@ -61,7 +75,7 @@ DenseSet::~DenseSet() {
|
|||
ClearInternal();
|
||||
}
|
||||
|
||||
size_t DenseSet::PushFront(DenseSet::ChainVectorIterator it, void* data) {
|
||||
size_t DenseSet::PushFront(DenseSet::ChainVectorIterator it, void* data, bool has_ttl) {
|
||||
// if this is an empty list assign the value to the empty placeholder pointer
|
||||
if (it->IsEmpty()) {
|
||||
it->SetObject(data);
|
||||
|
@ -70,6 +84,8 @@ size_t DenseSet::PushFront(DenseSet::ChainVectorIterator it, void* data) {
|
|||
it->SetLink(NewLink(data, *it));
|
||||
}
|
||||
|
||||
if (has_ttl)
|
||||
it->SetTtl();
|
||||
return ObjectAllocSize(data);
|
||||
}
|
||||
|
||||
|
@ -79,6 +95,8 @@ void DenseSet::PushFront(DenseSet::ChainVectorIterator it, DenseSet::DensePtr pt
|
|||
|
||||
if (it->IsEmpty()) {
|
||||
it->SetObject(ptr.GetObject());
|
||||
if (ptr.HasTtl())
|
||||
it->SetTtl();
|
||||
if (ptr.IsLink()) {
|
||||
FreeLink(ptr.AsLink());
|
||||
}
|
||||
|
@ -92,6 +110,8 @@ void DenseSet::PushFront(DenseSet::ChainVectorIterator it, DenseSet::DensePtr pt
|
|||
|
||||
// allocate a new link if needed and copy the pointer to the new link
|
||||
it->SetLink(NewLink(ptr.Raw(), *it));
|
||||
if (ptr.HasTtl())
|
||||
it->SetTtl();
|
||||
DCHECK(!it->AsLink()->next.IsEmpty());
|
||||
}
|
||||
}
|
||||
|
@ -136,8 +156,9 @@ void* DenseSet::PopDataFront(DenseSet::ChainVectorIterator it) {
|
|||
void DenseSet::ClearInternal() {
|
||||
for (auto it = entries_.begin(); it != entries_.end(); ++it) {
|
||||
while (!it->IsEmpty()) {
|
||||
bool has_ttl = it->HasTtl();
|
||||
void* obj = PopDataFront(it);
|
||||
ObjDelete(obj);
|
||||
ObjDelete(obj, has_ttl);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -153,6 +174,8 @@ bool DenseSet::Equal(DensePtr dptr, const void* ptr, uint32_t cookie) const {
|
|||
}
|
||||
|
||||
auto DenseSet::FindEmptyAround(uint32_t bid) -> ChainVectorIterator {
|
||||
ExpireIfNeeded(nullptr, &entries_[bid]);
|
||||
|
||||
if (entries_[bid].IsEmpty()) {
|
||||
return entries_.begin() + bid;
|
||||
}
|
||||
|
@ -163,12 +186,14 @@ auto DenseSet::FindEmptyAround(uint32_t bid) -> ChainVectorIterator {
|
|||
|
||||
if (bid + 1 < entries_.size()) {
|
||||
auto it = next(entries_.begin(), bid + 1);
|
||||
ExpireIfNeeded(nullptr, &(*it));
|
||||
if (it->IsEmpty())
|
||||
return it;
|
||||
}
|
||||
|
||||
if (bid) {
|
||||
auto it = next(entries_.begin(), bid - 1);
|
||||
ExpireIfNeeded(nullptr, &(*it));
|
||||
if (it->IsEmpty())
|
||||
return it;
|
||||
}
|
||||
|
@ -195,6 +220,13 @@ void DenseSet::Grow() {
|
|||
DensePtr* prev = nullptr;
|
||||
|
||||
while (true) {
|
||||
if (ExpireIfNeeded(prev, curr)) {
|
||||
// if curr has disappeared due to expiry and prev was converted from Link to a
|
||||
// regular DensePtr
|
||||
if (prev && !prev->IsLink())
|
||||
break;
|
||||
}
|
||||
|
||||
if (curr->IsEmpty())
|
||||
break;
|
||||
void* ptr = curr->GetObject();
|
||||
|
@ -254,7 +286,7 @@ void DenseSet::Grow() {
|
|||
}
|
||||
}
|
||||
|
||||
bool DenseSet::AddInternal(void* ptr) {
|
||||
bool DenseSet::AddInternal(void* ptr, bool has_ttl) {
|
||||
uint64_t hc = Hash(ptr, 0);
|
||||
|
||||
if (entries_.empty()) {
|
||||
|
@ -262,7 +294,7 @@ bool DenseSet::AddInternal(void* ptr) {
|
|||
entries_.resize(kMinSize);
|
||||
uint32_t bucket_id = BucketId(hc);
|
||||
auto e = entries_.begin() + bucket_id;
|
||||
obj_malloc_used_ += PushFront(e, ptr);
|
||||
obj_malloc_used_ += PushFront(e, ptr, has_ttl);
|
||||
++size_;
|
||||
++num_used_buckets_;
|
||||
|
||||
|
@ -282,7 +314,7 @@ bool DenseSet::AddInternal(void* ptr) {
|
|||
for (unsigned j = 0; j < 2; ++j) {
|
||||
ChainVectorIterator list = FindEmptyAround(bucket_id);
|
||||
if (list != entries_.end()) {
|
||||
obj_malloc_used_ += PushFront(list, ptr);
|
||||
obj_malloc_used_ += PushFront(list, ptr, has_ttl);
|
||||
if (std::distance(entries_.begin(), list) != bucket_id) {
|
||||
list->SetDisplaced(std::distance(entries_.begin() + bucket_id, list));
|
||||
}
|
||||
|
@ -312,6 +344,9 @@ bool DenseSet::AddInternal(void* ptr) {
|
|||
*/
|
||||
|
||||
DensePtr to_insert(ptr);
|
||||
if (has_ttl)
|
||||
to_insert.SetTtl();
|
||||
|
||||
while (!entries_[bucket_id].IsEmpty() && entries_[bucket_id].IsDisplaced()) {
|
||||
DensePtr unlinked = PopPtrFront(entries_.begin() + bucket_id);
|
||||
|
||||
|
@ -344,6 +379,7 @@ auto DenseSet::Find(const void* ptr, uint32_t bid, uint32_t cookie) -> pair<Dens
|
|||
|
||||
DensePtr* curr = &entries_[bid + offset[j]];
|
||||
|
||||
ExpireIfNeeded(nullptr, curr);
|
||||
if (Equal(*curr, ptr, cookie)) {
|
||||
return make_pair(nullptr, curr);
|
||||
}
|
||||
|
@ -353,6 +389,8 @@ auto DenseSet::Find(const void* ptr, uint32_t bid, uint32_t cookie) -> pair<Dens
|
|||
DensePtr* prev = &entries_[bid];
|
||||
DensePtr* curr = prev->Next();
|
||||
while (curr != nullptr) {
|
||||
ExpireIfNeeded(prev, curr);
|
||||
|
||||
if (Equal(*curr, ptr, cookie)) {
|
||||
return make_pair(prev, curr);
|
||||
}
|
||||
|
@ -364,11 +402,11 @@ auto DenseSet::Find(const void* ptr, uint32_t bid, uint32_t cookie) -> pair<Dens
|
|||
return make_pair(nullptr, nullptr);
|
||||
}
|
||||
|
||||
void* DenseSet::Delete(DensePtr* prev, DensePtr* ptr) {
|
||||
void* ret = nullptr;
|
||||
void DenseSet::Delete(DensePtr* prev, DensePtr* ptr) {
|
||||
void* obj = nullptr;
|
||||
|
||||
if (ptr->IsObject()) {
|
||||
ret = ptr->Raw();
|
||||
obj = ptr->Raw();
|
||||
ptr->Reset();
|
||||
if (prev == nullptr) {
|
||||
--num_used_buckets_;
|
||||
|
@ -388,30 +426,33 @@ void* DenseSet::Delete(DensePtr* prev, DensePtr* ptr) {
|
|||
DCHECK(ptr->IsLink());
|
||||
|
||||
DenseLinkKey* link = ptr->AsLink();
|
||||
ret = link->Raw();
|
||||
obj = link->Raw();
|
||||
*ptr = link->next;
|
||||
--num_chain_entries_;
|
||||
FreeLink(link);
|
||||
}
|
||||
|
||||
obj_malloc_used_ -= ObjectAllocSize(ret);
|
||||
obj_malloc_used_ -= ObjectAllocSize(obj);
|
||||
--size_;
|
||||
|
||||
return ret;
|
||||
ObjDelete(obj, false);
|
||||
}
|
||||
|
||||
void* DenseSet::PopInternal() {
|
||||
std::pmr::vector<DenseSet::DensePtr>::iterator bucket_iter = entries_.begin();
|
||||
|
||||
// find the first non-empty chain
|
||||
while (bucket_iter != entries_.end() && bucket_iter->IsEmpty()) {
|
||||
++bucket_iter;
|
||||
}
|
||||
do {
|
||||
while (bucket_iter != entries_.end() && bucket_iter->IsEmpty()) {
|
||||
++bucket_iter;
|
||||
}
|
||||
|
||||
// empty set
|
||||
if (bucket_iter == entries_.end()) {
|
||||
return nullptr;
|
||||
}
|
||||
// empty set
|
||||
if (bucket_iter == entries_.end()) {
|
||||
return nullptr;
|
||||
}
|
||||
|
||||
ExpireIfNeeded(nullptr, &(*bucket_iter));
|
||||
} while (bucket_iter->IsEmpty());
|
||||
|
||||
if (bucket_iter->IsLink()) {
|
||||
--num_chain_entries_;
|
||||
|
@ -487,4 +528,22 @@ auto DenseSet::NewLink(void* data, DensePtr next) -> DenseLinkKey* {
|
|||
return lk;
|
||||
}
|
||||
|
||||
bool DenseSet::ExpireIfNeeded(DensePtr* prev, DensePtr* node) const {
|
||||
DCHECK_NOTNULL(node);
|
||||
|
||||
bool deleted = false;
|
||||
while (node->HasTtl()) {
|
||||
uint32_t obj_time = ObjExpireTime(node->GetObject());
|
||||
if (obj_time > time_now_) {
|
||||
break;
|
||||
}
|
||||
|
||||
// updates the node to next item if relevant.
|
||||
const_cast<DenseSet*>(this)->Delete(prev, node);
|
||||
deleted = true;
|
||||
}
|
||||
|
||||
return deleted;
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue