diff --git a/src/server/bitops_family.cc b/src/server/bitops_family.cc index cd7d1fda7..e8e01c6ea 100644 --- a/src/server/bitops_family.cc +++ b/src/server/bitops_family.cc @@ -1158,7 +1158,6 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) { return OpStatus::OK; }; - cntx->transaction->Schedule(); cntx->transaction->Execute(std::move(shard_bitop), false); // we still have more work to do // All result from each shard const auto joined_results = CombineResultOp(result_set, op); diff --git a/src/server/blocking_controller_test.cc b/src/server/blocking_controller_test.cc index e9af80b7a..6d1e746fc 100644 --- a/src/server/blocking_controller_test.cc +++ b/src/server/blocking_controller_test.cc @@ -96,7 +96,6 @@ TEST_F(BlockingControllerTest, Timeout) { bool blocked; bool paused; - trans_->Schedule(); auto cb = [&](Transaction* t, EngineShard* shard) { return trans_->GetShardArgs(0); }; facade::OpStatus status = trans_->WaitOnWatch( diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc index 0af7f1b35..896c336c6 100644 --- a/src/server/container_utils.cc +++ b/src/server/container_utils.cc @@ -272,7 +272,6 @@ OpResult RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty else result = res.status(); } else { - trans->Schedule(); result = FindFirstNonEmpty(trans, req_obj_type); } diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 30d03db9c..86c255de5 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -78,9 +78,6 @@ struct TransactionGuard { }; explicit TransactionGuard(Transaction* t, bool disable_expirations = false) : t(t) { - VLOG(2) << "Transaction guard try schedule"; - t->Schedule(); - VLOG(2) << "Transaction guard schedule"; t->Execute( [disable_expirations](Transaction* t, EngineShard* shard) { if (disable_expirations) { diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index d2550eb17..fcc0c3a27 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -1313,7 +1313,6 @@ OpResult GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des return result; } - transaction->Schedule(); unsigned shard_count = shard_set->size(); Renamer renamer{Shard(key[0], shard_count)}; diff --git a/src/server/hll_family.cc b/src/server/hll_family.cc index 71ad0b586..ac3c4de64 100644 --- a/src/server/hll_family.cc +++ b/src/server/hll_family.cc @@ -263,7 +263,6 @@ OpResult PFMergeInternal(CmdArgList args, ConnectionContext* cntx) { }; Transaction* trans = cntx->transaction; - trans->Schedule(); trans->Execute(std::move(cb), false); if (!success) { diff --git a/src/server/list_family.cc b/src/server/list_family.cc index 10a0115de..f02f04cff 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -722,7 +722,6 @@ void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, Lis }; result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); } else { - cntx->transaction->Schedule(); result = MoveTwoShards(cntx->transaction, src, dest, src_dir, dest_dir, true); } @@ -834,8 +833,6 @@ OpResult BPopPusher::Run(ConnectionContext* cntx, unsigned limit_ms) { time_point tp = limit_ms ? chrono::steady_clock::now() + chrono::milliseconds(limit_ms) : time_point::max(); - cntx->transaction->Schedule(); - if (cntx->transaction->GetUniqueShardCnt() == 1) { return RunSingle(cntx, tp); } diff --git a/src/server/search/search_family.cc b/src/server/search/search_family.cc index 082a17c74..7b190c104 100644 --- a/src/server/search/search_family.cc +++ b/src/server/search/search_family.cc @@ -441,8 +441,6 @@ void SearchFamily::FtCreate(CmdArgList args, ConnectionContext* cntx) { if (auto err = parser.Error(); err) return cntx->SendError(err->MakeReply()); - cntx->transaction->Schedule(); - // Check if index already exists atomic_uint exists_cnt = 0; cntx->transaction->Execute( diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 8de79e3c7..f9b5ff118 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1425,8 +1425,6 @@ bool ServerFamily::TEST_IsSaving() const { error_code ServerFamily::Drakarys(Transaction* transaction, DbIndex db_ind) { VLOG(1) << "Drakarys"; - transaction->Schedule(); // TODO: to convert to ScheduleSingleHop ? - transaction->Execute( [db_ind](Transaction* t, EngineShard* shard) { shard->db_slice().FlushDb(db_ind); diff --git a/src/server/set_family.cc b/src/server/set_family.cc index 7240c022a..1a3fdb520 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -991,8 +991,6 @@ void SMove(CmdArgList args, ConnectionContext* cntx) { string_view member = ArgS(args, 2); Mover mover{src, dest, member, true}; - cntx->transaction->Schedule(); - mover.Find(cntx->transaction); OpResult result = mover.Commit(cntx->transaction); @@ -1149,7 +1147,6 @@ void SDiffStore(CmdArgList args, ConnectionContext* cntx) { return OpStatus::OK; }; - cntx->transaction->Schedule(); cntx->transaction->Execute(std::move(diff_cb), false); ResultSetView rsv = DiffResultVec(result_set, src_shard); if (!rsv) { @@ -1290,7 +1287,6 @@ void SInterStore(CmdArgList args, ConnectionContext* cntx) { return OpStatus::OK; }; - cntx->transaction->Schedule(); cntx->transaction->Execute(std::move(inter_cb), false); OpResult result = InterResultVec(result_set, inter_shard_cnt.load(memory_order_relaxed)); @@ -1378,7 +1374,6 @@ void SUnionStore(CmdArgList args, ConnectionContext* cntx) { return OpStatus::OK; }; - cntx->transaction->Schedule(); cntx->transaction->Execute(std::move(union_cb), false); ResultSetView unionset = UnionResultVec(result_set); diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 30c3a9d18..ac58427b2 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -3045,7 +3045,7 @@ void XReadGeneric(CmdArgList args, bool read_group, ConnectionContext* cntx) { res_pairs[sid] = OpGetGroupConsumerPairs(s_args, t->GetOpArgs(shard), gc_opts); return OpStatus::OK; }; - cntx->transaction->Schedule(); + if (opts->read_group) { // If the command is `XReadGroup`, we need to get // the (group, consumer) pairs for each key. diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 23adb6f7d..fbc7a657e 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -1243,8 +1243,6 @@ void StringFamily::MSet(CmdArgList args, ConnectionContext* cntx) { void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) { Transaction* transaction = cntx->transaction; - transaction->Schedule(); - atomic_bool exists{false}; auto cb = [&](Transaction* t, EngineShard* es) { diff --git a/src/server/test_utils.cc b/src/server/test_utils.cc index 3b3197755..062c691d6 100644 --- a/src/server/test_utils.cc +++ b/src/server/test_utils.cc @@ -81,7 +81,6 @@ void TransactionSuspension::Start() { auto st = transaction_->InitByArgs(0, {}); CHECK_EQ(st, OpStatus::OK); - transaction_->Schedule(); transaction_->Execute([](Transaction* t, EngineShard* shard) { return OpStatus::OK; }, false); } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 7bd863b63..d4c57091d 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -834,10 +834,6 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) { return local_result_; } -void Transaction::Schedule() { - // no-op -} - // Runs in coordinator thread. void Transaction::Execute(RunnableType cb, bool conclude) { if (multi_ && multi_->role == SQUASHED_STUB) { diff --git a/src/server/transaction.h b/src/server/transaction.h index 07341166f..ab6926598 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -43,10 +43,6 @@ using facade::OpStatus; // The shards to run on are determined by the keys of the underlying command. // Global transactions run on all shards. // -// Use ScheduleSingleHop() if only a single hop is needed. -// Otherwise, schedule the transaction with Schedule() and run successive hops -// with Execute(). -// // 1. Multi transactions // // Multi transactions are handled by a single transaction, which exposes the same interface for @@ -184,11 +180,6 @@ class Transaction { // Map arg_index from GetShardArgs slice to index in original command slice from InitByArgs. size_t ReverseArgIndex(ShardId shard_id, size_t arg_index) const; - // Schedule transaction. - // Usually used for multi hop transactions like RENAME or BLPOP. - // For single hop transactions use ScheduleSingleHop instead. - void Schedule(); - // Execute transaction hop. If conclude is true, it is removed from the pending queue. void Execute(RunnableType cb, bool conclude); @@ -500,7 +491,6 @@ class Transaction { // "Launder" keys by filtering uniques and replacing pointers with same lifetime as transaction. void LaunderKeyStorage(CmdArgVec* keys); - // Generic schedule used from Schedule() and ScheduleSingleHop() on slow path. void ScheduleInternal(); // Schedule on shards transaction queue. Returns true if scheduled successfully, diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 3418d2b55..77147e94a 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -1207,8 +1207,6 @@ void ZUnionFamilyInternal(CmdArgList args, bool store, ConnectionContext* cntx) return OpStatus::OK; }; - cntx->transaction->Schedule(); - // For commands not storing computed result, this should be // the last transaction hop (e.g. ZUNION) cntx->transaction->Execute(std::move(cb), !store); @@ -2014,7 +2012,6 @@ void ZSetFamily::ZInterStore(CmdArgList args, ConnectionContext* cntx) { return OpStatus::OK; }; - cntx->transaction->Schedule(); cntx->transaction->Execute(std::move(cb), false); OpResult result = IntersectResults(maps, op_args.agg_type); @@ -2814,7 +2811,7 @@ void GeoSearchStoreGeneric(ConnectionContext* cntx, const GeoShape& shape_ref, s auto* rb = static_cast(cntx->reply_builder()); ShardId from_shard = Shard(key, shard_set->size()); - cntx->transaction->Schedule(); + if (!member.empty()) { // get shape.xy from member OpResult member_score;