chore(strings): Simplify Set flow (#164)

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2022-06-19 23:19:42 +03:00 committed by GitHub
parent da3ae760d5
commit 6b7d2a22df
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
10 changed files with 73 additions and 51 deletions

View file

@ -17,8 +17,6 @@ option(BUILD_SHARED_LIBS "Build shared libraries" OFF)
option(DF_USE_SSL "Provide support for SSL connections" ON) option(DF_USE_SSL "Provide support for SSL connections" ON)
include(third_party) include(third_party)
message(STATUS "after thirdpary")
include(internal) include(internal)
include_directories(src) include_directories(src)

View file

@ -708,6 +708,9 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio
assert(seg_id < segment_.size()); assert(seg_id < segment_.size());
SegmentType* target = segment_[seg_id]; SegmentType* target = segment_[seg_id];
// Load heap allocated segment data - to avoid TLB miss when accessing the bucket.
__builtin_prefetch(target, 0, 1);
auto [it, res] = auto [it, res] =
target->Insert(std::forward<U>(key), std::forward<V>(value), key_hash, EqPred()); target->Insert(std::forward<U>(key), std::forward<V>(value), key_hash, EqPred());

View file

@ -1055,6 +1055,10 @@ auto Segment<Key, Value, Policy>::FindIt(U&& key, Hash_t key_hash, Pred&& cf) co
uint8_t bidx = BucketIndex(key_hash); uint8_t bidx = BucketIndex(key_hash);
const Bucket& target = bucket_[bidx]; const Bucket& target = bucket_[bidx];
// It helps a bit (10% on my home machine) and more importantly, it does not hurt
// since we are going to access this memory in a bit.
__builtin_prefetch(&target);
uint8_t fp_hash = key_hash & kFpMask; uint8_t fp_hash = key_hash & kFpMask;
SlotId sid = target.FindByFp(fp_hash, false, key, cf); SlotId sid = target.FindByFp(fp_hash, false, key, cf);
if (sid != BucketType::kNanSlot) { if (sid != BucketType::kNanSlot) {

View file

@ -23,7 +23,9 @@ extern "C" {
#include "redis/zmalloc.h" #include "redis/zmalloc.h"
} }
#if defined(__clang__)
#pragma clang diagnostic ignored "-Wunused-const-variable" #pragma clang diagnostic ignored "-Wunused-const-variable"
#endif
namespace dfly { namespace dfly {
@ -1027,7 +1029,7 @@ static void BM_Insert(benchmark::State& state) {
} }
} }
} }
BENCHMARK(BM_Insert)->Arg(1000)->Arg(10000)->Arg(100000); BENCHMARK(BM_Insert)->Arg(10000)->Arg(100000)->Arg(1000000);
struct NoDestroySdsPolicy : public SdsDashPolicy { struct NoDestroySdsPolicy : public SdsDashPolicy {
static void DestroyKey(sds s) { static void DestroyKey(sds s) {
@ -1109,7 +1111,7 @@ static void BM_RedisDictInsert(benchmark::State& state) {
dictRelease(d); dictRelease(d);
} }
} }
BENCHMARK(BM_RedisDictInsert)->Arg(1000)->Arg(10000)->Arg(100000); BENCHMARK(BM_RedisDictInsert)->Arg(10000)->Arg(100000)->Arg(1000000);
static void BM_RedisStringInsert(benchmark::State& state) { static void BM_RedisStringInsert(benchmark::State& state) {
unsigned count = state.range(0); unsigned count = state.range(0);

View file

@ -622,7 +622,7 @@ int sdsull2str(char *s, unsigned long long v) {
* sdscatprintf(sdsempty(),"%lld\n", value); * sdscatprintf(sdsempty(),"%lld\n", value);
*/ */
sds sdsfromlonglong(long long value) { sds sdsfromlonglong(long long value) {
char buf[SDS_LLSTR_SIZE]; char buf[SDS_LLSTR_SIZE + 10];
int len = sdsll2str(buf,value); int len = sdsll2str(buf,value);
return sdsnewlen(buf,len); return sdsnewlen(buf,len);

View file

@ -278,18 +278,23 @@ OpResult<pair<PrimeIterator, unsigned>> DbSlice::FindFirst(DbIndex db_index, Arg
return OpStatus::KEY_NOTFOUND; return OpStatus::KEY_NOTFOUND;
} }
auto DbSlice::AddOrFind(DbIndex db_index, string_view key) noexcept(false) pair<PrimeIterator, bool> DbSlice::AddOrFind(DbIndex db_index, string_view key) noexcept(false) {
-> pair<PrimeIterator, bool> { auto res = AddOrFind2(db_index, key);
return make_pair(get<0>(res), get<2>(res));
}
tuple<PrimeIterator, ExpireIterator, bool> DbSlice::AddOrFind2(DbIndex db_index,
std::string_view key) noexcept(false) {
DCHECK(IsDbValid(db_index)); DCHECK(IsDbValid(db_index));
auto& db = db_arr_[db_index]; auto& db = db_arr_[db_index];
// If we have some registered onchange callbacks, we must know in advance whether its Find or Add. // If we have some registered onchange callbacks, we must know in advance whether its Find or Add.
if (!change_cb_.empty()) { if (!change_cb_.empty()) {
auto it = FindExt(db_index, key).first; auto res = FindExt(db_index, key);
if (IsValid(it)) { if (IsValid(res.first)) {
return make_pair(it, true); return tuple_cat(res, make_tuple(true));
} }
// It's a new entry. // It's a new entry.
@ -326,15 +331,16 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) noexcept(false)
it.SetVersion(NextVersion()); it.SetVersion(NextVersion());
memory_budget_ = evp.mem_budget(); memory_budget_ = evp.mem_budget();
return make_pair(it, true); return make_tuple(it, ExpireIterator{}, true);
} }
auto& existing = it; auto& existing = it;
DCHECK(IsValid(existing)); DCHECK(IsValid(existing));
ExpireIterator expire_it;
if (existing->second.HasExpire()) { if (existing->second.HasExpire()) {
auto expire_it = db->expire.Find(existing->first); expire_it = db->expire.Find(existing->first);
CHECK(IsValid(expire_it)); CHECK(IsValid(expire_it));
// TODO: to implement the incremental update of expiry values using multi-generation // TODO: to implement the incremental update of expiry values using multi-generation
@ -357,11 +363,11 @@ auto DbSlice::AddOrFind(DbIndex db_index, string_view key) noexcept(false)
existing->second.Reset(); existing->second.Reset();
events_.expired_keys++; events_.expired_keys++;
return make_pair(existing, true); return make_tuple(existing, ExpireIterator{}, true);
} }
} }
return make_pair(existing, false); return make_tuple(existing, expire_it, false);
} }
void DbSlice::ActivateDb(DbIndex db_ind) { void DbSlice::ActivateDb(DbIndex db_ind) {
@ -429,7 +435,7 @@ void DbSlice::FlushDb(DbIndex db_ind) {
} }
// Returns true if a state has changed, false otherwise. // Returns true if a state has changed, false otherwise.
bool DbSlice::Expire(DbIndex db_ind, PrimeIterator it, uint64_t at) { bool DbSlice::UpdateExpire(DbIndex db_ind, PrimeIterator it, uint64_t at) {
auto& db = *db_arr_[db_ind]; auto& db = *db_arr_[db_ind];
if (at == 0 && it->second.HasExpire()) { if (at == 0 && it->second.HasExpire()) {
CHECK_EQ(1u, db.expire.Erase(it->first)); CHECK_EQ(1u, db.expire.Erase(it->first));
@ -485,19 +491,15 @@ pair<PrimeIterator, bool> DbSlice::AddOrFind(DbIndex db_ind, string_view key, Pr
return res; return res;
auto& db = *db_arr_[db_ind]; auto& db = *db_arr_[db_ind];
auto& new_it = res.first; auto& it = res.first;
size_t value_heap_size = obj.MallocUsed(); it->second = std::move(obj);
db.stats.obj_memory_usage += value_heap_size; PostUpdate(db_ind, it);
if (obj.ObjType() == OBJ_STRING)
db.stats.strval_memory_usage += value_heap_size;
new_it->second = std::move(obj);
if (expire_at_ms) { if (expire_at_ms) {
new_it->second.SetExpire(true); it->second.SetExpire(true);
uint64_t delta = expire_at_ms - expire_base_[0]; uint64_t delta = expire_at_ms - expire_base_[0];
CHECK(db.expire.Insert(new_it->first.AsRef(), ExpirePeriod(delta)).second); CHECK(db.expire.Insert(it->first.AsRef(), ExpirePeriod(delta)).second);
} }
return res; return res;

View file

@ -124,7 +124,10 @@ class DbSlice {
// Return .second=true if insertion ocurred, false if we return the existing key. // Return .second=true if insertion ocurred, false if we return the existing key.
// throws: bad_alloc is insertion could not happen due to out of memory. // throws: bad_alloc is insertion could not happen due to out of memory.
std::pair<PrimeIterator, bool> AddOrFind(DbIndex db_ind, std::string_view key) noexcept(false); std::pair<PrimeIterator, bool> AddOrFind(DbIndex db_index, std::string_view key) noexcept(false);
std::tuple<PrimeIterator, ExpireIterator, bool> AddOrFind2(DbIndex db_index,
std::string_view key) noexcept(false);
// Returns second=true if insertion took place, false otherwise. // Returns second=true if insertion took place, false otherwise.
// expire_at_ms equal to 0 - means no expiry. // expire_at_ms equal to 0 - means no expiry.
@ -134,7 +137,7 @@ class DbSlice {
// Either adds or removes (if at == 0) expiry. Returns true if a change was made. // Either adds or removes (if at == 0) expiry. Returns true if a change was made.
// Does not change expiry if at != 0 and expiry already exists. // Does not change expiry if at != 0 and expiry already exists.
bool Expire(DbIndex db_ind, PrimeIterator main_it, uint64_t at); bool UpdateExpire(DbIndex db_ind, PrimeIterator main_it, uint64_t at);
void SetMCFlag(DbIndex db_ind, PrimeKey key, uint32_t flag); void SetMCFlag(DbIndex db_ind, PrimeKey key, uint32_t flag);
uint32_t GetMCFlag(DbIndex db_ind, const PrimeKey& key) const; uint32_t GetMCFlag(DbIndex db_ind, const PrimeKey& key) const;
@ -222,8 +225,8 @@ class DbSlice {
void UnregisterOnChange(uint64_t id); void UnregisterOnChange(uint64_t id);
struct DeleteExpiredStats { struct DeleteExpiredStats {
uint32_t deleted = 0; // number of deleted items due to expiry (less than traversed). uint32_t deleted = 0; // number of deleted items due to expiry (less than traversed).
uint32_t traversed = 0; // number of traversed items that have ttl bit uint32_t traversed = 0; // number of traversed items that have ttl bit
size_t survivor_ttl_sum = 0; // total sum of ttl of survivors (traversed - deleted). size_t survivor_ttl_sum = 0; // total sum of ttl of survivors (traversed - deleted).
}; };

View file

@ -149,7 +149,7 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) {
dest_it->second = std::move(pv_); dest_it->second = std::move(pv_);
} }
dest_it->second.SetExpire(has_expire); // preserve expire flag. dest_it->second.SetExpire(has_expire); // preserve expire flag.
db_slice.Expire(db_indx_, dest_it, src_res_.expire_ts); db_slice.UpdateExpire(db_indx_, dest_it, src_res_.expire_ts);
} else { } else {
if (src_res_.ref_val.ObjType() == OBJ_STRING) { if (src_res_.ref_val.ObjType() == OBJ_STRING) {
pv_.SetString(str_val_); pv_.SetString(str_val_);
@ -621,7 +621,7 @@ OpStatus GenericFamily::OpExpire(const OpArgs& op_args, string_view key,
} else if (IsValid(expire_it)) { } else if (IsValid(expire_it)) {
expire_it->second = db_slice.FromAbsoluteTime(now_msec + rel_msec); expire_it->second = db_slice.FromAbsoluteTime(now_msec + rel_msec);
} else { } else {
db_slice.Expire(op_args.db_ind, it, rel_msec + now_msec); db_slice.UpdateExpire(op_args.db_ind, it, rel_msec + now_msec);
} }
return OpStatus::OK; return OpStatus::OK;
@ -698,10 +698,10 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from_key,
to_it->second = std::move(from_obj); to_it->second = std::move(from_obj);
to_it->second.SetExpire(IsValid(to_expire)); // keep the expire flag on 'to'. to_it->second.SetExpire(IsValid(to_expire)); // keep the expire flag on 'to'.
// It is guaranteed that Expire() call does not erase the element because then // It is guaranteed that UpdateExpire() call does not erase the element because then
// from_it would be invalid. Therefore, it Expire does not invalidate any iterators and // from_it would be invalid. Therefore, UpdateExpire does not invalidate any iterators,
// we can delete via from_it. // therefore we can delete 'from_it'.
db_slice.Expire(op_args.db_ind, to_it, exp_ts); db_slice.UpdateExpire(op_args.db_ind, to_it, exp_ts);
CHECK(db_slice.Del(op_args.db_ind, from_it)); CHECK(db_slice.Del(op_args.db_ind, from_it));
} else { } else {
// Here we first delete from_it because AddNew below could invalidate from_it. // Here we first delete from_it because AddNew below could invalidate from_it.

View file

@ -39,9 +39,6 @@ extern "C" {
using namespace std; using namespace std;
// TODO: to move to absl flags and keep legacy flags only for glog library.
// absl flags allow parsing of custom types and allow specifying which flags appear
// for helpshort.
ABSL_FLAG(uint32_t, port, 6379, "Redis port"); ABSL_FLAG(uint32_t, port, 6379, "Redis port");
ABSL_FLAG(uint32_t, memcache_port, 0, "Memcached port"); ABSL_FLAG(uint32_t, memcache_port, 0, "Memcached port");
ABSL_FLAG(uint64_t, maxmemory, 0, ABSL_FLAG(uint64_t, maxmemory, 0,

View file

@ -143,25 +143,41 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
VLOG(2) << "Set " << key << "(" << db_slice_.shard_id() << ") "; VLOG(2) << "Set " << key << "(" << db_slice_.shard_id() << ") ";
auto [it, expire_it] = db_slice_.FindExt(params.db_index, key); if (params.how == SET_IF_EXISTS) {
auto [it, expire_it] = db_slice_.FindExt(params.db_index, key);
if (IsValid(it)) { // existing if (IsValid(it)) { // existing
return SetExisting(params, it, expire_it, value); return SetExisting(params, it, expire_it, value);
}
return OpStatus::SKIPPED;
} }
// New entry // New entry
if (params.how == SET_IF_EXISTS) tuple<PrimeIterator, ExpireIterator, bool> add_res;
return OpStatus::SKIPPED;
PrimeValue tvalue{value};
tvalue.SetFlag(params.memcache_flags != 0);
uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice_.Now() : 0;
try { try {
it = db_slice_.AddNew(params.db_index, key, std::move(tvalue), at_ms); add_res = db_slice_.AddOrFind2(params.db_index, key);
} catch (bad_alloc& e) { } catch (bad_alloc& e) {
return OpStatus::OUT_OF_MEMORY; return OpStatus::OUT_OF_MEMORY;
} }
PrimeIterator it = get<0>(add_res);
if (!get<2>(add_res)) {
return SetExisting(params, it, get<1>(add_res), value);
}
// adding new value.
PrimeValue tvalue{value};
tvalue.SetFlag(params.memcache_flags != 0);
it->second = std::move(tvalue);
db_slice_.PostUpdate(params.db_index, it);
if (params.expire_after_ms) {
db_slice_.UpdateExpire(params.db_index, it, params.expire_after_ms + db_slice_.Now());
}
if (params.memcache_flags)
db_slice_.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags);
EngineShard* shard = db_slice_.shard_owner(); EngineShard* shard = db_slice_.shard_owner();
if (shard->tiered_storage()) { // external storage enabled. if (shard->tiered_storage()) { // external storage enabled.
@ -170,9 +186,6 @@ OpResult<void> SetCmd::Set(const SetParams& params, std::string_view key, std::s
} }
} }
if (params.memcache_flags)
db_slice_.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags);
return OpStatus::OK; return OpStatus::OK;
} }
@ -194,7 +207,7 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt
if (IsValid(e_it) && at_ms) { if (IsValid(e_it) && at_ms) {
e_it->second = db_slice_.FromAbsoluteTime(at_ms); e_it->second = db_slice_.FromAbsoluteTime(at_ms);
} else { } else {
bool changed = db_slice_.Expire(params.db_index, it, at_ms); bool changed = db_slice_.UpdateExpire(params.db_index, it, at_ms);
if (changed && at_ms == 0) // erased. if (changed && at_ms == 0) // erased.
return OpStatus::OK; return OpStatus::OK;
} }