chore: Remove Schedule() call (#2938)

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-04-21 11:32:44 +03:00 committed by GitHub
parent 322b2e7ac1
commit 9b9c32c91d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
16 changed files with 2 additions and 42 deletions

View file

@ -1158,7 +1158,6 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) {
return OpStatus::OK; return OpStatus::OK;
}; };
cntx->transaction->Schedule();
cntx->transaction->Execute(std::move(shard_bitop), false); // we still have more work to do cntx->transaction->Execute(std::move(shard_bitop), false); // we still have more work to do
// All result from each shard // All result from each shard
const auto joined_results = CombineResultOp(result_set, op); const auto joined_results = CombineResultOp(result_set, op);

View file

@ -96,7 +96,6 @@ TEST_F(BlockingControllerTest, Timeout) {
bool blocked; bool blocked;
bool paused; bool paused;
trans_->Schedule();
auto cb = [&](Transaction* t, EngineShard* shard) { return trans_->GetShardArgs(0); }; auto cb = [&](Transaction* t, EngineShard* shard) { return trans_->GetShardArgs(0); };
facade::OpStatus status = trans_->WaitOnWatch( facade::OpStatus status = trans_->WaitOnWatch(

View file

@ -272,7 +272,6 @@ OpResult<string> RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty
else else
result = res.status(); result = res.status();
} else { } else {
trans->Schedule();
result = FindFirstNonEmpty(trans, req_obj_type); result = FindFirstNonEmpty(trans, req_obj_type);
} }

View file

@ -78,9 +78,6 @@ struct TransactionGuard {
}; };
explicit TransactionGuard(Transaction* t, bool disable_expirations = false) : t(t) { explicit TransactionGuard(Transaction* t, bool disable_expirations = false) : t(t) {
VLOG(2) << "Transaction guard try schedule";
t->Schedule();
VLOG(2) << "Transaction guard schedule";
t->Execute( t->Execute(
[disable_expirations](Transaction* t, EngineShard* shard) { [disable_expirations](Transaction* t, EngineShard* shard) {
if (disable_expirations) { if (disable_expirations) {

View file

@ -1313,7 +1313,6 @@ OpResult<void> GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des
return result; return result;
} }
transaction->Schedule();
unsigned shard_count = shard_set->size(); unsigned shard_count = shard_set->size();
Renamer renamer{Shard(key[0], shard_count)}; Renamer renamer{Shard(key[0], shard_count)};

View file

@ -263,7 +263,6 @@ OpResult<int> PFMergeInternal(CmdArgList args, ConnectionContext* cntx) {
}; };
Transaction* trans = cntx->transaction; Transaction* trans = cntx->transaction;
trans->Schedule();
trans->Execute(std::move(cb), false); trans->Execute(std::move(cb), false);
if (!success) { if (!success) {

View file

@ -722,7 +722,6 @@ void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, Lis
}; };
result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
} else { } else {
cntx->transaction->Schedule();
result = MoveTwoShards(cntx->transaction, src, dest, src_dir, dest_dir, true); result = MoveTwoShards(cntx->transaction, src, dest, src_dir, dest_dir, true);
} }
@ -834,8 +833,6 @@ OpResult<string> BPopPusher::Run(ConnectionContext* cntx, unsigned limit_ms) {
time_point tp = time_point tp =
limit_ms ? chrono::steady_clock::now() + chrono::milliseconds(limit_ms) : time_point::max(); limit_ms ? chrono::steady_clock::now() + chrono::milliseconds(limit_ms) : time_point::max();
cntx->transaction->Schedule();
if (cntx->transaction->GetUniqueShardCnt() == 1) { if (cntx->transaction->GetUniqueShardCnt() == 1) {
return RunSingle(cntx, tp); return RunSingle(cntx, tp);
} }

View file

@ -441,8 +441,6 @@ void SearchFamily::FtCreate(CmdArgList args, ConnectionContext* cntx) {
if (auto err = parser.Error(); err) if (auto err = parser.Error(); err)
return cntx->SendError(err->MakeReply()); return cntx->SendError(err->MakeReply());
cntx->transaction->Schedule();
// Check if index already exists // Check if index already exists
atomic_uint exists_cnt = 0; atomic_uint exists_cnt = 0;
cntx->transaction->Execute( cntx->transaction->Execute(

View file

@ -1425,8 +1425,6 @@ bool ServerFamily::TEST_IsSaving() const {
error_code ServerFamily::Drakarys(Transaction* transaction, DbIndex db_ind) { error_code ServerFamily::Drakarys(Transaction* transaction, DbIndex db_ind) {
VLOG(1) << "Drakarys"; VLOG(1) << "Drakarys";
transaction->Schedule(); // TODO: to convert to ScheduleSingleHop ?
transaction->Execute( transaction->Execute(
[db_ind](Transaction* t, EngineShard* shard) { [db_ind](Transaction* t, EngineShard* shard) {
shard->db_slice().FlushDb(db_ind); shard->db_slice().FlushDb(db_ind);

View file

@ -991,8 +991,6 @@ void SMove(CmdArgList args, ConnectionContext* cntx) {
string_view member = ArgS(args, 2); string_view member = ArgS(args, 2);
Mover mover{src, dest, member, true}; Mover mover{src, dest, member, true};
cntx->transaction->Schedule();
mover.Find(cntx->transaction); mover.Find(cntx->transaction);
OpResult<unsigned> result = mover.Commit(cntx->transaction); OpResult<unsigned> result = mover.Commit(cntx->transaction);
@ -1149,7 +1147,6 @@ void SDiffStore(CmdArgList args, ConnectionContext* cntx) {
return OpStatus::OK; return OpStatus::OK;
}; };
cntx->transaction->Schedule();
cntx->transaction->Execute(std::move(diff_cb), false); cntx->transaction->Execute(std::move(diff_cb), false);
ResultSetView rsv = DiffResultVec(result_set, src_shard); ResultSetView rsv = DiffResultVec(result_set, src_shard);
if (!rsv) { if (!rsv) {
@ -1290,7 +1287,6 @@ void SInterStore(CmdArgList args, ConnectionContext* cntx) {
return OpStatus::OK; return OpStatus::OK;
}; };
cntx->transaction->Schedule();
cntx->transaction->Execute(std::move(inter_cb), false); cntx->transaction->Execute(std::move(inter_cb), false);
OpResult<SvArray> result = InterResultVec(result_set, inter_shard_cnt.load(memory_order_relaxed)); OpResult<SvArray> result = InterResultVec(result_set, inter_shard_cnt.load(memory_order_relaxed));
@ -1378,7 +1374,6 @@ void SUnionStore(CmdArgList args, ConnectionContext* cntx) {
return OpStatus::OK; return OpStatus::OK;
}; };
cntx->transaction->Schedule();
cntx->transaction->Execute(std::move(union_cb), false); cntx->transaction->Execute(std::move(union_cb), false);
ResultSetView unionset = UnionResultVec(result_set); ResultSetView unionset = UnionResultVec(result_set);

View file

@ -3045,7 +3045,7 @@ void XReadGeneric(CmdArgList args, bool read_group, ConnectionContext* cntx) {
res_pairs[sid] = OpGetGroupConsumerPairs(s_args, t->GetOpArgs(shard), gc_opts); res_pairs[sid] = OpGetGroupConsumerPairs(s_args, t->GetOpArgs(shard), gc_opts);
return OpStatus::OK; return OpStatus::OK;
}; };
cntx->transaction->Schedule();
if (opts->read_group) { if (opts->read_group) {
// If the command is `XReadGroup`, we need to get // If the command is `XReadGroup`, we need to get
// the (group, consumer) pairs for each key. // the (group, consumer) pairs for each key.

View file

@ -1243,8 +1243,6 @@ void StringFamily::MSet(CmdArgList args, ConnectionContext* cntx) {
void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) { void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) {
Transaction* transaction = cntx->transaction; Transaction* transaction = cntx->transaction;
transaction->Schedule();
atomic_bool exists{false}; atomic_bool exists{false};
auto cb = [&](Transaction* t, EngineShard* es) { auto cb = [&](Transaction* t, EngineShard* es) {

View file

@ -81,7 +81,6 @@ void TransactionSuspension::Start() {
auto st = transaction_->InitByArgs(0, {}); auto st = transaction_->InitByArgs(0, {});
CHECK_EQ(st, OpStatus::OK); CHECK_EQ(st, OpStatus::OK);
transaction_->Schedule();
transaction_->Execute([](Transaction* t, EngineShard* shard) { return OpStatus::OK; }, false); transaction_->Execute([](Transaction* t, EngineShard* shard) { return OpStatus::OK; }, false);
} }

View file

@ -834,10 +834,6 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
return local_result_; return local_result_;
} }
void Transaction::Schedule() {
// no-op
}
// Runs in coordinator thread. // Runs in coordinator thread.
void Transaction::Execute(RunnableType cb, bool conclude) { void Transaction::Execute(RunnableType cb, bool conclude) {
if (multi_ && multi_->role == SQUASHED_STUB) { if (multi_ && multi_->role == SQUASHED_STUB) {

View file

@ -43,10 +43,6 @@ using facade::OpStatus;
// The shards to run on are determined by the keys of the underlying command. // The shards to run on are determined by the keys of the underlying command.
// Global transactions run on all shards. // Global transactions run on all shards.
// //
// Use ScheduleSingleHop() if only a single hop is needed.
// Otherwise, schedule the transaction with Schedule() and run successive hops
// with Execute().
//
// 1. Multi transactions // 1. Multi transactions
// //
// Multi transactions are handled by a single transaction, which exposes the same interface for // Multi transactions are handled by a single transaction, which exposes the same interface for
@ -184,11 +180,6 @@ class Transaction {
// Map arg_index from GetShardArgs slice to index in original command slice from InitByArgs. // Map arg_index from GetShardArgs slice to index in original command slice from InitByArgs.
size_t ReverseArgIndex(ShardId shard_id, size_t arg_index) const; size_t ReverseArgIndex(ShardId shard_id, size_t arg_index) const;
// Schedule transaction.
// Usually used for multi hop transactions like RENAME or BLPOP.
// For single hop transactions use ScheduleSingleHop instead.
void Schedule();
// Execute transaction hop. If conclude is true, it is removed from the pending queue. // Execute transaction hop. If conclude is true, it is removed from the pending queue.
void Execute(RunnableType cb, bool conclude); void Execute(RunnableType cb, bool conclude);
@ -500,7 +491,6 @@ class Transaction {
// "Launder" keys by filtering uniques and replacing pointers with same lifetime as transaction. // "Launder" keys by filtering uniques and replacing pointers with same lifetime as transaction.
void LaunderKeyStorage(CmdArgVec* keys); void LaunderKeyStorage(CmdArgVec* keys);
// Generic schedule used from Schedule() and ScheduleSingleHop() on slow path.
void ScheduleInternal(); void ScheduleInternal();
// Schedule on shards transaction queue. Returns true if scheduled successfully, // Schedule on shards transaction queue. Returns true if scheduled successfully,

View file

@ -1207,8 +1207,6 @@ void ZUnionFamilyInternal(CmdArgList args, bool store, ConnectionContext* cntx)
return OpStatus::OK; return OpStatus::OK;
}; };
cntx->transaction->Schedule();
// For commands not storing computed result, this should be // For commands not storing computed result, this should be
// the last transaction hop (e.g. ZUNION) // the last transaction hop (e.g. ZUNION)
cntx->transaction->Execute(std::move(cb), !store); cntx->transaction->Execute(std::move(cb), !store);
@ -2014,7 +2012,6 @@ void ZSetFamily::ZInterStore(CmdArgList args, ConnectionContext* cntx) {
return OpStatus::OK; return OpStatus::OK;
}; };
cntx->transaction->Schedule();
cntx->transaction->Execute(std::move(cb), false); cntx->transaction->Execute(std::move(cb), false);
OpResult<ScoredMap> result = IntersectResults(maps, op_args.agg_type); OpResult<ScoredMap> result = IntersectResults(maps, op_args.agg_type);
@ -2814,7 +2811,7 @@ void GeoSearchStoreGeneric(ConnectionContext* cntx, const GeoShape& shape_ref, s
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder()); auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
ShardId from_shard = Shard(key, shard_set->size()); ShardId from_shard = Shard(key, shard_set->size());
cntx->transaction->Schedule();
if (!member.empty()) { if (!member.empty()) {
// get shape.xy from member // get shape.xy from member
OpResult<double> member_score; OpResult<double> member_score;