diff --git a/src/redis/object.c b/src/redis/object.c index 4b70097b1..45ae81acf 100644 --- a/src/redis/object.c +++ b/src/redis/object.c @@ -369,11 +369,12 @@ void freeModuleObject(robj *o) { zfree(mv); } -void freeStreamObject(robj *o) { - freeStream(o->ptr); -} #endif +void freeStreamObject(robj *o) { + freeStream(o->ptr); +} + void incrRefCount(robj *o) { if (o->refcount < OBJ_FIRST_SPECIAL_REFCOUNT) { o->refcount++; @@ -399,8 +400,7 @@ void decrRefCount(robj *o) { // freeModuleObject(o); break; case OBJ_STREAM: - serverPanic("Unsupported OBJ_STREAM type"); - //freeStreamObject(o); + freeStreamObject(o); break; default: serverPanic("Unknown object type"); break; } @@ -658,7 +658,7 @@ size_t stringObjectLen(robj *o) { } } -// ROMAN: Copied from the DISABLED part below +// ROMAN: Copied from the DISABLED part below int getLongLongFromObject(robj *o, long long *target) { long long value; @@ -850,7 +850,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) { } } else if (o->type == OBJ_STREAM) { serverPanic("OBJ_STREAM not supported"); -#ifdef ROMAN_CLIENT_DISABLE +#ifdef ROMAN_CLIENT_DISABLE stream *s = o->ptr; asize = sizeof(*o)+sizeof(*s); asize += streamRadixTreeMemoryUsage(s->rax); @@ -911,10 +911,10 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) { } raxStop(&ri); } -#endif +#endif } else if (o->type == OBJ_MODULE) { serverPanic("OBJ_MODULE not supported"); -#ifdef ROMAN_CLIENT_DISABLE +#ifdef ROMAN_CLIENT_DISABLE moduleValue *mv = o->ptr; moduleType *mt = mv->type; if (mt->mem_usage != NULL) { @@ -922,7 +922,7 @@ size_t objectComputeSize(robj *key, robj *o, size_t sample_size, int dbid) { } else { asize = 0; } -#endif +#endif } else { serverPanic("Unknown object type"); } @@ -1175,7 +1175,7 @@ sds getMemoryDoctorReport(void) { return s; } -#endif +#endif /* Set the object LRU/LFU depending on server.maxmemory_policy. * The lfu_freq arg is only relevant if policy is MAXMEMORY_FLAG_LFU. diff --git a/src/redis/t_stream.c b/src/redis/t_stream.c index c55eb5e03..9ca2577af 100644 --- a/src/redis/t_stream.c +++ b/src/redis/t_stream.c @@ -66,7 +66,7 @@ void streamFreeNACK(streamNACK *na); size_t streamReplyWithRangeFromConsumerPEL(client *c, stream *s, streamID *start, streamID *end, size_t count, streamConsumer *consumer); int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq, int *seq_given); int streamParseIDOrReply(client *c, robj *o, streamID *id, uint64_t missing_seq); -#endif +#endif /* ----------------------------------------------------------------------- * Low level stream encoding: a radix tree of listpacks. @@ -420,7 +420,7 @@ void streamGetEdgeID(stream *s, int first, int skip_tombstones, streamID *edge_i * If 'use_id' is not NULL, the ID is not auto-generated by the function, * but instead the passed ID is used to add the new entry. In this case * adding the entry may fail as specified later in this comment. - * + * * When 'use_id' is used alongside with a zero 'seq-given', the sequence * part of the passed ID is ignored and the function will attempt to use an * auto-generated sequence. @@ -966,7 +966,7 @@ static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, i if (streamParseStrictIDOrReply(c,c->argv[i+1],&args->minid,0,NULL) != C_OK) return -1; - + i++; args->trim_strategy = TRIM_STRATEGY_MINID; args->trim_strategy_arg_idx = i; @@ -1026,9 +1026,9 @@ static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, i } else { /* User didn't provide LIMIT, we must set it. */ if (args->approx_trim) { - /* In order to prevent from trimming to do too much work and + /* In order to prevent from trimming to do too much work and * cause latency spikes we limit the amount of work it can do. - * We have to cap args->limit from both sides in case + * We have to cap args->limit from both sides in case * stream_node_max_entries is 0 or too big (could cause overflow) */ args->limit = 100 * server.stream_node_max_entries; /* Maximum 100 rax nodes. */ @@ -1044,7 +1044,7 @@ static int streamParseAddOrTrimArgsOrReply(client *c, streamAddTrimArgs *args, i return i; } -#endif +#endif /* Initialize the stream iterator, so that we can call iterating functions * to get the next items. This requires a corresponding streamIteratorStop() @@ -1382,7 +1382,7 @@ void streamLastValidID(stream *s, streamID *maxid) streamIteratorStop(&si); } -#if ROMAN_ENABLE +#if ROMAN_ENABLE /* Emit a reply in the client output buffer by formatting a Stream ID * in the standard - format, using the simple string protocol * of REPL. */ @@ -1396,7 +1396,7 @@ void setDeferredReplyStreamID(client *c, void *dr, streamID *id) { setDeferredReplyBulkSds(c, dr, replyid); } -#endif +#endif /* Similar to the above function, but just creates an object, usually useful * for replication purposes to create arguments. */ @@ -1453,7 +1453,7 @@ int streamRangeHasTombstones(stream *s, streamID *start, streamID *end) { return 0; } -#if ROMAN_ENABLE +#if ROMAN_ENABLE /* Replies with a consumer group's current lag, that is the number of messages * in the stream that are yet to be delivered. In case that the lag isn't * available due to fragmentation, the reply to the client is a null. */ @@ -1490,7 +1490,7 @@ void streamReplyWithCGLag(client *c, stream *s, streamCG *cg) { /* This function returns a value that is the ID's logical read counter, or its * distance (the number of entries) from the first entry ever to have been added * to the stream. - * + * * A counter is returned only in one of the following cases: * 1. The ID is the same as the stream's last ID. In this case, the returned * is the same as the stream's entries_added counter. @@ -1624,7 +1624,7 @@ void streamPropagateConsumerCreation(client *c, robj *key, robj *groupname, sds decrRefCount(argv[4]); } -#endif +#endif /* Send the stream items in the specified range to the client 'c'. The range * the client will receive is between start and end inclusive, if 'count' is @@ -1706,7 +1706,7 @@ size_t streamReplyWithRange(client *c, stream *s, streamID *start, streamID *end /* Update the group last_id if needed. */ if (group && streamCompareID(&id,&group->last_id) > 0) { if (group->entries_read != SCG_INVALID_ENTRIES_READ && !streamRangeHasTombstones(s,&id,NULL)) { - /* A valid counter and no future tombstones mean we can + /* A valid counter and no future tombstones mean we can * increment the read counter to keep tracking the group's * progress. */ group->entries_read++; @@ -1882,7 +1882,7 @@ robj *streamTypeLookupWriteOrCreate(client *c, robj *key, int no_create) { * to be autogenerated. When a non-NULL 'seq_given' argument is provided, this * form is accepted and the argument is set to 0 unless the sequence part is * specified. - * + * * If 'c' is set to NULL, no reply is sent to the client. */ int streamGenericParseIDOrReply(client *c, const robj *o, streamID *id, uint64_t missing_seq, int strict, int *seq_given) { char buf[128]; @@ -1954,7 +1954,7 @@ int streamParseStrictIDOrReply(client *c, robj *o, streamID *id, uint64_t missin /* Helper for parsing a stream ID that is a range query interval. When the * exclude argument is NULL, streamParseIDOrReply() is called and the interval - * is treated as close (inclusive). Otherwise, the exclude argument is set if + * is treated as close (inclusive). Otherwise, the exclude argument is set if * the interval is open (the "(" prefix) and streamParseStrictIDOrReply() is * called in that case. */ @@ -1962,13 +1962,13 @@ int streamParseIntervalIDOrReply(client *c, robj *o, streamID *id, int *exclude, char *p = o->ptr; size_t len = sdslen(p); int invalid = 0; - + if (exclude != NULL) *exclude = (len > 1 && p[0] == '('); if (exclude != NULL && *exclude) { robj *t = createStringObject(p+1,len-1); invalid = (streamParseStrictIDOrReply(c,t,id,missing_seq,NULL) == C_ERR); decrRefCount(t); - } else + } else invalid = (streamParseIDOrReply(c,o,id,missing_seq) == C_ERR); if (invalid) return C_ERR; @@ -2095,7 +2095,7 @@ void xrangeGenericCommand(client *c, int rev) { robj *startarg = rev ? c->argv[3] : c->argv[2]; robj *endarg = rev ? c->argv[2] : c->argv[3]; int startex = 0, endex = 0; - + /* Parse start and end IDs. */ if (streamParseIntervalIDOrReply(c,startarg,&startid,&startex,0) != C_OK) return; @@ -2447,7 +2447,7 @@ cleanup: /* Cleanup. */ zfree(groups); } -#endif +#endif /* ----------------------------------------------------------------------- * Low level implementation of consumer groups @@ -2515,15 +2515,12 @@ streamCG *streamLookupCG(stream *s, sds groupname) { return (cg == raxNotFound) ? NULL : cg; } -#if ROMAN_ENABLE /* Create a consumer with the specified name in the group 'cg' and return. * If the consumer exists, return NULL. As a side effect, when the consumer * is successfully created, the key space will be notified and dirty++ unless * the SCC_NO_NOTIFY or SCC_NO_DIRTIFY flags is specified. */ streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid, int flags) { if (cg == NULL) return NULL; - int notify = !(flags & SCC_NO_NOTIFY); - int dirty = !(flags & SCC_NO_DIRTIFY); streamConsumer *consumer = zmalloc(sizeof(*consumer)); int success = raxTryInsert(cg->consumers,(unsigned char*)name, sdslen(name),consumer,NULL); @@ -2534,13 +2531,11 @@ streamConsumer *streamCreateConsumer(streamCG *cg, sds name, robj *key, int dbid consumer->name = sdsdup(name); consumer->pel = raxNew(); consumer->seen_time = mstime(); - if (dirty) server.dirty++; - if (notify) notifyKeyspaceEvent(NOTIFY_STREAM,"xgroup-createconsumer",key,dbid); + return consumer; } -#endif -/* Lookup the consumer with the specified name in the group 'cg'. Its last +/* Lookup the consumer with the specified name in the group 'cg'. Its last * seen time is updated unless the SLC_NO_REFRESH flag is specified. */ streamConsumer *streamLookupConsumer(streamCG *cg, sds name, int flags) { if (cg == NULL) return NULL; @@ -3943,6 +3938,8 @@ NULL } } +#endif + /* Validate the integrity stream listpack entries structure. Both in term of a * valid listpack, but also that the structure of the entries matches a valid * stream. return 1 if valid 0 if not valid. */ @@ -4035,4 +4032,3 @@ int streamValidateListpackIntegrity(unsigned char *lp, size_t size, int deep) { return 1; } -#endif \ No newline at end of file diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 4ac3a88e6..735b245cd 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -1,7 +1,7 @@ add_executable(dragonfly dfly_main.cc) cxx_link(dragonfly base dragonfly_lib) -add_library(dragonfly_lib blocking_controller.cc channel_slice.cc command_registry.cc +add_library(dragonfly_lib blocking_controller.cc channel_slice.cc command_registry.cc common.cc config_flags.cc conn_context.cc db_slice.cc debugcmd.cc engine_shard_set.cc generic_family.cc hset_family.cc io_mgr.cc @@ -22,7 +22,8 @@ cxx_test(list_family_test dfly_test_lib LABELS DFLY) cxx_test(set_family_test dfly_test_lib LABELS DFLY) cxx_test(stream_family_test dfly_test_lib LABELS DFLY) cxx_test(string_family_test dfly_test_lib LABELS DFLY) -cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rdb LABELS DFLY) +cxx_test(rdb_test dfly_test_lib DATA testdata/empty.rdb testdata/redis6_small.rdb + testdata/redis6_stream.rdb LABELS DFLY) cxx_test(zset_family_test dfly_test_lib LABELS DFLY) cxx_test(blocking_controller_test dragonfly_lib LABELS DFLY) diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 3b0ab9e29..8f446428a 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -10,6 +10,7 @@ extern "C" { #include "redis/listpack.h" #include "redis/lzfP.h" /* LZF compression library */ #include "redis/rdb.h" +#include "redis/stream.h" #include "redis/util.h" #include "redis/ziplist.h" #include "redis/zmalloc.h" @@ -834,6 +835,9 @@ io::Result RdbLoader::ReadObj(int rdbtype) { case RDB_TYPE_LIST_QUICKLIST: res_obj = ReadListQuicklist(rdbtype); break; + case RDB_TYPE_STREAM_LISTPACKS: + res_obj = ReadStreams(); + break; default: LOG(ERROR) << "Unsupported rdb type " << rdbtype; return Unexpected(errc::invalid_encoding); @@ -1264,6 +1268,179 @@ io::Result RdbLoader::ReadListQuicklist(int rdbtype) { return res; } +::io::Result RdbLoader::ReadStreams() { + uint64_t listpacks; + SET_OR_UNEXPECT(LoadLen(nullptr), listpacks); + + robj* o = createStreamObject(); + stream* s = (stream*)o->ptr; + + auto cleanup = absl::Cleanup([&] { decrRefCount(o); }); + + while (listpacks--) { + /* Get the master ID, the one we'll use as key of the radix tree + * node: the entries inside the listpack itself are delta-encoded + * relatively to this ID. */ + sds nodekey; + SET_OR_UNEXPECT(ReadKey(), nodekey); + auto cleanup2 = absl::Cleanup([&] { sdsfree(nodekey); }); + + if (sdslen(nodekey) != sizeof(streamID)) { + LOG(ERROR) << "Stream node key entry is not the size of a stream ID"; + + return Unexpected(errc::rdb_file_corrupted); + } + + /* Load the listpack. */ + OpaqueBuf fetch; + SET_OR_UNEXPECT(FetchGenericString(RDB_LOAD_PLAIN), fetch); + if (fetch.second == 0 || fetch.first == nullptr) { + LOG(ERROR) << "Stream listpacks loading failed"; + return Unexpected(errc::rdb_file_corrupted); + } + DCHECK(fetch.first); + + uint8_t* lp = (uint8_t*)fetch.first; + + if (!streamValidateListpackIntegrity(lp, fetch.second, 0)) { + zfree(lp); + LOG(ERROR) << "Stream listpack integrity check failed."; + return Unexpected(errc::rdb_file_corrupted); + } + + unsigned char* first = lpFirst(lp); + if (first == NULL) { + /* Serialized listpacks should never be empty, since on + * deletion we should remove the radix tree key if the + * resulting listpack is empty. */ + LOG(ERROR) << "Empty listpack inside stream"; + zfree(lp); + return Unexpected(errc::rdb_file_corrupted); + } + + /* Insert the key in the radix tree. */ + int retval = raxTryInsert(s->rax_tree, (unsigned char*)nodekey, sizeof(streamID), lp, NULL); + if (!retval) { + LOG(ERROR) << "Listpack re-added with existing key"; + return Unexpected(errc::duplicate_key); + } + } + + /* Load total number of items inside the stream. */ + SET_OR_UNEXPECT(LoadLen(nullptr), s->length); + + /* Load the last entry ID. */ + SET_OR_UNEXPECT(LoadLen(nullptr), s->last_id.ms); + SET_OR_UNEXPECT(LoadLen(nullptr), s->last_id.seq); + + /* Consumer groups loading */ + uint64_t cgroups_count; + SET_OR_UNEXPECT(LoadLen(nullptr), cgroups_count); + + while (cgroups_count--) { + /* Get the consumer group name and ID. We can then create the + * consumer group ASAP and populate its structure as + * we read more data. */ + streamID cg_id; + + sds cgname; + SET_OR_UNEXPECT(ReadKey(), cgname); + + auto cleanup2 = absl::Cleanup([&] { sdsfree(cgname); }); + SET_OR_UNEXPECT(LoadLen(nullptr), cg_id.ms); + SET_OR_UNEXPECT(LoadLen(nullptr), cg_id.seq); + + streamCG* cgroup = streamCreateCG(s, cgname, sdslen(cgname), &cg_id, 0); + if (cgroup == NULL) { + LOG(ERROR) << "Duplicated consumer group name " << cgname; + return Unexpected(errc::duplicate_key); + } + std::move(cleanup2).Invoke(); + // no need to free cgroup because it's attached to s. + + /* Load the global PEL for this consumer group, however we'll + * not yet populate the NACK structures with the message + * owner, since consumers for this group and their messages will + * be read as a next step. So for now leave them not resolved + * and later populate it. */ + uint64_t pel_size; + SET_OR_UNEXPECT(LoadLen(nullptr), pel_size); + + while (pel_size--) { + uint8_t rawid[sizeof(streamID)]; + error_code ec = FetchBuf(sizeof(rawid), rawid); + if (ec) { + LOG(ERROR) << "Stream PEL ID loading failed."; + return make_unexpected(ec); + } + + streamNACK* nack = streamCreateNACK(NULL); + auto cleanup2 = absl::Cleanup([&] { streamFreeNACK(nack); }); + + SET_OR_UNEXPECT(FetchInt(), nack->delivery_time); + SET_OR_UNEXPECT(LoadLen(nullptr), nack->delivery_count); + + if (!raxTryInsert(cgroup->pel, rawid, sizeof(rawid), nack, NULL)) { + LOG(ERROR) << "Duplicated global PEL entry loading stream consumer group"; + return Unexpected(errc::duplicate_key); + } + std::move(cleanup2).Cancel(); + } + + /* Now that we loaded our global PEL, we need to load the + * consumers and their local PELs. */ + uint64_t consumers_num; + SET_OR_UNEXPECT(LoadLen(nullptr), consumers_num); + + while (consumers_num--) { + sds cname; + SET_OR_UNEXPECT(ReadKey(), cname); + + streamConsumer* consumer = + streamCreateConsumer(cgroup, cname, NULL, 0, SCC_NO_NOTIFY | SCC_NO_DIRTIFY); + sdsfree(cname); + if (!consumer) { + LOG(ERROR) << "Duplicate stream consumer detected."; + return Unexpected(errc::duplicate_key); + } + // no need to free consumer because it's attached to cgroup. + + SET_OR_UNEXPECT(FetchInt(), consumer->seen_time); + + /* Load the PEL about entries owned by this specific + * consumer. */ + SET_OR_UNEXPECT(LoadLen(nullptr), pel_size); + while (pel_size--) { + unsigned char rawid[sizeof(streamID)]; + error_code ec = FetchBuf(sizeof(rawid), rawid); + if (ec) { + LOG(ERROR) << "Stream PEL ID loading failed."; + return make_unexpected(ec); + } + streamNACK* nack = (streamNACK*)raxFind(cgroup->pel, rawid, sizeof(rawid)); + if (nack == raxNotFound) { + LOG(ERROR) << "Consumer entry not found in group global PEL"; + return Unexpected(errc::rdb_file_corrupted); + } + + /* Set the NACK consumer, that was left to NULL when + * loading the global PEL. Then set the same shared + * NACK structure also in the consumer-specific PEL. */ + nack->consumer = consumer; + if (!raxTryInsert(consumer->pel, rawid, sizeof(rawid), nack, NULL)) { + LOG(ERROR) << "Duplicated consumer PEL entry loading a stream consumer group"; + streamFreeNACK(nack); + return Unexpected(errc::duplicate_key); + } + } + } // while (consumers_num) + } // while (cgroup_num) + + std::move(cleanup).Cancel(); + + return o; +} + void RdbLoader::ResizeDb(size_t key_num, size_t expire_num) { DCHECK_LT(key_num, 1U << 31); DCHECK_LT(expire_num, 1U << 31); diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index 14f5cbe89..3e4b64534 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -72,6 +72,7 @@ class RdbLoader { ::io::Result ReadZSet(int rdbtype); ::io::Result ReadZSetZL(); ::io::Result ReadListQuicklist(int rdbtype); + ::io::Result ReadStreams(); std::error_code EnsureRead(size_t min_sz) { if (mem_buf_.InputLen() >= min_sz) diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index 9504d566a..4154a22f7 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -115,6 +115,17 @@ TEST_F(RdbTest, LoadSmall6) { EXPECT_THAT(resp.GetVec(), ElementsAre(IntArg(1), IntArg(1))); } +TEST_F(RdbTest, LoadStream) { + io::FileSource fs = GetSource("redis6_stream.rdb"); + RdbLoader loader(service_->script_mgr()); + + // must run in proactor thread in order to avoid polluting the serverstate + // in the main, testing thread. + auto ec = pp_->at(0)->Await([&] { return loader.Load(&fs); }); + + ASSERT_FALSE(ec) << ec.message(); +} + TEST_F(RdbTest, Reload) { absl::FlagSaver fs; diff --git a/src/server/testdata/redis6_stream.rdb b/src/server/testdata/redis6_stream.rdb new file mode 100644 index 000000000..a49c9e20e Binary files /dev/null and b/src/server/testdata/redis6_stream.rdb differ