diff --git a/src/core/dash.h b/src/core/dash.h index ab08b5a63..771ffcc42 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -213,7 +213,7 @@ class DashTable : public detail::DashTableBase { } size_t bucket_count() const { - return unique_segments_ * SegmentType::kNumBuckets; + return unique_segments_ * SegmentType::kRegularBucketCnt; } // Overall capacity of the table (including stash buckets). @@ -920,8 +920,8 @@ template void DashTable<_Key, _Value, Policy>::TraverseBucket(const_iterator it, Cb&& cb) { SegmentType& s = *segment_[it.seg_id_]; const auto& b = s.GetBucket(it.bucket_id_); - b.ForEachSlot([&](uint8_t slot, bool probe) { - cb(iterator{this, it.seg_id_, it.bucket_id_, slot}); + b.ForEachSlot([it, cb = std::move(cb), table = this](auto* bucket, uint8_t slot, bool probe) { + cb(iterator{table, it.seg_id_, it.bucket_id_, slot}); }); } diff --git a/src/core/dash_internal.h b/src/core/dash_internal.h index 12deb3b79..49b2b3f44 100644 --- a/src/core/dash_internal.h +++ b/src/core/dash_internal.h @@ -201,20 +201,6 @@ template class BucketBase { template std::pair IterateStash(uint8_t fp, bool is_probe, F&& func) const; - // calls for each busy slot: cb(iterator, probe) - template void ForEachSlot(Cb&& cb) const { - uint32_t mask = this->GetBusy(); - uint32_t probe_mask = this->GetProbe(true); - - for (unsigned j = 0; j < NUM_SLOTS; ++j) { - if (mask & 1) { - cb(j, probe_mask & 1); - } - mask >>= 1; - probe_mask >>= 1; - } - } - void Swap(unsigned slot_a, unsigned slot_b) { slotb_.Swap(slot_a, slot_b); std::swap(finger_arr_[slot_a], finger_arr_[slot_b]); @@ -270,17 +256,10 @@ class VersionedBB : public BucketBase { return c; } -#if 0 - // Returns 64 bit bucket version of 2 low bytes zeroed. - /*uint64_t BaseVersion() const { - // high_ is followed by low array. - // Hence little endian load from high_ ptr copies 2 bytes from low_ into 2 highest bytes of c. - uint64_t c = absl::little_endian::Load64(high_); - - // Fix the version by getting rid of 2 garbage bytes. - return c << 16; - }*/ -#endif + void UpdateVersion(uint64_t version) { + uint64_t c = std::max(GetVersion(), version); + absl::little_endian::Store64(version_, c); + } void Clear() { Base::Clear(); @@ -357,6 +336,34 @@ template void ForEachSlot(Cb&& cb) const { + uint32_t mask = this->GetBusy(); + uint32_t probe_mask = this->GetProbe(true); + + for (unsigned j = 0; j < NUM_SLOTS; ++j) { + if (mask & 1) { + cb(this, j, probe_mask & 1); + } + mask >>= 1; + probe_mask >>= 1; + } + } + + // calls for each busy slot: cb(iterator, probe) + template void ForEachSlot(Cb&& cb) { + uint32_t mask = this->GetBusy(); + uint32_t probe_mask = this->GetProbe(true); + + for (unsigned j = 0; j < NUM_SLOTS; ++j) { + if (mask & 1) { + cb(this, j, probe_mask & 1); + } + mask >>= 1; + probe_mask >>= 1; + } + } }; // class Bucket static constexpr uint8_t kNanBid = 0xFF; @@ -385,8 +392,10 @@ template = kNumBuckets) { // Stash + if (bid >= kRegularBucketCnt) { // Stash constexpr auto kLastSlotMask = 1u << (kNumSlots - 1); if (bucket_[bid].GetBusy() & kLastSlotMask) - RemoveStashReference(bid - kNumBuckets, right_hashval); + RemoveStashReference(bid - kRegularBucketCnt, right_hashval); } return bucket_[bid].ShiftRight(); @@ -540,15 +549,15 @@ template > kFingerBits) % kNumBuckets; + return (hash >> kFingerBits) % kRegularBucketCnt; } static uint8_t NextBid(uint8_t bid) { - return bid < kNumBuckets - 1 ? bid + 1 : 0; + return bid < kRegularBucketCnt - 1 ? bid + 1 : 0; } static uint8_t PrevBid(uint8_t bid) { - return bid ? bid - 1 : kNumBuckets - 1; + return bid ? bid - 1 : kRegularBucketCnt - 1; } // if own_items is true it means we try to move owned item to probing bucket. @@ -1041,7 +1050,7 @@ auto Segment::TryMoveFromStash(unsigned stash_id, unsigned s Hash_t key_hash) -> Iterator { uint8_t bid = BucketIndex(key_hash); uint8_t hash_fp = key_hash & kFpMask; - uint8_t stash_bid = kNumBuckets + stash_id; + uint8_t stash_bid = kRegularBucketCnt + stash_id; auto& key = Key(stash_bid, stash_slot_id); auto& value = Value(stash_bid, stash_slot_id); @@ -1058,10 +1067,7 @@ auto Segment::TryMoveFromStash(unsigned stash_id, unsigned s // We maintain the invariant for the physical bucket by updating the version when // the entries move between buckets. uint64_t ver = bucket_[stash_bid].GetVersion(); - uint64_t dest_ver = bucket_[bid].GetVersion(); - if (dest_ver < ver) { - bucket_[bid].SetVersion(ver); - } + bucket_[bid].UpdateVersion(ver); } RemoveStashReference(stash_id, key_hash); return Iterator{bid, SlotId(reg_slot)}; @@ -1120,7 +1126,7 @@ auto Segment::FindIt(U&& key, Hash_t key_hash, Pred&& cf) co auto stash_cb = [&](unsigned overflow_index, unsigned pos) -> SlotId { assert(pos < STASH_BUCKET_NUM); - pos += kNumBuckets; + pos += kRegularBucketCnt; const Bucket& bucket = bucket_[pos]; return bucket.FindByFp(fp_hash, false, key, cf); }; @@ -1133,7 +1139,7 @@ auto Segment::FindIt(U&& key, Hash_t key_hash, Pred&& cf) co for (unsigned i = 0; i < STASH_BUCKET_NUM; ++i) { auto sid = stash_cb(0, i); if (sid != BucketType::kNanSlot) { - return Iterator{uint8_t(kNumBuckets + i), sid}; + return Iterator{uint8_t(kRegularBucketCnt + i), sid}; } } @@ -1147,12 +1153,12 @@ auto Segment::FindIt(U&& key, Hash_t key_hash, Pred&& cf) co auto stash_res = target.IterateStash(fp_hash, false, stash_cb); if (stash_res.second != BucketType::kNanSlot) { - return Iterator{uint8_t(kNumBuckets + stash_res.first), stash_res.second}; + return Iterator{uint8_t(kRegularBucketCnt + stash_res.first), stash_res.second}; } stash_res = probe.IterateStash(fp_hash, true, stash_cb); if (stash_res.second != BucketType::kNanSlot) { - return Iterator{uint8_t(kNumBuckets + stash_res.first), stash_res.second}; + return Iterator{uint8_t(kRegularBucketCnt + stash_res.first), stash_res.second}; } return Iterator{}; } @@ -1161,7 +1167,7 @@ template template void Segment::TraverseAll(Cb&& cb) const { for (uint8_t i = 0; i < kTotalBuckets; ++i) { - bucket_[i].ForEachSlot([&](SlotId slot, bool) { cb(Iterator{i, slot}); }); + bucket_[i].ForEachSlot([&](auto*, SlotId slot, bool) { cb(Iterator{i, slot}); }); } } @@ -1177,8 +1183,8 @@ void Segment::Delete(const Iterator& it, Hash_t key_hash) { auto& b = bucket_[it.index]; - if (it.index >= kNumBuckets) { - RemoveStashReference(it.index - kNumBuckets, key_hash); + if (it.index >= kRegularBucketCnt) { + RemoveStashReference(it.index - kRegularBucketCnt, key_hash); } b.Delete(it.slot); @@ -1197,11 +1203,11 @@ void Segment::Split(HFunc&& hfn, Segment* dest_right) { // do_versioning(); auto is_mine = [this](Hash_t hash) { return (hash >> (64 - local_depth_) & 1) == 0; }; - for (unsigned i = 0; i < kNumBuckets; ++i) { + for (unsigned i = 0; i < kRegularBucketCnt; ++i) { uint32_t invalid_mask = 0; - auto cb = [&](unsigned slot, bool probe) { - auto& key = Key(i, slot); + auto cb = [&](auto* bucket, unsigned slot, bool probe) { + auto& key = bucket->key[slot]; Hash_t hash = hfn(key); // we extract local_depth bits from the left part of the hash. Since we extended local_depth, @@ -1211,8 +1217,8 @@ void Segment::Split(HFunc&& hfn, Segment* dest_right) { invalid_mask |= (1u << slot); - Iterator it = dest_right->InsertUniq(std::forward(key), - std::forward(Value(i, slot)), hash, false); + Iterator it = dest_right->InsertUniq(std::forward(bucket->key[slot]), + std::forward(bucket->value[slot]), hash, false); // we move items residing in a regular bucket to a new segment. // Note 1: in case we are somehow attacked with items that after the split @@ -1237,11 +1243,8 @@ void Segment::Split(HFunc&& hfn, Segment* dest_right) { if constexpr (USE_VERSION) { // Maintaining consistent versioning. - uint64_t ver = bucket_[i].GetVersion(); - uint64_t dest_ver = dest_right->bucket_[it.index].GetVersion(); - if (dest_ver < ver) { - dest_right->bucket_[it.index].SetVersion(ver); - } + uint64_t ver = bucket->GetVersion(); + dest_right->bucket_[it.index].UpdateVersion(ver); } }; @@ -1251,11 +1254,11 @@ void Segment::Split(HFunc&& hfn, Segment* dest_right) { for (unsigned i = 0; i < STASH_BUCKET_NUM; ++i) { uint32_t invalid_mask = 0; - unsigned bid = kNumBuckets + i; + unsigned bid = kRegularBucketCnt + i; Bucket& stash = bucket_[bid]; - auto cb = [&](unsigned slot, bool probe) { - auto& key = Key(bid, slot); + auto cb = [&](auto* bucket, unsigned slot, bool probe) { + auto& key = bucket->key[slot]; Hash_t hash = hfn(key); if (is_mine(hash)) { @@ -1269,18 +1272,15 @@ void Segment::Split(HFunc&& hfn, Segment* dest_right) { } invalid_mask |= (1u << slot); - auto it = dest_right->InsertUniq(std::forward(Key(bid, slot)), - std::forward(Value(bid, slot)), hash, false); + auto it = dest_right->InsertUniq(std::forward(bucket->key[slot]), + std::forward(bucket->value[slot]), hash, false); (void)it; assert(it.index != kNanBid); if constexpr (USE_VERSION) { // Update the version in the destination bucket. - uint64_t ver = stash.GetVersion(); - uint64_t dest_ver = dest_right->bucket_[it.index].GetVersion(); - if (dest_ver < ver) { - dest_right->bucket_[it.index].SetVersion(ver); - } + uint64_t ver = bucket->GetVersion(); + dest_right->bucket_[it.index].UpdateVersion(ver); } // Remove stash reference pointing to stach bucket i. @@ -1310,9 +1310,7 @@ int Segment::MoveToOther(bool own_items, unsigned from_bid, // We never decrease the version of the entry. if constexpr (USE_VERSION) { auto& dst = bucket_[to_bid]; - if (dst.GetVersion() < src.GetVersion()) { - dst.SetVersion(src.GetVersion()); - } + dst.UpdateVersion(src.GetVersion()); } src.Delete(src_slot); @@ -1382,11 +1380,11 @@ auto Segment::InsertUniq(U&& key, V&& value, Hash_t key_hash // we balance stash fill rate by starting from y % STASH_BUCKET_NUM. for (unsigned i = 0; i < STASH_BUCKET_NUM; ++i) { unsigned stash_pos = (bid + i) % STASH_BUCKET_NUM; - int stash_slot = TryInsertToBucket(kNumBuckets + stash_pos, std::forward(key), + int stash_slot = TryInsertToBucket(kRegularBucketCnt + stash_pos, std::forward(key), std::forward(value), meta_hash, false); if (stash_slot >= 0) { target.SetStashPtr(stash_pos, meta_hash, &neighbor); - return Iterator{uint8_t(kNumBuckets + stash_pos), uint8_t(stash_slot)}; + return Iterator{uint8_t(kRegularBucketCnt + stash_pos), uint8_t(stash_slot)}; } } @@ -1443,7 +1441,7 @@ std::enable_if_t Segment::CVCOnInsert(uint64_t // Important to repeat exactly the insertion logic of InsertUnique. for (unsigned i = 0; i < STASH_BUCKET_NUM; ++i) { - unsigned stash_bid = kNumBuckets + ((bid + i) % STASH_BUCKET_NUM); + unsigned stash_bid = kRegularBucketCnt + ((bid + i) % STASH_BUCKET_NUM); const Bucket& stash = bucket_[stash_bid]; if (!stash.IsFull()) { if (!stash.IsEmpty() && stash.GetVersion() < ver_threshold) @@ -1462,7 +1460,7 @@ std::enable_if_t Segment::CVCOnBump(uint64_t v unsigned bid, unsigned slot, Hash_t hash, uint8_t result_bid[2]) const { - if (bid < kNumBuckets) { + if (bid < kRegularBucketCnt) { // right now we do not migrate entries from nid to bid, only from stash to normal buckets. return 0; } @@ -1501,7 +1499,7 @@ std::enable_if_t Segment::CVCOnBump(uint64_t v assert(result == 0); - unsigned stash_pos = bid - kNumBuckets; + unsigned stash_pos = bid - kRegularBucketCnt; uint8_t fp_hash = hash & kFpMask; auto find_stash = [&](unsigned, unsigned pos) { @@ -1536,12 +1534,12 @@ std::enable_if_t Segment::CVCOnBump(uint64_t v template template bool Segment::TraverseLogicalBucket(uint8_t bid, HashFn&& hfun, Cb&& cb) const { - assert(bid < kNumBuckets); + assert(bid < kRegularBucketCnt); const Bucket& b = bucket_[bid]; bool found = false; if (b.GetProbe(false)) { // Check items that this bucket owns. - b.ForEachSlot([&](SlotId slot, bool probe) { + b.ForEachSlot([&](auto* bucket, SlotId slot, bool probe) { if (!probe) { found = true; cb(Iterator{bid, slot}); @@ -1554,10 +1552,10 @@ bool Segment::TraverseLogicalBucket(uint8_t bid, HashFn&& hf // check for probing entries in the next bucket, i.e. those that should reside in b. if (next.GetProbe(true)) { - next.ForEachSlot([&](SlotId slot, bool probe) { + next.ForEachSlot([&](auto* bucket, SlotId slot, bool probe) { if (probe) { found = true; - assert(BucketIndex(hfun(next.key[slot])) == bid); + assert(BucketIndex(hfun(bucket->key[slot])) == bid); cb(Iterator{nid, slot}); } }); @@ -1566,10 +1564,10 @@ bool Segment::TraverseLogicalBucket(uint8_t bid, HashFn&& hf // Finally go over stash buckets and find those entries that belong to b. if (b.HasStash()) { // do not bother with overflow fps. Just go over all the stash buckets. - for (uint8_t j = kNumBuckets; j < kTotalBuckets; ++j) { + for (uint8_t j = kRegularBucketCnt; j < kTotalBuckets; ++j) { const auto& stashb = bucket_[j]; - stashb.ForEachSlot([&](SlotId slot, bool probe) { - if (BucketIndex(hfun(stashb.key[slot])) == bid) { + stashb.ForEachSlot([&](auto* bucket, SlotId slot, bool probe) { + if (BucketIndex(hfun(bucket->key[slot])) == bid) { found = true; cb(Iterator{j, slot}); } @@ -1619,7 +1617,7 @@ auto Segment::BumpUp(uint8_t bid, SlotId slot, Hash_t key_ha uint8_t fp_hash = key_hash & kFpMask; assert(fp_hash == from.Fp(slot)); - if (bid < kNumBuckets) { + if (bid < kRegularBucketCnt) { // non stash case. if (slot > 0 && bp.CanBumpDown(from.key[slot - 1])) { from.Swap(slot - 1, slot); @@ -1631,7 +1629,7 @@ auto Segment::BumpUp(uint8_t bid, SlotId slot, Hash_t key_ha // stash bucket // We swap the item with the item in the "normal" bucket in the last slot. - unsigned stash_pos = bid - kNumBuckets; + unsigned stash_pos = bid - kRegularBucketCnt; // If we have an empty space for some reason just unload the stash entry. if (Iterator it = TryMoveFromStash(stash_pos, slot, key_hash); it.found()) { @@ -1648,7 +1646,7 @@ auto Segment::BumpUp(uint8_t bid, SlotId slot, Hash_t key_ha // bucket_offs - 0 if exact bucket, 1 if neighbour unsigned bucket_offs = target.UnsetStashPtr(fp_hash, stash_pos, &next); - uint8_t swap_bid = (target_bid + bucket_offs) % kNumBuckets; + uint8_t swap_bid = (target_bid + bucket_offs) % kRegularBucketCnt; auto& swapb = bucket_[swap_bid]; constexpr unsigned kLastSlot = kNumSlots - 1; @@ -1707,12 +1705,12 @@ unsigned Segment::UnloadStash(HFunc&& hfunc) { unsigned moved = 0; for (unsigned i = 0; i < STASH_BUCKET_NUM; ++i) { - unsigned bid = kNumBuckets + i; + unsigned bid = kRegularBucketCnt + i; Bucket& stash = bucket_[bid]; uint32_t invalid_mask = 0; - auto cb = [&](unsigned slot, bool probe) { - auto& key = Key(bid, slot); + auto cb = [&](auto* bucket, unsigned slot, bool probe) { + auto& key = bucket->key[slot]; Hash_t hash = hfunc(key); Iterator res = TryMoveFromStash(i, slot, hash); if (res.found()) { diff --git a/src/core/dash_test.cc b/src/core/dash_test.cc index 25ef87f80..ac46a7bf5 100644 --- a/src/core/dash_test.cc +++ b/src/core/dash_test.cc @@ -163,7 +163,7 @@ set DashTest::FillSegment(unsigned bid) { std::equal_to eq; for (Segment::Key_t key = 0; key < 1000000u; ++key) { uint64_t hash = dt_.DoHash(key); - unsigned bi = (hash >> 8) % Segment::kNumBuckets; + unsigned bi = (hash >> 8) % Segment::kRegularBucketCnt; if (bi != bid) continue; uint8_t fp = hash & 0xFF; @@ -219,7 +219,7 @@ TEST_F(DashTest, Basic) { auto hfun = &UInt64Policy::HashFn; - auto cursor = segment_.TraverseLogicalBucket((hash >> 8) % Segment::kNumBuckets, hfun, cb); + auto cursor = segment_.TraverseLogicalBucket((hash >> 8) % Segment::kRegularBucketCnt, hfun, cb); ASSERT_EQ(1, has_called); ASSERT_EQ(0, segment_.TraverseLogicalBucket(cursor, hfun, cb)); ASSERT_EQ(1, has_called); @@ -237,7 +237,7 @@ TEST_F(DashTest, Segment) { set keys = FillSegment(0); EXPECT_TRUE(segment_.GetBucket(0).IsFull() && segment_.GetBucket(1).IsFull()); - for (size_t i = 2; i < Segment::kNumBuckets; ++i) { + for (size_t i = 2; i < Segment::kRegularBucketCnt; ++i) { EXPECT_EQ(0, segment_.GetBucket(i).Size()); } EXPECT_EQ(4 * Segment::kNumSlots, keys.size()); @@ -254,10 +254,10 @@ TEST_F(DashTest, Segment) { segment_.TraverseAll(cb); ASSERT_EQ(keys.size(), has_called); - ASSERT_TRUE(segment_.GetBucket(Segment::kNumBuckets).IsFull()); + ASSERT_TRUE(segment_.GetBucket(Segment::kRegularBucketCnt).IsFull()); std::array arr; uint64_t* next = arr.begin(); - for (unsigned i = Segment::kNumBuckets; i < Segment::kNumBuckets + 2; ++i) { + for (unsigned i = Segment::kRegularBucketCnt; i < Segment::kRegularBucketCnt + 2; ++i) { const auto* k = &segment_.Key(i, 0); next = std::copy(k, k + Segment::kNumSlots, next); } @@ -311,9 +311,10 @@ TEST_F(DashTest, SegmentFull) { } TEST_F(DashTest, Split) { + // fills segment with maximum keys that must reside in bucket id 0. set keys = FillSegment(0); Segment::Value_t val; - Segment s2{2}; + Segment s2{2}; // segment with local depth 2. segment_.Split(&UInt64Policy::HashFn, &s2); unsigned sum[2] = {0}; @@ -335,8 +336,8 @@ TEST_F(DashTest, Split) { TEST_F(DashTest, BumpUp) { set keys = FillSegment(0); - constexpr unsigned kFirstStashId = Segment::kNumBuckets; - constexpr unsigned kSecondStashId = Segment::kNumBuckets + 1; + constexpr unsigned kFirstStashId = Segment::kRegularBucketCnt; + constexpr unsigned kSecondStashId = Segment::kRegularBucketCnt + 1; constexpr unsigned kNumSlots = Segment::kNumSlots; EXPECT_TRUE(segment_.GetBucket(0).IsFull()); @@ -391,7 +392,7 @@ TEST_F(DashTest, BumpPolicy) { }; set keys = FillSegment(0); - constexpr unsigned kFirstStashId = Segment::kNumBuckets; + constexpr unsigned kFirstStashId = Segment::kRegularBucketCnt; EXPECT_TRUE(segment_.GetBucket(0).IsFull()); EXPECT_TRUE(segment_.GetBucket(1).IsFull()); @@ -886,14 +887,14 @@ struct SimpleEvictPolicy { // returns number of items evicted from the table. // 0 means - nothing has been evicted. unsigned Evict(const U64Dash::HotspotBuckets& hotb, U64Dash* me) { - constexpr unsigned kNumBuckets = U64Dash::HotspotBuckets::kNumBuckets; + constexpr unsigned kRegularBucketCnt = U64Dash::HotspotBuckets::kNumBuckets; - uint32_t bid = hotb.key_hash % kNumBuckets; + uint32_t bid = hotb.key_hash % kRegularBucketCnt; unsigned slot_index = (hotb.key_hash >> 32) % U64Dash::kBucketWidth; - for (unsigned i = 0; i < kNumBuckets; ++i) { - auto it = hotb.at((bid + i) % kNumBuckets); + for (unsigned i = 0; i < kRegularBucketCnt; ++i) { + auto it = hotb.at((bid + i) % kRegularBucketCnt); it += slot_index; if (it.is_done()) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 39bc7eaaf..cbc1c1f9c 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -1313,8 +1313,8 @@ size_t DbSlice::EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* t PrimeTable::Segment_t* segment = table->prime.GetSegment(it.segment_id()); DCHECK(segment); - constexpr unsigned kNumStashBuckets = - PrimeTable::Segment_t::kTotalBuckets - PrimeTable::Segment_t::kNumBuckets; + constexpr unsigned kNumStashBuckets = PrimeTable::Segment_t::kStashBucketCnt; + constexpr unsigned kNumRegularBuckets = PrimeTable::Segment_t::kRegularBucketCnt; PrimeTable::bucket_iterator it2(it); unsigned evicted = 0; @@ -1329,7 +1329,7 @@ size_t DbSlice::EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* t }; for (unsigned i = 0; !evict_succeeded && i < kNumStashBuckets; ++i) { - unsigned stash_bid = i + PrimeTable::Segment_t::kNumBuckets; + unsigned stash_bid = i + kNumRegularBuckets; const auto& bucket = segment->GetBucket(stash_bid); if (bucket.IsEmpty()) continue; @@ -1360,8 +1360,8 @@ size_t DbSlice::EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* t // Try normal buckets now. We iterate from largest slot to smallest across the whole segment. for (int slot_id = PrimeTable::Segment_t::kNumSlots - 1; !evict_succeeded && slot_id >= 0; --slot_id) { - for (unsigned i = 0; i < PrimeTable::Segment_t::kNumBuckets; ++i) { - unsigned bid = (it.bucket_id() + i) % PrimeTable::Segment_t::kNumBuckets; + for (unsigned i = 0; i < kNumRegularBuckets; ++i) { + unsigned bid = (it.bucket_id() + i) % kNumRegularBuckets; const auto& bucket = segment->GetBucket(bid); if (!bucket.IsBusy(slot_id)) continue;