feat(server): Check locks in heartbeat, allow multiple tx in replica (#815)

This commit is contained in:
Vladislav 2023-02-17 16:50:44 +03:00 committed by GitHub
parent f92403fb09
commit 2b40a7f324
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 59 additions and 34 deletions

View file

@ -723,6 +723,14 @@ void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
}
}
bool DbSlice::CheckLock(IntentLock::Mode mode, DbIndex dbid, string_view key) const {
KeyLockArgs args;
args.db_index = dbid;
args.args = ArgSlice{&key, 1};
args.key_step = 1;
return CheckLock(mode, args);
}
bool DbSlice::CheckLock(IntentLock::Mode mode, const KeyLockArgs& lock_args) const {
DCHECK(!lock_args.args.empty());
const auto& lt = db_arr_[lock_args.db_index]->trans_locks;
@ -862,7 +870,13 @@ auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteEx
auto& db = *db_arr_[cntx.db_index];
DeleteExpiredStats result;
std::string stash;
auto cb = [&](ExpireIterator it) {
auto key = it->first.GetSlice(&stash);
if (!CheckLock(IntentLock::EXCLUSIVE, cntx.db_index, key))
return;
result.traversed++;
time_t ttl = ExpireTime(it) - cntx.time_now_ms;
if (ttl <= 0) {

View file

@ -223,7 +223,10 @@ class DbSlice {
db_arr_[db_index]->Release(m, key, count);
}
// Returns true if all keys can be locked under m. Does not lock them though.
// Returns true if the key can be locked under m. Does not lock.
bool CheckLock(IntentLock::Mode m, DbIndex dbid, std::string_view key) const;
// Returns true if all keys can be locked under m. Does not lock.
bool CheckLock(IntentLock::Mode m, const KeyLockArgs& lock_args) const;
size_t db_array_size() const {

View file

@ -482,6 +482,7 @@ void EngineShard::Heartbeat() {
db_slice_.FreeMemWithEvictionStep(i, redline - db_slice_.memory_budget());
}
}
// Journal entries for expired entries are not writen to socket in the loop above.
// Trigger write to socket when loop finishes.
if (auto journal = EngineShard::tlocal()->journal(); journal) {

View file

@ -834,12 +834,9 @@ facade::ConnectionStats* Service::GetThreadLocalConnectionStats() {
bool Service::IsLocked(DbIndex db_index, std::string_view key) const {
ShardId sid = Shard(key, shard_count());
KeyLockArgs args;
args.db_index = db_index;
args.args = ArgSlice{&key, 1};
args.key_step = 1;
bool is_open = pp_.at(sid)->AwaitBrief(
[args] { return EngineShard::tlocal()->db_slice().CheckLock(IntentLock::EXCLUSIVE, args); });
bool is_open = pp_.at(sid)->AwaitBrief([db_index, key] {
return EngineShard::tlocal()->db_slice().CheckLock(IntentLock::EXCLUSIVE, db_index, key);
});
return !is_open;
}

View file

@ -1190,12 +1190,9 @@ error_code Replica::SendCommand(string_view command, ReqSerializer* serializer)
bool Replica::TransactionData::AddEntry(journal::ParsedEntry&& entry) {
++journal_rec_count;
// Expiry joins multi transaction or runs standalone.
if (entry.opcode == journal::Op::EXPIRED) {
entry.opcode = commands.empty() ? journal::Op::COMMAND : journal::Op::MULTI_COMMAND;
}
switch (entry.opcode) {
case journal::Op::EXPIRED:
case journal::Op::COMMAND:
commands.push_back(std::move(entry.cmd));
[[fallthrough]];
@ -1218,32 +1215,40 @@ bool Replica::TransactionData::IsGlobalCmd() const {
return commands.size() == 1 && commands.front().cmd_args.size() == 1;
}
// Expired entries within MULTI...EXEC sequence which belong to different database
// should be executed outside the multi transaciton.
bool Replica::TransactionReader::ReturnEntryOOO(const TransactionData& tx_data,
const journal::ParsedEntry& entry) {
return !tx_data.commands.empty() && entry.opcode == journal::Op::EXPIRED &&
tx_data.dbid != entry.dbid;
Replica::TransactionData Replica::TransactionData::FromSingle(journal::ParsedEntry&& entry) {
TransactionData data;
DCHECK(data.AddEntry(std::move(entry)));
return data;
}
auto Replica::TransactionReader::NextTxData(JournalReader* reader, Context* cntx)
-> optional<TransactionData> {
io::Result<journal::ParsedEntry> res;
TransactionData next_tx;
std::swap(saved_data_, next_tx);
do {
while (true) {
if (res = reader->ReadEntry(); !res) {
cntx->ReportError(res.error());
return std::nullopt;
}
if (ReturnEntryOOO(next_tx, *res)) {
std::swap(saved_data_, next_tx);
// Check if journal command can be executed right away.
// Expiration checks lock on master, so it never conflicts with running multi transactions.
if (res->opcode == journal::Op::EXPIRED || res->opcode == journal::Op::COMMAND)
return TransactionData::FromSingle(std::move(res.value()));
// Otherwise, continue building multi command.
DCHECK(res->opcode == journal::Op::MULTI_COMMAND || res->opcode == journal::Op::EXEC);
DCHECK(res->txid > 0);
auto txid = res->txid;
auto& txdata = current_[txid];
if (txdata.AddEntry(std::move(res.value()))) {
auto out = std::move(txdata);
current_.erase(txid);
return out;
}
}
} while (!cntx->IsCancelled() && !next_tx.AddEntry(std::move(*res)));
return cntx->IsCancelled() ? std::nullopt : make_optional(std::move(next_tx));
return std::nullopt;
}
} // namespace dfly

View file

@ -55,25 +55,30 @@ class Replica {
// This class holds the commands of transaction in single shard.
// Once all commands recieved the command can be executed.
struct TransactionData {
// Update the data from ParsedEntry and return if all shard transaction commands were recieved.
// Update the data from ParsedEntry and return true if all shard transaction commands were
// recieved.
bool AddEntry(journal::ParsedEntry&& entry);
bool IsGlobalCmd() const;
TxId txid;
DbIndex dbid;
uint32_t shard_cnt;
std::vector<journal::ParsedEntry::CmdData> commands;
// Counting the number of journal records in specific transaction in specific shard.
uint32_t journal_rec_count = 0;
static TransactionData FromSingle(journal::ParsedEntry&& entry);
TxId txid{0};
DbIndex dbid{0};
uint32_t shard_cnt{0};
std::vector<journal::ParsedEntry::CmdData> commands{0};
uint32_t journal_rec_count{0}; // Count number of source entries to check offset.
};
// Utility for reading TransactionData from a journal reader.
// The journal stream can contain interleaved data for multiple multi transactions,
// expiries and out of order executed transactions that need to be grouped on the replica side.
struct TransactionReader {
std::optional<TransactionData> NextTxData(JournalReader* reader, Context* cntx);
static bool ReturnEntryOOO(const TransactionData& tx_data, const journal::ParsedEntry& entry);
private:
TransactionData saved_data_{};
// Stores ongoing multi transaction data.
absl::flat_hash_map<TxId, TransactionData> current_;
};
// Coorindator for multi shard execution.