mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
chore: refactor zset_family (#1542)
1. Remove shard-local functions from zset_family.h and move them into anonymous namespace in cc file. 2. Fix a warning in geohash.c Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
4a38fb7786
commit
d858300109
3 changed files with 376 additions and 395 deletions
|
@ -28,6 +28,9 @@
|
|||
* ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF
|
||||
* THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*/
|
||||
|
||||
#include <string.h>
|
||||
|
||||
#include "geohash.h"
|
||||
|
||||
/**
|
||||
|
@ -215,7 +218,9 @@ int geohashDecodeAreaToLongLat(const GeoHashArea *area, double *xy) {
|
|||
}
|
||||
|
||||
int geohashDecodeToLongLatType(const GeoHashBits hash, double *xy) {
|
||||
GeoHashArea area = {{0}};
|
||||
GeoHashArea area;
|
||||
memset(&area, 0, sizeof(area));
|
||||
|
||||
if (!xy || !geohashDecodeType(hash, &area))
|
||||
return 0;
|
||||
return geohashDecodeAreaToLongLat(&area, xy);
|
||||
|
|
|
@ -36,6 +36,7 @@ static const char kFloatRangeErr[] = "min or max is not a float";
|
|||
static const char kLexRangeErr[] = "min or max not valid string range item";
|
||||
|
||||
constexpr unsigned kMaxListPackValue = 64;
|
||||
using MScoreResponse = std::vector<std::optional<double>>;
|
||||
|
||||
inline zrangespec GetZrangeSpec(bool reverse, const ZSetFamily::ScoreInterval& si) {
|
||||
auto interval = si;
|
||||
|
@ -1203,6 +1204,375 @@ void BZPopMinMax(CmdArgList args, ConnectionContext* cntx, bool is_max) {
|
|||
return (*cntx)->SendNullArray();
|
||||
}
|
||||
|
||||
vector<ScoredMap> OpFetch(EngineShard* shard, Transaction* t) {
|
||||
ArgSlice keys = t->GetShardArgs(shard->shard_id());
|
||||
DVLOG(1) << "shard:" << shard->shard_id() << ", keys " << vector(keys.begin(), keys.end());
|
||||
DCHECK(!keys.empty());
|
||||
|
||||
vector<ScoredMap> results;
|
||||
results.reserve(keys.size());
|
||||
|
||||
auto& db_slice = shard->db_slice();
|
||||
for (size_t i = 0; i < keys.size(); ++i) {
|
||||
auto it = db_slice.Find(t->GetDbContext(), keys[i], OBJ_ZSET);
|
||||
if (!it) {
|
||||
results.push_back({});
|
||||
continue;
|
||||
}
|
||||
|
||||
ScoredMap sm = FromObject((*it)->second, 1);
|
||||
results.push_back(std::move(sm));
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
auto OpPopCount(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args, string_view key)
|
||||
-> OpResult<ZSetFamily::ScoredArray> {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it);
|
||||
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
|
||||
IntervalVisitor iv{Action::POP, range_spec.params, zobj};
|
||||
std::visit(iv, range_spec.interval);
|
||||
|
||||
res_it.value()->second.SyncRObj();
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key);
|
||||
|
||||
auto zlen = zsetLength(zobj);
|
||||
if (zlen == 0) {
|
||||
CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it.value()));
|
||||
}
|
||||
|
||||
return iv.PopResult();
|
||||
}
|
||||
|
||||
auto OpRange(const ZSetFamily::ZRangeSpec& range_spec, const OpArgs& op_args, string_view key)
|
||||
-> OpResult<ZSetFamily::ScoredArray> {
|
||||
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
IntervalVisitor iv{Action::RANGE, range_spec.params, zobj};
|
||||
|
||||
std::visit(iv, range_spec.interval);
|
||||
|
||||
return iv.PopResult();
|
||||
}
|
||||
|
||||
OpResult<unsigned> OpRemRange(const OpArgs& op_args, string_view key,
|
||||
const ZSetFamily::ZRangeSpec& range_spec) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it);
|
||||
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
|
||||
IntervalVisitor iv{Action::REMOVE, range_spec.params, zobj};
|
||||
std::visit(iv, range_spec.interval);
|
||||
|
||||
res_it.value()->second.SyncRObj();
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key);
|
||||
|
||||
auto zlen = zsetLength(zobj);
|
||||
if (zlen == 0) {
|
||||
CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it.value()));
|
||||
}
|
||||
|
||||
return iv.removed();
|
||||
}
|
||||
|
||||
OpResult<unsigned> OpRank(const OpArgs& op_args, string_view key, string_view member,
|
||||
bool reverse) {
|
||||
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
op_args.shard->tmp_str1 = sdscpylen(op_args.shard->tmp_str1, member.data(), member.size());
|
||||
|
||||
long res = zsetRank(zobj, op_args.shard->tmp_str1, reverse);
|
||||
if (res < 0)
|
||||
return OpStatus::KEY_NOTFOUND;
|
||||
return res;
|
||||
}
|
||||
|
||||
OpResult<unsigned> OpCount(const OpArgs& op_args, std::string_view key,
|
||||
const ZSetFamily::ScoreInterval& interval) {
|
||||
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
zrangespec range = GetZrangeSpec(false, interval);
|
||||
unsigned count = 0;
|
||||
|
||||
if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
uint8_t* zl = (uint8_t*)zobj->ptr;
|
||||
uint8_t *eptr, *sptr;
|
||||
double score;
|
||||
|
||||
/* Use the first element in range as the starting point */
|
||||
eptr = zzlFirstInRange(zl, &range);
|
||||
|
||||
/* No "first" element */
|
||||
if (eptr == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* First element is in range */
|
||||
sptr = lpNext(zl, eptr);
|
||||
score = zzlGetScore(sptr);
|
||||
|
||||
DCHECK(zslValueLteMax(score, &range));
|
||||
|
||||
/* Iterate over elements in range */
|
||||
while (eptr) {
|
||||
score = zzlGetScore(sptr);
|
||||
|
||||
/* Abort when the node is no longer in range. */
|
||||
if (!zslValueLteMax(score, &range)) {
|
||||
break;
|
||||
} else {
|
||||
count++;
|
||||
zzlNext(zl, &eptr, &sptr);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
CHECK_EQ(unsigned(OBJ_ENCODING_SKIPLIST), zobj->encoding);
|
||||
zset* zs = (zset*)zobj->ptr;
|
||||
zskiplist* zsl = zs->zsl;
|
||||
zskiplistNode* zn;
|
||||
unsigned long rank;
|
||||
|
||||
/* Find first element in range */
|
||||
zn = zslFirstInRange(zsl, &range);
|
||||
|
||||
/* Use rank of first element, if any, to determine preliminary count */
|
||||
if (zn == NULL)
|
||||
return 0;
|
||||
|
||||
rank = zslGetRank(zsl, zn->score, zn->ele);
|
||||
count = (zsl->length - (rank - 1));
|
||||
|
||||
/* Find last element in range */
|
||||
zn = zslLastInRange(zsl, &range);
|
||||
|
||||
/* Use rank of last element, if any, to determine the actual count */
|
||||
if (zn != NULL) {
|
||||
rank = zslGetRank(zsl, zn->score, zn->ele);
|
||||
count -= (zsl->length - rank);
|
||||
}
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
OpResult<unsigned> OpLexCount(const OpArgs& op_args, string_view key,
|
||||
const ZSetFamily::LexInterval& interval) {
|
||||
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
zlexrangespec range = GetLexRange(false, interval);
|
||||
unsigned count = 0;
|
||||
if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
uint8_t* zl = (uint8_t*)zobj->ptr;
|
||||
uint8_t *eptr, *sptr;
|
||||
|
||||
/* Use the first element in range as the starting point */
|
||||
eptr = zzlFirstInLexRange(zl, &range);
|
||||
|
||||
/* No "first" element */
|
||||
if (eptr) {
|
||||
/* First element is in range */
|
||||
sptr = lpNext(zl, eptr);
|
||||
serverAssertWithInfo(c, zobj, zzlLexValueLteMax(eptr, &range));
|
||||
|
||||
/* Iterate over elements in range */
|
||||
while (eptr) {
|
||||
/* Abort when the node is no longer in range. */
|
||||
if (!zzlLexValueLteMax(eptr, &range)) {
|
||||
break;
|
||||
} else {
|
||||
count++;
|
||||
zzlNext(zl, &eptr, &sptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
DCHECK_EQ(OBJ_ENCODING_SKIPLIST, zobj->encoding);
|
||||
zset* zs = (zset*)zobj->ptr;
|
||||
zskiplist* zsl = zs->zsl;
|
||||
zskiplistNode* zn;
|
||||
unsigned long rank;
|
||||
|
||||
/* Find first element in range */
|
||||
zn = zslFirstInLexRange(zsl, &range);
|
||||
|
||||
/* Use rank of first element, if any, to determine preliminary count */
|
||||
if (zn != NULL) {
|
||||
rank = zslGetRank(zsl, zn->score, zn->ele);
|
||||
count = (zsl->length - (rank - 1));
|
||||
|
||||
/* Find last element in range */
|
||||
zn = zslLastInLexRange(zsl, &range);
|
||||
|
||||
/* Use rank of last element, if any, to determine the actual count */
|
||||
if (zn != NULL) {
|
||||
rank = zslGetRank(zsl, zn->score, zn->ele);
|
||||
count -= (zsl->length - rank);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
zslFreeLexRange(&range);
|
||||
return count;
|
||||
}
|
||||
|
||||
OpResult<unsigned> OpRem(const OpArgs& op_args, string_view key, ArgSlice members) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it);
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
sds& tmp_str = op_args.shard->tmp_str1;
|
||||
unsigned deleted = 0;
|
||||
for (string_view member : members) {
|
||||
tmp_str = sdscpylen(tmp_str, member.data(), member.size());
|
||||
deleted += zsetDel(zobj, tmp_str);
|
||||
}
|
||||
auto zlen = zsetLength(zobj);
|
||||
res_it.value()->second.SyncRObj();
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key);
|
||||
|
||||
if (zlen == 0) {
|
||||
CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it.value()));
|
||||
}
|
||||
|
||||
return deleted;
|
||||
}
|
||||
|
||||
OpResult<double> OpScore(const OpArgs& op_args, string_view key, string_view member) {
|
||||
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
sds& tmp_str = op_args.shard->tmp_str1;
|
||||
tmp_str = sdscpylen(tmp_str, member.data(), member.size());
|
||||
double score;
|
||||
int retval = zsetScore(zobj, tmp_str, &score);
|
||||
if (retval != C_OK) {
|
||||
return OpStatus::KEY_NOTFOUND;
|
||||
}
|
||||
return score;
|
||||
}
|
||||
|
||||
OpResult<MScoreResponse> OpMScore(const OpArgs& op_args, string_view key, ArgSlice members) {
|
||||
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
MScoreResponse scores(members.size());
|
||||
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
sds& tmp_str = op_args.shard->tmp_str1;
|
||||
|
||||
for (size_t i = 0; i < members.size(); i++) {
|
||||
const auto& m = members[i];
|
||||
|
||||
tmp_str = sdscpylen(tmp_str, m.data(), m.size());
|
||||
double score;
|
||||
int retval = zsetScore(zobj, tmp_str, &score);
|
||||
if (retval == C_OK) {
|
||||
scores[i] = score;
|
||||
} else {
|
||||
scores[i] = std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
return scores;
|
||||
}
|
||||
|
||||
OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor,
|
||||
const ScanOpts& scan_op) {
|
||||
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
|
||||
if (!find_res)
|
||||
return find_res.status();
|
||||
|
||||
PrimeIterator it = find_res.value();
|
||||
StringVec res;
|
||||
robj* zobj = it->second.AsRObj();
|
||||
char buf[128];
|
||||
|
||||
if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
ZSetFamily::RangeParams params;
|
||||
params.with_scores = true;
|
||||
IntervalVisitor iv{Action::RANGE, params, zobj};
|
||||
|
||||
iv(ZSetFamily::IndexInterval{0, kuint32max});
|
||||
ZSetFamily::ScoredArray arr = iv.PopResult();
|
||||
|
||||
for (size_t i = 0; i < arr.size(); ++i) {
|
||||
if (!scan_op.Matches(arr[i].first)) {
|
||||
continue;
|
||||
}
|
||||
res.emplace_back(std::move(arr[i].first));
|
||||
char* str = RedisReplyBuilder::FormatDouble(arr[i].second, buf, sizeof(buf));
|
||||
res.emplace_back(str);
|
||||
}
|
||||
*cursor = 0;
|
||||
} else {
|
||||
CHECK_EQ(unsigned(OBJ_ENCODING_SKIPLIST), zobj->encoding);
|
||||
uint32_t count = scan_op.limit;
|
||||
zset* zs = (zset*)zobj->ptr;
|
||||
|
||||
dict* ht = zs->dict;
|
||||
long maxiterations = count * 10;
|
||||
|
||||
struct ScanArgs {
|
||||
char* sbuf;
|
||||
StringVec* res;
|
||||
const ScanOpts* scan_op;
|
||||
} sargs = {buf, &res, &scan_op};
|
||||
|
||||
auto scanCb = [](void* privdata, const dictEntry* de) {
|
||||
ScanArgs* sargs = (ScanArgs*)privdata;
|
||||
|
||||
sds key = (sds)de->key;
|
||||
if (!sargs->scan_op->Matches(key)) {
|
||||
return;
|
||||
}
|
||||
|
||||
double score = *(double*)dictGetVal(de);
|
||||
|
||||
sargs->res->emplace_back(key, sdslen(key));
|
||||
char* str = RedisReplyBuilder::FormatDouble(score, sargs->sbuf, sizeof(buf));
|
||||
sargs->res->emplace_back(str);
|
||||
};
|
||||
|
||||
do {
|
||||
*cursor = dictScan(ht, *cursor, scanCb, NULL, &sargs);
|
||||
} while (*cursor && maxiterations-- && res.size() < count);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
void ZSetFamily::BZPopMin(CmdArgList args, ConnectionContext* cntx) {
|
||||
|
@ -1353,29 +1723,6 @@ void ZSetFamily::ZCount(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
vector<ScoredMap> OpFetch(EngineShard* shard, Transaction* t) {
|
||||
ArgSlice keys = t->GetShardArgs(shard->shard_id());
|
||||
DVLOG(1) << "shard:" << shard->shard_id() << ", keys " << vector(keys.begin(), keys.end());
|
||||
DCHECK(!keys.empty());
|
||||
|
||||
vector<ScoredMap> results;
|
||||
results.reserve(keys.size());
|
||||
|
||||
auto& db_slice = shard->db_slice();
|
||||
for (size_t i = 0; i < keys.size(); ++i) {
|
||||
auto it = db_slice.Find(t->GetDbContext(), keys[i], OBJ_ZSET);
|
||||
if (!it) {
|
||||
results.push_back({});
|
||||
continue;
|
||||
}
|
||||
|
||||
ScoredMap sm = FromObject((*it)->second, 1);
|
||||
results.push_back(std::move(sm));
|
||||
}
|
||||
|
||||
return results;
|
||||
}
|
||||
|
||||
void ZSetFamily::ZDiff(CmdArgList args, ConnectionContext* cntx) {
|
||||
vector<vector<ScoredMap>> maps(shard_set->size());
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
|
@ -2023,353 +2370,6 @@ void ZSetFamily::ZPopMinMax(CmdArgList args, bool reverse, ConnectionContext* cn
|
|||
OutputScoredArrayResult(result, range_params, cntx);
|
||||
}
|
||||
|
||||
OpResult<StringVec> ZSetFamily::OpScan(const OpArgs& op_args, std::string_view key,
|
||||
uint64_t* cursor, const ScanOpts& scan_op) {
|
||||
OpResult<PrimeIterator> find_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
|
||||
if (!find_res)
|
||||
return find_res.status();
|
||||
|
||||
PrimeIterator it = find_res.value();
|
||||
StringVec res;
|
||||
robj* zobj = it->second.AsRObj();
|
||||
char buf[128];
|
||||
|
||||
if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
RangeParams params;
|
||||
params.with_scores = true;
|
||||
IntervalVisitor iv{Action::RANGE, params, zobj};
|
||||
|
||||
iv(IndexInterval{0, kuint32max});
|
||||
ScoredArray arr = iv.PopResult();
|
||||
|
||||
for (size_t i = 0; i < arr.size(); ++i) {
|
||||
if (!scan_op.Matches(arr[i].first)) {
|
||||
continue;
|
||||
}
|
||||
res.emplace_back(std::move(arr[i].first));
|
||||
char* str = RedisReplyBuilder::FormatDouble(arr[i].second, buf, sizeof(buf));
|
||||
res.emplace_back(str);
|
||||
}
|
||||
*cursor = 0;
|
||||
} else {
|
||||
CHECK_EQ(unsigned(OBJ_ENCODING_SKIPLIST), zobj->encoding);
|
||||
uint32_t count = scan_op.limit;
|
||||
zset* zs = (zset*)zobj->ptr;
|
||||
|
||||
dict* ht = zs->dict;
|
||||
long maxiterations = count * 10;
|
||||
|
||||
struct ScanArgs {
|
||||
char* sbuf;
|
||||
StringVec* res;
|
||||
const ScanOpts* scan_op;
|
||||
} sargs = {buf, &res, &scan_op};
|
||||
|
||||
auto scanCb = [](void* privdata, const dictEntry* de) {
|
||||
ScanArgs* sargs = (ScanArgs*)privdata;
|
||||
|
||||
sds key = (sds)de->key;
|
||||
if (!sargs->scan_op->Matches(key)) {
|
||||
return;
|
||||
}
|
||||
|
||||
double score = *(double*)dictGetVal(de);
|
||||
|
||||
sargs->res->emplace_back(key, sdslen(key));
|
||||
char* str = RedisReplyBuilder::FormatDouble(score, sargs->sbuf, sizeof(buf));
|
||||
sargs->res->emplace_back(str);
|
||||
};
|
||||
|
||||
do {
|
||||
*cursor = dictScan(ht, *cursor, scanCb, NULL, &sargs);
|
||||
} while (*cursor && maxiterations-- && res.size() < count);
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
OpResult<unsigned> ZSetFamily::OpRem(const OpArgs& op_args, string_view key, ArgSlice members) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it);
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
sds& tmp_str = op_args.shard->tmp_str1;
|
||||
unsigned deleted = 0;
|
||||
for (string_view member : members) {
|
||||
tmp_str = sdscpylen(tmp_str, member.data(), member.size());
|
||||
deleted += zsetDel(zobj, tmp_str);
|
||||
}
|
||||
auto zlen = zsetLength(zobj);
|
||||
res_it.value()->second.SyncRObj();
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key);
|
||||
|
||||
if (zlen == 0) {
|
||||
CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it.value()));
|
||||
}
|
||||
|
||||
return deleted;
|
||||
}
|
||||
|
||||
OpResult<double> ZSetFamily::OpScore(const OpArgs& op_args, string_view key, string_view member) {
|
||||
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
sds& tmp_str = op_args.shard->tmp_str1;
|
||||
tmp_str = sdscpylen(tmp_str, member.data(), member.size());
|
||||
double score;
|
||||
int retval = zsetScore(zobj, tmp_str, &score);
|
||||
if (retval != C_OK) {
|
||||
return OpStatus::KEY_NOTFOUND;
|
||||
}
|
||||
return score;
|
||||
}
|
||||
|
||||
OpResult<ZSetFamily::MScoreResponse> ZSetFamily::OpMScore(const OpArgs& op_args, string_view key,
|
||||
ArgSlice members) {
|
||||
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
MScoreResponse scores(members.size());
|
||||
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
sds& tmp_str = op_args.shard->tmp_str1;
|
||||
|
||||
for (size_t i = 0; i < members.size(); i++) {
|
||||
const auto& m = members[i];
|
||||
|
||||
tmp_str = sdscpylen(tmp_str, m.data(), m.size());
|
||||
double score;
|
||||
int retval = zsetScore(zobj, tmp_str, &score);
|
||||
if (retval == C_OK) {
|
||||
scores[i] = score;
|
||||
} else {
|
||||
scores[i] = std::nullopt;
|
||||
}
|
||||
}
|
||||
|
||||
return scores;
|
||||
}
|
||||
|
||||
auto ZSetFamily::OpPopCount(const ZRangeSpec& range_spec, const OpArgs& op_args, string_view key)
|
||||
-> OpResult<ScoredArray> {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it);
|
||||
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
|
||||
IntervalVisitor iv{Action::POP, range_spec.params, zobj};
|
||||
std::visit(iv, range_spec.interval);
|
||||
|
||||
res_it.value()->second.SyncRObj();
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key);
|
||||
|
||||
auto zlen = zsetLength(zobj);
|
||||
if (zlen == 0) {
|
||||
CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it.value()));
|
||||
}
|
||||
|
||||
return iv.PopResult();
|
||||
}
|
||||
|
||||
auto ZSetFamily::OpRange(const ZRangeSpec& range_spec, const OpArgs& op_args, string_view key)
|
||||
-> OpResult<ScoredArray> {
|
||||
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
IntervalVisitor iv{Action::RANGE, range_spec.params, zobj};
|
||||
|
||||
std::visit(iv, range_spec.interval);
|
||||
|
||||
return iv.PopResult();
|
||||
}
|
||||
|
||||
OpResult<unsigned> ZSetFamily::OpRemRange(const OpArgs& op_args, string_view key,
|
||||
const ZRangeSpec& range_spec) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, *res_it);
|
||||
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
|
||||
IntervalVisitor iv{Action::REMOVE, range_spec.params, zobj};
|
||||
std::visit(iv, range_spec.interval);
|
||||
|
||||
res_it.value()->second.SyncRObj();
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, *res_it, key);
|
||||
|
||||
auto zlen = zsetLength(zobj);
|
||||
if (zlen == 0) {
|
||||
CHECK(op_args.shard->db_slice().Del(op_args.db_cntx.db_index, res_it.value()));
|
||||
}
|
||||
|
||||
return iv.removed();
|
||||
}
|
||||
|
||||
OpResult<unsigned> ZSetFamily::OpRank(const OpArgs& op_args, string_view key, string_view member,
|
||||
bool reverse) {
|
||||
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
op_args.shard->tmp_str1 = sdscpylen(op_args.shard->tmp_str1, member.data(), member.size());
|
||||
|
||||
long res = zsetRank(zobj, op_args.shard->tmp_str1, reverse);
|
||||
if (res < 0)
|
||||
return OpStatus::KEY_NOTFOUND;
|
||||
return res;
|
||||
}
|
||||
|
||||
OpResult<unsigned> ZSetFamily::OpCount(const OpArgs& op_args, std::string_view key,
|
||||
const ScoreInterval& interval) {
|
||||
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
zrangespec range = GetZrangeSpec(false, interval);
|
||||
unsigned count = 0;
|
||||
|
||||
if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
uint8_t* zl = (uint8_t*)zobj->ptr;
|
||||
uint8_t *eptr, *sptr;
|
||||
double score;
|
||||
|
||||
/* Use the first element in range as the starting point */
|
||||
eptr = zzlFirstInRange(zl, &range);
|
||||
|
||||
/* No "first" element */
|
||||
if (eptr == NULL) {
|
||||
return 0;
|
||||
}
|
||||
|
||||
/* First element is in range */
|
||||
sptr = lpNext(zl, eptr);
|
||||
score = zzlGetScore(sptr);
|
||||
|
||||
DCHECK(zslValueLteMax(score, &range));
|
||||
|
||||
/* Iterate over elements in range */
|
||||
while (eptr) {
|
||||
score = zzlGetScore(sptr);
|
||||
|
||||
/* Abort when the node is no longer in range. */
|
||||
if (!zslValueLteMax(score, &range)) {
|
||||
break;
|
||||
} else {
|
||||
count++;
|
||||
zzlNext(zl, &eptr, &sptr);
|
||||
}
|
||||
}
|
||||
} else {
|
||||
CHECK_EQ(unsigned(OBJ_ENCODING_SKIPLIST), zobj->encoding);
|
||||
zset* zs = (zset*)zobj->ptr;
|
||||
zskiplist* zsl = zs->zsl;
|
||||
zskiplistNode* zn;
|
||||
unsigned long rank;
|
||||
|
||||
/* Find first element in range */
|
||||
zn = zslFirstInRange(zsl, &range);
|
||||
|
||||
/* Use rank of first element, if any, to determine preliminary count */
|
||||
if (zn == NULL)
|
||||
return 0;
|
||||
|
||||
rank = zslGetRank(zsl, zn->score, zn->ele);
|
||||
count = (zsl->length - (rank - 1));
|
||||
|
||||
/* Find last element in range */
|
||||
zn = zslLastInRange(zsl, &range);
|
||||
|
||||
/* Use rank of last element, if any, to determine the actual count */
|
||||
if (zn != NULL) {
|
||||
rank = zslGetRank(zsl, zn->score, zn->ele);
|
||||
count -= (zsl->length - rank);
|
||||
}
|
||||
}
|
||||
|
||||
return count;
|
||||
}
|
||||
|
||||
OpResult<unsigned> ZSetFamily::OpLexCount(const OpArgs& op_args, string_view key,
|
||||
const ZSetFamily::LexInterval& interval) {
|
||||
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
||||
robj* zobj = res_it.value()->second.AsRObj();
|
||||
zlexrangespec range = GetLexRange(false, interval);
|
||||
unsigned count = 0;
|
||||
if (zobj->encoding == OBJ_ENCODING_LISTPACK) {
|
||||
uint8_t* zl = (uint8_t*)zobj->ptr;
|
||||
uint8_t *eptr, *sptr;
|
||||
|
||||
/* Use the first element in range as the starting point */
|
||||
eptr = zzlFirstInLexRange(zl, &range);
|
||||
|
||||
/* No "first" element */
|
||||
if (eptr) {
|
||||
/* First element is in range */
|
||||
sptr = lpNext(zl, eptr);
|
||||
serverAssertWithInfo(c, zobj, zzlLexValueLteMax(eptr, &range));
|
||||
|
||||
/* Iterate over elements in range */
|
||||
while (eptr) {
|
||||
/* Abort when the node is no longer in range. */
|
||||
if (!zzlLexValueLteMax(eptr, &range)) {
|
||||
break;
|
||||
} else {
|
||||
count++;
|
||||
zzlNext(zl, &eptr, &sptr);
|
||||
}
|
||||
}
|
||||
}
|
||||
} else {
|
||||
DCHECK_EQ(OBJ_ENCODING_SKIPLIST, zobj->encoding);
|
||||
zset* zs = (zset*)zobj->ptr;
|
||||
zskiplist* zsl = zs->zsl;
|
||||
zskiplistNode* zn;
|
||||
unsigned long rank;
|
||||
|
||||
/* Find first element in range */
|
||||
zn = zslFirstInLexRange(zsl, &range);
|
||||
|
||||
/* Use rank of first element, if any, to determine preliminary count */
|
||||
if (zn != NULL) {
|
||||
rank = zslGetRank(zsl, zn->score, zn->ele);
|
||||
count = (zsl->length - (rank - 1));
|
||||
|
||||
/* Find last element in range */
|
||||
zn = zslLastInLexRange(zsl, &range);
|
||||
|
||||
/* Use rank of last element, if any, to determine the actual count */
|
||||
if (zn != NULL) {
|
||||
rank = zslGetRank(zsl, zn->score, zn->ele);
|
||||
count -= (zsl->length - rank);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
zslFreeLexRange(&range);
|
||||
return count;
|
||||
}
|
||||
|
||||
#define HFUNC(x) SetHandler(&ZSetFamily::x)
|
||||
|
||||
void ZSetFamily::Register(CommandRegistry* registry) {
|
||||
|
|
|
@ -96,30 +96,6 @@ class ZSetFamily {
|
|||
static void ZRankGeneric(CmdArgList args, bool reverse, ConnectionContext* cntx);
|
||||
static bool ParseRangeByScoreParams(CmdArgList args, RangeParams* params);
|
||||
static void ZPopMinMax(CmdArgList args, bool reverse, ConnectionContext* cntx);
|
||||
static OpResult<StringVec> OpScan(const OpArgs& op_args, std::string_view key, uint64_t* cursor,
|
||||
const ScanOpts& scan_op);
|
||||
|
||||
static OpResult<unsigned> OpRem(const OpArgs& op_args, std::string_view key, ArgSlice members);
|
||||
static OpResult<double> OpScore(const OpArgs& op_args, std::string_view key,
|
||||
std::string_view member);
|
||||
using MScoreResponse = std::vector<std::optional<double>>;
|
||||
static OpResult<MScoreResponse> OpMScore(const OpArgs& op_args, std::string_view key,
|
||||
ArgSlice members);
|
||||
static OpResult<ScoredArray> OpPopCount(const ZRangeSpec& range_spec, const OpArgs& op_args,
|
||||
std::string_view key);
|
||||
static OpResult<ScoredArray> OpRange(const ZRangeSpec& range_spec, const OpArgs& op_args,
|
||||
std::string_view key);
|
||||
static OpResult<unsigned> OpRemRange(const OpArgs& op_args, std::string_view key,
|
||||
const ZRangeSpec& spec);
|
||||
|
||||
static OpResult<unsigned> OpRank(const OpArgs& op_args, std::string_view key,
|
||||
std::string_view member, bool reverse);
|
||||
|
||||
static OpResult<unsigned> OpCount(const OpArgs& op_args, std::string_view key,
|
||||
const ScoreInterval& interval);
|
||||
|
||||
static OpResult<unsigned> OpLexCount(const OpArgs& op_args, std::string_view key,
|
||||
const LexInterval& interval);
|
||||
};
|
||||
|
||||
} // namespace dfly
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue