feat(server): Async unlock multi (#774)

* feat(server):Async unlock multi

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2023-02-11 19:57:02 +03:00 committed by GitHub
parent 3ba46a5097
commit 6e612e7545
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 21 additions and 19 deletions

View file

@ -140,7 +140,6 @@ TEST_F(DflyEngineTest, Multi) {
ASSERT_THAT(resp.GetVec(), ElementsAre(ArgType(RespExpr::NIL), ArgType(RespExpr::NIL))); ASSERT_THAT(resp.GetVec(), ElementsAre(ArgType(RespExpr::NIL), ArgType(RespExpr::NIL)));
atomic_bool tx_empty = true; atomic_bool tx_empty = true;
shard_set->RunBriefInParallel([&](EngineShard* shard) { shard_set->RunBriefInParallel([&](EngineShard* shard) {
if (!shard->txq()->Empty()) if (!shard->txq()->Empty())
tx_empty.store(false); tx_empty.store(false);
@ -264,6 +263,7 @@ TEST_F(DflyEngineTest, MultiConsistent) {
mset_fb.Join(); mset_fb.Join();
fb.Join(); fb.Join();
ASSERT_FALSE(service_->IsLocked(0, kKey1)); ASSERT_FALSE(service_->IsLocked(0, kKey1));
ASSERT_FALSE(service_->IsLocked(0, kKey4)); ASSERT_FALSE(service_->IsLocked(0, kKey4));
ASSERT_FALSE(service_->IsShardSetLocked()); ASSERT_FALSE(service_->IsShardSetLocked());
@ -865,6 +865,7 @@ TEST_F(DflyEngineTest, Bug468) {
resp = Run({"exec"}); resp = Run({"exec"});
ASSERT_THAT(resp, ErrArg("not an integer")); ASSERT_THAT(resp, ErrArg("not an integer"));
ASSERT_FALSE(service_->IsLocked(0, "foo")); ASSERT_FALSE(service_->IsLocked(0, "foo"));
resp = Run({"eval", "return redis.call('set', 'foo', 'bar', 'EX', 'moo')", "1", "foo"}); resp = Run({"eval", "return redis.call('set', 'foo', 'bar', 'EX', 'moo')", "1", "foo"});

View file

@ -207,6 +207,11 @@ RespExpr BaseFamilyTest::Run(std::string_view id, ArgSlice slice) {
DCHECK(context->transaction == nullptr); DCHECK(context->transaction == nullptr);
auto cmd = absl::AsciiStrToUpper(slice.front());
if (cmd == "EVAL" || cmd == "EVALSHA" || cmd == "EXEC") {
shard_set->AwaitRunningOnShardQueue([](auto*) {}); // Wait for async UnlockMulti.
}
unique_lock lk(mu_); unique_lock lk(mu_);
last_cmd_dbg_info_ = context->last_command_debug; last_cmd_dbg_info_ = context->last_command_debug;

View file

@ -600,33 +600,28 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
// Runs in the coordinator fiber. // Runs in the coordinator fiber.
void Transaction::UnlockMulti() { void Transaction::UnlockMulti() {
VLOG(1) << "UnlockMulti " << DebugId(); VLOG(1) << "UnlockMulti " << DebugId();
DCHECK(multi_); DCHECK(multi_);
using KeyList = vector<pair<std::string_view, LockCnt>>; DCHECK_GE(GetUseCount(), 1u); // Greater-equal because there may be callbacks in progress.
vector<KeyList> sharded_keys(shard_set->size());
// It's LE and not EQ because there may be callbacks in progress that increase use_count_. auto sharded_keys = make_shared<vector<KeyList>>(shard_set->size());
DCHECK_LE(1u, GetUseCount()); for (const auto& [key, cnt] : multi_->lock_counts) {
ShardId sid = Shard(key, sharded_keys->size());
for (const auto& k_v : multi_->lock_counts) { (*sharded_keys)[sid].emplace_back(key, cnt);
ShardId sid = Shard(k_v.first, sharded_keys.size());
sharded_keys[sid].push_back(k_v);
} }
uint32_t shard_journals_cnt = 0; unsigned shard_journals_cnt =
if (ServerState::tlocal()->journal()) { ServerState::tlocal()->journal() ? CalcMultiNumOfShardJournals() : 0;
shard_journals_cnt = CalcMultiNumOfShardJournals();
}
uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed); uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed);
DCHECK_EQ(prev, 0u); DCHECK_EQ(prev, 0u);
use_count_.fetch_add(shard_data_.size(), std::memory_order_relaxed);
for (ShardId i = 0; i < shard_data_.size(); ++i) { for (ShardId i = 0; i < shard_data_.size(); ++i) {
shard_set->Add( shard_set->Add(i, [this, sharded_keys, shard_journals_cnt]() {
i, [&] { UnlockMultiShardCb(sharded_keys, EngineShard::tlocal(), shard_journals_cnt); }); this->UnlockMultiShardCb(*sharded_keys, EngineShard::tlocal(), shard_journals_cnt);
intrusive_ptr_release(this);
});
} }
WaitForShardCallbacks();
DCHECK_GE(GetUseCount(), 1u);
VLOG(1) << "UnlockMultiEnd " << DebugId(); VLOG(1) << "UnlockMultiEnd " << DebugId();
} }

View file

@ -227,7 +227,8 @@ class Transaction {
unsigned cnt[2] = {0, 0}; unsigned cnt[2] = {0, 0};
}; };
using KeyList = std::vector<std::pair<std::string_view, LockCnt>>; // owned std::string because callbacks its used in run fully async and can outlive the entries.
using KeyList = std::vector<std::pair<std::string, LockCnt>>;
struct PerShardData { struct PerShardData {
PerShardData(PerShardData&&) noexcept { PerShardData(PerShardData&&) noexcept {