diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 3f7facd85..2fff0172c 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -48,7 +48,7 @@ namespace { using namespace std; using namespace facade; using namespace util; - +using Payload = journal::Entry::Payload; using CI = CommandId; constexpr char kIdNotFound[] = "syncid not found"; @@ -485,7 +485,7 @@ void WriteFlushSlotsToJournal(const SlotRanges& slot_ranges) { // TODO: Break slot migration upon FLUSHSLOTS journal->RecordEntry(/* txid= */ 0, journal::Op::COMMAND, /* dbid= */ 0, /* shard_cnt= */ shard_set->size(), nullopt, - make_pair("DFLYCLUSTER", args_view), false); + Payload("DFLYCLUSTER", args_view), false); }; shard_set->pool()->AwaitFiberOnAll(std::move(cb)); } diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 12382f123..87ffde8c4 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -39,6 +39,7 @@ using namespace std; using namespace util; using absl::GetFlag; using facade::OpStatus; +using Payload = journal::Entry::Payload; namespace { @@ -205,7 +206,7 @@ unsigned PrimeEvictionPolicy::Evict(const PrimeTable::HotspotBuckets& eb, PrimeT if (auto journal = db_slice_->shard_owner()->journal(); journal) { ArgSlice delete_args(&key, 1); journal->RecordEntry(0, journal::Op::EXPIRED, cntx_.db_index, 1, cluster::KeySlot(key), - make_pair("DEL", delete_args), false); + Payload("DEL", delete_args), false); } db_slice_->PerformDeletion(DbSlice::Iterator(last_slot_it, StringOrView::FromView(key)), table); @@ -1230,7 +1231,7 @@ finish: for (string_view key : keys_to_journal) { ArgSlice delete_args(&key, 1); journal->RecordEntry(0, journal::Op::EXPIRED, db_ind, 1, cluster::KeySlot(key), - make_pair("DEL", delete_args), false); + Payload("DEL", delete_args), false); } } diff --git a/src/server/journal/journal_test.cc b/src/server/journal/journal_test.cc index 6f24790e1..06313bf07 100644 --- a/src/server/journal/journal_test.cc +++ b/src/server/journal/journal_test.cc @@ -11,58 +11,50 @@ using namespace std; using namespace util; namespace dfly { +namespace journal { +template string ConCat(const T& list) { + string res; + for (auto arg : list) { + res += string_view{arg.data(), arg.size()}; + res += ' '; + } + return res; +} + +template <> string ConCat(const CmdArgList& list) { + string res; + for (auto arg : list) { + res += facade::ToSV(arg); + res += ' '; + } + return res; +} struct EntryPayloadVisitor { - void operator()(const string_view sv) { - *out += sv; - *out += ' '; - } - - void operator()(const CmdArgList list) { - for (auto arg : list) { - *out += facade::ToSV(arg); - *out += ' '; - } - } - - void operator()(const ArgSlice slice) { - for (auto arg : slice) { - *out += arg; - *out += ' '; - } - } - - template void operator()(const pair p) { - (*this)(p.first); - (*this)(p.second); - } - - void operator()(monostate) { + void operator()(const Entry::Payload& p) { + out->append(p.cmd).append(" "); + *out += visit([this](const auto& args) { return ConCat(args); }, p.args); } string* out; }; // Extract payload from entry in string form. -std::string ExtractPayload(journal::ParsedEntry& entry) { - std::string out; - EntryPayloadVisitor visitor{&out}; +std::string ExtractPayload(ParsedEntry& entry) { + std::string out = ConCat(entry.cmd.cmd_args); - CmdArgList list{entry.cmd.cmd_args.data(), entry.cmd.cmd_args.size()}; - visitor(list); - - if (out.size() > 0 && out.back() == ' ') + if (out.size() > 0) out.pop_back(); return out; } -std::string ExtractPayload(journal::Entry& entry) { +std::string ExtractPayload(Entry& entry) { std::string out; EntryPayloadVisitor visitor{&out}; - std::visit(visitor, entry.payload); + visitor(entry.payload); - if (out.size() > 0 && out.back() == ' ') + if (out.size() > 0) out.pop_back(); return out; @@ -96,17 +88,18 @@ TEST(Journal, WriteRead) { auto slice = [v = &slices](auto... ss) { return StoreSlice(v, ss...); }; auto list = [v = &lists](auto... ss) { return StoreList(v, ss...); }; + using Payload = Entry::Payload; - std::vector test_entries = { - {0, journal::Op::COMMAND, 0, 2, nullopt, make_pair("MSET", slice("A", "1", "B", "2"))}, - {0, journal::Op::COMMAND, 0, 2, nullopt, make_pair("MSET", slice("C", "3"))}, - {1, journal::Op::COMMAND, 0, 2, nullopt, make_pair("DEL", list("A", "B"))}, - {2, journal::Op::COMMAND, 1, 1, nullopt, make_pair("LPUSH", list("l", "v1", "v2"))}, - {3, journal::Op::COMMAND, 0, 1, nullopt, make_pair("MSET", slice("D", "4"))}, - {4, journal::Op::COMMAND, 1, 1, nullopt, make_pair("DEL", list("l1"))}, - {5, journal::Op::COMMAND, 2, 1, nullopt, make_pair("DEL", list("E", "2"))}, - {6, journal::Op::MULTI_COMMAND, 2, 1, nullopt, make_pair("SET", list("E", "2"))}, - {6, journal::Op::EXEC, 2, 1, nullopt}}; + std::vector test_entries = { + {0, Op::COMMAND, 0, 2, nullopt, Payload("MSET", slice("A", "1", "B", "2"))}, + {0, Op::COMMAND, 0, 2, nullopt, Payload("MSET", slice("C", "3"))}, + {1, Op::COMMAND, 0, 2, nullopt, Payload("DEL", list("A", "B"))}, + {2, Op::COMMAND, 1, 1, nullopt, Payload("LPUSH", list("l", "v1", "v2"))}, + {3, Op::COMMAND, 0, 1, nullopt, Payload("MSET", slice("D", "4"))}, + {4, Op::COMMAND, 1, 1, nullopt, Payload("DEL", list("l1"))}, + {5, Op::COMMAND, 2, 1, nullopt, Payload("DEL", list("E", "2"))}, + {6, Op::MULTI_COMMAND, 2, 1, nullopt, Payload("SET", list("E", "2"))}, + {6, Op::EXEC, 2, 1, nullopt}}; // Write all entries to a buffer. base::IoBuf buf; @@ -134,6 +127,5 @@ TEST(Journal, WriteRead) { } } +} // namespace journal } // namespace dfly - -// TODO: extend test. diff --git a/src/server/journal/serializer.cc b/src/server/journal/serializer.cc index 715a0217a..f127f2cac 100644 --- a/src/server/journal/serializer.cc +++ b/src/server/journal/serializer.cc @@ -35,30 +35,36 @@ void JournalWriter::Write(std::string_view sv) { sink_->Write(io::Buffer(sv)); } -template void JournalWriter::Write(std::pair args) { - auto [cmd, tail_args] = args; - - Write(1 + tail_args.size()); - - size_t cmd_size = cmd.size(); - for (auto v : tail_args) { - cmd_size += v.size(); - } - Write(cmd_size); - - Write(cmd); - for (auto v : tail_args) { - if constexpr (is_same_v) - Write(facade::ToSV(v)); - else - Write(v); +// element count, total size +template pair SliceSize(const C& list) { + size_t res = 0, count = 0; + for (auto a : list) { + res += a.size(); + ++count; } + return {count, res}; } -template void JournalWriter::Write(pair); -template void JournalWriter::Write(pair); +void JournalWriter::Write(const journal::Entry::Payload& payload) { + if (payload.cmd.empty()) + return; -void JournalWriter::Write(std::monostate) { + auto [num_elems, size] = + std::visit([](const auto& list) { return SliceSize(list); }, payload.args); + + Write(1 + num_elems); + + size_t cmd_size = payload.cmd.size() + size; + Write(cmd_size); + Write(payload.cmd); + + std::visit( + [this](const auto& list) { + for (auto v : list) { + this->Write(v); + } + }, + payload.args); } void JournalWriter::Write(const journal::Entry& entry) { @@ -86,7 +92,8 @@ void JournalWriter::Write(const journal::Entry& entry) { case journal::Op::EXEC: Write(entry.txid); Write(entry.shard_cnt); - return std::visit([this](const auto& payload) { return Write(payload); }, entry.payload); + Write(entry.payload); + break; default: break; }; diff --git a/src/server/journal/serializer.h b/src/server/journal/serializer.h index 4d832a234..39bb66179 100644 --- a/src/server/journal/serializer.h +++ b/src/server/journal/serializer.h @@ -26,11 +26,11 @@ class JournalWriter { private: void Write(std::string_view sv); // Write string. + void Write(facade::MutableSlice slice) { + Write(facade::ToSV(slice)); + } - template // CmdArgList or ArgSlice. - void Write(std::pair args); - - void Write(std::monostate); // Overload for empty std::variant + void Write(const journal::Entry::Payload& payload); private: io::Sink* sink_; diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index 8543cfc9e..ac6adf198 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -201,7 +201,7 @@ void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pv, uint64_t args.push_back("ABSTTL"); // Means expire string is since epoch - WriteCommand(make_pair("RESTORE", ArgSlice{args})); + WriteCommand(journal::Entry::Payload("RESTORE", ArgSlice(args))); } void RestoreStreamer::WriteCommand(journal::Entry::Payload cmd_payload) { diff --git a/src/server/journal/types.cc b/src/server/journal/types.cc index e0a30b1e1..6b8d7f414 100644 --- a/src/server/journal/types.cc +++ b/src/server/journal/types.cc @@ -8,35 +8,48 @@ namespace dfly::journal { -std::string Entry::ToString() const { - std::string rv = absl::StrCat("{op=", opcode, ", dbid=", dbid); - std::visit( - [&rv](const auto& payload) { - if constexpr (std::is_same_v, std::monostate>) { - absl::StrAppend(&rv, ", empty"); - } else { - const auto& [cmd, args] = payload; - absl::StrAppend(&rv, ", cmd='"); - absl::StrAppend(&rv, cmd); - absl::StrAppend(&rv, "', args=["); - for (size_t i = 0; i < args.size(); i++) { - absl::StrAppend(&rv, "'"); - absl::StrAppend(&rv, facade::ToSV(args[i])); - absl::StrAppend(&rv, "'"); - if (i + 1 != args.size()) - absl::StrAppend(&rv, ", "); - } - absl::StrAppend(&rv, "]"); - } - }, - payload); +using namespace std; +using facade::ToSV; + +void AppendPrefix(string_view cmd, string* dest) { + absl::StrAppend(dest, ", cmd='"); + absl::StrAppend(dest, cmd); + absl::StrAppend(dest, "', args=["); +} + +void AppendSuffix(string* dest) { + if (dest->back() == ',') + dest->pop_back(); + absl::StrAppend(dest, "]"); +} + +template string Concat(const C& list) { + string res; + for (auto arg : list) { + absl::StrAppend(&res, "'"); + absl::StrAppend(&res, ToSV(arg)); + absl::StrAppend(&res, "',"); + } + return res; +} + +string Entry::ToString() const { + string rv = absl::StrCat("{op=", opcode, ", dbid=", dbid); + + if (HasPayload()) { + AppendPrefix(payload.cmd, &rv); + rv += visit([](const auto& list) { return Concat(list); }, payload.args); + AppendSuffix(&rv); + } else { + absl::StrAppend(&rv, ", empty"); + } rv += "}"; return rv; } -std::string ParsedEntry::ToString() const { - std::string rv = absl::StrCat("{op=", opcode, ", dbid=", dbid, ", cmd='"); +string ParsedEntry::ToString() const { + string rv = absl::StrCat("{op=", opcode, ", dbid=", dbid, ", cmd='"); for (auto& arg : cmd.cmd_args) { absl::StrAppend(&rv, facade::ToSV(arg)); absl::StrAppend(&rv, " "); diff --git a/src/server/journal/types.h b/src/server/journal/types.h index 6671c88a6..4d86e3325 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -39,11 +39,19 @@ struct EntryBase { // Those are either control instructions or commands. struct Entry : public EntryBase { // Payload represents a non-owning view into a command executed on the shard. - using Payload = - std::variant, // Parts of a full command. - std::pair // Command and its shard parts. - >; + struct Payload { + std::string_view cmd; + std::variant + args; + + Payload() = default; + Payload(std::string_view c, CmdArgList a) : cmd(c), args(a) { + } + Payload(std::string_view c, const ShardArgs& a) : cmd(c), args(a) { + } + }; Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt, std::optional slot_id, Payload pl) @@ -63,7 +71,7 @@ struct Entry : public EntryBase { } bool HasPayload() const { - return !std::holds_alternative(payload); + return !payload.cmd.empty(); } std::string ToString() const; diff --git a/src/server/json_family.cc b/src/server/json_family.cc index e139379a0..da5ff13ed 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -19,7 +19,6 @@ #include "core/flatbuffers.h" #include "core/json/json_object.h" #include "core/json/path.h" -#include "core/overloaded.h" #include "facade/cmd_arg_parser.h" #include "facade/op_status.h" #include "server/acl/acl_commands_def.h" diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 3ae781eec..65795a52e 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1422,9 +1422,9 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard, RunnableResult resul string_view cmd{cid_->name()}; if (unique_shard_cnt_ == 1 || kv_args_.empty()) { - entry_payload = make_pair(cmd, full_args_); + entry_payload = journal::Entry::Payload(cmd, full_args_); } else { - entry_payload = make_pair(cmd, GetShardArgs(shard->shard_id()).AsSlice()); + entry_payload = journal::Entry::Payload(cmd, GetShardArgs(shard->shard_id()).AsSlice()); } LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false, true); } diff --git a/src/server/tx_base.cc b/src/server/tx_base.cc index 5f1993c27..2ad8558f3 100644 --- a/src/server/tx_base.cc +++ b/src/server/tx_base.cc @@ -13,11 +13,12 @@ namespace dfly { using namespace std; +using Payload = journal::Entry::Payload; void RecordJournal(const OpArgs& op_args, string_view cmd, ArgSlice args, uint32_t shard_cnt, bool multi_commands) { VLOG(2) << "Logging command " << cmd << " from txn " << op_args.tx->txid(); - op_args.tx->LogJournalOnShard(op_args.shard, make_pair(cmd, args), shard_cnt, multi_commands, + op_args.tx->LogJournalOnShard(op_args.shard, Payload(cmd, args), shard_cnt, multi_commands, false); } @@ -29,7 +30,7 @@ void RecordExpiry(DbIndex dbid, string_view key) { auto journal = EngineShard::tlocal()->journal(); CHECK(journal); journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, cluster::KeySlot(key), - make_pair("DEL", ArgSlice{key}), false); + Payload("DEL", ArgSlice{key}), false); } void TriggerJournalWriteToSink() { diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index a17a74ad4..82e929b8a 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -1033,7 +1033,9 @@ async def assert_lag_condition(inst, client, condition): @dfly_args({"proactor_threads": 2}) @pytest.mark.asyncio -async def test_replication_info(df_local_factory, df_seeder_factory, n_keys=2000): +async def test_replication_info( + df_local_factory: DflyInstanceFactory, df_seeder_factory, n_keys=2000 +): master = df_local_factory.create() replica = df_local_factory.create(logtostdout=True, replication_acks_interval=100) df_local_factory.start_all([master, replica])