diff --git a/server/CMakeLists.txt b/server/CMakeLists.txt index 8a98ce9fe..132f98a0d 100644 --- a/server/CMakeLists.txt +++ b/server/CMakeLists.txt @@ -2,7 +2,7 @@ add_executable(midi-redis dfly_main.cc) cxx_link(midi-redis base dragonfly_lib) add_library(dragonfly_lib command_registry.cc common_types.cc config_flags.cc - conn_context.cc db_slice.cc dragonfly_listener.cc + conn_context.cc db_slice.cc debugcmd.cc dragonfly_listener.cc dragonfly_connection.cc engine_shard_set.cc main_service.cc memcache_parser.cc redis_parser.cc resp_expr.cc reply_builder.cc) diff --git a/server/debugcmd.cc b/server/debugcmd.cc new file mode 100644 index 000000000..a0075ead8 --- /dev/null +++ b/server/debugcmd.cc @@ -0,0 +1,140 @@ +// Copyright 2021, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// +#include "server/debugcmd.h" + +#include + +#include "base/logging.h" +#include "server/engine_shard_set.h" + +namespace dfly { + +using namespace boost; +using namespace std; + +static const char kUintErr[] = "value is out of range, must be positive"; + +struct PopulateBatch { + uint64_t index[32]; + uint64_t sz = 0; +}; + +void DoPopulateBatch(std::string_view prefix, size_t val_size, const PopulateBatch& ps) { + EngineShard* es = EngineShard::tlocal(); + DbSlice& db_slice = es->db_slice; + + for (unsigned i = 0; i < ps.sz; ++i) { + string key = absl::StrCat(prefix, ":", ps.index[i]); + string val = absl::StrCat("value:", ps.index[i]); + + if (val.size() < val_size) { + val.resize(val_size, 'x'); + } + auto [it, res] = db_slice.AddOrFind(0, key); + if (res) { + it->second = std::move(val); + } + } +} + +DebugCmd::DebugCmd(EngineShardSet* ess, ConnectionContext* cntx) : ess_(ess), cntx_(cntx) { +} + +void DebugCmd::Run(CmdArgList args) { + std::string_view subcmd = ArgS(args, 1); + if (subcmd == "HELP") { + std::string_view help_arr[] = { + "DEBUG [ [value] [opt] ...]. Subcommands are:", + "POPULATE [] []", + " Create string keys named key:. If is specified then", + " it is used instead of the 'key' prefix.", + "HELP", + " Prints this help.", + }; + return cntx_->SendSimpleStrArr(help_arr, ABSL_ARRAYSIZE(help_arr)); + } + + VLOG(1) << "subcmd " << subcmd; + + if (subcmd == "POPULATE") { + return Populate(args); + } + + string reply = absl::StrCat("Unknown subcommand or wrong number of arguments for '", subcmd, + "'. Try DEBUG HELP."); + return cntx_->SendError(reply); +} + +void DebugCmd::Populate(CmdArgList args) { + if (args.size() < 3 || args.size() > 5) { + return cntx_->SendError( + "Unknown subcommand or wrong number of arguments for 'populate'. Try DEBUG HELP."); + } + + uint64_t total_count = 0; + if (!absl::SimpleAtoi(ArgS(args, 2), &total_count)) + return cntx_->SendError(kUintErr); + std::string_view prefix{"key"}; + + if (args.size() > 3) { + prefix = ArgS(args, 3); + } + uint32_t val_size = 0; + if (args.size() > 4) { + std::string_view str = ArgS(args, 4); + if (!absl::SimpleAtoi(str, &val_size)) + return cntx_->SendError(kUintErr); + } + + size_t runners_count = ess_->pool()->size(); + vector> ranges(runners_count - 1); + uint64_t batch_size = total_count / runners_count; + size_t from = 0; + for (size_t i = 0; i < ranges.size(); ++i) { + ranges[i].first = from; + ranges[i].second = batch_size; + from += batch_size; + } + ranges.emplace_back(from, total_count - from); + + auto distribute_cb = [this, val_size, prefix]( + uint64_t from, uint64_t len) { + string key = absl::StrCat(prefix, ":"); + size_t prefsize = key.size(); + std::vector ps(ess_->size(), PopulateBatch{}); + + for (uint64_t i = from; i < from + len; ++i) { + absl::StrAppend(&key, i); + ShardId sid = Shard(key, ess_->size()); + key.resize(prefsize); + + auto& pops = ps[sid]; + pops.index[pops.sz++] = i; + if (pops.sz == 32) { + ess_->Add(sid, [=, p = pops] { + DoPopulateBatch(prefix, val_size, p); + if (i % 100 == 0) { + this_fiber::yield(); + } + }); + + // we capture pops by value so we can override it here. + pops.sz = 0; + } + } + + ess_->RunBriefInParallel( + [&](EngineShard* shard) { DoPopulateBatch(prefix, val_size, ps[shard->shard_id()]); }); + }; + vector fb_arr(ranges.size()); + for (size_t i = 0; i < ranges.size(); ++i) { + fb_arr[i] = ess_->pool()->at(i)->LaunchFiber(distribute_cb, ranges[i].first, ranges[i].second); + } + for (auto& fb : fb_arr) + fb.join(); + + cntx_->SendOk(); +} + +} // namespace dfly diff --git a/server/debugcmd.h b/server/debugcmd.h new file mode 100644 index 000000000..5054ba4e2 --- /dev/null +++ b/server/debugcmd.h @@ -0,0 +1,26 @@ +// Copyright 2021, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include "server/conn_context.h" + +namespace dfly { + +class EngineShardSet; + +class DebugCmd { + public: + DebugCmd(EngineShardSet* ess, ConnectionContext* cntx); + + void Run(CmdArgList args); + + private: + void Populate(CmdArgList args); + + EngineShardSet* ess_; + ConnectionContext* cntx_; +}; + +} // namespace dfly diff --git a/server/main_service.cc b/server/main_service.cc index f7ba55c1a..45689d074 100644 --- a/server/main_service.cc +++ b/server/main_service.cc @@ -12,6 +12,7 @@ #include "base/logging.h" #include "server/conn_context.h" +#include "server/debugcmd.h" #include "util/metrics/metrics.h" #include "util/uring/uring_fiber_algo.h" #include "util/varz.h" @@ -206,6 +207,14 @@ void Service::Get(CmdArgList args, ConnectionContext* cntx) { cntx->EndMultilineReply(); } +void Service::Debug(CmdArgList args, ConnectionContext* cntx) { + ToUpper(&args[1]); + + DebugCmd dbg_cmd{&shard_set_, cntx}; + + return dbg_cmd.Run(args); +} + VarzValue::Map Service::GetVarzStats() { VarzValue::Map res; @@ -228,7 +237,8 @@ void Service::RegisterCommands() { registry_ << CI{"PING", CO::STALE | CO::FAST, -1, 0, 0, 0}.HFUNC(Ping) << CI{"SET", CO::WRITE | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(Set) - << CI{"GET", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Get); + << CI{"GET", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Get) + << CI{"DEBUG", CO::RANDOM | CO::READONLY, -2, 0, 0, 0}.HFUNC(Debug); } } // namespace dfly diff --git a/server/main_service.h b/server/main_service.h index dc9fd25da..b9f598ceb 100644 --- a/server/main_service.h +++ b/server/main_service.h @@ -49,6 +49,7 @@ class Service { void Ping(CmdArgList args, ConnectionContext* cntx); void Set(CmdArgList args, ConnectionContext* cntx); void Get(CmdArgList args, ConnectionContext* cntx); + void Debug(CmdArgList args, ConnectionContext* cntx); void RegisterCommands(); diff --git a/server/reply_builder.cc b/server/reply_builder.cc index c4e56862c..3feaa1e6a 100644 --- a/server/reply_builder.cc +++ b/server/reply_builder.cc @@ -177,4 +177,15 @@ void ReplyBuilder::SendGetNotFound() { } } +void ReplyBuilder::SendSimpleStrArr(const std::string_view* arr, uint32_t count) { + CHECK(protocol_ == Protocol::REDIS); + string res = absl::StrCat("*", count, kCRLF); + + for (size_t i = 0; i < count; ++i) { + StrAppend(&res, "+", arr[i], kCRLF); + } + + serializer_->SendDirect(res); +} + } // namespace dfly diff --git a/server/reply_builder.h b/server/reply_builder.h index 06837fe62..40e174322 100644 --- a/server/reply_builder.h +++ b/server/reply_builder.h @@ -99,6 +99,10 @@ class ReplyBuilder { serializer_->SetBatchMode(mode); } + // Resp specific. + // This one is prefixed with + and with clrf added automatically to each item.. + void SendSimpleStrArr(const std::string_view* arr, uint32_t count); + private: RespSerializer* as_resp() { return static_cast(serializer_.get());