chore: get rid of object.c and robj* in cc code (#2610)

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-02-18 16:52:23 +02:00 committed by GitHub
parent 417ca952d6
commit fa75360227
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
45 changed files with 94 additions and 1962 deletions

View file

@ -13,17 +13,17 @@ cxx_link(dfly_core base absl::flat_hash_map absl::str_format redis_lib TRDP::lua
fibers2 ${SEARCH_LIB} jsonpath OpenSSL::Crypto TRDP::dconv)
add_executable(dash_bench dash_bench.cc)
cxx_link(dash_bench dfly_core)
cxx_link(dash_bench dfly_core redis_test_lib)
cxx_test(dfly_core_test dfly_core LABELS DFLY)
cxx_test(compact_object_test dfly_core LABELS DFLY)
cxx_test(extent_tree_test dfly_core LABELS DFLY)
cxx_test(external_alloc_test dfly_core LABELS DFLY)
cxx_test(dash_test dfly_core file DATA testdata/ids.txt LABELS DFLY)
cxx_test(dash_test dfly_core file redis_test_lib DATA testdata/ids.txt LABELS DFLY)
cxx_test(interpreter_test dfly_core LABELS DFLY)
cxx_test(lru_test dfly_core LABELS DFLY)
cxx_test(string_set_test dfly_core LABELS DFLY)
cxx_test(string_map_test dfly_core LABELS DFLY)
cxx_test(sorted_map_test dfly_core LABELS DFLY)
cxx_test(sorted_map_test dfly_core redis_test_lib LABELS DFLY)
cxx_test(bptree_set_test dfly_core LABELS DFLY)
cxx_test(score_map_test dfly_core LABELS DFLY)

View file

@ -10,7 +10,7 @@
extern "C" {
#include "redis/intset.h"
#include "redis/listpack.h"
#include "redis/object.h"
#include "redis/quicklist.h"
#include "redis/redis_aux.h"
#include "redis/stream.h"
#include "redis/util.h"
@ -199,9 +199,6 @@ static_assert(ascii_len(16) == 18);
static_assert(ascii_len(17) == 19);
struct TL {
robj tmp_robj{
.type = 0, .encoding = 0, .lru = 0, .refcount = OBJ_STATIC_REFCOUNT, .ptr = nullptr};
MemoryResource* local_mr = PMR_NS::get_default_resource();
size_t small_str_bytes;
base::PODArray<uint8_t> tmp_buf;
@ -671,42 +668,12 @@ unsigned CompactObj::Encoding() const {
}
}
robj* CompactObj::AsRObj() const {
CHECK_EQ(ROBJ_TAG, taglen_);
robj* res = &tl.tmp_robj;
unsigned enc = u_.r_obj.encoding();
res->type = u_.r_obj.type();
if (res->type == OBJ_SET || res->type == OBJ_HASH || res->type == OBJ_ZSET) {
LOG(DFATAL) << "Should not call AsRObj for type " << res->type;
}
res->encoding = enc;
res->lru = 0; // u_.r_obj.unneeded;
res->ptr = u_.r_obj.inner_obj();
return res;
}
void CompactObj::InitRobj(unsigned type, unsigned encoding, void* obj) {
DCHECK_NE(type, OBJ_STRING);
SetMeta(ROBJ_TAG, mask_);
u_.r_obj.Init(type, encoding, obj);
}
void CompactObj::SyncRObj() {
robj* obj = &tl.tmp_robj;
DCHECK_EQ(ROBJ_TAG, taglen_);
DCHECK_EQ(u_.r_obj.type(), obj->type);
DCHECK_NE(OBJ_SET, obj->type) << "sets should be handled without robj";
CHECK_NE(OBJ_ZSET, obj->type) << "zsets should be handled without robj";
unsigned enc = obj->encoding;
u_.r_obj.Init(obj->type, enc, obj->ptr);
}
void CompactObj::SetInt(int64_t val) {
if (INT_TAG != taglen_) {
SetMeta(INT_TAG, mask_ & ~kEncMask);

View file

@ -12,8 +12,6 @@
#include "core/json/json_object.h"
#include "core/small_string.h"
typedef struct redisObject robj;
namespace dfly {
constexpr unsigned kEncodingIntSet = 0;
@ -269,16 +267,10 @@ class CompactObj {
u_.r_obj.Init(u_.r_obj.type(), u_.r_obj.encoding(), ptr);
}
robj* AsRObj() const;
// takes ownership over obj_inner.
// type should not be OBJ_STRING.
void InitRobj(unsigned type, unsigned encoding, void* obj_inner);
// Syncs 'this' instance with the object that was previously returned by AsRObj().
// Requires: AsRObj() has been called before in the same thread in fiber-atomic section.
void SyncRObj();
// For STR object.
void SetInt(int64_t val);
std::optional<int64_t> TryGetInt() const;

View file

@ -17,9 +17,7 @@
#include "core/mi_memory_resource.h"
extern "C" {
#include "redis/dict.h"
#include "redis/intset.h"
#include "redis/object.h"
#include "redis/redis_aux.h"
#include "redis/stream.h"
#include "redis/zmalloc.h"

View file

@ -8,14 +8,17 @@ else()
set(ZMALLOC_DEPS "")
endif()
add_library(redis_lib crc16.c crc64.c crcspeed.c debug.c dict.c intset.c geo.c
geohash.c geohash_helper.c
listpack.c mt19937-64.c object.c lzf_c.c lzf_d.c sds.c
quicklist.c rax.c pqsort.c redis_aux.c siphash.c t_stream.c t_zset.c
add_library(redis_lib crc16.c crc64.c crcspeed.c debug.c intset.c geo.c
geohash.c geohash_helper.c t_zset.c
listpack.c mt19937-64.c lzf_c.c lzf_d.c sds.c
quicklist.c rax.c pqsort.c redis_aux.c siphash.c t_stream.c
util.c ziplist.c hyperloglog.c ${ZMALLOC_SRC})
cxx_link(redis_lib ${ZMALLOC_DEPS})
add_library(redis_test_lib dict.c)
cxx_link(redis_test_lib redis_lib)
if("${CMAKE_CXX_COMPILER_ID}" STREQUAL "GNU")
target_compile_options(redis_lib PRIVATE -Wno-maybe-uninitialized)
endif()

View file

@ -74,44 +74,3 @@ void _serverAssert(const char *estr, const char *file, int line) {
serverLog(LL_WARNING,"=== ASSERTION FAILED ===");
serverLog(LL_WARNING,"==> %s:%d '%s' is not true",file,line,estr);
}
/* Low level logging. To use only for very big messages, otherwise
* serverLog() is to prefer. */
void serverLogRaw(int level, const char *msg) {
FILE *fp;
int log_to_stdout = 1;
level &= 0xff; /* clear flags */
if (level < verbosity) return;
fp = stdout;
fprintf(fp,"%s",msg);
fflush(fp);
if (!log_to_stdout) fclose(fp);
}
void serverLogHexDump(int level, char *descr, void *value, size_t len) {
char buf[65], *b;
unsigned char *v = value;
char charset[] = "0123456789abcdef";
serverLog(level,"%s (hexdump of %zu bytes):", descr, len);
b = buf;
while(len) {
b[0] = charset[(*v)>>4];
b[1] = charset[(*v)&0xf];
b[2] = '\0';
b += 2;
len--;
v++;
if (b-buf == 64 || len == 0) {
serverLogRaw(level|LL_RAW,buf);
b = buf;
}
}
serverLogRaw(level|LL_RAW,"\n");
}

View file

@ -34,7 +34,7 @@
#include "geohash_helper.h"
#include "listpack.h"
#include "pqsort.h"
#include "object.h"
#include "util.h"
#include "zmalloc.h"
#include "zset.h"

View file

@ -34,7 +34,6 @@
#include <math.h>
#include <string.h>
#include "redis/object.h"
#include "redis/redis_aux.h"
#include "redis/util.h"
@ -591,69 +590,6 @@ void hllDenseRegHisto(uint8_t* registers, int* reghisto) {
/* ================== Sparse representation implementation ================= */
/* Convert the HLL with sparse representation given as input in its dense
* representation. Both representations are represented by SDS strings, and
* the input representation is freed as a side effect.
*
* The function returns C_OK if the sparse representation was valid,
* otherwise C_ERR is returned if the representation was corrupted. */
int hllSparseToDense(robj* o) {
sds sparse = o->ptr, dense;
struct hllhdr *hdr, *oldhdr = (struct hllhdr*)sparse;
int idx = 0, runlen, regval;
uint8_t *p = (uint8_t*)sparse, *end = p + sdslen(sparse);
/* If the representation is already the right one return ASAP. */
hdr = (struct hllhdr*)sparse;
if (hdr->encoding == HLL_DENSE)
return C_OK;
/* Create a string of the right size filled with zero bytes.
* Note that the cached cardinality is set to 0 as a side effect
* that is exactly the cardinality of an empty HLL. */
dense = sdsnewlen(NULL, HLL_DENSE_SIZE);
hdr = (struct hllhdr*)dense;
*hdr = *oldhdr; /* This will copy the magic and cached cardinality. */
hdr->encoding = HLL_DENSE;
/* Now read the sparse representation and set non-zero registers
* accordingly. */
p += HLL_HDR_SIZE;
while (p < end) {
if (HLL_SPARSE_IS_ZERO(p)) {
runlen = HLL_SPARSE_ZERO_LEN(p);
idx += runlen;
p++;
} else if (HLL_SPARSE_IS_XZERO(p)) {
runlen = HLL_SPARSE_XZERO_LEN(p);
idx += runlen;
p += 2;
} else {
runlen = HLL_SPARSE_VAL_LEN(p);
regval = HLL_SPARSE_VAL_VALUE(p);
if ((runlen + idx) > HLL_REGISTERS)
break; /* Overflow. */
while (runlen--) {
HLL_DENSE_SET_REGISTER(hdr->registers, idx, regval);
idx++;
}
p++;
}
}
/* If the sparse representation was valid, we expect to find idx
* set to HLL_REGISTERS. */
if (idx != HLL_REGISTERS) {
sdsfree(dense);
return C_ERR;
}
/* Free the old representation and set the new one. */
sdsfree(o->ptr);
o->ptr = dense;
return C_OK;
}
/* Compute the register histogram in the sparse representation. */
void hllSparseRegHisto(uint8_t* sparse, int sparselen, int* invalid, int* reghisto) {
int idx = 0, runlen, regval;
@ -796,6 +732,7 @@ uint64_t hllCount(struct hllhdr* hdr, int* invalid) {
return (uint64_t)E;
}
#if 0
/* Merge by computing MAX(registers[i],hll[i]) the HyperLogLog 'hll'
* with an array of uint8_t HLL_REGISTERS registers pointed by 'max'.
*
@ -851,7 +788,6 @@ int hllMerge(uint8_t* max, robj* hll) {
}
/* ========================== HyperLogLog commands ========================== */
robj* createHLLObject(void) {
robj* o;
struct hllhdr* hdr;
@ -884,6 +820,7 @@ robj* createHLLObject(void) {
hdr->encoding = HLL_SPARSE;
return o;
}
#endif
/* ========================== Dragonfly custom functions ===================== */

File diff suppressed because it is too large Load diff

View file

@ -1,133 +0,0 @@
#ifndef __REDIS_OBJECT_H
#define __REDIS_OBJECT_H
#include <stddef.h>
#include "sds.h"
#include "quicklist.h"
/* The actual Redis Object */
#define OBJ_STRING 0U /* String object. */
#define OBJ_LIST 1U /* List object. */
#define OBJ_SET 2U /* Set object. */
#define OBJ_ZSET 3U /* Sorted set object. */
#define OBJ_HASH 4U /* Hash object. */
#define OBJ_MODULE 5U /* Module object. */
#define OBJ_STREAM 6U /* Stream object. */
/* Objects encoding. Some kind of objects like Strings and Hashes can be
* internally represented in multiple ways. The 'encoding' field of the object
* is set to one of this fields for this object. */
#define OBJ_ENCODING_RAW 0U /* Raw representation */
#define OBJ_ENCODING_INT 1U /* Encoded as integer */
#define OBJ_ENCODING_HT 2U /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3U /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4U /* No longer used: old list encoding. */
#define OBJ_ENCODING_ZIPLIST 5U /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6U /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7U /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8U /* Embedded sds string encoding */
#define OBJ_ENCODING_QUICKLIST 9U /* Encoded as linked list of ziplists */
#define OBJ_ENCODING_STREAM 10U /* Encoded as a radix tree of listpacks */
#define OBJ_ENCODING_LISTPACK 11 /* Encoded as a listpack */
#define OBJ_ENCODING_COMPRESS_INTERNAL 15U /* Kept as lzf compressed, to pass compressed blob to another thread */
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
#define OBJ_HASH_KEY 1
#define OBJ_HASH_VALUE 2
#define OBJ_SHARED_INTEGERS 10000
#define OBJ_SHARED_REFCOUNT INT_MAX /* Global object never destroyed. */
#define OBJ_STATIC_REFCOUNT (INT_MAX-1) /* Object allocated in the stack. */
#define OBJ_FIRST_SPECIAL_REFCOUNT OBJ_STATIC_REFCOUNT
/* Error codes */
#define C_OK 0
#define C_ERR -1
typedef struct redisObject {
unsigned type:4;
unsigned encoding:4;
unsigned lru:24; /* LRU time (relative to global lru_clock) or
* LFU data (least significant 8 bits frequency
* and most significant 16 bits access time). */
int refcount;
void *ptr;
} robj;
/* Redis object implementation */
void decrRefCountVoid(void *o);
int getLongLongFromObject(robj *o, long long *target);
void incrRefCount(robj *o);
robj *makeObjectShared(robj *o);
robj *resetRefCount(robj *obj);
void freeStringObject(robj *o);
void freeListObject(robj *o);
void freeSetObject(robj *o);
void freeZsetObject(robj *o);
void freeHashObject(robj *o);
robj *createObject(int type, void *ptr);
robj *createStreamObject(void);
robj *createStringObject(const char *ptr, size_t len);
robj *createRawStringObject(const char *ptr, size_t len);
robj *createEmbeddedStringObject(const char *ptr, size_t len);
robj *dupStringObject(const robj *o);
int isSdsRepresentableAsLongLong(sds s, long long *llval);
int isObjectRepresentableAsLongLong(robj *o, long long *llongval);
robj *getDecodedObject(robj *o);
size_t stringObjectLen(robj *o);
robj *createStringObjectFromLongLong(long long value);
robj *createStringObjectFromLongLongForValue(long long value);
robj *createStringObjectFromLongDouble(long double value, int humanfriendly);
robj *createQuicklistObject(void);
robj *createSetObject(void);
robj *createIntsetObject(void);
robj *createHashObject(void);
unsigned long long estimateObjectIdleTime(const robj *o);
uint8_t LFUDecrAndReturn(time_t epoch_sec, const robj *o);
void listTypeConvert(robj *subject, int enc);
int objectSetLRUOrLFU(robj *val, long long lfu_freq, long long lru_idle,
long long lru_clock, int lru_multiplier);
robj *setTypeCreate(sds value);
int setTypeAdd(robj *subject, sds value);
int setTypeRemove(robj *subject, sds value);
int setTypeIsMember(const robj *subject, sds value);
int setTypeRandomElement(robj *setobj, sds *sdsele, int64_t *llele);
unsigned long setTypeRandomElements(robj *set, unsigned long count, robj *aux_set);
unsigned long setTypeSize(const robj *subject);
void setTypeConvert(robj *subject, int enc);
static inline int sdsEncodedObject(const robj *o) {
return o->encoding == OBJ_ENCODING_RAW || o->encoding == OBJ_ENCODING_EMBSTR;
}
/* Structure to hold list iteration abstraction. */
typedef struct {
robj *subject;
unsigned char encoding;
unsigned char direction; /* Iteration direction */
quicklistIter *iter;
} listTypeIterator;
/* Structure for an entry while iterating over a list. */
typedef struct {
listTypeIterator *li;
quicklistEntry entry; /* Entry in quicklist */
} listTypeEntry;
const char *strEncoding(int encoding);
#define serverAssertWithInfo(x, y, z) serverAssert(z)
#endif

View file

@ -34,7 +34,6 @@
#include <stdio.h>
#include <time.h>
#include "object.h"
#include "redis_aux.h"
/* The current RDB version. When the format changes in a way that is no longer

View file

@ -6,7 +6,6 @@
#include "crc64.h"
#include "dict.h"
#include "endianconv.h"
#include "object.h"
#include "zmalloc.h"
Server server;
@ -30,35 +29,24 @@ void InitRedisTables() {
server.stream_node_max_entries = 100;
}
// These functions are moved here from server.c
uint64_t dictSdsHash(const void* key) {
return dictGenHashFunction((unsigned char*)key, sdslen((char*)key));
}
int dictSdsKeyCompare(dict* d, const void* key1, const void* key2) {
int l1, l2;
DICT_NOTUSED(d);
l1 = sdslen((sds)key1);
l2 = sdslen((sds)key2);
if (l1 != l2)
return 0;
return memcmp(key1, key2, l1) == 0;
}
void dictSdsDestructor(dict* d, void* val) {
DICT_NOTUSED(d);
sdsfree(val);
}
/* Return the size consumed from the allocator, for the specified SDS string,
* including internal fragmentation. This function is used in order to compute
* the client output buffer size. */
size_t sdsZmallocSize(sds s) {
void* sh = sdsAllocPtr(s);
return zmalloc_size(sh);
const char *strEncoding(int encoding) {
switch(encoding) {
case OBJ_ENCODING_RAW: return "raw";
case OBJ_ENCODING_INT: return "int";
case OBJ_ENCODING_HT: return "hashtable";
case OBJ_ENCODING_ZIPMAP: return "zipmap";
case OBJ_ENCODING_LINKEDLIST: return "linkedlist";
case OBJ_ENCODING_ZIPLIST: return "ziplist";
case OBJ_ENCODING_INTSET: return "intset";
case OBJ_ENCODING_SKIPLIST: return "skiplist";
case OBJ_ENCODING_EMBSTR: return "embstr";
case OBJ_ENCODING_QUICKLIST: return "quicklist";
case OBJ_ENCODING_STREAM: return "stream";
case OBJ_ENCODING_LISTPACK: return "listpack";
case OBJ_ENCODING_COMPRESS_INTERNAL: return "compress_internal";
default: return "unknown";
}
}
/* Toggle the 64 bit unsigned integer pointed by *p from little endian to

View file

@ -76,6 +76,33 @@ extern Server server;
void InitRedisTables();
typedef struct redisObject robj;
const char *strEncoding(int encoding);
/* The actual Redis Object */
#define OBJ_STRING 0U /* String object. */
#define OBJ_LIST 1U /* List object. */
#define OBJ_SET 2U /* Set object. */
#define OBJ_ZSET 3U /* Sorted set object. */
#define OBJ_HASH 4U /* Hash object. */
#define OBJ_MODULE 5U /* Module object. */
#define OBJ_STREAM 6U /* Stream object. */
/* Objects encoding. Some kind of objects like Strings and Hashes can be
* internally represented in multiple ways. The 'encoding' field of the object
* is set to one of this fields for this object. */
#define OBJ_ENCODING_RAW 0U /* Raw representation */
#define OBJ_ENCODING_INT 1U /* Encoded as integer */
#define OBJ_ENCODING_HT 2U /* Encoded as hash table */
#define OBJ_ENCODING_ZIPMAP 3U /* Encoded as zipmap */
#define OBJ_ENCODING_LINKEDLIST 4U /* No longer used: old list encoding. */
#define OBJ_ENCODING_ZIPLIST 5U /* Encoded as ziplist */
#define OBJ_ENCODING_INTSET 6U /* Encoded as intset */
#define OBJ_ENCODING_SKIPLIST 7U /* Encoded as skiplist */
#define OBJ_ENCODING_EMBSTR 8U /* Embedded sds string encoding */
#define OBJ_ENCODING_QUICKLIST 9U /* Encoded as linked list of ziplists */
#define OBJ_ENCODING_STREAM 10U /* Encoded as a radix tree of listpacks */
#define OBJ_ENCODING_LISTPACK 11 /* Encoded as a listpack */
#define OBJ_ENCODING_COMPRESS_INTERNAL 15U /* Kept as lzf compressed, to pass compressed blob to another thread */
#endif /* __REDIS_AUX_H */

View file

@ -3,10 +3,12 @@
#include "util.h"
#include "rax.h"
#include "object.h"
#include "sds.h"
#include "listpack.h"
typedef struct redisObject robj;
/* Stream item ID: a 128 bit number composed of a milliseconds time and
* a sequence counter. IDs generated in the same millisecond (or in a past
* millisecond if the clock jumped backward) will use the millisecond time
@ -98,12 +100,6 @@ typedef struct streamNACK {
in the last delivery. */
} streamNACK;
/* Stream propagation information, passed to functions in order to propagate
* XCLAIM commands to AOF and slaves. */
typedef struct streamPropInfo {
robj *keyname;
robj *groupname;
} streamPropInfo;
typedef struct {
/* XADD options */
@ -146,7 +142,6 @@ typedef struct {
stream *streamNew(void);
void freeStream(stream *s);
unsigned long streamLength(const robj *subject);
// size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end, size_t count, int rev, streamCG *group, streamConsumer *consumer, int flags, streamPropInfo *spi);
void streamIteratorStart(streamIterator *si, stream *s, streamID *start, streamID *end, int rev);
int streamIteratorGetID(streamIterator *si, streamID *id, int64_t *numfields);
@ -165,8 +160,7 @@ int streamEntryExists(stream *s, streamID *id);
void streamFreeNACK(streamNACK *na);
int streamIncrID(streamID *id);
int streamDecrID(streamID *id);
// void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds consumername);
robj *streamDup(robj *o);
int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep);
int streamParseID(const robj *o, streamID *id);
robj *createObjectFromStreamID(streamID *id);

View file

@ -96,12 +96,6 @@ void freeStream(stream *s) {
zfree(s);
}
/* Return the length of a stream. */
unsigned long streamLength(const robj *subject) {
stream *s = subject->ptr;
return s->length;
}
/* Set 'id' to be its successor stream ID.
* If 'id' is the maximal possible id, it is wrapped around to 0-0 and a
* C_ERR is returned. */
@ -157,114 +151,6 @@ void streamNextID(streamID *last_id, streamID *new_id) {
}
}
/* This is a helper function for the COPY command.
* Duplicate a Stream object, with the guarantee that the returned object
* has the same encoding as the original one.
*
* The resulting object always has refcount set to 1 */
robj *streamDup(robj *o) {
robj *sobj;
serverAssert(o->type == OBJ_STREAM);
switch (o->encoding) {
case OBJ_ENCODING_STREAM:
sobj = createStreamObject();
break;
default:
serverPanic("Wrong encoding.");
break;
}
stream *s;
stream *new_s;
s = o->ptr;
new_s = sobj->ptr;
raxIterator ri;
uint64_t rax_key[2];
raxStart(&ri, s->rax_tree);
raxSeek(&ri, "^", NULL, 0);
size_t lp_bytes = 0; /* Total bytes in the listpack. */
unsigned char *lp = NULL; /* listpack pointer. */
/* Get a reference to the listpack node. */
while (raxNext(&ri)) {
lp = ri.data;
lp_bytes = lpBytes(lp);
unsigned char *new_lp = zmalloc(lp_bytes);
memcpy(new_lp, lp, lp_bytes);
memcpy(rax_key, ri.key, sizeof(rax_key));
raxInsert(new_s->rax_tree, (unsigned char *)&rax_key, sizeof(rax_key),
new_lp, NULL);
}
new_s->length = s->length;
new_s->first_id = s->first_id;
new_s->last_id = s->last_id;
new_s->max_deleted_entry_id = s->max_deleted_entry_id;
new_s->entries_added = s->entries_added;
raxStop(&ri);
if (s->cgroups == NULL) return sobj;
/* Consumer Groups */
raxIterator ri_cgroups;
raxStart(&ri_cgroups, s->cgroups);
raxSeek(&ri_cgroups, "^", NULL, 0);
while (raxNext(&ri_cgroups)) {
streamCG *cg = ri_cgroups.data;
streamCG *new_cg = streamCreateCG(new_s, (char *)ri_cgroups.key,
ri_cgroups.key_len, &cg->last_id,
cg->entries_read);
serverAssert(new_cg != NULL);
/* Consumer Group PEL */
raxIterator ri_cg_pel;
raxStart(&ri_cg_pel,cg->pel);
raxSeek(&ri_cg_pel,"^",NULL,0);
while(raxNext(&ri_cg_pel)){
streamNACK *nack = ri_cg_pel.data;
streamNACK *new_nack = streamCreateNACK(NULL);
new_nack->delivery_time = nack->delivery_time;
new_nack->delivery_count = nack->delivery_count;
raxInsert(new_cg->pel, ri_cg_pel.key, sizeof(streamID), new_nack, NULL);
}
raxStop(&ri_cg_pel);
/* Consumers */
raxIterator ri_consumers;
raxStart(&ri_consumers, cg->consumers);
raxSeek(&ri_consumers, "^", NULL, 0);
while (raxNext(&ri_consumers)) {
streamConsumer *consumer = ri_consumers.data;
streamConsumer *new_consumer;
new_consumer = zmalloc(sizeof(*new_consumer));
new_consumer->name = sdsdup(consumer->name);
new_consumer->pel = raxNew();
raxInsert(new_cg->consumers,(unsigned char *)new_consumer->name,
sdslen(new_consumer->name), new_consumer, NULL);
new_consumer->seen_time = consumer->seen_time;
/* Consumer PEL */
raxIterator ri_cpel;
raxStart(&ri_cpel, consumer->pel);
raxSeek(&ri_cpel, "^", NULL, 0);
while (raxNext(&ri_cpel)) {
streamNACK *new_nack = raxFind(new_cg->pel,ri_cpel.key,sizeof(streamID));
serverAssert(new_nack != raxNotFound);
new_nack->consumer = new_consumer;
raxInsert(new_consumer->pel,ri_cpel.key,sizeof(streamID),new_nack,NULL);
}
raxStop(&ri_cpel);
}
raxStop(&ri_consumers);
}
raxStop(&ri_cgroups);
return sobj;
}
/* This is a wrapper function for lpGet() to directly get an integer value
* from the listpack (that may store numbers as a string), converting
* the string if needed.
@ -413,266 +299,6 @@ void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_i
}
/* Adds a new item into the stream 's' having the specified number of
* field-value pairs as specified in 'numfields' and stored into 'argv'.
* Returns the new entry ID populating the 'added_id' structure.
*
* If 'use_id' is not NULL, the ID is not auto-generated by the function,
* but instead the passed ID is used to add the new entry. In this case
* adding the entry may fail as specified later in this comment.
*
* When 'use_id' is used alongside with a zero 'seq-given', the sequence
* part of the passed ID is ignored and the function will attempt to use an
* auto-generated sequence.
*
* The function returns C_OK if the item was added, this is always true
* if the ID was generated by the function. However the function may return
* C_ERR in several cases:
* 1. If an ID was given via 'use_id', but adding it failed since the
* current top ID is greater or equal. errno will be set to EDOM.
* 2. If a size of a single element or the sum of the elements is too big to
* be stored into the stream. errno will be set to ERANGE. */
int streamAppendItem(stream *s, robj **argv, int64_t numfields, streamID *added_id, streamID *use_id, int seq_given) {
/* Generate the new entry ID. */
streamID id;
if (use_id) {
if (seq_given) {
id = *use_id;
} else {
/* The automatically generated sequence can be either zero (new
* timestamps) or the incremented sequence of the last ID. In the
* latter case, we need to prevent an overflow/advancing forward
* in time. */
if (s->last_id.ms == use_id->ms) {
if (s->last_id.seq == UINT64_MAX) {
return C_ERR;
}
id = s->last_id;
id.seq++;
} else {
id = *use_id;
}
}
} else {
streamNextID(&s->last_id,&id);
}
/* Check that the new ID is greater than the last entry ID
* or return an error. Automatically generated IDs might
* overflow (and wrap-around) when incrementing the sequence
part. */
if (streamCompareID(&id,&s->last_id) <= 0) {
errno = EDOM;
return C_ERR;
}
/* Avoid overflow when trying to add an element to the stream (listpack
* can only host up to 32bit length sttrings, and also a total listpack size
* can't be bigger than 32bit length. */
size_t totelelen = 0;
for (int64_t i = 0; i < numfields*2; i++) {
sds ele = argv[i]->ptr;
totelelen += sdslen(ele);
}
if (totelelen > STREAM_LISTPACK_MAX_SIZE) {
errno = ERANGE;
return C_ERR;
}
/* Add the new entry. */
raxIterator ri;
raxStart(&ri,s->rax_tree);
raxSeek(&ri,"$",NULL,0);
size_t lp_bytes = 0; /* Total bytes in the tail listpack. */
unsigned char *lp = NULL; /* Tail listpack pointer. */
if (!raxEOF(&ri)) {
/* Get a reference to the tail node listpack. */
lp = ri.data;
lp_bytes = lpBytes(lp);
}
raxStop(&ri);
/* We have to add the key into the radix tree in lexicographic order,
* to do so we consider the ID as a single 128 bit number written in
* big endian, so that the most significant bytes are the first ones. */
uint64_t rax_key[2]; /* Key in the radix tree containing the listpack.*/
streamID master_id; /* ID of the master entry in the listpack. */
/* Create a new listpack and radix tree node if needed. Note that when
* a new listpack is created, we populate it with a "master entry". This
* is just a set of fields that is taken as references in order to compress
* the stream entries that we'll add inside the listpack.
*
* Note that while we use the first added entry fields to create
* the master entry, the first added entry is NOT represented in the master
* entry, which is a stand alone object. But of course, the first entry
* will compress well because it's used as reference.
*
* The master entry is composed like in the following example:
*
* +-------+---------+------------+---------+--/--+---------+---------+-+
* | count | deleted | num-fields | field_1 | field_2 | ... | field_N |0|
* +-------+---------+------------+---------+--/--+---------+---------+-+
*
* count and deleted just represent respectively the total number of
* entries inside the listpack that are valid, and marked as deleted
* (deleted flag in the entry flags set). So the total number of items
* actually inside the listpack (both deleted and not) is count+deleted.
*
* The real entries will be encoded with an ID that is just the
* millisecond and sequence difference compared to the key stored at
* the radix tree node containing the listpack (delta encoding), and
* if the fields of the entry are the same as the master entry fields, the
* entry flags will specify this fact and the entry fields and number
* of fields will be omitted (see later in the code of this function).
*
* The "0" entry at the end is the same as the 'lp-count' entry in the
* regular stream entries (see below), and marks the fact that there are
* no more entries, when we scan the stream from right to left. */
/* First of all, check if we can append to the current macro node or
* if we need to switch to the next one. 'lp' will be set to NULL if
* the current node is full. */
if (lp != NULL) {
size_t node_max_bytes = server.stream_node_max_bytes;
if (node_max_bytes == 0 || node_max_bytes > STREAM_LISTPACK_MAX_SIZE)
node_max_bytes = STREAM_LISTPACK_MAX_SIZE;
if (lp_bytes + totelelen >= node_max_bytes) {
lp = NULL;
} else if (server.stream_node_max_entries) {
unsigned char *lp_ele = lpFirst(lp);
/* Count both live entries and deleted ones. */
int64_t count = lpGetInteger(lp_ele) + lpGetInteger(lpNext(lp,lp_ele));
if (count >= server.stream_node_max_entries) {
/* Shrink extra pre-allocated memory */
lp = lpShrinkToFit(lp);
if (ri.data != lp)
raxInsert(s->rax_tree,ri.key,ri.key_len,lp,NULL);
lp = NULL;
}
}
}
int flags = STREAM_ITEM_FLAG_NONE;
if (lp == NULL) {
master_id = id;
streamEncodeID(rax_key,&id);
/* Create the listpack having the master entry ID and fields.
* Pre-allocate some bytes when creating listpack to avoid realloc on
* every XADD. Since listpack.c uses malloc_size, it'll grow in steps,
* and won't realloc on every XADD.
* When listpack reaches max number of entries, we'll shrink the
* allocation to fit the data. */
size_t prealloc = STREAM_LISTPACK_MAX_PRE_ALLOCATE;
if (server.stream_node_max_bytes > 0 && server.stream_node_max_bytes < prealloc) {
prealloc = server.stream_node_max_bytes;
}
lp = lpNew(prealloc);
lp = lpAppendInteger(lp,1); /* One item, the one we are adding. */
lp = lpAppendInteger(lp,0); /* Zero deleted so far. */
lp = lpAppendInteger(lp,numfields);
for (int64_t i = 0; i < numfields; i++) {
sds field = argv[i*2]->ptr;
lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
}
lp = lpAppendInteger(lp,0); /* Master entry zero terminator. */
raxInsert(s->rax_tree,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
/* The first entry we insert, has obviously the same fields of the
* master entry. */
flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
} else {
serverAssert(ri.key_len == sizeof(rax_key));
memcpy(rax_key,ri.key,sizeof(rax_key));
/* Read the master ID from the radix tree key. */
streamDecodeID(rax_key,&master_id);
unsigned char *lp_ele = lpFirst(lp);
/* Update count and skip the deleted fields. */
int64_t count = lpGetInteger(lp_ele);
lp = lpReplaceInteger(lp,&lp_ele,count+1);
lp_ele = lpNext(lp,lp_ele); /* seek deleted. */
lp_ele = lpNext(lp,lp_ele); /* seek master entry num fields. */
/* Check if the entry we are adding, have the same fields
* as the master entry. */
int64_t master_fields_count = lpGetInteger(lp_ele);
lp_ele = lpNext(lp,lp_ele);
if (numfields == master_fields_count) {
int64_t i;
for (i = 0; i < master_fields_count; i++) {
sds field = argv[i*2]->ptr;
int64_t e_len;
unsigned char buf[LP_INTBUF_SIZE];
unsigned char *e = lpGet(lp_ele,&e_len,buf);
/* Stop if there is a mismatch. */
if (sdslen(field) != (size_t)e_len ||
memcmp(e,field,e_len) != 0) break;
lp_ele = lpNext(lp,lp_ele);
}
/* All fields are the same! We can compress the field names
* setting a single bit in the flags. */
if (i == master_fields_count) flags |= STREAM_ITEM_FLAG_SAMEFIELDS;
}
}
/* Populate the listpack with the new entry. We use the following
* encoding:
*
* +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
* |flags|entry-id|num-fields|field-1|value-1|...|field-N|value-N|lp-count|
* +-----+--------+----------+-------+-------+-/-+-------+-------+--------+
*
* However if the SAMEFIELD flag is set, we have just to populate
* the entry with the values, so it becomes:
*
* +-----+--------+-------+-/-+-------+--------+
* |flags|entry-id|value-1|...|value-N|lp-count|
* +-----+--------+-------+-/-+-------+--------+
*
* The entry-id field is actually two separated fields: the ms
* and seq difference compared to the master entry.
*
* The lp-count field is a number that states the number of listpack pieces
* that compose the entry, so that it's possible to travel the entry
* in reverse order: we can just start from the end of the listpack, read
* the entry, and jump back N times to seek the "flags" field to read
* the stream full entry. */
lp = lpAppendInteger(lp,flags);
lp = lpAppendInteger(lp,id.ms - master_id.ms);
lp = lpAppendInteger(lp,id.seq - master_id.seq);
if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
lp = lpAppendInteger(lp,numfields);
for (int64_t i = 0; i < numfields; i++) {
sds field = argv[i*2]->ptr, value = argv[i*2+1]->ptr;
if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS))
lp = lpAppend(lp,(unsigned char*)field,sdslen(field));
lp = lpAppend(lp,(unsigned char*)value,sdslen(value));
}
/* Compute and store the lp-count field. */
int64_t lp_count = numfields;
lp_count += 3; /* Add the 3 fixed fields flags + ms-diff + seq-diff. */
if (!(flags & STREAM_ITEM_FLAG_SAMEFIELDS)) {
/* If the item is not compressed, it also has the fields other than
* the values, and an additional num-fields field. */
lp_count += numfields+1;
}
lp = lpAppendInteger(lp,lp_count);
/* Insert back into the tree in order to update the listpack pointer. */
if (ri.data != lp)
raxInsert(s->rax_tree,(unsigned char*)&rax_key,sizeof(rax_key),lp,NULL);
s->length++;
s->entries_added++;
s->last_id = id;
if (s->length == 1) s->first_id = id;
if (added_id) *added_id = id;
return C_OK;
}
/* Trim the stream 's' according to args->trim_strategy, and return the
* number of elements removed from the stream. The 'approx' option, if non-zero,
* specifies that the trimming must be performed in a approximated way in
@ -1372,7 +998,6 @@ void setDeferredReplyStreamID(client *c, void *dr, streamID *id) {
setDeferredReplyBulkSds(c, dr, replyid);
}
#endif
/* Similar to the above function, but just creates an object, usually useful
* for replication purposes to create arguments. */
@ -1380,6 +1005,7 @@ robj *createObjectFromStreamID(streamID *id) {
return createObject(OBJ_STRING, sdscatfmt(sdsempty(),"%U-%U",
id->ms,id->seq));
}
#endif
/* Returns non-zero if the ID is 0-0. */
int streamIDEqZero(streamID *id) {

View file

@ -60,11 +60,9 @@
#include <stdlib.h>
#include <string.h>
#include "dict.h"
#include "listpack.h"
#include "redis_aux.h"
#include "sds.h"
#include "object.h"
#include "zmalloc.h"
#include "zset.h"
#include "util.h"
@ -393,6 +391,7 @@ zskiplistNode *zslLastInRange(zskiplist *zsl, const zrangespec *range) {
return x;
}
#if 0
/* Delete all the elements with score between min and max from the skiplist.
* Both min and max can be inclusive or exclusive (see range->minex and
* range->maxex). When inclusive a score >= min && score <= max is deleted.
@ -484,6 +483,7 @@ unsigned long zslDeleteRangeByRank(zskiplist *zsl, unsigned int start, unsigned
}
return removed;
}
#endif
/* Find the rank for an element by both score and key.
* Returns 0 when the element cannot be found, rank otherwise.
@ -534,46 +534,6 @@ zskiplistNode* zslGetElementByRank(zskiplist *zsl, unsigned long rank) {
/* ------------------------ Lexicographic ranges ---------------------------- */
/* Parse max or min argument of ZRANGEBYLEX.
* (foo means foo (open interval)
* [foo means foo (closed interval)
* - means the min string possible
* + means the max string possible
*
* If the string is valid the *dest pointer is set to the redis object
* that will be used for the comparison, and ex will be set to 0 or 1
* respectively if the item is exclusive or inclusive. C_OK will be
* returned.
*
* If the string is not a valid range C_ERR is returned, and the value
* of *dest and *ex is undefined. */
int zslParseLexRangeItem(robj *item, sds *dest, int *ex) {
char *c = item->ptr;
switch(c[0]) {
case '+':
if (c[1] != '\0') return C_ERR;
*ex = 1;
*dest = cmaxstring;
return C_OK;
case '-':
if (c[1] != '\0') return C_ERR;
*ex = 1;
*dest = cminstring;
return C_OK;
case '(':
*ex = 1;
*dest = sdsnewlen(c+1,sdslen(c)-1);
return C_OK;
case '[':
*ex = 0;
*dest = sdsnewlen(c+1,sdslen(c)-1);
return C_OK;
default:
return C_ERR;
}
}
/* Free a lex range structure, must be called only after zslParseLexRange()
* populated the structure with success (C_OK returned). */
void zslFreeLexRange(zlexrangespec *spec) {
@ -583,27 +543,6 @@ void zslFreeLexRange(zlexrangespec *spec) {
spec->max != cmaxstring) sdsfree(spec->max);
}
/* Populate the lex rangespec according to the objects min and max.
*
* Return C_OK on success. On error C_ERR is returned.
* When OK is returned the structure must be freed with zslFreeLexRange(),
* otherwise no release is needed. */
int zslParseLexRange(robj *min, robj *max, zlexrangespec *spec) {
/* The range can't be valid if objects are integer encoded.
* Every item must start with ( or [. */
if (min->encoding == OBJ_ENCODING_INT ||
max->encoding == OBJ_ENCODING_INT) return C_ERR;
spec->min = spec->max = NULL;
if (zslParseLexRangeItem(min, &spec->min, &spec->minex) == C_ERR ||
zslParseLexRangeItem(max, &spec->max, &spec->maxex) == C_ERR) {
zslFreeLexRange(spec);
return C_ERR;
} else {
return C_OK;
}
}
/* This is just a wrapper to sdscmp() that is able to
* handle shared.minstring and shared.maxstring as the equivalent of
* -inf and +inf for strings */

View file

@ -710,33 +710,7 @@ sds getAbsolutePath(char *filename) {
return abspath;
}
/*
* Gets the proper timezone in a more portable fashion
* i.e timezone variables are linux specific.
*/
long getTimeZone(void) {
#if defined(__linux__) || defined(__sun)
return timezone;
#else
struct timeval tv;
struct timezone tz;
gettimeofday(&tv, &tz);
return tz.tz_minuteswest * 60L;
#endif
}
/* Return true if the specified path is just a file basename without any
* relative or absolute path. This function just checks that no / or \
* character exists inside the specified path, that's enough in the
* environments where Redis runs. */
int pathIsBaseName(char *path) {
return strchr(path,'/') == NULL && strchr(path,'\\') == NULL;
}
#endif
/* Return the UNIX time in microseconds */
long long ustime(void) {
@ -749,30 +723,6 @@ long long ustime(void) {
return ust;
}
/* This function is used to obtain the current LRU clock.
* If the current resolution is lower than the frequency we refresh the
* LRU clock (as it should be in production servers) we return the
* precomputed value, otherwise we need to resort to a system call. */
unsigned int LRU_CLOCK(void) {
unsigned int lruclock;
if (0 /* 1000/server.hz <= LRU_CLOCK_RESOLUTION*/) {
// NOTE(roman): server.lruclock is just an optimization
// lruclock = server.lruclock;
} else {
lruclock = getLRUClock();
}
return lruclock;
}
/* Positive input is sleep time in microseconds. Negative input is fractions
* of microseconds, i.e. -10 means 100 nanoseconds. */
void debugDelay(int usec) {
/* Since even the shortest sleep results in context switch and system call,
* the way we achieve short sleeps is by statistically sleeping less often. */
if (usec < 0) usec = (rand() % -usec) == 0 ? 1: 0;
if (usec) usleep(usec);
}
#ifdef REDIS_TEST
#include <assert.h>

View file

@ -40,6 +40,10 @@
* This should be the size of the buffer given to ld2string */
#define MAX_LONG_DOUBLE_CHARS 5*1024
/* Error codes */
#define C_OK 0
#define C_ERR -1
/* long double to string conversion options */
typedef enum {
LD_STR_AUTO, /* %.17Lg */
@ -63,9 +67,6 @@ int string2ld(const char *s, size_t slen, long double *dp);
int d2string(char *buf, size_t len, double value);
int ld2string(char *buf, size_t len, long double value, ld2string_mode mode);
long getTimeZone(void);
int pathIsBaseName(char *path);
#define LOG_MAX_LEN 1024 /* Default maximum length of syslog messages.*/
/* Log levels */
@ -83,16 +84,11 @@ int pathIsBaseName(char *path);
void serverLog(int level, const char *fmt, ...);
void _serverPanic(const char *file, int line, const char *msg, ...);
void _serverAssert(const char *estr, const char *file, int line);
void serverLogHexDump(int level, char *descr, void *value, size_t len);
#define serverPanic(...) _serverPanic(__FILE__,__LINE__,__VA_ARGS__),_exit(1)
#define serverAssert(_e) ((_e)?(void)0 : (_serverAssert(#_e,__FILE__,__LINE__),_exit(1)))
extern int verbosity;
typedef long long mstime_t; /* millisecond time type. */
unsigned int LRU_CLOCK(void);
void debugDelay(int usec);
long long ustime(void);
/* Return the current time in minutes, just taking the least significant
@ -122,16 +118,5 @@ static inline mstime_t mstime(void) {
return ustime()/1000;
}
/* Return the LRU clock, based on the clock resolution. This is a time
* in a reduced-bits format that can be used to set and check the
* object->lru field of redisObject structures. */
static inline unsigned int getLRUClock(void) {
int64_t t = time(NULL);
return t & LRU_CLOCK_MAX;
}
#ifdef REDIS_TEST
int utilTest(int argc, char **argv, int accurate);
#endif
#endif

View file

@ -95,4 +95,7 @@ unsigned char *zzlDeleteRangeByLex(unsigned char *zl, const zlexrangespec *range
extern sds cmaxstring;
extern sds cminstring;
#define ZSKIPLIST_MAXLEVEL 32 /* Should be enough for 2^64 elements */
#define ZSKIPLIST_P 0.25 /* Skiplist P = 1/4 */
#endif

View file

@ -6,16 +6,11 @@
#include <bitset>
#include "base/expected.hpp"
#include "facade/op_status.h"
extern "C" {
#include "redis/object.h"
}
#include "absl/strings/match.h"
#include "base/expected.hpp"
#include "base/logging.h"
#include "facade/cmd_arg_parser.h"
#include "facade/op_status.h"
#include "server/acl/acl_commands_def.h"
#include "server/command_registry.h"
#include "server/common.h"

View file

@ -8,10 +8,6 @@
#include <boost/smart_ptr/intrusive_ptr.hpp>
extern "C" {
#include "redis/object.h"
}
#include "base/logging.h"
#include "server/engine_shard_set.h"
#include "server/transaction.h"

View file

@ -11,7 +11,6 @@
#include <system_error>
extern "C" {
#include "redis/object.h"
#include "redis/rdb.h"
#include "redis/util.h"
}

View file

@ -16,7 +16,6 @@
extern "C" {
#include "redis/intset.h"
#include "redis/listpack.h"
#include "redis/object.h"
#include "redis/redis_aux.h"
#include "redis/util.h"
#include "redis/zset.h"

View file

@ -9,7 +9,6 @@
extern "C" {
#include "redis/listpack.h"
#include "redis/object.h"
#include "redis/quicklist.h"
}

View file

@ -4,10 +4,6 @@
#include "server/db_slice.h"
extern "C" {
#include "redis/object.h"
}
#include <absl/cleanup/cleanup.h>
#include "base/flags.h"

View file

@ -3,6 +3,10 @@
//
#include "server/debugcmd.h"
extern "C" {
#include "redis/redis_aux.h"
}
#include <absl/cleanup/cleanup.h>
#include <absl/random/random.h>
#include <absl/strings/str_cat.h>

View file

@ -7,7 +7,6 @@
#include <absl/strings/match.h>
extern "C" {
#include "redis/object.h"
#include "redis/zmalloc.h"
}
#include <sys/statvfs.h>

View file

@ -6,7 +6,6 @@
extern "C" {
#include "redis/crc64.h"
#include "redis/object.h"
#include "redis/util.h"
}

View file

@ -6,7 +6,6 @@
extern "C" {
#include "redis/listpack.h"
#include "redis/object.h"
#include "redis/redis_aux.h"
#include "redis/util.h"
#include "redis/zmalloc.h"

View file

@ -6,7 +6,6 @@
extern "C" {
#include "redis/listpack.h"
#include "redis/object.h"
#include "redis/sds.h"
}

View file

@ -4,12 +4,6 @@
#include "server/json_family.h"
#include "facade/op_status.h"
extern "C" {
#include "redis/object.h"
}
#include <absl/strings/match.h>
#include <absl/strings/str_join.h>
#include <absl/strings/str_split.h>
@ -25,6 +19,7 @@ extern "C" {
#include "core/json/json_object.h"
#include "core/json/jsonpath_grammar.hh"
#include "facade/cmd_arg_parser.h"
#include "facade/op_status.h"
#include "server/acl/acl_commands_def.h"
#include "server/command_registry.h"
#include "server/error.h"

View file

@ -6,7 +6,6 @@
#include "server/acl/acl_commands_def.h"
extern "C" {
#include "redis/object.h"
#include "redis/sds.h"
}

View file

@ -7,11 +7,10 @@
#include "absl/strings/escaping.h"
extern "C" {
#include "redis/intset.h"
#include "redis/listpack.h"
#include "redis/lzfP.h" /* LZF compression library */
#include "redis/rdb.h"
#include "redis/quicklist.h"
#include "redis/stream.h"
#include "redis/util.h"
#include "redis/ziplist.h"

View file

@ -6,14 +6,13 @@
#include <system_error>
extern "C" {
#include "redis/object.h"
#include "redis/rdb.h"
}
#include "base/io_buf.h"
#include "base/mpsc_intrusive_queue.h"
#include "base/pod_array.h"
#include "io/io.h"
#include "redis/rdb.h"
#include "server/common.h"
#include "server/journal/serializer.h"

View file

@ -16,6 +16,7 @@ extern "C" {
#include "redis/crc64.h"
#include "redis/intset.h"
#include "redis/listpack.h"
#include "redis/quicklist.h"
#include "redis/rdb.h"
#include "redis/stream.h"
#include "redis/util.h"
@ -371,7 +372,7 @@ error_code RdbSerializer::SaveObject(const PrimeValue& pv) {
CHECK_NE(obj_type, OBJ_STRING);
if (obj_type == OBJ_LIST) {
return SaveListObject(pv.AsRObj());
return SaveListObject(pv);
}
if (obj_type == OBJ_SET) {
@ -387,7 +388,7 @@ error_code RdbSerializer::SaveObject(const PrimeValue& pv) {
}
if (obj_type == OBJ_STREAM) {
return SaveStreamObject(pv.AsRObj());
return SaveStreamObject(pv);
}
if (obj_type == OBJ_JSON) {
@ -398,10 +399,10 @@ error_code RdbSerializer::SaveObject(const PrimeValue& pv) {
return make_error_code(errc::function_not_supported);
}
error_code RdbSerializer::SaveListObject(const robj* obj) {
error_code RdbSerializer::SaveListObject(const PrimeValue& pv) {
/* Save a list value */
DCHECK_EQ(OBJ_ENCODING_QUICKLIST, obj->encoding);
const quicklist* ql = reinterpret_cast<const quicklist*>(obj->ptr);
DCHECK_EQ(OBJ_ENCODING_QUICKLIST, pv.Encoding());
const quicklist* ql = reinterpret_cast<const quicklist*>(pv.RObjPtr());
quicklistNode* node = ql->head;
DVLOG(2) << "Saving list of length " << ql->len;
@ -537,9 +538,9 @@ error_code RdbSerializer::SaveZSetObject(const PrimeValue& pv) {
return error_code{};
}
error_code RdbSerializer::SaveStreamObject(const robj* obj) {
error_code RdbSerializer::SaveStreamObject(const PrimeValue& pv) {
/* Store how many listpacks we have inside the radix tree. */
stream* s = (stream*)obj->ptr;
stream* s = (stream*)pv.RObjPtr();
rax* rax = s->rax_tree;
RETURN_ON_ERR(SaveLen(raxSize(rax)));

View file

@ -7,7 +7,6 @@
extern "C" {
#include "redis/lzfP.h"
#include "redis/object.h"
}
#include <optional>
@ -215,11 +214,11 @@ class RdbSerializer : public SerializerBase {
private:
std::error_code SaveObject(const PrimeValue& pv);
std::error_code SaveListObject(const robj* obj);
std::error_code SaveListObject(const PrimeValue& pv);
std::error_code SaveSetObject(const PrimeValue& pv);
std::error_code SaveHSetObject(const PrimeValue& pv);
std::error_code SaveZSetObject(const PrimeValue& pv);
std::error_code SaveStreamObject(const robj* obj);
std::error_code SaveStreamObject(const PrimeValue& obj);
std::error_code SaveJsonObject(const PrimeValue& pv);
std::error_code SaveLongLongAsString(int64_t value);

View file

@ -17,7 +17,6 @@
extern "C" {
#include "redis/listpack.h"
#include "redis/object.h"
};
namespace dfly {

View file

@ -14,10 +14,6 @@
#include "server/search/doc_accessors.h"
#include "server/server_state.h"
extern "C" {
#include "redis/object.h"
};
namespace dfly {
using namespace std;

View file

@ -6,7 +6,6 @@
extern "C" {
#include "redis/intset.h"
#include "redis/object.h"
#include "redis/redis_aux.h"
#include "redis/util.h"
}

View file

@ -4,10 +4,6 @@
#include "server/snapshot.h"
extern "C" {
#include "redis/object.h"
}
#include <absl/functional/bind_front.h>
#include <absl/strings/match.h>
#include <absl/strings/str_cat.h>

View file

@ -7,7 +7,6 @@
#include <absl/strings/str_cat.h>
extern "C" {
#include "redis/object.h"
#include "redis/stream.h"
#include "redis/zmalloc.h"
}

View file

@ -4,10 +4,6 @@
#include "server/string_family.h"
extern "C" {
#include "redis/object.h"
}
#include <absl/container/inlined_vector.h>
#include <algorithm>

View file

@ -4,10 +4,6 @@
#include "server/tiered_storage.h"
extern "C" {
#include "redis/object.h"
}
#include <mimalloc.h>
#include "absl/cleanup/cleanup.h"

View file

@ -11,7 +11,6 @@ extern "C" {
#include "redis/geohash.h"
#include "redis/geohash_helper.h"
#include "redis/listpack.h"
#include "redis/object.h"
#include "redis/redis_aux.h"
#include "redis/util.h"
#include "redis/zmalloc.h"
@ -1561,11 +1560,10 @@ OpResult<unsigned> OpLexCount(const OpArgs& op_args, string_view key,
/* Use the first element in range as the starting point */
eptr = zzlFirstInLexRange(zl, &range);
/* No "first" element */
if (eptr) {
/* First element is in range */
sptr = lpNext(zl, eptr);
serverAssertWithInfo(c, robj_wrapper, zzlLexValueLteMax(eptr, &range));
DCHECK(zzlLexValueLteMax(eptr, &range));
/* Iterate over elements in range */
while (eptr) {

View file

@ -222,6 +222,7 @@ TEST_F(ZSetFamilyTest, ByLex) {
EXPECT_THAT(resp.GetVec(), ElementsAre("alpha", "bar", "cool"));
EXPECT_EQ(3, CheckedInt({"ZLEXCOUNT", "key", "(foo", "+"}));
EXPECT_EQ(0, CheckedInt({"ZLEXCOUNT", "key", "(foo", "[fop"}));
EXPECT_EQ(3, CheckedInt({"ZREMRANGEBYLEX", "key", "(foo", "+"}));
resp = Run({"zrangebylex", "key", "[a", "+"});