mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
fix: slot calculation during transaction squashing (#4460)
This commit is contained in:
parent
5ba608b58d
commit
0e116b1535
5 changed files with 19 additions and 22 deletions
|
@ -9,6 +9,7 @@ extern "C" {
|
|||
#include "base/flags.h"
|
||||
#include "base/logging.h"
|
||||
#include "cluster_support.h"
|
||||
#include "common.h"
|
||||
|
||||
using namespace std;
|
||||
|
||||
|
@ -31,22 +32,15 @@ void UniqueSlotChecker::Add(SlotId slot_id) {
|
|||
return;
|
||||
}
|
||||
|
||||
if (!slot_id_.has_value()) {
|
||||
if (slot_id_ == kNoSlotId) {
|
||||
slot_id_ = slot_id;
|
||||
return;
|
||||
}
|
||||
|
||||
if (*slot_id_ != slot_id) {
|
||||
} else if (slot_id_ != slot_id) {
|
||||
slot_id_ = kInvalidSlotId;
|
||||
}
|
||||
}
|
||||
|
||||
optional<SlotId> UniqueSlotChecker::GetUniqueSlotId() const {
|
||||
if (slot_id_.has_value() && *slot_id_ == kInvalidSlotId) {
|
||||
return nullopt;
|
||||
}
|
||||
|
||||
return slot_id_;
|
||||
return slot_id_ > kMaxSlotNum ? optional<SlotId>() : slot_id_;
|
||||
}
|
||||
|
||||
namespace {
|
||||
|
|
|
@ -8,14 +8,10 @@
|
|||
#include <optional>
|
||||
#include <string_view>
|
||||
|
||||
#include "common.h"
|
||||
|
||||
namespace dfly {
|
||||
|
||||
using SlotId = std::uint16_t;
|
||||
|
||||
constexpr SlotId kMaxSlotNum = 0x3FFF;
|
||||
constexpr SlotId kInvalidSlotId = kMaxSlotNum + 1;
|
||||
|
||||
// A simple utility class that "aggregates" SlotId-s and can tell whether all inputs were the same.
|
||||
// Only works when cluster is enabled.
|
||||
|
@ -26,8 +22,17 @@ class UniqueSlotChecker {
|
|||
|
||||
std::optional<SlotId> GetUniqueSlotId() const;
|
||||
|
||||
void Reset() {
|
||||
slot_id_ = kNoSlotId;
|
||||
}
|
||||
|
||||
private:
|
||||
std::optional<SlotId> slot_id_;
|
||||
// kNoSlotId - if slot wasn't set at all
|
||||
static constexpr SlotId kNoSlotId = kMaxSlotNum + 1;
|
||||
// kInvalidSlotId - if several different slots were set
|
||||
static constexpr SlotId kInvalidSlotId = kNoSlotId + 1;
|
||||
|
||||
SlotId slot_id_ = kNoSlotId;
|
||||
};
|
||||
|
||||
SlotId KeySlot(std::string_view key);
|
||||
|
|
|
@ -77,15 +77,14 @@ MultiCommandSquasher::MultiCommandSquasher(absl::Span<StoredCmd> cmds, Connectio
|
|||
atomic_ = mode != Transaction::NON_ATOMIC;
|
||||
}
|
||||
|
||||
MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(
|
||||
ShardId sid, optional<SlotId> slot_id) {
|
||||
MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(ShardId sid) {
|
||||
if (sharded_.empty())
|
||||
sharded_.resize(shard_set->size());
|
||||
|
||||
auto& sinfo = sharded_[sid];
|
||||
if (!sinfo.local_tx) {
|
||||
if (IsAtomic()) {
|
||||
sinfo.local_tx = new Transaction{cntx_->transaction, sid, slot_id};
|
||||
sinfo.local_tx = new Transaction{cntx_->transaction, sid, nullopt};
|
||||
} else {
|
||||
// Non-atomic squashing does not use the transactional framework for fan out, so local
|
||||
// transactions have to be fully standalone, check locks and release them immediately.
|
||||
|
@ -121,11 +120,9 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm
|
|||
return SquashResult::NOT_SQUASHED;
|
||||
|
||||
// Check if all commands belong to one shard
|
||||
UniqueSlotChecker slot_checker;
|
||||
ShardId last_sid = kInvalidSid;
|
||||
|
||||
for (string_view key : keys->Range(args)) {
|
||||
slot_checker.Add(key);
|
||||
ShardId sid = Shard(key, shard_set->size());
|
||||
if (last_sid == kInvalidSid || last_sid == sid)
|
||||
last_sid = sid;
|
||||
|
@ -133,7 +130,7 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm
|
|||
return SquashResult::NOT_SQUASHED; // at least two shards
|
||||
}
|
||||
|
||||
auto& sinfo = PrepareShardInfo(last_sid, slot_checker.GetUniqueSlotId());
|
||||
auto& sinfo = PrepareShardInfo(last_sid);
|
||||
|
||||
sinfo.cmds.push_back(cmd);
|
||||
order_.push_back(last_sid);
|
||||
|
|
|
@ -52,7 +52,7 @@ class MultiCommandSquasher {
|
|||
bool verify_commands, bool error_abort);
|
||||
|
||||
// Lazy initialize shard info.
|
||||
ShardExecInfo& PrepareShardInfo(ShardId sid, std::optional<SlotId> slot_id);
|
||||
ShardExecInfo& PrepareShardInfo(ShardId sid);
|
||||
|
||||
// Retrun squash flags
|
||||
SquashResult TrySquash(StoredCmd* cmd);
|
||||
|
|
|
@ -325,6 +325,7 @@ void Transaction::InitByKeys(const KeyIndex& key_index) {
|
|||
// Stub transactions always operate only on single shard.
|
||||
bool is_stub = multi_ && multi_->role == SQUASHED_STUB;
|
||||
|
||||
unique_slot_checker_.Reset();
|
||||
if ((key_index.NumArgs() == 1 && !IsAtomicMulti()) || is_stub) {
|
||||
DCHECK(!IsActiveMulti() || multi_->mode == NON_ATOMIC);
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue