chore(set): minor cleanups and refactorings. (#260)

This commit is contained in:
Roman Gershman 2022-08-27 10:18:54 +03:00 committed by Roman Gershman
parent 3acb1bb704
commit 1ed81cf61d
6 changed files with 122 additions and 75 deletions

View file

@ -16,6 +16,9 @@ void InitRedisTables() {
server.page_size = sysconf(_SC_PAGESIZE);
server.maxmemory_policy = 0;
server.lfu_decay_time = 0;
// been used by t_zset routines that convert listpack to skiplist for cases
// above these thresholds.
server.zset_max_listpack_entries = 128;
@ -29,7 +32,7 @@ void InitRedisTables() {
server.rdb_compression = 1;
server.stream_node_max_bytes = 4096;
server.stream_node_max_entries = 100;
server.stream_node_max_entries = 100;
}
// These functions are moved here from server.c

View file

@ -64,38 +64,21 @@ void dictSdsDestructor(dict *privdata, void *val);
size_t sdsZmallocSize(sds s) ;
typedef struct ServerStub {
// char replid[CONFIG_RUN_ID_SIZE+1]; /* Master replication ID (if master). */
size_t loading_loaded_bytes;
size_t page_size;
long long dirty, master_repl_offset;
time_t lastsave;
char* masterhost;
int rdb_compression;
int loading;
int key_load_delay;
int repl_state;
int loading_start_time;
int loading_total_bytes;
int lastbgsave_status;
int lfu_decay_time; /* LFU counter decay factor. */
/* should not be used. Use FLAGS_list_max_ziplist_size and FLAGS_list_compress_depth instead. */
// int list_compress_depth;
// int list_max_ziplist_size;
unsigned long long maxmemory; /* Max number of memory bytes to use */
// unused - left so that object.c will compile.
int maxmemory_policy; /* Policy for key eviction */
int rdb_save_incremental_fsync;
size_t stat_peak_memory;
unsigned long page_size;
size_t hash_max_listpack_entries,
hash_max_listpack_value;
size_t zset_max_listpack_entries;
size_t zset_max_listpack_value;
int sanitize_dump_payload; /* Enables deep sanitization for ziplist and listpack in RDB and RESTORE. */
long long stat_dump_payload_sanitizations; /* Number deep dump payloads integrity validations. */
off_t loading_rdb_used_mem;
size_t stream_node_max_bytes;
long long stream_node_max_entries;

View file

@ -297,6 +297,8 @@ void RdbLoader::OpaqueObjLoader::CreateSet(const LoadTrace* ltrace) {
} else {
res = createSetObject();
// TODO: to move this logic to set_family similarly to ConvertToStrSet.
/* It's faster to expand the dict to the right size asap in order
* to avoid rehashing */
if (len > DICT_HT_INITIAL_SIZE && dictTryExpand((dict*)res->ptr, len) != DICT_OK) {
@ -651,14 +653,12 @@ void RdbLoader::OpaqueObjLoader::HandleBlob(string_view blob) {
unsigned len = intsetLen(is);
if (len > SetFamily::MaxIntsetEntries()) {
res = createSetObject();
if (len > DICT_HT_INITIAL_SIZE && dictTryExpand((dict*)res->ptr, len) != DICT_OK) {
LOG(ERROR) << "OOM in dictTryExpand " << len;
if (!SetFamily::ConvertToStrSet(is, len, res)) {
LOG(ERROR) << "OOM in ConvertToStrSet " << len;
decrRefCount(res);
ec_ = RdbError(errc::out_of_memory);
return;
}
SetFamily::ConvertTo(is, (dict*)res->ptr);
} else {
intset* mine = (intset*)zmalloc(blob.size());
memcpy(mine, blob.data(), blob.size());

View file

@ -209,7 +209,7 @@ bool DoesTimeNibbleMatchSpecifier(string_view time_spec, unsigned int current_ti
for (int i = time_spec.length() - 1; i >= 0; --i) {
// if the current digit is not a wildcard and it does not match the digit in the current time it
// does not match
if (time_spec[i] != '*' && (current_time % 10) != (time_spec[i] - '0')) {
if (time_spec[i] != '*' && int(current_time % 10) != (time_spec[i] - '0')) {
return false;
}
current_time /= 10;

View file

@ -52,6 +52,24 @@ intset* IntsetAddSafe(string_view val, intset* is, bool* success, bool* added) {
return is;
}
unsigned AddStrSet(ArgSlice vals, CompactObj* dest) {
unsigned res = 0;
dict* ds = (dict*)dest->RObjPtr();
auto* es = EngineShard::tlocal();
for (auto member : vals) {
es->tmp_str1 = sdscpylen(es->tmp_str1, member.data(), member.size());
dictEntry* de = dictAddRaw(ds, es->tmp_str1, NULL);
if (de) {
de->key = sdsdup(es->tmp_str1);
++res;
}
}
return res;
}
// returns (removed, isempty)
pair<unsigned, bool> RemoveSet(ArgSlice vals, CompactObj* set) {
bool isempty = false;
@ -105,6 +123,55 @@ void InitSet(ArgSlice vals, CompactObj* set) {
}
}
void ScanCallback(void* privdata, const dictEntry* de) {
StringVec* sv = (StringVec*)privdata;
sds key = (sds)de->key;
sv->push_back(string(key, sdslen(key)));
}
uint64_t ScanStrSet(const CompactObj& co, uint64_t curs, unsigned count, StringVec* res) {
long maxiterations = count * 10;
DCHECK_EQ(kEncodingStrMap, co.Encoding());
dict* ds = (dict*)co.RObjPtr();
do {
curs = dictScan(ds, curs, ScanCallback, NULL, res);
} while (curs && maxiterations-- && res->size() < count);
return curs;
}
using SetType = pair<void*, unsigned>;
// Removes arg from result.
void DiffStrSet(const SetType& st, absl::flat_hash_set<string>* result) {
DCHECK_EQ(kEncodingStrMap, st.second);
dict* ds = (dict*)st.first;
dictIterator* di = dictGetIterator(ds);
dictEntry* de = nullptr;
while ((de = dictNext(di))) {
sds key = (sds)de->key;
result->erase(string_view{key, sdslen(key)});
}
dictReleaseIterator(di);
}
StringVec PopStrSet(unsigned count, const SetType& st) {
StringVec result;
dict* ds = (dict*)st.first;
string str;
dictIterator* di = dictGetSafeIterator(ds);
for (uint32_t i = 0; i < count; ++i) {
dictEntry* de = dictNext(di);
DCHECK(de);
result.emplace_back((sds)de->key, sdslen((sds)de->key));
dictDelete(ds, de->key);
}
dictReleaseIterator(di);
return result;
}
vector<string> ToVec(absl::flat_hash_set<string>&& set) {
vector<string> result(set.size());
size_t i = 0;
@ -354,12 +421,15 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
res += added;
if (!success) {
dict* ds = dictCreate(&setDictType);
SetFamily::ConvertTo(is, ds);
co.SetRObjPtr(is);
co.InitRobj(OBJ_SET, kEncodingStrMap, ds); // 'is' is deleted by co.
inner_obj = ds;
robj tmp;
if (!SetFamily::ConvertToStrSet(is, intsetLen(is), &tmp)) {
return OpStatus::OUT_OF_MEMORY;
}
// frees 'is' on a way.
co.InitRobj(OBJ_SET, kEncodingStrMap, tmp.ptr);
inner_obj = co.RObjPtr();
break;
}
}
@ -368,17 +438,8 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
co.SetRObjPtr(is);
}
if (co.Encoding() == kEncodingStrMap) {
dict* ds = (dict*)inner_obj;
for (auto member : vals) {
es->tmp_str1 = sdscpylen(es->tmp_str1, member.data(), member.size());
dictEntry* de = dictAddRaw(ds, es->tmp_str1, NULL);
if (de) {
de->key = sdsdup(es->tmp_str1);
++res;
}
}
if (co.Encoding() != kEncodingIntSet) {
res = AddStrSet(std::move(vals), &co);
}
db_slice.PostUpdate(op_args.db_ind, it, !new_key);
@ -496,12 +557,6 @@ OpResult<unsigned> Mover::Commit(Transaction* t) {
return res;
}
void ScanCallback(void* privdata, const dictEntry* de) {
StringVec* sv = (StringVec*)privdata;
sds key = (sds)de->key;
sv->push_back(string(key, sdslen(key)));
}
// Read-only OpUnion op on sets.
OpResult<StringVec> OpUnion(const OpArgs& op_args, ArgSlice keys) {
DCHECK(!keys.empty());
@ -562,15 +617,7 @@ OpResult<StringVec> OpDiff(const OpArgs& op_args, ArgSlice keys) {
uniques.erase(string_view{buf, size_t(next - buf)});
}
} else {
DCHECK_EQ(kEncodingStrMap, st2.second);
dict* ds = (dict*)st2.first;
dictIterator* di = dictGetIterator(ds);
dictEntry* de = nullptr;
while ((de = dictNext(di))) {
sds key = (sds)de->key;
uniques.erase(string_view{key, sdslen(key)});
}
dictReleaseIterator(di);
DiffStrSet(st2, &uniques);
}
}
@ -1108,16 +1155,7 @@ OpResult<StringVec> SetFamily::OpPop(const OpArgs& op_args, std::string_view key
is = intsetTrimTail(is, count); // now remove last count items
it->second.SetRObjPtr(is);
} else {
dict* ds = (dict*)st.first;
string str;
dictIterator* di = dictGetSafeIterator(ds);
for (uint32_t i = 0; i < count; ++i) {
dictEntry* de = dictNext(di);
DCHECK(de);
result.emplace_back((sds)de->key, sdslen((sds)de->key));
dictDelete(ds, de->key);
}
dictReleaseIterator(di);
result = PopStrSet(count, st);
}
db_slice.PostUpdate(op_args.db_ind, it);
}
@ -1144,20 +1182,39 @@ OpResult<StringVec> SetFamily::OpScan(const OpArgs& op_args, std::string_view ke
}
*cursor = 0;
} else {
DCHECK_EQ(kEncodingStrMap, it->second.Encoding());
long maxiterations = count * 10;
dict* ds = (dict*)it->second.RObjPtr();
uint64_t cur = *cursor;
do {
cur = dictScan(ds, cur, ScanCallback, NULL, &res);
} while (cur && maxiterations-- && res.size() < count);
*cursor = cur;
*cursor = ScanStrSet(it->second, *cursor, count, &res);
}
return res;
}
bool SetFamily::ConvertToStrSet(const intset* is, size_t expected_len, robj* dest) {
int64_t intele;
char buf[32];
int ii = 0;
dict* ds = dictCreate(&setDictType);
if (expected_len) {
if (dictTryExpand(ds, expected_len) != DICT_OK) {
dictRelease(ds);
return false;
}
}
/* To add the elements we extract integers and create redis objects */
while (intsetGet(const_cast<intset*>(is), ii++, &intele)) {
char* next = absl::numbers_internal::FastIntToBuffer(intele, buf);
sds s = sdsnewlen(buf, next - buf);
CHECK(dictAddRaw(ds, s, NULL));
}
dest->ptr = ds;
dest->encoding = OBJ_ENCODING_HT;
return true;
}
using CI = CommandId;
#define HFUNC(x) SetHandler(&SetFamily::x)

View file

@ -9,6 +9,7 @@
typedef struct intset intset;
typedef struct redisObject robj;
typedef struct dict dict;
namespace dfly {
@ -27,6 +28,9 @@ class SetFamily {
static void ConvertTo(const intset* src, dict* dest);
// Returns true if succeeded, false on OOM.
static bool ConvertToStrSet(const intset* is, size_t expected_len, robj* dest);
private:
static void SAdd(CmdArgList args, ConnectionContext* cntx);
static void SIsMember(CmdArgList args, ConnectionContext* cntx);