More bug fixes.

1. RENAME now unblocks blpop/brpop.
2. Fix a deadlock bug with blpop running with the same key multiple times.
This commit is contained in:
Roman Gershman 2022-04-06 10:32:42 +03:00
parent 1fc9f11e76
commit abbefd0bc4
5 changed files with 143 additions and 74 deletions

View file

@ -674,6 +674,8 @@ std::optional<int64_t> CompactObj::TryGetInt() const {
} }
void CompactObj::SetString(std::string_view str) { void CompactObj::SetString(std::string_view str) {
uint8_t mask = mask_ & ~kEncMask;
// Trying auto-detection heuristics first. // Trying auto-detection heuristics first.
if (str.size() <= 20) { if (str.size() <= 20) {
long long ival; long long ival;
@ -681,14 +683,14 @@ void CompactObj::SetString(std::string_view str) {
// We use redis string2ll to be compatible with Redis. // We use redis string2ll to be compatible with Redis.
if (string2ll(str.data(), str.size(), &ival)) { if (string2ll(str.data(), str.size(), &ival)) {
SetMeta(INT_TAG, mask_ & ~kEncMask); SetMeta(INT_TAG, mask);
u_.ival = ival; u_.ival = ival;
return; return;
} }
if (str.size() <= kInlineLen) { if (str.size() <= kInlineLen) {
SetMeta(str.size(), mask_ & ~kEncMask); SetMeta(str.size(), mask);
memcpy(u_.inline_str, str.data(), str.size()); memcpy(u_.inline_str, str.data(), str.size());
return; return;
@ -698,7 +700,6 @@ void CompactObj::SetString(std::string_view str) {
DCHECK_GT(str.size(), kInlineLen); DCHECK_GT(str.size(), kInlineLen);
string_view encoded = str; string_view encoded = str;
uint8_t mask = mask_ & ~kEncMask;
bool is_ascii = kUseAsciiEncoding && validate_ascii_fast(str.data(), str.size()); bool is_ascii = kUseAsciiEncoding && validate_ascii_fast(str.data(), str.size());
if (is_ascii) { if (is_ascii) {

View file

@ -310,21 +310,22 @@ void EngineShard::ProcessAwakened(Transaction* completed_t) {
auto& queue = wq.items; auto& queue = wq.items;
DCHECK(!queue.empty()); // since it's active DCHECK(!queue.empty()); // since it's active
if (queue.front().trans == completed_t) { if (queue.front().trans != completed_t)
do { continue;
const WatchItem& bi = queue.front();
Transaction* head = bi.trans.get();
// if a transaction blpops on the same key multiple times it will appear here DVLOG(1) << "Wakening next transaction for key " << key;
// here several times as well, hence we check != completed_t.
if (head != completed_t && head->NotifySuspended(wq.notify_txid, shard_id()))
break;
queue.pop_front();
} while (!queue.empty());
if (queue.empty()) { do {
wt.RemoveEntry(w_it); const WatchItem& bi = queue.front();
} Transaction* head = bi.trans.get();
if (head->NotifySuspended(wq.notify_txid, shard_id()))
break;
queue.pop_front();
} while (!queue.empty());
if (queue.empty()) {
wt.RemoveEntry(w_it);
} }
} }

View file

@ -40,6 +40,9 @@ class Renamer {
void Finalize(Transaction* t, bool skip_exist_dest); void Finalize(Transaction* t, bool skip_exist_dest);
private: private:
OpStatus MoveSrc(Transaction* t, EngineShard* es);
OpStatus UpdateDest(Transaction* t, EngineShard* es);
DbIndex db_indx_; DbIndex db_indx_;
ShardId src_sid_; ShardId src_sid_;
@ -50,6 +53,9 @@ class Renamer {
bool found = false; bool found = false;
}; };
PrimeValue pv_;
string str_val_;
FindResult src_res_, dest_res_; // index 0 for source, 1 for destination FindResult src_res_, dest_res_; // index 0 for source, 1 for destination
OpResult<void> status_; OpResult<void> status_;
}; };
@ -95,52 +101,66 @@ void Renamer::Finalize(Transaction* t, bool skip_exist_dest) {
DCHECK(src_res_.ref_val.IsRef()); DCHECK(src_res_.ref_val.IsRef());
PrimeValue pv;
string str_val;
auto move_src = [&](Transaction* t, EngineShard* shard) {
if (shard->shard_id() == src_sid_) { // Handle source key.
// TODO: to call PreUpdate/PostUpdate.
auto it = shard->db_slice().FindExt(db_indx_, src_res_.key).first;
CHECK(IsValid(it));
if (it->second.ObjType() == OBJ_STRING) {
it->second.GetString(&str_val);
} else {
pv = std::move(it->second);
}
CHECK(shard->db_slice().Del(db_indx_, it)); // delete the entry with empty value in it.
}
return OpStatus::OK;
};
t->Execute(move(move_src), false);
// Src key exist and we need to override the destination. // Src key exist and we need to override the destination.
auto set_dest = [&](Transaction* t, EngineShard* shard) { // Alternatively, we could apply an optimistic algorithm and move src at Find step.
if (shard->shard_id() != src_sid_) { // We would need to restore the state in case of cleanups.
auto& db_slice = shard->db_slice(); t->Execute([&](Transaction* t, EngineShard* shard) { return MoveSrc(t, shard); }, false);
string_view dest_key = dest_res_.key; t->Execute([&](Transaction* t, EngineShard* shard) { return UpdateDest(t, shard); }, true);
PrimeIterator dest_it = db_slice.FindExt(db_indx_, dest_key).first; }
if (IsValid(dest_it)) {
if (src_res_.ref_val.ObjType() == OBJ_STRING) { OpStatus Renamer::MoveSrc(Transaction* t, EngineShard* es) {
dest_it->second.SetString(str_val); if (es->shard_id() == src_sid_) { // Handle source key.
} else { // TODO: to call PreUpdate/PostUpdate.
dest_it->second = std::move(pv); auto it = es->db_slice().FindExt(db_indx_, src_res_.key).first;
} CHECK(IsValid(it));
db_slice.Expire(db_indx_, dest_it, src_res_.expire_ts);
// We distinguish because of the SmallString that is pinned to its thread by design,
// thus can not be accessed via another thread.
// Therefore, we copy it to standard string in its thread.
if (it->second.ObjType() == OBJ_STRING) {
it->second.GetString(&str_val_);
} else {
bool has_expire = it->second.HasExpire();
pv_ = std::move(it->second);
it->second.SetExpire(has_expire);
}
CHECK(es->db_slice().Del(db_indx_, it)); // delete the entry with empty value in it.
}
return OpStatus::OK;
}
OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) {
if (es->shard_id() != src_sid_) {
auto& db_slice = es->db_slice();
string_view dest_key = dest_res_.key;
PrimeIterator dest_it = db_slice.FindExt(db_indx_, dest_key).first;
bool is_prior_list = false;
if (IsValid(dest_it)) {
bool has_expire = dest_it->second.HasExpire();
is_prior_list = dest_it->second.ObjType() == OBJ_LIST;
if (src_res_.ref_val.ObjType() == OBJ_STRING) {
dest_it->second.SetString(str_val_);
} else { } else {
if (src_res_.ref_val.ObjType() == OBJ_STRING) { dest_it->second = std::move(pv_);
db_slice.AddNew(db_indx_, dest_key, PrimeValue{str_val}, src_res_.expire_ts);
} else {
db_slice.AddNew(db_indx_, dest_key, std::move(pv), src_res_.expire_ts);
}
} }
dest_it->second.SetExpire(has_expire); // preserve expire flag.
db_slice.Expire(db_indx_, dest_it, src_res_.expire_ts);
} else {
if (src_res_.ref_val.ObjType() == OBJ_STRING) {
pv_.SetString(str_val_);
}
dest_it = db_slice.AddNew(db_indx_, dest_key, std::move(pv_), src_res_.expire_ts);
} }
return OpStatus::OK; if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST) {
}; es->AwakeWatched(db_indx_, dest_key);
t->Execute(move(set_dest), true); }
}
return OpStatus::OK;
} }
const char* ObjTypeName(int type) { const char* ObjTypeName(int type) {
@ -565,17 +585,20 @@ OpResult<uint32_t> GenericFamily::OpExists(const OpArgs& op_args, ArgSlice keys)
return res; return res;
} }
OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from, string_view to, OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, string_view to_key,
bool skip_exists) { bool skip_exists) {
auto& db_slice = op_args.shard->db_slice(); auto& db_slice = op_args.shard->db_slice();
auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, from); auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, from_key);
if (!IsValid(from_it)) if (!IsValid(from_it))
return OpStatus::KEY_NOTFOUND; return OpStatus::KEY_NOTFOUND;
auto [to_it, to_expire] = db_slice.FindExt(op_args.db_ind, to); bool is_prior_list = false;
auto [to_it, to_expire] = db_slice.FindExt(op_args.db_ind, to_key);
if (IsValid(to_it)) { if (IsValid(to_it)) {
if (skip_exists) if (skip_exists)
return OpStatus::KEY_EXISTS; return OpStatus::KEY_EXISTS;
is_prior_list = (to_it->second.ObjType() == OBJ_LIST);
} }
uint64_t exp_ts = uint64_t exp_ts =
@ -584,7 +607,7 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from, str
// we keep the value we want to move. // we keep the value we want to move.
PrimeValue from_obj = std::move(from_it->second); PrimeValue from_obj = std::move(from_it->second);
// Restore the expire flag on 'from'. // Restore the expire flag on 'from' so we could delete it from expire table.
from_it->second.SetExpire(IsValid(from_expire)); from_it->second.SetExpire(IsValid(from_expire));
if (IsValid(to_it)) { if (IsValid(to_it)) {
@ -601,9 +624,12 @@ OpResult<void> GenericFamily::OpRen(const OpArgs& op_args, string_view from, str
// On the other hand, AddNew does not rely on the iterators - this is why we keep // On the other hand, AddNew does not rely on the iterators - this is why we keep
// the value in `from_obj`. // the value in `from_obj`.
CHECK(db_slice.Del(op_args.db_ind, from_it)); CHECK(db_slice.Del(op_args.db_ind, from_it));
db_slice.AddNew(op_args.db_ind, to, std::move(from_obj), exp_ts); to_it = db_slice.AddNew(op_args.db_ind, to_key, std::move(from_obj), exp_ts);
} }
if (!is_prior_list && to_it->second.ObjType() == OBJ_LIST) {
op_args.shard->AwakeWatched(op_args.db_ind, to_key);
}
return OpStatus::OK; return OpStatus::OK;
} }

View file

@ -275,7 +275,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
RespVec blpop_resp; RespVec blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, kKey1, "0"}); blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"});
}); });
do { do {
@ -287,6 +287,42 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
}); });
pop_fb.join(); pop_fb.join();
EXPECT_THAT(blpop_resp, ElementsAre(kKey1, "bar")); EXPECT_THAT(blpop_resp, ElementsAre(kKey1, "bar"));
pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"});
});
do {
this_fiber::sleep_for(30us);
} while (!IsLocked(0, kKey1));
pp_->at(1)->Await([&] {
EXPECT_EQ(1, CheckedInt({"lpush", kKey2, "bar"}));
});
pop_fb.join();
EXPECT_THAT(blpop_resp, ElementsAre(kKey2, "bar"));
}
TEST_F(ListFamilyTest, BPopRename) {
RespVec blpop_resp;
Run({"exists", kKey1, kKey2});
ASSERT_EQ(2, GetDebugInfo().shards_count);
auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, "0"});
});
do {
this_fiber::sleep_for(30us);
} while (!IsLocked(0, kKey1));
pp_->at(1)->Await([&] {
EXPECT_EQ(1, CheckedInt({"lpush", "a", "bar"}));
Run({"rename", "a", kKey1});
});
pop_fb.join();
EXPECT_THAT(blpop_resp, ElementsAre(kKey1, "bar"));
} }
} // namespace dfly } // namespace dfly

View file

@ -1074,9 +1074,9 @@ void Transaction::UnregisterWatch() {
// Runs only in the shard thread. // Runs only in the shard thread.
OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) { OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) {
ShardId sid = SidToId(shard->shard_id()); ShardId idx = SidToId(shard->shard_id());
auto& sd = shard_data_[sid]; auto& sd = shard_data_[idx];
CHECK_EQ(0, sd.local_mask & SUSPENDED_Q); CHECK_EQ(0, sd.local_mask & SUSPENDED_Q);
DCHECK_EQ(0, sd.local_mask & ARMED); DCHECK_EQ(0, sd.local_mask & ARMED);
@ -1085,6 +1085,7 @@ OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) {
shard->AddWatched(s, this); shard->AddWatched(s, this);
} }
sd.local_mask |= SUSPENDED_Q; sd.local_mask |= SUSPENDED_Q;
DVLOG(1) << "AddWatched " << DebugId() << " local_mask:" << sd.local_mask;
return OpStatus::OK; return OpStatus::OK;
} }
@ -1092,16 +1093,16 @@ OpStatus Transaction::AddToWatchedShardCb(EngineShard* shard) {
// Runs only in the shard thread. // Runs only in the shard thread.
// Quadratic complexity in number of arguments and queue length. // Quadratic complexity in number of arguments and queue length.
bool Transaction::RemoveFromWatchedShardCb(EngineShard* shard) { bool Transaction::RemoveFromWatchedShardCb(EngineShard* shard) {
ShardId sid = SidToId(shard->shard_id()); ShardId idx = SidToId(shard->shard_id());
auto& sd = shard_data_[sid]; auto& sd = shard_data_[idx];
constexpr uint16_t kQueueMask = constexpr uint16_t kQueueMask =
-Transaction::SUSPENDED_Q | Transaction::AWAKED_Q | Transaction::EXPIRED_Q; Transaction::SUSPENDED_Q | Transaction::AWAKED_Q | Transaction::EXPIRED_Q;
if ((sd.local_mask & kQueueMask) == 0) if ((sd.local_mask & kQueueMask) == 0)
return false; return false;
sd.local_mask &= kQueueMask; sd.local_mask &= ~kQueueMask;
// TODO: what if args have keys and values? // TODO: what if args have keys and values?
auto args = ShardArgsInShard(shard->shard_id()); auto args = ShardArgsInShard(shard->shard_id());
@ -1129,17 +1130,21 @@ bool Transaction::IsGlobal() const {
} }
// Runs only in the shard thread. // Runs only in the shard thread.
// Returns true if the transcton has changed its state from suspended to awakened,
// false, otherwise.
bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) { bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
unsigned sd_id = SidToId(sid); unsigned idx = SidToId(sid);
auto& sd = shard_data_[sd_id]; auto& sd = shard_data_[idx];
unsigned local_mask = sd.local_mask; unsigned local_mask = sd.local_mask;
CHECK_NE(0u, local_mask & SUSPENDED_Q);
DVLOG(1) << "NotifyBlocked " << DebugId() << ", local_mask: " << local_mask;
if (local_mask & Transaction::EXPIRED_Q) { if (local_mask & Transaction::EXPIRED_Q) {
return false; return false;
} }
DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask;
// local_mask could be awaked (i.e. not suspended) if the transaction has been
// awakened by another key or awakened by the same key multiple times.
if (local_mask & SUSPENDED_Q) { if (local_mask & SUSPENDED_Q) {
DCHECK_EQ(0u, local_mask & AWAKED_Q); DCHECK_EQ(0u, local_mask & AWAKED_Q);
@ -1159,7 +1164,7 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
} }
CHECK(sd.local_mask & AWAKED_Q); CHECK(sd.local_mask & AWAKED_Q);
return true; return false;
} }
void Transaction::BreakOnClose() { void Transaction::BreakOnClose() {