diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 5a46beaac..e071b8991 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -5,8 +5,8 @@ #include "server/engine_shard_set.h" extern "C" { -#include "redis/zmalloc.h" #include "redis/object.h" +#include "redis/zmalloc.h" } #include "base/logging.h" @@ -311,16 +311,16 @@ void EngineShard::ProcessAwakened(Transaction* completed_t) { DCHECK(!queue.empty()); // since it's active if (queue.front().trans == completed_t) { - queue.pop_front(); - - while (!queue.empty()) { + do { const WatchItem& bi = queue.front(); Transaction* head = bi.trans.get(); - if (head->NotifySuspended(wq.notify_txid, shard_id())) + // if a transaction blpops on the same key multiple times it will appear here + // here several times as well, hence we check != completed_t. + if (head != completed_t && head->NotifySuspended(wq.notify_txid, shard_id())) break; queue.pop_front(); - } + } while (!queue.empty()); if (queue.empty()) { wt.RemoveEntry(w_it); @@ -512,8 +512,7 @@ void EngineShard::CacheStats() { mi_stats_merge(); size_t used_mem = UsedMemory(); - cached_stats[db_slice_.shard_id()].used_memory.store(used_mem, - memory_order_relaxed); + cached_stats[db_slice_.shard_id()].used_memory.store(used_mem, memory_order_relaxed); } size_t EngineShard::UsedMemory() const { diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index cf3a05694..2b9575180 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -31,105 +31,116 @@ class Renamer { Renamer(DbIndex dind, ShardId source_id) : db_indx_(dind), src_sid_(source_id) { } - OpResult Find(ShardId shard_id, const ArgSlice& args); + void Find(Transaction* t); OpResult status() const { return status_; }; - Transaction::RunnableType Finalize(bool skip_exist_dest); + void Finalize(Transaction* t, bool skip_exist_dest); private: - void MoveValues(EngineShard* shard, const ArgSlice& args); - DbIndex db_indx_; ShardId src_sid_; struct FindResult { string_view key; - PrimeValue val; + PrimeValue ref_val; uint64_t expire_ts; bool found = false; }; FindResult src_res_, dest_res_; // index 0 for source, 1 for destination - OpResult status_; }; -OpResult Renamer::Find(ShardId shard_id, const ArgSlice& args) { - CHECK_EQ(1u, args.size()); - FindResult* res = (shard_id == src_sid_) ? &src_res_ : &dest_res_; +void Renamer::Find(Transaction* t) { + auto cb = [this](Transaction* t, EngineShard* shard) { + auto args = t->ShardArgsInShard(shard->shard_id()); + CHECK_EQ(1u, args.size()); - res->key = args.front(); - auto& db_slice = EngineShard::tlocal()->db_slice(); - auto [it, exp_it] = db_slice.FindExt(db_indx_, res->key); + FindResult* res = (shard->shard_id() == src_sid_) ? &src_res_ : &dest_res_; - res->found = IsValid(it); - if (IsValid(it)) { - res->val = it->second.AsRef(); - res->expire_ts = IsValid(exp_it) ? db_slice.expire_base() + exp_it->second.duration() : 0; - } + res->key = args.front(); + auto& db_slice = EngineShard::tlocal()->db_slice(); + auto [it, exp_it] = db_slice.FindExt(db_indx_, res->key); - return OpStatus::OK; + res->found = IsValid(it); + if (IsValid(it)) { + res->ref_val = it->second.AsRef(); + res->expire_ts = IsValid(exp_it) ? db_slice.expire_base() + exp_it->second.duration() : 0; + } + return OpStatus::OK; + }; + + t->Execute(move(cb), false); }; -void Renamer::MoveValues(EngineShard* shard, const ArgSlice& args) { - auto shard_id = shard->shard_id(); - - // TODO: when we want to maintain heap per shard model this code will require additional - // work - if (shard_id == src_sid_) { // Handle source key. - // delete the source entry. - auto it = shard->db_slice().FindExt(db_indx_, src_res_.key).first; - CHECK(shard->db_slice().Del(db_indx_, it)); - return; - } - - // Handle destination - string_view dest_key = dest_res_.key; - PrimeIterator dest_it = shard->db_slice().FindExt(db_indx_, dest_key).first; - if (IsValid(dest_it)) { - // we just move the source. We won't be able to do it with heap per shard model. - dest_it->second = std::move(src_res_.val); - shard->db_slice().Expire(db_indx_, dest_it, src_res_.expire_ts); - } else { - // we just add the key to destination with the source object. - shard->db_slice().AddNew(db_indx_, dest_key, std::move(src_res_.val), src_res_.expire_ts); - } -} - -Transaction::RunnableType Renamer::Finalize(bool skip_exist_dest) { +void Renamer::Finalize(Transaction* t, bool skip_exist_dest) { auto cleanup = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; if (!src_res_.found) { status_ = OpStatus::KEY_NOTFOUND; - return cleanup; + t->Execute(move(cleanup), true); + return; } if (dest_res_.found && skip_exist_dest) { status_ = OpStatus::KEY_EXISTS; - return cleanup; + t->Execute(move(cleanup), true); + return; } - DCHECK(src_res_.val.IsRef()); + DCHECK(src_res_.ref_val.IsRef()); - // We can not copy from the existing value and delete it at the same time. - // TODO: if we want to allocate in shard, we must implement CompactObject::Clone. - // For now we hack COW for strings. - string val; - src_res_.val.GetString(&val); - src_res_.val.Reset(); - src_res_.val.SetString(val); + PrimeValue pv; + string str_val; + + auto move_src = [&](Transaction* t, EngineShard* shard) { + if (shard->shard_id() == src_sid_) { // Handle source key. + // TODO: to call PreUpdate/PostUpdate. + auto it = shard->db_slice().FindExt(db_indx_, src_res_.key).first; + CHECK(IsValid(it)); + + if (it->second.ObjType() == OBJ_STRING) { + it->second.GetString(&str_val); + } else { + pv = std::move(it->second); + } + CHECK(shard->db_slice().Del(db_indx_, it)); // delete the entry with empty value in it. + } + return OpStatus::OK; + }; + + t->Execute(move(move_src), false); // Src key exist and we need to override the destination. - return [this](Transaction* t, EngineShard* shard) { - this->MoveValues(shard, t->ShardArgsInShard(shard->shard_id())); + auto set_dest = [&](Transaction* t, EngineShard* shard) { + if (shard->shard_id() != src_sid_) { + auto& db_slice = shard->db_slice(); + string_view dest_key = dest_res_.key; + PrimeIterator dest_it = db_slice.FindExt(db_indx_, dest_key).first; + if (IsValid(dest_it)) { + if (src_res_.ref_val.ObjType() == OBJ_STRING) { + dest_it->second.SetString(str_val); + } else { + dest_it->second = std::move(pv); + } + db_slice.Expire(db_indx_, dest_it, src_res_.expire_ts); + } else { + if (src_res_.ref_val.ObjType() == OBJ_STRING) { + db_slice.AddNew(db_indx_, dest_key, PrimeValue{str_val}, src_res_.expire_ts); + } else { + db_slice.AddNew(db_indx_, dest_key, std::move(pv), src_res_.expire_ts); + } + } + } return OpStatus::OK; }; + t->Execute(move(set_dest), true); } const char* ObjTypeName(int type) { @@ -397,14 +408,8 @@ OpResult GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des // Phase 1 -> Fetch keys from both shards. // Phase 2 -> If everything is ok, clone the source object, delete the destination object, and // set its ptr to cloned one. we also copy the expiration data of the source key. - transaction->Execute( - [&renamer](Transaction* t, EngineShard* shard) { - auto args = t->ShardArgsInShard(shard->shard_id()); - return renamer.Find(shard->shard_id(), args).status(); - }, - false); - - transaction->Execute(renamer.Finalize(skip_exist_dest), true); + renamer.Find(transaction); + renamer.Finalize(transaction, skip_exist_dest); return renamer.status(); } diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index 9a961ba11..ca2f10493 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -23,8 +23,7 @@ using absl::StrCat; namespace dfly { -class GenericFamilyTest : public BaseFamilyTest { -}; +class GenericFamilyTest : public BaseFamilyTest {}; TEST_F(GenericFamilyTest, Expire) { Run({"set", "key", "val"}); @@ -52,7 +51,6 @@ TEST_F(GenericFamilyTest, Expire) { EXPECT_THAT(resp[0], ArgType(RespExpr::NIL)); } - TEST_F(GenericFamilyTest, Del) { for (size_t i = 0; i < 1000; ++i) { Run({"set", StrCat("foo", i), "1"}); @@ -93,11 +91,12 @@ TEST_F(GenericFamilyTest, Exists) { EXPECT_THAT(resp[0], IntArg(3)); } - TEST_F(GenericFamilyTest, Rename) { RespVec resp; + string b_val(32, 'b'); + string x_val(32, 'x'); - resp = Run({"mset", "x", "0", "b", "1"}); + resp = Run({"mset", "x", x_val, "b", b_val}); ASSERT_THAT(resp, RespEq("OK")); ASSERT_EQ(2, last_cmd_dbg_info_.shards_count); @@ -110,8 +109,7 @@ TEST_F(GenericFamilyTest, Rename) { int64_t val = CheckedInt({"get", "x"}); ASSERT_EQ(kint64min, val); // does not exist - val = CheckedInt({"get", "b"}); - ASSERT_EQ(0, val); // it has value of x. + ASSERT_THAT(Run({"get", "b"}), RespEq(x_val)); // swapped. EXPECT_EQ(CheckedInt({"exists", "x", "b"}), 1); @@ -133,7 +131,16 @@ TEST_F(GenericFamilyTest, Rename) { exist_fb.join(); ren_fb.join(); +} +TEST_F(GenericFamilyTest, RenameNonString) { + EXPECT_EQ(1, CheckedInt({"lpush", "x", "elem"})); + auto resp = Run({"rename", "x", "b"}); + ASSERT_THAT(resp, RespEq("OK")); + ASSERT_EQ(2, last_cmd_dbg_info_.shards_count); + + EXPECT_EQ(0, CheckedInt({"del", "x"})); + EXPECT_EQ(1, CheckedInt({"del", "b"})); } TEST_F(GenericFamilyTest, RenameBinary) { diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index e4916b59d..f87556cb5 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -271,4 +271,22 @@ TEST_F(ListFamilyTest, WrongTypeDoesNotWake) { EXPECT_THAT(blpop_resp, ElementsAre(kKey1, "B")); } +TEST_F(ListFamilyTest, BPopSameKeyTwice) { + RespVec blpop_resp; + + auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + blpop_resp = Run({"blpop", kKey1, kKey1, "0"}); + }); + + do { + this_fiber::sleep_for(30us); + } while (!IsLocked(0, kKey1)); + + pp_->at(1)->Await([&] { + EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"})); + }); + pop_fb.join(); + EXPECT_THAT(blpop_resp, ElementsAre(kKey1, "bar")); +} + } // namespace dfly