Bug fixes.

1. Fix crash when calling BLPOP on the same key several times.
2. Extend RENAME functionality to cover all data-types.
   Before that it worked only for strings and that also was incorrect.
This commit is contained in:
Roman Gershman 2022-04-05 23:20:05 +03:00
parent c6e4e97865
commit 1fc9f11e76
4 changed files with 108 additions and 79 deletions

View file

@ -5,8 +5,8 @@
#include "server/engine_shard_set.h"
extern "C" {
#include "redis/zmalloc.h"
#include "redis/object.h"
#include "redis/zmalloc.h"
}
#include "base/logging.h"
@ -311,16 +311,16 @@ void EngineShard::ProcessAwakened(Transaction* completed_t) {
DCHECK(!queue.empty()); // since it's active
if (queue.front().trans == completed_t) {
queue.pop_front();
while (!queue.empty()) {
do {
const WatchItem& bi = queue.front();
Transaction* head = bi.trans.get();
if (head->NotifySuspended(wq.notify_txid, shard_id()))
// if a transaction blpops on the same key multiple times it will appear here
// here several times as well, hence we check != completed_t.
if (head != completed_t && head->NotifySuspended(wq.notify_txid, shard_id()))
break;
queue.pop_front();
}
} while (!queue.empty());
if (queue.empty()) {
wt.RemoveEntry(w_it);
@ -512,8 +512,7 @@ void EngineShard::CacheStats() {
mi_stats_merge();
size_t used_mem = UsedMemory();
cached_stats[db_slice_.shard_id()].used_memory.store(used_mem,
memory_order_relaxed);
cached_stats[db_slice_.shard_id()].used_memory.store(used_mem, memory_order_relaxed);
}
size_t EngineShard::UsedMemory() const {

View file

@ -31,105 +31,116 @@ class Renamer {
Renamer(DbIndex dind, ShardId source_id) : db_indx_(dind), src_sid_(source_id) {
}
OpResult<void> Find(ShardId shard_id, const ArgSlice& args);
void Find(Transaction* t);
OpResult<void> status() const {
return status_;
};
Transaction::RunnableType Finalize(bool skip_exist_dest);
void Finalize(Transaction* t, bool skip_exist_dest);
private:
void MoveValues(EngineShard* shard, const ArgSlice& args);
DbIndex db_indx_;
ShardId src_sid_;
struct FindResult {
string_view key;
PrimeValue val;
PrimeValue ref_val;
uint64_t expire_ts;
bool found = false;
};
FindResult src_res_, dest_res_; // index 0 for source, 1 for destination
OpResult<void> status_;
};
OpResult<void> Renamer::Find(ShardId shard_id, const ArgSlice& args) {
CHECK_EQ(1u, args.size());
FindResult* res = (shard_id == src_sid_) ? &src_res_ : &dest_res_;
void Renamer::Find(Transaction* t) {
auto cb = [this](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
CHECK_EQ(1u, args.size());
res->key = args.front();
auto& db_slice = EngineShard::tlocal()->db_slice();
auto [it, exp_it] = db_slice.FindExt(db_indx_, res->key);
FindResult* res = (shard->shard_id() == src_sid_) ? &src_res_ : &dest_res_;
res->found = IsValid(it);
if (IsValid(it)) {
res->val = it->second.AsRef();
res->expire_ts = IsValid(exp_it) ? db_slice.expire_base() + exp_it->second.duration() : 0;
}
res->key = args.front();
auto& db_slice = EngineShard::tlocal()->db_slice();
auto [it, exp_it] = db_slice.FindExt(db_indx_, res->key);
return OpStatus::OK;
res->found = IsValid(it);
if (IsValid(it)) {
res->ref_val = it->second.AsRef();
res->expire_ts = IsValid(exp_it) ? db_slice.expire_base() + exp_it->second.duration() : 0;
}
return OpStatus::OK;
};
t->Execute(move(cb), false);
};
void Renamer::MoveValues(EngineShard* shard, const ArgSlice& args) {
auto shard_id = shard->shard_id();
// TODO: when we want to maintain heap per shard model this code will require additional
// work
if (shard_id == src_sid_) { // Handle source key.
// delete the source entry.
auto it = shard->db_slice().FindExt(db_indx_, src_res_.key).first;
CHECK(shard->db_slice().Del(db_indx_, it));
return;
}
// Handle destination
string_view dest_key = dest_res_.key;
PrimeIterator dest_it = shard->db_slice().FindExt(db_indx_, dest_key).first;
if (IsValid(dest_it)) {
// we just move the source. We won't be able to do it with heap per shard model.
dest_it->second = std::move(src_res_.val);
shard->db_slice().Expire(db_indx_, dest_it, src_res_.expire_ts);
} else {
// we just add the key to destination with the source object.
shard->db_slice().AddNew(db_indx_, dest_key, std::move(src_res_.val), src_res_.expire_ts);
}
}
Transaction::RunnableType Renamer::Finalize(bool skip_exist_dest) {
void Renamer::Finalize(Transaction* t, bool skip_exist_dest) {
auto cleanup = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
if (!src_res_.found) {
status_ = OpStatus::KEY_NOTFOUND;
return cleanup;
t->Execute(move(cleanup), true);
return;
}
if (dest_res_.found && skip_exist_dest) {
status_ = OpStatus::KEY_EXISTS;
return cleanup;
t->Execute(move(cleanup), true);
return;
}
DCHECK(src_res_.val.IsRef());
DCHECK(src_res_.ref_val.IsRef());
// We can not copy from the existing value and delete it at the same time.
// TODO: if we want to allocate in shard, we must implement CompactObject::Clone.
// For now we hack COW for strings.
string val;
src_res_.val.GetString(&val);
src_res_.val.Reset();
src_res_.val.SetString(val);
PrimeValue pv;
string str_val;
auto move_src = [&](Transaction* t, EngineShard* shard) {
if (shard->shard_id() == src_sid_) { // Handle source key.
// TODO: to call PreUpdate/PostUpdate.
auto it = shard->db_slice().FindExt(db_indx_, src_res_.key).first;
CHECK(IsValid(it));
if (it->second.ObjType() == OBJ_STRING) {
it->second.GetString(&str_val);
} else {
pv = std::move(it->second);
}
CHECK(shard->db_slice().Del(db_indx_, it)); // delete the entry with empty value in it.
}
return OpStatus::OK;
};
t->Execute(move(move_src), false);
// Src key exist and we need to override the destination.
return [this](Transaction* t, EngineShard* shard) {
this->MoveValues(shard, t->ShardArgsInShard(shard->shard_id()));
auto set_dest = [&](Transaction* t, EngineShard* shard) {
if (shard->shard_id() != src_sid_) {
auto& db_slice = shard->db_slice();
string_view dest_key = dest_res_.key;
PrimeIterator dest_it = db_slice.FindExt(db_indx_, dest_key).first;
if (IsValid(dest_it)) {
if (src_res_.ref_val.ObjType() == OBJ_STRING) {
dest_it->second.SetString(str_val);
} else {
dest_it->second = std::move(pv);
}
db_slice.Expire(db_indx_, dest_it, src_res_.expire_ts);
} else {
if (src_res_.ref_val.ObjType() == OBJ_STRING) {
db_slice.AddNew(db_indx_, dest_key, PrimeValue{str_val}, src_res_.expire_ts);
} else {
db_slice.AddNew(db_indx_, dest_key, std::move(pv), src_res_.expire_ts);
}
}
}
return OpStatus::OK;
};
t->Execute(move(set_dest), true);
}
const char* ObjTypeName(int type) {
@ -397,14 +408,8 @@ OpResult<void> GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des
// Phase 1 -> Fetch keys from both shards.
// Phase 2 -> If everything is ok, clone the source object, delete the destination object, and
// set its ptr to cloned one. we also copy the expiration data of the source key.
transaction->Execute(
[&renamer](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
return renamer.Find(shard->shard_id(), args).status();
},
false);
transaction->Execute(renamer.Finalize(skip_exist_dest), true);
renamer.Find(transaction);
renamer.Finalize(transaction, skip_exist_dest);
return renamer.status();
}

View file

@ -23,8 +23,7 @@ using absl::StrCat;
namespace dfly {
class GenericFamilyTest : public BaseFamilyTest {
};
class GenericFamilyTest : public BaseFamilyTest {};
TEST_F(GenericFamilyTest, Expire) {
Run({"set", "key", "val"});
@ -52,7 +51,6 @@ TEST_F(GenericFamilyTest, Expire) {
EXPECT_THAT(resp[0], ArgType(RespExpr::NIL));
}
TEST_F(GenericFamilyTest, Del) {
for (size_t i = 0; i < 1000; ++i) {
Run({"set", StrCat("foo", i), "1"});
@ -93,11 +91,12 @@ TEST_F(GenericFamilyTest, Exists) {
EXPECT_THAT(resp[0], IntArg(3));
}
TEST_F(GenericFamilyTest, Rename) {
RespVec resp;
string b_val(32, 'b');
string x_val(32, 'x');
resp = Run({"mset", "x", "0", "b", "1"});
resp = Run({"mset", "x", x_val, "b", b_val});
ASSERT_THAT(resp, RespEq("OK"));
ASSERT_EQ(2, last_cmd_dbg_info_.shards_count);
@ -110,8 +109,7 @@ TEST_F(GenericFamilyTest, Rename) {
int64_t val = CheckedInt({"get", "x"});
ASSERT_EQ(kint64min, val); // does not exist
val = CheckedInt({"get", "b"});
ASSERT_EQ(0, val); // it has value of x.
ASSERT_THAT(Run({"get", "b"}), RespEq(x_val)); // swapped.
EXPECT_EQ(CheckedInt({"exists", "x", "b"}), 1);
@ -133,7 +131,16 @@ TEST_F(GenericFamilyTest, Rename) {
exist_fb.join();
ren_fb.join();
}
TEST_F(GenericFamilyTest, RenameNonString) {
EXPECT_EQ(1, CheckedInt({"lpush", "x", "elem"}));
auto resp = Run({"rename", "x", "b"});
ASSERT_THAT(resp, RespEq("OK"));
ASSERT_EQ(2, last_cmd_dbg_info_.shards_count);
EXPECT_EQ(0, CheckedInt({"del", "x"}));
EXPECT_EQ(1, CheckedInt({"del", "b"}));
}
TEST_F(GenericFamilyTest, RenameBinary) {

View file

@ -271,4 +271,22 @@ TEST_F(ListFamilyTest, WrongTypeDoesNotWake) {
EXPECT_THAT(blpop_resp, ElementsAre(kKey1, "B"));
}
TEST_F(ListFamilyTest, BPopSameKeyTwice) {
RespVec blpop_resp;
auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
blpop_resp = Run({"blpop", kKey1, kKey1, "0"});
});
do {
this_fiber::sleep_for(30us);
} while (!IsLocked(0, kKey1));
pp_->at(1)->Await([&] {
EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"}));
});
pop_fb.join();
EXPECT_THAT(blpop_resp, ElementsAre(kKey1, "bar"));
}
} // namespace dfly