chore: introduce a Clone function for the dense set (#3740)

* chore: introduce a Clone function for the dense set

The new DenseSet::Fill function copies a set using a state machine that prefetches data in batches.
After this change, the hot spots are predominantly inside the ObjectClone and
Hash methods.

All in all, the benchmarks show a ~45% CPU reduction (BM_Fill vs. BM_Clone):
```
BM_Clone/elements:32000    1517322 ns      1517338 ns         2772
BM_Fill/elements:32000      841087 ns       841097 ns         4900
```

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-09-19 16:14:33 +03:00 committed by GitHub
parent 3af2dfc4e7
commit abf3acec4a
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
11 changed files with 172 additions and 34 deletions

View file

@ -12,7 +12,7 @@
#include <type_traits>
#include <vector>
#include "glog/logging.h"
#include "base/logging.h"
extern "C" {
#include "redis/zmalloc.h"
@ -25,6 +25,8 @@ constexpr size_t kMinSizeShift = 2;
constexpr size_t kMinSize = 1 << kMinSizeShift;
constexpr bool kAllowDisplacements = true;
#define PREFETCH_READ(x) __builtin_prefetch(x, 0, 1)
DenseSet::IteratorBase::IteratorBase(const DenseSet* owner, bool is_end)
: owner_(const_cast<DenseSet*>(owner)), curr_entry_(nullptr) {
curr_list_ = is_end ? owner_->entries_.end() : owner_->entries_.begin();
@ -197,6 +199,54 @@ bool DenseSet::Equal(DensePtr dptr, const void* ptr, uint32_t cookie) const {
return ObjEqual(dptr.GetObject(), ptr, cookie);
}
void DenseSet::CloneBatch(unsigned len, CloneItem* items, DenseSet* other) const {
// We handle a batch of items to minimize data dependencies when accessing memory for a single
// item. We prefetch the memory for entire batch before actually reading data from any of the
// elements.
while (len) {
unsigned dest_id = 0;
// we walk "len" linked lists in parallel, and prefetch their next, obj pointers
// before actually processing them.
for (unsigned i = 0; i < len; ++i) {
auto& src = items[i];
if (src.obj) {
// The majority of the CPU is spent in this block.
void* new_obj = other->ObjectClone(src.obj, src.has_ttl);
uint64_t hash = Hash(src.obj, 0);
other->AddUnique(new_obj, src.has_ttl, hash);
src.obj = nullptr;
}
const DenseLinkKey* link = src.link;
if (link) {
src.link = nullptr;
auto& dest = items[dest_id++];
DCHECK(!link->next.IsEmpty());
if (src.fetch_tail) {
// switch to the final state.
DCHECK(link->next.IsObject());
dest.obj = link->next.Raw();
src.fetch_tail = false;
} else {
dest.obj = link->Raw();
if (link->next.IsObject()) {
// next state - pre-terminal, fetch the final object.
dest.fetch_tail = true;
dest.link = link;
} else {
dest.link = link->next.AsLink();
PREFETCH_READ(dest.link);
}
}
PREFETCH_READ(dest.obj);
}
}
// update the length of the batch for the next iteration.
len = dest_id;
}
}
bool DenseSet::NoItemBelongsBucket(uint32_t bid) const {
auto& entries = const_cast<DenseSet*>(this)->entries_;
DensePtr* curr = &entries[bid];
@ -263,6 +313,41 @@ void DenseSet::Reserve(size_t sz) {
}
}
// Copies the elements of this set into `other`.
// `other` must not have any buckets allocated yet (DCHECK-ed); it is reserved to
// UpperBoundSize() of this set up front. Non-empty buckets are collected into a fixed-size
// batch that is handed to CloneBatch() whenever it fills up, and once more at the end for
// the remainder.
void DenseSet::Fill(DenseSet* other) const {
  DCHECK(other->entries_.empty());

  other->Reserve(UpperBoundSize());

  constexpr unsigned kBatchLen = 32;
  CloneItem batch[kBatchLen];
  unsigned pending = 0;

  for (const DensePtr& head : entries_) {
    if (head.IsEmpty()) {
      continue;
    }

    CloneItem& item = batch[pending++];
    item.has_ttl = head.HasTtl();
    if (head.IsObject()) {
      // Bucket with a single object: nothing to walk, just clone it.
      item.link = nullptr;
      item.obj = head.Raw();
      PREFETCH_READ(item.obj);
    } else {
      // Bucket with a chain: CloneBatch walks it starting from the first link.
      item.link = head.AsLink();
      item.obj = nullptr;
      PREFETCH_READ(item.link);
    }

    if (pending == kBatchLen) {
      CloneBatch(kBatchLen, batch, other);
      pending = 0;
    }
  }

  // Flush whatever is left in the last, possibly partial (or empty) batch.
  CloneBatch(pending, batch, other);
}
void DenseSet::Grow(size_t prev_size) {
// perform rehashing of items in the set
for (long i = prev_size - 1; i >= 0; --i) {

View file

@ -244,6 +244,8 @@ class DenseSet {
uint32_t Scan(uint32_t cursor, const ItemCb& cb) const;
void Reserve(size_t sz);
// Copies the elements of this set into `other` by cloning each object via ObjectClone().
// `other` is expected to have no buckets allocated yet (DCHECK-ed by the implementation)
// and is reserved to UpperBoundSize() of this set before copying.
void Fill(DenseSet* other) const;
// set an abstract time that allows expiry.
void set_time(uint32_t val) {
time_now_ = val;
@ -264,6 +266,7 @@ class DenseSet {
virtual size_t ObjectAllocSize(const void* obj) const = 0;
virtual uint32_t ObjExpireTime(const void* obj) const = 0;
virtual void ObjDelete(void* obj, bool has_ttl) const = 0;
// Returns a copy of obj that Fill() inserts into the destination set.
// has_ttl is the ttl flag that the set keeps for obj; the same flag is used when the
// returned copy is added to the destination.
virtual void* ObjectClone(const void* obj, bool has_ttl) const = 0;
void CollectExpired();
@ -324,6 +327,15 @@ class DenseSet {
bool Equal(DensePtr dptr, const void* ptr, uint32_t cookie) const;
// Cursor over a single bucket chain, used by Fill()/CloneBatch() while copying a set.
struct CloneItem {
  // Chain link the cursor currently refers to, or nullptr when there is no link to follow.
  const DenseLinkKey* link{nullptr};

  // Object queued to be cloned on the next CloneBatch pass, or nullptr when none is queued.
  void* obj{nullptr};

  // ttl flag passed to ObjectClone()/AddUnique() for the cursor's object.
  bool has_ttl{false};

  // Set by CloneBatch once the object owned by `link` itself has been queued, meaning that
  // the next object has to be read from link->next.
  bool fetch_tail{false};
};
// Clones the objects reachable from items[0..len) into `other`, walking the chains of all
// items in parallel and prefetching memory before it is read. `items` is used as scratch
// space and is modified by the call.
void CloneBatch(unsigned len, CloneItem* items, DenseSet* other) const;
MemoryResource* mr() {
return entries_.get_allocator().resource();
}

View file

@ -133,6 +133,10 @@ void ScoreMap::ObjDelete(void* obj, bool has_ttl) const {
sdsfree(s1);
}
// Cloning is not implemented for ScoreMap: both arguments are ignored and
// nullptr is always returned.
void* ScoreMap::ObjectClone(const void* /*obj*/, bool /*has_ttl*/) const {
  return nullptr;
}
detail::SdsScorePair ScoreMap::iterator::BreakToPair(void* obj) {
sds f = (sds)obj;
return detail::SdsScorePair(f, GetValue(f));

View file

@ -123,6 +123,7 @@ class ScoreMap : public DenseSet {
size_t ObjectAllocSize(const void* obj) const final;
uint32_t ObjExpireTime(const void* obj) const final;
void ObjDelete(void* obj, bool has_ttl) const final;
void* ObjectClone(const void* obj, bool has_ttl) const final;
};
} // namespace dfly

View file

@ -283,6 +283,10 @@ void StringMap::ObjDelete(void* obj, bool has_ttl) const {
sdsfree(s1);
}
// Cloning is not implemented for StringMap: both arguments are ignored and
// nullptr is always returned.
void* StringMap::ObjectClone(const void* /*obj*/, bool /*has_ttl*/) const {
  return nullptr;
}
detail::SdsPair StringMap::iterator::BreakToPair(void* obj) {
sds f = (sds)obj;
return detail::SdsPair(f, GetValue(f));

View file

@ -158,6 +158,7 @@ class StringMap : public DenseSet {
size_t ObjectAllocSize(const void* obj) const final;
uint32_t ObjExpireTime(const void* obj) const final;
void ObjDelete(void* obj, bool has_ttl) const final;
void* ObjectClone(const void* obj, bool has_ttl) const final;
};
} // namespace dfly

View file

@ -133,4 +133,18 @@ void StringSet::ObjDelete(void* obj, bool has_ttl) const {
sdsfree((sds)obj);
}
void* StringSet::ObjectClone(const void* obj, bool has_ttl) const {
sds src = (sds)obj;
if (has_ttl) {
size_t slen = sdslen(src);
char* ttlptr = src + slen + 1;
uint32_t at = absl::little_endian::Load32(ttlptr);
sds newsds = AllocImmutableWithTtl(slen, at);
if (slen)
memcpy(newsds, src, slen);
return newsds;
}
return sdsnewlen(src, sdslen(src));
}
} // namespace dfly

View file

@ -112,6 +112,7 @@ class StringSet : public DenseSet {
size_t ObjectAllocSize(const void* s1) const override;
uint32_t ObjExpireTime(const void* obj) const override;
void ObjDelete(void* obj, bool has_ttl) const override;
void* ObjectClone(const void* obj, bool has_ttl) const override;
};
} // end namespace dfly

View file

@ -68,6 +68,7 @@ class StringSetTest : public ::testing::Test {
void SetUp() override {
ss_ = new StringSet(&alloc_);
generator_.seed(0);
}
void TearDown() override {
@ -80,6 +81,7 @@ class StringSetTest : public ::testing::Test {
StringSet* ss_;
DenseSetAllocator alloc_;
mt19937 generator_;
};
TEST_F(StringSetTest, Basic) {
@ -133,18 +135,14 @@ static string random_string(mt19937& rand, unsigned len) {
TEST_F(StringSetTest, Resizing) {
constexpr size_t num_strs = 4096;
// pseudo random deterministic sequence with known seed should produce
// the same sequence on all systems
mt19937 rand(0);
vector<string> strs;
while (strs.size() != num_strs) {
auto str = random_string(rand, 10);
auto str = random_string(generator_, 10);
if (find(strs.begin(), strs.end(), str) != strs.end()) {
continue;
}
strs.push_back(random_string(rand, 10));
strs.push_back(random_string(generator_, 10));
}
for (size_t i = 0; i < num_strs; ++i) {
@ -240,12 +238,11 @@ TEST_F(StringSetTest, IntOnly) {
EXPECT_FALSE(ss_->Add(to_string(i)));
}
mt19937 generator(0);
size_t num_remove = generator() % 4096;
size_t num_remove = generator_() % 4096;
unordered_set<string> removed;
for (size_t i = 0; i < num_remove; ++i) {
auto remove_int = generator() % num_ints;
auto remove_int = generator_() % num_ints;
auto remove = to_string(remove_int);
if (numbers.count(remove_int)) {
ASSERT_TRUE(ss_->Contains(remove)) << remove_int;
@ -273,7 +270,7 @@ TEST_F(StringSetTest, IntOnly) {
do {
cursor = ss_->Scan(cursor, scan_callback);
// randomly throw in some new numbers
uint32_t val = generator();
uint32_t val = generator_();
VLOG(1) << "Val " << val;
ss_->Add(to_string(val));
} while (cursor != 0);
@ -284,19 +281,18 @@ TEST_F(StringSetTest, IntOnly) {
TEST_F(StringSetTest, XtremeScanGrow) {
unordered_set<string> to_see, force_grow, seen;
mt19937 generator(0);
while (to_see.size() != 8) {
to_see.insert(random_string(generator, 10));
to_see.insert(random_string(generator_, 10));
}
while (force_grow.size() != 8192) {
string str = random_string(generator, 10);
string str = random_string(generator_, 10);
if (to_see.count(str)) {
continue;
}
force_grow.insert(random_string(generator, 10));
force_grow.insert(random_string(generator_, 10));
}
for (auto& str : to_see) {
@ -329,10 +325,8 @@ TEST_F(StringSetTest, Pop) {
constexpr size_t num_items = 8;
unordered_set<string> to_insert;
mt19937 generator(0);
while (to_insert.size() != num_items) {
auto str = random_string(generator, 10);
auto str = random_string(generator_, 10);
if (to_insert.count(str)) {
continue;
}
@ -364,10 +358,8 @@ TEST_F(StringSetTest, Iteration) {
constexpr size_t num_items = 8192;
unordered_set<string> to_insert;
mt19937 generator(0);
while (to_insert.size() != num_items) {
auto str = random_string(generator, 10);
auto str = random_string(generator_, 10);
if (to_insert.count(str)) {
continue;
}
@ -418,12 +410,10 @@ TEST_F(StringSetTest, Ttl) {
}
TEST_F(StringSetTest, Grow) {
mt19937 generator(0);
for (size_t j = 0; j < 10; ++j) {
for (size_t i = 0; i < 4098; ++i) {
ss_->Reserve(generator() % 256);
auto str = random_string(generator, 3);
ss_->Reserve(generator_() % 256);
auto str = random_string(generator_, 3);
ss_->Add(str);
}
ss_->Clear();
@ -432,10 +422,9 @@ TEST_F(StringSetTest, Grow) {
TEST_F(StringSetTest, Reserve) {
vector<string> strs;
mt19937 generator(0);
for (size_t i = 0; i < 10; ++i) {
strs.push_back(random_string(generator, 10));
strs.push_back(random_string(generator_, 10));
ss_->Add(strs.back());
}
@ -447,6 +436,18 @@ TEST_F(StringSetTest, Reserve) {
}
}
// Fills a fresh set from ss_ and checks that it reports the same size and
// contains every element of the source set.
TEST_F(StringSetTest, Fill) {
  constexpr size_t kNumAdds = 100;
  for (size_t n = 0; n != kNumAdds; ++n) {
    ss_->Add(random_string(generator_, 10));
  }

  StringSet copy;
  ss_->Fill(&copy);

  EXPECT_EQ(copy.UpperBoundSize(), ss_->UpperBoundSize());
  for (sds item : *ss_) {
    EXPECT_TRUE(copy.Contains(item));
  }
}
TEST_F(StringSetTest, IterateEmpty) {
for (const auto& s : *ss_) {
// We're iterating to make sure there is no crash. However, if we got here, it's a bug
@ -458,7 +459,8 @@ void BM_Clone(benchmark::State& state) {
vector<string> strs;
mt19937 generator(0);
StringSet ss1, ss2;
for (size_t i = 0; i < 2000; ++i) {
unsigned elems = state.range(0);
for (size_t i = 0; i < elems; ++i) {
string str = random_string(generator, 10);
ss1.Add(str);
}
@ -473,6 +475,25 @@ void BM_Clone(benchmark::State& state) {
state.ResumeTiming();
}
}
BENCHMARK(BM_Clone);
BENCHMARK(BM_Clone)->ArgName("elements")->Arg(32000);
// Benchmarks StringSet::Fill on a source set built from state.range(0) random
// 10-character strings (generator seeded with 0).
// Only the Fill() call is timed: clearing the destination set between
// iterations happens with the timer paused.
void BM_Fill(benchmark::State& state) {
  unsigned elems = state.range(0);
  mt19937 generator(0);
  StringSet ss1, ss2;
  for (size_t i = 0; i < elems; ++i) {
    string str = random_string(generator, 10);
    ss1.Add(str);
  }

  while (state.KeepRunning()) {
    ss1.Fill(&ss2);
    state.PauseTiming();
    ss2.Clear();
    state.ResumeTiming();
  }
}
BENCHMARK(BM_Fill)->ArgName("elements")->Arg(32000);
} // namespace dfly

View file

@ -169,10 +169,6 @@ sds sdsnewlen(const void *init, size_t initlen) {
return _sdsnewlen(init, initlen, 0);
}
/* Same as sdsnewlen(), except that _sdsnewlen() is called with its last
 * argument set to 1 instead of 0.
 * NOTE(review): presumably that argument selects a non-aborting ("try")
 * allocation that may return NULL on failure -- confirm against _sdsnewlen(). */
sds sdstrynewlen(const void *init, size_t initlen) {
return _sdsnewlen(init, initlen, 1);
}
/* Create an empty (zero length) sds string. Even in this case the string
* always has an implicit null term. */
sds sdsempty(void) {

View file

@ -216,7 +216,6 @@ static inline void sdssetalloc(sds s, size_t newlen) {
}
sds sdsnewlen(const void *init, size_t initlen);
sds sdstrynewlen(const void *init, size_t initlen);
sds sdsnew(const char *init);
sds sdsempty(void);
sds sdsdup(const sds s);