fix(server): issue with invalid command inside multi #468 (#469)

fix(server): Fix redundant locking in multi transactions

Commands that scheduled themselves via the Schedule() call crashed
under multi transactions. This is now fixed inside the Transaction code
and covered by the DflyEngineTest.MultiRename test.

In addition, this PR fixes #468, which opened the rabbit hole of
nasty bugs under multi transactions.
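
Roughly, the fix makes Schedule() on an "expanding" multi transaction only record
the key locks (via the new LockMulti()) instead of scheduling again, since the multi
transaction was already scheduled once when EXEC started. Below is a self-contained
toy model of that bookkeeping; it is NOT Dragonfly code, the names only loosely
follow the diff below, and the assert stands in for the crash that double scheduling
used to cause.

// Toy model of the fixed bookkeeping; NOT Dragonfly code, names loosely follow the diff.
#include <cassert>
#include <map>
#include <string>
#include <vector>

struct Multi {
  std::map<std::string, unsigned> locks;  // per-key lock counts, released by UnlockMulti()
  std::vector<std::string> keys;          // keys gathered by the currently queued command
  bool is_expanding = true;               // may lock more keys as commands are queued
};

struct Txn {
  Multi* multi = nullptr;
  bool scheduled = false;

  void ScheduleInternal() {
    assert(!scheduled);  // scheduling an already scheduled multi is what used to crash
    scheduled = true;
  }
  void LockMulti() {
    for (const auto& k : multi->keys) multi->locks[k]++;
    multi->keys.clear();
  }
  // The fixed Schedule(): an expanding multi only records locks, it never re-schedules.
  void Schedule() {
    if (multi && multi->is_expanding)
      LockMulti();
    else
      ScheduleInternal();
  }
};

int main() {
  Multi m;
  Txn t;
  t.multi = &m;
  t.ScheduleInternal();        // done once when EXEC starts (SetExecCmd in the diff)
  m.keys = {"key4", "key2"};   // e.g. RENAME key4 key2 queued under MULTI
  t.Schedule();                // RENAME schedules itself; no second ScheduleInternal()
  assert(m.locks["key4"] == 1 && m.locks["key2"] == 1);
  return 0;
}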

Signed-off-by: Boaz Sade <boaz@dragonflydb.io>
Signed-off-by: Roman Gershman <roman@dragonflydb.io>

5 changed files with 112 additions and 47 deletions


@ -212,7 +212,10 @@ TEST_F(DflyEngineTest, MultiWeirdCommands) {
}
TEST_F(DflyEngineTest, MultiRename) {
RespExpr resp = Run({"multi"});
RespExpr resp = Run({"mget", kKey1, kKey4});
ASSERT_EQ(1, GetDebugInfo().shards_count);
resp = Run({"multi"});
ASSERT_EQ(resp, "OK");
Run({"set", kKey1, "1"});
@ -222,9 +225,21 @@ TEST_F(DflyEngineTest, MultiRename) {
ASSERT_THAT(resp, ArrLen(2));
EXPECT_THAT(resp.GetVec(), ElementsAre("OK", "OK"));
ASSERT_FALSE(service_->IsLocked(0, kKey1));
ASSERT_FALSE(service_->IsLocked(0, kKey4));
ASSERT_FALSE(service_->IsShardSetLocked());
// Now rename with keys spanning multiple shards.
Run({"mget", kKey4, kKey2});
ASSERT_EQ(2, GetDebugInfo().shards_count);
Run({"multi"});
resp = Run({"rename", kKey4, kKey2});
ASSERT_EQ(resp, "QUEUED");
resp = Run({"exec"});
EXPECT_EQ(resp, "OK");
EXPECT_FALSE(service_->IsLocked(0, kKey1));
EXPECT_FALSE(service_->IsLocked(0, kKey2));
EXPECT_FALSE(service_->IsLocked(0, kKey4));
EXPECT_FALSE(service_->IsShardSetLocked());
}
TEST_F(DflyEngineTest, MultiHop) {
@ -281,7 +296,9 @@ TEST_F(DflyEngineTest, FlushDb) {
}
TEST_F(DflyEngineTest, Eval) {
auto resp = Run({"incrby", "foo", "42"});
RespExpr resp;
resp = Run({"incrby", "foo", "42"});
EXPECT_THAT(resp, IntArg(42));
resp = Run({"eval", "return redis.call('get', 'foo')", "0"});
@ -294,6 +311,7 @@ TEST_F(DflyEngineTest, Eval) {
resp = Run({"eval", "return redis.call('get', 'foo')", "1", "foo"});
EXPECT_THAT(resp, "42");
ASSERT_FALSE(service_->IsLocked(0, "foo"));
resp = Run({"eval", "return redis.call('get', KEYS[1])", "1", "foo"});
EXPECT_THAT(resp, "42");
@ -687,6 +705,22 @@ TEST_F(DflyEngineTest, Watch) {
ASSERT_THAT(Run({"exec"}), kExecSuccess);
}
TEST_F(DflyEngineTest, Bug468) {
RespExpr resp = Run({"multi"});
ASSERT_EQ(resp, "OK");
resp = Run({"SET", "foo", "bar", "EX", "moo"});
ASSERT_EQ(resp, "QUEUED");
resp = Run({"exec"});
ASSERT_THAT(resp, ErrArg("not an integer"));
ASSERT_FALSE(service_->IsLocked(0, "foo"));
resp = Run({"eval", "return redis.call('set', 'foo', 'bar', 'EX', 'moo')", "1", "foo"});
ASSERT_THAT(resp, ErrArg("not an integer"));
ASSERT_FALSE(service_->IsLocked(0, "foo"));
}
// TODO: test transactions with a single shard, since then all transactions become local.
// Consider adding a parameter to the dragonfly engine that controls the number of shards
// independently of the number of CPUs. Also test BLPOP under multi for the single/multi argument case.


@ -209,9 +209,7 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
time_point tp =
msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max();
bool is_multi = t->IsMulti();
if (!is_multi) {
t->Schedule();
}
t->Schedule();
auto* stats = ServerState::tl_connection_stats();
@ -352,8 +350,8 @@ OpResult<string> Peek(const OpArgs& op_args, string_view key, ListDir dir, bool
quicklist* ql = GetQL(it_res.value()->second);
quicklistEntry entry = container_utils::QLEntry();
quicklistIter* iter = (dir == ListDir::LEFT) ? quicklistGetIterator(ql, AL_START_HEAD) :
quicklistGetIterator(ql, AL_START_TAIL);
quicklistIter* iter = (dir == ListDir::LEFT) ? quicklistGetIterator(ql, AL_START_HEAD)
: quicklistGetIterator(ql, AL_START_TAIL);
CHECK(quicklistNext(iter, &entry));
quicklistReleaseIterator(iter);
@ -848,7 +846,7 @@ void ListFamily::MoveGeneric(ConnectionContext* cntx, string_view src, string_vi
if (cntx->transaction->unique_shard_cnt() == 1) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir);
return OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir);
};
result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
@ -865,11 +863,11 @@ void ListFamily::MoveGeneric(ConnectionContext* cntx, string_view src, string_vi
//
cntx->transaction->Schedule();
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
DCHECK_EQ(1u, args.size());
bool is_dest = args.front() == dest;
find_res[is_dest] = Peek(t->GetOpArgs(shard), args.front(), src_dir, !is_dest);
return OpStatus::OK;
auto args = t->ShardArgsInShard(shard->shard_id());
DCHECK_EQ(1u, args.size());
bool is_dest = args.front() == dest;
find_res[is_dest] = Peek(t->GetOpArgs(shard), args.front(), src_dir, !is_dest);
return OpStatus::OK;
};
cntx->transaction->Execute(move(cb), false);
@ -881,18 +879,18 @@ void ListFamily::MoveGeneric(ConnectionContext* cntx, string_view src, string_vi
} else {
// Everything is ok, lets proceed with the mutations.
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
bool is_dest = args.front() == dest;
OpArgs op_args = t->GetOpArgs(shard);
auto args = t->ShardArgsInShard(shard->shard_id());
bool is_dest = args.front() == dest;
OpArgs op_args = t->GetOpArgs(shard);
if (is_dest) {
string_view val{find_res[0].value()};
absl::Span<string_view> span{&val, 1};
OpPush(op_args, args.front(), dest_dir, false, span);
} else {
OpPop(op_args, args.front(), src_dir, 1, false);
}
return OpStatus::OK;
if (is_dest) {
string_view val{find_res[0].value()};
absl::Span<string_view> span{&val, 1};
OpPush(op_args, args.front(), dest_dir, false, span);
} else {
OpPop(op_args, args.front(), src_dir, 1, false);
}
return OpStatus::OK;
};
cntx->transaction->Execute(move(cb), true);
result = std::move(find_res[0].value());


@ -659,8 +659,10 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
return (*cntx)->SendError("script tried accessing undeclared key");
}
}
dfly_cntx->transaction->SetExecCmd(cid);
OpStatus st = dfly_cntx->transaction->InitByArgs(dfly_cntx->conn_state.db_index, args);
if (st != OpStatus::OK) {
return (*cntx)->SendError(st);
}
@ -1164,7 +1166,7 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
}
}
scmd.descr->Invoke(cmd_arg_list, cntx);
if (rb->GetError())
if (rb->GetError()) // checks for i/o error, not logical error.
break;
}


@ -48,7 +48,7 @@ Transaction::Transaction(const CommandId* cid) : cid_(cid) {
multi_->multi_opts = cid->opt_mask();
if (cmd_name == "EVAL" || cmd_name == "EVALSHA") {
multi_->incremental = false; // we lock all the keys at once.
multi_->is_expanding = false; // we lock all the keys at once.
}
}
}
@ -105,7 +105,7 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
DCHECK_LT(key_index.start, args.size());
DCHECK_GT(key_index.start, 0u);
bool incremental_locking = multi_ && multi_->incremental;
bool incremental_locking = multi_ && multi_->is_expanding;
bool single_key = !multi_ && key_index.HasSingleKey();
bool needs_reverse_mapping = cid_->opt_mask() & CO::REVERSE_MAPPING;
@ -151,8 +151,12 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
if (multi_) {
mode = Mode();
multi_->keys.clear();
tmp_space.uniq_keys.clear();
DCHECK_LT(int(mode), 2);
// With EVAL, we call this function for EVAL itself as well as for each command the script
// invokes. Currently, we lock everything only during the EVAL call.
should_record_locks = incremental_locking || !multi_->locks_recorded;
}
@ -175,7 +179,11 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
shard_index[sid].original_index.push_back(i - 1);
if (should_record_locks && tmp_space.uniq_keys.insert(key).second) {
multi_->locks[key].cnt[int(mode)]++;
if (multi_->is_expanding) {
multi_->keys.push_back(key);
} else {
multi_->locks[key].cnt[int(mode)]++;
}
};
if (key_index.step == 2) { // value
@ -272,13 +280,12 @@ void Transaction::SetExecCmd(const CommandId* cid) {
DCHECK(multi_);
DCHECK(!cb_);
// The order is important, we are Schedule() for multi transaction before overriding cid_.
// TODO: The flow is ugly. I should introduce a proper interface for Multi transactions
// The order is important, we call Schedule for multi transaction before overriding cid_.
// TODO: The flow is ugly. Consider introducing a proper interface for Multi transactions
// like SetupMulti/TurndownMulti. We already have UnlockMulti that should be part of
// TurndownMulti.
if (txid_ == 0) {
Schedule();
ScheduleInternal();
}
unique_shard_cnt_ = 0;
@ -313,7 +320,7 @@ bool Transaction::RunInShard(EngineShard* shard) {
bool was_suspended = sd.local_mask & SUSPENDED_Q;
bool awaked_prerun = (sd.local_mask & AWAKED_Q) != 0;
bool incremental_lock = multi_ && multi_->incremental;
bool incremental_lock = multi_ && multi_->is_expanding;
// For multi we unlock transaction (i.e. its keys) in UnlockMulti() call.
// Therefore we differentiate between concluding, which says that this specific
@ -341,7 +348,7 @@ bool Transaction::RunInShard(EngineShard* shard) {
// if transaction is suspended (blocked in watched queue), then it's a noop.
OpStatus status = OpStatus::OK;
if (!was_suspended) {
if (!was_suspended) {
status = cb_(this, shard);
}
@ -527,6 +534,16 @@ void Transaction::ScheduleInternal() {
}
}
void Transaction::LockMulti() {
DCHECK(multi_ && multi_->is_expanding);
IntentLock::Mode mode = Mode();
for (auto key : multi_->keys) {
multi_->locks[key].cnt[int(mode)]++;
}
multi_->keys.clear();
}
// Optimized "Schedule and execute" function for the most common use-case of a single hop
// transactions like set/mset/mget etc. Does not apply for more complicated cases like RENAME or
// BLPOP where a data must be read from multiple shards before performing another hop.
@ -574,8 +591,14 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
shard_set->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier.
} else {
// Transaction spans multiple shards or it's global (like flushdb) or multi.
if (!multi_)
// Note that the logic here is a bit different from the public Schedule() function.
if (multi_) {
if (multi_->is_expanding)
LockMulti();
} else {
ScheduleInternal();
}
ExecuteAsync();
}
@ -616,6 +639,14 @@ void Transaction::UnlockMulti() {
VLOG(1) << "UnlockMultiEnd " << DebugId();
}
void Transaction::Schedule() {
if (multi_ && multi_->is_expanding) {
LockMulti();
} else {
ScheduleInternal();
}
}
// Runs in coordinator thread.
void Transaction::Execute(RunnableType cb, bool conclude) {
DCHECK(coordinator_state_ & COORD_SCHED);
@ -1137,8 +1168,8 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) {
return false;
}
DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask
<< " by " << committed_txid;
DVLOG(1) << "NotifySuspended " << DebugId() << ", local_mask:" << local_mask << " by "
<< committed_txid;
// local_mask could be awaked (i.e. not suspended) if the transaction has been
// awakened by another key or awakened by the same key multiple times.


@ -24,8 +24,8 @@ namespace dfly {
class EngineShard;
class BlockingController;
using facade::OpStatus;
using facade::OpResult;
using facade::OpStatus;
class Transaction {
friend class BlockingController;
@ -101,9 +101,7 @@ class Transaction {
// Schedules a transaction. Usually used for multi-hop transactions like Rename or BLPOP.
// For single hop, use ScheduleSingleHop instead.
void Schedule() {
ScheduleInternal();
}
void Schedule();
// if conclude is true, removes the transaction from the pending queue.
void Execute(RunnableType cb, bool conclude);
@ -188,7 +186,6 @@ class Transaction {
}
private:
struct LockCnt {
unsigned cnt[2] = {0, 0};
};
@ -200,6 +197,7 @@ class Transaction {
}
void ScheduleInternal();
void LockMulti();
void ExpireBlocking();
void ExecuteAsync();
@ -259,12 +257,14 @@ class Transaction {
};
enum { kPerShardSize = sizeof(PerShardData) };
struct Multi {
absl::flat_hash_map<std::string_view, LockCnt> locks;
std::vector<std::string_view> keys;
uint32_t multi_opts = 0; // options of the parent transaction.
bool incremental = true;
// Whether this transaction can lock more keys during its progress.
bool is_expanding = true;
bool locks_recorded = false;
};