diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index c594ba3df..f2f58181f 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -455,6 +455,8 @@ OpResult HSetFamily::OpSet(const OpArgs& op_args, string_view key, Cmd } else { if (it->second.ObjType() != OBJ_HASH) return OpStatus::WRONG_TYPE; + + db_slice.PreUpdate(op_args.db_ind, it); } hset = it->second.AsRObj(); @@ -504,6 +506,7 @@ OpResult HSetFamily::OpSet(const OpArgs& op_args, string_view key, Cmd } } it->second.SyncRObj(); + db_slice.PostUpdate(op_args.db_ind, it); return created; } @@ -517,6 +520,7 @@ OpResult HSetFamily::OpDel(const OpArgs& op_args, string_view key, Cmd if (!it_res) return it_res.status(); + db_slice.PreUpdate(op_args.db_ind, *it_res); CompactObj& co = (*it_res)->second; robj* hset = co.AsRObj(); unsigned deleted = 0; @@ -541,6 +545,7 @@ OpResult HSetFamily::OpDel(const OpArgs& op_args, string_view key, Cmd co.SyncRObj(); + db_slice.PostUpdate(op_args.db_ind, *it_res); if (key_remove) { if (hset->encoding == OBJ_ENCODING_LISTPACK) { stats->listpack_blob_cnt--; @@ -764,6 +769,7 @@ OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_vie if (it->second.ObjType() != OBJ_HASH) return OpStatus::WRONG_TYPE; + db_slice.PreUpdate(op_args.db_ind, it); hset = it->second.AsRObj(); if (hset->encoding == OBJ_ENCODING_LISTPACK) { @@ -860,6 +866,7 @@ OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_vie } it->second.SyncRObj(); + db_slice.PostUpdate(op_args.db_ind, it); return OpStatus::OK; } diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 9e3a34fe2..363249989 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -100,6 +100,7 @@ OpResult FindZEntry(unsigned flags, const OpArgs& op_args, string } else { if (it->second.ObjType() != OBJ_ZSET) return OpStatus::WRONG_TYPE; + db_slice.PreUpdate(op_args.db_ind, it); } return it; } @@ -1203,6 +1204,8 @@ OpStatus ZSetFamily::OpAdd(const ZParams& zparams, const OpArgs& op_args, string double new_score; int retflags = 0; + OpStatus res = OpStatus::OK; + for (size_t j = 0; j < members.size(); j++) { const auto& m = members[j]; tmp_str = sdscpylen(tmp_str, m.second.data(), m.second.size()); @@ -1214,11 +1217,12 @@ OpStatus ZSetFamily::OpAdd(const ZParams& zparams, const OpArgs& op_args, string CHECK_EQ(1u, members.size()); add_result->is_nan = true; - return OpStatus::OK; + break; } - if (retflags & ZADD_OUT_NOP) - return OpStatus::SKIPPED; + if (retflags & ZADD_OUT_NOP) { + res = OpStatus::SKIPPED; + } } if (retflags & ZADD_OUT_ADDED) @@ -1232,20 +1236,24 @@ OpStatus ZSetFamily::OpAdd(const ZParams& zparams, const OpArgs& op_args, string DVLOG(2) << "ZAdd " << zobj->ptr; res_it.value()->second.SyncRObj(); + op_args.shard->db_slice().PostUpdate(op_args.db_ind, *res_it); + if (zparams.flags & ZADD_IN_INCR) { add_result->new_score = new_score; } else { add_result->num_updated = zparams.ch ? added + updated : added; } - return OpStatus::OK; + return res; } OpResult ZSetFamily::OpRem(const OpArgs& op_args, string_view key, ArgSlice members) { - OpResult res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET); + auto& db_slice = op_args.shard->db_slice(); + OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_ZSET); if (!res_it) return res_it.status(); + db_slice.PreUpdate(op_args.db_ind, *res_it); robj* zobj = res_it.value()->second.AsRObj(); sds& tmp_str = op_args.shard->tmp_str1; unsigned deleted = 0; @@ -1255,6 +1263,7 @@ OpResult ZSetFamily::OpRem(const OpArgs& op_args, string_view key, Arg } auto zlen = zsetLength(zobj); res_it.value()->second.SyncRObj(); + db_slice.PostUpdate(op_args.db_ind, *res_it); if (zlen == 0) { CHECK(op_args.shard->db_slice().Del(op_args.db_ind, res_it.value())); @@ -1295,16 +1304,21 @@ auto ZSetFamily::OpRange(const ZRangeSpec& range_spec, const OpArgs& op_args, st OpResult ZSetFamily::OpRemRange(const OpArgs& op_args, string_view key, const ZRangeSpec& range_spec) { - OpResult res_it = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_ZSET); + auto& db_slice = op_args.shard->db_slice(); + OpResult res_it = db_slice.Find(op_args.db_ind, key, OBJ_ZSET); if (!res_it) return res_it.status(); + db_slice.PreUpdate(op_args.db_ind, *res_it); + robj* zobj = res_it.value()->second.AsRObj(); IntervalVisitor iv{Action::REMOVE, range_spec.params, zobj}; std::visit(iv, range_spec.interval); res_it.value()->second.SyncRObj(); + db_slice.PostUpdate(op_args.db_ind, *res_it); + auto zlen = zsetLength(zobj); if (zlen == 0) { CHECK(op_args.shard->db_slice().Del(op_args.db_ind, res_it.value()));