diff --git a/src/server/container_utils.cc b/src/server/container_utils.cc index 90c18a9a2..371d0edff 100644 --- a/src/server/container_utils.cc +++ b/src/server/container_utils.cc @@ -3,6 +3,7 @@ // #include "server/container_utils.h" +#include "base/flags.h" #include "base/logging.h" #include "core/sorted_map.h" #include "core/string_map.h" @@ -21,8 +22,101 @@ extern "C" { #include "redis/zset.h" } +ABSL_FLAG(bool, singlehop_blocking, true, "Use single hop optimization for blocking commands"); + namespace dfly::container_utils { +namespace { + +struct ShardFFResult { + PrimeKey key; + ShardId sid = kInvalidSid; +}; + +// Find first non-empty key of a single shard transaction, pass it to `func` and return the key. +// If no such key exists or a wrong type is found, the apropriate status is returned. +// Optimized version of `FindFirstNonEmpty` below. +OpResult FindFirstNonEmptySingleShard(Transaction* trans, int req_obj_type, + BlockingResultCb func) { + DCHECK_EQ(trans->GetUniqueShardCnt(), 1u); + std::string key; + auto cb = [&](Transaction* t, EngineShard* shard) -> Transaction::RunnableResult { + auto args = t->GetShardArgs(shard->shard_id()); + auto ff_res = shard->db_slice().FindFirstReadOnly(t->GetDbContext(), args, req_obj_type); + + if (ff_res == OpStatus::WRONG_TYPE) + return OpStatus::WRONG_TYPE; + + if (ff_res == OpStatus::KEY_NOTFOUND) + return {OpStatus::KEY_NOTFOUND, Transaction::RunnableResult::AVOID_CONCLUDING}; + + CHECK(ff_res.ok()); // No other errors possible + ff_res->first->first.GetString(&key); + func(t, shard, key); + return OpStatus::OK; + }; + + // Schedule single hop and hopefully find a key, otherwise avoid concluding + OpStatus status = trans->ScheduleSingleHop(cb); + if (status == OpStatus::OK) + return key; + return status; +} + +// Find first non-empty key (sorted by order in command arguments) and return it, +// otherwise return not found or wrong type error. +OpResult FindFirstNonEmpty(Transaction* trans, int req_obj_type) { + DCHECK_GT(trans->GetUniqueShardCnt(), 1u); + + using FFResult = std::tuple; // key, argument index, sid + VLOG(2) << "FindFirst::Find " << trans->DebugId(); + + // Holds Find results: (iterator to a found key, and its index in the passed arguments). + // See DbSlice::FindFirst for more details. + std::vector> find_res(shard_set->size()); + std::fill(find_res.begin(), find_res.end(), OpStatus::KEY_NOTFOUND); + + auto cb = [&](Transaction* t, EngineShard* shard) { + auto args = t->GetShardArgs(shard->shard_id()); + auto ff_res = shard->db_slice().FindFirstReadOnly(t->GetDbContext(), args, req_obj_type); + if (ff_res) { + find_res[shard->shard_id()] = + FFResult{ff_res->first->first.AsRef(), ff_res->second, shard->shard_id()}; + } else { + find_res[shard->shard_id()] = ff_res.status(); + } + return OpStatus::OK; + }; + + trans->Execute(std::move(cb), false); + + // If any key is of the wrong type, report it immediately + if (std::find(find_res.begin(), find_res.end(), OpStatus::WRONG_TYPE) != find_res.end()) + return OpStatus::WRONG_TYPE; + + // Order result by their keys position in the command arguments, push errors to back + auto comp = [trans](const OpResult& lhs, const OpResult& rhs) { + if (!lhs || !rhs) + return lhs.ok(); + size_t i1 = trans->ReverseArgIndex(std::get(*lhs), std::get(*lhs)); + size_t i2 = trans->ReverseArgIndex(std::get(*rhs), std::get(*rhs)); + return i1 < i2; + }; + + // Find first element by the order above, so the first key. Returns error only if all are errors + auto it = std::min_element(find_res.begin(), find_res.end(), comp); + DCHECK(it != find_res.end()); + + if (*it == OpStatus::KEY_NOTFOUND) + return OpStatus::KEY_NOTFOUND; + + CHECK(it->ok()); // No other errors than WRONG_TYPE and KEY_NOTFOUND + FFResult& res = **it; + return ShardFFResult{std::get(res).AsRef(), std::get(res)}; +} + +} // namespace + using namespace std; quicklistEntry QLEntry() { @@ -174,76 +268,25 @@ string_view LpGetView(uint8_t* lp_it, uint8_t int_buf[]) { return std::string_view{reinterpret_cast(elem), size_t(ele_len)}; } -OpResult FindFirstNonEmptyKey(Transaction* trans, int req_obj_type) { - using FFResult = std::pair; // key, argument index. - VLOG(2) << "FindFirst::Find " << trans->DebugId(); - - // Holds Find results: (iterator to a found key, and its index in the passed arguments). - // See DbSlice::FindFirst for more details. - // spans all the shards for now. - std::vector> find_res(shard_set->size()); - std::fill(find_res.begin(), find_res.end(), OpStatus::KEY_NOTFOUND); - - auto cb = [&](Transaction* t, EngineShard* shard) { - auto args = t->GetShardArgs(shard->shard_id()); - OpResult> ff_res = - shard->db_slice().FindFirstReadOnly(t->GetDbContext(), args, req_obj_type); - - if (ff_res) { - FFResult ff_result(ff_res->first->first.AsRef(), ff_res->second); - find_res[shard->shard_id()] = std::move(ff_result); - } else { - find_res[shard->shard_id()] = ff_res.status(); - } - - return OpStatus::OK; - }; - - trans->Execute(std::move(cb), false); - - uint32_t min_arg_indx = UINT32_MAX; - ShardFFResult shard_result; - - // We iterate over all results to find the key with the minimal arg_index - // after reversing the arg indexing permutation. - for (size_t sid = 0; sid < find_res.size(); ++sid) { - const auto& fr = find_res[sid]; - auto status = fr.status(); - if (status == OpStatus::KEY_NOTFOUND) - continue; - if (status == OpStatus::WRONG_TYPE) { - return status; - } - CHECK(fr); - - const auto& it_pos = fr.value(); - - size_t arg_indx = trans->ReverseArgIndex(sid, it_pos.second); - if (arg_indx < min_arg_indx) { - min_arg_indx = arg_indx; - shard_result.sid = sid; - - // we do not dereference the key, do not extract the string value, so it it - // ok to just move it. We can not dereference it due to limitations of SmallString - // that rely on thread-local data-structure for pointer translation. - shard_result.key = it_pos.first.AsRef(); - } - } - - if (shard_result.sid == kInvalidSid) { - return OpStatus::KEY_NOTFOUND; - } - - return OpResult{std::move(shard_result)}; -} - OpResult RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_type, BlockingResultCb func, unsigned limit_ms, bool* block_flag) { - trans->Schedule(); - string result_key; - OpResult result = FindFirstNonEmptyKey(trans, req_obj_type); + + // Fast path. If we have only a single shard, we can run opportunistically with a single hop. + // If we don't find anything, we abort concluding and keep scheduled. + // Slow path: schedule, find results from shards, execute action if found. + OpResult result; + if (trans->GetUniqueShardCnt() == 1 && absl::GetFlag(FLAGS_singlehop_blocking)) { + auto res = FindFirstNonEmptySingleShard(trans, req_obj_type, func); + if (res.ok()) + return res; + else + result = res.status(); + } else { + trans->Schedule(); + result = FindFirstNonEmpty(trans, req_obj_type); + } // If a non-empty key exists, execute the callback immediately if (result.ok()) { @@ -255,7 +298,6 @@ OpResult RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty return OpStatus::OK; }; trans->Execute(std::move(cb), true); - return result_key; } @@ -271,6 +313,7 @@ OpResult RunCbOnFirstNonEmptyBlocking(Transaction* trans, int req_obj_ty return OpStatus::TIMED_OUT; } + DCHECK(trans->IsScheduled()); // single shard optimization didn't forget to schedule VLOG(1) << "Blocking " << trans->DebugId(); // If timeout (limit_ms) is zero, block indefinitely diff --git a/src/server/container_utils.h b/src/server/container_utils.h index dfce2b166..2c035b71d 100644 --- a/src/server/container_utils.h +++ b/src/server/container_utils.h @@ -82,13 +82,6 @@ std::string_view LpGetView(uint8_t* lp_it, uint8_t int_buf[]); // Find value by key and return stringview to it, otherwise nullopt. std::optional LpFind(uint8_t* lp, std::string_view key, uint8_t int_buf[]); -struct ShardFFResult { - PrimeKey key; - ShardId sid = kInvalidSid; -}; - -OpResult FindFirstNonEmptyKey(Transaction* trans, int req_obj_type); - using BlockingResultCb = std::function; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 7dd771cf2..8810b59fe 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -472,23 +472,22 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { /*************************************************************************/ // Actually running the callback. // If you change the logic here, also please change the logic + RunnableResult result; try { - OpStatus status = OpStatus::OK; - // if a transaction is suspended, we still run it because of brpoplpush/blmove case // that needs to run lpush on its suspended shard. - status = (*cb_ptr_)(this, shard); + result = (*cb_ptr_)(this, shard); if (unique_shard_cnt_ == 1) { cb_ptr_ = nullptr; // We can do it because only a single thread runs the callback. - local_result_ = status; + local_result_ = result; } else { - if (status == OpStatus::OUT_OF_MEMORY) { + if (result == OpStatus::OUT_OF_MEMORY) { absl::base_internal::SpinLockHolder lk{&local_result_mu_}; CHECK(local_result_ == OpStatus::OK || local_result_ == OpStatus::OUT_OF_MEMORY); - local_result_ = status; + local_result_ = result; } else { - CHECK_EQ(OpStatus::OK, status); + CHECK_EQ(OpStatus::OK, result); } } } catch (std::bad_alloc&) { @@ -500,15 +499,25 @@ bool Transaction::RunInShard(EngineShard* shard, bool txq_ooo) { } /*************************************************************************/ + // at least the coordinator thread owns the reference. + DCHECK_GE(GetUseCount(), 1u); + + shard->db_slice().OnCbFinish(); + + // Handle result flags to alter behaviour. + if (result.flags & RunnableResult::AVOID_CONCLUDING) { + // Multi shard callbacks should either all or none choose to conclude. Because they can't + // communicate, the must know their decision ahead, consequently there is no point in using this + // flag. + CHECK_EQ(unique_shard_cnt_, 1u); + DCHECK(is_concluding || multi_->concluding); + is_concluding = false; + } // Log to jounrnal only once the command finished running if (is_concluding || (multi_ && multi_->concluding)) LogAutoJournalOnShard(shard); - shard->db_slice().OnCbFinish(); - // at least the coordinator thread owns the reference. - DCHECK_GE(GetUseCount(), 1u); - // If we're the head of tx queue (txq_ooo is false), we remove ourselves upon first invocation // and successive hops are run by continuation_trans_ in engine shard. // Otherwise we can remove ourselves only when we're concluding (so no more hops will follow). @@ -928,6 +937,8 @@ void Transaction::ExecuteAsync() { } void Transaction::Conclude() { + if (!IsScheduled()) + return; auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; Execute(std::move(cb), true); } @@ -963,7 +974,7 @@ void Transaction::EnableAllShards() { sd.local_mask |= ACTIVE; } -void Transaction::RunQuickie(EngineShard* shard) { +Transaction::RunnableResult Transaction::RunQuickie(EngineShard* shard) { DCHECK(!IsAtomicMulti()); DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC); DCHECK_NE(unique_shard_id_, kInvalidSid); @@ -976,19 +987,23 @@ void Transaction::RunQuickie(EngineShard* shard) { DCHECK(cb_ptr_) << DebugId() << " " << shard->shard_id(); // Calling the callback in somewhat safe way + RunnableResult result; try { - local_result_ = (*cb_ptr_)(this, shard); + result = (*cb_ptr_)(this, shard); } catch (std::bad_alloc&) { LOG_FIRST_N(ERROR, 16) << " out of memory"; - local_result_ = OpStatus::OUT_OF_MEMORY; + result = OpStatus::OUT_OF_MEMORY; } catch (std::exception& e) { LOG(FATAL) << "Unexpected exception " << e.what(); } + shard->db_slice().OnCbFinish(); - LogAutoJournalOnShard(shard); + + // Handling the result, along with conclusion and journaling, is done by the caller sd.is_armed.store(false, memory_order_relaxed); cb_ptr_ = nullptr; // We can do it because only a single shard runs the callback. + return result; } // runs in coordinator thread. @@ -1030,10 +1045,11 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const { // Runs within a engine shard thread. // Optimized path that schedules and runs transactions out of order if possible. -// Returns true if was eagerly executed, false if it was scheduled into queue. +// Returns true if eagerly executed, false if the callback will be handled by the transaction +// queue. bool Transaction::ScheduleUniqueShard(EngineShard* shard) { DCHECK(!IsAtomicMulti()); - DCHECK_EQ(0u, txid_); + DCHECK_EQ(txid_, 0u); DCHECK(shard_data_.size() == 1u || multi_->mode == NON_ATOMIC); DCHECK_NE(unique_shard_id_, kInvalidSid); @@ -1043,31 +1059,45 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) { auto& sd = shard_data_[SidToId(unique_shard_id_)]; DCHECK_EQ(TxQueue::kEnd, sd.pq_pos); - // Fast path - for uncontended keys, just run the callback. - // That applies for single key operations like set, get, lpush etc. - if (shard->db_slice().CheckLock(mode, lock_args) && shard->shard_lock()->Check(mode)) { - RunQuickie(shard); - return true; + bool unlocked_keys = + shard->db_slice().CheckLock(mode, lock_args) && shard->shard_lock()->Check(mode); + bool quick_run = unlocked_keys; + + // Fast path. If none of the keys are locked, we can run briefly atomically on the thread + // without acquiring them at all. + if (quick_run) { + auto result = RunQuickie(shard); + local_result_ = result.status; + + if (result.flags & RunnableResult::AVOID_CONCLUDING) { + // If we want to run again, we have to actually acquire keys, but keep ourselves disarmed + DCHECK_EQ(sd.is_armed, false); + unlocked_keys = false; + } else { + LogAutoJournalOnShard(shard); + } } - // we can do it because only a single thread writes into txid_ and sd. - txid_ = op_seq.fetch_add(1, memory_order_relaxed); - sd.pq_pos = shard->txq()->Insert(this); + // Slow path. Some of the keys are locked, so we schedule on the transaction queue. + if (!unlocked_keys) { + coordinator_state_ |= COORD_SCHED; // safe because single shard + txid_ = op_seq.fetch_add(1, memory_order_relaxed); // - + sd.pq_pos = shard->txq()->Insert(this); - DCHECK_EQ(0, sd.local_mask & KEYLOCK_ACQUIRED); + DCHECK_EQ(sd.local_mask & KEYLOCK_ACQUIRED, 0); + shard->db_slice().Acquire(mode, lock_args); + sd.local_mask |= KEYLOCK_ACQUIRED; - shard->db_slice().Acquire(mode, lock_args); - sd.local_mask |= KEYLOCK_ACQUIRED; + DVLOG(1) << "Rescheduling " << DebugId() << " into TxQueue of size " << shard->txq()->size(); - DVLOG(1) << "Rescheduling " << DebugId() << " into TxQueue of size " << shard->txq()->size(); + // If there are blocked transactons waiting for this tx keys, we will add this transaction + // to the tx-queue (these keys will be contended). This will happen even if the queue was empty. + // In that case we must poll the queue, because there will be no other callback trigerring the + // queue before us. + shard->PollExecution("schedule_unique", nullptr); + } - // If there are blocked transactons waiting for this tx keys, we will add this transaction - // to the tx-queue (these keys will be contended). This will happen even if the queue was empty. - // In that case we must poll the queue, because there will be no other callback trigerring the - // queue before us. - shard->PollExecution("schedule_unique", nullptr); - - return false; + return quick_run; } // This function should not block since it's run via RunBriefInParallel. @@ -1303,11 +1333,14 @@ void Transaction::ExpireShardCb(ArgSlice wkeys, EngineShard* shard) { OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) { DCHECK(multi_ && multi_->role == SQUASHED_STUB); DCHECK_EQ(unique_shard_cnt_, 1u); + auto* shard = EngineShard::tlocal(); - auto status = cb(this, shard); + auto result = cb(this, shard); shard->db_slice().OnCbFinish(); LogAutoJournalOnShard(shard); - return status; + + DCHECK_EQ(result.flags, 0); // if it's sophisticated, we shouldn't squash it + return result; } void Transaction::UnlockMultiShardCb(const KeyList& sharded_keys, EngineShard* shard, diff --git a/src/server/transaction.h b/src/server/transaction.h index 66d30f2be..e7f3a7d7a 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -85,7 +85,6 @@ using facade::OpStatus; // }) // // ``` - class Transaction { friend class BlockingController; @@ -106,9 +105,32 @@ class Transaction { } public: + // Result returned by callbacks. Most should use the implcit conversion from OpStatus. + struct RunnableResult { + enum Flag : uint16_t { + // Can be issued by a **single** shard callback to avoid concluding, i.e. perform one more hop + // even if not requested ahead. Used for blocking command fallback. + AVOID_CONCLUDING = 1, + }; + + RunnableResult(OpStatus status = OpStatus::OK, uint16_t flags = 0) + : status(status), flags(flags) { + } + + operator OpStatus() const { + return status; + } + + OpStatus status; + uint16_t flags; + }; + + static_assert(sizeof(RunnableResult) == 4); + using time_point = ::std::chrono::steady_clock::time_point; // Runnable that is run on shards during hop executions (often named callback). - using RunnableType = absl::FunctionRef; + // Callacks should return `OpStatus` which is implicitly converitble to `RunnableResult`! + using RunnableType = absl::FunctionRef; // Provides keys to block on for specific shard. using WaitKeysProvider = std::function; @@ -175,7 +197,7 @@ class Transaction { // Can be used only for single key invocations, because it writes a into shared variable. template auto ScheduleSingleHopT(F&& f) -> decltype(f(this, nullptr)); - // Conclude transaction + // Conclude transaction. Ignored if not scheduled void Conclude(); // Called by engine shard to execute a transaction hop. @@ -278,6 +300,10 @@ class Transaction { return bool(multi_); } + bool IsScheduled() const { + return coordinator_state_ & COORD_SCHED; + } + MultiMode GetMultiMode() const { return multi_->mode; } @@ -455,7 +481,7 @@ class Transaction { std::pair ScheduleInShard(EngineShard* shard); // Optimized version of RunInShard for single shard uncontended cases. - void RunQuickie(EngineShard* shard); + RunnableResult RunQuickie(EngineShard* shard); void ExecuteAsync();