diff --git a/src/server/command_registry.cc b/src/server/command_registry.cc index ece7e7d63..d52dc84d9 100644 --- a/src/server/command_registry.cc +++ b/src/server/command_registry.cc @@ -111,8 +111,8 @@ const char* OptName(CO::CommandOpt fl) { return "blocking"; case GLOBAL_TRANS: return "global-trans"; - case DESTINATION_KEY: - return "dest-key"; + case VARIADIC_KEYS: + return "variadic-keys"; } return "unknown"; } diff --git a/src/server/command_registry.h b/src/server/command_registry.h index 1bd6a15f2..439edf9af 100644 --- a/src/server/command_registry.h +++ b/src/server/command_registry.h @@ -30,7 +30,9 @@ enum CommandOpt : uint32_t { NOSCRIPT = 0x100, BLOCKING = 0x200, // implies REVERSE_MAPPING GLOBAL_TRANS = 0x1000, - DESTINATION_KEY = 0x2000, + + // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc. + VARIADIC_KEYS = 0x2000, }; const char* OptName(CommandOpt fl); @@ -85,7 +87,7 @@ class CommandId { } bool is_multi_key() const { - return (last_key_ != first_key_) || (opt_mask_ & CO::DESTINATION_KEY); + return (last_key_ != first_key_) || (opt_mask_ & CO::VARIADIC_KEYS); } int8_t key_arg_step() const { diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 81f9069c0..92d654898 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -24,10 +24,10 @@ namespace dfly { using namespace std; using namespace util; +using absl::StrCat; using ::io::Result; using testing::ElementsAre; using testing::HasSubstr; -using absl::StrCat; namespace this_fiber = boost::this_fiber; namespace { @@ -286,6 +286,11 @@ TEST_F(DflyEngineTest, Eval) { resp = Run({"eval", "return 77", "2", "foo", "zoo"}); EXPECT_THAT(resp, IntArg(77)); + + // a,b important here to spawn multiple shards. + resp = Run({"eval", "return redis.call('exists', KEYS[2])", "2", "a", "b"}); + EXPECT_EQ(2, GetDebugInfo().shards_count); + EXPECT_THAT(resp, IntArg(0)); } TEST_F(DflyEngineTest, EvalResp) { @@ -297,23 +302,40 @@ TEST_F(DflyEngineTest, EvalResp) { EXPECT_THAT(resp.GetVec(), ElementsAre(IntArg(5), "foo", "17.5")); } -TEST_F(DflyEngineTest, Hello) { - auto resp = Run({"hello"}); - ASSERT_THAT(resp, ArrLen(12)); - resp = Run({"hello", "2"}); - ASSERT_THAT(resp, ArrLen(12)); +TEST_F(DflyEngineTest, EvalPublish) { + auto resp = pp_->at(1)->Await([&] { return Run({"subscribe", "foo"}); }); + EXPECT_THAT(resp, ArrLen(3)); - EXPECT_THAT(resp.GetVec(), ElementsAre("server", "redis", "version", ArgType(RespExpr::STRING), - "proto", IntArg(2), "id", ArgType(RespExpr::INT64), "mode", - "standalone", "role", "master")); + resp = Run({"eval", "return redis.call('publish', 'foo', 'bar')", "0"}); + EXPECT_THAT(resp, IntArg(1)); +} - // These are valid arguments to HELLO, however as they are not yet supported the implementation - // is degraded to 'unknown command'. - EXPECT_THAT(Run({"hello", "3"}), - ErrArg("ERR unknown command 'HELLO' with args beginning with: `3`")); - EXPECT_THAT( - Run({"hello", "2", "AUTH", "uname", "pwd"}), - ErrArg("ERR unknown command 'HELLO' with args beginning with: `2`, `AUTH`, `uname`, `pwd`")); +TEST_F(DflyEngineTest, EvalBug59) { + auto resp = Run({"eval", R"( +local epoch +if redis.call('exists', KEYS[2]) ~= 0 then + epoch = redis.call("hget", KEYS[2], "e") +end +if epoch == false or epoch == nil then + epoch = ARGV[6] + redis.call("hset", KEYS[2], "e", epoch) +end +local offset = redis.call("hincrby", KEYS[2], "s", 1) +if ARGV[5] ~= '0' then + redis.call("expire", KEYS[2], ARGV[5]) +end +redis.call("xadd", KEYS[1], "MAXLEN", ARGV[2], offset, "d", ARGV[1]) +redis.call("expire", KEYS[1], ARGV[3]) +if ARGV[4] ~= '' then + local payload = "__" .. "p1:" .. offset .. ":" .. epoch .. "__" .. ARGV[1] + redis.call("publish", ARGV[4], payload) +end + +return {offset, epoch} + )", + "2", "x", "y", "1", "2", "3", "4", "5", "6"}); + ASSERT_THAT(resp, ArrLen(2)); + EXPECT_THAT(resp.GetVec(), ElementsAre(IntArg(1), "6")); } TEST_F(DflyEngineTest, EvalSha) { @@ -339,6 +361,25 @@ TEST_F(DflyEngineTest, EvalSha) { EXPECT_THAT(resp, "c6459b95a0e81df97af6fdd49b1a9e0287a57363"); } +TEST_F(DflyEngineTest, Hello) { + auto resp = Run({"hello"}); + ASSERT_THAT(resp, ArrLen(12)); + resp = Run({"hello", "2"}); + ASSERT_THAT(resp, ArrLen(12)); + + EXPECT_THAT(resp.GetVec(), ElementsAre("server", "redis", "version", ArgType(RespExpr::STRING), + "proto", IntArg(2), "id", ArgType(RespExpr::INT64), "mode", + "standalone", "role", "master")); + + // These are valid arguments to HELLO, however as they are not yet supported the implementation + // is degraded to 'unknown command'. + EXPECT_THAT(Run({"hello", "3"}), + ErrArg("ERR unknown command 'HELLO' with args beginning with: `3`")); + EXPECT_THAT( + Run({"hello", "2", "AUTH", "uname", "pwd"}), + ErrArg("ERR unknown command 'HELLO' with args beginning with: `2`, `AUTH`, `uname`, `pwd`")); +} + TEST_F(DflyEngineTest, Memcache) { using MP = MemcacheParser; @@ -437,9 +478,7 @@ TEST_F(DflyEngineTest, OOM) { } TEST_F(DflyEngineTest, PSubscribe) { - auto resp = pp_->at(1)->Await([&] { - return Run({"psubscribe", "a*", "b*"}); - }); + auto resp = pp_->at(1)->Await([&] { return Run({"psubscribe", "a*", "b*"}); }); EXPECT_THAT(resp, ArrLen(3)); resp = pp_->at(0)->Await([&] { return Run({"publish", "ab", "foo"}); }); EXPECT_THAT(resp, IntArg(1)); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 07fc8e8ee..d28cb914c 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -511,21 +511,23 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) if (under_script) { DCHECK(dfly_cntx->transaction); - OpResult key_index_res = DetermineKeys(cid, args); - if (!key_index_res) - return (*cntx)->SendError(key_index_res.status()); + if (IsTransactional(cid)) { + OpResult key_index_res = DetermineKeys(cid, args); + if (!key_index_res) + return (*cntx)->SendError(key_index_res.status()); - const auto& key_index = *key_index_res; - for (unsigned i = key_index.start; i < key_index.end; ++i) { - string_view key = ArgS(args, i); - if (!dfly_cntx->conn_state.script_info->keys.contains(key)) { - return (*cntx)->SendError("script tried accessing undeclared key"); + const auto& key_index = *key_index_res; + for (unsigned i = key_index.start; i < key_index.end; ++i) { + string_view key = ArgS(args, i); + if (!dfly_cntx->conn_state.script_info->keys.contains(key)) { + return (*cntx)->SendError("script tried accessing undeclared key"); + } } + dfly_cntx->transaction->SetExecCmd(cid); + OpStatus st = dfly_cntx->transaction->InitByArgs(dfly_cntx->conn_state.db_index, args); + if (st != OpStatus::OK) + return (*cntx)->SendError(st); } - dfly_cntx->transaction->SetExecCmd(cid); - OpStatus st = dfly_cntx->transaction->InitByArgs(dfly_cntx->conn_state.db_index, args); - if (st != OpStatus::OK) - return (*cntx)->SendError(st); } else { DCHECK(dfly_cntx->transaction == nullptr); @@ -1081,18 +1083,21 @@ void Service::RegisterCommands() { constexpr auto kExecMask = CO::LOADING | CO::NOSCRIPT | CO::GLOBAL_TRANS; - registry_ << CI{"QUIT", CO::READONLY | CO::FAST, 1, 0, 0, 0}.HFUNC(Quit) - << CI{"MULTI", CO::NOSCRIPT | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(Multi) - << CI{"DISCARD", CO::NOSCRIPT | CO::FAST | CO::LOADING, 1, 0, 0, 0}.MFUNC(Discard) - << CI{"EVAL", CO::NOSCRIPT, -3, 3, 3, 1}.MFUNC(Eval).SetValidator(&EvalValidator) - << CI{"EVALSHA", CO::NOSCRIPT, -3, 3, 3, 1}.MFUNC(EvalSha).SetValidator(&EvalValidator) - << CI{"EXEC", kExecMask, 1, 0, 0, 0}.MFUNC(Exec) - << CI{"PUBLISH", CO::LOADING | CO::FAST, 3, 0, 0, 0}.MFUNC(Publish) - << CI{"SUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(Subscribe) - << CI{"UNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(Unsubscribe) - << CI{"PSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(PSubscribe) - << CI{"PUNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(PUnsubscribe) - << CI{"FUNCTION", CO::NOSCRIPT, 2, 0, 0, 0}.MFUNC(Function); + registry_ + << CI{"QUIT", CO::READONLY | CO::FAST, 1, 0, 0, 0}.HFUNC(Quit) + << CI{"MULTI", CO::NOSCRIPT | CO::FAST | CO::LOADING, 1, 0, 0, 0}.HFUNC(Multi) + << CI{"DISCARD", CO::NOSCRIPT | CO::FAST | CO::LOADING, 1, 0, 0, 0}.MFUNC(Discard) + << CI{"EVAL", CO::NOSCRIPT | CO::VARIADIC_KEYS, -3, 3, 3, 1}.MFUNC(Eval).SetValidator( + &EvalValidator) + << CI{"EVALSHA", CO::NOSCRIPT | CO::VARIADIC_KEYS, -3, 3, 3, 1}.MFUNC(EvalSha).SetValidator( + &EvalValidator) + << CI{"EXEC", kExecMask, 1, 0, 0, 0}.MFUNC(Exec) + << CI{"PUBLISH", CO::LOADING | CO::FAST, 3, 0, 0, 0}.MFUNC(Publish) + << CI{"SUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(Subscribe) + << CI{"UNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(Unsubscribe) + << CI{"PSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -2, 0, 0, 0}.MFUNC(PSubscribe) + << CI{"PUNSUBSCRIBE", CO::NOSCRIPT | CO::LOADING, -1, 0, 0, 0}.MFUNC(PUnsubscribe) + << CI{"FUNCTION", CO::NOSCRIPT, 2, 0, 0, 0}.MFUNC(Function); StreamFamily::Register(®istry_); StringFamily::Register(®istry_); @@ -1116,6 +1121,13 @@ void Service::RegisterCommands() { LOG(INFO) << " " << key << ": with " << key_len << " keys"; } }); + + LOG(INFO) << "Non-transactional commands are: "; + registry_.Traverse([](std::string_view name, const CI& cid) { + if (!IsTransactional(&cid)) { + LOG(INFO) << " " << name; + } + }); } } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 786302a21..950126729 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -16,6 +16,7 @@ namespace dfly { using namespace std; using namespace util; +using absl::StrCat; thread_local Transaction::TLTmpSpace Transaction::tmp_space; @@ -51,7 +52,8 @@ Transaction::Transaction(const CommandId* cid) : cid_(cid) { } Transaction::~Transaction() { - DVLOG(2) << "Transaction " << DebugId() << " destroyed"; + DVLOG(2) << "Transaction " << StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, ")") + << " destroyed"; } /** @@ -286,7 +288,7 @@ void Transaction::SetExecCmd(const CommandId* cid) { string Transaction::DebugId() const { DCHECK_GT(use_count_.load(memory_order_relaxed), 0u); - return absl::StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")"); + return StrCat(Name(), "@", txid_, "/", unique_shard_cnt_, " (", trans_id(this), ")"); } // Runs in the dbslice thread. Returns true if transaction needs to be kept in the queue. @@ -496,9 +498,7 @@ void Transaction::ScheduleInternal() { if (!is_active(i)) continue; - shard_set->Add(i, [] { - EngineShard::tlocal()->PollExecution("cancel_cleanup", nullptr); - }); + shard_set->Add(i, [] { EngineShard::tlocal()->PollExecution("cancel_cleanup", nullptr); }); } } } @@ -1156,14 +1156,19 @@ OpResult DetermineKeys(const CommandId* cid, CmdArgList args) { DCHECK_EQ(0u, cid->opt_mask() & CO::GLOBAL_TRANS); KeyIndex key_index; + int num_custom_keys = -1; - if (cid->opt_mask() & CO::DESTINATION_KEY) { - key_index.bonus = 1; + if (cid->opt_mask() & CO::VARIADIC_KEYS) { if (args.size() < 3) { return OpStatus::SYNTAX_ERR; } + string_view name{cid->name()}; + + if (!absl::StartsWith(name, "EVAL")) { + key_index.bonus = 1; // ZSTORE commands + } string_view num(ArgS(args, 2)); if (!absl::SimpleAtoi(num, &num_custom_keys) || num_custom_keys < 0) return OpStatus::INVALID_INT; @@ -1185,20 +1190,7 @@ OpResult DetermineKeys(const CommandId* cid, CmdArgList args) { return key_index; } - string_view name{cid->name()}; - if (name == "EVAL" || name == "EVALSHA") { - DCHECK_GE(args.size(), 3u); - uint32_t num_keys; - - CHECK(absl::SimpleAtoi(ArgS(args, 2), &num_keys)); - key_index.start = 3; - key_index.end = 3 + num_keys; - key_index.step = 1; - - return key_index; - } - - LOG(FATAL) << "TBD: Not supported"; + LOG(FATAL) << "TBD: Not supported " << cid->name(); return key_index; } diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 132b27fef..2630a5f2b 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -1875,7 +1875,7 @@ OpResult ZSetFamily::OpLexCount(const OpArgs& op_args, string_view key #define HFUNC(x) SetHandler(&ZSetFamily::x) void ZSetFamily::Register(CommandRegistry* registry) { - constexpr uint32_t kUnionMask = CO::WRITE | CO::DESTINATION_KEY | CO::REVERSE_MAPPING; + constexpr uint32_t kUnionMask = CO::WRITE | CO::VARIADIC_KEYS | CO::REVERSE_MAPPING; *registry << CI{"ZADD", CO::FAST | CO::WRITE | CO::DENYOOM, -4, 1, 1, 1}.HFUNC(ZAdd) << CI{"ZCARD", CO::FAST | CO::READONLY, 2, 1, 1, 1}.HFUNC(ZCard)