diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index 095222845..c3834012c 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -5,6 +5,7 @@ #include "server/conn_context.h" #include "base/logging.h" +#include "server/command_registry.h" #include "server/engine_shard_set.h" #include "server/server_family.h" #include "server/server_state.h" @@ -15,6 +16,20 @@ namespace dfly { using namespace std; +StoredCmd::StoredCmd(const CommandId* d, CmdArgList args) : descr(d) { + stored_args_.reserve(args.size()); + arg_vec_.resize(args.size()); + for (size_t i = 0; i < args.size(); ++i) { + stored_args_.emplace_back(ArgS(args, i)); + arg_vec_[i] = MutableSlice{stored_args_[i].data(), stored_args_[i].size()}; + } + arg_list_ = {arg_vec_.data(), arg_vec_.size()}; +} + +void StoredCmd::Invoke(ConnectionContext* ctx) { + descr->Invoke(arg_list_, ctx); +} + void ConnectionContext::SendMonitorMsg(std::string msg) { CHECK(owner()); diff --git a/src/server/conn_context.h b/src/server/conn_context.h index 0a7817bca..786e5079f 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -13,13 +13,24 @@ namespace dfly { class EngineShardSet; +class ConnectionContext; struct StoredCmd { const CommandId* descr; - std::vector cmd; - StoredCmd(const CommandId* d = nullptr) : descr(d) { + private: + std::vector stored_args_; + CmdArgVec arg_vec_; + CmdArgList arg_list_; + + public: + StoredCmd(const CommandId* d, CmdArgList args); + + CmdArgList ArgList() const { + return arg_list_; } + + void Invoke(ConnectionContext* ctx); }; struct ConnectionState { diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 4714059e6..b920646ba 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -155,6 +155,25 @@ TEST_F(DflyEngineTest, Multi) { ASSERT_FALSE(service_->IsShardSetLocked()); } +TEST_F(DflyEngineTest, MultiGlobalCommands) { + ASSERT_THAT(Run({"set", "key", "val"}), "OK"); + + ASSERT_THAT(Run({"multi"}), "OK"); + ASSERT_THAT(Run({"move", "key", "2"}), "QUEUED"); + ASSERT_THAT(Run({"save"}), "QUEUED"); + + RespExpr resp = Run({"exec"}); + ASSERT_THAT(resp, ArrLen(2)); + + ASSERT_THAT(Run({"get", "key"}), ArgType(RespExpr::NIL)); + + ASSERT_THAT(Run({"select", "2"}), "OK"); + ASSERT_THAT(Run({"get", "key"}), "val"); + + ASSERT_FALSE(service_->IsLocked(0, "key")); + ASSERT_FALSE(service_->IsLocked(2, "key")); +} + TEST_F(DflyEngineTest, HitMissStats) { RespExpr resp = Run({"set", "Key1", "VAL"}); ASSERT_EQ(resp, "OK"); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index af2f60cc8..b5e9d5508 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -597,11 +597,6 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) } if (under_multi) { - if (cid->opt_mask() & CO::ADMIN) { - (*cntx)->SendError("Can not run admin commands under transactions"); - return; - } - if (cmd_name == "SELECT") { (*cntx)->SendError("Can not call SELECT within a transaction"); return; @@ -628,11 +623,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) return (*cntx)->SendError(error); } // TODO: protect against aggregating huge transactions. - StoredCmd stored_cmd{cid}; - stored_cmd.cmd.reserve(args.size()); - for (size_t i = 0; i < args.size(); ++i) { - stored_cmd.cmd.emplace_back(ArgS(args, i)); - } + StoredCmd stored_cmd{cid, args}; dfly_cntx->conn_state.exec_info.body.push_back(std::move(stored_cmd)); return (*cntx)->SendSimpleString("QUEUED"); @@ -1148,22 +1139,15 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { CmdArgVec str_list; for (auto& scmd : exec_info.body) { - str_list.resize(scmd.cmd.size()); - for (size_t i = 0; i < scmd.cmd.size(); ++i) { - string& s = scmd.cmd[i]; - str_list[i] = MutableSlice{s.data(), s.size()}; - } - cntx->transaction->SetExecCmd(scmd.descr); - CmdArgList cmd_arg_list{str_list.data(), str_list.size()}; if (IsTransactional(scmd.descr)) { - OpStatus st = cntx->transaction->InitByArgs(cntx->conn_state.db_index, cmd_arg_list); + OpStatus st = cntx->transaction->InitByArgs(cntx->conn_state.db_index, scmd.ArgList()); if (st != OpStatus::OK) { (*cntx)->SendError(st); break; } } - scmd.descr->Invoke(cmd_arg_list, cntx); + scmd.Invoke(cntx); if (rb->GetError()) // checks for i/o error, not logical error. break; } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 913808080..5498cce2c 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -335,7 +335,7 @@ bool Transaction::RunInShard(EngineShard* shard) { // We make sure that we lock exactly once for each (multi-hop) transaction inside // transactions that lock incrementally. - if (incremental_lock && ((sd.local_mask & KEYLOCK_ACQUIRED) == 0)) { + if (!IsGlobal() && incremental_lock && ((sd.local_mask & KEYLOCK_ACQUIRED) == 0)) { DCHECK(!awaked_prerun); // we should not have blocking transaction inside multi block. sd.local_mask |= KEYLOCK_ACQUIRED;