diff --git a/src/core/compact_object.cc b/src/core/compact_object.cc index f56d10549..99505d2b4 100644 --- a/src/core/compact_object.cc +++ b/src/core/compact_object.cc @@ -674,6 +674,8 @@ std::optional CompactObj::TryGetInt() const { } void CompactObj::SetString(std::string_view str) { + uint8_t mask = mask_ & ~kEncMask; + // Trying auto-detection heuristics first. if (str.size() <= 20) { long long ival; @@ -681,14 +683,14 @@ void CompactObj::SetString(std::string_view str) { // We use redis string2ll to be compatible with Redis. if (string2ll(str.data(), str.size(), &ival)) { - SetMeta(INT_TAG, mask_ & ~kEncMask); + SetMeta(INT_TAG, mask); u_.ival = ival; return; } if (str.size() <= kInlineLen) { - SetMeta(str.size(), mask_ & ~kEncMask); + SetMeta(str.size(), mask); memcpy(u_.inline_str, str.data(), str.size()); return; @@ -698,7 +700,6 @@ void CompactObj::SetString(std::string_view str) { DCHECK_GT(str.size(), kInlineLen); string_view encoded = str; - uint8_t mask = mask_ & ~kEncMask; bool is_ascii = kUseAsciiEncoding && validate_ascii_fast(str.data(), str.size()); if (is_ascii) { diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index e071b8991..1a64485a1 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -310,21 +310,22 @@ void EngineShard::ProcessAwakened(Transaction* completed_t) { auto& queue = wq.items; DCHECK(!queue.empty()); // since it's active - if (queue.front().trans == completed_t) { - do { - const WatchItem& bi = queue.front(); - Transaction* head = bi.trans.get(); + if (queue.front().trans != completed_t) + continue; - // if a transaction blpops on the same key multiple times it will appear here - // here several times as well, hence we check != completed_t. 
- if (head != completed_t && head->NotifySuspended(wq.notify_txid, shard_id())) - break; - queue.pop_front(); - } while (!queue.empty()); + DVLOG(1) << "Wakening next transaction for key " << key; - if (queue.empty()) { - wt.RemoveEntry(w_it); - } + do { + const WatchItem& bi = queue.front(); + Transaction* head = bi.trans.get(); + + if (head->NotifySuspended(wq.notify_txid, shard_id())) + break; + queue.pop_front(); + } while (!queue.empty()); + + if (queue.empty()) { + wt.RemoveEntry(w_it); } } diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 2b9575180..e22a82212 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -40,6 +40,9 @@ class Renamer { void Finalize(Transaction* t, bool skip_exist_dest); private: + OpStatus MoveSrc(Transaction* t, EngineShard* es); + OpStatus UpdateDest(Transaction* t, EngineShard* es); + DbIndex db_indx_; ShardId src_sid_; @@ -50,6 +53,9 @@ class Renamer { bool found = false; }; + PrimeValue pv_; + string str_val_; + FindResult src_res_, dest_res_; // index 0 for source, 1 for destination OpResult status_; }; @@ -95,52 +101,66 @@ void Renamer::Finalize(Transaction* t, bool skip_exist_dest) { DCHECK(src_res_.ref_val.IsRef()); - PrimeValue pv; - string str_val; - - auto move_src = [&](Transaction* t, EngineShard* shard) { - if (shard->shard_id() == src_sid_) { // Handle source key. - // TODO: to call PreUpdate/PostUpdate. - auto it = shard->db_slice().FindExt(db_indx_, src_res_.key).first; - CHECK(IsValid(it)); - - if (it->second.ObjType() == OBJ_STRING) { - it->second.GetString(&str_val); - } else { - pv = std::move(it->second); - } - CHECK(shard->db_slice().Del(db_indx_, it)); // delete the entry with empty value in it. - } - return OpStatus::OK; - }; - - t->Execute(move(move_src), false); - // Src key exist and we need to override the destination. 
- auto set_dest = [&](Transaction* t, EngineShard* shard) { - if (shard->shard_id() != src_sid_) { - auto& db_slice = shard->db_slice(); - string_view dest_key = dest_res_.key; - PrimeIterator dest_it = db_slice.FindExt(db_indx_, dest_key).first; - if (IsValid(dest_it)) { - if (src_res_.ref_val.ObjType() == OBJ_STRING) { - dest_it->second.SetString(str_val); - } else { - dest_it->second = std::move(pv); - } - db_slice.Expire(db_indx_, dest_it, src_res_.expire_ts); + // Alternatively, we could apply an optimistic algorithm and move src at Find step. + // We would need to restore the state in case of cleanups. + t->Execute([&](Transaction* t, EngineShard* shard) { return MoveSrc(t, shard); }, false); + t->Execute([&](Transaction* t, EngineShard* shard) { return UpdateDest(t, shard); }, true); +} + +OpStatus Renamer::MoveSrc(Transaction* t, EngineShard* es) { + if (es->shard_id() == src_sid_) { // Handle source key. + // TODO: to call PreUpdate/PostUpdate. + auto it = es->db_slice().FindExt(db_indx_, src_res_.key).first; + CHECK(IsValid(it)); + + // We distinguish because of the SmallString that is pinned to its thread by design, + // thus can not be accessed via another thread. + // Therefore, we copy it to standard string in its thread. + if (it->second.ObjType() == OBJ_STRING) { + it->second.GetString(&str_val_); + } else { + bool has_expire = it->second.HasExpire(); + pv_ = std::move(it->second); + it->second.SetExpire(has_expire); + } + CHECK(es->db_slice().Del(db_indx_, it)); // delete the entry with empty value in it. 
+ } + + return OpStatus::OK; +} + +OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) { + if (es->shard_id() != src_sid_) { + auto& db_slice = es->db_slice(); + string_view dest_key = dest_res_.key; + PrimeIterator dest_it = db_slice.FindExt(db_indx_, dest_key).first; + bool is_prior_list = false; + + if (IsValid(dest_it)) { + bool has_expire = dest_it->second.HasExpire(); + is_prior_list = dest_it->second.ObjType() == OBJ_LIST; + + if (src_res_.ref_val.ObjType() == OBJ_STRING) { + dest_it->second.SetString(str_val_); } else { - if (src_res_.ref_val.ObjType() == OBJ_STRING) { - db_slice.AddNew(db_indx_, dest_key, PrimeValue{str_val}, src_res_.expire_ts); - } else { - db_slice.AddNew(db_indx_, dest_key, std::move(pv), src_res_.expire_ts); - } + dest_it->second = std::move(pv_); } + dest_it->second.SetExpire(has_expire); // preserve expire flag. + db_slice.Expire(db_indx_, dest_it, src_res_.expire_ts); + } else { + if (src_res_.ref_val.ObjType() == OBJ_STRING) { + pv_.SetString(str_val_); + } + dest_it = db_slice.AddNew(db_indx_, dest_key, std::move(pv_), src_res_.expire_ts); } - return OpStatus::OK; - }; - t->Execute(move(set_dest), true); + if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST) { + es->AwakeWatched(db_indx_, dest_key); + } + } + + return OpStatus::OK; } const char* ObjTypeName(int type) { @@ -565,17 +585,20 @@ OpResult GenericFamily::OpExists(const OpArgs& op_args, ArgSlice keys) return res; } -OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from, string_view to, +OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, string_view to_key, bool skip_exists) { auto& db_slice = op_args.shard->db_slice(); - auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, from); + auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, from_key); if (!IsValid(from_it)) return OpStatus::KEY_NOTFOUND; - auto [to_it, to_expire] = db_slice.FindExt(op_args.db_ind, to); + bool is_prior_list = false; + 
auto [to_it, to_expire] = db_slice.FindExt(op_args.db_ind, to_key); if (IsValid(to_it)) { if (skip_exists) return OpStatus::KEY_EXISTS; + + is_prior_list = (to_it->second.ObjType() == OBJ_LIST); } uint64_t exp_ts = @@ -584,7 +607,7 @@ OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from, str // we keep the value we want to move. PrimeValue from_obj = std::move(from_it->second); - // Restore the expire flag on 'from'. + // Restore the expire flag on 'from' so we could delete it from expire table. from_it->second.SetExpire(IsValid(from_expire)); if (IsValid(to_it)) { @@ -601,9 +624,12 @@ OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from, str // On the other hand, AddNew does not rely on the iterators - this is why we keep // the value in `from_obj`. CHECK(db_slice.Del(op_args.db_ind, from_it)); - db_slice.AddNew(op_args.db_ind, to, std::move(from_obj), exp_ts); + to_it = db_slice.AddNew(op_args.db_ind, to_key, std::move(from_obj), exp_ts); } + if (!is_prior_list && to_it->second.ObjType() == OBJ_LIST) { + op_args.shard->AwakeWatched(op_args.db_ind, to_key); + } return OpStatus::OK; } diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index f87556cb5..836235ee0 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -275,7 +275,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) { RespVec blpop_resp; auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { - blpop_resp = Run({"blpop", kKey1, kKey1, "0"}); + blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"}); }); do { @@ -287,6 +287,42 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) { }); pop_fb.join(); EXPECT_THAT(blpop_resp, ElementsAre(kKey1, "bar")); + + pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"}); + }); + + do { + this_fiber::sleep_for(30us); + } while (!IsLocked(0, kKey1)); + + pp_->at(1)->Await([&] { + EXPECT_EQ(1, 
CheckedInt({"lpush", kKey2, "bar"})); + }); + pop_fb.join(); + EXPECT_THAT(blpop_resp, ElementsAre(kKey2, "bar")); +} + +TEST_F(ListFamilyTest, BPopRename) { + RespVec blpop_resp; + + Run({"exists", kKey1, kKey2}); + ASSERT_EQ(2, GetDebugInfo().shards_count); + + auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + blpop_resp = Run({"blpop", kKey1, "0"}); + }); + + do { + this_fiber::sleep_for(30us); + } while (!IsLocked(0, kKey1)); + + pp_->at(1)->Await([&] { + EXPECT_EQ(1, CheckedInt({"lpush", "a", "bar"})); + Run({"rename", "a", kKey1}); + }); + pop_fb.join(); + EXPECT_THAT(blpop_resp, ElementsAre(kKey1, "bar")); } } // namespace dfly diff --git a/src/server/transaction.cc b/src/server/transaction.cc index f3996c506..e4d268442 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1074,9 +1074,9 @@ void Transaction::UnregisterWatch() { // Runs only in the shard thread. OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) { - ShardId sid = SidToId(shard->shard_id()); + ShardId idx = SidToId(shard->shard_id()); - auto& sd = shard_data_[sid]; + auto& sd = shard_data_[idx]; CHECK_EQ(0, sd.local_mask & SUSPENDED_Q); DCHECK_EQ(0, sd.local_mask & ARMED); @@ -1085,6 +1085,7 @@ OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) { shard->AddWatched(s, this); } sd.local_mask |= SUSPENDED_Q; + DVLOG(1) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask; return OpStatus::OK; } @@ -1092,16 +1093,16 @@ OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) { // Runs only in the shard thread. // Quadratic complexity in number of arguments and queue length. 
bool Transaction::RemoveFromWatchedShardCb(EngineShard* shard) { - ShardId sid = SidToId(shard->shard_id()); - auto& sd = shard_data_[sid]; + ShardId idx = SidToId(shard->shard_id()); + auto& sd = shard_data_[idx]; constexpr uint16_t kQueueMask = - -Transaction::SUSPENDED_Q | Transaction::AWAKED_Q | Transaction::EXPIRED_Q; + Transaction::SUSPENDED_Q | Transaction::AWAKED_Q | Transaction::EXPIRED_Q; if ((sd.local_mask & kQueueMask) == 0) return false; - sd.local_mask &= kQueueMask; + sd.local_mask &= ~kQueueMask; // TODO: what if args have keys and values? auto args = ShardArgsInShard(shard->shard_id()); @@ -1129,17 +1130,21 @@ bool Transaction::IsGlobal() const { } // Runs only in the shard thread. +// Returns true if the transaction has changed its state from suspended to awakened, +// false otherwise. bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) { - unsigned sd_id = SidToId(sid); - auto& sd = shard_data_[sd_id]; + unsigned idx = SidToId(sid); + auto& sd = shard_data_[idx]; unsigned local_mask = sd.local_mask; - CHECK_NE(0u, local_mask & SUSPENDED_Q); - DVLOG(1) << "NotifyBlocked " << DebugId() << ", local_mask: " << local_mask; if (local_mask & Transaction::EXPIRED_Q) { return false; } + DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask; + + // local_mask could be awaked (i.e. not suspended) if the transaction has been + // awakened by another key or awakened by the same key multiple times. if (local_mask & SUSPENDED_Q) { DCHECK_EQ(0u, local_mask & AWAKED_Q); @@ -1159,7 +1164,7 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) { } CHECK(sd.local_mask & AWAKED_Q); - return true; + return false; } void Transaction::BreakOnClose() {