diff --git a/src/server/bitops_family.cc b/src/server/bitops_family.cc index 58ba8cd7b..517184797 100644 --- a/src/server/bitops_family.cc +++ b/src/server/bitops_family.cc @@ -543,7 +543,7 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) { ShardId dest_shard = Shard(dest_key, result_set.size()); auto shard_bitop = [&](Transaction* t, EngineShard* shard) { - ArgSlice largs = t->ShardArgsInShard(shard->shard_id()); + ArgSlice largs = t->GetShardArgs(shard->shard_id()); DCHECK(!largs.empty()); if (shard->shard_id() == dest_shard) { diff --git a/src/server/blocking_controller.cc b/src/server/blocking_controller.cc index 97b19361a..213c587a3 100644 --- a/src/server/blocking_controller.cc +++ b/src/server/blocking_controller.cc @@ -90,7 +90,7 @@ void BlockingController::RunStep(Transaction* completed_t) { if (completed_t) { awakened_transactions_.erase(completed_t); - auto dbit = watched_dbs_.find(completed_t->db_index()); + auto dbit = watched_dbs_.find(completed_t->GetDbIndex()); if (dbit != watched_dbs_.end()) { DbWatchTable& wt = *dbit->second; @@ -100,7 +100,7 @@ void BlockingController::RunStep(Transaction* completed_t) { for (size_t i = 0; i < lock_args.args.size(); i += lock_args.key_step) { string_view key = lock_args.args[i]; if (wt.AddAwakeEvent(WatchQueue::ACTIVE, key)) { - awakened_indices_.emplace(completed_t->db_index()); + awakened_indices_.emplace(completed_t->GetDbIndex()); } } } @@ -139,7 +139,7 @@ void BlockingController::RunStep(Transaction* completed_t) { void BlockingController::AddWatched(ArgSlice keys, Transaction* trans) { VLOG(1) << "AddWatched [" << owner_->shard_id() << "] " << trans->DebugId(); - auto [dbit, added] = watched_dbs_.emplace(trans->db_index(), nullptr); + auto [dbit, added] = watched_dbs_.emplace(trans->GetDbIndex(), nullptr); if (added) { dbit->second.reset(new DbWatchTable); } @@ -154,7 +154,7 @@ void BlockingController::AddWatched(ArgSlice keys, Transaction* trans) { if (!res->second->items.empty()) { Transaction* 
last = res->second->items.back().get(); - DCHECK_GT(last->use_count(), 0u); + DCHECK_GT(last->GetUseCount(), 0u); // Duplicate keys case. We push only once per key. if (last == trans) @@ -169,7 +169,7 @@ void BlockingController::AddWatched(ArgSlice keys, Transaction* trans) { void BlockingController::RemoveWatched(ArgSlice keys, Transaction* trans) { VLOG(1) << "RemoveWatched [" << owner_->shard_id() << "] " << trans->DebugId(); - auto dbit = watched_dbs_.find(trans->db_index()); + auto dbit = watched_dbs_.find(trans->GetDbIndex()); if (dbit == watched_dbs_.end()) return; diff --git a/src/server/blocking_controller_test.cc b/src/server/blocking_controller_test.cc index a51733df0..cae414b4c 100644 --- a/src/server/blocking_controller_test.cc +++ b/src/server/blocking_controller_test.cc @@ -68,7 +68,7 @@ TEST_F(BlockingControllerTest, Basic) { shard_set->Await(0, [&] { EngineShard* shard = EngineShard::tlocal(); BlockingController bc(shard); - auto keys = trans_->ShardArgsInShard(shard->shard_id()); + auto keys = trans_->GetShardArgs(shard->shard_id()); bc.AddWatched(keys, trans_.get()); EXPECT_EQ(1, bc.NumWatched(0)); @@ -81,7 +81,7 @@ TEST_F(BlockingControllerTest, Timeout) { time_point tp = steady_clock::now() + chrono::milliseconds(10); trans_->Schedule(); - auto cb = [&](Transaction* t, EngineShard* shard) { return trans_->ShardArgsInShard(0); }; + auto cb = [&](Transaction* t, EngineShard* shard) { return trans_->GetShardArgs(0); }; bool res = trans_->WaitOnWatch(tp, cb); diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index effa8fb79..05d62b309 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -294,7 +294,7 @@ void EngineShard::PollExecution(const char* context, Transaction* trans) { DCHECK(continuation_trans_ == nullptr) << continuation_trans_->DebugId() << " when polling " << trans->DebugId(); - CHECK_EQ(committed_txid_, trans->notify_txid()); + CHECK_EQ(committed_txid_, trans->GetNotifyTxid()); 
bool keep = trans->RunInShard(this); if (keep) { return; diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 66d5c620b..7e83d3ffb 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -307,14 +307,14 @@ class Renamer { void Renamer::Find(Transaction* t) { auto cb = [this](Transaction* t, EngineShard* shard) { - auto args = t->ShardArgsInShard(shard->shard_id()); + auto args = t->GetShardArgs(shard->shard_id()); CHECK_EQ(1u, args.size()); FindResult* res = (shard->shard_id() == src_sid_) ? &src_res_ : &dest_res_; res->key = args.front(); auto& db_slice = EngineShard::tlocal()->db_slice(); - auto [it, exp_it] = db_slice.FindExt(t->db_context(), res->key); + auto [it, exp_it] = db_slice.FindExt(t->GetDbContext(), res->key); res->found = IsValid(it); if (IsValid(it)) { @@ -357,7 +357,7 @@ void Renamer::Finalize(Transaction* t, bool skip_exist_dest) { OpStatus Renamer::MoveSrc(Transaction* t, EngineShard* es) { if (es->shard_id() == src_sid_) { // Handle source key. // TODO: to call PreUpdate/PostUpdate. - auto it = es->db_slice().FindExt(t->db_context(), src_res_.key).first; + auto it = es->db_slice().FindExt(t->GetDbContext(), src_res_.key).first; CHECK(IsValid(it)); // We distinguish because of the SmallString that is pinned to its thread by design, @@ -370,7 +370,7 @@ OpStatus Renamer::MoveSrc(Transaction* t, EngineShard* es) { pv_ = std::move(it->second); it->second.SetExpire(has_expire); } - CHECK(es->db_slice().Del(t->db_index(), it)); // delete the entry with empty value in it. + CHECK(es->db_slice().Del(t->GetDbIndex(), it)); // delete the entry with empty value in it. 
} return OpStatus::OK; @@ -380,7 +380,7 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) { if (es->shard_id() != src_sid_) { auto& db_slice = es->db_slice(); string_view dest_key = dest_res_.key; - PrimeIterator dest_it = db_slice.FindExt(t->db_context(), dest_key).first; + PrimeIterator dest_it = db_slice.FindExt(t->GetDbContext(), dest_key).first; bool is_prior_list = false; if (IsValid(dest_it)) { @@ -393,18 +393,18 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) { dest_it->second = std::move(pv_); } dest_it->second.SetExpire(has_expire); // preserve expire flag. - db_slice.UpdateExpire(t->db_index(), dest_it, src_res_.expire_ts); + db_slice.UpdateExpire(t->GetDbIndex(), dest_it, src_res_.expire_ts); } else { if (src_res_.ref_val.ObjType() == OBJ_STRING) { pv_.SetString(str_val_); } - dest_it = db_slice.AddNew(t->db_context(), dest_key, std::move(pv_), src_res_.expire_ts); + dest_it = db_slice.AddNew(t->GetDbContext(), dest_key, std::move(pv_), src_res_.expire_ts); } dest_it->first.SetSticky(src_res_.sticky); if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) { - es->blocking_controller()->AwakeWatched(t->db_index(), dest_key); + es->blocking_controller()->AwakeWatched(t->GetDbIndex(), dest_key); } } @@ -621,7 +621,7 @@ void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) { bool is_mc = cntx->protocol() == Protocol::MEMCACHE; auto cb = [&result](const Transaction* t, EngineShard* shard) { - ArgSlice args = t->ShardArgsInShard(shard->shard_id()); + ArgSlice args = t->GetShardArgs(shard->shard_id()); auto res = OpDel(t->GetOpArgs(shard), args); result.fetch_add(res.value_or(0), memory_order_relaxed); @@ -672,7 +672,7 @@ void GenericFamily::Exists(CmdArgList args, ConnectionContext* cntx) { atomic_uint32_t result{0}; auto cb = [&result](Transaction* t, EngineShard* shard) { - ArgSlice args = t->ShardArgsInShard(shard->shard_id()); + ArgSlice args = 
t->GetShardArgs(shard->shard_id()); auto res = OpExists(t->GetOpArgs(shard), args); result.fetch_add(res.value_or(0), memory_order_relaxed); @@ -817,7 +817,7 @@ void GenericFamily::Stick(CmdArgList args, ConnectionContext* cntx) { atomic_uint32_t result{0}; auto cb = [&result](const Transaction* t, EngineShard* shard) { - ArgSlice args = t->ShardArgsInShard(shard->shard_id()); + ArgSlice args = t->GetShardArgs(shard->shard_id()); auto res = OpStick(t->GetOpArgs(shard), args); result.fetch_add(res.value_or(0), memory_order_relaxed); @@ -1159,7 +1159,7 @@ void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) { std::string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - auto it = shard->db_slice().FindExt(t->db_context(), key).first; + auto it = shard->db_slice().FindExt(t->GetDbContext(), key).first; if (!it.is_done()) { return it->second.ObjType(); } else { @@ -1177,7 +1177,7 @@ void GenericFamily::Type(CmdArgList args, ConnectionContext* cntx) { void GenericFamily::Time(CmdArgList args, ConnectionContext* cntx) { uint64_t now_usec; if (cntx->transaction) { - now_usec = cntx->transaction->db_context().time_now_ms * 1000; + now_usec = cntx->transaction->GetDbContext().time_now_ms * 1000; } else { now_usec = absl::GetCurrentTimeNanos() / 1000; } @@ -1193,7 +1193,7 @@ OpResult GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des Transaction* transaction = cntx->transaction; - if (transaction->unique_shard_cnt() == 1) { + if (transaction->GetUniqueShardCnt() == 1) { auto cb = [&](Transaction* t, EngineShard* shard) { return OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest); }; @@ -1249,14 +1249,14 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) { OpResult GenericFamily::OpTtl(Transaction* t, EngineShard* shard, string_view key) { auto& db_slice = shard->db_slice(); - auto [it, expire_it] = db_slice.FindExt(t->db_context(), key); + auto [it, expire_it] = 
db_slice.FindExt(t->GetDbContext(), key); if (!IsValid(it)) return OpStatus::KEY_NOTFOUND; if (!IsValid(expire_it)) return OpStatus::SKIPPED; - int64_t ttl_ms = db_slice.ExpireTime(expire_it) - t->db_context().time_now_ms; + int64_t ttl_ms = db_slice.ExpireTime(expire_it) - t->GetDbContext().time_now_ms; DCHECK_GT(ttl_ms, 0); // Otherwise FindExt would return null. return ttl_ms; } diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index 3af4f1a40..a1ab8bd94 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -953,7 +953,7 @@ void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { auto& db_slice = shard->db_slice(); - auto it_res = db_slice.Find(t->db_context(), key, OBJ_HASH); + auto it_res = db_slice.Find(t->GetDbContext(), key, OBJ_HASH); if (!it_res) return it_res.status(); diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 85b374bfa..a0ac0054c 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -124,10 +124,10 @@ OpResult FindFirst(Transaction* trans) { fill(find_res.begin(), find_res.end(), OpStatus::KEY_NOTFOUND); auto cb = [&find_res](auto* t, EngineShard* shard) { - auto args = t->ShardArgsInShard(shard->shard_id()); + auto args = t->GetShardArgs(shard->shard_id()); OpResult> ff_res = - shard->db_slice().FindFirst(t->db_context(), args); + shard->db_slice().FindFirst(t->GetDbContext(), args); if (ff_res) { FFResult ff_result(ff_res->first->first.AsRef(), ff_res->second); @@ -241,7 +241,7 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { // Block auto wcb = [&](Transaction* t, EngineShard* shard) { - return t->ShardArgsInShard(shard->shard_id()); + return t->GetShardArgs(shard->shard_id()); }; ++stats->num_blocked_clients; @@ -277,16 +277,16 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) { if (shard->shard_id() == ff_result_.sid) { ff_result_.key.GetString(&key_); auto& 
db_slice = shard->db_slice(); - auto it_res = db_slice.Find(t->db_context(), key_, OBJ_LIST); + auto it_res = db_slice.Find(t->GetDbContext(), key_, OBJ_LIST); CHECK(it_res); // must exist and must be ok. PrimeIterator it = *it_res; quicklist* ql = GetQL(it->second); - db_slice.PreUpdate(t->db_index(), it); + db_slice.PreUpdate(t->GetDbIndex(), it); value_ = ListPop(dir_, ql); - db_slice.PostUpdate(t->db_index(), it, key_); + db_slice.PostUpdate(t->GetDbIndex(), it, key_); if (quicklistCount(ql) == 0) { - CHECK(shard->db_slice().Del(t->db_index(), it)); + CHECK(shard->db_slice().Del(t->GetDbIndex(), it)); } } @@ -475,7 +475,7 @@ OpResult OpPop(const OpArgs& op_args, string_view key, ListDir dir, u OpResult MoveTwoShards(Transaction* trans, string_view src, string_view dest, ListDir src_dir, ListDir dest_dir, bool conclude_on_error) { - DCHECK_EQ(2u, trans->unique_shard_cnt()); + DCHECK_EQ(2u, trans->GetUniqueShardCnt()); OpResult find_res[2]; OpResult result; @@ -487,7 +487,7 @@ OpResult MoveTwoShards(Transaction* trans, string_view src, string_view // the destination. // auto cb = [&](Transaction* t, EngineShard* shard) { - auto args = t->ShardArgsInShard(shard->shard_id()); + auto args = t->GetShardArgs(shard->shard_id()); DCHECK_EQ(1u, args.size()); bool is_dest = args.front() == dest; find_res[is_dest] = Peek(t->GetOpArgs(shard), args.front(), src_dir, !is_dest); @@ -505,7 +505,7 @@ OpResult MoveTwoShards(Transaction* trans, string_view src, string_view } else { // Everything is ok, lets proceed with the mutations. 
auto cb = [&](Transaction* t, EngineShard* shard) { - auto args = t->ShardArgsInShard(shard->shard_id()); + auto args = t->GetShardArgs(shard->shard_id()); bool is_dest = args.front() == dest; OpArgs op_args = t->GetOpArgs(shard); @@ -782,7 +782,7 @@ void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, Lis ListDir dest_dir) { OpResult result; - if (cntx->transaction->unique_shard_cnt() == 1) { + if (cntx->transaction->GetUniqueShardCnt() == 1) { auto cb = [&](Transaction* t, EngineShard* shard) { return OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir); }; @@ -857,7 +857,7 @@ OpResult BPopPusher::Run(Transaction* t, unsigned msec) { t->Schedule(); - if (t->unique_shard_cnt() == 1) { + if (t->GetUniqueShardCnt() == 1) { return RunSingle(t, tp); } diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 57263c957..d5a542036 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -674,7 +674,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) return (*cntx)->SendError(st); dfly_cntx->transaction = dist_trans.get(); - dfly_cntx->last_command_debug.shards_count = dfly_cntx->transaction->unique_shard_cnt(); + dfly_cntx->last_command_debug.shards_count = dfly_cntx->transaction->GetUniqueShardCnt(); } else { dfly_cntx->transaction = nullptr; } @@ -887,7 +887,7 @@ void Service::Watch(CmdArgList args, ConnectionContext* cntx) { atomic_uint32_t keys_existed = 0; auto cb = [&](Transaction* t, EngineShard* shard) { - ArgSlice largs = t->ShardArgsInShard(shard->shard_id()); + ArgSlice largs = t->GetShardArgs(shard->shard_id()); for (auto k : largs) { shard->db_slice().RegisterWatchedKey(cntx->db_index(), k, &exec_info); } @@ -1081,7 +1081,7 @@ bool CheckWatchedKeyExpiry(ConnectionContext* cntx, const CommandRegistry& regis atomic_uint32_t watch_exist_count{0}; auto cb = [&watch_exist_count](Transaction* t, EngineShard* shard) { - ArgSlice args = 
t->ShardArgsInShard(shard->shard_id()); + ArgSlice args = t->GetShardArgs(shard->shard_id()); auto res = GenericFamily::OpExists(t->GetOpArgs(shard), args); watch_exist_count.fetch_add(res.value_or(0), memory_order_relaxed); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index e590817ba..b8c3a877d 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1008,7 +1008,7 @@ void ServerFamily::BreakOnShutdown() { void ServerFamily::FlushDb(CmdArgList args, ConnectionContext* cntx) { DCHECK(cntx->transaction); - Drakarys(cntx->transaction, cntx->transaction->db_index()); + Drakarys(cntx->transaction, cntx->transaction->GetDbIndex()); cntx->reply_builder()->SendOk(); } diff --git a/src/server/set_family.cc b/src/server/set_family.cc index 7313fa0cf..c6c83c33b 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -717,19 +717,19 @@ class Mover { }; OpStatus Mover::OpFind(Transaction* t, EngineShard* es) { - ArgSlice largs = t->ShardArgsInShard(es->shard_id()); + ArgSlice largs = t->GetShardArgs(es->shard_id()); // In case both src and dest are in the same shard, largs size will be 2. DCHECK_LE(largs.size(), 2u); for (auto k : largs) { unsigned index = (k == src_) ? 0 : 1; - OpResult res = es->db_slice().Find(t->db_context(), k, OBJ_SET); + OpResult res = es->db_slice().Find(t->GetDbContext(), k, OBJ_SET); if (res && index == 0) { // successful src find. 
DCHECK(!res->is_done()); const CompactObj& val = res.value()->second; SetType st{val.RObjPtr(), val.Encoding()}; - found_[0] = IsInSet(t->db_context(), st, member_); + found_[0] = IsInSet(t->GetDbContext(), st, member_); } else { found_[index] = res.status(); } @@ -739,7 +739,7 @@ OpStatus Mover::OpFind(Transaction* t, EngineShard* es) { } OpStatus Mover::OpMutate(Transaction* t, EngineShard* es) { - ArgSlice largs = t->ShardArgsInShard(es->shard_id()); + ArgSlice largs = t->GetShardArgs(es->shard_id()); DCHECK_LE(largs.size(), 2u); OpArgs op_args = t->GetOpArgs(es); @@ -868,7 +868,7 @@ OpResult OpDiff(const OpArgs& op_args, ArgSlice keys) { // Read-only OpInter op on sets. OpResult OpInter(const Transaction* t, EngineShard* es, bool remove_first) { - ArgSlice keys = t->ShardArgsInShard(es->shard_id()); + ArgSlice keys = t->GetShardArgs(es->shard_id()); if (remove_first) { keys.remove_prefix(1); } @@ -876,14 +876,15 @@ OpResult OpInter(const Transaction* t, EngineShard* es, bool remove_f StringVec result; if (keys.size() == 1) { - OpResult find_res = es->db_slice().Find(t->db_context(), keys.front(), OBJ_SET); + OpResult find_res = + es->db_slice().Find(t->GetDbContext(), keys.front(), OBJ_SET); if (!find_res) return find_res.status(); PrimeValue& pv = find_res.value()->second; if (IsDenseEncoding(pv)) { StringSet* ss = (StringSet*)pv.RObjPtr(); - ss->set_time(TimeNowSecRel(t->db_context().time_now_ms)); + ss->set_time(TimeNowSecRel(t->GetDbContext().time_now_ms)); } container_utils::IterateSet(find_res.value()->second, @@ -900,7 +901,7 @@ OpResult OpInter(const Transaction* t, EngineShard* es, bool remove_f OpStatus status = OpStatus::OK; for (size_t i = 0; i < keys.size(); ++i) { - OpResult find_res = es->db_slice().Find(t->db_context(), keys[i], OBJ_SET); + OpResult find_res = es->db_slice().Find(t->GetDbContext(), keys[i], OBJ_SET); if (!find_res) { if (status == OpStatus::OK || status == OpStatus::KEY_NOTFOUND || find_res.status() != OpStatus::KEY_NOTFOUND) { 
@@ -916,7 +917,7 @@ OpResult OpInter(const Transaction* t, EngineShard* es, bool remove_f if (status != OpStatus::OK) return status; - auto comp = [db_contx = t->db_context()](const SetType& left, const SetType& right) { + auto comp = [db_contx = t->GetDbContext()](const SetType& left, const SetType& right) { return SetTypeLen(db_contx, left) < SetTypeLen(db_contx, right); }; @@ -931,7 +932,7 @@ OpResult OpInter(const Transaction* t, EngineShard* es, bool remove_f while (intsetGet(is, ii++, &intele)) { size_t j = 1; for (j = 1; j < sets.size(); j++) { - if (sets[j].first != is && !IsInSet(t->db_context(), sets[j], intele)) + if (sets[j].first != is && !IsInSet(t->GetDbContext(), sets[j], intele)) break; } @@ -941,7 +942,7 @@ OpResult OpInter(const Transaction* t, EngineShard* es, bool remove_f } } } else { - InterStrSet(t->db_context(), sets, &result); + InterStrSet(t->GetDbContext(), sets, &result); } return result; @@ -1068,11 +1069,11 @@ void SIsMember(CmdArgList args, ConnectionContext* cntx) { string_view val = ArgS(args, 2); auto cb = [&](Transaction* t, EngineShard* shard) { - OpResult find_res = shard->db_slice().Find(t->db_context(), key, OBJ_SET); + OpResult find_res = shard->db_slice().Find(t->GetDbContext(), key, OBJ_SET); if (find_res) { SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()}; - return IsInSet(t->db_context(), st, val) ? OpStatus::OK : OpStatus::KEY_NOTFOUND; + return IsInSet(t->GetDbContext(), st, val) ? 
OpStatus::OK : OpStatus::KEY_NOTFOUND; } return find_res.status(); @@ -1099,10 +1100,10 @@ void SMIsMember(CmdArgList args, ConnectionContext* cntx) { memberships.reserve(vals.size()); auto cb = [&](Transaction* t, EngineShard* shard) { - OpResult find_res = shard->db_slice().Find(t->db_context(), key, OBJ_SET); + OpResult find_res = shard->db_slice().Find(t->GetDbContext(), key, OBJ_SET); if (find_res) { SetType st{find_res.value()->second.RObjPtr(), find_res.value()->second.Encoding()}; - FindInSet(memberships, t->db_context(), st, vals); + FindInSet(memberships, t->GetDbContext(), st, vals); return OpStatus::OK; } return find_res.status(); @@ -1164,7 +1165,7 @@ void SCard(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - OpResult find_res = shard->db_slice().Find(t->db_context(), key, OBJ_SET); + OpResult find_res = shard->db_slice().Find(t->GetDbContext(), key, OBJ_SET); if (!find_res) { return find_res.status(); } @@ -1223,7 +1224,7 @@ void SDiff(CmdArgList args, ConnectionContext* cntx) { ShardId src_shard = Shard(src_key, result_set.size()); auto cb = [&](Transaction* t, EngineShard* shard) { - ArgSlice largs = t->ShardArgsInShard(shard->shard_id()); + ArgSlice largs = t->GetShardArgs(shard->shard_id()); if (shard->shard_id() == src_shard) { CHECK_EQ(src_key, largs.front()); result_set[shard->shard_id()] = OpDiff(t->GetOpArgs(shard), largs); @@ -1259,7 +1260,7 @@ void SDiffStore(CmdArgList args, ConnectionContext* cntx) { // read-only op auto diff_cb = [&](Transaction* t, EngineShard* shard) { - ArgSlice largs = t->ShardArgsInShard(shard->shard_id()); + ArgSlice largs = t->GetShardArgs(shard->shard_id()); DCHECK(!largs.empty()); if (shard->shard_id() == dest_shard) { @@ -1329,7 +1330,7 @@ void SInter(CmdArgList args, ConnectionContext* cntx) { }; cntx->transaction->ScheduleSingleHop(std::move(cb)); - OpResult result = InterResultVec(result_set, 
cntx->transaction->unique_shard_cnt()); + OpResult result = InterResultVec(result_set, cntx->transaction->GetUniqueShardCnt()); if (result) { SvArray arr = std::move(*result); if (cntx->conn_state.script_info) { // sort under script @@ -1348,7 +1349,7 @@ void SInterStore(CmdArgList args, ConnectionContext* cntx) { atomic_uint32_t inter_shard_cnt{0}; auto inter_cb = [&](Transaction* t, EngineShard* shard) { - ArgSlice largs = t->ShardArgsInShard(shard->shard_id()); + ArgSlice largs = t->GetShardArgs(shard->shard_id()); if (shard->shard_id() == dest_shard) { CHECK_EQ(largs.front(), dest_key); if (largs.size() == 1) @@ -1385,7 +1386,7 @@ void SUnion(CmdArgList args, ConnectionContext* cntx) { ResultStringVec result_set(shard_set->size()); auto cb = [&](Transaction* t, EngineShard* shard) { - ArgSlice largs = t->ShardArgsInShard(shard->shard_id()); + ArgSlice largs = t->GetShardArgs(shard->shard_id()); result_set[shard->shard_id()] = OpUnion(t->GetOpArgs(shard), largs); return OpStatus::OK; }; @@ -1410,7 +1411,7 @@ void SUnionStore(CmdArgList args, ConnectionContext* cntx) { ShardId dest_shard = Shard(dest_key, result_set.size()); auto union_cb = [&](Transaction* t, EngineShard* shard) { - ArgSlice largs = t->ShardArgsInShard(shard->shard_id()); + ArgSlice largs = t->GetShardArgs(shard->shard_id()); if (shard->shard_id() == dest_shard) { CHECK_EQ(largs.front(), dest_key); largs.remove_prefix(1); diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 0f5e334c8..e77a6565e 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -935,7 +935,7 @@ void StringFamily::MGet(CmdArgList args, ConnectionContext* cntx) { continue; MGetResponse& results = mget_resp[sid]; - ArgSlice slice = transaction->ShardArgsInShard(sid); + ArgSlice slice = transaction->GetShardArgs(sid); DCHECK(!slice.empty()); DCHECK_EQ(slice.size(), results.size()); @@ -966,11 +966,11 @@ void StringFamily::MSet(CmdArgList args, ConnectionContext* cntx) { for (size_t 
i = 1; i < args.size(); ++i) { absl::StrAppend(&str, " ", ArgS(args, i)); } - LOG(INFO) << "MSET/" << transaction->unique_shard_cnt() << str; + LOG(INFO) << "MSET/" << transaction->GetUniqueShardCnt() << str; } auto cb = [&](Transaction* t, EngineShard* shard) { - auto args = t->ShardArgsInShard(shard->shard_id()); + auto args = t->GetShardArgs(shard->shard_id()); return OpMSet(t->GetOpArgs(shard), args); }; @@ -990,9 +990,9 @@ void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) { atomic_bool exists{false}; auto cb = [&](Transaction* t, EngineShard* es) { - auto args = t->ShardArgsInShard(es->shard_id()); + auto args = t->GetShardArgs(es->shard_id()); for (size_t i = 0; i < args.size(); i += 2) { - auto it = es->db_slice().FindExt(t->db_context(), args[i]).first; + auto it = es->db_slice().FindExt(t->GetDbContext(), args[i]).first; if (IsValid(it)) { exists.store(true, memory_order_relaxed); break; @@ -1009,7 +1009,7 @@ void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) { if (to_skip) return OpStatus::OK; - auto args = t->ShardArgsInShard(shard->shard_id()); + auto args = t->GetShardArgs(shard->shard_id()); return OpMSet(t->GetOpArgs(shard), std::move(args)); }; @@ -1022,7 +1022,7 @@ void StringFamily::StrLen(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - OpResult it_res = shard->db_slice().Find(t->db_context(), key, OBJ_STRING); + OpResult it_res = shard->db_slice().Find(t->GetDbContext(), key, OBJ_STRING); if (!it_res.ok()) return it_res.status(); @@ -1102,14 +1102,14 @@ void StringFamily::PSetEx(CmdArgList args, ConnectionContext* cntx) { auto StringFamily::OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction* t, EngineShard* shard) -> MGetResponse { - auto args = t->ShardArgsInShard(shard->shard_id()); + auto args = t->GetShardArgs(shard->shard_id()); DCHECK(!args.empty()); MGetResponse response(args.size()); auto& db_slice = 
shard->db_slice(); for (size_t i = 0; i < args.size(); ++i) { - OpResult it_res = db_slice.Find(t->db_context(), args[i], OBJ_STRING); + OpResult it_res = db_slice.Find(t->GetDbContext(), args[i], OBJ_STRING); if (!it_res) continue; @@ -1118,7 +1118,7 @@ auto StringFamily::OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction dest.value = GetString(shard, it->second); if (fetch_mcflag) { - dest.mc_flag = db_slice.GetMCFlag(t->db_index(), it->first); + dest.mc_flag = db_slice.GetMCFlag(t->GetDbIndex(), it->first); if (fetch_mcver) { dest.mc_ver = it.GetVersion(); } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 49e795826..0dba4bdde 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -44,7 +44,7 @@ IntentLock::Mode Transaction::Mode() const { Transaction::Transaction(const CommandId* cid) : cid_(cid) { string_view cmd_name(cid_->name()); if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") { - multi_.reset(new Multi); + multi_.reset(new MultiData); multi_->multi_opts = cid->opt_mask(); if (cmd_name == "EVAL" || cmd_name == "EVALSHA") { @@ -183,7 +183,7 @@ OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) { if (multi_->is_expanding) { multi_->keys.push_back(key); } else { - multi_->locks[key].cnt[int(mode)]++; + multi_->lock_counts[key][mode]++; } }; @@ -376,7 +376,7 @@ bool Transaction::RunInShard(EngineShard* shard) { LogAutoJournalOnShard(shard); // at least the coordinator thread owns the reference. - DCHECK_GE(use_count(), 1u); + DCHECK_GE(GetUseCount(), 1u); // we remove tx from tx-queue upon first invocation. // if it needs to run again it runs via a dedicated continuation_trans_ state in EngineShard. 
@@ -533,14 +533,13 @@ void Transaction::ScheduleInternal() { } } -void Transaction::LockMulti() { - DCHECK(multi_ && multi_->is_expanding); +void Transaction::MultiData::AddLocks(IntentLock::Mode mode) { + DCHECK(is_expanding); - IntentLock::Mode mode = Mode(); - for (auto key : multi_->keys) { - multi_->locks[key].cnt[int(mode)]++; + for (auto key : keys) { + lock_counts[key][mode]++; } - multi_->keys.clear(); + keys.clear(); } // Optimized "Schedule and execute" function for the most common use-case of a single hop @@ -593,7 +592,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { // Note that the logic here is a bit different from the public Schedule() function. if (multi_) { if (multi_->is_expanding) - LockMulti(); + multi_->AddLocks(Mode()); } else { ScheduleInternal(); } @@ -619,15 +618,15 @@ void Transaction::UnlockMulti() { vector sharded_keys(shard_set->size()); // It's LE and not EQ because there may be callbacks in progress that increase use_count_. - DCHECK_LE(1u, use_count()); + DCHECK_LE(1u, GetUseCount()); - for (const auto& k_v : multi_->locks) { + for (const auto& k_v : multi_->lock_counts) { ShardId sid = Shard(k_v.first, sharded_keys.size()); sharded_keys[sid].push_back(k_v); } if (ServerState::tlocal()->journal()) { - SetMultiUniqueShardCount(); + CalculateUnqiueShardCntForMulti(); } uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed); @@ -637,12 +636,12 @@ void Transaction::UnlockMulti() { shard_set->Add(i, [&] { UnlockMultiShardCb(sharded_keys, EngineShard::tlocal()); }); } WaitForShardCallbacks(); - DCHECK_GE(use_count(), 1u); + DCHECK_GE(GetUseCount(), 1u); VLOG(1) << "UnlockMultiEnd " << DebugId(); } -void Transaction::SetMultiUniqueShardCount() { +void Transaction::CalculateUnqiueShardCntForMulti() { uint32_t prev = run_count_.fetch_add(shard_data_.size(), memory_order_relaxed); DCHECK_EQ(prev, 0u); @@ -671,7 +670,7 @@ void Transaction::SetMultiUniqueShardCount() { void Transaction::Schedule() { if 
(multi_ && multi_->is_expanding) { - LockMulti(); + multi_->AddLocks(Mode()); } else { ScheduleInternal(); } @@ -809,7 +808,7 @@ void Transaction::RunQuickie(EngineShard* shard) { // runs in coordinator thread. // Marks the transaction as expired and removes it from the waiting queue. -void Transaction::UnwatchBlocking(bool should_expire, WaitKeysPovider wcb) { +void Transaction::UnwatchBlocking(bool should_expire, WaitKeysProvider wcb) { DVLOG(1) << "UnwatchBlocking " << DebugId(); DCHECK(!IsGlobal()); @@ -849,7 +848,7 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const { KeyLockArgs res; res.db_index = db_index_; res.key_step = cid_->key_arg_step(); - res.args = ShardArgsInShard(sid); + res.args = GetShardArgs(sid); return res; } @@ -987,7 +986,7 @@ bool Transaction::CancelShardCb(EngineShard* shard) { } // runs in engine-shard thread. -ArgSlice Transaction::ShardArgsInShard(ShardId sid) const { +ArgSlice Transaction::GetShardArgs(ShardId sid) const { DCHECK(!args_.empty()); // We can read unique_shard_cnt_ only because ShardArgsInShard is called after IsArmedInShard @@ -1010,9 +1009,9 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const { return reverse_index_[sd.arg_start + arg_index]; } -bool Transaction::WaitOnWatch(const time_point& tp, WaitKeysPovider wkeys_provider) { +bool Transaction::WaitOnWatch(const time_point& tp, WaitKeysProvider wkeys_provider) { // Assumes that transaction is pending and scheduled. TODO: To verify it with state machine. 
- VLOG(2) << "WaitOnWatch Start use_count(" << use_count() << ")"; + VLOG(2) << "WaitOnWatch Start use_count(" << GetUseCount() << ")"; using namespace chrono; auto cb = [&](Transaction* t, EngineShard* shard) { @@ -1103,8 +1102,8 @@ void Transaction::UnlockMultiShardCb(const std::vector& sharded_keys, E ShardId sid = shard->shard_id(); for (const auto& k_v : sharded_keys[sid]) { auto release = [&](IntentLock::Mode mode) { - if (k_v.second.cnt[mode]) { - shard->db_slice().Release(mode, db_index_, k_v.first, k_v.second.cnt[mode]); + if (k_v.second[mode]) { + shard->db_slice().Release(mode, db_index_, k_v.first, k_v.second[mode]); } }; @@ -1236,7 +1235,7 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) { entry_payload = cmd_with_full_args_; } else { auto cmd = facade::ToSV(cmd_with_full_args_.front()); - entry_payload = make_pair(cmd, ShardArgsInShard(shard->shard_id())); + entry_payload = make_pair(cmd, GetShardArgs(shard->shard_id())); } LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_); diff --git a/src/server/transaction.h b/src/server/transaction.h index 660107a77..aa470f896 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -28,15 +28,23 @@ class BlockingController; using facade::OpResult; using facade::OpStatus; +// Central building block of the transactional framework. +// +// Use it to run callbacks on the shard threads - such dispatches are called hops. +// The shards to run on are determined by the keys of the underlying command. +// Global transactions run on all shards. +// +// Use ScheduleSingleHop() if only a single hop is needed. +// Otherwise, schedule the transaction with Schedule() and run successive hops +// with Execute(). class Transaction { friend class BlockingController; Transaction(const Transaction&); void operator=(const Transaction&) = delete; - ~Transaction(); + ~Transaction(); // Transactions are reference counted with intrusive_ptr. - // Transactions are reference counted. 
friend void intrusive_ptr_add_ref(Transaction* trans) noexcept { trans->use_count_.fetch_add(1, std::memory_order_relaxed); } @@ -49,33 +57,84 @@ class Transaction { } public: - using RunnableType = std::function; using time_point = ::std::chrono::steady_clock::time_point; + // Runnable that is run on shards during hop executions (often named callback). + using RunnableType = std::function; + // Provides keys to block on for specific shard. + using WaitKeysProvider = std::function; + // State on specific shard. enum LocalMask : uint16_t { - ARMED = 1, // Transaction was armed with the callback - OUT_OF_ORDER = 2, - KEYLOCK_ACQUIRED = 4, - SUSPENDED_Q = 0x10, // added by the coordination flow (via WaitBlocked()). - AWAKED_Q = 0x20, // awaked by condition (lpush etc) - EXPIRED_Q = 0x40, // timed-out and should be garbage collected from the blocking queue. + ARMED = 1, // Whether callback cb_ is set + OUT_OF_ORDER = 1 << 1, // Whether it's running out of order + KEYLOCK_ACQUIRED = 1 << 2, // Whether its key locks are acquired + SUSPENDED_Q = 1 << 3, // Whether it is suspended (by WatchInShard()) + AWAKED_Q = 1 << 4, // Whether it was awakened (by NotifySuspended()) + EXPIRED_Q = 1 << 5, // Whether it timed out and should be dropped }; + public: explicit Transaction(const CommandId* cid); + // Initialize from command (args) on specific db. OpStatus InitByArgs(DbIndex index, CmdArgList args); - void SetExecCmd(const CommandId* cid); + // Get command arguments for specific shard. Called from shard thread. + ArgSlice GetShardArgs(ShardId sid) const; - std::string DebugId() const; - - // Runs in engine thread - ArgSlice ShardArgsInShard(ShardId sid) const; - - // Maps the index in ShardArgsInShard(shard_id) slice back to the index - // in the original array passed to InitByArgs. + // Map arg_index from GetShardArgs slice to index in original command slice from InitByArgs. size_t ReverseArgIndex(ShardId shard_id, size_t arg_index) const; + // Schedule transaction.
+ // Usually used for multi hop transactions like RENAME or BLPOP. + // For single hop transactions use ScheduleSingleHop instead. + void Schedule(); + + // Execute transaction hop. If conclude is true, it is removed from the pending queue. + void Execute(RunnableType cb, bool conclude); + + // Execute single hop and conclude. + // Callback should return OK for multi key invocations, otherwise return value is ill-defined. + OpStatus ScheduleSingleHop(RunnableType cb); + + // Execute single hop with return value and conclude. + // Can be used only for single key invocations, because it writes into a shared variable. + template auto ScheduleSingleHopT(F&& f) -> decltype(f(this, nullptr)); + + // Called by EngineShard when performing Execute over the tx queue. + // Returns true if transaction should be kept in the queue. + bool RunInShard(EngineShard* shard); + + // Registers transaction into watched queue and blocks until a) either notification is received. + // or b) tp is reached. If tp is time_point::max() then waits indefinitely. + // Expects that the transaction had been scheduled before, and uses Execute(.., true) to register. + // Returns false if timeout occurred, true if was notified by one of the keys. + bool WaitOnWatch(const time_point& tp, WaitKeysProvider cb); + + // Returns true if transaction is awaked, false if it's timed-out and can be removed from the + // blocking queue. NotifySuspended may be called from (multiple) shard threads and + // with each call potentially improving the minimal wake_txid at which + // this transaction has been awaked. + bool NotifySuspended(TxId committed_ts, ShardId sid); + + // Cancel all blocking watches on shutdown. Set COORD_CANCELLED. + void BreakOnShutdown(); + + // Log a journal entry on shard with payload and shard count. + void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, + uint32_t shard_cnt) const; + + // Unlock key locks of a multi transaction.
+ void UnlockMulti(); + + // Reset CommandId to be executed when executing MULTI/EXEC or from script. + void SetExecCmd(const CommandId* cid); + + // Returns locking arguments needed for DbSlice to Acquire/Release transactional locks. + // Runs in the shard thread. + KeyLockArgs GetLockArgs(ShardId sid) const; + + public: //! Returns true if the transaction spans this shard_id. //! Runs from the coordinator thread. bool IsActive(ShardId shard_id) const { @@ -100,51 +159,19 @@ class Transaction { return shard_data_[SidToId(sid)].local_mask; } - // Schedules a transaction. Usually used for multi-hop transactions like Rename or BLPOP. - // For single hop, use ScheduleSingleHop instead. - void Schedule(); - - // if conclude is true, removes the transaction from the pending queue. - void Execute(RunnableType cb, bool conclude); - - // for multi-key scenarios cb should return Status::Ok since otherwise the return value - // will be ill-defined. - OpStatus ScheduleSingleHop(RunnableType cb); - - // Fits only for single key scenarios because it writes into shared variable res from - // potentially multiple threads. - template auto ScheduleSingleHopT(F&& f) -> decltype(f(this, nullptr)) { - decltype(f(this, nullptr)) res; - - ScheduleSingleHop([&res, f = std::forward(f)](Transaction* t, EngineShard* shard) { - res = f(t, shard); - return res.status(); - }); - return res; - } - - void UnlockMulti(); - // In multi transaciton command we calculate the unique shard count of the trasaction - // after all transaciton commands where executed, by checking the last txid writen to - // all journals. - // This value is writen to journal so that replica we be able to apply the multi command - // atomicaly. - void SetMultiUniqueShardCount(); - TxId txid() const { return txid_; } - // based on cid_->opt_mask. 
- IntentLock::Mode Mode() const; + IntentLock::Mode Mode() const; // Based on command mask - const char* Name() const; + const char* Name() const; // Based on command name - uint32_t unique_shard_cnt() const { + uint32_t GetUniqueShardCnt() const { return unique_shard_cnt_; } - TxId notify_txid() const { + TxId GetNotifyTxid() const { return notify_txid_.load(std::memory_order_relaxed); } @@ -158,103 +185,43 @@ class Transaction { return coordinator_state_ & COORD_OOO; } - // Registers transaction into watched queue and blocks until a) either notification is received. - // or b) tp is reached. If tp is time_point::max() then waits indefinitely. - // Expects that the transaction had been scheduled before, and uses Execute(.., true) to register. - // Returns false if timeout occurred, true if was notified by one of the keys. - using WaitKeysPovider = std::function; - bool WaitOnWatch(const time_point& tp, WaitKeysPovider cb); - - // Returns true if transaction is awaked, false if it's timed-out and can be removed from the - // blocking queue. NotifySuspended may be called from (multiple) shard threads and - // with each call potentially improving the minimal wake_txid at which - // this transaction has been awaked. - bool NotifySuspended(TxId committed_ts, ShardId sid); - - void BreakOnShutdown(); - - // Called by EngineShard when performing Execute over the tx queue. - // Returns true if transaction should be kept in the queue. - bool RunInShard(EngineShard* shard); - - //! Returns locking arguments needed for DbSlice to Acquire/Release transactional locks. - //! Runs in the shard thread. 
- KeyLockArgs GetLockArgs(ShardId sid) const; - OpArgs GetOpArgs(EngineShard* shard) const { - return OpArgs{shard, this, db_context()}; + return OpArgs{shard, this, GetDbContext()}; } - DbContext db_context() const { + DbContext GetDbContext() const { return DbContext{.db_index = db_index_, .time_now_ms = time_now_ms_}; } - DbIndex db_index() const { + DbIndex GetDbIndex() const { return db_index_; } - // Log a journal entry on shard with payload. - void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, - uint32_t shard_cnt) const; + std::string DebugId() const; private: + // Holds number of locks for each IntentLock::Mode: shared and exclusive. struct LockCnt { + unsigned& operator[](IntentLock::Mode mode) { + return cnt[int(mode)]; + } + + unsigned operator[](IntentLock::Mode mode) const { + return cnt[int(mode)]; + } + + private: unsigned cnt[2] = {0, 0}; }; using KeyList = std::vector>; - unsigned SidToId(ShardId sid) const { - return sid < shard_data_.size() ? sid : 0; - } - - void ScheduleInternal(); - void LockMulti(); - - void UnwatchBlocking(bool should_expire, WaitKeysPovider wcb); - void ExecuteAsync(); - - // Optimized version of RunInShard for single shard uncontended cases. - void RunQuickie(EngineShard* shard); - - //! Returns true if transaction run out-of-order during the scheduling phase. - bool ScheduleUniqueShard(EngineShard* shard); - - /// Returns pair(schedule_success, lock_granted) - /// schedule_success is true if transaction was scheduled on db_slice. - /// lock_granted is true if lock was granted for all the keys on this shard. - /// Runs in the shard thread. - std::pair ScheduleInShard(EngineShard* shard); - - // Returns true if we need to follow up with PollExecution on this shard.
- bool CancelShardCb(EngineShard* shard); - - void UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard* shard); - void UnlockMultiShardCb(const std::vector& sharded_keys, EngineShard* shard); - - // Adds itself to watched queue in the shard. Must run in that shard thread. - OpStatus WatchInShard(ArgSlice keys, EngineShard* shard); - - void WaitForShardCallbacks() { - run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); }); - - // store operations below can not be ordered above the fence - std::atomic_thread_fence(std::memory_order_release); - seqlock_.fetch_add(1, std::memory_order_relaxed); - } - - // Returns the previous value of run count. - uint32_t DecreaseRunCnt(); - - uint32_t use_count() const { - return use_count_.load(std::memory_order_relaxed); - } - - // Log command in the journal of a shard for write commands with auto-journaling enabled. - // Should be called immediately after the last phase (hop). - void LogAutoJournalOnShard(EngineShard* shard); - struct PerShardData { + PerShardData(PerShardData&&) noexcept { + } + + PerShardData() = default; + uint32_t arg_start = 0; // Indices into args_ array. uint16_t arg_count = 0; @@ -265,17 +232,14 @@ class Transaction { // Needed to rollback inconsistent schedulings or remove OOO transactions from // tx queue. uint32_t pq_pos = TxQueue::kEnd; - - PerShardData(PerShardData&&) noexcept { - } - - PerShardData() = default; }; - enum { kPerShardSize = sizeof(PerShardData) }; + // State of a multi transaction. + struct MultiData { + // Increase lock counts for all current keys for mode. Clear keys. + void AddLocks(IntentLock::Mode mode); - struct Multi { - absl::flat_hash_map locks; + absl::flat_hash_map lock_counts; std::vector keys; uint32_t multi_opts = 0; // options of the parent transaction. @@ -285,9 +249,77 @@ class Transaction { bool locks_recorded = false; }; - util::fibers_ext::EventCount blocking_ec_; // used to wake blocking transactions. 
- util::fibers_ext::EventCount run_ec_; + enum CoordinatorState : uint8_t { + COORD_SCHED = 1, + COORD_EXEC = 2, + // We are running the last execution step in multi-hop operation. + COORD_EXEC_CONCLUDING = 4, + COORD_BLOCKED = 8, + COORD_CANCELLED = 0x10, + COORD_OOO = 0x20, + }; + + private: + // Generic schedule used from Schedule() and ScheduleSingleHop() on slow path. + void ScheduleInternal(); + + // Schedule if only one shard is active. + // Returns true if transaction ran out-of-order during the scheduling phase. + bool ScheduleUniqueShard(EngineShard* shard); + + // Schedule on shard's transaction queue. + // Returns pair(schedule_success, lock_granted) + // schedule_success is true if transaction was scheduled on db_slice. + // lock_granted is true if lock was granted for all the keys on this shard. + // Runs in the shard thread. + std::pair ScheduleInShard(EngineShard* shard); + + // Optimized version of RunInShard for single shard uncontended cases. + void RunQuickie(EngineShard* shard); + + void ExecuteAsync(); + + // Adds itself to watched queue in the shard. Must run in that shard thread. + OpStatus WatchInShard(ArgSlice keys, EngineShard* shard); + + void UnwatchBlocking(bool should_expire, WaitKeysProvider wcb); + + // Returns true if we need to follow up with PollExecution on this shard. + bool CancelShardCb(EngineShard* shard); + + void UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard* shard); + + void UnlockMultiShardCb(const std::vector& sharded_keys, EngineShard* shard); + + // Calculate number of unique shards for multi transaction after all commands were executed. + // This value is used in stable state replication to allow applying the command atomically.
+ void CalculateUnqiueShardCntForMulti(); + + void WaitForShardCallbacks() { + run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); }); + + // store operations below can not be ordered above the fence + std::atomic_thread_fence(std::memory_order_release); + seqlock_.fetch_add(1, std::memory_order_relaxed); + } + + // Log command in shard's journal, if this is a write command with auto-journaling enabled. + // Should be called immediately after the last phase (hop). + void LogAutoJournalOnShard(EngineShard* shard); + + // Returns the previous value of run count. + uint32_t DecreaseRunCnt(); + + uint32_t GetUseCount() const { + return use_count_.load(std::memory_order_relaxed); + } + + unsigned SidToId(ShardId sid) const { + return sid < shard_data_.size() ? sid : 0; + } + + private: // shard_data spans all the shards in ess_. // I wish we could use a dense array of size [0..uniq_shards] but since // multiple threads access this array to synchronize between themselves using @@ -301,39 +333,25 @@ class Transaction { // Stores the full undivided command. CmdArgList cmd_with_full_args_; - // Reverse argument mapping. Allows to reconstruct responses according to the original order of - // keys. + // Reverse argument mapping for ReverseArgIndex to convert from shard index to original index. std::vector reverse_index_; - RunnableType cb_; - std::unique_ptr multi_; // Initialized when the transaction is multi/exec. - - const CommandId* cid_; + RunnableType cb_; // Run on shard threads + const CommandId* cid_; // Underlying command + std::unique_ptr multi_; // Initialized when the transaction is multi/exec. TxId txid_{0}; + DbIndex db_index_{0}; uint64_t time_now_ms_{0}; std::atomic notify_txid_{kuint64max}; std::atomic_uint32_t use_count_{0}, run_count_{0}, seqlock_{0}; - // unique_shard_cnt_ and unique_shard_id_ is accessed only by coordinator thread. + // unique_shard_cnt_ and unique_shard_id_ are accessed only by coordinator thread. 
uint32_t unique_shard_cnt_{0}; // number of unique shards span by args_ - ShardId unique_shard_id_{kInvalidSid}; - DbIndex db_index_; - // Used for single-hop transactions with unique_shards_ == 1, hence no data-race. - OpStatus local_result_ = OpStatus::OK; - - enum CoordinatorState : uint8_t { - COORD_SCHED = 1, - COORD_EXEC = 2, - - // We are running the last execution step in multi-hop operation. - COORD_EXEC_CONCLUDING = 4, - COORD_BLOCKED = 8, - COORD_CANCELLED = 0x10, - COORD_OOO = 0x20, - }; + util::fibers_ext::EventCount blocking_ec_; // Used to wake blocking transactions. + util::fibers_ext::EventCount run_ec_; // Used to wait for shard callbacks // Transaction coordinator state, written and read by coordinator thread. // Can be read by shard threads as long as we respect ordering rules, i.e. when @@ -341,6 +359,10 @@ class Transaction { // If COORDINATOR_XXX has been set, it means we passed or crossed stage XXX. uint8_t coordinator_state_ = 0; + // Used for single-hop transactions with unique_shards_ == 1, hence no data-race. 
+ OpStatus local_result_ = OpStatus::OK; + + private: struct PerShardCache { std::vector args; std::vector original_index; @@ -359,6 +381,16 @@ class Transaction { static thread_local TLTmpSpace tmp_space; }; +template auto Transaction::ScheduleSingleHopT(F&& f) -> decltype(f(this, nullptr)) { + decltype(f(this, nullptr)) res; + + ScheduleSingleHop([&res, f = std::forward(f)](Transaction* t, EngineShard* shard) { + res = f(t, shard); + return res.status(); + }); + return res; +} + inline uint16_t trans_id(const Transaction* ptr) { return (intptr_t(ptr) >> 8) & 0xFFFF; } diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 6e98ce1a8..1869982a3 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -723,7 +723,7 @@ OpResult FindShardKeysAndWeights(EngineShard* shard, Transacti auto& db_slice = shard->db_slice(); KeyIterWeightVec key_weight_vec(keys.size() - src_keys_offset); for (unsigned j = src_keys_offset; j < keys.size(); ++j) { - auto it_res = db_slice.Find(t->db_context(), keys[j], OBJ_ZSET); + auto it_res = db_slice.Find(t->GetDbContext(), keys[j], OBJ_ZSET); if (it_res == OpStatus::WRONG_TYPE) // TODO: support sets with default score 1. 
return it_res.status(); if (!it_res) @@ -738,7 +738,7 @@ OpResult FindShardKeysAndWeights(EngineShard* shard, Transacti OpResult OpUnion(EngineShard* shard, Transaction* t, string_view dest, AggType agg_type, const vector& weights, bool store) { - ArgSlice keys = t->ShardArgsInShard(shard->shard_id()); + ArgSlice keys = t->GetShardArgs(shard->shard_id()); DVLOG(1) << "shard:" << shard->shard_id() << ", keys " << vector(keys.begin(), keys.end()); DCHECK(!keys.empty()); @@ -769,7 +769,7 @@ OpResult OpUnion(EngineShard* shard, Transaction* t, string_view dest OpResult OpInter(EngineShard* shard, Transaction* t, string_view dest, AggType agg_type, const vector& weights, bool store) { - ArgSlice keys = t->ShardArgsInShard(shard->shard_id()); + ArgSlice keys = t->GetShardArgs(shard->shard_id()); DVLOG(1) << "shard:" << shard->shard_id() << ", keys " << vector(keys.begin(), keys.end()); DCHECK(!keys.empty()); @@ -785,7 +785,7 @@ OpResult OpInter(EngineShard* shard, Transaction* t, string_view dest return OpStatus::SKIPPED; // return noop for (unsigned j = start; j < keys.size(); ++j) { - auto it_res = db_slice.Find(t->db_context(), keys[j], OBJ_ZSET); + auto it_res = db_slice.Find(t->GetDbContext(), keys[j], OBJ_ZSET); if (it_res == OpStatus::WRONG_TYPE) // TODO: support sets with default score 1. return it_res.status(); @@ -1177,7 +1177,7 @@ void ZSetFamily::ZCard(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - OpResult find_res = shard->db_slice().Find(t->db_context(), key, OBJ_ZSET); + OpResult find_res = shard->db_slice().Find(t->GetDbContext(), key, OBJ_ZSET); if (!find_res) { return find_res.status(); }