mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
feat(server): Convert DbSlice
's AddOr*
to return AutoUpdater
(#2290)
* fix(server): Use AutoUpdater with AddOr* methods * Remove explicit calls * `operator=` * return *this * PostUpdate twice * exp it * bitops * remove explicit `Run()` * Explicitly `delete` copy ops * Remove `AddOrSkip()`
This commit is contained in:
parent
aaf01d4244
commit
dcedd1645e
15 changed files with 208 additions and 245 deletions
|
@ -145,6 +145,11 @@ class DashTable : public detail::DashTableBase {
|
|||
InsertMode::kInsertIfNotFound);
|
||||
}
|
||||
|
||||
template <typename U, typename V> iterator InsertNew(U&& key, V&& value) {
|
||||
DefaultEvictionPolicy policy;
|
||||
return InsertNew(std::forward<U>(key), std::forward<V>(value), policy);
|
||||
}
|
||||
|
||||
template <typename U, typename V, typename EvictionPolicy>
|
||||
iterator InsertNew(U&& key, V&& value, EvictionPolicy& ev) {
|
||||
return InsertInternal(std::forward<U>(key), std::forward<V>(value), ev,
|
||||
|
|
|
@ -288,6 +288,7 @@ class ElementAccess {
|
|||
std::string_view key_;
|
||||
DbContext context_;
|
||||
EngineShard* shard_ = nullptr;
|
||||
mutable DbSlice::AutoUpdater post_updater_;
|
||||
|
||||
public:
|
||||
ElementAccess(std::string_view key, const OpArgs& args) : key_{key}, context_{args.db_cntx} {
|
||||
|
@ -323,15 +324,16 @@ std::optional<bool> ElementAccess::Exists(EngineShard* shard) {
|
|||
|
||||
OpStatus ElementAccess::Find(EngineShard* shard) {
|
||||
try {
|
||||
std::pair<PrimeIterator, bool> add_res = shard->db_slice().AddOrFind(context_, key_);
|
||||
if (!add_res.second) {
|
||||
if (add_res.first->second.ObjType() != OBJ_STRING) {
|
||||
auto add_res = shard->db_slice().AddOrFind(context_, key_);
|
||||
if (!add_res.is_new) {
|
||||
if (add_res.it->second.ObjType() != OBJ_STRING) {
|
||||
return OpStatus::WRONG_TYPE;
|
||||
}
|
||||
}
|
||||
element_iter_ = add_res.first;
|
||||
added_ = add_res.second;
|
||||
element_iter_ = add_res.it;
|
||||
added_ = add_res.is_new;
|
||||
shard_ = shard;
|
||||
post_updater_ = std::move(add_res.post_updater);
|
||||
return OpStatus::OK;
|
||||
} catch (const std::bad_alloc&) {
|
||||
return OpStatus::OUT_OF_MEMORY;
|
||||
|
@ -349,10 +351,8 @@ std::string ElementAccess::Value() const {
|
|||
|
||||
void ElementAccess::Commit(std::string_view new_value) const {
|
||||
if (shard_) {
|
||||
auto& db_slice = shard_->db_slice();
|
||||
db_slice.PreUpdate(Index(), element_iter_);
|
||||
element_iter_->second.SetString(new_value);
|
||||
db_slice.PostUpdate(Index(), element_iter_, key_, !added_);
|
||||
post_updater_.Run();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -910,7 +910,8 @@ using CommandList = std::vector<Command>;
|
|||
// Helper class used in the shard cb that abstracts away the iteration and execution of subcommands
|
||||
class StateExecutor {
|
||||
public:
|
||||
StateExecutor(ElementAccess access, EngineShard* shard) : access_{access}, shard_(shard) {
|
||||
StateExecutor(ElementAccess access, EngineShard* shard)
|
||||
: access_{std::move(access)}, shard_(shard) {
|
||||
}
|
||||
|
||||
// Iterates over all of the parsed subcommands and executes them one by one. At the end,
|
||||
|
|
|
@ -394,11 +394,18 @@ void DbSlice::AutoUpdater::Cancel() {
|
|||
|
||||
DbSlice::AutoUpdater::AutoUpdater(const Fields& fields) : fields_(fields) {
|
||||
DCHECK(fields_.action == DestructorAction::kRun);
|
||||
fields_.db_slice->PreUpdate(fields_.db_ind, fields_.it);
|
||||
fields_.db_size = fields_.db_slice->DbSize(fields_.db_ind);
|
||||
fields_.deletion_count = fields_.db_slice->deletion_count_;
|
||||
}
|
||||
|
||||
DbSlice::AddOrFindResult& DbSlice::AddOrFindResult::operator=(ItAndUpdater&& o) {
|
||||
it = o.it;
|
||||
exp_it = ExpireIterator{}; // ItAndUpdater doesn't have exp_it
|
||||
is_new = false;
|
||||
post_updater = std::move(o).post_updater;
|
||||
return *this;
|
||||
}
|
||||
|
||||
OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutable(const Context& cntx, string_view key,
|
||||
unsigned req_obj_type) {
|
||||
auto it = FindInternal(cntx, key, FindInternalMode::kDontUpdateCacheStats).first;
|
||||
|
@ -410,6 +417,7 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::FindMutable(const Context& cntx, string
|
|||
return OpStatus::WRONG_TYPE;
|
||||
}
|
||||
|
||||
PreUpdate(cntx.db_index, it);
|
||||
return {
|
||||
{it, AutoUpdater({AutoUpdater::DestructorAction::kRun, this, cntx.db_index, it, key, true})}};
|
||||
}
|
||||
|
@ -491,13 +499,7 @@ OpResult<pair<PrimeIterator, unsigned>> DbSlice::FindFirst(const Context& cntx,
|
|||
return OpStatus::KEY_NOTFOUND;
|
||||
}
|
||||
|
||||
pair<PrimeIterator, bool> DbSlice::AddOrFind(const Context& cntx, string_view key) noexcept(false) {
|
||||
auto res = AddOrFind2(cntx, key);
|
||||
return make_pair(get<0>(res), get<2>(res));
|
||||
}
|
||||
|
||||
tuple<PrimeIterator, ExpireIterator, bool> DbSlice::AddOrFind2(const Context& cntx,
|
||||
string_view key) noexcept(false) {
|
||||
DbSlice::AddOrFindResult DbSlice::AddOrFind(const Context& cntx, string_view key) noexcept(false) {
|
||||
DCHECK(IsDbValid(cntx.db_index));
|
||||
|
||||
DbTable& db = *db_arr_[cntx.db_index];
|
||||
|
@ -505,7 +507,12 @@ tuple<PrimeIterator, ExpireIterator, bool> DbSlice::AddOrFind2(const Context& cn
|
|||
auto res = FindInternal(cntx, key, FindInternalMode::kDontUpdateCacheStats);
|
||||
|
||||
if (IsValid(res.first)) {
|
||||
return tuple_cat(res, make_tuple(false));
|
||||
PreUpdate(cntx.db_index, res.first);
|
||||
return {.it = res.first,
|
||||
.exp_it = res.second,
|
||||
.is_new = false,
|
||||
.post_updater = AutoUpdater(
|
||||
{AutoUpdater::DestructorAction::kRun, this, cntx.db_index, res.first, key, true})};
|
||||
}
|
||||
|
||||
// It's a new entry.
|
||||
|
@ -534,7 +541,7 @@ tuple<PrimeIterator, ExpireIterator, bool> DbSlice::AddOrFind2(const Context& cn
|
|||
|
||||
// If we are over limit in non-cache scenario, just be conservative and throw.
|
||||
if (apply_memory_limit && !caching_mode_ && evp.mem_budget() < 0) {
|
||||
VLOG(2) << "AddOrFind2: over limit, budget: " << evp.mem_budget();
|
||||
VLOG(2) << "AddOrFind: over limit, budget: " << evp.mem_budget();
|
||||
events_.insertion_rejections++;
|
||||
throw bad_alloc();
|
||||
}
|
||||
|
@ -583,7 +590,11 @@ tuple<PrimeIterator, ExpireIterator, bool> DbSlice::AddOrFind2(const Context& cn
|
|||
db.slots_stats[sid].key_count += 1;
|
||||
}
|
||||
|
||||
return make_tuple(it, ExpireIterator{}, true);
|
||||
return {.it = it,
|
||||
.exp_it = ExpireIterator{},
|
||||
.is_new = true,
|
||||
.post_updater = AutoUpdater(
|
||||
{AutoUpdater::DestructorAction::kRun, this, cntx.db_index, it, key, false})};
|
||||
}
|
||||
|
||||
void DbSlice::ActivateDb(DbIndex db_ind) {
|
||||
|
@ -758,12 +769,12 @@ uint32_t DbSlice::GetMCFlag(DbIndex db_ind, const PrimeKey& key) const {
|
|||
return it->second;
|
||||
}
|
||||
|
||||
PrimeIterator DbSlice::AddNew(const Context& cntx, string_view key, PrimeValue obj,
|
||||
uint64_t expire_at_ms) noexcept(false) {
|
||||
auto [it, added] = AddOrSkip(cntx, key, std::move(obj), expire_at_ms);
|
||||
CHECK(added);
|
||||
DbSlice::ItAndUpdater DbSlice::AddNew(const Context& cntx, string_view key, PrimeValue obj,
|
||||
uint64_t expire_at_ms) noexcept(false) {
|
||||
auto res = AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, false);
|
||||
CHECK(res.is_new);
|
||||
|
||||
return it;
|
||||
return {.it = res.it, .post_updater = std::move(res.post_updater)};
|
||||
}
|
||||
|
||||
pair<int64_t, int64_t> DbSlice::ExpireParams::Calculate(int64_t now_ms) const {
|
||||
|
@ -816,45 +827,38 @@ OpResult<int64_t> DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime
|
|||
}
|
||||
}
|
||||
|
||||
std::pair<PrimeIterator, bool> DbSlice::AddOrUpdateInternal(const Context& cntx,
|
||||
std::string_view key, PrimeValue obj,
|
||||
uint64_t expire_at_ms,
|
||||
bool force_update) noexcept(false) {
|
||||
DbSlice::AddOrFindResult DbSlice::AddOrUpdateInternal(const Context& cntx, std::string_view key,
|
||||
PrimeValue obj, uint64_t expire_at_ms,
|
||||
bool force_update) noexcept(false) {
|
||||
DCHECK(!obj.IsRef());
|
||||
|
||||
pair<PrimeIterator, bool> res = AddOrFind(cntx, key);
|
||||
if (!res.second && !force_update) // have not inserted.
|
||||
auto res = AddOrFind(cntx, key);
|
||||
if (!res.is_new && !force_update) // have not inserted.
|
||||
return res;
|
||||
|
||||
auto& db = *db_arr_[cntx.db_index];
|
||||
auto& it = res.first;
|
||||
auto& it = res.it;
|
||||
|
||||
it->second = std::move(obj);
|
||||
PostUpdate(cntx.db_index, it, key, false);
|
||||
|
||||
if (expire_at_ms) {
|
||||
it->second.SetExpire(true);
|
||||
uint64_t delta = expire_at_ms - expire_base_[0];
|
||||
auto [eit, inserted] = db.expire.Insert(it->first.AsRef(), ExpirePeriod(delta));
|
||||
CHECK(inserted || force_update);
|
||||
if (!inserted) {
|
||||
eit->second = ExpirePeriod(delta);
|
||||
if (IsValid(res.exp_it) && force_update) {
|
||||
res.exp_it->second = ExpirePeriod(delta);
|
||||
} else {
|
||||
res.exp_it = db.expire.InsertNew(it->first.AsRef(), ExpirePeriod(delta));
|
||||
}
|
||||
}
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
pair<PrimeIterator, bool> DbSlice::AddOrUpdate(const Context& cntx, string_view key, PrimeValue obj,
|
||||
uint64_t expire_at_ms) noexcept(false) {
|
||||
DbSlice::AddOrFindResult DbSlice::AddOrUpdate(const Context& cntx, string_view key, PrimeValue obj,
|
||||
uint64_t expire_at_ms) noexcept(false) {
|
||||
return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, true);
|
||||
}
|
||||
|
||||
pair<PrimeIterator, bool> DbSlice::AddOrSkip(const Context& cntx, string_view key, PrimeValue obj,
|
||||
uint64_t expire_at_ms) noexcept(false) {
|
||||
return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, false);
|
||||
}
|
||||
|
||||
size_t DbSlice::DbSize(DbIndex db_ind) const {
|
||||
DCHECK_LT(db_ind, db_array_size());
|
||||
|
||||
|
|
|
@ -66,6 +66,8 @@ class DbSlice {
|
|||
class AutoUpdater {
|
||||
public:
|
||||
AutoUpdater();
|
||||
AutoUpdater(const AutoUpdater& o) = delete;
|
||||
AutoUpdater& operator=(const AutoUpdater& o) = delete;
|
||||
AutoUpdater(AutoUpdater&& o);
|
||||
AutoUpdater& operator=(AutoUpdater&& o);
|
||||
~AutoUpdater();
|
||||
|
@ -186,10 +188,6 @@ class DbSlice {
|
|||
return ExpirePeriod{time_ms - expire_base_[0]};
|
||||
}
|
||||
|
||||
// TODO(#2252): Remove this in favor of FindMutable() / FindReadOnly()
|
||||
OpResult<PrimeIterator> Find(const Context& cntx, std::string_view key,
|
||||
unsigned req_obj_type) const;
|
||||
|
||||
struct ItAndUpdater {
|
||||
PrimeIterator it;
|
||||
AutoUpdater post_updater;
|
||||
|
@ -201,6 +199,7 @@ class DbSlice {
|
|||
unsigned req_obj_type) const;
|
||||
|
||||
// Returns (value, expire) dict entries if key exists, null if it does not exist or has expired.
|
||||
// TODO(#2252): Return AutoUpdater here as well
|
||||
std::pair<PrimeIterator, ExpireIterator> FindExt(const Context& cntx, std::string_view key) const;
|
||||
|
||||
// Returns (iterator, args-index) if found, KEY_NOTFOUND otherwise.
|
||||
|
@ -208,30 +207,26 @@ class DbSlice {
|
|||
OpResult<std::pair<PrimeIterator, unsigned>> FindFirst(const Context& cntx, ArgSlice args,
|
||||
int req_obj_type);
|
||||
|
||||
// Return .second=true if insertion occurred, false if we return the existing key.
|
||||
// throws: bad_alloc is insertion could not happen due to out of memory.
|
||||
std::pair<PrimeIterator, bool> AddOrFind(const Context& cntx,
|
||||
std::string_view key) noexcept(false);
|
||||
struct AddOrFindResult {
|
||||
PrimeIterator it;
|
||||
ExpireIterator exp_it;
|
||||
bool is_new = false;
|
||||
AutoUpdater post_updater;
|
||||
|
||||
std::tuple<PrimeIterator, ExpireIterator, bool> AddOrFind2(const Context& cntx,
|
||||
std::string_view key) noexcept(false);
|
||||
AddOrFindResult& operator=(ItAndUpdater&& o);
|
||||
};
|
||||
|
||||
AddOrFindResult AddOrFind(const Context& cntx, std::string_view key) noexcept(false);
|
||||
|
||||
// Same as AddOrSkip, but overwrites in case entry exists.
|
||||
// Returns second=true if insertion took place.
|
||||
std::pair<PrimeIterator, bool> AddOrUpdate(const Context& cntx, std::string_view key,
|
||||
PrimeValue obj, uint64_t expire_at_ms) noexcept(false);
|
||||
|
||||
// Returns second=true if insertion took place, false otherwise.
|
||||
// expire_at_ms equal to 0 - means no expiry.
|
||||
// throws: bad_alloc is insertion could not happen due to out of memory.
|
||||
std::pair<PrimeIterator, bool> AddOrSkip(const Context& cntx, std::string_view key,
|
||||
PrimeValue obj, uint64_t expire_at_ms) noexcept(false);
|
||||
AddOrFindResult AddOrUpdate(const Context& cntx, std::string_view key, PrimeValue obj,
|
||||
uint64_t expire_at_ms) noexcept(false);
|
||||
|
||||
// Adds a new entry. Requires: key does not exist in this slice.
|
||||
// Returns the iterator to the newly added entry.
|
||||
// throws: bad_alloc is insertion could not happen due to out of memory.
|
||||
PrimeIterator AddNew(const Context& cntx, std::string_view key, PrimeValue obj,
|
||||
uint64_t expire_at_ms) noexcept(false);
|
||||
ItAndUpdater AddNew(const Context& cntx, std::string_view key, PrimeValue obj,
|
||||
uint64_t expire_at_ms) noexcept(false);
|
||||
|
||||
// Update entry expiration. Return epxiration timepoint in abs milliseconds, or -1 if the entry
|
||||
// already expired and was deleted;
|
||||
|
@ -393,9 +388,11 @@ class DbSlice {
|
|||
void ReleaseNormalized(IntentLock::Mode m, DbIndex db_index, std::string_view key,
|
||||
unsigned count);
|
||||
|
||||
std::pair<PrimeIterator, bool> AddOrUpdateInternal(const Context& cntx, std::string_view key,
|
||||
PrimeValue obj, uint64_t expire_at_ms,
|
||||
bool force_update) noexcept(false);
|
||||
OpResult<PrimeIterator> Find(const Context& cntx, std::string_view key,
|
||||
unsigned req_obj_type) const;
|
||||
|
||||
AddOrFindResult AddOrUpdateInternal(const Context& cntx, std::string_view key, PrimeValue obj,
|
||||
uint64_t expire_at_ms, bool force_update) noexcept(false);
|
||||
|
||||
void FlushSlotsFb(const SlotSet& slot_ids);
|
||||
void FlushDbIndexes(const std::vector<DbIndex>& indexes);
|
||||
|
|
|
@ -538,17 +538,23 @@ TEST_F(DflyEngineTest, Bug496) {
|
|||
uint32_t cb_id =
|
||||
db.RegisterOnChange([&cb_hits](DbIndex, const DbSlice::ChangeReq&) { cb_hits++; });
|
||||
|
||||
auto [_, added] = db.AddOrFind({}, "key-1");
|
||||
EXPECT_TRUE(added);
|
||||
EXPECT_EQ(cb_hits, 1);
|
||||
{
|
||||
auto res = db.AddOrFind({}, "key-1");
|
||||
EXPECT_TRUE(res.is_new);
|
||||
EXPECT_EQ(cb_hits, 1);
|
||||
}
|
||||
|
||||
tie(_, added) = db.AddOrFind({}, "key-1");
|
||||
EXPECT_FALSE(added);
|
||||
EXPECT_EQ(cb_hits, 1);
|
||||
{
|
||||
auto res = db.AddOrFind({}, "key-1");
|
||||
EXPECT_FALSE(res.is_new);
|
||||
EXPECT_EQ(cb_hits, 2);
|
||||
}
|
||||
|
||||
tie(_, added) = db.AddOrFind({}, "key-2");
|
||||
EXPECT_TRUE(added);
|
||||
EXPECT_EQ(cb_hits, 2);
|
||||
{
|
||||
auto res = db.AddOrFind({}, "key-2");
|
||||
EXPECT_TRUE(res.is_new);
|
||||
EXPECT_EQ(cb_hits, 3);
|
||||
}
|
||||
|
||||
db.UnregisterOnChange(cb_id);
|
||||
});
|
||||
|
|
|
@ -168,8 +168,8 @@ bool RdbRestoreValue::Add(std::string_view data, std::string_view key, DbSlice&
|
|||
}
|
||||
|
||||
DbContext context{.db_index = index, .time_now_ms = GetCurrentTimeMs()};
|
||||
auto [it, added] = db_slice.AddOrSkip(context, key, std::move(pv), expire_ms);
|
||||
return added;
|
||||
db_slice.AddNew(context, key, std::move(pv), expire_ms);
|
||||
return true;
|
||||
}
|
||||
|
||||
class RestoreArgs {
|
||||
|
@ -380,6 +380,7 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) {
|
|||
auto& db_slice = es->db_slice();
|
||||
string_view dest_key = dest_res_.key;
|
||||
PrimeIterator dest_it = db_slice.FindExt(t->GetDbContext(), dest_key).first;
|
||||
DbSlice::AutoUpdater post_updater;
|
||||
bool is_prior_list = false;
|
||||
|
||||
if (IsValid(dest_it)) {
|
||||
|
@ -397,7 +398,10 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) {
|
|||
if (src_res_.ref_val.ObjType() == OBJ_STRING) {
|
||||
pv_.SetString(str_val_);
|
||||
}
|
||||
dest_it = db_slice.AddNew(t->GetDbContext(), dest_key, std::move(pv_), src_res_.expire_ts);
|
||||
auto add_res =
|
||||
db_slice.AddNew(t->GetDbContext(), dest_key, std::move(pv_), src_res_.expire_ts);
|
||||
dest_it = add_res.it;
|
||||
post_updater = std::move(add_res.post_updater);
|
||||
}
|
||||
|
||||
dest_it->first.SetSticky(src_res_.sticky);
|
||||
|
@ -1441,6 +1445,7 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from_key,
|
|||
return OpStatus::OK;
|
||||
|
||||
bool is_prior_list = false;
|
||||
DbSlice::AutoUpdater post_updater;
|
||||
auto [to_it, to_expire] = db_slice.FindExt(op_args.db_cntx, to_key);
|
||||
if (IsValid(to_it)) {
|
||||
if (skip_exists)
|
||||
|
@ -1472,7 +1477,9 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from_key,
|
|||
// On the other hand, AddNew does not rely on the iterators - this is why we keep
|
||||
// the value in `from_obj`.
|
||||
CHECK(db_slice.Del(op_args.db_cntx.db_index, from_it));
|
||||
to_it = db_slice.AddNew(op_args.db_cntx, to_key, std::move(from_obj), exp_ts);
|
||||
auto add_res = db_slice.AddNew(op_args.db_cntx, to_key, std::move(from_obj), exp_ts);
|
||||
to_it = add_res.it;
|
||||
post_updater = std::move(add_res.post_updater);
|
||||
}
|
||||
|
||||
to_it->first.SetSticky(sticky);
|
||||
|
@ -1529,10 +1536,10 @@ OpStatus GenericFamily::OpMove(const OpArgs& op_args, string_view key, DbIndex t
|
|||
from_it->second.SetExpire(IsValid(from_expire));
|
||||
|
||||
CHECK(db_slice.Del(op_args.db_cntx.db_index, from_it));
|
||||
to_it = db_slice.AddNew(target_cntx, key, std::move(from_obj), exp_ts);
|
||||
to_it->first.SetSticky(sticky);
|
||||
auto add_res = db_slice.AddNew(target_cntx, key, std::move(from_obj), exp_ts);
|
||||
add_res.it->first.SetSticky(sticky);
|
||||
|
||||
if (to_it->second.ObjType() == OBJ_LIST && op_args.shard->blocking_controller()) {
|
||||
if (add_res.it->second.ObjType() == OBJ_LIST && op_args.shard->blocking_controller()) {
|
||||
op_args.shard->blocking_controller()->AwakeWatched(target_db, key);
|
||||
}
|
||||
|
||||
|
|
|
@ -71,14 +71,14 @@ OpResult<int> AddToHll(const OpArgs& op_args, string_view key, CmdArgList values
|
|||
string hll;
|
||||
|
||||
try {
|
||||
auto [it, inserted] = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
if (inserted) {
|
||||
auto res = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
if (res.is_new) {
|
||||
hll.resize(getDenseHllSize());
|
||||
createDenseHll(StringToHllPtr(hll));
|
||||
} else if (it->second.ObjType() != OBJ_STRING) {
|
||||
} else if (res.it->second.ObjType() != OBJ_STRING) {
|
||||
return OpStatus::WRONG_TYPE;
|
||||
} else {
|
||||
it->second.GetString(&hll);
|
||||
res.it->second.GetString(&hll);
|
||||
ConvertToDenseIfNeeded(&hll);
|
||||
}
|
||||
|
||||
|
@ -91,9 +91,7 @@ OpResult<int> AddToHll(const OpArgs& op_args, string_view key, CmdArgList values
|
|||
updated += added;
|
||||
}
|
||||
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
|
||||
it->second.SetString(hll);
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !inserted);
|
||||
res.it->second.SetString(hll);
|
||||
|
||||
return std::min(updated, 1);
|
||||
} catch (const std::bad_alloc&) {
|
||||
|
@ -260,10 +258,8 @@ OpResult<int> PFMergeInternal(CmdArgList args, ConnectionContext* cntx) {
|
|||
string_view key = ArgS(args, 0);
|
||||
const OpArgs& op_args = t->GetOpArgs(shard);
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
auto [it, inserted] = db_slice.AddOrFind(t->GetDbContext(), key);
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
|
||||
it->second.SetString(hll);
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !inserted);
|
||||
auto res = db_slice.AddOrFind(t->GetDbContext(), key);
|
||||
res.it->second.SetString(hll);
|
||||
return OpStatus::OK;
|
||||
};
|
||||
trans->Execute(std::move(set_cb), true);
|
||||
|
|
|
@ -168,22 +168,21 @@ OpStatus IncrementValue(optional<string_view> prev_val, IncrByParam* param) {
|
|||
|
||||
OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, IncrByParam* param) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
const auto [it, inserted] = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
auto add_res = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
|
||||
DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index);
|
||||
|
||||
size_t lpb = 0;
|
||||
|
||||
PrimeValue& pv = it->second;
|
||||
if (inserted) {
|
||||
PrimeValue& pv = add_res.it->second;
|
||||
if (add_res.is_new) {
|
||||
pv.InitRobj(OBJ_HASH, kEncodingListPack, lpNew(0));
|
||||
stats->listpack_blob_cnt++;
|
||||
} else {
|
||||
if (pv.ObjType() != OBJ_HASH)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, it->second);
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
|
||||
op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, add_res.it->second);
|
||||
|
||||
if (pv.Encoding() == kEncodingListPack) {
|
||||
uint8_t* lp = (uint8_t*)pv.RObjPtr();
|
||||
|
@ -205,7 +204,7 @@ OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, Inc
|
|||
uint8_t* lp = (uint8_t*)pv.RObjPtr();
|
||||
optional<string_view> res;
|
||||
|
||||
if (!inserted)
|
||||
if (!add_res.is_new)
|
||||
res = LpFind(lp, field, intbuf);
|
||||
|
||||
OpStatus status = IncrementValue(res, param);
|
||||
|
@ -232,7 +231,7 @@ OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, Inc
|
|||
StringMap* sm = GetStringMap(pv, op_args.db_cntx);
|
||||
|
||||
sds val = nullptr;
|
||||
if (!inserted) {
|
||||
if (!add_res.is_new) {
|
||||
auto it = sm->Find(field);
|
||||
if (it != sm->end()) {
|
||||
val = it->second;
|
||||
|
@ -262,7 +261,6 @@ OpStatus OpIncrBy(const OpArgs& op_args, string_view key, string_view field, Inc
|
|||
}
|
||||
}
|
||||
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
|
||||
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);
|
||||
|
||||
return OpStatus::OK;
|
||||
|
@ -617,7 +615,7 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
|
|||
VLOG(2) << "OpSet(" << key << ")";
|
||||
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
pair<PrimeIterator, bool> add_res;
|
||||
DbSlice::AddOrFindResult add_res;
|
||||
try {
|
||||
add_res = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
} catch (bad_alloc&) {
|
||||
|
@ -627,10 +625,10 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
|
|||
DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index);
|
||||
|
||||
uint8_t* lp = nullptr;
|
||||
PrimeIterator& it = add_res.first;
|
||||
PrimeIterator& it = add_res.it;
|
||||
PrimeValue& pv = it->second;
|
||||
|
||||
if (add_res.second) { // new key
|
||||
if (add_res.is_new) {
|
||||
if (op_sp.ttl == UINT32_MAX) {
|
||||
lp = lpNew(0);
|
||||
pv.InitRobj(OBJ_HASH, kEncodingListPack, lp);
|
||||
|
@ -646,7 +644,6 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
|
|||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, it->second);
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
|
||||
}
|
||||
|
||||
if (pv.Encoding() == kEncodingListPack) {
|
||||
|
@ -694,7 +691,6 @@ OpResult<uint32_t> OpSet(const OpArgs& op_args, string_view key, CmdArgList valu
|
|||
}
|
||||
}
|
||||
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
|
||||
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, pv);
|
||||
|
||||
return created;
|
||||
|
|
|
@ -52,16 +52,13 @@ inline OpStatus JsonReplaceVerifyNoOp(JsonType&) {
|
|||
|
||||
void SetJson(const OpArgs& op_args, string_view key, JsonType&& value) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
DbIndex db_index = op_args.db_cntx.db_index;
|
||||
auto [it_output, added] = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
auto res = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
|
||||
op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, it_output->second);
|
||||
db_slice.PreUpdate(db_index, it_output);
|
||||
op_args.shard->search_indices()->RemoveDoc(key, op_args.db_cntx, res.it->second);
|
||||
|
||||
it_output->second.SetJson(std::move(value));
|
||||
res.it->second.SetJson(std::move(value));
|
||||
|
||||
db_slice.PostUpdate(db_index, it_output, key);
|
||||
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, it_output->second);
|
||||
op_args.shard->search_indices()->AddDoc(key, op_args.db_cntx, res.it->second);
|
||||
}
|
||||
|
||||
string JsonTypeToName(const JsonType& val) {
|
||||
|
|
|
@ -246,32 +246,30 @@ OpResult<string> OpMoveSingleShard(const OpArgs& op_args, string_view src, strin
|
|||
}
|
||||
|
||||
quicklist* dest_ql = nullptr;
|
||||
PrimeIterator dest_it;
|
||||
bool new_key = false;
|
||||
src_res->post_updater.Run();
|
||||
DbSlice::AddOrFindResult dest_res;
|
||||
try {
|
||||
src_res->post_updater.Run();
|
||||
tie(dest_it, new_key) = db_slice.AddOrFind(op_args.db_cntx, dest);
|
||||
dest_res = db_slice.AddOrFind(op_args.db_cntx, dest);
|
||||
} catch (bad_alloc&) {
|
||||
return OpStatus::OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
if (new_key) {
|
||||
// Insertion of dest could invalidate src_it. Find it again.
|
||||
src_res = db_slice.FindMutable(op_args.db_cntx, src, OBJ_LIST);
|
||||
src_it = src_res->it;
|
||||
|
||||
if (dest_res.is_new) {
|
||||
robj* obj = createQuicklistObject();
|
||||
dest_ql = (quicklist*)obj->ptr;
|
||||
quicklistSetOptions(dest_ql, GetFlag(FLAGS_list_max_listpack_size),
|
||||
GetFlag(FLAGS_list_compress_depth));
|
||||
dest_it->second.ImportRObj(obj);
|
||||
|
||||
// Insertion of dest could invalidate src_it. Find it again.
|
||||
src_res = db_slice.FindMutable(op_args.db_cntx, src, OBJ_LIST);
|
||||
src_it = src_res->it;
|
||||
dest_res.it->second.ImportRObj(obj);
|
||||
DCHECK(IsValid(src_it));
|
||||
} else {
|
||||
if (dest_it->second.ObjType() != OBJ_LIST)
|
||||
if (dest_res.it->second.ObjType() != OBJ_LIST)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
dest_ql = GetQL(dest_it->second);
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, dest_it);
|
||||
dest_ql = GetQL(dest_res.it->second);
|
||||
}
|
||||
|
||||
string val = ListPop(src_dir, src_ql);
|
||||
|
@ -279,7 +277,7 @@ OpResult<string> OpMoveSingleShard(const OpArgs& op_args, string_view src, strin
|
|||
quicklistPush(dest_ql, val.data(), val.size(), pos);
|
||||
|
||||
src_res->post_updater.Run();
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, dest_it, dest, !new_key);
|
||||
dest_res.post_updater.Run();
|
||||
|
||||
if (quicklistCount(src_ql) == 0) {
|
||||
CHECK(db_slice.Del(op_args.db_cntx.db_index, src_it));
|
||||
|
@ -315,37 +313,34 @@ OpResult<string> Peek(const OpArgs& op_args, string_view key, ListDir dir, bool
|
|||
OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir dir,
|
||||
bool skip_notexist, ArgSlice vals, bool journal_rewrite) {
|
||||
EngineShard* es = op_args.shard;
|
||||
PrimeIterator it;
|
||||
bool new_key = false;
|
||||
DbSlice::AddOrFindResult res;
|
||||
|
||||
if (skip_notexist) {
|
||||
// TODO(#2252): Move to FindMutable() once AddOrFindMutable() is implemented
|
||||
auto it_res = es->db_slice().Find(op_args.db_cntx, key, OBJ_LIST);
|
||||
if (!it_res)
|
||||
auto tmp_res = es->db_slice().FindMutable(op_args.db_cntx, key, OBJ_LIST);
|
||||
if (!tmp_res)
|
||||
return 0; // Redis returns 0 for nonexisting keys for the *PUSHX actions.
|
||||
it = *it_res;
|
||||
res = std::move(*tmp_res);
|
||||
} else {
|
||||
try {
|
||||
tie(it, new_key) = es->db_slice().AddOrFind(op_args.db_cntx, key);
|
||||
res = es->db_slice().AddOrFind(op_args.db_cntx, key);
|
||||
} catch (bad_alloc&) {
|
||||
return OpStatus::OUT_OF_MEMORY;
|
||||
}
|
||||
}
|
||||
|
||||
quicklist* ql = nullptr;
|
||||
DVLOG(1) << "OpPush " << key << " new_key " << new_key;
|
||||
DVLOG(1) << "OpPush " << key << " new_key " << res.is_new;
|
||||
|
||||
if (new_key) {
|
||||
if (res.is_new) {
|
||||
robj* o = createQuicklistObject();
|
||||
ql = (quicklist*)o->ptr;
|
||||
quicklistSetOptions(ql, GetFlag(FLAGS_list_max_listpack_size),
|
||||
GetFlag(FLAGS_list_compress_depth));
|
||||
it->second.ImportRObj(o);
|
||||
res.it->second.ImportRObj(o);
|
||||
} else {
|
||||
if (it->second.ObjType() != OBJ_LIST)
|
||||
if (res.it->second.ObjType() != OBJ_LIST)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
es->db_slice().PreUpdate(op_args.db_cntx.db_index, it);
|
||||
ql = GetQL(it->second);
|
||||
ql = GetQL(res.it->second);
|
||||
}
|
||||
|
||||
// Left push is LIST_HEAD.
|
||||
|
@ -356,10 +351,10 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
|
|||
quicklistPush(ql, es->tmp_str1, sdslen(es->tmp_str1), pos);
|
||||
}
|
||||
|
||||
if (new_key) {
|
||||
if (res.is_new) {
|
||||
if (es->blocking_controller()) {
|
||||
string tmp;
|
||||
string_view key = it->first.GetSlice(&tmp);
|
||||
string_view key = res.it->first.GetSlice(&tmp);
|
||||
|
||||
es->blocking_controller()->AwakeWatched(op_args.db_cntx.db_index, key);
|
||||
absl::StrAppend(debugMessages.Next(), "OpPush AwakeWatched: ", key, " by ",
|
||||
|
@ -367,8 +362,6 @@ OpResult<uint32_t> OpPush(const OpArgs& op_args, std::string_view key, ListDir d
|
|||
}
|
||||
}
|
||||
|
||||
es->db_slice().PostUpdate(op_args.db_cntx.db_index, it, key, !new_key);
|
||||
|
||||
if (journal_rewrite && op_args.shard->journal()) {
|
||||
string command = dir == ListDir::LEFT ? "LPUSH" : "RPUSH";
|
||||
vector<string_view> mapped(vals.size() + 1);
|
||||
|
|
|
@ -2364,9 +2364,9 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
|
|||
continue;
|
||||
|
||||
try {
|
||||
auto [it, added] = db_slice.AddOrUpdate(db_cntx, item->key, std::move(pv), item->expire_ms);
|
||||
it->first.SetSticky(item->is_sticky);
|
||||
if (!added) {
|
||||
auto res = db_slice.AddOrUpdate(db_cntx, item->key, std::move(pv), item->expire_ms);
|
||||
res.it->first.SetSticky(item->is_sticky);
|
||||
if (!res.is_new) {
|
||||
LOG(WARNING) << "RDB has duplicated key '" << item->key << "' in DB " << db_ind;
|
||||
}
|
||||
} catch (const std::bad_alloc&) {
|
||||
|
|
|
@ -567,27 +567,23 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
|
|||
return 0;
|
||||
}
|
||||
|
||||
PrimeIterator it;
|
||||
bool new_key = false;
|
||||
DbSlice::AddOrFindResult add_res;
|
||||
|
||||
try {
|
||||
tie(it, new_key) = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
add_res = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
} catch (bad_alloc& e) {
|
||||
return OpStatus::OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
CompactObj& co = it->second;
|
||||
CompactObj& co = add_res.it->second;
|
||||
|
||||
if (!new_key) {
|
||||
if (!add_res.is_new) {
|
||||
// for non-overwrite case it must be set.
|
||||
if (!overwrite && co.ObjType() != OBJ_SET)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
// Update stats and trigger any handle the old value if needed.
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
|
||||
}
|
||||
|
||||
if (new_key || overwrite) {
|
||||
if (add_res.is_new || overwrite) {
|
||||
// does not store the values, merely sets the encoding.
|
||||
// TODO: why not store the values as well?
|
||||
InitSet(vals, &co);
|
||||
|
@ -633,7 +629,6 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
|
|||
res = AddStrSet(op_args.db_cntx, vals, UINT32_MAX, &co);
|
||||
}
|
||||
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !new_key);
|
||||
if (journal_update && op_args.shard->journal()) {
|
||||
if (overwrite) {
|
||||
RecordJournal(op_args, "DEL"sv, ArgSlice{key});
|
||||
|
@ -651,18 +646,17 @@ OpResult<uint32_t> OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_
|
|||
auto* es = op_args.shard;
|
||||
auto& db_slice = es->db_slice();
|
||||
|
||||
PrimeIterator it;
|
||||
bool new_key = false;
|
||||
DbSlice::AddOrFindResult add_res;
|
||||
|
||||
try {
|
||||
tie(it, new_key) = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
add_res = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
} catch (bad_alloc& e) {
|
||||
return OpStatus::OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
CompactObj& co = it->second;
|
||||
CompactObj& co = add_res.it->second;
|
||||
|
||||
if (new_key) {
|
||||
if (add_res.is_new) {
|
||||
CHECK(absl::GetFlag(FLAGS_use_set2));
|
||||
InitStrSet(&co);
|
||||
} else {
|
||||
|
@ -671,7 +665,6 @@ OpResult<uint32_t> OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_
|
|||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
// Update stats and trigger any handle the old value if needed.
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
|
||||
if (co.Encoding() == kEncodingIntSet) {
|
||||
intset* is = (intset*)co.RObjPtr();
|
||||
robj tmp;
|
||||
|
@ -686,8 +679,6 @@ OpResult<uint32_t> OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_
|
|||
|
||||
uint32_t res = AddStrSet(op_args.db_cntx, std::move(vals), ttl_sec, &co);
|
||||
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !new_key);
|
||||
|
||||
return res;
|
||||
}
|
||||
|
||||
|
|
|
@ -607,16 +607,14 @@ int StreamTrim(const AddTrimOpts& opts, stream* s) {
|
|||
OpResult<streamID> OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgList args) {
|
||||
DCHECK(!args.empty() && args.size() % 2 == 0);
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
pair<PrimeIterator, bool> add_res;
|
||||
DbSlice::AddOrFindResult add_res;
|
||||
|
||||
if (opts.no_mkstream) {
|
||||
// TODO(#2252): Replace with FindMutable() once AddOrFindMutable() is implemented
|
||||
auto res_it = db_slice.Find(op_args.db_cntx, opts.key, OBJ_STREAM);
|
||||
auto res_it = db_slice.FindMutable(op_args.db_cntx, opts.key, OBJ_STREAM);
|
||||
if (!res_it) {
|
||||
return res_it.status();
|
||||
}
|
||||
add_res.first = res_it.value();
|
||||
add_res.second = false;
|
||||
add_res = std::move(*res_it);
|
||||
} else {
|
||||
try {
|
||||
add_res = db_slice.AddOrFind(op_args.db_cntx, opts.key);
|
||||
|
@ -626,17 +624,14 @@ OpResult<streamID> OpAdd(const OpArgs& op_args, const AddTrimOpts& opts, CmdArgL
|
|||
}
|
||||
|
||||
robj* stream_obj = nullptr;
|
||||
PrimeIterator& it = add_res.first;
|
||||
PrimeIterator& it = add_res.it;
|
||||
|
||||
if (add_res.second) { // new key
|
||||
if (add_res.is_new) {
|
||||
stream_obj = createStreamObject();
|
||||
|
||||
it->second.ImportRObj(stream_obj);
|
||||
} else {
|
||||
if (it->second.ObjType() != OBJ_STREAM)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
|
||||
} else if (it->second.ObjType() != OBJ_STREAM) {
|
||||
return OpStatus::WRONG_TYPE;
|
||||
}
|
||||
|
||||
stream* stream_inst = (stream*)it->second.RObjPtr();
|
||||
|
@ -1128,8 +1123,7 @@ struct CreateOpts {
|
|||
OpStatus OpCreate(const OpArgs& op_args, string_view key, const CreateOpts& opts) {
|
||||
auto* shard = op_args.shard;
|
||||
auto& db_slice = shard->db_slice();
|
||||
// TODO(#2252): Replace with FindMutable() once new AddNew() is implemented
|
||||
OpResult<PrimeIterator> res_it = db_slice.Find(op_args.db_cntx, key, OBJ_STREAM);
|
||||
auto res_it = db_slice.FindMutable(op_args.db_cntx, key, OBJ_STREAM);
|
||||
int64_t entries_read = SCG_INVALID_ENTRIES_READ;
|
||||
if (!res_it) {
|
||||
if (opts.flags & kCreateOptMkstream) {
|
||||
|
@ -1139,13 +1133,13 @@ OpStatus OpCreate(const OpArgs& op_args, string_view key, const CreateOpts& opts
|
|||
return res_it.status();
|
||||
|
||||
robj* stream_obj = createStreamObject();
|
||||
(*res_it)->second.ImportRObj(stream_obj);
|
||||
res_it->it->second.ImportRObj(stream_obj);
|
||||
} else {
|
||||
return res_it.status();
|
||||
}
|
||||
}
|
||||
|
||||
CompactObj& cobj = (*res_it)->second;
|
||||
CompactObj& cobj = res_it->it->second;
|
||||
stream* s = (stream*)cobj.RObjPtr();
|
||||
|
||||
streamID id;
|
||||
|
|
|
@ -88,28 +88,25 @@ OpResult<uint32_t> OpSetRange(const OpArgs& op_args, string_view key, size_t sta
|
|||
}
|
||||
}
|
||||
|
||||
auto [it, added] = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
auto res = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
|
||||
string s;
|
||||
|
||||
if (added) {
|
||||
if (res.is_new) {
|
||||
s.resize(range_len);
|
||||
} else {
|
||||
if (it->second.ObjType() != OBJ_STRING)
|
||||
if (res.it->second.ObjType() != OBJ_STRING)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
s = GetString(op_args.shard, it->second);
|
||||
s = GetString(op_args.shard, res.it->second);
|
||||
if (s.size() < range_len)
|
||||
s.resize(range_len);
|
||||
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
|
||||
}
|
||||
|
||||
memcpy(s.data() + start, value.data(), value.size());
|
||||
it->second.SetString(s);
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !added);
|
||||
res.it->second.SetString(s);
|
||||
|
||||
return it->second.Size();
|
||||
return res.it->second.Size();
|
||||
}
|
||||
|
||||
OpResult<string> OpGetRange(const OpArgs& op_args, string_view key, int32_t start, int32_t end) {
|
||||
|
@ -164,22 +161,16 @@ OpResult<uint32_t> ExtendOrSet(const OpArgs& op_args, string_view key, string_vi
|
|||
bool prepend) {
|
||||
auto* shard = op_args.shard;
|
||||
auto& db_slice = shard->db_slice();
|
||||
auto [it, inserted] = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
if (inserted) {
|
||||
it->second.SetString(val);
|
||||
// TODO(#2252): We currently only call PostUpdate() (no PreUpdate()), make sure this is fixed
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, false);
|
||||
|
||||
auto add_res = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
if (add_res.is_new) {
|
||||
add_res.it->second.SetString(val);
|
||||
return val.size();
|
||||
}
|
||||
|
||||
if (it->second.ObjType() != OBJ_STRING)
|
||||
if (add_res.it->second.ObjType() != OBJ_STRING)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
|
||||
size_t res = ExtendExisting(op_args, it, key, val, prepend);
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, true);
|
||||
return res;
|
||||
return ExtendExisting(op_args, add_res.it, key, val, prepend);
|
||||
}
|
||||
|
||||
OpResult<bool> ExtendOrSkip(const OpArgs& op_args, string_view key, string_view val, bool prepend) {
|
||||
|
@ -232,26 +223,25 @@ OpResult<string> OpGet(const OpArgs& op_args, string_view key, bool del_hit = fa
|
|||
|
||||
OpResult<double> OpIncrFloat(const OpArgs& op_args, string_view key, double val) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
auto [it, inserted] = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
auto add_res = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
|
||||
char buf[128];
|
||||
|
||||
if (inserted) {
|
||||
if (add_res.is_new) {
|
||||
char* str = RedisReplyBuilder::FormatDouble(val, buf, sizeof(buf));
|
||||
it->second.SetString(str);
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, false);
|
||||
add_res.it->second.SetString(str);
|
||||
|
||||
return val;
|
||||
}
|
||||
|
||||
if (it->second.ObjType() != OBJ_STRING)
|
||||
if (add_res.it->second.ObjType() != OBJ_STRING)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
|
||||
if (it->second.Size() == 0)
|
||||
if (add_res.it->second.Size() == 0)
|
||||
return OpStatus::INVALID_FLOAT;
|
||||
|
||||
string tmp;
|
||||
string_view slice = GetSlice(op_args.shard, it->second, &tmp);
|
||||
string_view slice = GetSlice(op_args.shard, add_res.it->second, &tmp);
|
||||
|
||||
double base = 0;
|
||||
if (!ParseDouble(slice, &base)) {
|
||||
|
@ -266,9 +256,7 @@ OpResult<double> OpIncrFloat(const OpArgs& op_args, string_view key, double val)
|
|||
|
||||
char* str = RedisReplyBuilder::FormatDouble(base, buf, sizeof(buf));
|
||||
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
|
||||
it->second.SetString(str);
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, true);
|
||||
add_res.it->second.SetString(str);
|
||||
|
||||
return base;
|
||||
}
|
||||
|
@ -280,6 +268,7 @@ OpResult<int64_t> OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr,
|
|||
|
||||
// we avoid using AddOrFind because of skip_on_missing option for memcache.
|
||||
auto [it, expire_it] = db_slice.FindExt(op_args.db_cntx, key);
|
||||
DbSlice::AutoUpdater post_updater;
|
||||
|
||||
if (!IsValid(it)) {
|
||||
if (skip_on_missing)
|
||||
|
@ -290,7 +279,9 @@ OpResult<int64_t> OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr,
|
|||
|
||||
// AddNew calls PostUpdate inside.
|
||||
try {
|
||||
it = db_slice.AddNew(op_args.db_cntx, key, std::move(cobj), 0);
|
||||
auto add_res = db_slice.AddNew(op_args.db_cntx, key, std::move(cobj), 0);
|
||||
it = add_res.it;
|
||||
post_updater = std::move(add_res.post_updater);
|
||||
} catch (bad_alloc&) {
|
||||
return OpStatus::OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -315,9 +306,7 @@ OpResult<int64_t> OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr,
|
|||
|
||||
int64_t new_val = prev + incr;
|
||||
DCHECK(!it->second.IsExternal());
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
|
||||
it->second.SetInt(new_val);
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
|
||||
|
||||
return new_val;
|
||||
}
|
||||
|
@ -466,7 +455,7 @@ OpResult<array<int64_t, 5>> OpThrottle(const OpArgs& op_args, const string_view
|
|||
|
||||
// AddNew calls PostUpdate inside.
|
||||
try {
|
||||
it = db_slice.AddNew(op_args.db_cntx, key, std::move(cobj), new_tat_ms);
|
||||
db_slice.AddNew(op_args.db_cntx, key, std::move(cobj), new_tat_ms);
|
||||
} catch (bad_alloc&) {
|
||||
return OpStatus::OUT_OF_MEMORY;
|
||||
}
|
||||
|
@ -585,24 +574,23 @@ OpResult<optional<string>> SetCmd::Set(const SetParams& params, string_view key,
|
|||
// At this point we either need to add missing entry, or we
|
||||
// will override an existing one
|
||||
// Trying to add a new entry.
|
||||
tuple<PrimeIterator, ExpireIterator, bool> add_res;
|
||||
DbSlice::AddOrFindResult add_res;
|
||||
try {
|
||||
add_res = db_slice.AddOrFind2(op_args_.db_cntx, key);
|
||||
add_res = db_slice.AddOrFind(op_args_.db_cntx, key);
|
||||
} catch (bad_alloc& e) {
|
||||
return OpStatus::OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
PrimeIterator it = get<0>(add_res);
|
||||
if (!get<2>(add_res)) { // Existing.
|
||||
PrimeIterator it = add_res.it;
|
||||
if (!add_res.is_new) {
|
||||
result_builder.CachePrevValueIfNeeded(shard, it->second);
|
||||
return std::move(result_builder).Return(SetExisting(params, it, get<1>(add_res), key, value));
|
||||
return std::move(result_builder).Return(SetExisting(params, it, add_res.exp_it, key, value));
|
||||
}
|
||||
|
||||
// Adding new value.
|
||||
PrimeValue tvalue{value};
|
||||
tvalue.SetFlag(params.memcache_flags != 0);
|
||||
it->second = std::move(tvalue);
|
||||
db_slice.PostUpdate(op_args_.db_cntx.db_index, it, key, false);
|
||||
|
||||
if (params.expire_after_ms) {
|
||||
db_slice.AddExpire(op_args_.db_cntx.db_index, it,
|
||||
|
@ -668,8 +656,6 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt
|
|||
it->first.SetSticky(true);
|
||||
}
|
||||
|
||||
db_slice.PreUpdate(op_args_.db_cntx.db_index, it);
|
||||
|
||||
// Check whether we need to update flags table.
|
||||
bool req_flag_update = (params.memcache_flags != 0) != prime_value.HasFlag();
|
||||
if (req_flag_update) {
|
||||
|
@ -690,8 +676,6 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt
|
|||
}
|
||||
}
|
||||
|
||||
db_slice.PostUpdate(op_args_.db_cntx.db_index, it, key);
|
||||
|
||||
if (manual_journal_ && op_args_.shard->journal()) {
|
||||
RecordJournal(params, key, value);
|
||||
}
|
||||
|
|
|
@ -182,15 +182,14 @@ void OutputScoredArrayResult(const OpResult<ScoredArray>& result,
|
|||
rb->SendScoredArray(result.value(), params.with_scores);
|
||||
}
|
||||
|
||||
OpResult<PrimeIterator> FindZEntry(const ZParams& zparams, const OpArgs& op_args, string_view key,
|
||||
size_t member_len) {
|
||||
OpResult<DbSlice::ItAndUpdater> FindZEntry(const ZParams& zparams, const OpArgs& op_args,
|
||||
string_view key, size_t member_len) {
|
||||
auto& db_slice = op_args.shard->db_slice();
|
||||
if (zparams.flags & ZADD_IN_XX) {
|
||||
// TODO(#2252): Replace once AddOrFindMutable() exists
|
||||
return db_slice.Find(op_args.db_cntx, key, OBJ_ZSET);
|
||||
return db_slice.FindMutable(op_args.db_cntx, key, OBJ_ZSET);
|
||||
}
|
||||
|
||||
pair<PrimeIterator, bool> add_res;
|
||||
DbSlice::AddOrFindResult add_res;
|
||||
|
||||
try {
|
||||
add_res = db_slice.AddOrFind(op_args.db_cntx, key);
|
||||
|
@ -198,10 +197,10 @@ OpResult<PrimeIterator> FindZEntry(const ZParams& zparams, const OpArgs& op_args
|
|||
return OpStatus::OUT_OF_MEMORY;
|
||||
}
|
||||
|
||||
PrimeIterator& it = add_res.first;
|
||||
PrimeIterator& it = add_res.it;
|
||||
PrimeValue& pv = it->second;
|
||||
DbTableStats* stats = db_slice.MutableStats(op_args.db_cntx.db_index);
|
||||
if (add_res.second || zparams.override) {
|
||||
if (add_res.is_new || zparams.override) {
|
||||
if (member_len > server.max_map_field_len) {
|
||||
detail::SortedMap* zs = new detail::SortedMap(CompactObj::memory_resource());
|
||||
pv.InitRobj(OBJ_ZSET, OBJ_ENCODING_SKIPLIST, zs);
|
||||
|
@ -210,23 +209,18 @@ OpResult<PrimeIterator> FindZEntry(const ZParams& zparams, const OpArgs& op_args
|
|||
pv.InitRobj(OBJ_ZSET, OBJ_ENCODING_LISTPACK, lp);
|
||||
stats->listpack_blob_cnt++;
|
||||
}
|
||||
|
||||
if (!add_res.second) {
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
|
||||
}
|
||||
} else {
|
||||
if (it->second.ObjType() != OBJ_ZSET)
|
||||
return OpStatus::WRONG_TYPE;
|
||||
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
|
||||
}
|
||||
|
||||
if (add_res.second && op_args.shard->blocking_controller()) {
|
||||
if (add_res.is_new && op_args.shard->blocking_controller()) {
|
||||
string tmp;
|
||||
string_view key = it->first.GetSlice(&tmp);
|
||||
op_args.shard->blocking_controller()->AwakeWatched(op_args.db_cntx.db_index, key);
|
||||
}
|
||||
|
||||
return it;
|
||||
return DbSlice::ItAndUpdater{add_res.it, std::move(add_res.post_updater)};
|
||||
}
|
||||
|
||||
bool ScoreToLongLat(const std::optional<double>& val, double* xy) {
|
||||
|
@ -978,7 +972,7 @@ OpResult<AddResult> OpAdd(const OpArgs& op_args, const ZParams& zparams, string_
|
|||
size_t field_len = members.size() > server.zset_max_listpack_entries
|
||||
? UINT32_MAX
|
||||
: members.front().second.size();
|
||||
OpResult<PrimeIterator> res_it = FindZEntry(zparams, op_args, key, field_len);
|
||||
auto res_it = FindZEntry(zparams, op_args, key, field_len);
|
||||
|
||||
if (!res_it)
|
||||
return res_it.status();
|
||||
|
@ -992,7 +986,7 @@ OpResult<AddResult> OpAdd(const OpArgs& op_args, const ZParams& zparams, string_
|
|||
|
||||
OpStatus op_status = OpStatus::OK;
|
||||
AddResult aresult;
|
||||
detail::RobjWrapper* robj_wrapper = res_it.value()->second.GetRobjWrapper();
|
||||
detail::RobjWrapper* robj_wrapper = res_it->it->second.GetRobjWrapper();
|
||||
bool is_list_pack = robj_wrapper->encoding() == OBJ_ENCODING_LISTPACK;
|
||||
|
||||
// opportunistically reserve space if multiple entries are about to be added.
|
||||
|
@ -1042,8 +1036,6 @@ OpResult<AddResult> OpAdd(const OpArgs& op_args, const ZParams& zparams, string_
|
|||
--stats->listpack_blob_cnt;
|
||||
}
|
||||
|
||||
op_args.shard->db_slice().PostUpdate(op_args.db_cntx.db_index, *res_it, key);
|
||||
|
||||
if (zparams.flags & ZADD_IN_INCR) {
|
||||
aresult.new_score = new_score;
|
||||
} else {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue