mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
fix(server): Exclude eval from pipeline squashing (#2027)
* fix(server): Exclude eval from pipeline squashing Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io> --------- Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
4e0126d72e
commit
4b387cebe2
3 changed files with 24 additions and 6 deletions
|
@ -1104,7 +1104,14 @@ void Service::DispatchManyCommands(absl::Span<CmdArgList> args_list,
|
|||
const bool is_multi =
|
||||
dfly_cntx->conn_state.exec_info.IsCollecting() || CO::IsTransKind(ArgS(args, 0));
|
||||
|
||||
if (!is_multi && cid != nullptr) {
|
||||
// Generally, executing any multi-transactions (including eval) is not possible because they
|
||||
// might request a stricter multi mode than non-atomic which is used for squashing.
|
||||
// TODO: By allowing promoting non-atomic multit transactions to lock-ahead for specific command
|
||||
// invocations, we can potentially execute multiple eval in parallel, which is very powerful
|
||||
// paired with shardlocal eval
|
||||
const bool is_eval = CO::IsEvalKind(ArgS(args, 0));
|
||||
|
||||
if (!is_multi && !is_eval && cid != nullptr) {
|
||||
stored_cmds.reserve(args_list.size());
|
||||
stored_cmds.emplace_back(cid, tail_args);
|
||||
continue;
|
||||
|
@ -1594,13 +1601,13 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
|
|||
|
||||
DCHECK(!cntx->conn_state.script_info); // we should not call eval from the script.
|
||||
|
||||
optional<ShardId> sid;
|
||||
|
||||
// TODO: to determine whether the script is RO by scanning all "redis.p?call" calls
|
||||
// and checking whether all invocations consist of RO commands.
|
||||
// we can do it once during script insertion into script mgr.
|
||||
auto& sinfo = cntx->conn_state.script_info;
|
||||
sinfo = make_unique<ConnectionState::ScriptInfo>();
|
||||
|
||||
optional<ShardId> sid;
|
||||
for (size_t i = 0; i < eval_args.keys.size(); ++i) {
|
||||
string_view key = ArgS(eval_args.keys, i);
|
||||
sinfo->keys.insert(KeyLockArgs::GetLockKey(key));
|
||||
|
@ -1620,7 +1627,11 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
|
|||
|
||||
interpreter->SetGlobalArray("KEYS", eval_args.keys);
|
||||
interpreter->SetGlobalArray("ARGV", eval_args.args);
|
||||
absl::Cleanup clean = [interpreter]() { interpreter->ResetStack(); };
|
||||
|
||||
absl::Cleanup clean = [interpreter, &sinfo]() {
|
||||
interpreter->ResetStack();
|
||||
sinfo.reset();
|
||||
};
|
||||
|
||||
Interpreter::RunResult result;
|
||||
|
||||
|
@ -1666,8 +1677,6 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
|
|||
cntx->transaction->UnlockMulti();
|
||||
}
|
||||
|
||||
cntx->conn_state.script_info.reset(); // reset script_info
|
||||
|
||||
if (result == Interpreter::RUN_ERR) {
|
||||
string resp = StrCat("Error running script (call to ", eval_args.sha, "): ", error);
|
||||
return (*cntx)->SendError(resp, facade::kScriptErrType);
|
||||
|
|
|
@ -58,6 +58,7 @@ MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(Shar
|
|||
sinfo.local_tx = new Transaction{base_cid_};
|
||||
sinfo.local_tx->StartMultiNonAtomic();
|
||||
}
|
||||
num_shards_++;
|
||||
}
|
||||
|
||||
return sinfo;
|
||||
|
@ -100,6 +101,8 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm
|
|||
sinfo.cmds.push_back(cmd);
|
||||
order_.push_back(last_sid);
|
||||
|
||||
num_squashed_++;
|
||||
|
||||
// Because the squashed hop is currently blocking, we cannot add more than the max channel size,
|
||||
// otherwise a deadlock occurs.
|
||||
bool need_flush = sinfo.cmds.size() >= kMaxSquashing - 1;
|
||||
|
@ -260,6 +263,9 @@ void MultiCommandSquasher::Run() {
|
|||
sd.local_tx->UnlockMulti();
|
||||
}
|
||||
}
|
||||
|
||||
VLOG(1) << "Squashed " << num_squashed_ << " of " << cmds_.size()
|
||||
<< " commands, max fanout: " << num_shards_ << ", atomic: " << atomic_;
|
||||
}
|
||||
|
||||
bool MultiCommandSquasher::IsAtomic() const {
|
||||
|
|
|
@ -83,6 +83,9 @@ class MultiCommandSquasher {
|
|||
std::vector<ShardExecInfo> sharded_;
|
||||
std::vector<ShardId> order_; // reply order for squashed cmds
|
||||
|
||||
size_t num_squashed_ = 0;
|
||||
size_t num_shards_ = 0;
|
||||
|
||||
std::vector<MutableSlice> tmp_keylist_;
|
||||
};
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue