feat: Use journal LSNs for absolute replication offsets (#1242)

* feat: Use journal LSNs for absolute replication offsets

* 1 - Address small CR comments
2 - Simplify the offset accounting so that we send the correct offset
    in `SliceSnapshot::Stop` instead of counting in RdbLoader. This
    allows us to revert the changes to slice journaling of EXEC
    commands, for example.

* Store int with absl::little_endian

* Document the offset management
This commit is contained in:
Roy Jacobson 2023-05-22 15:34:32 +03:00 committed by GitHub
parent 6962771c21
commit cbb2afc792
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
17 changed files with 90 additions and 43 deletions

View file

@ -119,7 +119,7 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
return Expire(args, cntx);
}
if (sub_cmd == "REPLICAOFFSET" && args.size() == 2) {
if (sub_cmd == "REPLICAOFFSET" && args.size() == 1) {
return ReplicaOffset(args, cntx);
}
@ -347,23 +347,16 @@ void DflyCmd::Expire(CmdArgList args, ConnectionContext* cntx) {
void DflyCmd::ReplicaOffset(CmdArgList args, ConnectionContext* cntx) {
RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
string_view sync_id_str = ArgS(args, 1);
VLOG(1) << "Got DFLY REPLICAOFFSET " << sync_id_str;
auto [sync_id, replica_ptr] = GetReplicaInfoOrReply(sync_id_str, rb);
if (!sync_id)
return;
rb->StartArray(shard_set->size());
std::vector<LSN> lsns(shard_set->size());
shard_set->RunBriefInParallel([&](EngineShard* shard) {
auto* journal = shard->journal();
lsns[shard->shard_id()] = journal ? journal->GetLsn() : 0;
});
string result;
unique_lock lk(replica_ptr->mu);
rb->StartArray(replica_ptr->flows.size());
for (size_t flow_id = 0; flow_id < replica_ptr->flows.size(); ++flow_id) {
JournalStreamer* streamer = replica_ptr->flows[flow_id].streamer.get();
if (streamer) {
rb->SendLong(streamer->GetRecordCount());
} else {
rb->SendLong(0);
}
for (size_t shard_id = 0; shard_id < shard_set->size(); ++shard_id) {
rb->SendLong(lsns[shard_id]);
}
}

View file

@ -124,12 +124,15 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) {
}
}
if (entry.opcode == Op::NOOP)
return;
// TODO: This is preparation for AOC style journaling, currently unused.
RingItem item;
item.lsn = lsn_;
item.opcode = entry.opcode;
item.txid = entry.txid;
VLOG(1) << "Writing item " << item.lsn;
VLOG(1) << "Writing item [" << item.lsn << "]: " << entry.ToString();
ring_buffer_->EmplaceOrOverride(move(item));
if (shard_file_) {

View file

@ -27,6 +27,7 @@ class JournalSlice {
std::error_code Close();
// This is always the LSN of the *next* journal entry.
LSN cur_lsn() const {
return lsn_;
}

View file

@ -65,6 +65,8 @@ void JournalWriter::Write(const journal::Entry& entry) {
cur_dbid_ = entry.dbid;
}
VLOG(1) << "Writing entry " << entry.ToString();
Write(uint8_t(entry.opcode));
switch (entry.opcode) {
@ -187,6 +189,8 @@ io::Result<journal::ParsedEntry> JournalReader::ReadEntry() {
SET_OR_UNEXPECT(ReadUInt<uint64_t>(), entry.txid);
SET_OR_UNEXPECT(ReadUInt<uint32_t>(), entry.shard_cnt);
VLOG(1) << "Read entry " << entry.ToString();
if (opcode == journal::Op::EXEC) {
return entry;
}

View file

@ -8,21 +8,16 @@ namespace dfly {
using namespace util;
void JournalStreamer::Start(io::Sink* dest) {
using namespace journal;
write_fb_ = MakeFiber(&JournalStreamer::WriterFb, this, dest);
journal_cb_id_ =
journal_->RegisterOnChange([this](const journal::Entry& entry, bool allow_await) {
if (entry.opcode == journal::Op::NOOP) {
// No recode to write, just await if data was written so consumer will read the data.
return AwaitIfWritten();
}
writer_.Write(entry);
record_cnt_.fetch_add(1, std::memory_order_relaxed);
NotifyWritten(allow_await);
});
}
uint64_t JournalStreamer::GetRecordCount() const {
return record_cnt_.load(std::memory_order_relaxed);
journal_cb_id_ = journal_->RegisterOnChange([this](const Entry& entry, bool allow_await) {
if (entry.opcode == Op::NOOP) {
// No recode to write, just await if data was written so consumer will read the data.
return AwaitIfWritten();
}
writer_.Write(entry);
NotifyWritten(allow_await);
});
}
void JournalStreamer::Cancel() {

View file

@ -28,7 +28,6 @@ class JournalStreamer : protected BufferedStreamerBase {
// Must be called on context cancellation for unblocking
// and manual cleanup.
void Cancel();
uint64_t GetRecordCount() const;
private:
// Writer fiber that steals buffer contents and writes them to dest.
@ -42,8 +41,6 @@ class JournalStreamer : protected BufferedStreamerBase {
Fiber write_fb_{};
JournalWriter writer_{this};
std::atomic_uint64_t record_cnt_{0};
};
} // namespace dfly

View file

@ -7,7 +7,7 @@
namespace dfly::journal {
std::string Entry::ToString() const {
std::string rv = absl::StrCat("{dbid=", dbid);
std::string rv = absl::StrCat("{op=", opcode, ", dbid=", dbid);
std::visit(
[&rv](const auto& payload) {
if constexpr (std::is_same_v<std::decay_t<decltype(payload)>, std::monostate>) {
@ -33,4 +33,15 @@ std::string Entry::ToString() const {
return rv;
}
std::string ParsedEntry::ToString() const {
std::string rv = absl::StrCat("{op=", opcode, ", dbid=", dbid, ", cmd='");
for (auto& arg : cmd.cmd_args) {
absl::StrAppend(&rv, facade::ToSV(arg));
absl::StrAppend(&rv, " ");
}
rv.pop_back();
rv += "'}";
return rv;
}
} // namespace dfly::journal

View file

@ -64,6 +64,8 @@ struct ParsedEntry : public EntryBase {
CmdArgVec cmd_args; // represents the parsed command.
};
CmdData cmd;
std::string ToString() const;
};
using ChangeCallback = std::function<void(const Entry&, bool await)>;

View file

@ -27,3 +27,9 @@ const uint8_t RDB_OPCODE_COMPRESSED_LZ4_BLOB_START = 202;
const uint8_t RDB_OPCODE_COMPRESSED_BLOB_END = 203;
const uint8_t RDB_OPCODE_JOURNAL_BLOB = 210;
// A full sync will continue to send information in journal blobs until the replica
// sends a `DFLY STARTSTABLE` to the master.
// We use this opcode to synchronize the journal offsets at the end of the full sync,
// so it is always sent at the end of the RDB stream.
const uint8_t RDB_OPCODE_JOURNAL_OFFSET = 211;

View file

@ -1826,6 +1826,15 @@ error_code RdbLoader::Load(io::Source* src) {
continue;
}
if (type == RDB_OPCODE_JOURNAL_OFFSET) {
VLOG(1) << "Read RDB_OPCODE_JOURNAL_OFFSET";
uint64_t journal_offset;
SET_OR_RETURN(FetchInt<uint64_t>(), journal_offset);
VLOG(1) << "Got offset " << journal_offset;
journal_offset_ = journal_offset;
continue;
}
if (type == RDB_OPCODE_SELECTDB) {
unsigned dbid = 0;
@ -1838,7 +1847,7 @@ error_code RdbLoader::Load(io::Source* src) {
return RdbError(errc::bad_db_index);
}
VLOG(1) << "Select DB: " << dbid;
VLOG(2) << "Select DB: " << dbid;
for (unsigned i = 0; i < shard_set->size(); ++i) {
// we should flush pending items before switching dbid.
FlushShardAsync(i);
@ -2050,6 +2059,7 @@ error_code RdbLoaderBase::HandleJournalBlob(Service* service) {
journal::ParsedEntry entry{};
SET_OR_RETURN(journal_reader_.ReadEntry(), entry);
ex.Execute(entry.dbid, entry.cmd);
VLOG(1) << "Reading item: " << entry.ToString();
done++;
}

View file

@ -154,6 +154,7 @@ class RdbLoaderBase {
base::PODArray<uint8_t> compr_buf_;
std::unique_ptr<DecompressImpl> decompress_impl_;
JournalReader journal_reader_{nullptr, 0};
std::optional<uint64_t> journal_offset_ = std::nullopt;
};
class RdbLoader : protected RdbLoaderBase {
@ -184,6 +185,12 @@ class RdbLoader : protected RdbLoaderBase {
return load_time_;
}
// Return the offset that was received with a RDB_OPCODE_JOURNAL_OFFSET command,
// or 0 if no offset was received.
uint64_t journal_offset() const {
return journal_offset_.value_or(0);
}
// Set callback for receiving RDB_OPCODE_FULLSYNC_END.
// This opcode is used by a master instance to notify it finished streaming static data
// and is ready to switch to stable state sync.

View file

@ -666,6 +666,13 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
return error_code{};
}
error_code RdbSerializer::SendJournalOffset(uint64_t journal_offset) {
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_JOURNAL_OFFSET));
uint8_t buf[sizeof(uint64_t)];
absl::little_endian::Store64(buf, journal_offset);
return WriteRaw(buf);
}
error_code RdbSerializer::SendFullSyncCut() {
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_FULLSYNC_END));

View file

@ -154,6 +154,8 @@ class RdbSerializer {
// Write journal entry as an embedded journal blob.
std::error_code WriteJournalEntry(const journal::Entry& entry);
std::error_code SendJournalOffset(uint64_t journal_offset);
// Send FULL_SYNC_CUT opcode to notify that all static data was sent.
std::error_code SendFullSyncCut();

View file

@ -698,9 +698,9 @@ error_code Replica::ConsumeDflyStream() {
flow->waker_.notifyAll();
}
// Iterate over map and cancle all blocking entities
// Iterate over map and cancel all blocking entities
{
lock_guard l{multi_shard_exe_->map_mu};
lock_guard lk{multi_shard_exe_->map_mu};
for (auto& tx_data : multi_shard_exe_->tx_sync_execution) {
tx_data.second.barrier.Cancel();
tx_data.second.block.Cancel();
@ -888,6 +888,7 @@ void Replica::FullSyncDflyFb(string eof_token, BlockingCounter bc, Context* cntx
leftover_buf_.reset();
}
this->journal_rec_executed_.store(loader.journal_offset());
VLOG(1) << "FullSyncDflyFb finished after reading " << loader.bytes_read() << " bytes";
}
@ -1020,7 +1021,7 @@ void Replica::ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context*
// Check if we woke up due to cancellation.
if (cntx_.IsCancelled())
return;
} else { // Non gloabl command will be executed by each the flow fiber
} else { // Non global command will be executed by each flow fiber
VLOG(2) << "Execute txid: " << tx_data.txid << " executing shard transaction commands";
executor_->Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands));
}

View file

@ -234,7 +234,14 @@ class Replica {
bool use_multi_shard_exe_sync_;
std::unique_ptr<JournalExecutor> executor_;
// Count the number of journal records executed in specific flow
// The master instance has a LSN for each journal record. This counts
// the number of journal records executed in this flow plus the initial
// journal offset that we received in the transition from full sync
// to stable sync.
// Note: This is not 1-to-1 the LSN in the master, because this counts
// **executed** records, which might be received interleaved when commands
// run out-of-order on the master instance.
std::atomic_uint64_t journal_rec_executed_ = 0;
// MainReplicationFb in standalone mode, FullSyncDflyFb in flow mode.

View file

@ -66,7 +66,9 @@ void SliceSnapshot::Stop() {
Join();
if (journal_cb_id_) {
db_slice_->shard_owner()->journal()->UnregisterOnChange(journal_cb_id_);
auto* journal = db_slice_->shard_owner()->journal();
serializer_->SendJournalOffset(journal->GetLsn());
journal->UnregisterOnChange(journal_cb_id_);
}
PushSerializedToChannel(true);

View file

@ -83,8 +83,7 @@ async def test_replication_all(df_local_factory, df_seeder_factory, t_master, t_
async def check_replica_finished_exec(c_replica, c_master):
syncid, r_offset = await c_replica.execute_command("DEBUG REPLICA OFFSET")
command = "DFLY REPLICAOFFSET " + syncid.decode()
m_offset = await c_master.execute_command(command)
m_offset = await c_master.execute_command("DFLY REPLICAOFFSET")
print(" offset", syncid.decode(), r_offset, m_offset)
return r_offset == m_offset