From 2386b02234e9bd59b1985e15e6509f172d5bf7b3 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Tue, 20 Dec 2022 16:38:19 +0300 Subject: [PATCH] feat(server): Use new journal format (#563) --- src/facade/conn_context.h | 3 +- src/facade/facade.cc | 1 + src/server/CMakeLists.txt | 2 +- src/server/common.h | 5 ++- src/server/dflycmd.cc | 13 ++++---- src/server/journal/executor.cc | 29 ++++++++++++++++ src/server/journal/executor.h | 23 +++++++++++++ src/server/journal/journal.cc | 18 +++++----- src/server/journal/journal.h | 5 +-- src/server/journal/journal_slice.cc | 2 +- src/server/journal/journal_slice.h | 2 +- src/server/journal/serializer.cc | 51 +++++++++++++++-------------- src/server/journal/serializer.h | 22 +++++++------ src/server/journal/types.h | 46 +++++--------------------- src/server/journal_test.cc | 23 ++++++------- src/server/json_family.cc | 8 ----- src/server/main_service.cc | 5 +-- src/server/rdb_extensions.h | 1 + src/server/rdb_load.cc | 45 +++++++++++++++++++++++-- src/server/rdb_load.h | 15 +++++++-- src/server/rdb_save.cc | 14 ++++++++ src/server/rdb_save.h | 8 ++++- src/server/rdb_test.cc | 4 +-- src/server/replica.cc | 38 +++++++++------------ src/server/serializer_commons.cc | 17 +++++++--- src/server/server_family.cc | 2 +- src/server/snapshot.cc | 22 ++++++++----- src/server/string_family.cc | 18 ---------- src/server/transaction.cc | 30 +++++++++++++++++ src/server/transaction.h | 9 ++++- 30 files changed, 302 insertions(+), 179 deletions(-) create mode 100644 src/server/journal/executor.cc create mode 100644 src/server/journal/executor.h diff --git a/src/facade/conn_context.h b/src/facade/conn_context.h index f9abf57cd..542d4ecf2 100644 --- a/src/facade/conn_context.h +++ b/src/facade/conn_context.h @@ -50,7 +50,8 @@ class ConnectionContext { bool req_auth : 1; bool replica_conn : 1; bool authenticated : 1; - bool force_dispatch : 1; // whether we should route all requests to the dispatch fiber. 
+ bool force_dispatch : 1; // whether we should route all requests to the dispatch fiber. + bool journal_emulated : 1; // whether it is used to dispatch journal commands. private: Connection* owner_; diff --git a/src/facade/facade.cc b/src/facade/facade.cc index 3a06b2548..d5fef083b 100644 --- a/src/facade/facade.cc +++ b/src/facade/facade.cc @@ -118,6 +118,7 @@ ConnectionContext::ConnectionContext(::io::Sink* stream, Connection* owner) : ow replica_conn = false; authenticated = false; force_dispatch = false; + journal_emulated = false; } RedisReplyBuilder* ConnectionContext::operator->() { diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 66c7be36f..ffd6ad947 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -19,7 +19,7 @@ add_library(dragonfly_lib channel_slice.cc command_registry.cc snapshot.cc script_mgr.cc server_family.cc malloc_stats.cc set_family.cc stream_family.cc string_family.cc zset_family.cc version.cc bitops_family.cc container_utils.cc - serializer_commons.cc journal/serializer.cc) + serializer_commons.cc journal/serializer.cc journal/executor.cc) cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib absl::random_random TRDP::jsoncons zstd TRDP::lz4) diff --git a/src/server/common.h b/src/server/common.h index 3ba7b3e5c..fe281adf1 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -268,7 +268,10 @@ class Context : protected Cancellation { // // Note: this function blocks when called from inside an error handler. template GenericError Error(T... ts) { - std::lock_guard lk{mu_}; + if (!mu_.try_lock()) // TODO: Maybe use two separate locks. 
+ return GenericError{std::forward(ts)...}; + + std::lock_guard lk{mu_, std::adopt_lock}; if (err_) return err_; diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index 5900ace5f..24f14e09b 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -13,6 +13,7 @@ #include "server/engine_shard_set.h" #include "server/error.h" #include "server/journal/journal.h" +#include "server/journal/serializer.h" #include "server/rdb_save.h" #include "server/script_mgr.h" #include "server/server_family.h" @@ -353,16 +354,16 @@ OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, EngineShard* shard) { // Register journal listener and cleanup. uint32_t cb_id = 0; if (shard != nullptr) { - cb_id = sf_->journal()->RegisterOnChange([flow](const journal::Entry& je) { - // TODO: Serialize event. - ReqSerializer serializer{flow->conn->socket()}; - serializer.SendCommand(absl::StrCat("SET ", je.key, " ", je.pval_ptr->ToString())); - }); + JournalWriter writer{flow->conn->socket()}; + auto journal_cb = [flow, writer = std::move(writer)](const journal::Entry& je) mutable { + writer.Write(je); + }; + cb_id = sf_->journal()->RegisterOnChange(std::move(journal_cb)); } flow->cleanup = [flow, this, cb_id]() { if (cb_id) - sf_->journal()->Unregister(cb_id); + sf_->journal()->UnregisterOnChange(cb_id); flow->TryShutdownSocket(); }; diff --git a/src/server/journal/executor.cc b/src/server/journal/executor.cc new file mode 100644 index 000000000..b676926a7 --- /dev/null +++ b/src/server/journal/executor.cc @@ -0,0 +1,29 @@ +// Copyright 2022, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. 
+// + +#include "server/journal/executor.h" + +#include "base/logging.h" +#include "server/main_service.h" + +namespace dfly { + +JournalExecutor::JournalExecutor(Service* service) : service_{service} { +} + +void JournalExecutor::Execute(journal::ParsedEntry&& entry) { + if (entry.payload) { + io::NullSink null_sink; + ConnectionContext conn_context{&null_sink, nullptr}; + conn_context.is_replicating = true; + conn_context.journal_emulated = true; + conn_context.conn_state.db_index = entry.dbid; + + auto span = CmdArgList{entry.payload->data(), entry.payload->size()}; + + service_->DispatchCommand(span, &conn_context); + } +} + +} // namespace dfly diff --git a/src/server/journal/executor.h b/src/server/journal/executor.h new file mode 100644 index 000000000..247d7e26c --- /dev/null +++ b/src/server/journal/executor.h @@ -0,0 +1,23 @@ +// Copyright 2022, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include "server/journal/types.h" + +namespace dfly { + +class Service; + +// JournalExecutor allows executing journal entries. 
+class JournalExecutor { + public: + JournalExecutor(Service* service); + void Execute(journal::ParsedEntry&& entry); + + private: + Service* service_; +}; + +} // namespace dfly diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 4ed090dea..11634314e 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -41,11 +41,11 @@ error_code Journal::OpenInThread(bool persistent, string_view dir) { } } - ServerState::tlocal()->set_journal(this); - EngineShard* shard = EngineShard::tlocal(); - if (shard) { - shard->set_journal(this); - } + ServerState::tlocal()->set_journal(this); + EngineShard* shard = EngineShard::tlocal(); + if (shard) { + shard->set_journal(this); + } return ec; } @@ -83,16 +83,16 @@ uint32_t Journal::RegisterOnChange(ChangeCallback cb) { return journal_slice.RegisterOnChange(cb); } -void Journal::Unregister(uint32_t id) { - journal_slice.Unregister(id); +void Journal::UnregisterOnChange(uint32_t id) { + journal_slice.UnregisterOnChange(id); } bool Journal::SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards) { if (!journal_slice.IsOpen() || lameduck_.load(memory_order_relaxed)) return false; - // TODO: to complete the metadata. - journal_slice.AddLogRecord(Entry::Sched(txid)); + // TODO: Handle tx entries. 
+ // journal_slice.AddLogRecord(Entry::Sched(txid)); return true; } diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index 162787812..5434d9442 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -13,7 +13,6 @@ class Transaction; namespace journal { - class Journal { public: using Span = absl::Span; @@ -32,9 +31,8 @@ class Journal { //******* The following functions must be called in the context of the owning shard *********// - uint32_t RegisterOnChange(ChangeCallback cb); - void Unregister(uint32_t id); + void UnregisterOnChange(uint32_t id); // Returns true if transaction was scheduled, false if journal is inactive // or in lameduck mode and does not log new transactions. @@ -58,7 +56,6 @@ class Journal { void RecordEntry(const Entry& entry); private: - mutable boost::fibers::mutex state_mu_; std::atomic_bool lameduck_{false}; diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index ff78a8a5e..22d06f5ca 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -147,7 +147,7 @@ uint32_t JournalSlice::RegisterOnChange(ChangeCallback cb) { return id; } -void JournalSlice::Unregister(uint32_t id) { +void JournalSlice::UnregisterOnChange(uint32_t id) { CHECK(!iterating_cb_arr_); auto it = find_if(change_cb_arr_.begin(), change_cb_arr_.end(), diff --git a/src/server/journal/journal_slice.h b/src/server/journal/journal_slice.h index 65dc1f74e..2129d24c7 100644 --- a/src/server/journal/journal_slice.h +++ b/src/server/journal/journal_slice.h @@ -45,7 +45,7 @@ class JournalSlice { void AddLogRecord(const Entry& entry); uint32_t RegisterOnChange(ChangeCallback cb); - void Unregister(uint32_t); + void UnregisterOnChange(uint32_t); private: struct RingItem; diff --git a/src/server/journal/serializer.cc b/src/server/journal/serializer.cc index 517d0eca6..0d3fc53c1 100644 --- a/src/server/journal/serializer.cc +++ b/src/server/journal/serializer.cc @@ 
-56,10 +56,10 @@ error_code JournalWriter::Write(std::monostate) { return std::error_code{}; } -error_code JournalWriter::Write(const journal::EntryNew& entry) { +error_code JournalWriter::Write(const journal::Entry& entry) { // Check if entry has a new db index and we need to emit a SELECT entry. if (entry.opcode != journal::Op::SELECT && (!cur_dbid_ || entry.dbid != *cur_dbid_)) { - RETURN_ON_ERR(Write(journal::EntryNew{journal::Op::SELECT, entry.dbid})); + RETURN_ON_ERR(Write(journal::Entry{journal::Op::SELECT, entry.dbid})); cur_dbid_ = entry.dbid; } @@ -68,7 +68,7 @@ error_code JournalWriter::Write(const journal::EntryNew& entry) { switch (entry.opcode) { case journal::Op::SELECT: return Write(entry.dbid); - case journal::Op::VAL: + case journal::Op::COMMAND: RETURN_ON_ERR(Write(entry.txid)); return std::visit([this](const auto& payload) { return Write(payload); }, entry.payload); default: @@ -77,8 +77,11 @@ error_code JournalWriter::Write(const journal::EntryNew& entry) { return std::error_code{}; } -JournalReader::JournalReader(io::Source* source, DbIndex dbid) - : source_{source}, buf_{}, dbid_{dbid} { +JournalReader::JournalReader(DbIndex dbid) : buf_{}, dbid_{dbid} { +} + +void JournalReader::SetDb(DbIndex dbid) { + dbid_ = dbid; } template io::Result ReadPackedUIntTyped(io::Source* source) { @@ -89,26 +92,26 @@ template io::Result ReadPackedUIntTyped(io::Source* source) { return static_cast(v); } -io::Result JournalReader::ReadU8() { - return ReadPackedUIntTyped(source_); +io::Result JournalReader::ReadU8(io::Source* source) { + return ReadPackedUIntTyped(source); } -io::Result JournalReader::ReadU16() { - return ReadPackedUIntTyped(source_); +io::Result JournalReader::ReadU16(io::Source* source) { + return ReadPackedUIntTyped(source); } -io::Result JournalReader::ReadU64() { - return ReadPackedUIntTyped(source_); +io::Result JournalReader::ReadU64(io::Source* source) { + return ReadPackedUIntTyped(source); } -io::Result JournalReader::ReadString() { 
+io::Result JournalReader::ReadString(io::Source* source) { size_t size = 0; - SET_OR_UNEXPECT(ReadU64(), size); + SET_OR_UNEXPECT(ReadU64(source), size); buf_.EnsureCapacity(size); auto dest = buf_.AppendBuffer().first(size); uint64_t read = 0; - SET_OR_UNEXPECT(source_->Read(dest), read); + SET_OR_UNEXPECT(source->Read(dest), read); buf_.CommitWrite(read); if (read != size) @@ -117,16 +120,16 @@ io::Result JournalReader::ReadString() { return size; } -std::error_code JournalReader::Read(CmdArgVec* vec) { +std::error_code JournalReader::Read(io::Source* source, CmdArgVec* vec) { buf_.ConsumeInput(buf_.InputBuffer().size()); size_t size = 0; - SET_OR_RETURN(ReadU64(), size); + SET_OR_RETURN(ReadU64(source), size); vec->resize(size); for (auto& span : *vec) { size_t len; - SET_OR_RETURN(ReadString(), len); + SET_OR_RETURN(ReadString(source), len); span = MutableSlice{nullptr, len}; } @@ -141,22 +144,22 @@ std::error_code JournalReader::Read(CmdArgVec* vec) { return std::error_code{}; } -io::Result JournalReader::ReadEntry() { +io::Result JournalReader::ReadEntry(io::Source* source) { uint8_t opcode; - SET_OR_UNEXPECT(ReadU8(), opcode); + SET_OR_UNEXPECT(ReadU8(source), opcode); journal::ParsedEntry entry{static_cast(opcode), dbid_}; switch (entry.opcode) { - case journal::Op::VAL: - SET_OR_UNEXPECT(ReadU64(), entry.txid); + case journal::Op::COMMAND: + SET_OR_UNEXPECT(ReadU64(source), entry.txid); entry.payload = CmdArgVec{}; - if (auto ec = Read(&*entry.payload); ec) + if (auto ec = Read(source, &*entry.payload); ec) return make_unexpected(ec); break; case journal::Op::SELECT: - SET_OR_UNEXPECT(ReadU16(), dbid_); - return ReadEntry(); + SET_OR_UNEXPECT(ReadU16(source), dbid_); + return ReadEntry(source); default: break; }; diff --git a/src/server/journal/serializer.h b/src/server/journal/serializer.h index 2c63f3561..aceb85a2a 100644 --- a/src/server/journal/serializer.h +++ b/src/server/journal/serializer.h @@ -23,7 +23,7 @@ class JournalWriter { 
JournalWriter(io::Sink* sink, std::optional dbid = std::nullopt); // Write single entry. - std::error_code Write(const journal::EntryNew& entry); + std::error_code Write(const journal::Entry& entry); private: std::error_code Write(uint64_t v); // Write packed unsigned integer. @@ -42,27 +42,29 @@ class JournalWriter { // Like the writer, it automatically keeps track of the database index. struct JournalReader { public: - // Initialize with source and start database index. - JournalReader(io::Source* source, DbIndex dbid); + // Initialize with the start database index. + JournalReader(DbIndex dbid); + + // Overwrite current db index. + void SetDb(DbIndex dbid); // Try reading entry from source. - io::Result ReadEntry(); + io::Result ReadEntry(io::Source* source); private: // TODO: Templated endian encoding to not repeat...? - io::Result ReadU8(); - io::Result ReadU16(); - io::Result ReadU64(); + io::Result ReadU8(io::Source* source); + io::Result ReadU16(io::Source* source); + io::Result ReadU64(io::Source* source); // Read string into internal buffer and return size. - io::Result ReadString(); + io::Result ReadString(io::Source* source); // Read argument array into internal buffer and build slice. // TODO: Inline store span data inside buffer to avoid alloaction - std::error_code Read(CmdArgVec* vec); + std::error_code Read(io::Source* source, CmdArgVec* vec); private: - io::Source* source_; base::IoBuf buf_; DbIndex dbid_; }; diff --git a/src/server/journal/types.h b/src/server/journal/types.h index f4d3e7f56..58fc72835 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -14,38 +14,8 @@ namespace journal { enum class Op : uint8_t { NOOP = 0, - LOCK = 1, - UNLOCK = 2, - LOCK_SHARD = 3, - UNLOCK_SHARD = 4, - SCHED = 5, SELECT = 6, - VAL = 10, - DEL, - MSET, -}; - -// TODO: to pass all the attributes like ttl, stickiness etc. 
-struct Entry { - Entry(Op op, DbIndex did, TxId tid, std::string_view skey) - : opcode(op), db_ind(did), txid(tid), key(skey) { - } - - Entry(DbIndex did, TxId tid, std::string_view skey, const PrimeValue& pval) - : Entry(Op::VAL, did, tid, skey) { - pval_ptr = &pval; - } - - static Entry Sched(TxId tid) { - return Entry{Op::SCHED, 0, tid, {}}; - } - - Op opcode; - DbIndex db_ind; - TxId txid; - std::string_view key; - const PrimeValue* pval_ptr = nullptr; - uint64_t expire_ms = 0; // 0 means no expiry. + COMMAND = 10, }; struct EntryBase { @@ -56,8 +26,7 @@ struct EntryBase { // This struct represents a single journal entry. // Those are either control instructions or commands. -struct EntryNew : public EntryBase { // Called this "New" because I can't delete the old neither - // replace it partially +struct Entry : public EntryBase { // Payload represents a non-owning view into a command executed on the shard. using Payload = std::variant // Command and its shard parts. >; - EntryNew(TxId txid, DbIndex dbid, Payload pl) - : EntryBase{txid, journal::Op::VAL, dbid}, payload{pl} { + Entry(TxId txid, DbIndex dbid, Payload pl) + : EntryBase{txid, journal::Op::COMMAND, dbid}, payload{pl} { } - EntryNew(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid}, payload{} { + Entry(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid}, payload{} { } Payload payload; }; struct ParsedEntry : public EntryBase { + // Payload represents the parsed command. 
using Payload = std::optional; + ParsedEntry() = default; + ParsedEntry(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid}, payload{} { } ParsedEntry(TxId txid, DbIndex dbid, Payload pl) - : EntryBase{txid, journal::Op::VAL, dbid}, payload{pl} { + : EntryBase{txid, journal::Op::COMMAND, dbid}, payload{pl} { } Payload payload; diff --git a/src/server/journal_test.cc b/src/server/journal_test.cc index 91e0faa5d..8f63b18a5 100644 --- a/src/server/journal_test.cc +++ b/src/server/journal_test.cc @@ -55,7 +55,7 @@ std::string ExtractPayload(journal::ParsedEntry& entry) { return out; } -std::string ExtractPayload(journal::EntryNew& entry) { +std::string ExtractPayload(journal::Entry& entry) { std::string out; EntryPayloadVisitor visitor{&out}; std::visit(visitor, entry.payload); @@ -95,14 +95,13 @@ TEST(Journal, WriteRead) { auto slice = [v = &slices](auto... ss) { return StoreSlice(v, ss...); }; auto list = [v = &lists](auto... ss) { return StoreList(v, ss...); }; - std::vector test_entries = { - {0, 0, make_pair("MSET", slice("A", "1", "B", "2"))}, - {1, 0, make_pair("MSET", slice("C", "3"))}, - {2, 0, list("DEL", "A", "B")}, - {3, 1, list("LPUSH", "l", "v1", "v2")}, - {4, 0, make_pair("MSET", slice("D", "4"))}, - {5, 1, list("DEL", "l1")}, - {6, 2, list("SET", "E", "2")}}; + std::vector test_entries = {{0, 0, make_pair("MSET", slice("A", "1", "B", "2"))}, + {1, 0, make_pair("MSET", slice("C", "3"))}, + {2, 0, list("DEL", "A", "B")}, + {3, 1, list("LPUSH", "l", "v1", "v2")}, + {4, 0, make_pair("MSET", slice("D", "4"))}, + {5, 1, list("DEL", "l1")}, + {6, 2, list("SET", "E", "2")}}; // Write all entries to string file. io::StringSink ss; @@ -113,12 +112,12 @@ TEST(Journal, WriteRead) { // Read them back. 
io::BytesSource bs{io::Buffer(ss.str())}; - JournalReader reader{&bs, 0}; + JournalReader reader{0}; for (unsigned i = 0; i < test_entries.size(); i++) { auto& expected = test_entries[i]; - auto res = reader.ReadEntry(); + auto res = reader.ReadEntry(&bs); ASSERT_TRUE(res.has_value()); ASSERT_EQ(expected.opcode, res->opcode); @@ -129,3 +128,5 @@ TEST(Journal, WriteRead) { } } // namespace dfly + +// TODO: extend test. diff --git a/src/server/json_family.cc b/src/server/json_family.cc index 4bd30c0e6..a11222b0d 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -44,13 +44,6 @@ inline OpStatus JsonReplaceVerifyNoOp() { return OpStatus::OK; } -inline void RecordJournal(const OpArgs& op_args, string_view key, const PrimeKey& pvalue) { - if (op_args.shard->journal()) { - journal::Entry entry{op_args.db_cntx.db_index, op_args.txid, key, pvalue}; - op_args.shard->journal()->RecordEntry(entry); - } -} - void SetJson(const OpArgs& op_args, string_view key, JsonType&& value) { auto& db_slice = op_args.shard->db_slice(); DbIndex db_index = op_args.db_cntx.db_index; @@ -58,7 +51,6 @@ void SetJson(const OpArgs& op_args, string_view key, JsonType&& value) { db_slice.PreUpdate(db_index, it_output); it_output->second.SetJson(std::move(value)); db_slice.PostUpdate(db_index, it_output, key); - RecordJournal(op_args, key, it_output->second); } string JsonTypeToName(const JsonType& val) { diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 2b18005c9..c71e5e495 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -545,8 +545,9 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx) return; } - if ((etl.gstate() == GlobalState::LOADING && (cid->opt_mask() & CO::LOADING) == 0) || - etl.gstate() == GlobalState::SHUTTING_DOWN) { + bool blocked_by_loading = !cntx->journal_emulated && etl.gstate() == GlobalState::LOADING && + (cid->opt_mask() & CO::LOADING) == 0; + if (blocked_by_loading || 
etl.gstate() == GlobalState::SHUTTING_DOWN) { string err = StrCat("Can not execute during ", GlobalStateName(etl.gstate())); (*cntx)->SendError(err); return; diff --git a/src/server/rdb_extensions.h b/src/server/rdb_extensions.h index ba9937908..a4b37abb3 100644 --- a/src/server/rdb_extensions.h +++ b/src/server/rdb_extensions.h @@ -13,3 +13,4 @@ const uint8_t RDB_OPCODE_FULLSYNC_END = 200; const uint8_t RDB_OPCODE_COMPRESSED_ZSTD_BLOB_START = 201; const uint8_t RDB_OPCODE_COMPRESSED_LZ4_BLOB_START = 202; const uint8_t RDB_OPCODE_COMPRESSED_BLOB_END = 203; +const uint8_t RDB_OPCODE_JOURNAL_BLOB = 210; diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index c1b64d3b2..77cacf892 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -28,6 +28,9 @@ extern "C" { #include "server/engine_shard_set.h" #include "server/error.h" #include "server/hset_family.h" +#include "server/journal/executor.h" +#include "server/journal/serializer.h" +#include "server/main_service.h" #include "server/rdb_extensions.h" #include "server/script_mgr.h" #include "server/serializer_commons.h" @@ -1639,7 +1642,8 @@ struct RdbLoader::ObjSettings { ObjSettings() = default; }; -RdbLoader::RdbLoader(ScriptMgr* script_mgr) : script_mgr_(script_mgr) { +RdbLoader::RdbLoader(Service* service) + : service_{service}, script_mgr_{service == nullptr ? nullptr : service->script_mgr()} { shard_buf_.reset(new ItemsBuf[shard_set->size()]); } @@ -1800,6 +1804,15 @@ error_code RdbLoader::Load(io::Source* src) { continue; } + if (type == RDB_OPCODE_JOURNAL_BLOB) { + // We should flush all changes on the current db before applying incremental changes. 
+ for (unsigned i = 0; i < shard_set->size(); ++i) { + FlushShardAsync(i); + } + RETURN_ON_ERR(HandleJournalBlob(service_, cur_db_index_)); + continue; + } + if (!rdbIsObjectType(type)) { return RdbError(errc::invalid_rdb_type); } @@ -1916,7 +1929,7 @@ auto RdbLoaderBase::LoadLen(bool* is_encoded) -> io::Result { return res; } -void RdbLoaderBase::AlocateDecompressOnce(int op_type) { +void RdbLoaderBase::AllocateDecompressOnce(int op_type) { if (decompress_impl_) { return; } @@ -1930,7 +1943,7 @@ void RdbLoaderBase::AlocateDecompressOnce(int op_type) { } error_code RdbLoaderBase::HandleCompressedBlob(int op_type) { - AlocateDecompressOnce(op_type); + AllocateDecompressOnce(op_type); // Fetch uncompress blob string res; SET_OR_RETURN(FetchGenericString(), res); @@ -1951,6 +1964,32 @@ error_code RdbLoaderBase::HandleCompressedBlobFinish() { return kOk; } +error_code RdbLoaderBase::HandleJournalBlob(Service* service, DbIndex dbid) { + // Read the number of entries in the journal blob. + size_t num_entries; + bool _encoded; + SET_OR_RETURN(LoadLen(&_encoded), num_entries); + + // Read the journal blob. + string journal_blob; + SET_OR_RETURN(FetchGenericString(), journal_blob); + + io::BytesSource bs{io::Buffer(journal_blob)}; + journal_reader_.SetDb(dbid); + + // Parse and execute in loop. + size_t done = 0; + JournalExecutor ex{service}; + while (done < num_entries) { + journal::ParsedEntry entry{}; + SET_OR_RETURN(journal_reader_.ReadEntry(&bs), entry); + ex.Execute(std::move(entry)); + done++; + } + + return std::error_code{}; +} + error_code RdbLoader::HandleAux() { /* AUX: generic string-string fields. Use to add state to RDB * which is backward compatible. 
Implementations of RDB loading diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index 501ace582..3b67e4ad7 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -15,12 +15,14 @@ extern "C" { #include "core/mpsc_intrusive_queue.h" #include "io/io.h" #include "server/common.h" +#include "server/journal/serializer.h" namespace dfly { class EngineShardSet; class ScriptMgr; class CompactObj; +class Service; class DecompressImpl; @@ -121,9 +123,12 @@ class RdbLoaderBase { ::io::Result ReadZSetZL(); ::io::Result ReadListQuicklist(int rdbtype); ::io::Result ReadStreams(); + std::error_code HandleCompressedBlob(int op_type); std::error_code HandleCompressedBlobFinish(); - void AlocateDecompressOnce(int op_type); + void AllocateDecompressOnce(int op_type); + + std::error_code HandleJournalBlob(Service* service, DbIndex dbid); static size_t StrLen(const RdbVariant& tset); @@ -140,11 +145,12 @@ class RdbLoaderBase { size_t source_limit_ = SIZE_MAX; base::PODArray compr_buf_; std::unique_ptr decompress_impl_; + JournalReader journal_reader_{0}; }; class RdbLoader : protected RdbLoaderBase { public: - explicit RdbLoader(ScriptMgr* script_mgr); + explicit RdbLoader(Service* service); ~RdbLoader(); @@ -196,16 +202,19 @@ class RdbLoader : protected RdbLoaderBase { using ItemsBuf = std::vector; struct ObjSettings; + std::error_code LoadKeyValPair(int type, ObjSettings* settings); void ResizeDb(size_t key_num, size_t expire_num); std::error_code HandleAux(); std::error_code VerifyChecksum(); - void FlushShardAsync(ShardId sid); + void FinishLoad(absl::Time start_time, size_t* keys_loaded); + void FlushShardAsync(ShardId sid); void LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib); + Service* service_; ScriptMgr* script_mgr_; std::unique_ptr shard_buf_; diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index d78f3b999..970d8564b 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -27,6 +27,7 @@ extern "C" { #include "base/logging.h" 
#include "server/engine_shard_set.h" #include "server/error.h" +#include "server/journal/serializer.h" #include "server/rdb_extensions.h" #include "server/serializer_commons.h" #include "server/snapshot.h" @@ -690,6 +691,19 @@ size_t RdbSerializer::SerializedLen() const { return mem_buf_.InputLen(); } +error_code RdbSerializer::WriteJournalEntries(absl::Span entries) { + // Write journal blob to string file. + io::StringSink ss{}; + JournalWriter writer{&ss}; + for (const auto& entry : entries) { + RETURN_ON_ERR(writer.Write(entry)); + } + + RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_JOURNAL_BLOB)); + RETURN_ON_ERR(SaveLen(entries.size())); + return SaveString(ss.str()); +} + error_code RdbSerializer::SaveString(string_view val) { /* Try integer encoding */ if (val.size() <= 11) { diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 55a775dd4..6ed716d03 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -16,6 +16,7 @@ extern "C" { #include "base/pod_array.h" #include "io/io.h" #include "server/common.h" +#include "server/journal/types.h" #include "server/table.h" typedef struct rax rax; @@ -142,9 +143,14 @@ class RdbSerializer { // for the dump command - thus it is public function std::error_code SaveValue(const PrimeValue& pv); - std::error_code SendFullSyncCut(io::Sink* s); size_t SerializedLen() const; + // Write journal entries as an embedded journal blob. + std::error_code WriteJournalEntries(absl::Span entries); + + // Send FULL_SYNC_CUT opcode to notify that all static data was sent. 
+ std::error_code SendFullSyncCut(io::Sink* s); + private: std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len); std::error_code SaveObject(const PrimeValue& pv); diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index 6a982670e..0c08b2a37 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -84,7 +84,7 @@ TEST_F(RdbTest, LoadEmpty) { TEST_F(RdbTest, LoadSmall6) { io::FileSource fs = GetSource("redis6_small.rdb"); - RdbLoader loader(service_->script_mgr()); + RdbLoader loader{service_.get()}; // must run in proactor thread in order to avoid polluting the serverstate // in the main, testing thread. @@ -121,7 +121,7 @@ TEST_F(RdbTest, LoadSmall6) { TEST_F(RdbTest, Stream) { io::FileSource fs = GetSource("redis6_stream.rdb"); - RdbLoader loader(service_->script_mgr()); + RdbLoader loader{service_.get()}; // must run in proactor thread in order to avoid polluting the serverstate // in the main, testing thread. diff --git a/src/server/replica.cc b/src/server/replica.cc index 62fbffc62..50a269d26 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -18,6 +18,8 @@ extern "C" { #include "facade/dragonfly_connection.h" #include "facade/redis_parser.h" #include "server/error.h" +#include "server/journal/executor.h" +#include "server/journal/serializer.h" #include "server/main_service.h" #include "server/rdb_load.h" #include "util/proactor_base.h" @@ -643,7 +645,7 @@ void Replica::FullSyncDflyFb(string eof_token, fibers_ext::BlockingCounter bc, C SocketSource ss{sock_.get()}; io::PrefixSource ps{leftover_buf_->InputBuffer(), &ss}; - RdbLoader loader(NULL); + RdbLoader loader(&service_); loader.SetFullSyncCutCb([bc, ran = false]() mutable { if (!ran) { bc.Dec(); @@ -686,37 +688,29 @@ void Replica::FullSyncDflyFb(string eof_token, fibers_ext::BlockingCounter bc, C } void Replica::StableSyncDflyFb(Context* cntx) { - base::IoBuf io_buf(16_KB); - parser_.reset(new RedisParser); - - // Check leftover from stable state. 
+ // Check leftover from full sync. + io::Bytes prefix{}; if (leftover_buf_ && leftover_buf_->InputLen() > 0) { - size_t len = leftover_buf_->InputLen(); - leftover_buf_->ReadAndConsume(len, io_buf.AppendBuffer().data()); - io_buf.CommitWrite(len); - leftover_buf_.reset(); + prefix = leftover_buf_->InputBuffer(); } - string ack_cmd; + SocketSource ss{sock_.get()}; + io::PrefixSource ps{prefix, &ss}; + JournalReader reader{0}; + JournalExecutor executor{&service_}; while (!cntx->IsCancelled()) { - io::MutableBytes buf = io_buf.AppendBuffer(); - io::Result size_res = sock_->Recv(buf); - if (!size_res) { - cntx->Error(size_res.error()); + auto res = reader.ReadEntry(&ps); + if (!res) { + cntx->Error(res.error(), "Journal format error"); return; } + executor.Execute(std::move(res.value())); + last_io_time_ = sock_->proactor()->GetMonotonicTimeNs(); - - io_buf.CommitWrite(*size_res); - repl_offs_ += *size_res; - - if (auto ec = ParseAndExecute(&io_buf); ec) { - cntx->Error(ec); - return; - } } + return; } error_code Replica::ReadRespReply(base::IoBuf* io_buf, uint32_t* consumed) { diff --git a/src/server/serializer_commons.cc b/src/server/serializer_commons.cc index bf3f0b032..15f4e8b91 100644 --- a/src/server/serializer_commons.cc +++ b/src/server/serializer_commons.cc @@ -47,22 +47,31 @@ unsigned WritePackedUInt(uint64_t value, uint8_t* buf) { io::Result ReadPackedUInt(io::Source* source) { uint8_t buf[10]; + size_t read = 0; uint8_t first = 0; - source->Read(io::MutableBytes{&first, 1}); + SET_OR_UNEXPECT(source->Read(io::MutableBytes{&first, 1}), read); + if (read != 1) + return make_unexpected(make_error_code(errc::bad_message)); int type = (first & 0xC0) >> 6; switch (type) { case RDB_6BITLEN: return first & 0x3F; case RDB_14BITLEN: - source->Read(io::MutableBytes{buf, 1}); + SET_OR_UNEXPECT(source->Read(io::MutableBytes{buf, 1}), read); + if (read != 1) + return make_unexpected(make_error_code(errc::bad_message)); return ((first & 0x3F) << 8) | buf[0]; case 
RDB_32BITLEN: - source->Read(io::MutableBytes{buf, 4}); + SET_OR_UNEXPECT(source->Read(io::MutableBytes{buf, 4}), read); + if (read != 4) + return make_unexpected(make_error_code(errc::bad_message)); return absl::big_endian::Load32(buf); case RDB_64BITLEN: - source->Read(io::MutableBytes{buf, 8}); + SET_OR_UNEXPECT(source->Read(io::MutableBytes{buf, 8}), read); + if (read != 8) + return make_unexpected(make_error_code(errc::bad_message)); return absl::big_endian::Load64(buf); default: return make_unexpected(make_error_code(errc::illegal_byte_sequence)); diff --git a/src/server/server_family.cc b/src/server/server_family.cc index d746df15f..3d95aeb75 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -570,7 +570,7 @@ error_code ServerFamily::LoadRdb(const std::string& rdb_file) { if (res) { io::FileSource fs(*res); - RdbLoader loader(script_mgr()); + RdbLoader loader{&service_}; ec = loader.Load(&fs); if (!ec) { LOG(INFO) << "Done loading RDB, keys loaded: " << loader.keys_loaded(); diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 5bbc6436f..04958c809 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -72,7 +72,7 @@ void SliceSnapshot::Stop() { Join(); if (journal_cb_id_) { - db_slice_->shard_owner()->journal()->Unregister(journal_cb_id_); + db_slice_->shard_owner()->journal()->UnregisterOnChange(journal_cb_id_); } FlushDefaultBuffer(true); @@ -82,7 +82,7 @@ void SliceSnapshot::Stop() { void SliceSnapshot::Cancel() { CloseRecordChannel(); if (journal_cb_id_) { - db_slice_->shard_owner()->journal()->Unregister(journal_cb_id_); + db_slice_->shard_owner()->journal()->UnregisterOnChange(journal_cb_id_); journal_cb_id_ = 0; } } @@ -272,12 +272,14 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) } } +// For any key any journal entry must arrive at the replica strictly after its first original rdb +// value. 
This is guaranteed by the fact that OnJournalEntry runs always after OnDbChange, and +// no database switch can be performed between those two calls, because they are part of one +// transaction. void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) { - CHECK(journal::Op::VAL == entry.opcode); - optional tmp_serializer; RdbSerializer* serializer_ptr = default_serializer_.get(); - if (entry.db_ind != current_db_) { + if (entry.dbid != current_db_) { CompressionMode compression_mode = compression_mode_ == CompressionMode::NONE ? CompressionMode::NONE : CompressionMode::SINGLE_ENTRY; @@ -285,11 +287,15 @@ void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) { serializer_ptr = &*tmp_serializer; } - PrimeKey pkey{entry.key}; - SerializeEntry(entry.db_ind, pkey, *entry.pval_ptr, entry.expire_ms, serializer_ptr); + CHECK(entry.opcode == journal::Op::COMMAND); + serializer_ptr->WriteJournalEntries(absl::Span{&entry, 1}); if (tmp_serializer) { - FlushTmpSerializer(entry.db_ind, &*tmp_serializer); + FlushTmpSerializer(entry.dbid, &*tmp_serializer); + } else { + // This is the only place that flushes in streaming mode + // once the iterate buckets fiber finished. 
+ FlushDefaultBuffer(false); } } diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 0f7344674..711ccc5bb 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -64,13 +64,6 @@ string_view GetSlice(EngineShard* shard, const PrimeValue& pv, string* tmp) { return pv.GetSlice(tmp); } -inline void RecordJournal(const OpArgs& op_args, string_view key, const PrimeKey& pvalue) { - if (op_args.shard->journal()) { - journal::Entry entry{op_args.db_cntx.db_index, op_args.txid, key, pvalue}; - op_args.shard->journal()->RecordEntry(entry); - } -} - OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t start, string_view value) { auto& db_slice = op_args.shard->db_slice(); @@ -105,7 +98,6 @@ OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t sta memcpy(s.data() + start, value.data(), value.size()); it->second.SetString(s); db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !added); - RecordJournal(op_args, key, it->second); return it->second.Size(); } @@ -156,7 +148,6 @@ size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key, db_slice.PreUpdate(op_args.db_cntx.db_index, it); it->second.SetString(new_val); db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, true); - RecordJournal(op_args, key, it->second); return new_val.size(); } @@ -170,7 +161,6 @@ OpResult ExtendOrSet(const OpArgs& op_args, string_view key, string_vi if (inserted) { it->second.SetString(val); db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, false); - RecordJournal(op_args, key, it->second); return val.size(); } @@ -239,7 +229,6 @@ OpResult OpIncrFloat(const OpArgs& op_args, string_view key, double val) char* str = RedisReplyBuilder::FormatDouble(val, buf, sizeof(buf)); it->second.SetString(str); db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, false); - RecordJournal(op_args, key, it->second); return val; } @@ -271,7 +260,6 @@ OpResult OpIncrFloat(const OpArgs& op_args, string_view key, 
double val) db_slice.PreUpdate(op_args.db_cntx.db_index, it); it->second.SetString(str); db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, true); - RecordJournal(op_args, key, it->second); return base; } @@ -298,8 +286,6 @@ OpResult OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr, return OpStatus::OUT_OF_MEMORY; } - RecordJournal(op_args, key, it->second); - return incr; } @@ -323,7 +309,6 @@ OpResult OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr, db_slice.PreUpdate(op_args.db_cntx.db_index, it); it->second.SetInt(new_val); db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); - RecordJournal(op_args, key, it->second); return new_val; } @@ -431,7 +416,6 @@ OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value shard->tiered_storage()->UnloadItem(op_args_.db_cntx.db_index, it); } - RecordJournal(op_args_, key, it->second); return OpStatus::OK; } @@ -486,8 +470,6 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt } db_slice.PostUpdate(op_args_.db_cntx.db_index, it, key); - RecordJournal(op_args_, key, it->second); - return OpStatus::OK; } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index c4283109a..f7b7f8842 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -80,6 +80,7 @@ Transaction::~Transaction() { OpStatus Transaction::InitByArgs(DbIndex index, CmdArgList args) { db_index_ = index; + cmd_with_full_args_ = args; if (IsGlobal()) { unique_shard_cnt_ = shard_set->size(); @@ -372,6 +373,9 @@ bool Transaction::RunInShard(EngineShard* shard) { /*************************************************************************/ + if (!was_suspended && should_release) // Check last hop & non suspended. + LogJournalOnShard(shard); + // at least the coordinator thread owns the reference. 
DCHECK_GE(use_count(), 1u); @@ -771,6 +775,8 @@ void Transaction::RunQuickie(EngineShard* shard) { LOG(FATAL) << "Unexpected exception " << e.what(); } + LogJournalOnShard(shard); + sd.local_mask &= ~ARMED; cb_ = nullptr; // We can do it because only a single shard runs the callback. } @@ -1194,6 +1200,30 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) { return false; } +void Transaction::LogJournalOnShard(EngineShard* shard) { + // TODO: For now, we ignore non shard coordination. + if (shard == nullptr) + return; + + if ((cid_->opt_mask() & CO::WRITE) == 0) + return; + + auto journal = shard->journal(); + if (journal == nullptr) + return; + + // TODO: Handle complex commands like LMPOP correctly once they are implemented. + journal::Entry::Payload entry_payload; + if (unique_shard_cnt_ == 1 || args_.empty()) { + CHECK(!cmd_with_full_args_.empty()); + entry_payload = cmd_with_full_args_; + } else { + entry_payload = + make_pair(facade::ToSV(cmd_with_full_args_.front()), ShardArgsInShard(shard->shard_id())); + } + journal->RecordEntry(journal::Entry{txid_, db_index_, entry_payload}); +} + void Transaction::BreakOnShutdown() { if (coordinator_state_ & COORD_BLOCKED) { coordinator_state_ |= COORD_CANCELLED; diff --git a/src/server/transaction.h b/src/server/transaction.h index d43b35727..6942a17ea 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -238,6 +238,10 @@ class Transaction { return use_count_.load(std::memory_order_relaxed); } + // If needed, notify the journal of the executed command on the given shard. + // Should be called immediately after the last phase (hop). + void LogJournalOnShard(EngineShard* shard); + struct PerShardData { uint32_t arg_start = 0; // Indices into args_ array. uint16_t arg_count = 0; @@ -278,9 +282,12 @@ class Transaction { // scheduled transaction is accessed between operations as well. absl::InlinedVector shard_data_; // length = shard_count - //! 
Stores arguments of the transaction (i.e. keys + values) partitioned by shards. + // Stores arguments of the transaction (i.e. keys + values) partitioned by shards. absl::InlinedVector args_; + // Stores the full undivided command. + CmdArgList cmd_with_full_args_; + // Reverse argument mapping. Allows to reconstruct responses according to the original order of // keys. std::vector reverse_index_;