refactor(SerializerBase): Move some logic from RdbSerializer to SerializerBase (#2373)

* refactor(SerializerBase): Move some logic from RdbSerializer to SerializerBase

Specifically SendFullSyncCut and WriteJournalEntry

* gh review
This commit is contained in:
Shahar Mike 2024-01-07 07:48:29 +02:00 committed by GitHub
parent 13c2e672b0
commit 2d46a584a3
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 30 additions and 28 deletions

View file

@ -282,10 +282,10 @@ io::Result<io::Bytes> Lz4Compressor::Compress(io::Bytes data) {
} }
SerializerBase::SerializerBase(CompressionMode compression_mode) SerializerBase::SerializerBase(CompressionMode compression_mode)
: compression_mode_(compression_mode), mem_buf_{4_KB} { : compression_mode_(compression_mode), mem_buf_{4_KB}, tmp_buf_(nullptr) {
} }
RdbSerializer::RdbSerializer(CompressionMode compression_mode)
: SerializerBase(compression_mode), tmp_buf_(nullptr) { RdbSerializer::RdbSerializer(CompressionMode compression_mode) : SerializerBase(compression_mode) {
} }
RdbSerializer::~RdbSerializer() { RdbSerializer::~RdbSerializer() {
@ -755,7 +755,7 @@ error_code RdbSerializer::SendJournalOffset(uint64_t journal_offset) {
return WriteRaw(buf); return WriteRaw(buf);
} }
error_code RdbSerializer::SendFullSyncCut() { error_code SerializerBase::SendFullSyncCut() {
VLOG(2) << "SendFullSyncCut"; VLOG(2) << "SendFullSyncCut";
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_FULLSYNC_END)); RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_FULLSYNC_END));
@ -767,6 +767,10 @@ error_code RdbSerializer::SendFullSyncCut() {
return WriteRaw(buf); return WriteRaw(buf);
} }
std::error_code SerializerBase::WriteOpcode(uint8_t opcode) {
return WriteRaw(::io::Bytes{&opcode, 1});
}
size_t SerializerBase::GetTotalBufferCapacity() const { size_t SerializerBase::GetTotalBufferCapacity() const {
return mem_buf_.Capacity(); return mem_buf_.Capacity();
} }
@ -879,7 +883,7 @@ io::Bytes SerializerBase::PrepareFlush() {
return mem_buf_.InputBuffer(); return mem_buf_.InputBuffer();
} }
error_code RdbSerializer::WriteJournalEntry(std::string_view serialized_entry) { error_code SerializerBase::WriteJournalEntry(std::string_view serialized_entry) {
VLOG(2) << "WriteJournalEntry"; VLOG(2) << "WriteJournalEntry";
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_JOURNAL_BLOB)); RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_JOURNAL_BLOB));
RETURN_ON_ERR(SaveLen(1)); RETURN_ON_ERR(SaveLen(1));
@ -887,7 +891,7 @@ error_code RdbSerializer::WriteJournalEntry(std::string_view serialized_entry) {
return error_code{}; return error_code{};
} }
error_code RdbSerializer::SaveString(string_view val) { error_code SerializerBase::SaveString(string_view val) {
/* Try integer encoding */ /* Try integer encoding */
if (val.size() <= 11) { if (val.size() <= 11) {
uint8_t buf[16]; uint8_t buf[16];
@ -927,13 +931,13 @@ error_code RdbSerializer::SaveString(string_view val) {
return error_code{}; return error_code{};
} }
error_code RdbSerializer::SaveLen(size_t len) { error_code SerializerBase::SaveLen(size_t len) {
uint8_t buf[16]; uint8_t buf[16];
unsigned enclen = WritePackedUInt(len, buf); unsigned enclen = WritePackedUInt(len, buf);
return WriteRaw(Bytes{buf, enclen}); return WriteRaw(Bytes{buf, enclen});
} }
error_code RdbSerializer::SaveLzfBlob(const io::Bytes& src, size_t uncompressed_len) { error_code SerializerBase::SaveLzfBlob(const io::Bytes& src, size_t uncompressed_len) {
/* Data compressed! Let's save it on disk */ /* Data compressed! Let's save it on disk */
uint8_t opcode = (RDB_ENCVAL << 6) | RDB_ENC_LZF; uint8_t opcode = (RDB_ENCVAL << 6) | RDB_ENC_LZF;
RETURN_ON_ERR(WriteOpcode(opcode)); RETURN_ON_ERR(WriteOpcode(opcode));

View file

@ -149,6 +149,20 @@ class SerializerBase {
std::error_code WriteRaw(const ::io::Bytes& buf); std::error_code WriteRaw(const ::io::Bytes& buf);
// Write journal entry as an embedded journal blob.
std::error_code WriteJournalEntry(std::string_view entry);
// Send FULL_SYNC_CUT opcode to notify that all static data was sent.
std::error_code SendFullSyncCut();
std::error_code WriteOpcode(uint8_t opcode);
std::error_code SaveLen(size_t len);
std::error_code SaveString(std::string_view val);
std::error_code SaveString(const uint8_t* buf, size_t len) {
return SaveString(io::View(io::Bytes{buf, len}));
}
protected: protected:
// Prepare internal buffer for flush. Compress it. // Prepare internal buffer for flush. Compress it.
io::Bytes PrepareFlush(); io::Bytes PrepareFlush();
@ -157,6 +171,8 @@ class SerializerBase {
void CompressBlob(); void CompressBlob();
void AllocateCompressorOnce(); void AllocateCompressorOnce();
std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len);
CompressionMode compression_mode_; CompressionMode compression_mode_;
base::IoBuf mem_buf_; base::IoBuf mem_buf_;
std::unique_ptr<CompressorImpl> compressor_impl_; std::unique_ptr<CompressorImpl> compressor_impl_;
@ -170,6 +186,8 @@ class SerializerBase {
uint32_t compressed_blobs = 0; uint32_t compressed_blobs = 0;
}; };
std::optional<CompressionStats> compression_stats_; std::optional<CompressionStats> compression_stats_;
base::PODArray<uint8_t> tmp_buf_;
std::unique_ptr<LZF_HSLOT[]> lzf_;
}; };
class RdbSerializer : public SerializerBase { class RdbSerializer : public SerializerBase {
@ -190,32 +208,15 @@ class RdbSerializer : public SerializerBase {
io::Result<uint8_t> SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms, io::Result<uint8_t> SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms,
DbIndex dbid); DbIndex dbid);
std::error_code SaveLen(size_t len);
std::error_code SaveString(std::string_view val);
std::error_code SaveString(const uint8_t* buf, size_t len) {
return SaveString(io::View(io::Bytes{buf, len}));
}
// This would work for either string or an object. // This would work for either string or an object.
// The arg pv is taken from it->second if accessing // The arg pv is taken from it->second if accessing
// this by finding the key. This function is used // this by finding the key. This function is used
// for the dump command - thus it is public function // for the dump command - thus it is public function
std::error_code SaveValue(const PrimeValue& pv); std::error_code SaveValue(const PrimeValue& pv);
std::error_code WriteOpcode(uint8_t opcode) {
return WriteRaw(::io::Bytes{&opcode, 1});
}
// Write journal entry as an embedded journal blob.
std::error_code WriteJournalEntry(std::string_view entry);
std::error_code SendJournalOffset(uint64_t journal_offset); std::error_code SendJournalOffset(uint64_t journal_offset);
// Send FULL_SYNC_CUT opcode to notify that all static data was sent.
std::error_code SendFullSyncCut();
private: private:
std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len);
std::error_code SaveObject(const PrimeValue& pv); std::error_code SaveObject(const PrimeValue& pv);
std::error_code SaveListObject(const robj* obj); std::error_code SaveListObject(const robj* obj);
std::error_code SaveSetObject(const PrimeValue& pv); std::error_code SaveSetObject(const PrimeValue& pv);
@ -231,10 +232,7 @@ class RdbSerializer : public SerializerBase {
std::error_code SaveStreamConsumers(streamCG* cg); std::error_code SaveStreamConsumers(streamCG* cg);
std::string tmp_str_; std::string tmp_str_;
base::PODArray<uint8_t> tmp_buf_;
DbIndex last_entry_db_index_ = kInvalidDbId; DbIndex last_entry_db_index_ = kInvalidDbId;
std::unique_ptr<LZF_HSLOT[]> lzf_;
}; };
// Serializes CompactObj as RESTORE commands. // Serializes CompactObj as RESTORE commands.