chore: optimize element removal for integer lists (#3827)

Fixes #3806

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-09-30 15:19:01 +03:00 committed by GitHub
parent ec353e1522
commit 14c94e41d7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
5 changed files with 20 additions and 11 deletions

View file

@ -76,6 +76,7 @@ unsigned char *lpGet(unsigned char *p, int64_t *count, unsigned char *intbuf);
// Fills count and returns 1 if the item is an integer, 0 otherwise. // Fills count and returns 1 if the item is an integer, 0 otherwise.
int lpGetInteger(unsigned char *p, int64_t *ival); int lpGetInteger(unsigned char *p, int64_t *ival);
int lpStringToInt64(const char *s, unsigned long slen, int64_t *value);
unsigned char *lpGetValue(unsigned char *p, unsigned int *slen, long long *lval); unsigned char *lpGetValue(unsigned char *p, unsigned int *slen, long long *lval);
unsigned char *lpFind(unsigned char *lp, unsigned char *p, unsigned char *s, uint32_t slen, unsigned int skip); unsigned char *lpFind(unsigned char *lp, unsigned char *p, unsigned char *s, uint32_t slen, unsigned int skip);

View file

@ -1184,14 +1184,6 @@ int quicklistDelRange(quicklist *quicklist, const long start, const long count)
return 1; return 1;
} }
/* compare between a two entries */
int quicklistCompare(const quicklistEntry* entry, const unsigned char *p2, const size_t p2_len) {
if (unlikely(QL_NODE_IS_PLAIN(entry->node))) {
return ((entry->sz == p2_len) && (memcmp(entry->value, p2, p2_len) == 0));
}
return lpCompare(entry->zi, p2, p2_len);
}
/* Returns a quicklist iterator 'iter'. After the initialization every /* Returns a quicklist iterator 'iter'. After the initialization every
* call to quicklistNext() will return the next element of the quicklist. */ * call to quicklistNext() will return the next element of the quicklist. */
quicklistIter *quicklistGetIterator(quicklist *quicklist, int direction) { quicklistIter *quicklistGetIterator(quicklist *quicklist, int direction) {

View file

@ -186,7 +186,6 @@ int quicklistPopCustom(quicklist *quicklist,
void *(*saver)(unsigned char *data, size_t sz)); void *(*saver)(unsigned char *data, size_t sz));
int quicklistPop(quicklist *quicklist, int where, unsigned char **data, size_t *sz, long long *slong); int quicklistPop(quicklist *quicklist, int where, unsigned char **data, size_t *sz, long long *slong);
unsigned long quicklistCount(const quicklist *ql); unsigned long quicklistCount(const quicklist *ql);
int quicklistCompare(const quicklistEntry *entry, const unsigned char *p2, const size_t p2_len);
size_t quicklistGetLzf(const quicklistNode *node, void **data); size_t quicklistGetLzf(const quicklistNode *node, void **data);
void quicklistNodeLimit(int fill, size_t *size, unsigned int *count); void quicklistNodeLimit(int fill, size_t *size, unsigned int *count);
int quicklistNodeExceedsLimit(int fill, size_t new_sz, unsigned int new_count); int quicklistNodeExceedsLimit(int fill, size_t new_sz, unsigned int new_count);

View file

@ -547,10 +547,20 @@ OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, string_view ele
quicklistInitIterator(&qiter, ql, iter_direction, index); quicklistInitIterator(&qiter, ql, iter_direction, index);
quicklistEntry entry; quicklistEntry entry;
unsigned removed = 0; unsigned removed = 0;
const uint8_t* elem_ptr = reinterpret_cast<const uint8_t*>(elem.data()); int64_t ival;
// try parsing the element into an integer.
int is_int = lpStringToInt64(elem.data(), elem.size(), &ival);
auto is_match = [&](const quicklistEntry& entry) {
if (is_int != (entry.value == nullptr))
return false;
return is_int ? entry.longval == ival : ElemCompare(entry, elem);
};
while (quicklistNext(&qiter, &entry)) { while (quicklistNext(&qiter, &entry)) {
if (quicklistCompare(&entry, elem_ptr, elem.size())) { if (is_match(entry)) {
quicklistDelEntry(&qiter, &entry); quicklistDelEntry(&qiter, &entry);
removed++; removed++;
if (count && removed == count) if (count && removed == count)

View file

@ -370,6 +370,13 @@ TEST_F(ListFamilyTest, LRem) {
Run({"set", "foo", "bar"}); Run({"set", "foo", "bar"});
ASSERT_THAT(Run({"lrem", "foo", "0", "elem"}), ErrArg("WRONGTYPE")); ASSERT_THAT(Run({"lrem", "foo", "0", "elem"}), ErrArg("WRONGTYPE"));
ASSERT_THAT(Run({"lrem", "nexists", "0", "elem"}), IntArg(0)); ASSERT_THAT(Run({"lrem", "nexists", "0", "elem"}), IntArg(0));
// Triggers QUICKLIST_NODE_CONTAINER_PLAIN coverage
string val(10000, 'a');
Run({"rpush", kKey2, val, "12345678"});
ASSERT_THAT(Run({"lrem", kKey2, "1", "12345678"}), IntArg(1));
ASSERT_THAT(Run({"lrem", kKey2, "1", val}), IntArg(1));
} }
TEST_F(ListFamilyTest, LTrim) { TEST_F(ListFamilyTest, LTrim) {