mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
* bug(snapshot) : Do not preempt inside OnDbChange Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
90233bfdc8
commit
4b44fb6baf
8 changed files with 50 additions and 109 deletions
|
@ -90,10 +90,6 @@ JournalReader::JournalReader(io::Source* source, DbIndex dbid)
|
|||
: source_{source}, buf_{4_KB}, dbid_{dbid} {
|
||||
}
|
||||
|
||||
void JournalReader::SetDb(DbIndex dbid) {
|
||||
dbid_ = dbid;
|
||||
}
|
||||
|
||||
void JournalReader::SetSource(io::Source* source) {
|
||||
CHECK_EQ(buf_.InputLen(), 0ULL);
|
||||
source_ = source;
|
||||
|
|
|
@ -43,9 +43,6 @@ struct JournalReader {
|
|||
// Initialize start database index.
|
||||
JournalReader(io::Source* source, DbIndex dbid);
|
||||
|
||||
// Overwrite current db index.
|
||||
void SetDb(DbIndex dbid);
|
||||
|
||||
// Overwrite current source and ensure there is no leftover from previous.
|
||||
void SetSource(io::Source* source);
|
||||
|
||||
|
|
|
@ -1820,7 +1820,7 @@ error_code RdbLoader::Load(io::Source* src) {
|
|||
for (unsigned i = 0; i < shard_set->size(); ++i) {
|
||||
FlushShardAsync(i);
|
||||
}
|
||||
RETURN_ON_ERR(HandleJournalBlob(service_, cur_db_index_));
|
||||
RETURN_ON_ERR(HandleJournalBlob(service_));
|
||||
continue;
|
||||
}
|
||||
|
||||
|
@ -1961,7 +1961,7 @@ error_code RdbLoaderBase::HandleCompressedBlobFinish() {
|
|||
return kOk;
|
||||
}
|
||||
|
||||
error_code RdbLoaderBase::HandleJournalBlob(Service* service, DbIndex dbid) {
|
||||
error_code RdbLoaderBase::HandleJournalBlob(Service* service) {
|
||||
// Read the number of entries in the journal blob.
|
||||
size_t num_entries;
|
||||
bool _encoded;
|
||||
|
@ -1972,7 +1972,6 @@ error_code RdbLoaderBase::HandleJournalBlob(Service* service, DbIndex dbid) {
|
|||
SET_OR_RETURN(FetchGenericString(), journal_blob);
|
||||
|
||||
io::BytesSource bs{io::Buffer(journal_blob)};
|
||||
journal_reader_.SetDb(dbid);
|
||||
journal_reader_.SetSource(&bs);
|
||||
|
||||
// Parse and execute in loop.
|
||||
|
|
|
@ -129,7 +129,7 @@ class RdbLoaderBase {
|
|||
std::error_code HandleCompressedBlobFinish();
|
||||
void AllocateDecompressOnce(int op_type);
|
||||
|
||||
std::error_code HandleJournalBlob(Service* service, DbIndex dbid);
|
||||
std::error_code HandleJournalBlob(Service* service);
|
||||
|
||||
static size_t StrLen(const RdbVariant& tset);
|
||||
|
||||
|
|
|
@ -255,6 +255,10 @@ std::error_code RdbSerializer::SaveValue(const PrimeValue& pv) {
|
|||
}
|
||||
|
||||
error_code RdbSerializer::SelectDb(uint32_t dbid) {
|
||||
if (dbid == last_entry_db_index_) {
|
||||
return error_code{};
|
||||
}
|
||||
last_entry_db_index_ = dbid;
|
||||
uint8_t buf[16];
|
||||
buf[0] = RDB_OPCODE_SELECTDB;
|
||||
unsigned enclen = WritePackedUInt(dbid, io::MutableBytes{buf}.subspan(1));
|
||||
|
@ -263,7 +267,8 @@ error_code RdbSerializer::SelectDb(uint32_t dbid) {
|
|||
|
||||
// Called by snapshot
|
||||
io::Result<uint8_t> RdbSerializer::SaveEntry(const PrimeKey& pk, const PrimeValue& pv,
|
||||
uint64_t expire_ms) {
|
||||
uint64_t expire_ms, DbIndex dbid) {
|
||||
SelectDb(dbid);
|
||||
uint8_t buf[16];
|
||||
error_code ec;
|
||||
/* Save the expire time */
|
||||
|
@ -681,6 +686,11 @@ error_code RdbSerializer::FlushToSink(io::Sink* s) {
|
|||
// interrupt point.
|
||||
RETURN_ON_ERR(s->Write(bytes));
|
||||
mem_buf_.ConsumeInput(bytes.size());
|
||||
// After every flush we should write the DB index again because the blobs in the channel are
|
||||
// interleaved and multiple savers can correspond to a single writer (in case of single file rdb
|
||||
// snapshot)
|
||||
last_entry_db_index_ = kInvalidDbId;
|
||||
|
||||
return error_code{};
|
||||
}
|
||||
|
||||
|
@ -701,17 +711,14 @@ io::Bytes RdbSerializer::PrepareFlush() {
|
|||
return mem_buf_.InputBuffer();
|
||||
}
|
||||
|
||||
error_code RdbSerializer::WriteJournalEntries(absl::Span<const journal::Entry> entries) {
|
||||
error_code RdbSerializer::WriteJournalEntry(const journal::Entry& entry) {
|
||||
io::BufSink buf_sink{&journal_mem_buf_};
|
||||
JournalWriter writer{&buf_sink};
|
||||
for (const auto& entry : entries) {
|
||||
writer.Write(entry);
|
||||
}
|
||||
writer.Write(entry);
|
||||
|
||||
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_JOURNAL_BLOB));
|
||||
RETURN_ON_ERR(SaveLen(entries.size()));
|
||||
RETURN_ON_ERR(SaveLen(1));
|
||||
RETURN_ON_ERR(SaveString(io::View(journal_mem_buf_.InputBuffer())));
|
||||
|
||||
journal_mem_buf_.Clear();
|
||||
return error_code{};
|
||||
}
|
||||
|
@ -903,12 +910,8 @@ error_code RdbSaver::Impl::SaveAuxFieldStrStr(string_view key, string_view val)
|
|||
error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
|
||||
error_code io_error;
|
||||
|
||||
uint8_t buf[16];
|
||||
size_t channel_bytes = 0;
|
||||
SliceSnapshot::DbRecord record;
|
||||
DbIndex last_db_index = kInvalidDbId;
|
||||
|
||||
buf[0] = RDB_OPCODE_SELECTDB;
|
||||
|
||||
// we can not exit on io-error since we spawn fibers that push data.
|
||||
// TODO: we may signal them to stop processing and exit asap in case of the error.
|
||||
|
@ -922,16 +925,6 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
|
|||
if (cll->IsCancelled())
|
||||
continue;
|
||||
|
||||
if (record.db_index != last_db_index) {
|
||||
unsigned enclen = WritePackedUInt(record.db_index, io::MutableBytes{buf}.subspan(1));
|
||||
string_view str{(char*)buf, enclen + 1};
|
||||
|
||||
io_error = sink_->Write(io::Buffer(str));
|
||||
if (io_error)
|
||||
break;
|
||||
last_db_index = record.db_index;
|
||||
}
|
||||
|
||||
DVLOG(2) << "Pulled " << record.id;
|
||||
channel_bytes += record.value.size();
|
||||
|
||||
|
|
|
@ -129,7 +129,8 @@ class RdbSerializer {
|
|||
// Must be called in the thread to which `it` belongs.
|
||||
// Returns the serialized rdb_type or the error.
|
||||
// expire_ms = 0 means no expiry.
|
||||
io::Result<uint8_t> SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms);
|
||||
io::Result<uint8_t> SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms,
|
||||
DbIndex dbid);
|
||||
|
||||
std::error_code SaveLen(size_t len);
|
||||
std::error_code SaveString(std::string_view val);
|
||||
|
@ -148,8 +149,8 @@ class RdbSerializer {
|
|||
return WriteRaw(::io::Bytes{&opcode, 1});
|
||||
}
|
||||
|
||||
// Write journal entries as an embedded journal blob.
|
||||
std::error_code WriteJournalEntries(absl::Span<const journal::Entry> entries);
|
||||
// Write journal entry as an embedded journal blob.
|
||||
std::error_code WriteJournalEntry(const journal::Entry& entry);
|
||||
|
||||
// Send FULL_SYNC_CUT opcode to notify that all static data was sent.
|
||||
std::error_code SendFullSyncCut();
|
||||
|
@ -180,6 +181,7 @@ class RdbSerializer {
|
|||
base::IoBuf journal_mem_buf_;
|
||||
std::string tmp_str_;
|
||||
base::PODArray<uint8_t> tmp_buf_;
|
||||
DbIndex last_entry_db_index_ = kInvalidDbId;
|
||||
|
||||
std::unique_ptr<LZF_HSLOT[]> lzf_;
|
||||
|
||||
|
|
|
@ -51,7 +51,7 @@ void SliceSnapshot::Start(bool stream_journal, const Cancellation* cll) {
|
|||
journal_cb_id_ = journal->RegisterOnChange(move(journal_cb));
|
||||
}
|
||||
|
||||
default_serializer_.reset(new RdbSerializer(compression_mode_));
|
||||
serializer_.reset(new RdbSerializer(compression_mode_));
|
||||
|
||||
VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_;
|
||||
|
||||
|
@ -74,7 +74,7 @@ void SliceSnapshot::Stop() {
|
|||
db_slice_->shard_owner()->journal()->UnregisterOnChange(journal_cb_id_);
|
||||
}
|
||||
|
||||
FlushDefaultBuffer(true);
|
||||
PushSerializedToChannel(true);
|
||||
CloseRecordChannel();
|
||||
}
|
||||
|
||||
|
@ -135,7 +135,7 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
|
|||
PrimeTable::Cursor next =
|
||||
pt->Traverse(cursor, absl::bind_front(&SliceSnapshot::BucketSaveCb, this));
|
||||
cursor = next;
|
||||
FlushDefaultBuffer(false);
|
||||
PushSerializedToChannel(false);
|
||||
|
||||
if (stats_.loop_serialized >= last_yield + 100) {
|
||||
DVLOG(2) << "Before sleep " << this_fiber::properties<FiberProps>().name();
|
||||
|
@ -143,14 +143,14 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
|
|||
DVLOG(2) << "After sleep";
|
||||
|
||||
last_yield = stats_.loop_serialized;
|
||||
// flush in case other fibers (writes commands that pushed previous values)
|
||||
// Push in case other fibers (writes commands that pushed previous values)
|
||||
// filled the buffer.
|
||||
FlushDefaultBuffer(false);
|
||||
PushSerializedToChannel(false);
|
||||
}
|
||||
} while (cursor);
|
||||
|
||||
DVLOG(2) << "after loop " << this_fiber::properties<FiberProps>().name();
|
||||
FlushDefaultBuffer(true);
|
||||
PushSerializedToChannel(true);
|
||||
} // for (dbindex)
|
||||
|
||||
// Wait for SerializePhysicalBucket to finish.
|
||||
|
@ -159,8 +159,8 @@ void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
|
|||
|
||||
// TODO: investigate why a single byte gets stuck and does not arrive at the replica
|
||||
for (unsigned i = 10; i > 1; i--)
|
||||
CHECK(!default_serializer_->SendFullSyncCut());
|
||||
FlushDefaultBuffer(true);
|
||||
CHECK(!serializer_->SendFullSyncCut());
|
||||
PushSerializedToChannel(true);
|
||||
|
||||
// serialized + side_saved must be equal to the total saved.
|
||||
VLOG(1) << "Exit SnapshotSerializer (loop_serialized/side_saved/cbcalls): "
|
||||
|
@ -192,33 +192,16 @@ unsigned SliceSnapshot::SerializeBucket(DbIndex db_index, PrimeTable::bucket_ite
|
|||
unsigned result = 0;
|
||||
|
||||
lock_guard lk(mu_);
|
||||
|
||||
optional<RdbSerializer> tmp_serializer;
|
||||
RdbSerializer* serializer_ptr = default_serializer_.get();
|
||||
if (db_index != current_db_) {
|
||||
CompressionMode compression_mode = compression_mode_ == CompressionMode::NONE
|
||||
? CompressionMode::NONE
|
||||
: CompressionMode::SINGLE_ENTRY;
|
||||
tmp_serializer.emplace(compression_mode);
|
||||
serializer_ptr = &*tmp_serializer;
|
||||
}
|
||||
|
||||
while (!it.is_done()) {
|
||||
++result;
|
||||
SerializeEntry(db_index, it->first, it->second, nullopt, serializer_ptr);
|
||||
SerializeEntry(db_index, it->first, it->second, nullopt, serializer_.get());
|
||||
++it;
|
||||
}
|
||||
|
||||
if (tmp_serializer) {
|
||||
PushBytesToChannel(db_index, &*tmp_serializer);
|
||||
VLOG(1) << "Pushed " << result << " entries via tmp_serializer";
|
||||
}
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
// This function should not block and should not preempt because it's called
|
||||
// from SerializePhysicalBucket which should execute atomically.
|
||||
// from SerializeBucket which should execute atomically.
|
||||
void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const PrimeValue& pv,
|
||||
optional<uint64_t> expire, RdbSerializer* serializer) {
|
||||
time_t expire_time = expire.value_or(0);
|
||||
|
@ -227,35 +210,30 @@ void SliceSnapshot::SerializeEntry(DbIndex db_indx, const PrimeKey& pk, const Pr
|
|||
expire_time = db_slice_->ExpireTime(eit);
|
||||
}
|
||||
|
||||
io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time);
|
||||
io::Result<uint8_t> res = serializer->SaveEntry(pk, pv, expire_time, db_indx);
|
||||
CHECK(res);
|
||||
++type_freq_map_[*res];
|
||||
}
|
||||
|
||||
size_t SliceSnapshot::PushBytesToChannel(DbIndex db_index, RdbSerializer* serializer) {
|
||||
auto id = rec_id_++;
|
||||
DVLOG(2) << "Pushed " << id;
|
||||
bool SliceSnapshot::PushSerializedToChannel(bool force) {
|
||||
if (!force && serializer_->SerializedLen() < 4096)
|
||||
return false;
|
||||
|
||||
io::StringFile sfile;
|
||||
serializer->FlushToSink(&sfile);
|
||||
serializer_->FlushToSink(&sfile);
|
||||
|
||||
size_t serialized = sfile.val.size();
|
||||
if (serialized == 0)
|
||||
return 0;
|
||||
stats_.channel_bytes += serialized;
|
||||
|
||||
DbRecord db_rec{.db_index = db_index, .id = id, .value = std::move(sfile.val)};
|
||||
auto id = rec_id_++;
|
||||
DVLOG(2) << "Pushed " << id;
|
||||
DbRecord db_rec{.id = id, .value = std::move(sfile.val)};
|
||||
|
||||
dest_->Push(std::move(db_rec));
|
||||
return serialized;
|
||||
}
|
||||
|
||||
bool SliceSnapshot::FlushDefaultBuffer(bool force) {
|
||||
if (!force && default_serializer_->SerializedLen() < 4096)
|
||||
return false;
|
||||
|
||||
size_t written = PushBytesToChannel(current_db_, default_serializer_.get());
|
||||
VLOG(2) << "FlushDefaultBuffer " << written << " bytes";
|
||||
VLOG(2) << "PushSerializedToChannel " << serialized << " bytes";
|
||||
return true;
|
||||
}
|
||||
|
||||
|
@ -288,25 +266,11 @@ void SliceSnapshot::OnJournalEntry(const journal::Entry& entry, bool unused_awai
|
|||
return;
|
||||
}
|
||||
|
||||
optional<RdbSerializer> tmp_serializer;
|
||||
RdbSerializer* serializer_ptr = default_serializer_.get();
|
||||
if (entry.dbid != current_db_) {
|
||||
CompressionMode compression_mode = compression_mode_ == CompressionMode::NONE
|
||||
? CompressionMode::NONE
|
||||
: CompressionMode::SINGLE_ENTRY;
|
||||
tmp_serializer.emplace(compression_mode);
|
||||
serializer_ptr = &*tmp_serializer;
|
||||
}
|
||||
serializer_->WriteJournalEntry(entry);
|
||||
|
||||
serializer_ptr->WriteJournalEntries(absl::Span{&entry, 1});
|
||||
|
||||
if (tmp_serializer) {
|
||||
PushBytesToChannel(entry.dbid, &*tmp_serializer);
|
||||
} else {
|
||||
// This is the only place that flushes in streaming mode
|
||||
// once the iterate buckets fiber finished.
|
||||
FlushDefaultBuffer(false);
|
||||
}
|
||||
// This is the only place that flushes in streaming mode
|
||||
// once the iterate buckets fiber finished.
|
||||
PushSerializedToChannel(false);
|
||||
}
|
||||
|
||||
void SliceSnapshot::CloseRecordChannel() {
|
||||
|
|
|
@ -50,10 +50,7 @@ class RdbSerializer;
|
|||
// over the channel until explicitly stopped.
|
||||
class SliceSnapshot {
|
||||
public:
|
||||
// Each dbrecord should belong to exactly one db.
|
||||
// RdbSaver adds "select" opcodes when necessary in order to maintain consistency.
|
||||
struct DbRecord {
|
||||
DbIndex db_index;
|
||||
uint64_t id;
|
||||
std::string value;
|
||||
};
|
||||
|
@ -95,9 +92,6 @@ class SliceSnapshot {
|
|||
void SerializeEntry(DbIndex db_index, const PrimeKey& pk, const PrimeValue& pv,
|
||||
std::optional<uint64_t> expire, RdbSerializer* serializer);
|
||||
|
||||
// Push rdb serializer's internal buffer to channel. Return how many bytes were written.
|
||||
size_t PushBytesToChannel(DbIndex db_index, RdbSerializer* serializer);
|
||||
|
||||
// DbChange listener
|
||||
void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req);
|
||||
|
||||
|
@ -107,20 +101,16 @@ class SliceSnapshot {
|
|||
// Close dest channel if not closed yet.
|
||||
void CloseRecordChannel();
|
||||
|
||||
// Call PushFileToChannel on default buffer if needed.
|
||||
// Flush regardless of size if force is true.
|
||||
// Return if flushed.
|
||||
bool FlushDefaultBuffer(bool force);
|
||||
// Push serializer's internal buffer to channel.
|
||||
// Push regardless of buffer size if force is true.
|
||||
// Return if pushed.
|
||||
bool PushSerializedToChannel(bool force);
|
||||
|
||||
public:
|
||||
uint64_t snapshot_version() const {
|
||||
return snapshot_version_;
|
||||
}
|
||||
|
||||
RdbSerializer* serializer() {
|
||||
return default_serializer_.get();
|
||||
}
|
||||
|
||||
size_t channel_bytes() const {
|
||||
return stats_.channel_bytes;
|
||||
}
|
||||
|
@ -138,7 +128,7 @@ class SliceSnapshot {
|
|||
|
||||
DbIndex current_db_;
|
||||
|
||||
std::unique_ptr<RdbSerializer> default_serializer_;
|
||||
std::unique_ptr<RdbSerializer> serializer_;
|
||||
|
||||
::boost::fibers::mutex mu_;
|
||||
::boost::fibers::fiber snapshot_fb_; // IterateEntriesFb
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue