Add update hooks to zset and hset families

This commit is contained in:
Roman Gershman 2022-04-30 22:56:44 +03:00
parent 370d4cd0ee
commit b1f993377b
2 changed files with 27 additions and 6 deletions

View file

@ -455,6 +455,8 @@ OpResult<uint32_t> HSetFamily::OpSet(const OpArgs& op_args, string_view key, Cmd
} else {
if (it->second.ObjType() != OBJ_HASH)
return OpStatus::WRONG_TYPE;
db_slice.PreUpdate(op_args.db_ind, it);
}
hset = it->second.AsRObj();
@ -504,6 +506,7 @@ OpResult<uint32_t> HSetFamily::OpSet(const OpArgs& op_args, string_view key, Cmd
}
}
it->second.SyncRObj();
db_slice.PostUpdate(op_args.db_ind, it);
return created;
}
@ -517,6 +520,7 @@ OpResult<uint32_t> HSetFamily::OpDel(const OpArgs& op_args, string_view key, Cmd
if (!it_res)
return it_res.status();
db_slice.PreUpdate(op_args.db_ind, *it_res);
CompactObj& co = (*it_res)->second;
robj* hset = co.AsRObj();
unsigned deleted = 0;
@ -541,6 +545,7 @@ OpResult<uint32_t> HSetFamily::OpDel(const OpArgs& op_args, string_view key, Cmd
co.SyncRObj();
db_slice.PostUpdate(op_args.db_ind, *it_res);
if (key_remove) {
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
stats->listpack_blob_cnt--;
@ -764,6 +769,7 @@ OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_vie
if (it->second.ObjType() != OBJ_HASH)
return OpStatus::WRONG_TYPE;
db_slice.PreUpdate(op_args.db_ind, it);
hset = it->second.AsRObj();
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
@ -860,6 +866,7 @@ OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_vie
}
it->second.SyncRObj();
db_slice.PostUpdate(op_args.db_ind, it);
return OpStatus::OK;
}

View file

@ -100,6 +100,7 @@ OpResult<PrimeIterator> FindZEntry(unsigned flags, const OpArgs& op_args, string
} else {
if (it->second.ObjType() != OBJ_ZSET)
return OpStatus::WRONG_TYPE;
db_slice.PreUpdate(op_args.db_ind, it);
}
return it;
}
@ -1203,6 +1204,8 @@ OpStatus ZSetFamily::OpAdd(const ZParams& zparams, const OpArgs& op_args, string
double new_score;
int retflags = 0;
OpStatus res = OpStatus::OK;
for (size_t j = 0; j < members.size(); j++) {
const auto& m = members[j];
tmp_str = sdscpylen(tmp_str, m.second.data(), m.second.size());
@ -1214,11 +1217,12 @@ OpStatus ZSetFamily::OpAdd(const ZParams& zparams, const OpArgs& op_args, string
CHECK_EQ(1u, members.size());
add_result->is_nan = true;
return OpStatus::OK;
break;
}
if (retflags & ZADD_OUT_NOP)
return OpStatus::SKIPPED;
if (retflags & ZADD_OUT_NOP) {
res = OpStatus::SKIPPED;
}
}
if (retflags & ZADD_OUT_ADDED)
@ -1232,20 +1236,24 @@ OpStatus ZSetFamily::OpAdd(const ZParams& zparams, const OpArgs& op_args, string
DVLOG(2) << "ZAdd " << zobj->ptr;
res_it.value()->second.SyncRObj();
op_args.shard->db_slice().PostUpdate(op_args.db_ind, *res_it);
if (zparams.flags & ZADD_IN_INCR) {
add_result->new_score = new_score;
} else {
add_result->num_updated = zparams.ch ? added + updated : added;
}
return OpStatus::OK;
return res;
}
OpResult<unsigned> ZSetFamily::OpRem(const OpArgs& op_args, string_view key, ArgSlice members) {
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_ZSET);
if (!res_it)
return res_it.status();
db_slice.PreUpdate(op_args.db_ind, *res_it);
robj* zobj = res_it.value()->second.AsRObj();
sds& tmp_str = op_args.shard->tmp_str1;
unsigned deleted = 0;
@ -1255,6 +1263,7 @@ OpResult<unsigned> ZSetFamily::OpRem(const OpArgs& op_args, string_view key, Arg
}
auto zlen = zsetLength(zobj);
res_it.value()->second.SyncRObj();
db_slice.PostUpdate(op_args.db_ind, *res_it);
if (zlen == 0) {
CHECK(op_args.shard->db_slice().Del(op_args.db_ind, res_it.value()));
@ -1295,16 +1304,21 @@ auto ZSetFamily::OpRange(const ZRangeSpec& range_spec, const OpArgs& op_args, st
OpResult<unsigned> ZSetFamily::OpRemRange(const OpArgs& op_args, string_view key,
const ZRangeSpec& range_spec) {
OpResult<PrimeIterator> res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET);
auto& db_slice = op_args.shard->db_slice();
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_ind, key, OBJ_ZSET);
if (!res_it)
return res_it.status();
db_slice.PreUpdate(op_args.db_ind, *res_it);
robj* zobj = res_it.value()->second.AsRObj();
IntervalVisitor iv{Action::REMOVE, range_spec.params, zobj};
std::visit(iv, range_spec.interval);
res_it.value()->second.SyncRObj();
db_slice.PostUpdate(op_args.db_ind, *res_it);
auto zlen = zsetLength(zobj);
if (zlen == 0) {
CHECK(op_args.shard->db_slice().Del(op_args.db_ind, res_it.value()));