mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
feat(server): Allow admin commands in multi (#722)
Needed for sentinel support (#706) Signed-off-by: ashotland <ari@dragonflydb.io> Signed-off-by: ashotland <ari@dragonflydb.io>
This commit is contained in:
parent
cdafaa78c0
commit
59bfecad69
5 changed files with 51 additions and 22 deletions
|
@ -5,6 +5,7 @@
|
|||
#include "server/conn_context.h"
|
||||
|
||||
#include "base/logging.h"
|
||||
#include "server/command_registry.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/server_family.h"
|
||||
#include "server/server_state.h"
|
||||
|
@ -15,6 +16,20 @@ namespace dfly {
|
|||
|
||||
using namespace std;
|
||||
|
||||
StoredCmd::StoredCmd(const CommandId* d, CmdArgList args) : descr(d) {
|
||||
stored_args_.reserve(args.size());
|
||||
arg_vec_.resize(args.size());
|
||||
for (size_t i = 0; i < args.size(); ++i) {
|
||||
stored_args_.emplace_back(ArgS(args, i));
|
||||
arg_vec_[i] = MutableSlice{stored_args_[i].data(), stored_args_[i].size()};
|
||||
}
|
||||
arg_list_ = {arg_vec_.data(), arg_vec_.size()};
|
||||
}
|
||||
|
||||
void StoredCmd::Invoke(ConnectionContext* ctx) {
|
||||
descr->Invoke(arg_list_, ctx);
|
||||
}
|
||||
|
||||
void ConnectionContext::SendMonitorMsg(std::string msg) {
|
||||
CHECK(owner());
|
||||
|
||||
|
|
|
@ -13,13 +13,24 @@
|
|||
namespace dfly {
|
||||
|
||||
class EngineShardSet;
|
||||
class ConnectionContext;
|
||||
|
||||
struct StoredCmd {
|
||||
const CommandId* descr;
|
||||
std::vector<std::string> cmd;
|
||||
|
||||
StoredCmd(const CommandId* d = nullptr) : descr(d) {
|
||||
private:
|
||||
std::vector<std::string> stored_args_;
|
||||
CmdArgVec arg_vec_;
|
||||
CmdArgList arg_list_;
|
||||
|
||||
public:
|
||||
StoredCmd(const CommandId* d, CmdArgList args);
|
||||
|
||||
CmdArgList ArgList() const {
|
||||
return arg_list_;
|
||||
}
|
||||
|
||||
void Invoke(ConnectionContext* ctx);
|
||||
};
|
||||
|
||||
struct ConnectionState {
|
||||
|
|
|
@ -155,6 +155,25 @@ TEST_F(DflyEngineTest, Multi) {
|
|||
ASSERT_FALSE(service_->IsShardSetLocked());
|
||||
}
|
||||
|
||||
TEST_F(DflyEngineTest, MultiGlobalCommands) {
|
||||
ASSERT_THAT(Run({"set", "key", "val"}), "OK");
|
||||
|
||||
ASSERT_THAT(Run({"multi"}), "OK");
|
||||
ASSERT_THAT(Run({"move", "key", "2"}), "QUEUED");
|
||||
ASSERT_THAT(Run({"save"}), "QUEUED");
|
||||
|
||||
RespExpr resp = Run({"exec"});
|
||||
ASSERT_THAT(resp, ArrLen(2));
|
||||
|
||||
ASSERT_THAT(Run({"get", "key"}), ArgType(RespExpr::NIL));
|
||||
|
||||
ASSERT_THAT(Run({"select", "2"}), "OK");
|
||||
ASSERT_THAT(Run({"get", "key"}), "val");
|
||||
|
||||
ASSERT_FALSE(service_->IsLocked(0, "key"));
|
||||
ASSERT_FALSE(service_->IsLocked(2, "key"));
|
||||
}
|
||||
|
||||
TEST_F(DflyEngineTest, HitMissStats) {
|
||||
RespExpr resp = Run({"set", "Key1", "VAL"});
|
||||
ASSERT_EQ(resp, "OK");
|
||||
|
|
|
@ -597,11 +597,6 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
|
|||
}
|
||||
|
||||
if (under_multi) {
|
||||
if (cid->opt_mask() & CO::ADMIN) {
|
||||
(*cntx)->SendError("Can not run admin commands under transactions");
|
||||
return;
|
||||
}
|
||||
|
||||
if (cmd_name == "SELECT") {
|
||||
(*cntx)->SendError("Can not call SELECT within a transaction");
|
||||
return;
|
||||
|
@ -628,11 +623,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
|
|||
return (*cntx)->SendError(error);
|
||||
}
|
||||
// TODO: protect against aggregating huge transactions.
|
||||
StoredCmd stored_cmd{cid};
|
||||
stored_cmd.cmd.reserve(args.size());
|
||||
for (size_t i = 0; i < args.size(); ++i) {
|
||||
stored_cmd.cmd.emplace_back(ArgS(args, i));
|
||||
}
|
||||
StoredCmd stored_cmd{cid, args};
|
||||
dfly_cntx->conn_state.exec_info.body.push_back(std::move(stored_cmd));
|
||||
|
||||
return (*cntx)->SendSimpleString("QUEUED");
|
||||
|
@ -1148,22 +1139,15 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
|
|||
CmdArgVec str_list;
|
||||
|
||||
for (auto& scmd : exec_info.body) {
|
||||
str_list.resize(scmd.cmd.size());
|
||||
for (size_t i = 0; i < scmd.cmd.size(); ++i) {
|
||||
string& s = scmd.cmd[i];
|
||||
str_list[i] = MutableSlice{s.data(), s.size()};
|
||||
}
|
||||
|
||||
cntx->transaction->SetExecCmd(scmd.descr);
|
||||
CmdArgList cmd_arg_list{str_list.data(), str_list.size()};
|
||||
if (IsTransactional(scmd.descr)) {
|
||||
OpStatus st = cntx->transaction->InitByArgs(cntx->conn_state.db_index, cmd_arg_list);
|
||||
OpStatus st = cntx->transaction->InitByArgs(cntx->conn_state.db_index, scmd.ArgList());
|
||||
if (st != OpStatus::OK) {
|
||||
(*cntx)->SendError(st);
|
||||
break;
|
||||
}
|
||||
}
|
||||
scmd.descr->Invoke(cmd_arg_list, cntx);
|
||||
scmd.Invoke(cntx);
|
||||
if (rb->GetError()) // checks for i/o error, not logical error.
|
||||
break;
|
||||
}
|
||||
|
|
|
@ -335,7 +335,7 @@ bool Transaction::RunInShard(EngineShard* shard) {
|
|||
|
||||
// We make sure that we lock exactly once for each (multi-hop) transaction inside
|
||||
// transactions that lock incrementally.
|
||||
if (incremental_lock && ((sd.local_mask & KEYLOCK_ACQUIRED) == 0)) {
|
||||
if (!IsGlobal() && incremental_lock && ((sd.local_mask & KEYLOCK_ACQUIRED) == 0)) {
|
||||
DCHECK(!awaked_prerun); // we should not have blocking transaction inside multi block.
|
||||
|
||||
sd.local_mask |= KEYLOCK_ACQUIRED;
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue