mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 02:15:45 +02:00
bug(dense set): fix scan function to return only home bucket data (#474)
dense set: fix scan function to return only home bucket data Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
22f8554680
commit
97057cadcf
3 changed files with 73 additions and 27 deletions
|
@ -26,8 +26,7 @@ constexpr size_t kMinSize = 1 << kMinSizeShift;
|
|||
constexpr bool kAllowDisplacements = true;
|
||||
|
||||
DenseSet::IteratorBase::IteratorBase(const DenseSet* owner, bool is_end)
|
||||
: owner_(const_cast<DenseSet&>(*owner)),
|
||||
curr_entry_(nullptr) {
|
||||
: owner_(const_cast<DenseSet&>(*owner)), curr_entry_(nullptr) {
|
||||
curr_list_ = is_end ? owner_.entries_.end() : owner_.entries_.begin();
|
||||
if (curr_list_ != owner->entries_.end()) {
|
||||
curr_entry_ = &(*curr_list_);
|
||||
|
@ -173,6 +172,32 @@ bool DenseSet::Equal(DensePtr dptr, const void* ptr, uint32_t cookie) const {
|
|||
return ObjEqual(dptr.GetObject(), ptr, cookie);
|
||||
}
|
||||
|
||||
bool DenseSet::NoItemBelongsBucket(uint32_t bid) const {
|
||||
auto& entries = const_cast<DenseSet*>(this)->entries_;
|
||||
DensePtr* curr = &entries[bid];
|
||||
ExpireIfNeeded(nullptr, curr);
|
||||
if (!curr->IsEmpty() && !curr->IsDisplaced()) {
|
||||
return false;
|
||||
}
|
||||
|
||||
if (bid + 1 < entries_.size()) {
|
||||
DensePtr* right_bucket = &entries[bid + 1];
|
||||
ExpireIfNeeded(nullptr, right_bucket);
|
||||
if (!right_bucket->IsEmpty() && right_bucket->IsDisplaced() &&
|
||||
right_bucket->GetDisplacedDirection() == 1)
|
||||
return false;
|
||||
}
|
||||
|
||||
if (bid > 0) {
|
||||
DensePtr* left_bucket = &entries[bid - 1];
|
||||
ExpireIfNeeded(nullptr, left_bucket);
|
||||
if (!left_bucket->IsEmpty() && left_bucket->IsDisplaced() &&
|
||||
left_bucket->GetDisplacedDirection() == -1)
|
||||
return false;
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
auto DenseSet::FindEmptyAround(uint32_t bid) -> ChainVectorIterator {
|
||||
ExpireIfNeeded(nullptr, &entries_[bid]);
|
||||
|
||||
|
@ -494,33 +519,45 @@ uint32_t DenseSet::Scan(uint32_t cursor, const ItemCb& cb) const {
|
|||
|
||||
auto& entries = const_cast<DenseSet*>(this)->entries_;
|
||||
|
||||
// skip empty entries
|
||||
do {
|
||||
while (entries_idx < entries_.size() && entries_[entries_idx].IsEmpty()) {
|
||||
++entries_idx;
|
||||
}
|
||||
// First find the bucket to scan, skip empty buckets.
|
||||
// A bucket is empty if the current index is empty and the data is not displaced
|
||||
// to the right or to the left.
|
||||
while (entries_idx < entries_.size() && NoItemBelongsBucket(entries_idx)) {
|
||||
++entries_idx;
|
||||
}
|
||||
|
||||
if (entries_idx == entries_.size()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
ExpireIfNeeded(nullptr, &entries[entries_idx]);
|
||||
} while (entries_[entries_idx].IsEmpty());
|
||||
if (entries_idx == entries_.size()) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
DensePtr* curr = &entries[entries_idx];
|
||||
|
||||
// when scanning add all entries in a given chain
|
||||
while (true) {
|
||||
cb(curr->GetObject());
|
||||
if (!curr->IsLink())
|
||||
break;
|
||||
// Check home bucket
|
||||
if (!curr->IsEmpty() && !curr->IsDisplaced()) {
|
||||
// scanning add all entries in a given chain
|
||||
while (true) {
|
||||
cb(curr->GetObject());
|
||||
if (!curr->IsLink())
|
||||
break;
|
||||
|
||||
DensePtr* mcurr = const_cast<DensePtr*>(curr);
|
||||
DensePtr* mcurr = const_cast<DensePtr*>(curr);
|
||||
|
||||
if (ExpireIfNeeded(mcurr, &mcurr->AsLink()->next) && !mcurr->IsLink()) {
|
||||
break;
|
||||
if (ExpireIfNeeded(mcurr, &mcurr->AsLink()->next) && !mcurr->IsLink()) {
|
||||
break;
|
||||
}
|
||||
curr = &curr->AsLink()->next;
|
||||
}
|
||||
}
|
||||
|
||||
// Check if the bucket on the left belongs to the home bucket.
|
||||
if (entries_idx > 0) {
|
||||
DensePtr* left_bucket = &entries[entries_idx - 1];
|
||||
ExpireIfNeeded(nullptr, left_bucket);
|
||||
|
||||
if (left_bucket->IsDisplaced() &&
|
||||
left_bucket->GetDisplacedDirection() == -1) { // left of the home bucket
|
||||
cb(left_bucket->GetObject());
|
||||
}
|
||||
curr = &curr->AsLink()->next;
|
||||
}
|
||||
|
||||
// move to the next index for the next scan and check if we are done
|
||||
|
@ -529,12 +566,13 @@ uint32_t DenseSet::Scan(uint32_t cursor, const ItemCb& cb) const {
|
|||
return 0;
|
||||
}
|
||||
|
||||
// In case of displacement, we want to fully cover the bucket we traversed, therefore
|
||||
// we check if the bucket on the right belongs to the home bucket.
|
||||
ExpireIfNeeded(nullptr, &entries[entries_idx]);
|
||||
// Check if the bucket on the right belongs to the home bucket.
|
||||
DensePtr* right_bucket = &entries[entries_idx];
|
||||
ExpireIfNeeded(nullptr, right_bucket);
|
||||
|
||||
if (entries[entries_idx].GetDisplacedDirection() == 1) { // right of the home bucket
|
||||
cb(entries[entries_idx].GetObject());
|
||||
if (right_bucket->IsDisplaced() &&
|
||||
right_bucket->GetDisplacedDirection() == 1) { // right of the home bucket
|
||||
cb(right_bucket->GetObject());
|
||||
}
|
||||
|
||||
return entries_idx << (32 - capacity_log_);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue