From c6e4e9786541ee020cd5d56131dcb96165a8d89e Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Tue, 5 Apr 2022 18:45:58 +0300 Subject: [PATCH] List fixes. 1. Fix blocked_clients statistic. 2. Add HELLO decorator. 3. Non-list keys should not wake blpop/brpop commands. 4. Fix Info output whitespacing. --- src/facade/facade.cc | 8 +- src/facade/facade_types.h | 6 +- src/server/engine_shard_set.cc | 4 +- src/server/list_family.cc | 28 ++++--- src/server/list_family.h | 2 +- src/server/list_family_test.cc | 28 +++++++ src/server/server_family.cc | 135 ++++++++++++++++++--------------- src/server/server_family.h | 1 + src/server/transaction.cc | 1 + 9 files changed, 136 insertions(+), 77 deletions(-) diff --git a/src/facade/facade.cc b/src/facade/facade.cc index d7649839d..3aa006ce4 100644 --- a/src/facade/facade.cc +++ b/src/facade/facade.cc @@ -21,10 +21,8 @@ constexpr size_t kSizeConnStats = sizeof(ConnectionStats); ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { // To break this code deliberately if we add/remove a field to this struct. - static_assert(kSizeConnStats == 152); + static_assert(kSizeConnStats == 160); - ADD(num_conns); - ADD(num_replicas); ADD(read_buf_capacity); ADD(io_read_cnt); ADD(io_read_bytes); @@ -34,6 +32,10 @@ ConnectionStats& ConnectionStats::operator+=(const ConnectionStats& o) { ADD(pipelined_cmd_cnt); ADD(async_writes_cnt); + ADD(num_conns); + ADD(num_replicas); + ADD(num_blocked_clients); + for (const auto& k_v : o.err_count) { err_count[k_v.first] += k_v.second; } diff --git a/src/facade/facade_types.h b/src/facade/facade_types.h index 94eac05ee..614d53d46 100644 --- a/src/facade/facade_types.h +++ b/src/facade/facade_types.h @@ -23,8 +23,6 @@ struct ConnectionStats { absl::flat_hash_map err_count; absl::flat_hash_map cmd_count; - uint32_t num_conns = 0; - uint32_t num_replicas = 0; size_t read_buf_capacity = 0; size_t io_read_cnt = 0; size_t io_read_bytes = 0; @@ -36,6 +34,10 @@ struct ConnectionStats { // Writes count that happenned via SendRawMessageAsync call. size_t async_writes_cnt = 0; + uint32_t num_conns = 0; + uint32_t num_replicas = 0; + uint32_t num_blocked_clients = 0; + ConnectionStats& operator+=(const ConnectionStats& o); }; diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 7587fdb59..5a46beaac 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -6,6 +6,7 @@ extern "C" { #include "redis/zmalloc.h" +#include "redis/object.h" } #include "base/logging.h" @@ -269,12 +270,13 @@ void EngineShard::ProcessAwakened(Transaction* completed_t) { for (auto key : wt.awakened_keys) { string_view sv_key = static_cast(key); auto [it, exp_it] = db_slice_.FindExt(index, sv_key); // Double verify we still got the item. - if (!IsValid(it)) + if (!IsValid(it) || it->second.ObjType() != OBJ_LIST) // Only LIST is allowed to block. continue; auto w_it = wt.queue_map.find(sv_key); CHECK(w_it != wt.queue_map.end()); DVLOG(1) << "NotifyWatchQueue " << key; + Transaction* t2 = NotifyWatchQueue(w_it->second.get()); if (t2) { awakened_transactions_.insert(t2); diff --git a/src/server/list_family.cc b/src/server/list_family.cc index db0ebde17..f904b8f6e 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -15,6 +15,7 @@ extern "C" { #include "server/conn_context.h" #include "server/engine_shard_set.h" #include "server/error.h" +#include "server/server_state.h" #include "server/transaction.h" /** @@ -57,6 +58,8 @@ DEFINE_int32(list_compress_depth, 0, "Compress depth of the list. Default is no namespace dfly { using namespace std; +using namespace facade; + namespace { quicklistEntry QLEntry() { @@ -143,6 +146,8 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { t->Schedule(); } + auto* stats = ServerState::tl_connection_stats(); + while (true) { result = t->FindFirst(); @@ -152,8 +157,7 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { if (result.status() != OpStatus::KEY_NOTFOUND) { // Some error occurred. // We could be registered in the queue due to previous iterations. t->UnregisterWatch(); - - return result.status(); + break; } if (is_multi) { @@ -163,12 +167,16 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) { return OpStatus::TIMED_OUT; } - if (!t->WaitOnWatch(tp)) { + ++stats->num_blocked_clients; + bool wait_succeeded = t->WaitOnWatch(tp); + --stats->num_blocked_clients; + + if (!wait_succeeded) return OpStatus::TIMED_OUT; - } } - DCHECK_EQ(OpStatus::OK, result.status()); + if (!result) + return result.status(); VLOG(1) << "Popping an element"; find_sid_ = result->sid; @@ -408,14 +416,16 @@ void ListFamily::PushGeneric(ListDir dir, bool skip_notexists, CmdArgList args, return (*cntx)->SendLong(result.value()); } -void ListFamily::PopGeneric(ListDir dir, const CmdArgList& args, ConnectionContext* cntx) { +void ListFamily::PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); int32_t count = 1; bool return_arr = false; if (args.size() > 2) { - if (args.size() > 3) - return (*cntx)->SendError(kSyntaxErr); + if (args.size() > 3) { + ToLower(&args[0]); + return (*cntx)->SendError(WrongNumArgsError(ArgS(args, 0))); + } string_view count_s = ArgS(args, 2); if (!absl::SimpleAtoi(count_s, &count)) { @@ -423,7 +433,7 @@ void ListFamily::PopGeneric(ListDir dir, const CmdArgList& args, ConnectionConte } if (count < 0) { - return (*cntx)->SendError(facade::kUintErr); + return (*cntx)->SendError(kUintErr); } return_arr = true; } diff --git a/src/server/list_family.h b/src/server/list_family.h index b8d47e812..01960e618 100644 --- a/src/server/list_family.h +++ b/src/server/list_family.h @@ -35,7 +35,7 @@ class ListFamily { static void LRem(CmdArgList args, ConnectionContext* cntx); static void LSet(CmdArgList args, ConnectionContext* cntx); - static void PopGeneric(ListDir dir, const CmdArgList& args, ConnectionContext* cntx); + static void PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx); static void PushGeneric(ListDir dir, bool skip_notexist, CmdArgList args, ConnectionContext* cntx); diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index ba1c83a18..e4916b59d 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -243,4 +243,32 @@ TEST_F(ListFamilyTest, BLPopSerialize) { } } +TEST_F(ListFamilyTest, WrongTypeDoesNotWake) { + RespVec blpop_resp; + + auto pop_fb = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] { + blpop_resp = Run({"blpop", kKey1, "0"}); + }); + + do { + this_fiber::sleep_for(30us); + } while (!IsLocked(0, kKey1)); + + auto p1_fb = pp_->at(1)->LaunchFiber([&] { + Run({"multi"}); + Run({"lpush", kKey1, "A"}); + Run({"set", kKey1, "foo"}); + + auto resp = Run({"exec"}); + EXPECT_THAT(resp, ElementsAre(IntArg(1), "OK")); + + Run({"del", kKey1}); + Run({"lpush", kKey1, "B"}); + }); + + p1_fb.join(); + pop_fb.join(); + EXPECT_THAT(blpop_resp, ElementsAre(kKey1, "B")); +} + } // namespace dfly diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 346a1ddcf..0ffb22814 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -49,6 +49,7 @@ namespace fibers = ::boost::fibers; namespace fs = std::filesystem; using facade::MCReplyBuilder; using strings::HumanReadableNumBytes; +using absl::StrCat; namespace { @@ -251,7 +252,7 @@ void ServerFamily::Config(CmdArgList args, ConnectionContext* cntx) { return (*cntx)->SendStringArr(res); } else { - string err = absl::StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd, + string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd, "'. Try CONFIG HELP."); return (*cntx)->SendError(err, kSyntaxErr); } @@ -272,7 +273,7 @@ void ServerFamily::Memory(CmdArgList args, ConnectionContext* cntx) { return (*cntx)->SendLong(1); } - string err = absl::StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd, + string err = StrCat("Unknown subcommand or wrong number of arguments for '", sub_cmd, "'. Try MEMORY HELP."); return (*cntx)->SendError(err, kSyntaxErr); } @@ -282,7 +283,7 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) { auto [current, switched] = global_state_.Next(GlobalState::SAVING); if (!switched) { - string error = absl::StrCat(GlobalState::Name(current), " - can not save database"); + string error = StrCat(GlobalState::Name(current), " - can not save database"); return (*cntx)->SendError(error); } @@ -294,13 +295,13 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) { if (!dir_path.empty()) { ec = CreateDirs(dir_path); if (ec) - return (*cntx)->SendError(absl::StrCat("create dir ", ec.message())); + return (*cntx)->SendError(StrCat("create dir ", ec.message())); } string filename = FLAGS_dbfilename.empty() ? "dump_save.rdb" : FLAGS_dbfilename; fs::path path = dir_path; path.append(filename); - path.concat(absl::StrCat("_", fl_index++)); + path.concat(StrCat("_", fl_index++)); VLOG(1) << "Saving to " << path; auto res = uring::OpenWrite(path.generic_string()); @@ -409,133 +410,144 @@ tcp_port:)"; auto should_enter = [&](string_view name, bool hidden = false) { bool res = (!hidden && section.empty()) || section == "ALL" || section == name; if (res && !info.empty()) - info.push_back('\n'); + info.append("\r\n"); return res; }; + auto append = [&info](absl::AlphaNum a1, absl::AlphaNum a2) { + absl::StrAppend(&info, a1, a2, "\r\n"); + }; + + #define ADD_HEADER(x) absl::StrAppend(&info, x "\r\n") + if (should_enter("SERVER")) { - absl::StrAppend(&info, kInfo1, FLAGS_port, "\n"); + append(kInfo1, FLAGS_port); size_t uptime = time(NULL) - start_time_; - absl::StrAppend(&info, "uptime_in_seconds:", uptime, "\n"); - absl::StrAppend(&info, "uptime_in_days:", uptime / (3600 * 24), "\n"); + append("uptime_in_seconds:", uptime); + append("uptime_in_days:", uptime / (3600 * 24)); } Metrics m = GetMetrics(); auto sdata_res = io::ReadStatusInfo(); if (should_enter("CLIENTS")) { - absl::StrAppend(&info, "# Clients\n"); - absl::StrAppend(&info, "connected_clients:", m.conn_stats.num_conns, "\n"); - absl::StrAppend(&info, "client_read_buf_capacity:", m.conn_stats.read_buf_capacity, "\n"); - absl::StrAppend(&info, "blocked_clients:", 0, "\n"); + ADD_HEADER("# Clients"); + append("connected_clients:", m.conn_stats.num_conns); + append("client_read_buf_capacity:", m.conn_stats.read_buf_capacity); + append("blocked_clients:", m.conn_stats.num_blocked_clients); } if (should_enter("MEMORY")) { - absl::StrAppend(&info, "# Memory\n"); + ADD_HEADER("# Memory"); - absl::StrAppend(&info, "used_memory:", m.heap_used_bytes, "\n"); - absl::StrAppend(&info, "used_memory_human:", HumanReadableNumBytes(m.heap_used_bytes), "\n"); - absl::StrAppend(&info, "comitted_memory:", _mi_stats_main.committed.current, "\n"); + append("used_memory:", m.heap_used_bytes); + append("used_memory_human:", HumanReadableNumBytes(m.heap_used_bytes)); + append("comitted_memory:", _mi_stats_main.committed.current); if (sdata_res.has_value()) { - absl::StrAppend(&info, "used_memory_rss:", sdata_res->vm_rss, "\n"); - absl::StrAppend(&info, "used_memory_rss_human:", HumanReadableNumBytes(sdata_res->vm_rss), - "\n"); + append("used_memory_rss:", sdata_res->vm_rss); + append("used_memory_rss_human:", HumanReadableNumBytes(sdata_res->vm_rss)); } else { LOG(ERROR) << "Error fetching /proc/self/status stats"; } - absl::StrAppend(&info, "used_memory_peak:", used_mem_peak.load(memory_order_relaxed), "\n"); + append("used_memory_peak:", used_mem_peak.load(memory_order_relaxed)); // Blob - all these cases where the key/objects are represented by a single blob allocated on // heap. For example, strings or intsets. members of lists, sets, zsets etc // are not accounted for to avoid complex computations. In some cases, when number of members // is known we approximate their allocations by taking 16 bytes per member. - absl::StrAppend(&info, "blob_used_memory:", m.db.obj_memory_usage, "\n"); - absl::StrAppend(&info, "table_used_memory:", m.db.table_mem_usage, "\n"); - absl::StrAppend(&info, "num_buckets:", m.db.bucket_count, "\n"); - absl::StrAppend(&info, "num_entries:", m.db.key_count, "\n"); - absl::StrAppend(&info, "inline_keys:", m.db.inline_keys, "\n"); - absl::StrAppend(&info, "small_string_bytes:", m.db.small_string_bytes, "\n"); - absl::StrAppend(&info, "listpack_blobs:", m.db.listpack_blob_cnt, "\n"); - absl::StrAppend(&info, "listpack_bytes:", m.db.listpack_bytes, "\n"); + append("blob_used_memory:", m.db.obj_memory_usage); + append("table_used_memory:", m.db.table_mem_usage); + append("num_buckets:", m.db.bucket_count); + append("num_entries:", m.db.key_count); + append("inline_keys:", m.db.inline_keys); + append("small_string_bytes:", m.db.small_string_bytes); + append("listpack_blobs:", m.db.listpack_blob_cnt); + append("listpack_bytes:", m.db.listpack_bytes); } if (should_enter("STATS")) { - absl::StrAppend(&info, "# Stats\n"); - absl::StrAppend(&info, "instantaneous_ops_per_sec:", m.qps, "\n"); - absl::StrAppend(&info, "total_commands_processed:", m.conn_stats.command_cnt, "\n"); - absl::StrAppend(&info, "total_pipelined_commands:", m.conn_stats.pipelined_cmd_cnt, "\n"); - absl::StrAppend(&info, "total_net_input_bytes:", m.conn_stats.io_read_bytes, "\n"); - absl::StrAppend(&info, "total_net_output_bytes:", m.conn_stats.io_write_bytes, "\n"); - absl::StrAppend(&info, "instantaneous_input_kbps:", -1, "\n"); - absl::StrAppend(&info, "instantaneous_output_kbps:", -1, "\n"); - absl::StrAppend(&info, "rejected_connections:", -1, "\n"); - absl::StrAppend(&info, "expired_keys:", m.events.expired_keys, "\n"); - absl::StrAppend(&info, "gc_keys:", m.events.garbage_collected, "\n"); - absl::StrAppend(&info, "keyspace_hits:", -1, "\n"); - absl::StrAppend(&info, "keyspace_misses:", -1, "\n"); - absl::StrAppend(&info, "total_reads_processed:", m.conn_stats.io_read_cnt, "\n"); - absl::StrAppend(&info, "total_writes_processed:", m.conn_stats.io_write_cnt, "\n"); - absl::StrAppend(&info, "async_writes_count:", m.conn_stats.async_writes_cnt, "\n"); + ADD_HEADER("# Stats"); + + append("instantaneous_ops_per_sec:", m.qps); + append("total_commands_processed:", m.conn_stats.command_cnt); + append("total_pipelined_commands:", m.conn_stats.pipelined_cmd_cnt); + append("total_net_input_bytes:", m.conn_stats.io_read_bytes); + append("total_net_output_bytes:", m.conn_stats.io_write_bytes); + append("instantaneous_input_kbps:", -1); + append("instantaneous_output_kbps:", -1); + append("rejected_connections:", -1); + append("expired_keys:", m.events.expired_keys); + append("gc_keys:", m.events.garbage_collected); + append("keyspace_hits:", -1); + append("keyspace_misses:", -1); + append("total_reads_processed:", m.conn_stats.io_read_cnt); + append("total_writes_processed:", m.conn_stats.io_write_cnt); + append("async_writes_count:", m.conn_stats.async_writes_cnt); } if (should_enter("REPLICATION")) { - absl::StrAppend(&info, "# Replication\n"); + ADD_HEADER("# Replication"); ServerState& etl = *ServerState::tlocal(); if (etl.is_master) { - absl::StrAppend(&info, "role:master\n"); - absl::StrAppend(&info, "connected_slaves:", m.conn_stats.num_replicas, "\n"); + append("role:", "master"); + append("connected_slaves:", m.conn_stats.num_replicas); } else { - absl::StrAppend(&info, "role:slave\n"); + append("role:", "slave"); // it's safe to access replica_ because replica_ is created before etl.is_master set to // false and cleared after etl.is_master is set to true. And since the code here that checks // for is_master and copies shared_ptr is atomic, it1 should be correct. auto replica_ptr = replica_; Replica::Info rinfo = replica_ptr->GetInfo(); - absl::StrAppend(&info, "master_host:", rinfo.host, "\n"); - absl::StrAppend(&info, "master_port:", rinfo.port, "\n"); + append("master_host:", rinfo.host); + append("master_port:", rinfo.port); const char* link = rinfo.master_link_established ? "up" : "down"; - absl::StrAppend(&info, "master_link_status:", link, "\n"); - absl::StrAppend(&info, "master_last_io_seconds_ago:", rinfo.master_last_io_sec, "\n"); - absl::StrAppend(&info, "master_sync_in_progress:", rinfo.sync_in_progress, "\n"); + append("master_link_status:", link); + append("master_last_io_seconds_ago:", rinfo.master_last_io_sec); + append("master_sync_in_progress:", rinfo.sync_in_progress); } } if (should_enter("COMMANDSTATS", true)) { - absl::StrAppend(&info, "# Commandstats\n"); + ADD_HEADER("# Commandstats"); + auto unknown_cmd = service_.UknownCmdMap(); for (const auto& k_v : unknown_cmd) { - absl::StrAppend(&info, "unknown_", k_v.first, ":", k_v.second, "\n"); + append(StrCat("unknown_", k_v.first, ":"), k_v.second); } for (const auto& k_v : m.conn_stats.cmd_count) { - absl::StrAppend(&info, "cmd_", k_v.first, ":", k_v.second, "\n"); + append(StrCat("cmd_", k_v.first, ":"), k_v.second); } } if (should_enter("ERRORSTATS", true)) { - absl::StrAppend(&info, "# Errorstats\n"); + ADD_HEADER("# Errorstats"); for (const auto& k_v : m.conn_stats.err_count) { - absl::StrAppend(&info, k_v.first, ":", k_v.second, "\n"); + append(StrCat(k_v.first, ":"), k_v.second); } } if (should_enter("KEYSPACE")) { - absl::StrAppend(&info, "# Keyspace\n"); - absl::StrAppend(&info, "db0:keys=xxx,expires=yyy,avg_ttl=zzz\n"); // TODO + ADD_HEADER("# Keyspace"); + append("db0:", "keys=xxx,expires=yyy,avg_ttl=zzz"); // TODO } (*cntx)->SendBulkString(info); } +void ServerFamily::Hello(CmdArgList args, ConnectionContext* cntx) { + return (*cntx)->SendOk(); +} + void ServerFamily::ReplicaOf(CmdArgList args, ConnectionContext* cntx) { std::string_view host = ArgS(args, 1); std::string_view port_s = ArgS(args, 2); @@ -651,6 +663,7 @@ void ServerFamily::Register(CommandRegistry* registry) { << CI{"FLUSHDB", CO::WRITE | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(FlushDb) << CI{"FLUSHALL", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, 0}.HFUNC(FlushAll) << CI{"INFO", CO::LOADING, -1, 0, 0, 0}.HFUNC(Info) + << CI{"HELLO", CO::LOADING, -1, 0, 0, 0}.HFUNC(Hello) << CI{"LASTSAVE", CO::LOADING | CO::RANDOM | CO::FAST, 1, 0, 0, 0}.HFUNC(LastSave) << CI{"MEMORY", kMemOpts, -2, 0, 0, 0}.HFUNC(Memory) << CI{"SAVE", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Save) diff --git a/src/server/server_family.h b/src/server/server_family.h index ccc56bdb2..e58a8ce49 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -67,6 +67,7 @@ class ServerFamily { void FlushDb(CmdArgList args, ConnectionContext* cntx); void FlushAll(CmdArgList args, ConnectionContext* cntx); void Info(CmdArgList args, ConnectionContext* cntx); + void Hello(CmdArgList args, ConnectionContext* cntx); void LastSave(CmdArgList args, ConnectionContext* cntx); void Psync(CmdArgList args, ConnectionContext* cntx); void ReplicaOf(CmdArgList args, ConnectionContext* cntx); diff --git a/src/server/transaction.cc b/src/server/transaction.cc index a13b955c1..f3996c506 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -405,6 +405,7 @@ bool Transaction::RunInShard(EngineShard* shard) { sd.local_mask &= ~KEYLOCK_ACQUIRED; } sd.local_mask &= ~OUT_OF_ORDER; + // It has 2 responsibilities. // 1: to go over potential wakened keys, verify them and activate watch queues. // 2: if this transaction was notified and finished running - to remove it from the head