feat(server): Buffered journal serializers (#619)

This commit is contained in:
Vladislav 2022-12-30 16:18:37 +03:00 committed by GitHub
parent f204f1c670
commit 7788600c9b
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
13 changed files with 240 additions and 189 deletions

View file

@ -290,7 +290,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
FlowInfo* flow = &replica_ptr->flows[index]; FlowInfo* flow = &replica_ptr->flows[index];
StopFullSyncInThread(flow, shard); StopFullSyncInThread(flow, shard);
status = StartStableSyncInThread(flow, shard); status = StartStableSyncInThread(flow, &replica_ptr->cntx, shard);
return OpStatus::OK; return OpStatus::OK;
}; };
shard_set->pool()->AwaitFiberOnAll(std::move(cb)); shard_set->pool()->AwaitFiberOnAll(std::move(cb));
@ -350,18 +350,23 @@ void DflyCmd::StopFullSyncInThread(FlowInfo* flow, EngineShard* shard) {
flow->saver.reset(); flow->saver.reset();
} }
OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, EngineShard* shard) { OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard) {
// Register journal listener and cleanup. // Register journal listener and cleanup.
uint32_t cb_id = 0; uint32_t cb_id = 0;
JournalWriter* writer = nullptr;
if (shard != nullptr) { if (shard != nullptr) {
JournalWriter writer{flow->conn->socket()}; writer = new JournalWriter{};
auto journal_cb = [flow, writer = std::move(writer)](const journal::Entry& je) mutable { auto journal_cb = [flow, cntx, writer](const journal::Entry& je) mutable {
writer.Write(je); writer->Write(je);
if (auto ec = writer->Flush(flow->conn->socket()); ec)
cntx->ReportError(ec);
}; };
cb_id = sf_->journal()->RegisterOnChange(std::move(journal_cb)); cb_id = sf_->journal()->RegisterOnChange(std::move(journal_cb));
} }
flow->cleanup = [flow, this, cb_id]() { flow->cleanup = [this, cb_id, writer, flow]() {
if (writer)
delete writer;
if (cb_id) if (cb_id)
sf_->journal()->UnregisterOnChange(cb_id); sf_->journal()->UnregisterOnChange(cb_id);
flow->TryShutdownSocket(); flow->TryShutdownSocket();

View file

@ -159,7 +159,7 @@ class DflyCmd {
void StopFullSyncInThread(FlowInfo* flow, EngineShard* shard); void StopFullSyncInThread(FlowInfo* flow, EngineShard* shard);
// Start stable sync in thread. Called for each flow. // Start stable sync in thread. Called for each flow.
facade::OpStatus StartStableSyncInThread(FlowInfo* flow, EngineShard* shard); facade::OpStatus StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineShard* shard);
// Fiber that runs full sync for each flow. // Fiber that runs full sync for each flow.
void FullSyncFb(FlowInfo* flow, Context* cntx); void FullSyncFb(FlowInfo* flow, Context* cntx);

View file

@ -18,154 +18,180 @@ using namespace std;
namespace dfly { namespace dfly {
JournalWriter::JournalWriter(io::Sink* sink, std::optional<DbIndex> dbid) std::error_code JournalWriter::Flush(io::Sink* sink) {
: sink_{sink}, cur_dbid_{dbid} { if (auto ec = sink->Write(buf_.InputBuffer()); ec)
return ec;
buf_.Clear();
return {};
} }
error_code JournalWriter::Write(uint64_t v) { base::IoBuf& JournalWriter::Accumulated() {
return buf_;
}
void JournalWriter::Write(uint64_t v) {
uint8_t buf[10]; uint8_t buf[10];
unsigned len = WritePackedUInt(v, buf); unsigned len = WritePackedUInt(v, buf);
return sink_->Write(io::Bytes{buf, len}); buf_.EnsureCapacity(sizeof(buf));
memcpy(buf_.AppendBuffer().data(), buf, len);
buf_.CommitWrite(len);
} }
error_code JournalWriter::Write(std::string_view sv) { void JournalWriter::Write(std::string_view sv) {
RETURN_ON_ERR(Write(sv.size())); Write(sv.size());
return sink_->Write(io::Buffer(sv)); buf_.EnsureCapacity(sv.size());
memcpy(buf_.AppendBuffer().data(), sv.data(), sv.size());
buf_.CommitWrite(sv.size());
} }
error_code JournalWriter::Write(CmdArgList args) { void JournalWriter::Write(CmdArgList args) {
RETURN_ON_ERR(Write(args.size())); Write(args.size());
for (auto v : args) for (auto v : args)
RETURN_ON_ERR(Write(facade::ToSV(v))); Write(facade::ToSV(v));
return std::error_code{};
} }
error_code JournalWriter::Write(std::pair<std::string_view, ArgSlice> args) { void JournalWriter::Write(std::pair<std::string_view, ArgSlice> args) {
auto [cmd, tail_args] = args; auto [cmd, tail_args] = args;
RETURN_ON_ERR(Write(1 + tail_args.size())); Write(1 + tail_args.size());
RETURN_ON_ERR(Write(cmd)); Write(cmd);
for (auto v : tail_args) for (auto v : tail_args)
RETURN_ON_ERR(Write(v)); Write(v);
return std::error_code{};
} }
error_code JournalWriter::Write(std::monostate) { void JournalWriter::Write(std::monostate) {
return std::error_code{};
} }
error_code JournalWriter::Write(const journal::Entry& entry) { void JournalWriter::Write(const journal::Entry& entry) {
// Check if entry has a new db index and we need to emit a SELECT entry. // Check if entry has a new db index and we need to emit a SELECT entry.
if (entry.opcode != journal::Op::SELECT && (!cur_dbid_ || entry.dbid != *cur_dbid_)) { if (entry.opcode != journal::Op::SELECT && (!cur_dbid_ || entry.dbid != *cur_dbid_)) {
RETURN_ON_ERR(Write(journal::Entry{journal::Op::SELECT, entry.dbid})); Write(journal::Entry{journal::Op::SELECT, entry.dbid});
cur_dbid_ = entry.dbid; cur_dbid_ = entry.dbid;
} }
RETURN_ON_ERR(Write(uint8_t(entry.opcode))); Write(uint8_t(entry.opcode));
switch (entry.opcode) { switch (entry.opcode) {
case journal::Op::SELECT: case journal::Op::SELECT:
return Write(entry.dbid); return Write(entry.dbid);
case journal::Op::COMMAND: case journal::Op::COMMAND:
RETURN_ON_ERR(Write(entry.txid)); Write(entry.txid);
RETURN_ON_ERR(Write(entry.shard_cnt)); Write(entry.shard_cnt);
return std::visit([this](const auto& payload) { return Write(payload); }, entry.payload); return std::visit([this](const auto& payload) { return Write(payload); }, entry.payload);
default: default:
break; break;
}; };
return std::error_code{};
} }
JournalReader::JournalReader(DbIndex dbid) : buf_{}, dbid_{dbid} { JournalReader::JournalReader(io::Source* source, DbIndex dbid)
: str_buf_{}, source_{source}, buf_{4096}, dbid_{dbid} {
} }
void JournalReader::SetDb(DbIndex dbid) { void JournalReader::SetDb(DbIndex dbid) {
dbid_ = dbid; dbid_ = dbid;
} }
template <typename UT> io::Result<UT> ReadPackedUIntTyped(io::Source* source) { void JournalReader::SetSource(io::Source* source) {
uint64_t v; CHECK_EQ(buf_.InputLen(), 0ULL);
SET_OR_UNEXPECT(ReadPackedUInt(source), v); source_ = source;
if (v > std::numeric_limits<UT>::max())
return make_unexpected(make_error_code(errc::result_out_of_range));
return static_cast<UT>(v);
} }
io::Result<uint8_t> JournalReader::ReadU8(io::Source* source) { std::error_code JournalReader::EnsureRead(size_t num) {
return ReadPackedUIntTyped<uint8_t>(source); // Check if we already have enough.
} if (buf_.InputLen() >= num)
return {};
io::Result<uint16_t> JournalReader::ReadU16(io::Source* source) { uint64_t remainder = num - buf_.InputLen();
return ReadPackedUIntTyped<uint16_t>(source); buf_.EnsureCapacity(remainder);
}
io::Result<uint32_t> JournalReader::ReadU32(io::Source* source) { // Try reading at least how much we need, but possibly more
return ReadPackedUIntTyped<uint32_t>(source); uint64_t read;
} SET_OR_RETURN(source_->ReadAtLeast(buf_.AppendBuffer(), remainder), read);
CHECK(read >= remainder);
io::Result<uint64_t> JournalReader::ReadU64(io::Source* source) {
return ReadPackedUIntTyped<uint64_t>(source);
}
io::Result<size_t> JournalReader::ReadString(io::Source* source) {
size_t size = 0;
SET_OR_UNEXPECT(ReadU64(source), size);
buf_.EnsureCapacity(size);
auto dest = buf_.AppendBuffer().first(size);
uint64_t read = 0;
SET_OR_UNEXPECT(source->Read(dest), read);
buf_.CommitWrite(read); buf_.CommitWrite(read);
if (read != size) return {};
return make_unexpected(std::make_error_code(std::errc::message_size)); }
template <typename UT> io::Result<UT> JournalReader::ReadUInt() {
// Determine type and number of following bytes.
if (auto ec = EnsureRead(1); ec)
return make_unexpected(ec);
PackedUIntMeta meta{buf_.InputBuffer()[0]};
buf_.ConsumeInput(1);
if (auto ec = EnsureRead(meta.ByteSize()); ec)
return make_unexpected(ec);
// Read and check integer.
uint64_t res;
SET_OR_UNEXPECT(ReadPackedUInt(meta, buf_.InputBuffer()), res);
buf_.ConsumeInput(meta.ByteSize());
if (res > std::numeric_limits<UT>::max())
return make_unexpected(make_error_code(errc::result_out_of_range));
return static_cast<UT>(res);
}
template io::Result<uint8_t> JournalReader::ReadUInt<uint8_t>();
template io::Result<uint16_t> JournalReader::ReadUInt<uint16_t>();
template io::Result<uint32_t> JournalReader::ReadUInt<uint32_t>();
template io::Result<uint64_t> JournalReader::ReadUInt<uint64_t>();
io::Result<size_t> JournalReader::ReadString() {
size_t size = 0;
SET_OR_UNEXPECT(ReadUInt<uint64_t>(), size);
if (auto ec = EnsureRead(size); ec)
return make_unexpected(ec);
unsigned offset = str_buf_.size();
str_buf_.resize(offset + size);
buf_.ReadAndConsume(size, str_buf_.data() + offset);
return size; return size;
} }
std::error_code JournalReader::Read(io::Source* source, CmdArgVec* vec) { std::error_code JournalReader::Read(CmdArgVec* vec) {
buf_.ConsumeInput(buf_.InputBuffer().size()); size_t num_strings = 0;
SET_OR_RETURN(ReadUInt<uint64_t>(), num_strings);
vec->resize(num_strings);
size_t size = 0; // Read all strings consecutively.
SET_OR_RETURN(ReadU64(source), size); str_buf_.clear();
vec->resize(size);
for (auto& span : *vec) { for (auto& span : *vec) {
size_t len; size_t size;
SET_OR_RETURN(ReadString(source), len); SET_OR_RETURN(ReadString(), size);
span = MutableSlice{nullptr, len}; span = MutableSlice{nullptr, size};
} }
size_t offset = 0; // Set span pointers, now that string buffer won't reallocate.
char* ptr = str_buf_.data();
for (auto& span : *vec) { for (auto& span : *vec) {
size_t len = span.size(); span = {ptr, span.size()};
auto ptr = buf_.InputBuffer().subspan(offset).data(); ptr += span.size();
span = MutableSlice{reinterpret_cast<char*>(ptr), len};
offset += len;
} }
return std::error_code{}; return std::error_code{};
} }
io::Result<journal::ParsedEntry> JournalReader::ReadEntry(io::Source* source) { io::Result<journal::ParsedEntry> JournalReader::ReadEntry() {
uint8_t opcode; uint8_t opcode;
SET_OR_UNEXPECT(ReadU8(source), opcode); SET_OR_UNEXPECT(ReadUInt<uint8_t>(), opcode);
journal::ParsedEntry entry{static_cast<journal::Op>(opcode), dbid_}; journal::ParsedEntry entry{static_cast<journal::Op>(opcode), dbid_};
switch (entry.opcode) { switch (entry.opcode) {
case journal::Op::COMMAND: case journal::Op::COMMAND:
SET_OR_UNEXPECT(ReadU64(source), entry.txid); SET_OR_UNEXPECT(ReadUInt<uint64_t>(), entry.txid);
SET_OR_UNEXPECT(ReadU32(source), entry.shard_cnt); SET_OR_UNEXPECT(ReadUInt<uint32_t>(), entry.shard_cnt);
entry.payload = CmdArgVec{}; entry.payload = CmdArgVec{};
if (auto ec = Read(source, &*entry.payload); ec) if (auto ec = Read(&*entry.payload); ec)
return make_unexpected(ec); return make_unexpected(ec);
break; break;
case journal::Op::SELECT: case journal::Op::SELECT:
SET_OR_UNEXPECT(ReadU16(source), dbid_); SET_OR_UNEXPECT(ReadUInt<uint16_t>(), dbid_);
return ReadEntry(source); return ReadEntry();
default: default:
break; break;
}; };

View file

@ -18,24 +18,26 @@ namespace dfly {
// It automatically keeps track of the current database index. // It automatically keeps track of the current database index.
class JournalWriter { class JournalWriter {
public: public:
// Initialize with sink and optional start database index. If no start index is set, // Write single entry to internal buffer.
// a SELECT will be issued before the first entry. void Write(const journal::Entry& entry);
JournalWriter(io::Sink* sink, std::optional<DbIndex> dbid = std::nullopt);
// Write single entry. // Flush internal buffer to sink.
std::error_code Write(const journal::Entry& entry); std::error_code Flush(io::Sink* sink_);
// Return reference to internal buffer.
base::IoBuf& Accumulated();
private: private:
std::error_code Write(uint64_t v); // Write packed unsigned integer. void Write(uint64_t v); // Write packed unsigned integer.
std::error_code Write(std::string_view sv); // Write string. void Write(std::string_view sv); // Write string.
std::error_code Write(CmdArgList args); void Write(CmdArgList args);
std::error_code Write(std::pair<std::string_view, ArgSlice> args); void Write(std::pair<std::string_view, ArgSlice> args);
std::error_code Write(std::monostate); // Overload for empty std::variant void Write(std::monostate); // Overload for empty std::variant
private: private:
io::Sink* sink_; base::IoBuf buf_{};
std::optional<DbIndex> cur_dbid_; std::optional<DbIndex> cur_dbid_{};
}; };
// JournalReader allows deserializing journal entries from a source. // JournalReader allows deserializing journal entries from a source.
@ -43,29 +45,33 @@ class JournalWriter {
struct JournalReader { struct JournalReader {
public: public:
// Initialize start database index. // Initialize start database index.
JournalReader(DbIndex dbid); JournalReader(io::Source* source, DbIndex dbid);
// Overwrite current db index. // Overwrite current db index.
void SetDb(DbIndex dbid); void SetDb(DbIndex dbid);
// Overwrite current source and ensure there is no leftover from previous.
void SetSource(io::Source* source);
// Try reading entry from source. // Try reading entry from source.
io::Result<journal::ParsedEntry> ReadEntry(io::Source* source); io::Result<journal::ParsedEntry> ReadEntry();
private: private:
// TODO: Templated endian encoding to not repeat...? // Read from source until buffer contains at least num bytes.
io::Result<uint8_t> ReadU8(io::Source* source); std::error_code EnsureRead(size_t num);
io::Result<uint16_t> ReadU16(io::Source* source);
io::Result<uint32_t> ReadU32(io::Source* source);
io::Result<uint64_t> ReadU64(io::Source* source);
// Read string into internal buffer and return size. // Read unsigned integer in packed encoding.
io::Result<size_t> ReadString(io::Source* source); template <typename UT> io::Result<UT> ReadUInt();
// Read argument array into internal buffer and build slice. // Read and append string to string buffer, return size.
// TODO: Inline store span data inside buffer to avoid allocation io::Result<size_t> ReadString();
std::error_code Read(io::Source* source, CmdArgVec* vec);
// Read argument array into string buffer.
std::error_code Read(CmdArgVec* vec);
private: private:
std::string str_buf_; // last parsed entry points here
io::Source* source_;
base::IoBuf buf_; base::IoBuf buf_;
DbIndex dbid_; DbIndex dbid_;
}; };

View file

@ -4,6 +4,7 @@
#include "base/logging.h" #include "base/logging.h"
#include "server/journal/serializer.h" #include "server/journal/serializer.h"
#include "server/journal/types.h" #include "server/journal/types.h"
#include "server/serializer_commons.h"
using namespace testing; using namespace testing;
using namespace std; using namespace std;
@ -105,20 +106,19 @@ TEST(Journal, WriteRead) {
{5, 2, list("SET", "E", "2"), 1}}; {5, 2, list("SET", "E", "2"), 1}};
// Write all entries to string file. // Write all entries to string file.
io::StringSink ss; JournalWriter writer{};
JournalWriter writer{&ss};
for (const auto& entry : test_entries) { for (const auto& entry : test_entries) {
writer.Write(entry); writer.Write(entry);
} }
// Read them back. // Read them back.
io::BytesSource bs{io::Buffer(ss.str())}; io::BytesSource bs{writer.Accumulated().InputBuffer()};
JournalReader reader{0}; JournalReader reader{&bs, 0};
for (unsigned i = 0; i < test_entries.size(); i++) { for (unsigned i = 0; i < test_entries.size(); i++) {
auto& expected = test_entries[i]; auto& expected = test_entries[i];
auto res = reader.ReadEntry(&bs); auto res = reader.ReadEntry();
ASSERT_TRUE(res.has_value()); ASSERT_TRUE(res.has_value());
ASSERT_EQ(expected.opcode, res->opcode); ASSERT_EQ(expected.opcode, res->opcode);

View file

@ -1875,7 +1875,7 @@ error_code RdbLoaderBase::EnsureReadInternal(size_t min_sz) {
return kOk; return kOk;
} }
auto RdbLoaderBase::LoadLen(bool* is_encoded) -> io::Result<uint64_t> { io::Result<uint64_t> RdbLoaderBase::LoadLen(bool* is_encoded) {
if (is_encoded) if (is_encoded)
*is_encoded = false; *is_encoded = false;
@ -1885,33 +1885,19 @@ auto RdbLoaderBase::LoadLen(bool* is_encoded) -> io::Result<uint64_t> {
if (ec) if (ec)
return make_unexpected(ec); return make_unexpected(ec);
uint64_t res = 0; // Read integer meta info.
uint8_t first = mem_buf_->InputBuffer()[0]; auto bytes = mem_buf_->InputBuffer();
int type = (first & 0xC0) >> 6; PackedUIntMeta meta{bytes[0]};
mem_buf_->ConsumeInput(1); bytes.remove_prefix(1);
if (type == RDB_ENCVAL) {
/* Read a 6 bit encoding type. */ // Read integer.
if (is_encoded) uint64_t res;
*is_encoded = true; SET_OR_UNEXPECT(ReadPackedUInt(meta, bytes), res);
res = first & 0x3F;
} else if (type == RDB_6BITLEN) { if (meta.Type() == RDB_ENCVAL && is_encoded)
/* Read a 6 bit len. */ *is_encoded = true;
res = first & 0x3F;
} else if (type == RDB_14BITLEN) { mem_buf_->ConsumeInput(1 + meta.ByteSize());
res = ((first & 0x3F) << 8) | mem_buf_->InputBuffer()[0];
mem_buf_->ConsumeInput(1);
} else if (first == RDB_32BITLEN) {
/* Read a 32 bit len. */
res = absl::big_endian::Load32(mem_buf_->InputBuffer().data());
mem_buf_->ConsumeInput(4);
} else if (first == RDB_64BITLEN) {
/* Read a 64 bit len. */
res = absl::big_endian::Load64(mem_buf_->InputBuffer().data());
mem_buf_->ConsumeInput(8);
} else {
LOG(ERROR) << "Bad length encoding " << type << " in rdbLoadLen()";
return Unexpected(errc::rdb_file_corrupted);
}
return res; return res;
} }
@ -1963,13 +1949,14 @@ error_code RdbLoaderBase::HandleJournalBlob(Service* service, DbIndex dbid) {
io::BytesSource bs{io::Buffer(journal_blob)}; io::BytesSource bs{io::Buffer(journal_blob)};
journal_reader_.SetDb(dbid); journal_reader_.SetDb(dbid);
journal_reader_.SetSource(&bs);
// Parse and execute in loop. // Parse and execute in loop.
size_t done = 0; size_t done = 0;
JournalExecutor ex{service}; JournalExecutor ex{service};
while (done < num_entries) { while (done < num_entries) {
journal::ParsedEntry entry{}; journal::ParsedEntry entry{};
SET_OR_RETURN(journal_reader_.ReadEntry(&bs), entry); SET_OR_RETURN(journal_reader_.ReadEntry(), entry);
ex.Execute(entry); ex.Execute(entry);
done++; done++;
} }

View file

@ -145,7 +145,7 @@ class RdbLoaderBase {
size_t source_limit_ = SIZE_MAX; size_t source_limit_ = SIZE_MAX;
base::PODArray<uint8_t> compr_buf_; base::PODArray<uint8_t> compr_buf_;
std::unique_ptr<DecompressImpl> decompress_impl_; std::unique_ptr<DecompressImpl> decompress_impl_;
JournalReader journal_reader_{0}; JournalReader journal_reader_{nullptr, 0};
}; };
class RdbLoader : protected RdbLoaderBase { class RdbLoader : protected RdbLoaderBase {

View file

@ -27,7 +27,6 @@ extern "C" {
#include "core/string_set.h" #include "core/string_set.h"
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
#include "server/error.h" #include "server/error.h"
#include "server/journal/serializer.h"
#include "server/rdb_extensions.h" #include "server/rdb_extensions.h"
#include "server/serializer_commons.h" #include "server/serializer_commons.h"
#include "server/snapshot.h" #include "server/snapshot.h"
@ -253,7 +252,7 @@ std::error_code RdbSerializer::SaveValue(const PrimeValue& pv) {
error_code RdbSerializer::SelectDb(uint32_t dbid) { error_code RdbSerializer::SelectDb(uint32_t dbid) {
uint8_t buf[16]; uint8_t buf[16];
buf[0] = RDB_OPCODE_SELECTDB; buf[0] = RDB_OPCODE_SELECTDB;
unsigned enclen = WritePackedUInt(dbid, buf + 1); unsigned enclen = WritePackedUInt(dbid, io::MutableBytes{buf}.subspan(1));
return WriteRaw(Bytes{buf, enclen + 1}); return WriteRaw(Bytes{buf, enclen + 1});
} }
@ -683,16 +682,19 @@ size_t RdbSerializer::SerializedLen() const {
} }
error_code RdbSerializer::WriteJournalEntries(absl::Span<const journal::Entry> entries) { error_code RdbSerializer::WriteJournalEntries(absl::Span<const journal::Entry> entries) {
// Write journal blob to string file.
io::StringSink ss{};
JournalWriter writer{&ss};
for (const auto& entry : entries) { for (const auto& entry : entries) {
RETURN_ON_ERR(writer.Write(entry)); journal_writer_.Write(entry);
} }
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_JOURNAL_BLOB)); RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_JOURNAL_BLOB));
RETURN_ON_ERR(SaveLen(entries.size())); RETURN_ON_ERR(SaveLen(entries.size()));
return SaveString(ss.str());
auto& buf = journal_writer_.Accumulated();
auto bytes = buf.InputBuffer();
RETURN_ON_ERR(SaveString(string_view{reinterpret_cast<const char*>(bytes.data()), bytes.size()}));
buf.Clear();
return error_code{};
} }
error_code RdbSerializer::SaveString(string_view val) { error_code RdbSerializer::SaveString(string_view val) {
@ -902,7 +904,7 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
continue; continue;
if (record.db_index != last_db_index) { if (record.db_index != last_db_index) {
unsigned enclen = WritePackedUInt(record.db_index, buf + 1); unsigned enclen = WritePackedUInt(record.db_index, io::MutableBytes{buf}.subspan(1));
string_view str{(char*)buf, enclen + 1}; string_view str{(char*)buf, enclen + 1};
io_error = sink_->Write(io::Buffer(str)); io_error = sink_->Write(io::Buffer(str));
@ -1171,7 +1173,7 @@ void RdbSerializer::CompressBlob() {
// Write encoded compressed blob len // Write encoded compressed blob len
dest = mem_buf_.AppendBuffer(); dest = mem_buf_.AppendBuffer();
unsigned enclen = WritePackedUInt(compressed_blob.length(), dest.data()); unsigned enclen = WritePackedUInt(compressed_blob.length(), dest);
mem_buf_.CommitWrite(enclen); mem_buf_.CommitWrite(enclen);
// Write compressed blob // Write compressed blob

View file

@ -16,6 +16,7 @@ extern "C" {
#include "base/pod_array.h" #include "base/pod_array.h"
#include "io/io.h" #include "io/io.h"
#include "server/common.h" #include "server/common.h"
#include "server/journal/serializer.h"
#include "server/journal/types.h" #include "server/journal/types.h"
#include "server/table.h" #include "server/table.h"
@ -168,10 +169,13 @@ class RdbSerializer {
void CompressBlob(); void CompressBlob();
void AllocateCompressorOnce(); void AllocateCompressorOnce();
JournalWriter journal_writer_;
std::unique_ptr<LZF_HSLOT[]> lzf_; std::unique_ptr<LZF_HSLOT[]> lzf_;
base::IoBuf mem_buf_; base::IoBuf mem_buf_;
base::PODArray<uint8_t> tmp_buf_; base::PODArray<uint8_t> tmp_buf_;
std::string tmp_str_; std::string tmp_str_;
CompressionMode compression_mode_; CompressionMode compression_mode_;
// TODO : This compressor impl should support different compression algorithms zstd/lz4 etc. // TODO : This compressor impl should support different compression algorithms zstd/lz4 etc.
std::unique_ptr<CompressorImpl> compressor_impl_; std::unique_ptr<CompressorImpl> compressor_impl_;

View file

@ -751,10 +751,10 @@ void Replica::StableSyncDflyFb(Context* cntx) {
SocketSource ss{sock_.get()}; SocketSource ss{sock_.get()};
io::PrefixSource ps{prefix, &ss}; io::PrefixSource ps{prefix, &ss};
JournalReader reader{0}; JournalReader reader{&ps, 0};
JournalExecutor executor{&service_}; JournalExecutor executor{&service_};
while (!cntx->IsCancelled()) { while (!cntx->IsCancelled()) {
auto res = reader.ReadEntry(&ps); auto res = reader.ReadEntry();
if (!res) { if (!res) {
cntx->ReportError(res.error(), "Journal format error"); cntx->ReportError(res.error(), "Journal format error");
return; return;

View file

@ -14,11 +14,32 @@ using namespace std;
namespace dfly { namespace dfly {
int PackedUIntMeta::Type() const {
return (first_byte & 0xC0) >> 6;
}
unsigned PackedUIntMeta::ByteSize() const {
switch (Type()) {
case RDB_ENCVAL:
case RDB_6BITLEN:
return 0;
case RDB_14BITLEN:
return 1;
};
switch (first_byte) {
case RDB_32BITLEN:
return 4;
case RDB_64BITLEN:
return 8;
};
return 0;
}
/* Saves an encoded unsigned integer. The first two bits in the first byte are used to /* Saves an encoded unsigned integer. The first two bits in the first byte are used to
* hold the encoding type. See the RDB_* definitions for more information * hold the encoding type. See the RDB_* definitions for more information
* on the types of encoding. buf must be at least 9 bytes. * on the types of encoding. buf must be at least 9 bytes.
* */ * */
unsigned WritePackedUInt(uint64_t value, uint8_t* buf) { unsigned WritePackedUInt(uint64_t value, io::MutableBytes buf) {
if (value < (1 << 6)) { if (value < (1 << 6)) {
/* Save a 6 bit value */ /* Save a 6 bit value */
buf[0] = (value & 0xFF) | (RDB_6BITLEN << 6); buf[0] = (value & 0xFF) | (RDB_6BITLEN << 6);
@ -35,47 +56,32 @@ unsigned WritePackedUInt(uint64_t value, uint8_t* buf) {
if (value <= UINT32_MAX) { if (value <= UINT32_MAX) {
/* Save a 32 bit value */ /* Save a 32 bit value */
buf[0] = RDB_32BITLEN; buf[0] = RDB_32BITLEN;
absl::big_endian::Store32(buf + 1, value); absl::big_endian::Store32(buf.data() + 1, value);
return 1 + 4; return 1 + 4;
} }
/* Save a 64 bit value */ /* Save a 64 bit value */
buf[0] = RDB_64BITLEN; buf[0] = RDB_64BITLEN;
absl::big_endian::Store64(buf + 1, value); absl::big_endian::Store64(buf.data() + 1, value);
return 1 + 8; return 1 + 8;
} }
io::Result<uint64_t> ReadPackedUInt(io::Source* source) { io::Result<uint64_t> ReadPackedUInt(PackedUIntMeta meta, io::Bytes bytes) {
uint8_t buf[10]; DCHECK(meta.ByteSize() <= bytes.size());
size_t read = 0; switch (meta.Type()) {
case RDB_ENCVAL:
uint8_t first = 0;
SET_OR_UNEXPECT(source->Read(io::MutableBytes{&first, 1}), read);
if (read != 1)
return make_unexpected(make_error_code(errc::bad_message));
int type = (first & 0xC0) >> 6;
switch (type) {
case RDB_6BITLEN: case RDB_6BITLEN:
return first & 0x3F; return meta.first_byte & 0x3F;
case RDB_14BITLEN: case RDB_14BITLEN:
SET_OR_UNEXPECT(source->Read(io::MutableBytes{buf, 1}), read); return ((meta.first_byte & 0x3F) << 8) | bytes[0];
if (read != 1) };
return make_unexpected(make_error_code(errc::bad_message)); switch (meta.first_byte) {
return ((first & 0x3F) << 8) | buf[0];
case RDB_32BITLEN: case RDB_32BITLEN:
SET_OR_UNEXPECT(source->Read(io::MutableBytes{buf, 4}), read); return absl::big_endian::Load32(bytes.data());
if (read != 4)
return make_unexpected(make_error_code(errc::bad_message));
return absl::big_endian::Load32(buf);
case RDB_64BITLEN: case RDB_64BITLEN:
SET_OR_UNEXPECT(source->Read(io::MutableBytes{buf, 8}), read); return absl::big_endian::Load64(bytes.data());
if (read != 8) };
return make_unexpected(make_error_code(errc::bad_message)); return make_unexpected(make_error_code(errc::illegal_byte_sequence));
return absl::big_endian::Load64(buf);
default:
return make_unexpected(make_error_code(errc::illegal_byte_sequence));
}
} }
} // namespace dfly } // namespace dfly

View file

@ -30,12 +30,27 @@ using nonstd::make_unexpected;
dest = std::move(exp_res.value()); \ dest = std::move(exp_res.value()); \
} }
// Represents meta information for an encoded packed unsigned integer.
struct PackedUIntMeta {
// Initialize by first byte in sequence.
PackedUIntMeta(uint8_t first_byte) : first_byte{first_byte} {
}
// Get underlying RDB type.
int Type() const;
// Get additional size in bytes (excluding first one).
unsigned ByteSize() const;
uint8_t first_byte;
};
// Saves a packed unsigned integer. The first two bits in the first byte are used to // Saves a packed unsigned integer. The first two bits in the first byte are used to
// hold the encoding type. See the RDB_* definitions for more information // hold the encoding type. See the RDB_* definitions for more information
// on the types of encoding. buf must be at least 9 bytes. // on the types of encoding. buf must be at least 9 bytes.
unsigned WritePackedUInt(uint64_t value, uint8_t* buf); unsigned WritePackedUInt(uint64_t value, io::MutableBytes dest);
// Deserialize packed unsigned integer. // Deserialize packed unsigned integer.
io::Result<uint64_t> ReadPackedUInt(io::Source* source); io::Result<uint64_t> ReadPackedUInt(PackedUIntMeta meta, io::Bytes source);
} // namespace dfly } // namespace dfly

View file

@ -67,7 +67,7 @@ async def test_replication_all(df_local_factory, t_master, t_replicas, n_keys, n
# Check range [n_stream_keys, n_keys] is of seed 1 # Check range [n_stream_keys, n_keys] is of seed 1
await batch_check_data_async(c_replica, gen_test_data(n_keys, start=n_stream_keys, seed=1)) await batch_check_data_async(c_replica, gen_test_data(n_keys, start=n_stream_keys, seed=1))
# Check range [0, n_stream_keys] is of seed 2 # Check range [0, n_stream_keys] is of seed 2
await asyncio.sleep(0.2) await asyncio.sleep(1.0)
await batch_check_data_async(c_replica, gen_test_data(n_stream_keys, seed=2)) await batch_check_data_async(c_replica, gen_test_data(n_stream_keys, seed=2))
# Start streaming data and run REPLICAOF in parallel # Start streaming data and run REPLICAOF in parallel
@ -85,7 +85,7 @@ async def test_replication_all(df_local_factory, t_master, t_replicas, n_keys, n
# Check stable state streaming # Check stable state streaming
await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=3)) await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=3))
await asyncio.sleep(0.5) await asyncio.sleep(1.0)
await asyncio.gather(*(batch_check_data_async(c, gen_test_data(n_keys, seed=3)) await asyncio.gather(*(batch_check_data_async(c, gen_test_data(n_keys, seed=3))
for c in c_replicas)) for c in c_replicas))