diff --git a/src/core/dense_set.cc b/src/core/dense_set.cc
index 796f70a63..76036cdba 100644
--- a/src/core/dense_set.cc
+++ b/src/core/dense_set.cc
@@ -549,6 +549,11 @@ void DenseSet::AddUnique(void* obj, bool has_ttl, uint64_t hashcode) {
   ++size_;
 }
 
+void DenseSet::Prefetch(uint64_t hash) {  // Issues a read prefetch for the bucket `hash` maps to.
+  uint32_t bid = BucketId(hash);  // index into entries_ for this hash
+  PREFETCH_READ(&entries_[bid]);  // prefetches the address of the bucket slot; nothing is returned
+}
+
 auto DenseSet::Find2(const void* ptr, uint32_t bid, uint32_t cookie)
     -> tuple {
   DCHECK_LT(bid, entries_.size());
 
@@ -563,19 +568,23 @@ auto DenseSet::Find2(const void* ptr, uint32_t bid, uint32_t cookie)
   // first look for displaced nodes since this is quicker than iterating a potential long chain
   if (bid > 0) {
     curr = &entries_[bid - 1];
-    ExpireIfNeeded(nullptr, curr);
+    if (curr->IsDisplaced() && curr->GetDisplacedDirection() == -1) {  // inspect bid - 1 only when it is displaced with direction -1 (presumably: its home bucket is bid; confirm)
+      ExpireIfNeeded(nullptr, curr);  // now runs only for a matching displaced neighbour
 
-    if (Equal(*curr, ptr, cookie)) {
-      return {bid - 1, nullptr, curr};
+      if (Equal(*curr, ptr, cookie)) {
+        return {bid - 1, nullptr, curr};  // match found in the previous bucket
+      }
     }
   }
 
   if (bid + 1 < entries_.size()) {
     curr = &entries_[bid + 1];
-    ExpireIfNeeded(nullptr, curr);
+    if (curr->IsDisplaced() && curr->GetDisplacedDirection() == 1) {  // mirror of the check above: bid + 1 must be displaced with direction 1
+      ExpireIfNeeded(nullptr, curr);
 
-    if (Equal(*curr, ptr, cookie)) {
-      return {bid + 1, nullptr, curr};
+      if (Equal(*curr, ptr, cookie)) {
+        return {bid + 1, nullptr, curr};  // match found in the next bucket
+      }
     }
   }
 
diff --git a/src/core/dense_set.h b/src/core/dense_set.h
index 866a19914..adcc0e8f3 100644
--- a/src/core/dense_set.h
+++ b/src/core/dense_set.h
@@ -208,6 +208,7 @@ class DenseSet {
 
  public:
   using MemoryResource = PMR_NS::memory_resource;
+  static constexpr uint32_t kMaxBatchLen = 32;  // batch size used by the batched insert path (StringSet::AddMany)
 
   explicit DenseSet(MemoryResource* mr = PMR_NS::get_default_resource());
   virtual ~DenseSet();
@@ -317,6 +318,8 @@ class DenseSet {
   // Assumes that the object does not exist in the set.
void AddUnique(void* obj, bool has_ttl, uint64_t hashcode);
 
+  void Prefetch(uint64_t hash);  // Issues a read prefetch for the bucket that `hash` maps to.
+
  private:
   DenseSet(const DenseSet&) = delete;
   DenseSet& operator=(DenseSet&) = delete;
diff --git a/src/core/string_set.cc b/src/core/string_set.cc
index 502a12a34..800d0136f 100644
--- a/src/core/string_set.cc
+++ b/src/core/string_set.cc
@@ -51,6 +51,45 @@ bool StringSet::Add(string_view src, uint32_t ttl_sec) {
   return true;
 }
 
+// Adds every string in `span` and returns how many were actually inserted; strings that
+// are already present are skipped. `ttl_sec == UINT32_MAX` means the members get no TTL.
+// Full batches of kMaxBatchLen items are hashed and prefetched up front, then inserted;
+// the remaining tail (fewer than kMaxBatchLen items) goes through the regular Add() path.
+unsigned StringSet::AddMany(absl::Span<std::string_view> span, uint32_t ttl_sec) {
+  uint64_t hash[kMaxBatchLen];
+  string_view* data = span.data();
+  bool has_ttl = ttl_sec != UINT32_MAX;
+  size_t count = span.size();
+  unsigned res = 0;
+
+  if (BucketCount() < count) {
+    Reserve(count);
+  }
+  while (count >= kMaxBatchLen) {
+    for (unsigned i = 0; i < kMaxBatchLen; ++i) {
+      hash[i] = CompactObj::HashCode(data[i]);
+      Prefetch(hash[i]);  // start loading the target bucket before the insert pass below
+    }
+
+    for (unsigned i = 0; i < kMaxBatchLen; ++i) {
+      void* prev = FindInternal(data + i, hash[i], 1);
+      if (prev == nullptr) {
+        ++res;  // the only place batch insertions are counted; duplicates must not add to res
+        sds field = MakeSetSds(data[i], ttl_sec);
+        AddUnique(field, has_ttl, hash[i]);
+      }
+    }
+
+    count -= kMaxBatchLen;
+    data += kMaxBatchLen;
+  }
+
+  for (unsigned i = 0; i < count; ++i) {
+    res += Add(data[i], ttl_sec);
+  }
+  return res;
+}
+
 std::optional StringSet::Pop() {
   sds str = (sds)PopInternal();
 
diff --git a/src/core/string_set.h b/src/core/string_set.h
index 2e8d73c96..4907f3a9b 100644
--- a/src/core/string_set.h
+++ b/src/core/string_set.h
@@ -4,10 +4,11 @@
 
 #pragma once
 
+#include
+
 #include
 #include
 #include
-#include
 #include
 
 #include "core/dense_set.h"
@@ -28,6 +29,8 @@ class StringSet : public DenseSet {
   // Returns true if elem was added.
bool Add(std::string_view s1, uint32_t ttl_sec = UINT32_MAX);
 
+  unsigned AddMany(absl::Span span, uint32_t ttl_sec);  // Batched counterpart of Add(). NOTE(review): Span's element type looks stripped in this copy; presumably absl::Span<std::string_view>, confirm.
+
   bool Erase(std::string_view str) {
     return EraseInternal(&str, 1);
   }
diff --git a/src/core/string_set_test.cc b/src/core/string_set_test.cc
index 187dcbd6c..200e840ba 100644
--- a/src/core/string_set_test.cc
+++ b/src/core/string_set_test.cc
@@ -552,4 +552,34 @@ void BM_Add(benchmark::State& state) {
 }
 BENCHMARK(BM_Add);
 
+void BM_AddMany(benchmark::State& state) {  // Benchmarks StringSet::AddMany: inserts 100000 generated strings in batches of at most 32.
+  vector strs;  // NOTE(review): element type looks stripped in this copy; presumably vector<string>, confirm.
+  mt19937 generator(0);  // fixed seed, so every run generates the same strings
+  StringSet ss;
+  unsigned elems = 100000;
+  for (size_t i = 0; i < elems; ++i) {
+    string str = random_string(generator, 16);
+    strs.push_back(str);
+  }
+  ss.Reserve(elems);  // reserve up front, before the timed loop starts
+  array str_views;  // NOTE(review): template arguments look stripped; presumably array<string_view, 32>, confirm.
+
+  while (state.KeepRunning()) {
+    unsigned offset = 0;
+    while (offset < elems) {
+      unsigned len = min(elems - offset, 32u);  // 32 presumably mirrors StringSet::kMaxBatchLen; confirm
+      for (size_t i = 0; i < len; ++i) {
+        str_views[i] = strs[offset + i];  // views point into strs, which outlives the call
+      }
+      offset += len;
+      ss.AddMany({str_views.data(), len}, UINT32_MAX);  // UINT32_MAX: insert without a TTL
+    }
+    state.PauseTiming();  // keep the reset below out of the measured time
+    ss.Clear();
+    ss.Reserve(elems);
+    state.ResumeTiming();
+  }
+}
+BENCHMARK(BM_AddMany);
+
 } // namespace dfly
diff --git a/src/server/set_family.cc b/src/server/set_family.cc
index c914bf7c9..f0792f1ef 100644
--- a/src/server/set_family.cc
+++ b/src/server/set_family.cc
@@ -93,8 +93,24 @@ struct StringSetWrapper {
 
   unsigned Add(const NewEntries& entries, uint32_t ttl_sec) const {
     unsigned res = 0;
-    for (string_view member : EntriesRange(entries))
-      res += ss->Add(member, ttl_sec);
+    string_view members[StringSet::kMaxBatchLen];  // staging buffer holding one batch of members
+    size_t entries_len = std::visit([](const auto& e) { return e.size(); }, entries);  // size of whichever alternative `entries` holds
+    unsigned len = 0;  // number of members currently staged
+    if (ss->BucketCount() < entries_len) {  // reserve for the whole input once, before batching
+      ss->Reserve(entries_len);
+    }
+    for (string_view member : EntriesRange(entries)) {
+      members[len++] = member;
+      if (len == StringSet::kMaxBatchLen) {  // buffer is full: flush it as one batch
+        res += ss->AddMany({members, StringSet::kMaxBatchLen}, ttl_sec);
+        len = 0;
+      }
+    }
+
+    if (len) {  // flush the last, partially filled batch
+      res += ss->AddMany({members, len}, ttl_sec);
+    }
     return res;
   }