diff --git a/go.work b/go.work index da3028d45..82a4f97c1 100644 --- a/go.work +++ b/go.work @@ -1,3 +1,6 @@ go 1.20 -use ./contrib/charts/dragonfly +use ( + ./contrib/charts/dragonfly + ./tools/replay +) diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index 5b9011515..d843a8d6f 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -13,8 +13,10 @@ #include "absl/strings/str_cat.h" #include "base/flags.h" +#include "base/io_buf.h" #include "base/logging.h" #include "core/heap_size.h" +#include "core/uring.h" #include "facade/conn_context.h" #include "facade/dragonfly_listener.h" #include "facade/memcache_parser.h" @@ -100,6 +102,129 @@ void UpdateIoBufCapacity(const base::IoBuf& io_buf, ConnectionStats* stats, } } +struct TrafficLogger { + // protects agains closing the file while writing or data races when opening the file. + // Also, makes sure that LogTraffic are executed atomically. + fb2::Mutex mutex; + unique_ptr log_file; + + void ResetLocked(); + // Returns true if Write succeeded, false if it failed and the recording should be aborted. + bool Write(string_view blob); + bool Write(iovec* blobs, size_t len); +}; + +void TrafficLogger::ResetLocked() { + if (log_file) { + log_file->Close(); + log_file.reset(); + } +} + +// Returns true if Write succeeded, false if it failed and the recording should be aborted. +bool TrafficLogger::Write(string_view blob) { + auto ec = log_file->Write(io::Buffer(blob)); + if (ec) { + LOG(ERROR) << "Error writing to traffic log: " << ec; + ResetLocked(); + return false; + } + return true; +} + +bool TrafficLogger::Write(iovec* blobs, size_t len) { + auto ec = log_file->Write(blobs, len); + if (ec) { + LOG(ERROR) << "Error writing to traffic log: " << ec; + ResetLocked(); + return false; + } + return true; +} + +thread_local TrafficLogger tl_traffic_logger{}; // nullopt while disabled + +void OpenTrafficLogger(string_view base_path) { + unique_lock lk{tl_traffic_logger.mutex}; + if (tl_traffic_logger.log_file) + return; + + // Open file with append mode, without it concurrent fiber writes seem to conflict + string path = absl::StrCat( + base_path, "-", absl::Dec(ProactorBase::me()->GetPoolIndex(), absl::kZeroPad3), ".bin"); + auto file = util::fb2::OpenWrite(path, io::WriteFile::Options{/*.append = */ false}); + if (!file) { + LOG(ERROR) << "Error opening a file " << path << " for traffic logging: " << file.error(); + return; + } + tl_traffic_logger.log_file = unique_ptr{file.value()}; +} + +void LogTraffic(uint32_t id, bool has_more, absl::Span resp) { + string_view cmd = resp.front().GetView(); + if (absl::EqualsIgnoreCase(cmd, "debug"sv)) + return; + + DVLOG(2) << "Recording " << cmd; + + char stack_buf[1024]; + char* next = stack_buf; + + // We write id, timestamp, has_more, num_parts, part_len, part_len, part_len, ... + // And then all the part blobs concatenated together. + auto write_u32 = [&next](uint32_t i) { + absl::little_endian::Store32(next, i); + next += 4; + }; + + write_u32(id); + + absl::little_endian::Store64(next, absl::GetCurrentTimeNanos()); + next += 8; + + write_u32(has_more ? 1 : 0); + write_u32(uint32_t(resp.size())); + + // Grab the lock and check if the file is still open. + lock_guard lk{tl_traffic_logger.mutex}; + + if (!tl_traffic_logger.log_file) + return; + + // Proceed with writing the blob lengths. + for (auto part : resp) { + if (size_t(next - stack_buf + 4) > sizeof(stack_buf)) { + if (!tl_traffic_logger.Write(string_view{stack_buf, size_t(next - stack_buf)})) { + return; + } + next = stack_buf; + } + write_u32(part.GetView().size()); + } + + // Write the data itself. + std::array blobs; + unsigned index = 0; + if (next != stack_buf) { + blobs[index++] = iovec{.iov_base = stack_buf, .iov_len = size_t(next - stack_buf)}; + } + + for (auto part : resp) { + blobs[index++] = iovec{.iov_base = const_cast(part.GetView().data()), + .iov_len = part.GetView().size()}; + if (index >= blobs.size()) { + if (!tl_traffic_logger.Write(blobs.data(), blobs.size())) { + return; + } + index = 0; + } + } + + if (index) { + tl_traffic_logger.Write(blobs.data(), index); + } +} + constexpr size_t kMinReadSize = 256; thread_local uint32_t free_req_release_weight = 0; @@ -691,6 +816,11 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { void Connection::DispatchCommand(uint32_t consumed, mi_heap_t* heap) { bool can_dispatch_sync = (consumed >= io_buf_.InputLen()); + if (tl_traffic_logger.log_file) { + // Log command as soon as we receive it + LogTraffic(id_, !can_dispatch_sync, absl::MakeSpan(tmp_parse_args_)); + } + // Avoid sync dispatch if an async dispatch is already in progress, or else they'll interleave. if (cc_->async_dispatch) can_dispatch_sync = false; @@ -1397,6 +1527,15 @@ bool Connection::IsTrackingOn() const { return tracking_enabled_; } +void Connection::StartTrafficLogging(string_view path) { + OpenTrafficLogger(path); +} + +void Connection::StopTrafficLogging() { + lock_guard lk(tl_traffic_logger.mutex); + tl_traffic_logger.ResetLocked(); +} + Connection::MemoryUsage Connection::GetMemoryUsage() const { size_t mem = sizeof(*this) + dfly::HeapSize(dispatch_q_) + dfly::HeapSize(name_) + dfly::HeapSize(tmp_parse_args_) + dfly::HeapSize(tmp_cmd_vec_) + diff --git a/src/facade/dragonfly_connection.h b/src/facade/dragonfly_connection.h index 076857aba..01f4dfe67 100644 --- a/src/facade/dragonfly_connection.h +++ b/src/facade/dragonfly_connection.h @@ -276,6 +276,14 @@ class Connection : public util::Connection { bool IsTrackingOn() const; + // Starts traffic logging in the calling thread. Must be a proactor thread. + // Each thread creates its own log file combining requests from all the connections in + // that thread. A noop if the thread is already logging. + static void StartTrafficLogging(std::string_view base_path); + + // Stops traffic logging in this thread. A noop if the thread is not logging. + static void StopTrafficLogging(); + protected: void OnShutdown() override; void OnPreMigrateThread() override; diff --git a/src/facade/resp_expr.h b/src/facade/resp_expr.h index 89dd4d990..43b940783 100644 --- a/src/facade/resp_expr.h +++ b/src/facade/resp_expr.h @@ -35,11 +35,15 @@ class RespExpr { return Buffer{reinterpret_cast(s->data()), s->size()}; } - std::string GetString() const { + std::string_view GetView() const { Buffer buffer = GetBuf(); return {reinterpret_cast(buffer.data()), buffer.size()}; } + std::string GetString() const { + return std::string(GetView()); + } + Buffer GetBuf() const { return std::get(u); } diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index e1cd34f9c..f3c1fb0ca 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -255,6 +255,9 @@ void DebugCmd::Run(CmdArgList args) { " Prints memory usage and key stats per shard, as well as min/max indicators.", "TX", " Performs transaction analysis per shard.", + "TRAFFIC | [STOP]" + " Starts traffic logging to the specified path. If path is not specified," + " traffic logging is stopped.", "HELP", " Prints this help.", }; @@ -308,10 +311,20 @@ void DebugCmd::Run(CmdArgList args) { if (subcmd == "EXEC") { return Exec(); } + + if (subcmd == "TRAFFIC") { + return LogTraffic(args.subspan(1)); + } + string reply = UnknownSubCmd(subcmd, "DEBUG"); return cntx_->SendError(reply, kSyntaxErrType); } +void DebugCmd::Shutdown() { + // disable traffic logging + shard_set->pool()->AwaitFiberOnAll([](auto*) { facade::Connection::StopTrafficLogging(); }); +} + void DebugCmd::Reload(CmdArgList args) { bool save = true; @@ -590,6 +603,24 @@ void DebugCmd::Exec() { rb->SendVerbatimString(res); } +void DebugCmd::LogTraffic(CmdArgList args) { + optional path; + if (args.size() == 1 && absl::AsciiStrToUpper(facade::ToSV(args.front())) != "STOP"sv) { + path = ArgS(args, 0); + LOG(INFO) << "Logging to traffic to " << *path << "*.bin"; + } else { + LOG(INFO) << "Traffic logging stopped"; + } + + shard_set->pool()->AwaitFiberOnAll([path](auto*) { + if (path) + facade::Connection::StartTrafficLogging(*path); + else + facade::Connection::StopTrafficLogging(); + }); + cntx_->SendOk(); +} + void DebugCmd::Inspect(string_view key) { EngineShardSet& ess = *shard_set; ShardId sid = Shard(key, ess.size()); diff --git a/src/server/debugcmd.h b/src/server/debugcmd.h index d79377837..9b5649e33 100644 --- a/src/server/debugcmd.h +++ b/src/server/debugcmd.h @@ -28,6 +28,8 @@ class DebugCmd { void Run(CmdArgList args); + static void Shutdown(); + private: void Populate(CmdArgList args); std::optional ParsePopulateArgs(CmdArgList args); @@ -43,6 +45,7 @@ class DebugCmd { void ObjHist(); void Stacktrace(); void Shards(); + void LogTraffic(CmdArgList); ServerFamily& sf_; ConnectionContext* cntx_; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 864a1cb80..77f5e38f5 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -565,6 +565,7 @@ void ServerFamily::Shutdown() { } dfly_cmd_->Shutdown(); + DebugCmd::Shutdown(); }); } diff --git a/tools/replay/go.mod b/tools/replay/go.mod new file mode 100644 index 000000000..090c444d7 --- /dev/null +++ b/tools/replay/go.mod @@ -0,0 +1,22 @@ +module dragonfydb.io/traffic-replay + +go 1.20 + +require ( + atomicgo.dev/cursor v0.2.0 // indirect + atomicgo.dev/keyboard v0.2.9 // indirect + atomicgo.dev/schedule v0.1.0 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect + github.com/containerd/console v1.0.3 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect + github.com/gookit/color v1.5.4 // indirect + github.com/lithammer/fuzzysearch v1.1.8 // indirect + github.com/mattn/go-runewidth v0.0.15 // indirect + github.com/pterm/pterm v0.12.74 // indirect + github.com/redis/go-redis/v9 v9.4.0 // indirect + github.com/rivo/uniseg v0.4.4 // indirect + github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e // indirect + golang.org/x/sys v0.15.0 // indirect + golang.org/x/term v0.15.0 // indirect + golang.org/x/text v0.14.0 // indirect +) diff --git a/tools/replay/go.sum b/tools/replay/go.sum new file mode 100644 index 000000000..f48d8077b --- /dev/null +++ b/tools/replay/go.sum @@ -0,0 +1,109 @@ +atomicgo.dev/cursor v0.2.0 h1:H6XN5alUJ52FZZUkI7AlJbUc1aW38GWZalpYRPpoPOw= +atomicgo.dev/cursor v0.2.0/go.mod h1:Lr4ZJB3U7DfPPOkbH7/6TOtJ4vFGHlgj1nc+n900IpU= +atomicgo.dev/keyboard v0.2.9 h1:tOsIid3nlPLZ3lwgG8KZMp/SFmr7P0ssEN5JUsm78K8= +atomicgo.dev/keyboard v0.2.9/go.mod h1:BC4w9g00XkxH/f1HXhW2sXmJFOCWbKn9xrOunSFtExQ= +atomicgo.dev/schedule v0.1.0 h1:nTthAbhZS5YZmgYbb2+DH8uQIZcTlIrd4eYr3UQxEjs= +atomicgo.dev/schedule v0.1.0/go.mod h1:xeUa3oAkiuHYh8bKiQBRojqAMq3PXXbJujjb0hw8pEU= +github.com/MarvinJWendt/testza v0.1.0/go.mod h1:7AxNvlfeHP7Z/hDQ5JtE3OKYT3XFUeLCDE2DQninSqs= +github.com/MarvinJWendt/testza v0.2.1/go.mod h1:God7bhG8n6uQxwdScay+gjm9/LnO4D3kkcZX4hv9Rp8= +github.com/MarvinJWendt/testza v0.2.8/go.mod h1:nwIcjmr0Zz+Rcwfh3/4UhBp7ePKVhuBExvZqnKYWlII= +github.com/MarvinJWendt/testza v0.2.10/go.mod h1:pd+VWsoGUiFtq+hRKSU1Bktnn+DMCSrDrXDpX2bG66k= +github.com/MarvinJWendt/testza v0.2.12/go.mod h1:JOIegYyV7rX+7VZ9r77L/eH6CfJHHzXjB69adAhzZkI= +github.com/MarvinJWendt/testza v0.3.0/go.mod h1:eFcL4I0idjtIx8P9C6KkAuLgATNKpX4/2oUqKc6bF2c= +github.com/MarvinJWendt/testza v0.4.2/go.mod h1:mSdhXiKH8sg/gQehJ63bINcCKp7RtYewEjXsvsVUPbE= +github.com/atomicgo/cursor v0.0.1/go.mod h1:cBON2QmmrysudxNBFthvMtN32r3jxVRIvzkUiF/RuIk= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/containerd/console v1.0.3 h1:lIr7SlA5PxZyMV30bDW0MGbiOPXwc63yRuCP0ARubLw= +github.com/containerd/console v1.0.3/go.mod h1:7LqA/THxQ86k76b8c/EMSiaJ3h1eZkMkXar0TQ1gf3U= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= +github.com/gookit/color v1.4.2/go.mod h1:fqRyamkC1W8uxl+lxCQxOT09l/vYfZ+QeiX3rKQHCoQ= +github.com/gookit/color v1.5.0/go.mod h1:43aQb+Zerm/BWh2GnrgOQm7ffz7tvQXEKV6BFMl7wAo= +github.com/gookit/color v1.5.4 h1:FZmqs7XOyGgCAxmWyPslpiok1k05wmY3SJTytgvYFs0= +github.com/gookit/color v1.5.4/go.mod h1:pZJOeOS8DM43rXbp4AZo1n9zCU2qjpcRko0b6/QJi9w= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= +github.com/klauspost/cpuid/v2 v2.0.10/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c= +github.com/klauspost/cpuid/v2 v2.0.12/go.mod h1:g2LTdtYhdyuGPqyWyv7qRAmj1WBqxuObKfj5c0PQa7c= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/lithammer/fuzzysearch v1.1.8 h1:/HIuJnjHuXS8bKaiTMeeDlW2/AyIWk2brx1V8LFgLN4= +github.com/lithammer/fuzzysearch v1.1.8/go.mod h1:IdqeyBClc3FFqSzYq/MXESsS4S0FsZ5ajtkr5xPLts4= +github.com/mattn/go-runewidth v0.0.13/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/mattn/go-runewidth v0.0.15 h1:UNAjwbU9l54TA3KzvqLGxwWjHmMgBUVhBiTjelZgg3U= +github.com/mattn/go-runewidth v0.0.15/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pterm/pterm v0.12.27/go.mod h1:PhQ89w4i95rhgE+xedAoqous6K9X+r6aSOI2eFF7DZI= +github.com/pterm/pterm v0.12.29/go.mod h1:WI3qxgvoQFFGKGjGnJR849gU0TsEOvKn5Q8LlY1U7lg= +github.com/pterm/pterm v0.12.30/go.mod h1:MOqLIyMOgmTDz9yorcYbcw+HsgoZo3BQfg2wtl3HEFE= +github.com/pterm/pterm v0.12.31/go.mod h1:32ZAWZVXD7ZfG0s8qqHXePte42kdz8ECtRyEejaWgXU= +github.com/pterm/pterm v0.12.33/go.mod h1:x+h2uL+n7CP/rel9+bImHD5lF3nM9vJj80k9ybiiTTE= +github.com/pterm/pterm v0.12.36/go.mod h1:NjiL09hFhT/vWjQHSj1athJpx6H8cjpHXNAK5bUw8T8= +github.com/pterm/pterm v0.12.40/go.mod h1:ffwPLwlbXxP+rxT0GsgDTzS3y3rmpAO1NMjUkGTYf8s= +github.com/pterm/pterm v0.12.74 h1:fPsds9KisCyJh4NyY6bv8QJt3FLHceb5DxI6W0An9cc= +github.com/pterm/pterm v0.12.74/go.mod h1:+M33aZWQVpmLmLbvjykyGZ4gAfeebznRo8JMbabaxQU= +github.com/redis/go-redis/v9 v9.4.0 h1:Yzoz33UZw9I/mFhx4MNrB6Fk+XHO1VukNcCa1+lwyKk= +github.com/redis/go-redis/v9 v9.4.0/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= +github.com/rivo/uniseg v0.4.4 h1:8TfxU8dW6PdqD27gjM8MVNuicgxIjxpm4K7x4jp8sis= +github.com/rivo/uniseg v0.4.4/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88= +github.com/sergi/go-diff v1.2.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/xo/terminfo v0.0.0-20210125001918-ca9a967f8778/go.mod h1:2MuV+tbUrU1zIOPMxZ5EncGwgmMJsa+9ucAQZXxsObs= +github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e h1:JVG44RsyaB9T2KIHavMF/ppJZNG9ZpyihvCd0w101no= +github.com/xo/terminfo v0.0.0-20220910002029-abceb7e1c41e/go.mod h1:RbqR21r5mrJuqunuUZ/Dhy/avygyECGrLceyNeo4LiM= +github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= +golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= +golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.6.0/go.mod h1:2Tu9+aMcznHK/AK1HMvgo6xiTLG5rD5rZLDS+rp2Bjs= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20211013075003-97ac67df715c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220319134239-a9b59b0215f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210220032956-6a3ed077a48d/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/term v0.0.0-20210615171337-6886f2dfbf5b/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= +golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= +golang.org/x/term v0.15.0 h1:y/Oo/a/q3IXu26lQgl04j/gjuBDOBlx7X6Om1j2CPW4= +golang.org/x/term v0.15.0/go.mod h1:BDl952bC7+uMoWR75FIrCDx79TPU9oHkTZ9yRbYOrX0= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= +golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.14.0 h1:ScX5w1eTa3QqT8oi6+ziP7dTV1S2+ALU0bI+0zXKWiQ= +golang.org/x/text v0.14.0/go.mod h1:18ZOQIKpY8NJVqYksKHtTdi31H5itFRjB5/qKTNYzSU= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= +golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= +golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.4/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/tools/replay/main.go b/tools/replay/main.go new file mode 100644 index 000000000..b18e5f2a0 --- /dev/null +++ b/tools/replay/main.go @@ -0,0 +1,161 @@ +package main + +import ( + "context" + "flag" + "fmt" + "math" + "os" + "sync" + "sync/atomic" + "time" + + "github.com/pterm/pterm" + "github.com/redis/go-redis/v9" +) + +var fHost = flag.String("host", "127.0.0.1:6379", "Redis host") +var fClientBuffer = flag.Int("buffer", 100, "How many records to buffer per client") + +type RecordHeader struct { + Client uint32 + Time uint64 + HasMore uint32 +} + +type Record struct { + RecordHeader + values []interface{} // instead of []string to unwrap into variadic +} + +// Determine earliest time +func DetermineBaseTime(files []string) time.Time { + var minTime uint64 = math.MaxUint64 + for _, file := range files { + parseRecords(file, func(r Record) bool { + if r.Time < minTime { + minTime = r.Time + } + return false + }) + } + return time.Unix(0, int64(minTime)) +} + +// Handles a single connection/client +type ClientWorker struct { + redis *redis.Client + incoming chan Record +} + +// Handles a single file and distributes messages to clients +type FileWorker struct { + clientGroup sync.WaitGroup + timeOffset time.Duration + // stats for output, updated by clients, read by rendering goroutine + processed atomic.Uint64 + delayed atomic.Uint64 + parsed atomic.Uint64 + clients atomic.Uint64 +} + +func (c ClientWorker) Run(worker *FileWorker) { + for msg := range c.incoming { + lag := time.Until(worker.HappensAt(time.Unix(0, int64(msg.Time)))) + if lag < 0 { + worker.delayed.Add(1) + } + time.Sleep(lag) + + c.redis.Do(context.Background(), msg.values...).Result() + worker.processed.Add(1) + } + worker.clientGroup.Done() +} + +func NewClient(w *FileWorker) *ClientWorker { + client := &ClientWorker{ + redis: redis.NewClient(&redis.Options{Addr: *fHost, PoolSize: 1, DisableIndentity: true}), + incoming: make(chan Record, *fClientBuffer), + } + w.clients.Add(1) + w.clientGroup.Add(1) + go client.Run(w) + return client +} + +func (w *FileWorker) Run(file string, wg *sync.WaitGroup) { + clients := make(map[uint32]*ClientWorker, 0) + parseRecords(file, func(r Record) bool { + client, ok := clients[r.Client] + if !ok { + client = NewClient(w) + clients[r.Client] = client + } + w.parsed.Add(1) + + client.incoming <- r + return true + }) + + for _, client := range clients { + close(client.incoming) + } + w.clientGroup.Wait() + wg.Done() +} + +func (w *FileWorker) HappensAt(recordTime time.Time) time.Time { + return recordTime.Add(w.timeOffset) +} + +func RenderTable(area *pterm.AreaPrinter, files []string, workers []FileWorker) { + tableData := pterm.TableData{{"file", "parsed", "processed", "delayed", "clients"}} + for i := range workers { + tableData = append(tableData, []string{ + files[i], + fmt.Sprint(workers[i].parsed.Load()), + fmt.Sprint(workers[i].processed.Load()), + fmt.Sprint(workers[i].delayed.Load()), + fmt.Sprint(workers[i].clients.Load()), + }) + } + content, _ := pterm.DefaultTable.WithHasHeader().WithBoxed().WithData(tableData).Srender() + area.Update(content) +} + +func main() { + flag.Parse() + files := os.Args[1:] + + timeOffset := time.Now().Add(500 * time.Millisecond).Sub(DetermineBaseTime(files)) + fmt.Println("Offset -> ", timeOffset) + + // Start a worker for every file. They take care of spawning client workers. + var wg sync.WaitGroup + workers := make([]FileWorker, len(files)) + for i := range workers { + workers[i] = FileWorker{timeOffset: timeOffset} + wg.Add(1) + go workers[i].Run(files[i], &wg) + } + + wgDone := make(chan bool) + go func() { + wg.Wait() + wgDone <- true + }() + + // Render table while running + area, _ := pterm.DefaultArea.WithCenter().Start() + for running := true; running; { + select { + case <-wgDone: + running = false + case <-time.After(100 * time.Millisecond): + RenderTable(area, files, workers) + } + } + + RenderTable(area, files, workers) // to show last stats +} diff --git a/tools/replay/parsing.go b/tools/replay/parsing.go new file mode 100644 index 000000000..7dc260fdf --- /dev/null +++ b/tools/replay/parsing.go @@ -0,0 +1,80 @@ +package main + +import ( + "bufio" + "encoding/binary" + "io" + "os" +) + +var kBigEmptyBytes = make([]byte, 100_000) + +func parseStrings(file io.Reader) (out []interface{}, err error) { + var num, strLen uint32 + err = binary.Read(file, binary.LittleEndian, &num) + if err != nil { + return nil, err + } + + out = make([]interface{}, num) + for i := range out { + err = binary.Read(file, binary.LittleEndian, &strLen) + if err != nil { + return nil, err + } + out[i] = strLen + } + + for i := range out { + strLen = out[i].(uint32) + + if strLen == 0 { + err = binary.Read(file, binary.LittleEndian, &strLen) + if err != nil { + return nil, err + } + out[i] = kBigEmptyBytes[:strLen] + continue + } + + buf := make([]byte, strLen) + _, err := io.ReadFull(file, buf) + if err != nil { + return nil, err + } + + out[i] = string(buf) + } + return +} + +func parseRecords(filename string, cb func(Record) bool) error { + file, err := os.Open(filename) + if err != nil { + return err + } + defer file.Close() + + reader := bufio.NewReader(file) + for { + var rec Record + err := binary.Read(reader, binary.LittleEndian, &rec.RecordHeader) + if err != nil { + if err == io.EOF { + break + } + return err + } + + rec.values, err = parseStrings(reader) + if err != nil { + return err + } + + if !cb(rec) { + return nil + } + } + + return nil +}