diff --git a/server/generic_family.cc b/server/generic_family.cc index 6d6790009..38c3d676b 100644 --- a/server/generic_family.cc +++ b/server/generic_family.cc @@ -21,6 +21,95 @@ namespace { DEFINE_VARZ(VarzQps, ping_qps); +class Renamer { + public: + Renamer(DbIndex dind, ShardId source_id) : db_indx_(dind), src_sid_(source_id) { + } + + // Looks up this shard's key and stores the result. TODO: implement locking semantics. + OpResult FindAndLock(ShardId shard_id, const ArgSlice& args); + + OpResult status() const { + return status_; + }; + + Transaction::RunnableType Finalize(bool skip_exist_dest); + + private: + void SwapValues(EngineShard* shard, const ArgSlice& args); + + DbIndex db_indx_; + ShardId src_sid_; + std::pair find_res_[2]; + + uint64_t expire_; + MainValue src_val_; + + OpResult status_; +}; + +OpResult Renamer::FindAndLock(ShardId shard_id, const ArgSlice& args) { + CHECK_EQ(1u, args.size()); + unsigned indx = (shard_id == src_sid_) ? 0 : 1; + + find_res_[indx] = EngineShard::tlocal()->db_slice().FindExt(db_indx_, args.front()); + + return OpStatus::OK; +}; + +void Renamer::SwapValues(EngineShard* shard, const ArgSlice& args) { + auto& dest = find_res_[1]; + auto shard_id = shard->shard_id(); + + // NOTE: This object juggling between shards won't work if we want to maintain heap per shard + // model. + if (shard_id == src_sid_) { // Handle source key. + // delete the source entry. + CHECK(shard->db_slice().Del(db_indx_, find_res_[0].first)); + return; + } + + // Handle destination + MainIterator dest_it = dest.first; + if (IsValid(dest_it)) { + dest_it->second = std::move(src_val_); // we just move the source. + shard->db_slice().Expire(db_indx_, dest_it, expire_); + } else { + // we just add the key to destination with the source object. 
+ std::string_view key = args.front(); // this shard's key, i.e. the destination key + shard->db_slice().AddNew(db_indx_, key, std::move(src_val_), expire_); + } +} + +Transaction::RunnableType Renamer::Finalize(bool skip_exist_dest) { + const auto& src = find_res_[0]; + const auto& dest = find_res_[1]; + + auto cleanup = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; + + if (!IsValid(src.first)) { + status_ = OpStatus::KEY_NOTFOUND; + + return cleanup; + } + + if (IsValid(dest.first) && skip_exist_dest) { + status_ = OpStatus::KEY_EXISTS; + + return cleanup; + } + + expire_ = IsValid(src.second) ? src.second->second : 0; + src_val_ = std::move(src.first->second); + + // Src key exists and we need to override the destination. + return [this](Transaction* t, EngineShard* shard) { + this->SwapValues(shard, t->ShardArgsInShard(shard->shard_id())); + + return OpStatus::OK; + }; +} + } // namespace void GenericFamily::Init(util::ProactorPool* pp) { @@ -196,8 +285,23 @@ OpResult GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des return result; } - // TODO: to finish it - return OpStatus::OK; + transaction->Schedule(); + unsigned shard_count = transaction->shard_set()->size(); + Renamer renamer{transaction->db_index(), Shard(key[0], shard_count)}; + + // Phase 1 -> Fetch keys from both shards. + // Phase 2 -> If everything is ok, move the source value into the destination entry and + // delete the source key. We also carry over the expiration data of the source key. 
+ transaction->Execute( + [&renamer](Transaction* t, EngineShard* shard) { + auto args = t->ShardArgsInShard(shard->shard_id()); + return renamer.FindAndLock(shard->shard_id(), args).status(); + }, + false); + + transaction->Execute(renamer.Finalize(skip_exist_dest), true); + + return renamer.status(); } void GenericFamily::Echo(CmdArgList args, ConnectionContext* cntx) { diff --git a/server/generic_family_test.cc b/server/generic_family_test.cc index d504d918b..d86c91256 100644 --- a/server/generic_family_test.cc +++ b/server/generic_family_test.cc @@ -84,4 +84,42 @@ TEST_F(GenericFamilyTest, Exists) { EXPECT_THAT(resp[0], IntArg(3)); } -} // namespace dfly \ No newline at end of file + +TEST_F(GenericFamilyTest, Rename) { + RespVec resp = Run({"mset", "x", "0", "b", "1"}); + ASSERT_THAT(resp, RespEq("OK")); + ASSERT_EQ(2, last_cmd_dbg_info_.shards_count); + + resp = Run({"rename", "z", "b"}); + ASSERT_THAT(resp[0], ErrArg("no such key")); + + resp = Run({"rename", "x", "b"}); + ASSERT_THAT(resp, RespEq("OK")); + + int64_t val = CheckedInt({"get", "x"}); + ASSERT_EQ(kint64min, val); // does not exist + + val = CheckedInt({"get", "b"}); + ASSERT_EQ(0, val); // it has value of x. 
+ + const char* keys[2] = {"b", "x"}; + auto ren_fb = pp_->at(0)->LaunchFiber([&] { + for (size_t i = 0; i < 200; ++i) { + int j = i % 2; + auto resp = Run({"rename", keys[j], keys[1 - j]}); + ASSERT_THAT(resp, RespEq("OK")); + } + }); + + auto exist_fb = pp_->at(2)->LaunchFiber([&] { + for (size_t i = 0; i < 300; ++i) { + int64_t resp = CheckedInt({"exists", "x", "b"}); + ASSERT_EQ(1, resp); + } + }); + + ren_fb.join(); + exist_fb.join(); +} + +} // namespace dfly diff --git a/server/transaction.cc b/server/transaction.cc index 620a1abcb..98ef801d4 100644 --- a/server/transaction.cc +++ b/server/transaction.cc @@ -20,7 +20,7 @@ namespace { std::atomic_uint64_t op_seq{1}; -constexpr size_t kTransSize = sizeof(Transaction); +[[maybe_unused]] constexpr size_t kTransSize = sizeof(Transaction); } // namespace @@ -77,6 +77,7 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) { DCHECK_EQ(unique_shard_cnt_, 0u); db_index_ = index; + if (!cid_->is_multi_key()) { // Single key optimization. auto key = ArgS(args, cid_->first_key_pos()); args_.push_back(key); @@ -470,8 +471,6 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { return true; } - intrusive_ptr_add_ref(this); - // we can do it because only a single thread writes into txid_ and sd. txid_ = op_seq.fetch_add(1, std::memory_order_relaxed); TxQueue::Iterator it = shard->InsertTxQ(this); @@ -601,8 +600,12 @@ inline uint32_t Transaction::DecreaseRunCnt() { // We use release so that no stores will be reordered after. uint32_t res = run_count_.fetch_sub(1, std::memory_order_release); - if (res == 1) + if (res == 1) { + // to protect against cases where Transaction is destroyed before run_ec_.notify + // finishes running. 
+ ::boost::intrusive_ptr guard(this); run_ec_.notify(); + } return res; } diff --git a/server/transaction.h b/server/transaction.h index a4a46ab9c..ebff47061 100644 --- a/server/transaction.h +++ b/server/transaction.h @@ -8,7 +8,6 @@ #include #include -#include #include #include #include @@ -83,9 +82,11 @@ class Transaction { //! Returns true if the transaction is armed for execution on this sid (used to avoid //! duplicate runs). Supports local transactions under multi as well. + //! Can be used in contexts that wait for an event to happen. bool IsArmedInShard(ShardId sid) const { if (sid >= shard_data_.size()) sid = 0; + // We use acquire so that no reordering will move before this load. return run_count_.load(std::memory_order_acquire) > 0 && shard_data_[sid].local_mask & ARMED; } @@ -104,6 +105,12 @@ class Transaction { return shard_data_[sid].pq_pos; } + // Schedules a transaction. Usually used for multi-hop transactions like Rename or BLPOP. + // For single hop, use ScheduleSingleHop instead. + void Schedule() { + ScheduleInternal(false); + } + // if conclude is true, removes the transaction from the pending queue. void Execute(RunnableType cb, bool conclude);