mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
chore: performance improvements around zset (#1928)
1. ExpireIfNeeded was unjuistifiedly high in the profile topk table. Lets make the initial condition within an inline to reduce its impact. 2. Reserve space for hmap/zset if multiple items are added. Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
9a09b793b4
commit
00ee506cc8
7 changed files with 76 additions and 14 deletions
|
@ -229,11 +229,13 @@ auto DenseSet::FindEmptyAround(uint32_t bid) -> ChainVectorIterator {
|
|||
}
|
||||
|
||||
void DenseSet::Reserve(size_t sz) {
|
||||
sz = std::min<size_t>(sz, kMinSize);
|
||||
sz = std::max<size_t>(sz, kMinSize);
|
||||
|
||||
sz = absl::bit_ceil(sz);
|
||||
capacity_log_ = absl::bit_width(sz);
|
||||
entries_.reserve(sz);
|
||||
if (sz > entries_.size()) {
|
||||
entries_.resize(sz);
|
||||
capacity_log_ = absl::bit_width(sz) - 1;
|
||||
}
|
||||
}
|
||||
|
||||
void DenseSet::Grow() {
|
||||
|
@ -622,20 +624,21 @@ auto DenseSet::NewLink(void* data, DensePtr next) -> DenseLinkKey* {
|
|||
return lk;
|
||||
}
|
||||
|
||||
bool DenseSet::ExpireIfNeeded(DensePtr* prev, DensePtr* node) const {
|
||||
bool DenseSet::ExpireIfNeededInternal(DensePtr* prev, DensePtr* node) const {
|
||||
DCHECK(node != nullptr);
|
||||
DCHECK(node->HasTtl());
|
||||
|
||||
bool deleted = false;
|
||||
while (node->HasTtl()) {
|
||||
do {
|
||||
uint32_t obj_time = ObjExpireTime(node->GetObject());
|
||||
if (obj_time > time_now_) {
|
||||
break;
|
||||
}
|
||||
|
||||
// updates the node to next item if relevant.
|
||||
// updates the *node to next item if relevant or resets it to empty.
|
||||
const_cast<DenseSet*>(this)->Delete(prev, node);
|
||||
deleted = true;
|
||||
}
|
||||
} while (node->HasTtl());
|
||||
|
||||
return deleted;
|
||||
}
|
||||
|
|
|
@ -331,8 +331,15 @@ class DenseSet {
|
|||
mr()->deallocate(plink, sizeof(DenseLinkKey), alignof(DenseLinkKey));
|
||||
}
|
||||
|
||||
// Returns true if *ptr was deleted.
|
||||
bool ExpireIfNeeded(DensePtr* prev, DensePtr* ptr) const;
|
||||
// Returns true if *node was deleted.
|
||||
bool ExpireIfNeeded(DensePtr* prev, DensePtr* node) const {
|
||||
if (node->HasTtl()) {
|
||||
return ExpireIfNeededInternal(prev, node);
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
bool ExpireIfNeededInternal(DensePtr* prev, DensePtr* node) const;
|
||||
|
||||
// Deletes the object pointed by ptr and removes it from the set.
|
||||
// If ptr is a link then it will be deleted internally.
|
||||
|
|
|
@ -542,7 +542,7 @@ SortedMap::ScoredArray SortedMap::DfImpl::GetRange(const zrangespec& range, unsi
|
|||
double score = GetObjScore(ele);
|
||||
if (range.min > score || (range.min == score && range.minex))
|
||||
break;
|
||||
arr.emplace_back(string{(sds)ele, sdslen((sds)ele)}, GetObjScore(ele));
|
||||
arr.emplace_back(string{(sds)ele, sdslen((sds)ele)}, score);
|
||||
if (!path.Prev())
|
||||
break;
|
||||
}
|
||||
|
@ -557,17 +557,28 @@ SortedMap::ScoredArray SortedMap::DfImpl::GetRange(const zrangespec& range, unsi
|
|||
return arr;
|
||||
}
|
||||
|
||||
auto path2 = path;
|
||||
size_t num_elems = 0;
|
||||
|
||||
// Count the number of elements in the range.
|
||||
while (limit--) {
|
||||
ScoreSds ele = path.Terminal();
|
||||
|
||||
double score = GetObjScore(ele);
|
||||
if (range.max < score || (range.max == score && range.maxex))
|
||||
break;
|
||||
|
||||
arr.emplace_back(string{(sds)ele, sdslen((sds)ele)}, GetObjScore(ele));
|
||||
++num_elems;
|
||||
if (!path.Next())
|
||||
break;
|
||||
}
|
||||
|
||||
// reserve enough space.
|
||||
arr.resize(num_elems);
|
||||
for (size_t i = 0; i < num_elems; ++i) {
|
||||
ScoreSds ele = path2.Terminal();
|
||||
arr[i] = {string{(sds)ele, sdslen((sds)ele)}, GetObjScore(ele)};
|
||||
path2.Next();
|
||||
}
|
||||
}
|
||||
|
||||
return arr;
|
||||
|
|
|
@ -170,7 +170,7 @@ class SortedMap {
|
|||
size_t MallocSize() const;
|
||||
|
||||
bool Reserve(size_t sz) {
|
||||
return dictExpand(dict, 1) == DICT_OK;
|
||||
return dictExpand(dict, sz) == DICT_OK;
|
||||
}
|
||||
|
||||
size_t DeleteRangeByRank(unsigned start, unsigned end) {
|
||||
|
|
|
@ -56,6 +56,7 @@ void SinkReplyBuilder::Send(const iovec* v, uint32_t len) {
|
|||
|
||||
// Allow batching with up to kMaxBatchSize of data.
|
||||
if ((should_batch_ || should_aggregate_) && (batch_.size() + bsize < kMaxBatchSize)) {
|
||||
batch_.reserve(batch_.size() + bsize);
|
||||
for (unsigned i = 0; i < len; ++i) {
|
||||
std::string_view src((char*)v[i].iov_base, v[i].iov_len);
|
||||
DVLOG(3) << "Appending to stream " << absl::CHexEscape(src);
|
||||
|
|
|
@ -9,6 +9,7 @@ extern "C" {
|
|||
#include "redis/object.h"
|
||||
#include "redis/redis_aux.h"
|
||||
#include "redis/util.h"
|
||||
#include "redis/zmalloc.h"
|
||||
}
|
||||
|
||||
#include "base/logging.h"
|
||||
|
@ -106,6 +107,14 @@ pair<uint8_t*, bool> LpInsert(uint8_t* lp, string_view field, string_view val, b
|
|||
return make_pair(lp, !updated);
|
||||
}
|
||||
|
||||
size_t EstimateListpackMinBytes(CmdArgList members) {
|
||||
size_t bytes = 0;
|
||||
for (const auto& member : members) {
|
||||
bytes += (member.size() + 1); // string + at least 1 byte for string header.
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
size_t HMapLength(const DbContext& db_cntx, const CompactObj& co) {
|
||||
void* ptr = co.RObjPtr();
|
||||
if (co.Encoding() == kEncodingStrMap2) {
|
||||
|
@ -653,6 +662,11 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
|
|||
|
||||
if (lp) {
|
||||
bool inserted;
|
||||
size_t malloc_reserved = zmalloc_size(lp);
|
||||
size_t min_sz = EstimateListpackMinBytes(values);
|
||||
if (min_sz > malloc_reserved) {
|
||||
lp = (uint8_t*)zrealloc(lp, min_sz);
|
||||
}
|
||||
for (size_t i = 0; i < values.size(); i += 2) {
|
||||
tie(lp, inserted) = LpInsert(lp, ArgS(values, i), ArgS(values, i + 1), op_sp.skip_if_exists);
|
||||
created += inserted;
|
||||
|
@ -662,7 +676,7 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
|
|||
} else {
|
||||
DCHECK_EQ(kEncodingStrMap2, pv.Encoding()); // Dictionary
|
||||
StringMap* sm = GetStringMap(pv, op_args.db_cntx);
|
||||
|
||||
sm->Reserve(values.size() / 2);
|
||||
bool added;
|
||||
|
||||
for (size_t i = 0; i < values.size(); i += 2) {
|
||||
|
|
|
@ -13,6 +13,7 @@ extern "C" {
|
|||
#include "redis/object.h"
|
||||
#include "redis/redis_aux.h"
|
||||
#include "redis/util.h"
|
||||
#include "redis/zmalloc.h"
|
||||
#include "redis/zset.h"
|
||||
}
|
||||
|
||||
|
@ -915,6 +916,14 @@ struct AddResult {
|
|||
bool is_nan = false;
|
||||
};
|
||||
|
||||
size_t EstimateListpackMinBytes(ScoredMemberSpan members) {
|
||||
size_t bytes = members.size() * 2; // at least 2 bytes per score;
|
||||
for (const auto& member : members) {
|
||||
bytes += (member.second.size() + 1); // string + at least 1 byte for string header.
|
||||
}
|
||||
return bytes;
|
||||
}
|
||||
|
||||
OpResult<AddResult> OpAdd(const OpArgs& op_args, const ZParams& zparams, string_view key,
|
||||
ScoredMemberSpan members) {
|
||||
DCHECK(!members.empty() || zparams.override);
|
||||
|
@ -948,6 +957,23 @@ OpResult<AddResult> OpAdd(const OpArgs& op_args, const ZParams& zparams, string_
|
|||
AddResult aresult;
|
||||
detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper();
|
||||
bool is_list_pack = robj_wrapper->encoding() == OBJ_ENCODING_LISTPACK;
|
||||
|
||||
// opportunistically reserve space if multiple entries are about to be added.
|
||||
if ((zparams.flags & ZADD_IN_XX) == 0 && members.size() > 2) {
|
||||
if (is_list_pack) {
|
||||
uint8_t* zl = (uint8_t*)robj_wrapper->inner_obj();
|
||||
size_t malloc_reserved = zmalloc_size(zl);
|
||||
size_t min_sz = EstimateListpackMinBytes(members);
|
||||
if (min_sz > malloc_reserved) {
|
||||
zl = (uint8_t*)zrealloc(zl, min_sz);
|
||||
robj_wrapper->set_inner_obj(zl);
|
||||
}
|
||||
} else {
|
||||
detail::SortedMap* sm = (detail::SortedMap*)robj_wrapper->inner_obj();
|
||||
sm->Reserve(members.size());
|
||||
}
|
||||
}
|
||||
|
||||
for (size_t j = 0; j < members.size(); j++) {
|
||||
const auto& m = members[j];
|
||||
tmp_str = sdscpylen(tmp_str, m.second.data(), m.second.size());
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue