Refactor transaction (#697)

* refactor(server): Transaction naming

Signed-off-by: Vladislav <vlad@dragonflydb.io>
This commit is contained in:
Vladislav 2023-01-17 15:53:12 +03:00 committed by GitHub
parent b2edf9c848
commit 9536c0a645
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 299 additions and 267 deletions

View file

@ -543,7 +543,7 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) {
ShardId dest_shard = Shard(dest_key, result_set.size());
auto shard_bitop = [&](Transaction* t, EngineShard* shard) {
ArgSlice largs = t->ShardArgsInShard(shard->shard_id());
ArgSlice largs = t->GetShardArgs(shard->shard_id());
DCHECK(!largs.empty());
if (shard->shard_id() == dest_shard) {

View file

@ -90,7 +90,7 @@ void BlockingController::RunStep(Transaction* completed_t) {
if (completed_t) {
awakened_transactions_.erase(completed_t);
auto dbit = watched_dbs_.find(completed_t->db_index());
auto dbit = watched_dbs_.find(completed_t->GetDbIndex());
if (dbit != watched_dbs_.end()) {
DbWatchTable& wt = *dbit->second;
@ -100,7 +100,7 @@ void BlockingController::RunStep(Transaction* completed_t) {
for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) {
string_view key = lock_args.args[i];
if (wt.AddAwakeEvent(WatchQueue::ACTIVE, key)) {
awakened_indices_.emplace(completed_t->db_index());
awakened_indices_.emplace(completed_t->GetDbIndex());
}
}
}
@ -139,7 +139,7 @@ void BlockingController::RunStep(Transaction* completed_t) {
void BlockingController::AddWatched(ArgSlice keys, Transaction* trans) {
VLOG(1) << "AddWatched [" << owner_->shard_id() << "] " << trans->DebugId();
auto [dbit, added] = watched_dbs_.emplace(trans->db_index(), nullptr);
auto [dbit, added] = watched_dbs_.emplace(trans->GetDbIndex(), nullptr);
if (added) {
dbit->second.reset(new DbWatchTable);
}
@ -154,7 +154,7 @@ void BlockingController::AddWatched(ArgSlice keys, Transaction* trans) {
if (!res->second->items.empty()) {
Transaction* last = res->second->items.back().get();
DCHECK_GT(last->use_count(), 0u);
DCHECK_GT(last->GetUseCount(), 0u);
// Duplicate keys case. We push only once per key.
if (last == trans)
@ -169,7 +169,7 @@ void BlockingController::AddWatched(ArgSlice keys, Transaction* trans) {
void BlockingController::RemoveWatched(ArgSlice keys, Transaction* trans) {
VLOG(1) << "RemoveWatched [" << owner_->shard_id() << "] " << trans->DebugId();
auto dbit = watched_dbs_.find(trans->db_index());
auto dbit = watched_dbs_.find(trans->GetDbIndex());
if (dbit == watched_dbs_.end())
return;

View file

@ -68,7 +68,7 @@ TEST_F(BlockingControllerTest, Basic) {
shard_set->Await(0, [&] {
EngineShard* shard = EngineShard::tlocal();
BlockingController bc(shard);
auto keys = trans_->ShardArgsInShard(shard->shard_id());
auto keys = trans_->GetShardArgs(shard->shard_id());
bc.AddWatched(keys, trans_.get());
EXPECT_EQ(1, bc.NumWatched(0));
@ -81,7 +81,7 @@ TEST_F(BlockingControllerTest, Timeout) {
time_point tp = steady_clock::now() + chrono::milliseconds(10);
trans_->Schedule();
auto cb = [&](Transaction* t, EngineShard* shard) { return trans_->ShardArgsInShard(0); };
auto cb = [&](Transaction* t, EngineShard* shard) { return trans_->GetShardArgs(0); };
bool res = trans_->WaitOnWatch(tp, cb);

View file

@ -294,7 +294,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) {
DCHECK(continuation_trans_ == nullptr)
<< continuation_trans_->DebugId() << " when polling " << trans->DebugId();
CHECK_EQ(committed_txid_, trans->notify_txid());
CHECK_EQ(committed_txid_, trans->GetNotifyTxid());
bool keep = trans->RunInShard(this);
if (keep) {
return;

View file

@ -307,14 +307,14 @@ class Renamer {
void Renamer::Find(Transaction* t) {
auto cb = [this](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
auto args = t->GetShardArgs(shard->shard_id());
CHECK_EQ(1u, args.size());
FindResult* res = (shard->shard_id() == src_sid_) ? &src_res_ : &dest_res_;
res->key = args.front();
auto& db_slice = EngineShard::tlocal()->db_slice();
auto [it, exp_it] = db_slice.FindExt(t->db_context(), res->key);
auto [it, exp_it] = db_slice.FindExt(t->GetDbContext(), res->key);
res->found = IsValid(it);
if (IsValid(it)) {
@ -357,7 +357,7 @@ void Renamer::Finalize(Transaction* t, bool skip_exist_dest) {
OpStatus Renamer::MoveSrc(Transaction* t, EngineShard* es) {
if (es->shard_id() == src_sid_) { // Handle source key.
// TODO: to call PreUpdate/PostUpdate.
auto it = es->db_slice().FindExt(t->db_context(), src_res_.key).first;
auto it = es->db_slice().FindExt(t->GetDbContext(), src_res_.key).first;
CHECK(IsValid(it));
// We distinguish because of the SmallString that is pinned to its thread by design,
@ -370,7 +370,7 @@ OpStatus Renamer::MoveSrc(Transaction* t, EngineShard* es) {
pv_ = std::move(it->second);
it->second.SetExpire(has_expire);
}
CHECK(es->db_slice().Del(t->db_index(), it)); // delete the entry with empty value in it.
CHECK(es->db_slice().Del(t->GetDbIndex(), it)); // delete the entry with empty value in it.
}
return OpStatus::OK;
@ -380,7 +380,7 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) {
if (es->shard_id() != src_sid_) {
auto& db_slice = es->db_slice();
string_view dest_key = dest_res_.key;
PrimeIterator dest_it = db_slice.FindExt(t->db_context(), dest_key).first;
PrimeIterator dest_it = db_slice.FindExt(t->GetDbContext(), dest_key).first;
bool is_prior_list = false;
if (IsValid(dest_it)) {
@ -393,18 +393,18 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) {
dest_it->second = std::move(pv_);
}
dest_it->second.SetExpire(has_expire); // preserve expire flag.
db_slice.UpdateExpire(t->db_index(), dest_it, src_res_.expire_ts);
db_slice.UpdateExpire(t->GetDbIndex(), dest_it, src_res_.expire_ts);
} else {
if (src_res_.ref_val.ObjType() == OBJ_STRING) {
pv_.SetString(str_val_);
}
dest_it = db_slice.AddNew(t->db_context(), dest_key, std::move(pv_), src_res_.expire_ts);
dest_it = db_slice.AddNew(t->GetDbContext(), dest_key, std::move(pv_), src_res_.expire_ts);
}
dest_it->first.SetSticky(src_res_.sticky);
if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) {
es->blocking_controller()->AwakeWatched(t->db_index(), dest_key);
es->blocking_controller()->AwakeWatched(t->GetDbIndex(), dest_key);
}
}
@ -621,7 +621,7 @@ void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) {
bool is_mc = cntx->protocol() == Protocol::MEMCACHE;
auto cb = [&result](const Transaction* t, EngineShard* shard) {
ArgSlice args = t->ShardArgsInShard(shard->shard_id());
ArgSlice args = t->GetShardArgs(shard->shard_id());
auto res = OpDel(t->GetOpArgs(shard), args);
result.fetch_add(res.value_or(0), memory_order_relaxed);
@ -672,7 +672,7 @@ void GenericFamily::Exists(CmdArgList args, ConnectionContext* cntx) {
atomic_uint32_t result{0};
auto cb = [&result](Transaction* t, EngineShard* shard) {
ArgSlice args = t->ShardArgsInShard(shard->shard_id());
ArgSlice args = t->GetShardArgs(shard->shard_id());
auto res = OpExists(t->GetOpArgs(shard), args);
result.fetch_add(res.value_or(0), memory_order_relaxed);
@ -817,7 +817,7 @@ void GenericFamily::Stick(CmdArgList args, ConnectionContext* cntx) {
atomic_uint32_t result{0};
auto cb = [&result](const Transaction* t, EngineShard* shard) {
ArgSlice args = t->ShardArgsInShard(shard->shard_id());
ArgSlice args = t->GetShardArgs(shard->shard_id());
auto res = OpStick(t->GetOpArgs(shard), args);
result.fetch_add(res.value_or(0), memory_order_relaxed);
@ -1159,7 +1159,7 @@ void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) {
std::string_view key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<int> {
auto it = shard->db_slice().FindExt(t->db_context(), key).first;
auto it = shard->db_slice().FindExt(t->GetDbContext(), key).first;
if (!it.is_done()) {
return it->second.ObjType();
} else {
@ -1177,7 +1177,7 @@ void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) {
void GenericFamily::Time(CmdArgList args, ConnectionContext* cntx) {
uint64_t now_usec;
if (cntx->transaction) {
now_usec = cntx->transaction->db_context().time_now_ms * 1000;
now_usec = cntx->transaction->GetDbContext().time_now_ms * 1000;
} else {
now_usec = absl::GetCurrentTimeNanos() / 1000;
}
@ -1193,7 +1193,7 @@ OpResult<void> GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des
Transaction* transaction = cntx->transaction;
if (transaction->unique_shard_cnt() == 1) {
if (transaction->GetUniqueShardCnt() == 1) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest);
};
@ -1249,14 +1249,14 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) {
OpResult<uint64_t> GenericFamily::OpTtl(Transaction* t, EngineShard* shard, string_view key) {
auto& db_slice = shard->db_slice();
auto [it, expire_it] = db_slice.FindExt(t->db_context(), key);
auto [it, expire_it] = db_slice.FindExt(t->GetDbContext(), key);
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;
if (!IsValid(expire_it))
return OpStatus::SKIPPED;
int64_t ttl_ms = db_slice.ExpireTime(expire_it) - t->db_context().time_now_ms;
int64_t ttl_ms = db_slice.ExpireTime(expire_it) - t->GetDbContext().time_now_ms;
DCHECK_GT(ttl_ms, 0); // Otherwise FindExt would return null.
return ttl_ms;
}

View file

@ -953,7 +953,7 @@ void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<StringVec> {
auto& db_slice = shard->db_slice();
auto it_res = db_slice.Find(t->db_context(), key, OBJ_HASH);
auto it_res = db_slice.Find(t->GetDbContext(), key, OBJ_HASH);
if (!it_res)
return it_res.status();

View file

@ -124,10 +124,10 @@ OpResult<ShardFFResult> FindFirst(Transaction* trans) {
fill(find_res.begin(), find_res.end(), OpStatus::KEY_NOTFOUND);
auto cb = [&find_res](auto* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
auto args = t->GetShardArgs(shard->shard_id());
OpResult<pair<PrimeIterator, unsigned>> ff_res =
shard->db_slice().FindFirst(t->db_context(), args);
shard->db_slice().FindFirst(t->GetDbContext(), args);
if (ff_res) {
FFResult ff_result(ff_res->first->first.AsRef(), ff_res->second);
@ -241,7 +241,7 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
// Block
auto wcb = [&](Transaction* t, EngineShard* shard) {
return t->ShardArgsInShard(shard->shard_id());
return t->GetShardArgs(shard->shard_id());
};
++stats->num_blocked_clients;
@ -277,16 +277,16 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) {
if (shard->shard_id() == ff_result_.sid) {
ff_result_.key.GetString(&key_);
auto& db_slice = shard->db_slice();
auto it_res = db_slice.Find(t->db_context(), key_, OBJ_LIST);
auto it_res = db_slice.Find(t->GetDbContext(), key_, OBJ_LIST);
CHECK(it_res); // must exist and must be ok.
PrimeIterator it = *it_res;
quicklist* ql = GetQL(it->second);
db_slice.PreUpdate(t->db_index(), it);
db_slice.PreUpdate(t->GetDbIndex(), it);
value_ = ListPop(dir_, ql);
db_slice.PostUpdate(t->db_index(), it, key_);
db_slice.PostUpdate(t->GetDbIndex(), it, key_);
if (quicklistCount(ql) == 0) {
CHECK(shard->db_slice().Del(t->db_index(), it));
CHECK(shard->db_slice().Del(t->GetDbIndex(), it));
}
}
@ -475,7 +475,7 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, ListDir dir, u
OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view dest,
ListDir src_dir, ListDir dest_dir, bool conclude_on_error) {
DCHECK_EQ(2u, trans->unique_shard_cnt());
DCHECK_EQ(2u, trans->GetUniqueShardCnt());
OpResult<string> find_res[2];
OpResult<string> result;
@ -487,7 +487,7 @@ OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view
// the destination.
//
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
auto args = t->GetShardArgs(shard->shard_id());
DCHECK_EQ(1u, args.size());
bool is_dest = args.front() == dest;
find_res[is_dest] = Peek(t->GetOpArgs(shard), args.front(), src_dir, !is_dest);
@ -505,7 +505,7 @@ OpResult<string> MoveTwoShards(Transaction* trans, string_view src, string_view
} else {
// Everything is ok, lets proceed with the mutations.
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
auto args = t->GetShardArgs(shard->shard_id());
bool is_dest = args.front() == dest;
OpArgs op_args = t->GetOpArgs(shard);
@ -782,7 +782,7 @@ void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, Lis
ListDir dest_dir) {
OpResult<string> result;
if (cntx->transaction->unique_shard_cnt() == 1) {
if (cntx->transaction->GetUniqueShardCnt() == 1) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir);
};
@ -857,7 +857,7 @@ OpResult<string> BPopPusher::Run(Transaction* t, unsigned msec) {
t->Schedule();
if (t->unique_shard_cnt() == 1) {
if (t->GetUniqueShardCnt() == 1) {
return RunSingle(t, tp);
}

View file

@ -674,7 +674,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
return (*cntx)->SendError(st);
dfly_cntx->transaction = dist_trans.get();
dfly_cntx->last_command_debug.shards_count = dfly_cntx->transaction->unique_shard_cnt();
dfly_cntx->last_command_debug.shards_count = dfly_cntx->transaction->GetUniqueShardCnt();
} else {
dfly_cntx->transaction = nullptr;
}
@ -887,7 +887,7 @@ void Service::Watch(CmdArgList args, ConnectionContext* cntx) {
atomic_uint32_t keys_existed = 0;
auto cb = [&](Transaction* t, EngineShard* shard) {
ArgSlice largs = t->ShardArgsInShard(shard->shard_id());
ArgSlice largs = t->GetShardArgs(shard->shard_id());
for (auto k : largs) {
shard->db_slice().RegisterWatchedKey(cntx->db_index(), k, &exec_info);
}
@ -1081,7 +1081,7 @@ bool CheckWatchedKeyExpiry(ConnectionContext* cntx, const CommandRegistry& regis
atomic_uint32_t watch_exist_count{0};
auto cb = [&watch_exist_count](Transaction* t, EngineShard* shard) {
ArgSlice args = t->ShardArgsInShard(shard->shard_id());
ArgSlice args = t->GetShardArgs(shard->shard_id());
auto res = GenericFamily::OpExists(t->GetOpArgs(shard), args);
watch_exist_count.fetch_add(res.value_or(0), memory_order_relaxed);

View file

@ -1008,7 +1008,7 @@ void ServerFamily::BreakOnShutdown() {
void ServerFamily::FlushDb(CmdArgList args, ConnectionContext* cntx) {
DCHECK(cntx->transaction);
Drakarys(cntx->transaction, cntx->transaction->db_index());
Drakarys(cntx->transaction, cntx->transaction->GetDbIndex());
cntx->reply_builder()->SendOk();
}

View file

@ -717,19 +717,19 @@ class Mover {
};
OpStatus Mover::OpFind(Transaction* t, EngineShard* es) {
ArgSlice largs = t->ShardArgsInShard(es->shard_id());
ArgSlice largs = t->GetShardArgs(es->shard_id());
// In case both src and dest are in the same shard, largs size will be 2.
DCHECK_LE(largs.size(), 2u);
for (auto k : largs) {
unsigned index = (k == src_) ? 0 : 1;
OpResult<PrimeIterator> res = es->db_slice().Find(t->db_context(), k, OBJ_SET);
OpResult<PrimeIterator> res = es->db_slice().Find(t->GetDbContext(), k, OBJ_SET);
if (res && index == 0) { // successful src find.
DCHECK(!res->is_done());
const CompactObj& val = res.value()->second;
SetType st{val.RObjPtr(), val.Encoding()};
found_[0] = IsInSet(t->db_context(), st, member_);
found_[0] = IsInSet(t->GetDbContext(), st, member_);
} else {
found_[index] = res.status();
}
@ -739,7 +739,7 @@ OpStatus Mover::OpFind(Transaction* t, EngineShard* es) {
}
OpStatus Mover::OpMutate(Transaction* t, EngineShard* es) {
ArgSlice largs = t->ShardArgsInShard(es->shard_id());
ArgSlice largs = t->GetShardArgs(es->shard_id());
DCHECK_LE(largs.size(), 2u);
OpArgs op_args = t->GetOpArgs(es);
@ -868,7 +868,7 @@ OpResult<StringVec> OpDiff(const OpArgs& op_args, ArgSlice keys) {
// Read-only OpInter op on sets.
OpResult<StringVec> OpInter(const Transaction* t, EngineShard* es, bool remove_first) {
ArgSlice keys = t->ShardArgsInShard(es->shard_id());
ArgSlice keys = t->GetShardArgs(es->shard_id());
if (remove_first) {
keys.remove_prefix(1);
}
@ -876,14 +876,15 @@ OpResult<StringVec> OpInter(const Transaction* t, EngineShard* es, bool remove_f
StringVec result;
if (keys.size() == 1) {
OpResult<PrimeIterator> find_res = es->db_slice().Find(t->db_context(), keys.front(), OBJ_SET);
OpResult<PrimeIterator> find_res =
es->db_slice().Find(t->GetDbContext(), keys.front(), OBJ_SET);
if (!find_res)
return find_res.status();
PrimeValue& pv = find_res.value()->second;
if (IsDenseEncoding(pv)) {
StringSet* ss = (StringSet*)pv.RObjPtr();
ss->set_time(TimeNowSecRel(t->db_context().time_now_ms));
ss->set_time(TimeNowSecRel(t->GetDbContext().time_now_ms));
}
container_utils::IterateSet(find_res.value()->second,
@ -900,7 +901,7 @@ OpResult<StringVec> OpInter(const Transaction* t, EngineShard* es, bool remove_f
OpStatus status = OpStatus::OK;
for (size_t i = 0; i < keys.size(); ++i) {
OpResult<PrimeIterator> find_res = es->db_slice().Find(t->db_context(), keys[i], OBJ_SET);
OpResult<PrimeIterator> find_res = es->db_slice().Find(t->GetDbContext(), keys[i], OBJ_SET);
if (!find_res) {
if (status == OpStatus::OK || status == OpStatus::KEY_NOTFOUND ||
find_res.status() != OpStatus::KEY_NOTFOUND) {
@ -916,7 +917,7 @@ OpResult<StringVec> OpInter(const Transaction* t, EngineShard* es, bool remove_f
if (status != OpStatus::OK)
return status;
auto comp = [db_contx = t->db_context()](const SetType& left, const SetType& right) {
auto comp = [db_contx = t->GetDbContext()](const SetType& left, const SetType& right) {
return SetTypeLen(db_contx, left) < SetTypeLen(db_contx, right);
};
@ -931,7 +932,7 @@ OpResult<StringVec> OpInter(const Transaction* t, EngineShard* es, bool remove_f
while (intsetGet(is, ii++, &intele)) {
size_t j = 1;
for (j = 1; j < sets.size(); j++) {
if (sets[j].first != is && !IsInSet(t->db_context(), sets[j], intele))
if (sets[j].first != is && !IsInSet(t->GetDbContext(), sets[j], intele))
break;
}
@ -941,7 +942,7 @@ OpResult<StringVec> OpInter(const Transaction* t, EngineShard* es, bool remove_f
}
}
} else {
InterStrSet(t->db_context(), sets, &result);
InterStrSet(t->GetDbContext(), sets, &result);
}
return result;
@ -1068,11 +1069,11 @@ void SIsMember(CmdArgList args, ConnectionContext* cntx) {
string_view val = ArgS(args, 2);
auto cb = [&](Transaction* t, EngineShard* shard) {
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->db_context(), key, OBJ_SET);
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->GetDbContext(), key, OBJ_SET);
if (find_res) {
SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()};
return IsInSet(t->db_context(), st, val) ? OpStatus::OK : OpStatus::KEY_NOTFOUND;
return IsInSet(t->GetDbContext(), st, val) ? OpStatus::OK : OpStatus::KEY_NOTFOUND;
}
return find_res.status();
@ -1099,10 +1100,10 @@ void SMIsMember(CmdArgList args, ConnectionContext* cntx) {
memberships.reserve(vals.size());
auto cb = [&](Transaction* t, EngineShard* shard) {
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->db_context(), key, OBJ_SET);
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->GetDbContext(), key, OBJ_SET);
if (find_res) {
SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()};
FindInSet(memberships, t->db_context(), st, vals);
FindInSet(memberships, t->GetDbContext(), st, vals);
return OpStatus::OK;
}
return find_res.status();
@ -1164,7 +1165,7 @@ void SCard(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<uint32_t> {
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->db_context(), key, OBJ_SET);
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->GetDbContext(), key, OBJ_SET);
if (!find_res) {
return find_res.status();
}
@ -1223,7 +1224,7 @@ void SDiff(CmdArgList args, ConnectionContext* cntx) {
ShardId src_shard = Shard(src_key, result_set.size());
auto cb = [&](Transaction* t, EngineShard* shard) {
ArgSlice largs = t->ShardArgsInShard(shard->shard_id());
ArgSlice largs = t->GetShardArgs(shard->shard_id());
if (shard->shard_id() == src_shard) {
CHECK_EQ(src_key, largs.front());
result_set[shard->shard_id()] = OpDiff(t->GetOpArgs(shard), largs);
@ -1259,7 +1260,7 @@ void SDiffStore(CmdArgList args, ConnectionContext* cntx) {
// read-only op
auto diff_cb = [&](Transaction* t, EngineShard* shard) {
ArgSlice largs = t->ShardArgsInShard(shard->shard_id());
ArgSlice largs = t->GetShardArgs(shard->shard_id());
DCHECK(!largs.empty());
if (shard->shard_id() == dest_shard) {
@ -1329,7 +1330,7 @@ void SInter(CmdArgList args, ConnectionContext* cntx) {
};
cntx->transaction->ScheduleSingleHop(std::move(cb));
OpResult<SvArray> result = InterResultVec(result_set, cntx->transaction->unique_shard_cnt());
OpResult<SvArray> result = InterResultVec(result_set, cntx->transaction->GetUniqueShardCnt());
if (result) {
SvArray arr = std::move(*result);
if (cntx->conn_state.script_info) { // sort under script
@ -1348,7 +1349,7 @@ void SInterStore(CmdArgList args, ConnectionContext* cntx) {
atomic_uint32_t inter_shard_cnt{0};
auto inter_cb = [&](Transaction* t, EngineShard* shard) {
ArgSlice largs = t->ShardArgsInShard(shard->shard_id());
ArgSlice largs = t->GetShardArgs(shard->shard_id());
if (shard->shard_id() == dest_shard) {
CHECK_EQ(largs.front(), dest_key);
if (largs.size() == 1)
@ -1385,7 +1386,7 @@ void SUnion(CmdArgList args, ConnectionContext* cntx) {
ResultStringVec result_set(shard_set->size());
auto cb = [&](Transaction* t, EngineShard* shard) {
ArgSlice largs = t->ShardArgsInShard(shard->shard_id());
ArgSlice largs = t->GetShardArgs(shard->shard_id());
result_set[shard->shard_id()] = OpUnion(t->GetOpArgs(shard), largs);
return OpStatus::OK;
};
@ -1410,7 +1411,7 @@ void SUnionStore(CmdArgList args, ConnectionContext* cntx) {
ShardId dest_shard = Shard(dest_key, result_set.size());
auto union_cb = [&](Transaction* t, EngineShard* shard) {
ArgSlice largs = t->ShardArgsInShard(shard->shard_id());
ArgSlice largs = t->GetShardArgs(shard->shard_id());
if (shard->shard_id() == dest_shard) {
CHECK_EQ(largs.front(), dest_key);
largs.remove_prefix(1);

View file

@ -935,7 +935,7 @@ void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) {
continue;
MGetResponse& results = mget_resp[sid];
ArgSlice slice = transaction->ShardArgsInShard(sid);
ArgSlice slice = transaction->GetShardArgs(sid);
DCHECK(!slice.empty());
DCHECK_EQ(slice.size(), results.size());
@ -966,11 +966,11 @@ void StringFamily::MSet(CmdArgList args, ConnectionContext* cntx) {
for (size_t i = 1; i < args.size(); ++i) {
absl::StrAppend(&str, " ", ArgS(args, i));
}
LOG(INFO) << "MSET/" << transaction->unique_shard_cnt() << str;
LOG(INFO) << "MSET/" << transaction->GetUniqueShardCnt() << str;
}
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
auto args = t->GetShardArgs(shard->shard_id());
return OpMSet(t->GetOpArgs(shard), args);
};
@ -990,9 +990,9 @@ void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) {
atomic_bool exists{false};
auto cb = [&](Transaction* t, EngineShard* es) {
auto args = t->ShardArgsInShard(es->shard_id());
auto args = t->GetShardArgs(es->shard_id());
for (size_t i = 0; i < args.size(); i += 2) {
auto it = es->db_slice().FindExt(t->db_context(), args[i]).first;
auto it = es->db_slice().FindExt(t->GetDbContext(), args[i]).first;
if (IsValid(it)) {
exists.store(true, memory_order_relaxed);
break;
@ -1009,7 +1009,7 @@ void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) {
if (to_skip)
return OpStatus::OK;
auto args = t->ShardArgsInShard(shard->shard_id());
auto args = t->GetShardArgs(shard->shard_id());
return OpMSet(t->GetOpArgs(shard), std::move(args));
};
@ -1022,7 +1022,7 @@ void StringFamily::StrLen(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<size_t> {
OpResult<PrimeIterator> it_res = shard->db_slice().Find(t->db_context(), key, OBJ_STRING);
OpResult<PrimeIterator> it_res = shard->db_slice().Find(t->GetDbContext(), key, OBJ_STRING);
if (!it_res.ok())
return it_res.status();
@ -1102,14 +1102,14 @@ void StringFamily::PSetEx(CmdArgList args, ConnectionContext* cntx) {
auto StringFamily::OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction* t,
EngineShard* shard) -> MGetResponse {
auto args = t->ShardArgsInShard(shard->shard_id());
auto args = t->GetShardArgs(shard->shard_id());
DCHECK(!args.empty());
MGetResponse response(args.size());
auto& db_slice = shard->db_slice();
for (size_t i = 0; i < args.size(); ++i) {
OpResult<PrimeIterator> it_res = db_slice.Find(t->db_context(), args[i], OBJ_STRING);
OpResult<PrimeIterator> it_res = db_slice.Find(t->GetDbContext(), args[i], OBJ_STRING);
if (!it_res)
continue;
@ -1118,7 +1118,7 @@ auto StringFamily::OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction
dest.value = GetString(shard, it->second);
if (fetch_mcflag) {
dest.mc_flag = db_slice.GetMCFlag(t->db_index(), it->first);
dest.mc_flag = db_slice.GetMCFlag(t->GetDbIndex(), it->first);
if (fetch_mcver) {
dest.mc_ver = it.GetVersion();
}

View file

@ -44,7 +44,7 @@ IntentLock::Mode Transaction::Mode() const {
Transaction::Transaction(const CommandId* cid) : cid_(cid) {
string_view cmd_name(cid_->name());
if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") {
multi_.reset(new Multi);
multi_.reset(new MultiData);
multi_->multi_opts = cid->opt_mask();
if (cmd_name == "EVAL" || cmd_name == "EVALSHA") {
@ -183,7 +183,7 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) {
if (multi_->is_expanding) {
multi_->keys.push_back(key);
} else {
multi_->locks[key].cnt[int(mode)]++;
multi_->lock_counts[key][mode]++;
}
};
@ -376,7 +376,7 @@ bool Transaction::RunInShard(EngineShard* shard) {
LogAutoJournalOnShard(shard);
// at least the coordinator thread owns the reference.
DCHECK_GE(use_count(), 1u);
DCHECK_GE(GetUseCount(), 1u);
// we remove tx from tx-queue upon first invocation.
// if it needs to run again it runs via a dedicated continuation_trans_ state in EngineShard.
@ -533,14 +533,13 @@ void Transaction::ScheduleInternal() {
}
}
void Transaction::LockMulti() {
DCHECK(multi_ && multi_->is_expanding);
void Transaction::MultiData::AddLocks(IntentLock::Mode mode) {
DCHECK(is_expanding);
IntentLock::Mode mode = Mode();
for (auto key : multi_->keys) {
multi_->locks[key].cnt[int(mode)]++;
for (auto key : keys) {
lock_counts[key][mode]++;
}
multi_->keys.clear();
keys.clear();
}
// Optimized "Schedule and execute" function for the most common use-case of a single hop
@ -593,7 +592,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
// Note that the logic here is a bit different from the public Schedule() function.
if (multi_) {
if (multi_->is_expanding)
LockMulti();
multi_->AddLocks(Mode());
} else {
ScheduleInternal();
}
@ -619,15 +618,15 @@ void Transaction::UnlockMulti() {
vector<KeyList> sharded_keys(shard_set->size());
// It's LE and not EQ because there may be callbacks in progress that increase use_count_.
DCHECK_LE(1u, use_count());
DCHECK_LE(1u, GetUseCount());
for (const auto& k_v : multi_->locks) {
for (const auto& k_v : multi_->lock_counts) {
ShardId sid = Shard(k_v.first, sharded_keys.size());
sharded_keys[sid].push_back(k_v);
}
if (ServerState::tlocal()->journal()) {
SetMultiUniqueShardCount();
CalculateUnqiueShardCntForMulti();
}
uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed);
@ -637,12 +636,12 @@ void Transaction::UnlockMulti() {
shard_set->Add(i, [&] { UnlockMultiShardCb(sharded_keys, EngineShard::tlocal()); });
}
WaitForShardCallbacks();
DCHECK_GE(use_count(), 1u);
DCHECK_GE(GetUseCount(), 1u);
VLOG(1) << "UnlockMultiEnd " << DebugId();
}
void Transaction::SetMultiUniqueShardCount() {
void Transaction::CalculateUnqiueShardCntForMulti() {
uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed);
DCHECK_EQ(prev, 0u);
@ -671,7 +670,7 @@ void Transaction::SetMultiUniqueShardCount() {
void Transaction::Schedule() {
if (multi_ && multi_->is_expanding) {
LockMulti();
multi_->AddLocks(Mode());
} else {
ScheduleInternal();
}
@ -809,7 +808,7 @@ void Transaction::RunQuickie(EngineShard* shard) {
// runs in coordinator thread.
// Marks the transaction as expired and removes it from the waiting queue.
void Transaction::UnwatchBlocking(bool should_expire, WaitKeysPovider wcb) {
void Transaction::UnwatchBlocking(bool should_expire, WaitKeysProvider wcb) {
DVLOG(1) << "UnwatchBlocking " << DebugId();
DCHECK(!IsGlobal());
@ -849,7 +848,7 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const {
KeyLockArgs res;
res.db_index = db_index_;
res.key_step = cid_->key_arg_step();
res.args = ShardArgsInShard(sid);
res.args = GetShardArgs(sid);
return res;
}
@ -987,7 +986,7 @@ bool Transaction::CancelShardCb(EngineShard* shard) {
}
// runs in engine-shard thread.
ArgSlice Transaction::ShardArgsInShard(ShardId sid) const {
ArgSlice Transaction::GetShardArgs(ShardId sid) const {
DCHECK(!args_.empty());
// We can read unique_shard_cnt_ only because ShardArgsInShard is called after IsArmedInShard
@ -1010,9 +1009,9 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
return reverse_index_[sd.arg_start + arg_index];
}
bool Transaction::WaitOnWatch(const time_point& tp, WaitKeysPovider wkeys_provider) {
bool Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider) {
// Assumes that transaction is pending and scheduled. TODO: To verify it with state machine.
VLOG(2) << "WaitOnWatch Start use_count(" << use_count() << ")";
VLOG(2) << "WaitOnWatch Start use_count(" << GetUseCount() << ")";
using namespace chrono;
auto cb = [&](Transaction* t, EngineShard* shard) {
@ -1103,8 +1102,8 @@ void Transaction::UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, E
ShardId sid = shard->shard_id();
for (const auto& k_v : sharded_keys[sid]) {
auto release = [&](IntentLock::Mode mode) {
if (k_v.second.cnt[mode]) {
shard->db_slice().Release(mode, db_index_, k_v.first, k_v.second.cnt[mode]);
if (k_v.second[mode]) {
shard->db_slice().Release(mode, db_index_, k_v.first, k_v.second[mode]);
}
};
@ -1236,7 +1235,7 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) {
entry_payload = cmd_with_full_args_;
} else {
auto cmd = facade::ToSV(cmd_with_full_args_.front());
entry_payload = make_pair(cmd, ShardArgsInShard(shard->shard_id()));
entry_payload = make_pair(cmd, GetShardArgs(shard->shard_id()));
}
LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_);

View file

@ -28,15 +28,23 @@ class BlockingController;
using facade::OpResult;
using facade::OpStatus;
// Central building block of the transactional framework.
//
// Use it to run callbacks on the shard threads - such dispatches are called hops.
// The shards to run on are determined by the keys of the underlying command.
// Global transactions run on all shards.
//
// Use ScheduleSingleHop() if only a single hop is needed.
// Otherwise, schedule the transaction with Schedule() and run successive hops
// with Execute().
class Transaction {
friend class BlockingController;
Transaction(const Transaction&);
void operator=(const Transaction&) = delete;
~Transaction();
~Transaction(); // Transactions are reference counted with intrusive_ptr.
// Transactions are reference counted.
friend void intrusive_ptr_add_ref(Transaction* trans) noexcept {
trans->use_count_.fetch_add(1, std::memory_order_relaxed);
}
@ -49,33 +57,84 @@ class Transaction {
}
public:
using RunnableType = std::function<OpStatus(Transaction* t, EngineShard*)>;
using time_point = ::std::chrono::steady_clock::time_point;
// Runnable that is run on shards during hop executions (often named callback).
using RunnableType = std::function<OpStatus(Transaction* t, EngineShard*)>;
// Provides keys to block on for specific shard.
using WaitKeysProvider = std::function<ArgSlice(Transaction*, EngineShard* shard)>;
// State on specific shard.
enum LocalMask : uint16_t {
ARMED = 1, // Transaction was armed with the callback
OUT_OF_ORDER = 2,
KEYLOCK_ACQUIRED = 4,
SUSPENDED_Q = 0x10, // added by the coordination flow (via WaitBlocked()).
AWAKED_Q = 0x20, // awaked by condition (lpush etc)
EXPIRED_Q = 0x40, // timed-out and should be garbage collected from the blocking queue.
ARMED = 1, // Whether callback cb_ is set
OUT_OF_ORDER = 1 << 1,     // Whether it's running out of order
KEYLOCK_ACQUIRED = 1 << 2, // Whether its key locks are acquired
SUSPENDED_Q = 1 << 3,      // Whether it is suspended (by WatchInShard())
AWAKED_Q = 1 << 4, // Whether it was awakened (by NotifySuspended())
EXPIRED_Q = 1 << 5, // Whether it timed out and should be dropped
};
public:
explicit Transaction(const CommandId* cid);
// Initialize from command (args) on specific db.
OpStatus InitByArgs(DbIndex index, CmdArgList args);
void SetExecCmd(const CommandId* cid);
// Get command arguments for specific shard. Called from shard thread.
ArgSlice GetShardArgs(ShardId sid) const;
std::string DebugId() const;
// Runs in engine thread
ArgSlice ShardArgsInShard(ShardId sid) const;
// Maps the index in ShardArgsInShard(shard_id) slice back to the index
// in the original array passed to InitByArgs.
// Map arg_index from GetShardArgs slice to index in original command slice from InitByArgs.
size_t ReverseArgIndex(ShardId shard_id, size_t arg_index) const;
// Schedule transaction.
// Usually used for multi hop transactions like RENAME or BLPOP.
// For single hop transactions use ScheduleSingleHop instead.
void Schedule();
// Execute transaction hop. If conclude is true, it is removed from the pending queue.
void Execute(RunnableType cb, bool conclude);
// Execute single hop and conclude.
// Callback should return OK for multi key invocations, otherwise return value is ill-defined.
OpStatus ScheduleSingleHop(RunnableType cb);
// Execute single hop with return value and conclude.
// Can be used only for single key invocations, because it writes into a shared variable.
template <typename F> auto ScheduleSingleHopT(F&& f) -> decltype(f(this, nullptr));
// Called by EngineShard when performing Execute over the tx queue.
// Returns true if transaction should be kept in the queue.
bool RunInShard(EngineShard* shard);
// Registers transaction into watched queue and blocks until a) either notification is received.
// or b) tp is reached. If tp is time_point::max() then waits indefinitely.
// Expects that the transaction had been scheduled before, and uses Execute(.., true) to register.
// Returns false if timeout occurred, true if was notified by one of the keys.
bool WaitOnWatch(const time_point& tp, WaitKeysProvider cb);
// Returns true if transaction is awakened, false if it's timed-out and can be removed from the
// blocking queue. NotifySuspended may be called from (multiple) shard threads and
// with each call potentially improving the minimal wake_txid at which
// this transaction has been awakened.
bool NotifySuspended(TxId committed_ts, ShardId sid);
// Cancel all blocking watches on shutdown. Set COORD_CANCELLED.
void BreakOnShutdown();
// Log a journal entry on shard with payload and shard count.
void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload,
uint32_t shard_cnt) const;
// Unlock key locks of a multi transaction.
void UnlockMulti();
// Reset CommandId to be executed when executing MULTI/EXEC or from script.
void SetExecCmd(const CommandId* cid);
// Returns locking arguments needed for DbSlice to Acquire/Release transactional locks.
// Runs in the shard thread.
KeyLockArgs GetLockArgs(ShardId sid) const;
public:
//! Returns true if the transaction spans this shard_id.
//! Runs from the coordinator thread.
bool IsActive(ShardId shard_id) const {
@ -100,51 +159,19 @@ class Transaction {
return shard_data_[SidToId(sid)].local_mask;
}
// Schedules a transaction. Usually used for multi-hop transactions like Rename or BLPOP.
// For single hop, use ScheduleSingleHop instead.
void Schedule();
// if conclude is true, removes the transaction from the pending queue.
void Execute(RunnableType cb, bool conclude);
// for multi-key scenarios cb should return Status::Ok since otherwise the return value
// will be ill-defined.
OpStatus ScheduleSingleHop(RunnableType cb);
// Fits only for single key scenarios because it writes into shared variable res from
// potentially multiple threads.
template <typename F> auto ScheduleSingleHopT(F&& f) -> decltype(f(this, nullptr)) {
decltype(f(this, nullptr)) res;
ScheduleSingleHop([&res, f = std::forward<F>(f)](Transaction* t, EngineShard* shard) {
res = f(t, shard);
return res.status();
});
return res;
}
void UnlockMulti();
// In a multi transaction command we calculate the unique shard count of the transaction
// after all transaction commands were executed, by checking the last txid written to
// all journals.
// This value is written to the journal so that the replica will be able to apply the
// multi command atomically.
void SetMultiUniqueShardCount();
TxId txid() const {
return txid_;
}
// based on cid_->opt_mask.
IntentLock::Mode Mode() const;
IntentLock::Mode Mode() const; // Based on command mask
const char* Name() const;
const char* Name() const; // Based on command name
uint32_t unique_shard_cnt() const {
uint32_t GetUniqueShardCnt() const {
return unique_shard_cnt_;
}
TxId notify_txid() const {
TxId GetNotifyTxid() const {
return notify_txid_.load(std::memory_order_relaxed);
}
@ -158,103 +185,43 @@ class Transaction {
return coordinator_state_ & COORD_OOO;
}
// Registers transaction into watched queue and blocks until a) either notification is received.
// or b) tp is reached. If tp is time_point::max() then waits indefinitely.
// Expects that the transaction had been scheduled before, and uses Execute(.., true) to register.
// Returns false if timeout occurred, true if was notified by one of the keys.
using WaitKeysPovider = std::function<ArgSlice(Transaction*, EngineShard* shard)>;
bool WaitOnWatch(const time_point& tp, WaitKeysPovider cb);
// Returns true if transaction is awaked, false if it's timed-out and can be removed from the
// blocking queue. NotifySuspended may be called from (multiple) shard threads and
// with each call potentially improving the minimal wake_txid at which
// this transaction has been awaked.
bool NotifySuspended(TxId committed_ts, ShardId sid);
void BreakOnShutdown();
// Called by EngineShard when performing Execute over the tx queue.
// Returns true if transaction should be kept in the queue.
bool RunInShard(EngineShard* shard);
//! Returns locking arguments needed for DbSlice to Acquire/Release transactional locks.
//! Runs in the shard thread.
KeyLockArgs GetLockArgs(ShardId sid) const;
OpArgs GetOpArgs(EngineShard* shard) const {
return OpArgs{shard, this, db_context()};
return OpArgs{shard, this, GetDbContext()};
}
DbContext db_context() const {
DbContext GetDbContext() const {
return DbContext{.db_index = db_index_, .time_now_ms = time_now_ms_};
}
DbIndex db_index() const {
DbIndex GetDbIndex() const {
return db_index_;
}
// Log a journal entry on shard with payload.
void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload,
uint32_t shard_cnt) const;
std::string DebugId() const;
private:
// Holds the number of locks for each IntentLock::Mode: shared and exclusive.
struct LockCnt {
unsigned& operator[](IntentLock::Mode mode) {
return cnt[int(mode)];
}
unsigned operator[](IntentLock::Mode mode) const {
return cnt[int(mode)];
}
private:
unsigned cnt[2] = {0, 0};
};
using KeyList = std::vector<std::pair<std::string_view, LockCnt>>;
unsigned SidToId(ShardId sid) const {
return sid < shard_data_.size() ? sid : 0;
}
void ScheduleInternal();
void LockMulti();
void UnwatchBlocking(bool should_expire, WaitKeysPovider wcb);
void ExecuteAsync();
// Optimized version of RunInShard for single shard uncontended cases.
void RunQuickie(EngineShard* shard);
//! Returns true if transaction run out-of-order during the scheduling phase.
bool ScheduleUniqueShard(EngineShard* shard);
/// Returns pair(schedule_success, lock_granted)
/// schedule_success is true if transaction was scheduled on db_slice.
/// lock_granted is true if lock was granted for all the keys on this shard.
/// Runs in the shard thread.
std::pair<bool, bool> ScheduleInShard(EngineShard* shard);
// Returns true if we need to follow up with PollExecution on this shard.
bool CancelShardCb(EngineShard* shard);
void UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard* shard);
void UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, EngineShard* shard);
// Adds itself to watched queue in the shard. Must run in that shard thread.
OpStatus WatchInShard(ArgSlice keys, EngineShard* shard);
void WaitForShardCallbacks() {
run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); });
// store operations below can not be ordered above the fence
std::atomic_thread_fence(std::memory_order_release);
seqlock_.fetch_add(1, std::memory_order_relaxed);
}
// Returns the previous value of run count.
uint32_t DecreaseRunCnt();
uint32_t use_count() const {
return use_count_.load(std::memory_order_relaxed);
}
// Log command in the journal of a shard for write commands with auto-journaling enabled.
// Should be called immediately after the last phase (hop).
void LogAutoJournalOnShard(EngineShard* shard);
struct PerShardData {
PerShardData(PerShardData&&) noexcept {
}
PerShardData() = default;
uint32_t arg_start = 0; // Indices into args_ array.
uint16_t arg_count = 0;
@ -265,17 +232,14 @@ class Transaction {
// Needed to rollback inconsistent schedulings or remove OOO transactions from
// tx queue.
uint32_t pq_pos = TxQueue::kEnd;
PerShardData(PerShardData&&) noexcept {
}
PerShardData() = default;
};
enum { kPerShardSize = sizeof(PerShardData) };
// State of a multi transaction.
struct MultiData {
// Increase lock counts for all current keys for mode. Clear keys.
void AddLocks(IntentLock::Mode mode);
struct Multi {
absl::flat_hash_map<std::string_view, LockCnt> locks;
absl::flat_hash_map<std::string_view, LockCnt> lock_counts;
std::vector<std::string_view> keys;
uint32_t multi_opts = 0; // options of the parent transaction.
@ -285,9 +249,77 @@ class Transaction {
bool locks_recorded = false;
};
util::fibers_ext::EventCount blocking_ec_; // used to wake blocking transactions.
util::fibers_ext::EventCount run_ec_;
enum CoordinatorState : uint8_t {
COORD_SCHED = 1,
COORD_EXEC = 2,
// We are running the last execution step in multi-hop operation.
COORD_EXEC_CONCLUDING = 4,
COORD_BLOCKED = 8,
COORD_CANCELLED = 0x10,
COORD_OOO = 0x20,
};
private:
// Generic schedule used from Schedule() and ScheduleSingleHop() on slow path.
void ScheduleInternal();
// Schedule if only one shard is active.
// Returns true if transaction ran out-of-order during the scheduling phase.
bool ScheduleUniqueShard(EngineShard* shard);
// Schedule on shards transaction queue.
// Returns pair(schedule_success, lock_granted)
// schedule_success is true if transaction was scheduled on db_slice.
// lock_granted is true if lock was granted for all the keys on this shard.
// Runs in the shard thread.
std::pair<bool, bool> ScheduleInShard(EngineShard* shard);
// Optimized version of RunInShard for single shard uncontended cases.
void RunQuickie(EngineShard* shard);
void ExecuteAsync();
// Adds itself to watched queue in the shard. Must run in that shard thread.
OpStatus WatchInShard(ArgSlice keys, EngineShard* shard);
void UnwatchBlocking(bool should_expire, WaitKeysProvider wcb);
// Returns true if we need to follow up with PollExecution on this shard.
bool CancelShardCb(EngineShard* shard);
void UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard* shard);
void UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, EngineShard* shard);
// Calculate the number of unique shards for a multi transaction after all commands were executed.
// This value is used in stable state replication to allow applying the command atomically.
void CalculateUnqiueShardCntForMulti();
void WaitForShardCallbacks() {
run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); });
// store operations below can not be ordered above the fence
std::atomic_thread_fence(std::memory_order_release);
seqlock_.fetch_add(1, std::memory_order_relaxed);
}
// Log command in shard's journal, if this is a write command with auto-journaling enabled.
// Should be called immediately after the last phase (hop).
void LogAutoJournalOnShard(EngineShard* shard);
// Returns the previous value of run count.
uint32_t DecreaseRunCnt();
uint32_t GetUseCount() const {
return use_count_.load(std::memory_order_relaxed);
}
unsigned SidToId(ShardId sid) const {
return sid < shard_data_.size() ? sid : 0;
}
private:
// shard_data spans all the shards in ess_.
// I wish we could use a dense array of size [0..uniq_shards] but since
// multiple threads access this array to synchronize between themselves using
@ -301,39 +333,25 @@ class Transaction {
// Stores the full undivided command.
CmdArgList cmd_with_full_args_;
// Reverse argument mapping. Allows to reconstruct responses according to the original order of
// keys.
// Reverse argument mapping for ReverseArgIndex to convert from shard index to original index.
std::vector<uint32_t> reverse_index_;
RunnableType cb_;
std::unique_ptr<Multi> multi_; // Initialized when the transaction is multi/exec.
const CommandId* cid_;
RunnableType cb_; // Run on shard threads
const CommandId* cid_; // Underlying command
std::unique_ptr<MultiData> multi_; // Initialized when the transaction is multi/exec.
TxId txid_{0};
DbIndex db_index_{0};
uint64_t time_now_ms_{0};
std::atomic<TxId> notify_txid_{kuint64max};
std::atomic_uint32_t use_count_{0}, run_count_{0}, seqlock_{0};
// unique_shard_cnt_ and unique_shard_id_ is accessed only by coordinator thread.
// unique_shard_cnt_ and unique_shard_id_ are accessed only by coordinator thread.
uint32_t unique_shard_cnt_{0}; // number of unique shards span by args_
ShardId unique_shard_id_{kInvalidSid};
DbIndex db_index_;
// Used for single-hop transactions with unique_shards_ == 1, hence no data-race.
OpStatus local_result_ = OpStatus::OK;
enum CoordinatorState : uint8_t {
COORD_SCHED = 1,
COORD_EXEC = 2,
// We are running the last execution step in multi-hop operation.
COORD_EXEC_CONCLUDING = 4,
COORD_BLOCKED = 8,
COORD_CANCELLED = 0x10,
COORD_OOO = 0x20,
};
util::fibers_ext::EventCount blocking_ec_; // Used to wake blocking transactions.
util::fibers_ext::EventCount run_ec_; // Used to wait for shard callbacks
// Transaction coordinator state, written and read by coordinator thread.
// Can be read by shard threads as long as we respect ordering rules, i.e. when
@ -341,6 +359,10 @@ class Transaction {
// If COORDINATOR_XXX has been set, it means we passed or crossed stage XXX.
uint8_t coordinator_state_ = 0;
// Used for single-hop transactions with unique_shards_ == 1, hence no data-race.
OpStatus local_result_ = OpStatus::OK;
private:
struct PerShardCache {
std::vector<std::string_view> args;
std::vector<uint32_t> original_index;
@ -359,6 +381,16 @@ class Transaction {
static thread_local TLTmpSpace tmp_space;
};
template <typename F> auto Transaction::ScheduleSingleHopT(F&& f) -> decltype(f(this, nullptr)) {
decltype(f(this, nullptr)) res;
ScheduleSingleHop([&res, f = std::forward<F>(f)](Transaction* t, EngineShard* shard) {
res = f(t, shard);
return res.status();
});
return res;
}
inline uint16_t trans_id(const Transaction* ptr) {
return (intptr_t(ptr) >> 8) & 0xFFFF;
}

View file

@ -723,7 +723,7 @@ OpResult<KeyIterWeightVec> FindShardKeysAndWeights(EngineShard* shard, Transacti
auto& db_slice = shard->db_slice();
KeyIterWeightVec key_weight_vec(keys.size() - src_keys_offset);
for (unsigned j = src_keys_offset; j < keys.size(); ++j) {
auto it_res = db_slice.Find(t->db_context(), keys[j], OBJ_ZSET);
auto it_res = db_slice.Find(t->GetDbContext(), keys[j], OBJ_ZSET);
if (it_res == OpStatus::WRONG_TYPE) // TODO: support sets with default score 1.
return it_res.status();
if (!it_res)
@ -738,7 +738,7 @@ OpResult<KeyIterWeightVec> FindShardKeysAndWeights(EngineShard* shard, Transacti
OpResult<ScoredMap> OpUnion(EngineShard* shard, Transaction* t, string_view dest, AggType agg_type,
const vector<double>& weights, bool store) {
ArgSlice keys = t->ShardArgsInShard(shard->shard_id());
ArgSlice keys = t->GetShardArgs(shard->shard_id());
DVLOG(1) << "shard:" << shard->shard_id() << ", keys " << vector(keys.begin(), keys.end());
DCHECK(!keys.empty());
@ -769,7 +769,7 @@ OpResult<ScoredMap> OpUnion(EngineShard* shard, Transaction* t, string_view dest
OpResult<ScoredMap> OpInter(EngineShard* shard, Transaction* t, string_view dest, AggType agg_type,
const vector<double>& weights, bool store) {
ArgSlice keys = t->ShardArgsInShard(shard->shard_id());
ArgSlice keys = t->GetShardArgs(shard->shard_id());
DVLOG(1) << "shard:" << shard->shard_id() << ", keys " << vector(keys.begin(), keys.end());
DCHECK(!keys.empty());
@ -785,7 +785,7 @@ OpResult<ScoredMap> OpInter(EngineShard* shard, Transaction* t, string_view dest
return OpStatus::SKIPPED; // return noop
for (unsigned j = start; j < keys.size(); ++j) {
auto it_res = db_slice.Find(t->db_context(), keys[j], OBJ_ZSET);
auto it_res = db_slice.Find(t->GetDbContext(), keys[j], OBJ_ZSET);
if (it_res == OpStatus::WRONG_TYPE) // TODO: support sets with default score 1.
return it_res.status();
@ -1177,7 +1177,7 @@ void ZSetFamily::ZCard(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<uint32_t> {
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->db_context(), key, OBJ_ZSET);
OpResult<PrimeIterator> find_res = shard->db_slice().Find(t->GetDbContext(), key, OBJ_ZSET);
if (!find_res) {
return find_res.status();
}