mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
feat: Support ACKs from replica to master (#1243)
* feat: Support ACKs from replica to master * Rework after CR * Split the acks into a different fiber and remove the PING loop * const convention * move around the order. * revert sleep removal * Exit ack fiber on cancellation * Don't send ACKs if server doesn't support it
This commit is contained in:
parent
76801fa8c3
commit
29c258df9b
9 changed files with 123 additions and 27 deletions
|
@ -18,6 +18,7 @@ namespace dfly {
|
|||
class EngineShardSet;
|
||||
class ConnectionContext;
|
||||
class ChannelStore;
|
||||
class FlowInfo;
|
||||
|
||||
// Stores command id and arguments for delayed invocation.
|
||||
// Used for storing MULTI/EXEC commands.
|
||||
|
@ -172,7 +173,10 @@ class ConnectionContext : public facade::ConnectionContext {
|
|||
void PUnsubscribeAll(bool to_reply);
|
||||
void ChangeMonitor(bool start); // either start or stop monitor on a given connection
|
||||
|
||||
// Whether this connection is a connection from a replica to its master.
|
||||
bool is_replicating = false;
|
||||
// Reference to a FlowInfo for this connection if from a master to a replica.
|
||||
FlowInfo* replication_flow;
|
||||
bool monitor = false; // when a monitor command is sent over a given connection, we need to aware
|
||||
// of it as a state for the connection
|
||||
|
||||
|
|
|
@ -250,6 +250,7 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) {
|
|||
absl::InsecureBitGen gen;
|
||||
string eof_token = GetRandomHex(gen, 40);
|
||||
|
||||
cntx->replication_flow = &replica_ptr->flows[flow_id];
|
||||
replica_ptr->flows[flow_id].conn = cntx->owner();
|
||||
replica_ptr->flows[flow_id].eof_token = eof_token;
|
||||
listener_->Migrate(cntx->owner(), shard_set->pool()->at(flow_id));
|
||||
|
@ -612,17 +613,17 @@ void DflyCmd::Shutdown() {
|
|||
}
|
||||
}
|
||||
|
||||
void DflyCmd::FlowInfo::TryShutdownSocket() {
|
||||
void FlowInfo::TryShutdownSocket() {
|
||||
// Close socket for clean disconnect.
|
||||
if (conn->socket()->IsOpen()) {
|
||||
(void)conn->socket()->Shutdown(SHUT_RDWR);
|
||||
}
|
||||
}
|
||||
|
||||
DflyCmd::FlowInfo::~FlowInfo() {
|
||||
FlowInfo::~FlowInfo() {
|
||||
}
|
||||
|
||||
DflyCmd::FlowInfo::FlowInfo() {
|
||||
FlowInfo::FlowInfo() {
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -26,6 +26,25 @@ class ServerFamily;
|
|||
class RdbSaver;
|
||||
class JournalStreamer;
|
||||
|
||||
// Stores information related to a single flow.
|
||||
struct FlowInfo {
|
||||
FlowInfo();
|
||||
~FlowInfo();
|
||||
// Shutdown associated socket if its still open.
|
||||
void TryShutdownSocket();
|
||||
|
||||
facade::Connection* conn;
|
||||
|
||||
Fiber full_sync_fb; // Full sync fiber.
|
||||
std::unique_ptr<RdbSaver> saver; // Saver used by the full sync phase.
|
||||
std::unique_ptr<JournalStreamer> streamer;
|
||||
std::string eof_token;
|
||||
|
||||
uint64_t last_acked_lsn;
|
||||
|
||||
std::function<void()> cleanup; // Optional cleanup for cancellation.
|
||||
};
|
||||
|
||||
// DflyCmd is responsible for managing replication. A master instance can be connected
|
||||
// to many replica instances, what is more, each of them can open multiple connections.
|
||||
// This is why its important to understand replica lifecycle management before making
|
||||
|
@ -75,23 +94,6 @@ class DflyCmd {
|
|||
// See header comments for state descriptions.
|
||||
enum class SyncState { PREPARATION, FULL_SYNC, STABLE_SYNC, CANCELLED };
|
||||
|
||||
// Stores information related to a single flow.
|
||||
struct FlowInfo {
|
||||
FlowInfo();
|
||||
~FlowInfo();
|
||||
// Shutdown associated socket if its still open.
|
||||
void TryShutdownSocket();
|
||||
|
||||
facade::Connection* conn;
|
||||
|
||||
Fiber full_sync_fb; // Full sync fiber.
|
||||
std::unique_ptr<RdbSaver> saver; // Saver used by the full sync phase.
|
||||
std::unique_ptr<JournalStreamer> streamer;
|
||||
std::string eof_token;
|
||||
|
||||
std::function<void()> cleanup; // Optional cleanup for cancellation.
|
||||
};
|
||||
|
||||
// Stores information related to a single replica.
|
||||
struct ReplicaInfo {
|
||||
ReplicaInfo(unsigned flow_count, std::string address, uint32_t listening_port,
|
||||
|
|
|
@ -72,6 +72,8 @@ void JournalWriter::Write(const journal::Entry& entry) {
|
|||
switch (entry.opcode) {
|
||||
case journal::Op::SELECT:
|
||||
return Write(entry.dbid);
|
||||
case journal::Op::PING:
|
||||
return;
|
||||
case journal::Op::COMMAND:
|
||||
case journal::Op::EXPIRED:
|
||||
case journal::Op::MULTI_COMMAND:
|
||||
|
@ -186,6 +188,10 @@ io::Result<journal::ParsedEntry> JournalReader::ReadEntry() {
|
|||
entry.dbid = dbid_;
|
||||
entry.opcode = opcode;
|
||||
|
||||
if (opcode == journal::Op::PING) {
|
||||
return entry;
|
||||
}
|
||||
|
||||
SET_OR_UNEXPECT(ReadUInt<uint64_t>(), entry.txid);
|
||||
SET_OR_UNEXPECT(ReadUInt<uint32_t>(), entry.shard_cnt);
|
||||
|
||||
|
|
|
@ -19,6 +19,7 @@ enum class Op : uint8_t {
|
|||
COMMAND = 10,
|
||||
MULTI_COMMAND = 11,
|
||||
EXEC = 12,
|
||||
PING = 13,
|
||||
};
|
||||
|
||||
struct EntryBase {
|
||||
|
|
|
@ -138,6 +138,9 @@ Replica::~Replica() {
|
|||
if (sync_fb_.IsJoinable()) {
|
||||
sync_fb_.Join();
|
||||
}
|
||||
if (acks_fb_.IsJoinable()) {
|
||||
acks_fb_.Join();
|
||||
}
|
||||
if (execution_fb_.IsJoinable()) {
|
||||
execution_fb_.Join();
|
||||
}
|
||||
|
@ -202,8 +205,11 @@ void Replica::Stop() {
|
|||
|
||||
// Make sure the replica fully stopped and did all cleanup,
|
||||
// so we can freely release resources (connections).
|
||||
waker_.notifyAll();
|
||||
if (sync_fb_.IsJoinable())
|
||||
sync_fb_.Join();
|
||||
if (acks_fb_.IsJoinable())
|
||||
acks_fb_.Join();
|
||||
}
|
||||
|
||||
void Replica::Pause(bool pause) {
|
||||
|
@ -783,6 +789,9 @@ void Replica::JoinAllFlows() {
|
|||
if (flow->sync_fb_.IsJoinable()) {
|
||||
flow->sync_fb_.Join();
|
||||
}
|
||||
if (flow->acks_fb_.IsJoinable()) {
|
||||
flow->acks_fb_.Join();
|
||||
}
|
||||
if (flow->execution_fb_.IsJoinable()) {
|
||||
flow->execution_fb_.Join();
|
||||
}
|
||||
|
@ -934,6 +943,11 @@ void Replica::StableSyncDflyReadFb(Context* cntx) {
|
|||
|
||||
JournalReader reader{&ps, 0};
|
||||
TransactionReader tx_reader{};
|
||||
|
||||
if (master_context_.version > DflyVersion::VER0) {
|
||||
acks_fb_ = MakeFiber(&Replica::StableSyncDflyAcksFb, this, cntx);
|
||||
}
|
||||
|
||||
while (!cntx->IsCancelled()) {
|
||||
waker_.await([&]() {
|
||||
return ((trans_data_queue_.size() < kYieldAfterItemsInQueue) || cntx->IsCancelled());
|
||||
|
@ -947,16 +961,53 @@ void Replica::StableSyncDflyReadFb(Context* cntx) {
|
|||
|
||||
last_io_time_ = sock_->proactor()->GetMonotonicTimeNs();
|
||||
|
||||
if (use_multi_shard_exe_sync_) {
|
||||
InsertTxDataToShardResource(std::move(*tx_data));
|
||||
if (!tx_data->is_ping) {
|
||||
if (use_multi_shard_exe_sync_) {
|
||||
InsertTxDataToShardResource(std::move(*tx_data));
|
||||
} else {
|
||||
ExecuteTxWithNoShardSync(std::move(*tx_data), cntx);
|
||||
}
|
||||
} else {
|
||||
ExecuteTxWithNoShardSync(std::move(*tx_data), cntx);
|
||||
force_ping_ = true;
|
||||
journal_rec_executed_.fetch_add(1, std::memory_order_relaxed);
|
||||
}
|
||||
|
||||
waker_.notify();
|
||||
}
|
||||
}
|
||||
|
||||
void Replica::StableSyncDflyAcksFb(Context* cntx) {
|
||||
constexpr std::chrono::duration kAckTimeMaxInterval = 3s;
|
||||
constexpr size_t kAckRecordMaxInterval = 1024;
|
||||
std::string ack_cmd;
|
||||
ReqSerializer serializer{sock_.get()};
|
||||
|
||||
auto next_ack_tp = std::chrono::steady_clock::now();
|
||||
|
||||
while (!cntx->IsCancelled()) {
|
||||
// Handle ACKs with the master. PING opcodes from the master mean we should immediately
|
||||
// answer.
|
||||
uint64_t current_offset = journal_rec_executed_.load(std::memory_order_relaxed);
|
||||
VLOG(1) << "Sending an ACK with offset=" << current_offset << " forced=" << force_ping_;
|
||||
ack_cmd = absl::StrCat("REPLCONF ACK ", current_offset);
|
||||
force_ping_ = false;
|
||||
next_ack_tp = std::chrono::steady_clock::now() + kAckTimeMaxInterval;
|
||||
if (auto ec = SendCommand(ack_cmd, &serializer); ec) {
|
||||
cntx->ReportError(ec);
|
||||
break;
|
||||
}
|
||||
ack_offs_ = current_offset;
|
||||
|
||||
waker_.await_until(
|
||||
[&]() {
|
||||
return journal_rec_executed_.load(std::memory_order_relaxed) >
|
||||
ack_offs_ + kAckRecordMaxInterval ||
|
||||
force_ping_ || cntx->IsCancelled();
|
||||
},
|
||||
next_ack_tp);
|
||||
}
|
||||
}
|
||||
|
||||
void Replica::ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx) {
|
||||
if (cntx->IsCancelled()) {
|
||||
return;
|
||||
|
@ -1314,6 +1365,9 @@ bool Replica::TransactionData::AddEntry(journal::ParsedEntry&& entry) {
|
|||
++journal_rec_count;
|
||||
|
||||
switch (entry.opcode) {
|
||||
case journal::Op::PING:
|
||||
is_ping = true;
|
||||
return true;
|
||||
case journal::Op::EXPIRED:
|
||||
case journal::Op::COMMAND:
|
||||
commands.push_back(std::move(entry.cmd));
|
||||
|
@ -1355,7 +1409,8 @@ auto Replica::TransactionReader::NextTxData(JournalReader* reader, Context* cntx
|
|||
|
||||
// Check if journal command can be executed right away.
|
||||
// Expiration checks lock on master, so it never conflicts with running multi transactions.
|
||||
if (res->opcode == journal::Op::EXPIRED || res->opcode == journal::Op::COMMAND)
|
||||
if (res->opcode == journal::Op::EXPIRED || res->opcode == journal::Op::COMMAND ||
|
||||
res->opcode == journal::Op::PING)
|
||||
return TransactionData::FromSingle(std::move(res.value()));
|
||||
|
||||
// Otherwise, continue building multi command.
|
||||
|
|
|
@ -72,6 +72,7 @@ class Replica {
|
|||
uint32_t shard_cnt{0};
|
||||
absl::InlinedVector<journal::ParsedEntry::CmdData, 1> commands{0};
|
||||
uint32_t journal_rec_count{0}; // Count number of source entries to check offset.
|
||||
bool is_ping = false; // For Op::PING entries.
|
||||
};
|
||||
|
||||
// Utility for reading TransactionData from a journal reader.
|
||||
|
@ -162,6 +163,8 @@ class Replica {
|
|||
// Single flow stable state sync fiber spawned by StartStableSyncFlow.
|
||||
void StableSyncDflyReadFb(Context* cntx);
|
||||
|
||||
void StableSyncDflyAcksFb(Context* cntx);
|
||||
|
||||
void StableSyncDflyExecFb(Context* cntx);
|
||||
|
||||
private: /* Utility */
|
||||
|
@ -252,6 +255,8 @@ class Replica {
|
|||
|
||||
// MainReplicationFb in standalone mode, FullSyncDflyFb in flow mode.
|
||||
Fiber sync_fb_;
|
||||
Fiber acks_fb_;
|
||||
bool force_ping_ = false;
|
||||
Fiber execution_fb_;
|
||||
|
||||
std::vector<std::unique_ptr<Replica>> shard_flows_;
|
||||
|
|
|
@ -1939,8 +1939,26 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
|
|||
VLOG(1) << "Client version for session_id="
|
||||
<< cntx->conn_state.replicaiton_info.repl_session_id << " is " << version;
|
||||
cntx->conn_state.replicaiton_info.repl_version = DflyVersion(version);
|
||||
} else if (cmd == "ACK" && args.size() == 2) {
|
||||
// Don't send error/Ok back through the socket, because we don't want to interleave with
|
||||
// the journal writes that we write into the same socket.
|
||||
|
||||
if (!cntx->replication_flow) {
|
||||
LOG(ERROR) << "No replication flow assigned";
|
||||
return;
|
||||
}
|
||||
|
||||
uint64_t ack;
|
||||
if (!absl::SimpleAtoi(arg, &ack)) {
|
||||
LOG(ERROR) << "Bad int in REPLCONF ACK command! arg=" << arg;
|
||||
return;
|
||||
}
|
||||
VLOG(1) << "Received client ACK=" << ack;
|
||||
cntx->replication_flow->last_acked_lsn = ack;
|
||||
return;
|
||||
} else {
|
||||
VLOG(1) << cmd << " " << arg;
|
||||
VLOG(1) << cmd << " " << arg << " " << args.size();
|
||||
goto err;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1948,6 +1966,7 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
|
|||
return;
|
||||
|
||||
err:
|
||||
LOG(ERROR) << "Error in receiving command: " << args;
|
||||
(*cntx)->SendError(kSyntaxErr);
|
||||
}
|
||||
|
||||
|
|
|
@ -292,7 +292,7 @@ async def test_disconnect_master(df_local_factory, df_seeder_factory, t_master,
|
|||
seeder = df_seeder_factory.create(port=master.port, keys=n_keys, dbcount=2)
|
||||
|
||||
async def crash_master_fs():
|
||||
await asyncio.sleep(random.random() / 10 + 0.1 * len(replicas))
|
||||
await asyncio.sleep(random.random() / 10)
|
||||
master.stop(kill=True)
|
||||
|
||||
async def start_master():
|
||||
|
@ -307,7 +307,8 @@ async def test_disconnect_master(df_local_factory, df_seeder_factory, t_master,
|
|||
|
||||
# Crash master during full sync, but with all passing initial connection phase
|
||||
await asyncio.gather(*(c_replica.execute_command("REPLICAOF localhost " + str(master.port))
|
||||
for c_replica in c_replicas), crash_master_fs())
|
||||
for c_replica in c_replicas))
|
||||
await crash_master_fs()
|
||||
|
||||
await asyncio.sleep(1 + len(replicas) * 0.5)
|
||||
|
||||
|
@ -890,6 +891,8 @@ async def test_role_command(df_local_factory, n_keys=20):
|
|||
assert await c_replica.execute_command("role") == [
|
||||
b'replica', b'localhost', bytes(str(master.port), 'ascii'), b'stable_sync']
|
||||
|
||||
# This tests that we react fast to socket shutdowns and don't hang on
|
||||
# things like the ACK or execution fibers.
|
||||
master.stop()
|
||||
await asyncio.sleep(0.1)
|
||||
assert await c_replica.execute_command("role") == [
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue