From 6d44d72a56082591ab96abc8ee0e0981292cbb43 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Sun, 15 May 2022 23:11:45 +0300 Subject: [PATCH] Add more debugging tools for object introspection --- src/server/debugcmd.cc | 5 + src/server/generic_family.cc | 221 +++++++++++++++++++---------------- src/server/generic_family.h | 10 -- 3 files changed, 125 insertions(+), 111 deletions(-) diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 235d775cf..ffe9c7548 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -44,6 +44,8 @@ struct PopulateBatch { struct ObjInfo { unsigned encoding; unsigned bucket_id = 0; + unsigned slot_id = 0; + int64_t ttl = INT64_MAX; bool has_sec_precision = false; @@ -292,6 +294,7 @@ void DebugCmd::Inspect(string_view key) { } ObjInfo oinfo(it->second.Encoding(), it.bucket_id()); + oinfo.slot_id = it.slot_id(); if (it->second.HasExpire()) { ExpireIterator exp_it = exp_t->Find(it->first); @@ -309,6 +312,8 @@ void DebugCmd::Inspect(string_view key) { if (res) { string resp; StrAppend(&resp, "encoding:", strEncoding(res->encoding), " bucket_id:", res->bucket_id); + StrAppend(&resp, " slot:", res->slot_id); + if (res->ttl != INT64_MAX) { StrAppend(&resp, " ttl:", res->ttl, res->has_sec_precision ? "s" : "ms"); } diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 2df6f31eb..d9dced1e0 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -10,10 +10,10 @@ extern "C" { } #include "base/logging.h" +#include "server/blocking_controller.h" #include "server/command_registry.h" #include "server/conn_context.h" #include "server/engine_shard_set.h" -#include "server/blocking_controller.h" #include "server/error.h" #include "server/transaction.h" #include "util/varz.h" @@ -164,6 +164,105 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) { return OpStatus::OK; } +struct ScanOpts { + string_view pattern; + string_view type_filter; + size_t limit = 10; + + unsigned bucket_id = UINT_MAX; +}; + +bool ScanCb(const OpArgs& op_args, PrimeIterator it, const ScanOpts& opts, StringVec* res) { + auto& db_slice = op_args.shard->db_slice(); + if (it->second.HasExpire()) { + it = db_slice.ExpireIfNeeded(op_args.db_ind, it).first; + } + + if (!IsValid(it)) + return false; + + bool matches = opts.type_filter.empty() || ObjTypeName(it->second.ObjType()) == opts.type_filter; + + if (!matches) + return false; + + if (opts.bucket_id != UINT_MAX && opts.bucket_id != it.bucket_id()) { + return false; + } + + if (opts.pattern.empty()) { + res->push_back(it->first.ToString()); + } else { + string str = it->first.ToString(); + if (stringmatchlen(opts.pattern.data(), opts.pattern.size(), str.data(), str.size(), 0) != 1) + return false; + + res->push_back(std::move(str)); + } + + return true; +} + +void OpScan(const OpArgs& op_args, const ScanOpts& scan_opts, uint64_t* cursor, + StringVec* vec) { + auto& db_slice = op_args.shard->db_slice(); + DCHECK(db_slice.IsDbValid(op_args.db_ind)); + + unsigned cnt = 0; + + VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_ind << " has " + << db_slice.DbSize(op_args.db_ind); + + PrimeTable::cursor cur = *cursor; + auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_ind); + do { + cur = prime_table->Traverse( + cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, scan_opts, vec); }); + } while (cur && cnt < scan_opts.limit); + + VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur.value(); + *cursor = cur.value(); +} + +uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys, + ConnectionContext* cntx) { + ShardId sid = cursor % 1024; + + EngineShardSet* ess = cntx->shard_set; + unsigned shard_count = ess->size(); + + // Dash table returns a cursor with its right byte empty. We will use it + // for encoding shard index. For now scan has a limitation of 255 shards. + CHECK_LT(shard_count, 1024u); + + if (sid >= shard_count) { // protection + return 0; + } + + cursor >>= 10; + + do { + ess->Await(sid, [&] { + OpArgs op_args{EngineShard::tlocal(), cntx->conn_state.db_index}; + + OpScan(op_args, scan_opts, &cursor, keys); + }); + if (cursor == 0) { + ++sid; + if (unsigned(sid) == shard_count) + break; + } + } while (keys->size() < scan_opts.limit); + + if (sid < shard_count) { + cursor = (cursor << 10) | sid; + } else { + DCHECK_EQ(0u, cursor); + } + + return cursor; +} + } // namespace void GenericFamily::Init(util::ProactorPool* pp) { @@ -298,8 +397,11 @@ void GenericFamily::Keys(CmdArgList args, ConnectionContext* cntx) { StringVec keys; + ScanOpts scan_opts; + scan_opts.pattern = pattern; + scan_opts.limit = 512; do { - cursor = ScanGeneric(cursor, pattern, string_view{}, 512, &keys, cntx); + cursor = ScanGeneric(cursor, scan_opts, &keys, cntx); } while (cursor != 0 && keys.size() < FLAGS_keys_output_limit); (*cntx)->StartArray(keys.size()); @@ -446,14 +548,14 @@ void GenericFamily::Echo(CmdArgList args, ConnectionContext* cntx) { void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) { string_view token = ArgS(args, 1); - string_view pattern, type_filter; uint64_t cursor = 0; - uint32_t limit = 10; if (!absl::SimpleAtoi(token, &cursor)) { return (*cntx)->SendError("invalid cursor"); } + ScanOpts scan_opts; + for (unsigned i = 2; i < args.size(); i += 2) { if (i + 1 == args.size()) { return (*cntx)->SendError(kSyntaxErr); @@ -463,27 +565,31 @@ void GenericFamily::Scan(CmdArgList args, ConnectionContext* cntx) { string_view opt = ArgS(args, i); if (opt == "COUNT") { - if (!absl::SimpleAtoi(ArgS(args, i + 1), &limit)) { + if (!absl::SimpleAtoi(ArgS(args, i + 1), &scan_opts.limit)) { return (*cntx)->SendError(kInvalidIntErr); } - if (limit == 0) - limit = 1; - else if (limit > 4096) - limit = 4096; + if (scan_opts.limit == 0) + scan_opts.limit = 1; + else if (scan_opts.limit > 4096) + scan_opts.limit = 4096; } else if (opt == "MATCH") { - pattern = ArgS(args, i + 1); - if (pattern == "*") - pattern = string_view{}; + scan_opts.pattern = ArgS(args, i + 1); + if (scan_opts.pattern == "*") + scan_opts.pattern = string_view{}; } else if (opt == "TYPE") { ToLower(&args[i + 1]); - type_filter = ArgS(args, i + 1); + scan_opts.type_filter = ArgS(args, i + 1); + } else if (opt == "BUCKET") { + if (!absl::SimpleAtoi(ArgS(args, i + 1), &scan_opts.bucket_id)) { + return (*cntx)->SendError(kInvalidIntErr); + } } else { return (*cntx)->SendError(kSyntaxErr); } } StringVec keys; - cursor = ScanGeneric(cursor, pattern, type_filter, limit, &keys, cntx); + cursor = ScanGeneric(cursor, scan_opts, &keys, cntx); (*cntx)->StartArray(2); (*cntx)->SendSimpleString(absl::StrCat(cursor)); @@ -608,93 +714,6 @@ OpResult GenericFamily::OpRen(const OpArgs& op_args, string_view from_key, return OpStatus::OK; } -uint64_t GenericFamily::ScanGeneric(uint64_t cursor, string_view pattern, string_view type_filter, - unsigned limit, StringVec* keys, ConnectionContext* cntx) { - ShardId sid = cursor % 1024; - - EngineShardSet* ess = cntx->shard_set; - unsigned shard_count = ess->size(); - - // Dash table returns a cursor with its right byte empty. We will use it - // for encoding shard index. For now scan has a limitation of 255 shards. - CHECK_LT(shard_count, 1024u); - - if (sid >= shard_count) { // protection - return 0; - } - - cursor >>= 10; - - do { - ess->Await(sid, [&] { - OpArgs op_args{EngineShard::tlocal(), cntx->conn_state.db_index}; - OpScan(op_args, pattern, type_filter, limit, &cursor, keys); - }); - if (cursor == 0) { - ++sid; - if (unsigned(sid) == shard_count) - break; - } - } while (keys->size() < limit); - - if (sid < shard_count) { - cursor = (cursor << 10) | sid; - } else { - DCHECK_EQ(0u, cursor); - } - - return cursor; -} - -void GenericFamily::OpScan(const OpArgs& op_args, string_view pattern, string_view type_filter, - size_t limit, uint64_t* cursor, StringVec* vec) { - auto& db_slice = op_args.shard->db_slice(); - DCHECK(db_slice.IsDbValid(op_args.db_ind)); - - unsigned cnt = 0; - - VLOG(1) << "PrimeTable " << db_slice.shard_id() << "/" << op_args.db_ind << " has " - << db_slice.DbSize(op_args.db_ind); - - PrimeTable::cursor cur = *cursor; - auto [prime_table, expire_table] = db_slice.GetTables(op_args.db_ind); - do { - cur = prime_table->Traverse( - cur, [&](PrimeIterator it) { cnt += ScanCb(op_args, it, pattern, type_filter, vec); }); - } while (cur && cnt < limit); - - VLOG(1) << "OpScan " << db_slice.shard_id() << " cursor: " << cur.value(); - *cursor = cur.value(); -} - -bool GenericFamily::ScanCb(const OpArgs& op_args, PrimeIterator it, string_view pattern, - string_view type_filter, StringVec* res) { - auto& db_slice = op_args.shard->db_slice(); - if (it->second.HasExpire()) { - it = db_slice.ExpireIfNeeded(op_args.db_ind, it).first; - } - - if (!IsValid(it)) - return false; - - bool matches = type_filter.empty() || ObjTypeName(it->second.ObjType()) == type_filter; - - if (!matches) - return false; - - if (pattern.empty()) { - res->push_back(it->first.ToString()); - } else { - string str = it->first.ToString(); - if (stringmatchlen(pattern.data(), pattern.size(), str.data(), str.size(), 0) != 1) - return false; - - res->push_back(std::move(str)); - } - - return true; -} - using CI = CommandId; #define HFUNC(x) SetHandler(&GenericFamily::x) diff --git a/src/server/generic_family.h b/src/server/generic_family.h index ea2be8ecd..6e1073a2c 100644 --- a/src/server/generic_family.h +++ b/src/server/generic_family.h @@ -67,16 +67,6 @@ class GenericFamily { static OpResult OpExists(const OpArgs& op_args, ArgSlice keys); static OpResult OpRen(const OpArgs& op_args, std::string_view from, std::string_view to, bool skip_exists); - - static uint64_t ScanGeneric(uint64_t cursor, std::string_view pattern, - std::string_view type_filter, unsigned limit, StringVec* keys, - ConnectionContext* cntx); - - static void OpScan(const OpArgs& op_args, std::string_view pattern, std::string_view type_filter, - size_t limit, uint64_t* cursor, StringVec* vec); - - static bool ScanCb(const OpArgs& op_args, PrimeIterator it, std::string_view pattern, - std::string_view type_filter, StringVec* res); }; } // namespace dfly