From d3764efbcab91b1b0616890d7cdfc6109228ebf4 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Wed, 27 Apr 2022 23:50:03 +0300 Subject: [PATCH] Add CONFIG RESETSTAT command. Start working on RPOPLPUSH --- src/facade/dragonfly_connection.cc | 2 +- src/facade/facade.cc | 8 +++--- src/facade/facade_types.h | 4 +-- src/server/list_family.cc | 46 +++++++++++++++++++++++++++++- src/server/list_family.h | 3 ++ src/server/main_service.cc | 2 +- src/server/server_family.cc | 13 +++++++-- tests/gen_sets.sh | 6 ++++ 8 files changed, 73 insertions(+), 11 deletions(-) create mode 100755 tests/gen_sets.sh diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 20dcc3c36..0730d4b7f 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -57,7 +57,7 @@ void FetchBuilderStats(ConnectionStats* stats, SinkReplyBuilder* builder) { stats->io_write_bytes += builder->io_write_bytes(); for (const auto& k_v : builder->err_count()) { - stats->err_count[k_v.first] += k_v.second; + stats->err_count_map[k_v.first] += k_v.second; } builder->reset_io_stats(); } diff --git a/src/facade/facade.cc b/src/facade/facade.cc index 701f727a5..d25a83d35 100644 --- a/src/facade/facade.cc +++ b/src/facade/facade.cc @@ -36,12 +36,12 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { ADD(num_replicas); ADD(num_blocked_clients); - for (const auto& k_v : o.err_count) { - err_count[k_v.first] += k_v.second; + for (const auto& k_v : o.err_count_map) { + err_count_map[k_v.first] += k_v.second; } - for (const auto& k_v : o.cmd_count) { - cmd_count[k_v.first] += k_v.second; + for (const auto& k_v : o.cmd_count_map) { + cmd_count_map[k_v.first] += k_v.second; } return *this; diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h index 614d53d46..9555be9e3 100644 --- a/src/facade/facade_types.h +++ b/src/facade/facade_types.h @@ -20,8 +20,8 @@ using CmdArgVec = std::vector; struct ConnectionStats { - absl::flat_hash_map err_count; - absl::flat_hash_map cmd_count; + absl::flat_hash_map err_count_map; + absl::flat_hash_map cmd_count_map; size_t read_buf_capacity = 0; size_t io_read_cnt = 0; diff --git a/src/server/list_family.cc b/src/server/list_family.cc index e67a35392..1563cf984 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -11,7 +11,6 @@ extern "C" { #include #include "base/logging.h" - #include "server/blocking_controller.h" #include "server/command_registry.h" #include "server/conn_context.h" @@ -238,6 +237,36 @@ void ListFamily::RPop(CmdArgList args, ConnectionContext* cntx) { return PopGeneric(ListDir::RIGHT, std::move(args), cntx); } +void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) { + string_view src = ArgS(args, 1); + string_view dest = ArgS(args, 2); + + OpResult result; + if (dest == src) { + auto cb = [&](Transaction* t, EngineShard* shard) { + return OpRPopLPushSingleKey(OpArgs{shard, t->db_index()}, src); + }; + + result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); + } else { + return (*cntx)->SendError("tbd: not_implemented"); + } + + if (result) { + return (*cntx)->SendBulkString(*result); + } + + switch (result.status()) { + case OpStatus::KEY_NOTFOUND: + (*cntx)->SendNull(); + break; + + default: + (*cntx)->SendError(result.status()); + break; + } +} + void ListFamily::LLen(CmdArgList args, ConnectionContext* cntx) { auto key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { @@ -782,6 +811,20 @@ OpResult ListFamily::OpRange(const OpArgs& op_args, std::string_view return str_vec; } +OpResult ListFamily::OpRPopLPushSingleKey(const OpArgs& op_args, std::string_view key) { + auto& db_slice = op_args.shard->db_slice(); + auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_LIST); + if (!it_res) + return it_res.status(); + + PrimeIterator it = *it_res; + quicklist* ql = GetQL(it->second); + db_slice.PreUpdate(op_args.db_ind, it); + string val = ListPop(ListDir::RIGHT, ql); + quicklistPushHead(ql, val.data(), val.size()); + return val; +} + using CI = CommandId; #define HFUNC(x) SetHandler(&ListFamily::x) @@ -793,6 +836,7 @@ void ListFamily::Register(CommandRegistry* registry) { << CI{"RPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPush) << CI{"RPUSHX", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(RPushX) << CI{"RPOP", CO::WRITE | CO::FAST | CO::DENYOOM, -2, 1, 1, 1}.HFUNC(RPop) + << CI{"RPOPLPUSH", CO::WRITE | CO::FAST | CO::DENYOOM, 3, 1, 2, 1}.HFUNC(RPopLPush) << CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BLPop) << CI{"BRPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BRPop) << CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen) diff --git a/src/server/list_family.h b/src/server/list_family.h index c9a3be79e..25aaf15fa 100644 --- a/src/server/list_family.h +++ b/src/server/list_family.h @@ -35,6 +35,7 @@ class ListFamily { static void LRange(CmdArgList args, ConnectionContext* cntx); static void LRem(CmdArgList args, ConnectionContext* cntx); static void LSet(CmdArgList args, ConnectionContext* cntx); + static void RPopLPush(CmdArgList args, ConnectionContext* cntx); static void PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx); static void PushGeneric(ListDir dir, bool skip_notexist, CmdArgList args, @@ -61,6 +62,8 @@ class ListFamily { static OpResult OpRange(const OpArgs& op_args, std::string_view key, long start, long end); + + static OpResult OpRPopLPushSingleKey(const OpArgs& op_args, std::string_view key); }; } // namespace dfly diff --git a/src/server/main_service.cc b/src/server/main_service.cc index f874f3a10..c165ebe2c 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -443,7 +443,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) std::move(multi_error).Cancel(); - etl.connection_stats.cmd_count[cmd_name]++; + etl.connection_stats.cmd_count_map[cmd_name]++; if (dfly_cntx->conn_state.exec_state != ConnectionState::EXEC_INACTIVE && !is_trans_cmd) { // TODO: protect against aggregating huge transactions. diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 2394cd5b6..7accd33ff 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -343,6 +343,15 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) { string_view res[2] = {param, "tbd"}; return (*cntx)->SendStringArr(res); + } else if (sub_cmd == "RESETSTAT") { + ess_.pool()->Await([](auto*) { + auto* stats = ServerState::tl_connection_stats(); + stats->cmd_count_map.clear(); + stats->err_count_map.clear(); + stats->command_cnt = 0; + stats->async_writes_cnt = 0; + }); + return (*cntx)->SendOk(); } else { string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd, "'. Try CONFIG HELP."); @@ -561,14 +570,14 @@ tcp_port:)"; append(StrCat("unknown_", k_v.first, ":"), k_v.second); } - for (const auto& k_v : m.conn_stats.cmd_count) { + for (const auto& k_v : m.conn_stats.cmd_count_map) { append(StrCat("cmd_", k_v.first, ":"), k_v.second); } } if (should_enter("ERRORSTATS", true)) { ADD_HEADER("# Errorstats"); - for (const auto& k_v : m.conn_stats.err_count) { + for (const auto& k_v : m.conn_stats.err_count_map) { append(StrCat(k_v.first, ":"), k_v.second); } } diff --git a/tests/gen_sets.sh b/tests/gen_sets.sh new file mode 100755 index 000000000..9dbcf57d6 --- /dev/null +++ b/tests/gen_sets.sh @@ -0,0 +1,6 @@ +#!/bin/bash + +memtier_benchmark -p 6379 --command "sadd __key__ __data__" -n 20 --threads=4 \ + -c 10 --command-key-pattern=R --distinct-client-seed -c 30 --data-size=64 \ + --key-prefix="key:" --hide-histogram --random-data --key-maximum=10000 +