From 443ab9587d9360d76a43b70a6f4eeded868273b0 Mon Sep 17 00:00:00 2001 From: Vladislav Date: Sat, 31 Dec 2022 13:32:00 +0300 Subject: [PATCH] feat(server): Update helio, optimize and clean up rdb/snapshot (#625) --- helio | 2 +- src/server/journal/serializer.cc | 8 ++---- src/server/rdb_load.cc | 4 +++ src/server/rdb_load.h | 4 +-- src/server/rdb_save.cc | 32 ++++++++++++++-------- src/server/rdb_save.h | 32 ++++++++++++++-------- src/server/replica.cc | 16 +++++------ src/server/replica.h | 2 +- src/server/snapshot.cc | 28 +++++++------------ src/server/snapshot.h | 47 ++++++++++++++------------------ 10 files changed, 88 insertions(+), 87 deletions(-) diff --git a/helio b/helio index 020d5f85e..497ca6eed 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 020d5f85e78c1d8d38bb8a14b283193822160ab2 +Subproject commit 497ca6eed4769a8cc12aa06d67ce454081406857 diff --git a/src/server/journal/serializer.cc b/src/server/journal/serializer.cc index 789d6dcd8..b35ed1216 100644 --- a/src/server/journal/serializer.cc +++ b/src/server/journal/serializer.cc @@ -32,16 +32,12 @@ base::IoBuf& JournalWriter::Accumulated() { void JournalWriter::Write(uint64_t v) { uint8_t buf[10]; unsigned len = WritePackedUInt(v, buf); - buf_.EnsureCapacity(sizeof(buf)); - memcpy(buf_.AppendBuffer().data(), buf, len); - buf_.CommitWrite(len); + buf_.WriteAndCommit(buf, len); } void JournalWriter::Write(std::string_view sv) { Write(sv.size()); - buf_.EnsureCapacity(sv.size()); - memcpy(buf_.AppendBuffer().data(), sv.data(), sv.size()); - buf_.CommitWrite(sv.size()); + buf_.WriteAndCommit(sv.data(), sv.size()); } void JournalWriter::Write(CmdArgList args) { diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 20f0ff19b..30cedc473 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1608,6 +1608,10 @@ template io::Result RdbLoaderBase::FetchInt() { return base::LE::LoadT>(buf); } +io::Result RdbLoaderBase::FetchType() { + return FetchInt(); +} + // -------------- RdbLoader ---------------------------- struct RdbLoader::ObjSettings { diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index 3b784e156..cfd763c89 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -90,9 +90,7 @@ class RdbLoaderBase { class OpaqueObjLoader; - ::io::Result FetchType() { - return FetchInt(); - } + io::Result FetchType(); template io::Result FetchInt(); diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 84521ab8b..b5497a5ac 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -643,9 +643,8 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) { return error_code{}; } -error_code RdbSerializer::SendFullSyncCut(io::Sink* s) { - RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_FULLSYNC_END)); - return FlushToSink(s); +error_code RdbSerializer::SendFullSyncCut() { + return WriteOpcode(RDB_OPCODE_FULLSYNC_END); } error_code RdbSerializer::WriteRaw(const io::Bytes& buf) { @@ -656,12 +655,10 @@ error_code RdbSerializer::WriteRaw(const io::Bytes& buf) { return error_code{}; } -error_code RdbSerializer::FlushToSink(io::Sink* s) { +io::Bytes RdbSerializer::Flush() { size_t sz = mem_buf_.InputLen(); if (sz == 0) - return error_code{}; - - DVLOG(2) << "FlushToSink " << sz << " bytes"; + return mem_buf_.InputBuffer(); if (compression_mode_ == CompressionMode::MULTY_ENTRY_ZSTD || compression_mode_ == CompressionMode::MULTY_ENTRY_LZ4) { @@ -670,10 +667,19 @@ error_code RdbSerializer::FlushToSink(io::Sink* s) { sz = mem_buf_.InputLen(); } - // interrupt point. - RETURN_ON_ERR(s->Write(mem_buf_.InputBuffer())); - mem_buf_.ConsumeInput(sz); + return mem_buf_.InputBuffer(); +} +error_code RdbSerializer::FlushToSink(io::Sink* s) { + auto bytes = Flush(); + if (bytes.empty()) + return error_code{}; + + DVLOG(2) << "FlushToSink " << bytes.size() << " bytes"; + + // interrupt point. + RETURN_ON_ERR(s->Write(bytes)); + mem_buf_.ConsumeInput(bytes.size()); return error_code{}; } @@ -681,6 +687,10 @@ size_t RdbSerializer::SerializedLen() const { return mem_buf_.InputLen(); } +void RdbSerializer::Clear() { + mem_buf_.Clear(); +} + error_code RdbSerializer::WriteJournalEntries(absl::Span entries) { for (const auto& entry : entries) { journal_writer_.Write(entry); @@ -1045,7 +1055,7 @@ error_code RdbSaver::SaveBody(const Cancellation* cll, RdbTypeFreqMap* freq_map) RETURN_ON_ERR(impl_->serializer()->FlushToSink(impl_->sink())); if (save_mode_ == SaveMode::SUMMARY) { - impl_->serializer()->SendFullSyncCut(impl_->sink()); + impl_->serializer()->SendFullSyncCut(); } else { VLOG(1) << "SaveBody , snapshots count: " << impl_->Size(); error_code io_error = impl_->ConsumeChannel(cll); diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 2fc62282b..15f6e0a24 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -118,9 +118,17 @@ class RdbSerializer { ~RdbSerializer(); - std::error_code WriteOpcode(uint8_t opcode) { - return WriteRaw(::io::Bytes{&opcode, 1}); - } + // Get access to internal buffer, compressed, if enabled. + io::Bytes Flush(); + + // Internal buffer size. Might shrink after flush due to compression. + size_t SerializedLen() const; + + // Flush internal buffer to sink. + std::error_code FlushToSink(io::Sink* s); + + // Clear internal buffer contents. + void Clear(); std::error_code SelectDb(uint32_t dbid); @@ -128,15 +136,12 @@ class RdbSerializer { // Returns the serialized rdb_type or the error. // expire_ms = 0 means no expiry. io::Result SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms); - std::error_code WriteRaw(const ::io::Bytes& buf); - std::error_code SaveString(std::string_view val); - std::error_code SaveString(const uint8_t* buf, size_t len) { - return SaveString(std::string_view{reinterpret_cast(buf), len}); - } - - std::error_code FlushToSink(io::Sink* s); std::error_code SaveLen(size_t len); + std::error_code SaveString(std::string_view val); + std::error_code SaveString(const uint8_t* buf, size_t len) { + return SaveString(io::View(io::Bytes{buf, len})); + } // This would work for either string or an object. // The arg pv is taken from it->second if accessing @@ -144,13 +149,16 @@ class RdbSerializer { // for the dump command - thus it is public function std::error_code SaveValue(const PrimeValue& pv); - size_t SerializedLen() const; + std::error_code WriteRaw(const ::io::Bytes& buf); + std::error_code WriteOpcode(uint8_t opcode) { + return WriteRaw(::io::Bytes{&opcode, 1}); + } // Write journal entries as an embedded journal blob. std::error_code WriteJournalEntries(absl::Span entries); // Send FULL_SYNC_CUT opcode to notify that all static data was sent. - std::error_code SendFullSyncCut(io::Sink* s); + std::error_code SendFullSyncCut(); private: std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len); diff --git a/src/server/replica.cc b/src/server/replica.cc index c85dd04ff..e815d3215 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -404,13 +404,13 @@ error_code Replica::InitiatePSync() { // TODO: handle gracefully... CHECK_EQ(0, memcmp(token->data(), buf, kRdbEofMarkSize)); - CHECK(chained.unused_prefix().empty()); + CHECK(chained.UnusedPrefix().empty()); } else { CHECK_EQ(0u, loader.Leftover().size()); CHECK_EQ(snapshot_size, loader.bytes_read()); } - CHECK(ps.unused_prefix().empty()); + CHECK(ps.UnusedPrefix().empty()); io_buf.ConsumeInput(io_buf.InputLen()); last_io_time_ = sock_thread->GetMonotonicTimeNs(); } @@ -653,9 +653,9 @@ error_code Replica::StartFullSyncFlow(fibers_ext::BlockingCounter sb, Context* c parser_.reset(new RedisParser{false}); // client mode - leftover_buf_.reset(new base::IoBuf(128)); + leftover_buf_.emplace(128); unsigned consumed = 0; - RETURN_ON_ERR(ReadRespReply(leftover_buf_.get(), &consumed)); // uses parser_ + RETURN_ON_ERR(ReadRespReply(&*leftover_buf_, &consumed)); // uses parser_ if (!CheckRespFirstTypes({RespExpr::STRING, RespExpr::STRING})) { LOG(ERROR) << "Bad FLOW response " << ToSV(leftover_buf_->InputBuffer()); @@ -728,12 +728,10 @@ void Replica::FullSyncDflyFb(string eof_token, fibers_ext::BlockingCounter bc, C } // Keep loader leftover. - io::Bytes unused = chained_tail.unused_prefix(); + io::Bytes unused = chained_tail.UnusedPrefix(); if (unused.size() > 0) { - leftover_buf_.reset(new base::IoBuf{unused.size()}); - auto mut_bytes = leftover_buf_->AppendBuffer(); - memcpy(mut_bytes.data(), unused.data(), unused.size()); - leftover_buf_->CommitWrite(unused.size()); + leftover_buf_.emplace(unused.size()); + leftover_buf_->WriteAndCommit(unused.data(), unused.size()); } else { leftover_buf_.reset(); } diff --git a/src/server/replica.h b/src/server/replica.h index 3b12fd803..9940fc74a 100644 --- a/src/server/replica.h +++ b/src/server/replica.h @@ -185,7 +185,7 @@ class Replica { // Guard operations where flows might be in a mixed state (transition/setup) ::boost::fibers::mutex flows_op_mu_; - std::unique_ptr leftover_buf_; + std::optional leftover_buf_; std::unique_ptr parser_; facade::RespVec resp_args_; facade::CmdArgVec cmd_str_args_; diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index 04958c809..9d31c7929 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -51,7 +51,6 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) { journal_cb_id_ = journal->RegisterOnChange(move(journal_cb)); } - default_buffer_.reset(new io::StringFile); default_serializer_.reset(new RdbSerializer(compression_mode_)); VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_; @@ -158,7 +157,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) { // TODO: investigate why a single byte gets stuck and does not arrive to replica for (unsigned i = 10; i > 1; i--) - CHECK(!default_serializer_->SendFullSyncCut(default_buffer_.get())); + CHECK(!default_serializer_->SendFullSyncCut()); FlushDefaultBuffer(true); VLOG(1) << "Exit SnapshotSerializer (serialized/side_saved/cbcalls): " << stats_.serialized << "/" @@ -216,7 +215,7 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite } if (tmp_serializer) { - FlushTmpSerializer(db_index, &*tmp_serializer); + PushBytesToChannel(db_index, tmp_serializer->Flush()); } return result; @@ -237,22 +236,21 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr ++type_freq_map_[*res]; } -void SliceSnapshot::PushFileToChannel(DbIndex db_index, io::StringFile* sfile) { - dest_->Push(GetDbRecord(db_index, std::move(sfile->val))); +void SliceSnapshot::PushBytesToChannel(DbIndex db_index, io::Bytes bytes) { + dest_->Push(GetDbRecord(db_index, std::string{io::View(bytes)})); } bool SliceSnapshot::FlushDefaultBuffer(bool force) { if (!force && default_serializer_->SerializedLen() < 4096) return false; - CHECK(!default_serializer_->FlushToSink(default_buffer_.get())); - - if (default_buffer_->val.empty()) + auto bytes = default_serializer_->Flush(); + if (bytes.empty()) return false; - VLOG(2) << "FlushDefaultBuffer " << default_buffer_->val.size() << " bytes"; - - PushFileToChannel(current_db_, default_buffer_.get()); + VLOG(2) << "FlushDefaultBuffer " << bytes.size() << " bytes"; + PushBytesToChannel(current_db_, bytes); + default_serializer_->Clear(); return true; } @@ -291,7 +289,7 @@ void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) { serializer_ptr->WriteJournalEntries(absl::Span{&entry, 1}); if (tmp_serializer) { - FlushTmpSerializer(entry.dbid, &*tmp_serializer); + PushBytesToChannel(entry.dbid, tmp_serializer->Flush()); } else { // This is the only place that flushes in streaming mode // once the iterate buckets fiber finished. @@ -320,10 +318,4 @@ SliceSnapshot::DbRecord SliceSnapshot::GetDbRecord(DbIndex db_index, std::string return DbRecord{.db_index = db_index, .id = id, .value = std::move(value)}; } -void SliceSnapshot::FlushTmpSerializer(DbIndex db_index, RdbSerializer* serializer) { - io::StringFile sfile{}; - error_code ec = serializer->FlushToSink(&sfile); - CHECK(!ec && !sfile.val.empty()); - PushFileToChannel(db_index, &sfile); -} } // namespace dfly diff --git a/src/server/snapshot.h b/src/server/snapshot.h index 1f2a45e66..f73c153e0 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -22,27 +22,27 @@ struct Entry; class RdbSerializer; -//┌────────────────┐ ┌─────────────┐ -//│IterateBucketsFb│ │ OnDbChange │ -//└──────┬─────────┘ └─┬───────────┘ -// │ │ OnDbChange forces whole bucket to be -// ▼ ▼ serialized if iterate didn't reach it yet -//┌──────────────────────────┐ -//│ SerializeBucket │ Both might fall back to a temporary serializer -//└────────────┬─────────────┘ if default is used on another db index -// │ -// | Channel is left open in journal streaming mode -// ▼ -//┌──────────────────────────┐ ┌─────────────────────────┐ -//│ SerializeEntry │ ◄────────┤ OnJournalEntry │ -//└─────────────┬────────────┘ └─────────────────────────┘ +// ┌────────────────┐ ┌─────────────┐ +// │IterateBucketsFb│ │ OnDbChange │ +// └──────┬─────────┘ └─┬───────────┘ +// │ │ OnDbChange forces whole bucket to be +// ▼ ▼ serialized if iterate didn't reach it yet +// ┌──────────────────────────┐ +// │ SerializeBucket │ Both might fall back to a temporary serializer +// └────────────┬─────────────┘ if default is used on another db index // │ -// PushFileToChannel Default buffer gets flushed on iteration, -// │ temporary on destruction +// | Channel is left open in journal streaming mode // ▼ -//┌──────────────────────────────┐ -//│ dest->Push(buffer) │ -//└──────────────────────────────┘ +// ┌──────────────────────────┐ ┌─────────────────────────┐ +// │ SerializeEntry │ ◄────────┤ OnJournalEntry │ +// └─────────────┬────────────┘ └─────────────────────────┘ +// │ +// PushBytesToChannel Default buffer gets flushed on iteration, +// │ temporary on destruction +// ▼ +// ┌──────────────────────────────┐ +// │ dest->Push(buffer) │ +// └──────────────────────────────┘ // SliceSnapshot is used for iterating over a shard at a specified point-in-time // and submitting all values to an output channel. @@ -95,8 +95,8 @@ class SliceSnapshot { void SerializeEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv, std::optional expire, RdbSerializer* serializer); - // Push StringFile buffer to channel. - void PushFileToChannel(DbIndex db_index, io::StringFile* sfile); + // Push byte slice to channel. + void PushBytesToChannel(DbIndex db_index, io::Bytes bytes); // DbChange listener void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req); @@ -115,9 +115,6 @@ class SliceSnapshot { // Convert value into DbRecord. DbRecord GetDbRecord(DbIndex db_index, std::string value); - // Flush internals of a temporary serializer. - void FlushTmpSerializer(DbIndex db_index, RdbSerializer* serializer); - public: uint64_t snapshot_version() const { return snapshot_version_; @@ -144,8 +141,6 @@ class SliceSnapshot { DbIndex current_db_; - // TODO : drop default_buffer from this class, we dont realy need it. - std::unique_ptr default_buffer_; // filled by default_serializer_ std::unique_ptr default_serializer_; ::boost::fibers::mutex mu_;