feat(server): Auto expiry + small replica refactor (#718)

Vladislav 2023-01-31 12:55:52 +03:00 committed by GitHub
parent 152f16bc14
commit 4c9b30ca43
13 changed files with 206 additions and 73 deletions


@@ -202,6 +202,12 @@ void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt) {
op_args.tx->FinishLogJournalOnShard(op_args.shard, shard_cnt);
}
void RecordExpiry(DbIndex dbid, string_view key) {
auto journal = EngineShard::tlocal()->journal();
CHECK(journal);
journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, make_pair("DEL", ArgSlice{key}));
}
#define ADD(x) (x) += o.x
TieredStats& TieredStats::operator+=(const TieredStats& o) {


@@ -99,6 +99,10 @@ void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args,
// Record non auto journal command finish. Call only when command translates to multi commands.
void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt);
// Record expiry in journal with independent transaction. Must be called from shard thread holding
// key.
void RecordExpiry(DbIndex dbid, std::string_view key);
struct TieredStats {
size_t tiered_reads = 0;
size_t tiered_writes = 0;


@@ -802,9 +802,17 @@ pair<PrimeIterator, ExpireIterator> DbSlice::ExpireIfNeeded(const Context& cntx,
// TODO: to employ multi-generation update of expire-base and the underlying values.
time_t expire_time = ExpireTime(expire_it);
if (time_t(cntx.time_now_ms) < expire_time)
// Never do expiration on replica.
if (time_t(cntx.time_now_ms) < expire_time || owner_->IsReplica())
return make_pair(it, expire_it);
// Replicate expiry
if (auto journal = EngineShard::tlocal()->journal(); journal) {
std::string stash;
it->first.GetString(&stash);
RecordExpiry(cntx.db_index, stash);
}
PerformDeletion(it, expire_it, shard_owner(), db.get());
++events_.expired_keys;


@@ -443,6 +443,10 @@ bool EngineShard::HasResultConverged(TxId notifyid) const {
void EngineShard::Heartbeat() {
CacheStats();
if (IsReplica()) // Never run expiration on replica.
return;
constexpr double kTtlDeleteLimit = 200;
constexpr double kRedLimitFactor = 0.1;
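
Taken together, the ExpireIfNeeded and Heartbeat hunks implement the commit's core rule: only the master expires keys, and every expiration is journaled as an explicit DEL (via RecordExpiry) so replicas delete exactly what the master deleted, when the journal tells them to. Below is a minimal, self-contained sketch of that pattern; Store, Get and the on_expire hook are simplified stand-ins, not the actual Dragonfly classes.

#include <cstdint>
#include <functional>
#include <iostream>
#include <string>
#include <unordered_map>
#include <utility>

struct Store {
  bool is_replica = false;
  // key -> (value, expire_at_ms); 0 means no expiry.
  std::unordered_map<std::string, std::pair<std::string, int64_t>> data;
  // Journal hook: the master reports the expiry instead of silently dropping the key.
  std::function<void(const std::string&)> on_expire;

  const std::string* Get(const std::string& key, int64_t now_ms) {
    auto it = data.find(key);
    if (it == data.end()) return nullptr;
    int64_t expire_at = it->second.second;
    // Replicas never expire on their own; they wait for the replicated DEL.
    if (expire_at == 0 || now_ms < expire_at || is_replica) return &it->second.first;
    if (on_expire) on_expire(key);  // replicate the expiry as an explicit DEL
    data.erase(it);
    return nullptr;
  }
};

int main() {
  Store master, replica;
  replica.is_replica = true;
  // Stand-in for the journal stream: the master's expiry becomes a DEL on the replica.
  master.on_expire = [&replica](const std::string& key) { replica.data.erase(key); };
  master.data["k"] = {"v", 100};
  replica.data["k"] = {"v", 100};
  std::cout << (master.Get("k", 200) == nullptr) << "\n";   // 1: master expired it and journaled DEL
  std::cout << (replica.Get("k", 200) == nullptr) << "\n";  // 1: deleted through the journal, not locally
  return 0;
}

In the actual code the DEL is recorded through RecordExpiry() under an independent transaction, and the replica applies it like any other journaled command.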


@@ -143,6 +143,14 @@ class EngineShard {
journal_ = j;
}
void SetReplica(bool replica) {
is_replica_ = replica;
}
bool IsReplica() const {
return is_replica_;
}
void TEST_EnableHeartbeat();
private:
@@ -195,6 +203,9 @@ class EngineShard {
Stats stats_;
// Become passive if replica: don't automatically evict expired items.
bool is_replica_ = false;
// Logical ts used to order distributed transactions.
TxId committed_txid_ = 0;
Transaction* continuation_trans_ = nullptr;


@@ -75,6 +75,7 @@ void JournalWriter::Write(const journal::Entry& entry) {
case journal::Op::SELECT:
return Write(entry.dbid);
case journal::Op::COMMAND:
case journal::Op::EXPIRED:
case journal::Op::MULTI_COMMAND:
case journal::Op::EXEC:
Write(entry.txid);


@@ -15,6 +15,7 @@ namespace journal {
enum class Op : uint8_t {
NOOP = 0,
SELECT = 6,
EXPIRED = 9,
COMMAND = 10,
MULTI_COMMAND = 11,
EXEC = 12,
@@ -43,10 +44,15 @@ struct Entry : public EntryBase {
Entry(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid, 0}, payload{} {
}
Entry(TxId txid, journal::Op opcode, DbIndex dbid, uint32_t shard_cnt)
: EntryBase{txid, opcode, dbid, shard_cnt}, payload{} {
}
bool HasPayload() const {
return !std::holds_alternative<std::monostate>(payload);
}
Payload payload;
};
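
The new EXPIRED opcode slots in next to COMMAND and MULTI_COMMAND, and HasPayload() gives consumers a single check for "does this entry carry a command?". A small sketch of the std::monostate idiom it relies on; the Payload alias here is a simplified stand-in for the real variant type.

#include <iostream>
#include <string>
#include <variant>

// First alternative std::monostate means "no payload".
using Payload = std::variant<std::monostate, std::string>;

bool HasPayload(const Payload& p) {
  return !std::holds_alternative<std::monostate>(p);
}

int main() {
  Payload exec_entry{};                       // control entry (EXEC/SELECT/NOOP): no payload
  Payload cmd_entry{std::string("DEL key")};  // command entry carrying data
  std::cout << HasPayload(exec_entry) << " " << HasPayload(cmd_entry) << "\n";  // prints: 0 1
  return 0;
}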


@@ -1981,9 +1981,7 @@ error_code RdbLoaderBase::HandleJournalBlob(Service* service, DbIndex dbid) {
while (done < num_entries) {
journal::ParsedEntry entry{};
SET_OR_RETURN(journal_reader_.ReadEntry(), entry);
if (entry.opcode == journal::Op::COMMAND || entry.opcode == journal::Op::MULTI_COMMAND) {
ex.Execute(entry.dbid, entry.cmd);
}
ex.Execute(entry.dbid, entry.cmd);
done++;
}


@@ -182,6 +182,9 @@ void Replica::Pause(bool pause) {
}
void Replica::MainReplicationFb() {
// Switch shard states to replication.
SetShardStates(true);
error_code ec;
while (state_mask_ & R_ENABLED) {
// Discard all previous errors and set default error handler.
@@ -239,8 +242,12 @@ void Replica::MainReplicationFb() {
state_mask_ &= ~R_SYNC_OK;
}
// Wait for unblocking cleanup to finish.
cntx_.JoinErrorHandler();
// Revert shard states to normal state.
SetShardStates(false);
VLOG(1) << "Main replication fiber finished";
}
@@ -646,6 +653,10 @@ void Replica::JoinAllFlows() {
}
}
void Replica::SetShardStates(bool replica) {
shard_set->RunBriefInParallel([replica](EngineShard* shard) { shard->SetReplica(replica); });
}
void Replica::DefaultErrorHandler(const GenericError& err) {
CloseSocket();
}
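
MainReplicationFb now brackets the whole replication session with SetShardStates(true/false), flipping the per-shard is_replica_ flag so Heartbeat and ExpireIfNeeded stop expiring for exactly as long as the node is a replica. A minimal sketch of that toggle, with simplified stand-ins for EngineShard, ShardSet and the global shard_set:

#include <iostream>
#include <vector>

struct EngineShard {
  bool is_replica = false;
  void SetReplica(bool replica) { is_replica = replica; }
  bool IsReplica() const { return is_replica; }
};

struct ShardSet {
  std::vector<EngineShard> shards;
  // The real RunBriefInParallel hops to each shard thread; a plain loop stands in here.
  template <typename F> void RunBriefInParallel(F&& f) {
    for (auto& shard : shards) f(&shard);
  }
};

ShardSet* shard_set = nullptr;

void SetShardStates(bool replica) {
  shard_set->RunBriefInParallel([replica](EngineShard* shard) { shard->SetReplica(replica); });
}

int main() {
  ShardSet shards;
  shards.shards.resize(4);
  shard_set = &shards;

  SetShardStates(true);   // entering MainReplicationFb: expiry disabled on every shard
  std::cout << shard_set->shards[0].IsReplica() << "\n";  // 1
  SetShardStates(false);  // leaving MainReplicationFb: back to normal
  std::cout << shard_set->shards[0].IsReplica() << "\n";  // 0
  return 0;
}
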
@@ -793,37 +804,29 @@ void Replica::StableSyncDflyReadFb(Context* cntx) {
JournalReader reader{&ps, 0};
while (!cntx->IsCancelled()) {
TranactionData tx_data;
while (!cntx->IsCancelled()) {
waker_.await([&]() {
return ((trans_data_queue_.size() < kYieldAfterItemsInQueue) || cntx->IsCancelled());
});
if (cntx->IsCancelled()) {
return;
}
auto res = reader.ReadEntry();
if (!res) {
cntx->ReportError(res.error(), "Journal format error");
return;
}
last_io_time_ = sock_->proactor()->GetMonotonicTimeNs();
bool tx_data_full = tx_data.UpdateFromParsedEntry(std::move(*res));
if (tx_data_full == true) {
break;
}
}
waker_.await([&]() {
return ((trans_data_queue_.size() < kYieldAfterItemsInQueue) || cntx->IsCancelled());
});
if (cntx->IsCancelled())
break;
auto tx_data = TransactionData::ReadNext(&reader, cntx);
if (!tx_data)
break;
last_io_time_ = sock_->proactor()->GetMonotonicTimeNs();
if (use_multi_shard_exe_sync_) {
InsertTxDataToShardResource(std::move(tx_data));
InsertTxDataToShardResource(std::move(*tx_data));
} else {
ExecuteTxWithNoShardSync(std::move(tx_data), cntx);
ExecuteTxWithNoShardSync(std::move(*tx_data), cntx);
}
waker_.notify();
}
return;
}
void Replica::ExecuteTxWithNoShardSync(TranactionData&& tx_data, Context* cntx) {
void Replica::ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx) {
if (cntx->IsCancelled()) {
return;
}
@@ -836,8 +839,8 @@ void Replica::ExecuteTxWithNoShardSync(TranactionData&& tx_data, Context* cntx)
ExecuteTx(std::move(tx_data), was_insert, cntx);
}
bool Replica::InsertTxToSharedMap(const TranactionData& tx_data) {
std::lock_guard lg{multi_shard_exe_->map_mu};
bool Replica::InsertTxToSharedMap(const TransactionData& tx_data) {
std::lock_guard lk{multi_shard_exe_->map_mu};
auto [it, was_insert] =
multi_shard_exe_->tx_sync_execution.emplace(tx_data.txid, tx_data.shard_cnt);
@@ -848,7 +851,7 @@ bool Replica::InsertTxToSharedMap(const TranactionData& tx_data) {
return was_insert;
}
void Replica::InsertTxDataToShardResource(TranactionData&& tx_data) {
void Replica::InsertTxDataToShardResource(TransactionData&& tx_data) {
bool was_insert = false;
if (tx_data.shard_cnt > 1) {
was_insert = InsertTxToSharedMap(tx_data);
@@ -870,11 +873,9 @@ void Replica::StableSyncDflyExecFb(Context* cntx) {
trans_data_queue_.pop();
waker_.notify();
}
return;
}
void Replica::ExecuteTx(TranactionData&& tx_data, bool inserted_by_me, Context* cntx) {
void Replica::ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx) {
if (cntx->IsCancelled()) {
return;
}
@@ -925,7 +926,8 @@ void Replica::ExecuteTx(TranactionData&& tx_data, bool inserted_by_me, Context*
}
// Erase from map can be done only after all flow fibers executed the transaction commands.
// The last fiber which will decrease the counter to 0 will be the one to erase the data from map
// The last fiber which will decrease the counter to 0 will be the one to erase the data from
// map
auto val = multi_shard_data.counter.fetch_sub(1, std::memory_order_relaxed);
VLOG(2) << "txid: " << tx_data.txid << " counter: " << val;
if (val == 1) {
@@ -1162,29 +1164,46 @@ error_code Replica::SendCommand(string_view command, ReqSerializer* serializer)
return ec;
}
bool Replica::TranactionData::UpdateFromParsedEntry(journal::ParsedEntry&& entry) {
if (entry.opcode == journal::Op::EXEC) {
shard_cnt = entry.shard_cnt;
dbid = entry.dbid;
txid = entry.txid;
return true;
} else if (entry.opcode == journal::Op::COMMAND) {
txid = entry.txid;
shard_cnt = entry.shard_cnt;
dbid = entry.dbid;
commands.push_back(std::move(entry.cmd));
return true;
} else if (entry.opcode == journal::Op::MULTI_COMMAND) {
commands.push_back(std::move(entry.cmd));
return false;
} else {
DCHECK(false) << "Unsupported opcode";
bool Replica::TransactionData::AddEntry(journal::ParsedEntry&& entry) {
// Expiry joins multi transaction or runs standalone.
if (entry.opcode == journal::Op::EXPIRED) {
entry.opcode = commands.empty() ? journal::Op::COMMAND : journal::Op::MULTI_COMMAND;
}
switch (entry.opcode) {
case journal::Op::COMMAND:
commands.push_back(std::move(entry.cmd));
[[fallthrough]];
case journal::Op::EXEC:
shard_cnt = entry.shard_cnt;
dbid = entry.dbid;
txid = entry.txid;
return true;
case journal::Op::MULTI_COMMAND:
commands.push_back(std::move(entry.cmd));
return false;
default:
DCHECK(false) << "Unsupported opcode";
}
return false;
}
bool Replica::TranactionData::IsGlobalCmd() const {
bool Replica::TransactionData::IsGlobalCmd() const {
return commands.size() == 1 && commands.front().cmd_args.size() == 1;
}
auto Replica::TransactionData::ReadNext(JournalReader* reader, Context* cntx)
-> optional<TransactionData> {
TransactionData out;
io::Result<journal::ParsedEntry> res;
do {
if (res = reader->ReadEntry(); !res) {
cntx->ReportError(res.error());
return std::nullopt;
}
} while (!cntx->IsCancelled() && !out.AddEntry(std::move(*res)));
return cntx->IsCancelled() ? std::nullopt : make_optional(std::move(out));
}
} // namespace dfly
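
TransactionData::ReadNext() now drives journal consumption: it keeps calling AddEntry() until a transaction is complete, and AddEntry() folds an EXPIRED entry into whatever is being built, either as a standalone single-command transaction or as one more command of an open multi. A standalone sketch of that grouping logic, with simplified types and only the opcodes that matter here:

#include <cstdint>
#include <iostream>
#include <string>
#include <vector>

enum class Op : uint8_t { EXPIRED = 9, COMMAND = 10, MULTI_COMMAND = 11, EXEC = 12 };

struct Entry {
  Op op;
  std::string cmd;
};

struct Tx {
  std::vector<std::string> commands;

  // Returns true once the transaction is complete (mirrors AddEntry in the diff above).
  bool AddEntry(Entry e) {
    // Expiry joins a multi transaction or runs standalone.
    if (e.op == Op::EXPIRED)
      e.op = commands.empty() ? Op::COMMAND : Op::MULTI_COMMAND;
    switch (e.op) {
      case Op::COMMAND:
        commands.push_back(std::move(e.cmd));
        return true;
      case Op::EXEC:
        return true;  // the real code also copies txid/dbid/shard_cnt here
      case Op::MULTI_COMMAND:
        commands.push_back(std::move(e.cmd));
        return false;
      default:
        return false;
    }
  }
};

int main() {
  // A standalone expiry becomes a one-command transaction...
  Tx a;
  std::cout << a.AddEntry({Op::EXPIRED, "DEL k1"}) << "\n";  // 1 (complete)

  // ...while inside a multi it simply accumulates until EXEC closes the transaction.
  Tx b;
  b.AddEntry({Op::MULTI_COMMAND, "SET x 1"});
  b.AddEntry({Op::EXPIRED, "DEL k2"});
  std::cout << b.AddEntry({Op::EXEC, ""}) << " " << b.commands.size() << "\n";  // 1 2
  return 0;
}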


@@ -26,6 +26,7 @@ namespace dfly {
class Service;
class ConnectionContext;
class JournalExecutor;
class JournalReader;
class Replica {
private:
@@ -53,16 +54,22 @@ class Replica {
// This class holds the commands of transaction in single shard.
// Once all commands are received, the transaction can be executed.
struct TranactionData {
TxId txid;
uint32_t shard_cnt;
DbIndex dbid;
std::vector<journal::ParsedEntry::CmdData> commands;
struct TransactionData {
// Update the data from ParsedEntry and return if all shard transaction commands were received.
bool UpdateFromParsedEntry(journal::ParsedEntry&& entry);
bool AddEntry(journal::ParsedEntry&& entry);
bool IsGlobalCmd() const;
// Collect next complete transaction data from journal reader.
static std::optional<TransactionData> ReadNext(JournalReader* reader, Context* cntx);
TxId txid;
DbIndex dbid;
uint32_t shard_cnt;
std::vector<journal::ParsedEntry::CmdData> commands;
};
// Coordinator for multi-shard execution.
struct MultiShardExecution {
boost::fibers::mutex map_mu;
@@ -104,8 +111,9 @@ class Replica {
std::error_code ConsumeRedisStream(); // Redis stable state.
std::error_code ConsumeDflyStream(); // Dragonfly stable state.
void CloseSocket(); // Close replica sockets.
void JoinAllFlows(); // Join all flows if possible.
void CloseSocket(); // Close replica sockets.
void JoinAllFlows(); // Join all flows if possible.
void SetShardStates(bool replica); // Call SetReplica(replica) on all shards.
// Send DFLY SYNC or DFLY STARTSTABLE if stable is true.
std::error_code SendNextPhaseRequest(bool stable);
@@ -158,10 +166,10 @@ class Replica {
// Send command, update last_io_time, return error.
std::error_code SendCommand(std::string_view command, facade::ReqSerializer* serializer);
void ExecuteTx(TranactionData&& tx_data, bool inserted_by_me, Context* cntx);
void InsertTxDataToShardResource(TranactionData&& tx_data);
void ExecuteTxWithNoShardSync(TranactionData&& tx_data, Context* cntx);
bool InsertTxToSharedMap(const TranactionData& tx_data);
void ExecuteTx(TransactionData&& tx_data, bool inserted_by_me, Context* cntx);
void InsertTxDataToShardResource(TransactionData&& tx_data);
void ExecuteTxWithNoShardSync(TransactionData&& tx_data, Context* cntx);
bool InsertTxToSharedMap(const TransactionData& tx_data);
public: /* Utility */
struct Info {
@@ -197,10 +205,11 @@ class Replica {
std::shared_ptr<MultiShardExecution> multi_shard_exe_;
std::queue<std::pair<TranactionData, bool>> trans_data_queue_;
std::queue<std::pair<TransactionData, bool>> trans_data_queue_;
static constexpr size_t kYieldAfterItemsInQueue = 50;
::util::fibers_ext::EventCount waker_;
::util::fibers_ext::EventCount waker_; // waker for trans_data_queue_
bool use_multi_shard_exe_sync_;
std::unique_ptr<JournalExecutor> executor_;
// MainReplicationFb in standalone mode, FullSyncDflyFb in flow mode.


@@ -279,9 +279,12 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req)
// no database switch can be performed between those two calls, because they are part of one
// transaction.
void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) {
if (entry.opcode != journal::Op::COMMAND && entry.opcode != journal::Op::MULTI_COMMAND) {
// We ignore non payload entries like EXEC because we have no transactional ordering during
// LOAD phase on replica.
if (!entry.HasPayload()) {
return;
}
optional<RdbSerializer> tmp_serializer;
RdbSerializer* serializer_ptr = default_serializer_.get();
if (entry.dbid != current_db_) {


@@ -370,6 +370,7 @@ async def test_flushall(df_local_factory):
vals = await pipe.execute()
assert all(v is not None for v in vals)
"""
Test journal rewrites.
"""
@@ -536,3 +537,60 @@ async def test_rewrites(df_local_factory):
# Check RENAMENX turns into DEL SET and PEXPIREAT
await check_list_ooo("RENAMENX renamed renamekey", [r"DEL renamed", r"SET renamekey 1000", r"PEXPIREAT renamekey (.*?)"])
await check_expire("renamekey")
"""
Test automatic replication of expiry.
"""
@dfly_args({"proactor_threads": 4})
@pytest.mark.asyncio
async def test_expiry(df_local_factory, n_keys=1000):
master = df_local_factory.create(port=BASE_PORT)
replica = df_local_factory.create(port=BASE_PORT+1, logtostdout=True)
df_local_factory.start_all([master, replica])
c_master = aioredis.Redis(port=master.port)
c_replica = aioredis.Redis(port=replica.port)
# Connect replica to master
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
# Set keys
pipe = c_master.pipeline(transaction=False)
batch_fill_data(pipe, gen_test_data(n_keys))
await pipe.execute()
# Check keys are on replica
res = await c_replica.mget(k for k, _ in gen_test_data(n_keys))
assert all(v is not None for v in res)
# Set keys to expire in 500ms
pipe = c_master.pipeline(transaction=True)
for k, _ in gen_test_data(n_keys):
pipe.pexpire(k, 500)
await pipe.execute()
# Wait two seconds for the heartbeat to pick them up
await asyncio.sleep(2.0)
assert len(await c_master.keys()) == 0
assert len(await c_replica.keys()) == 0
# Set keys
pipe = c_master.pipeline(transaction=False)
batch_fill_data(pipe, gen_test_data(n_keys))
for k, _ in gen_test_data(n_keys):
pipe.pexpire(k, 500)
await pipe.execute()
await asyncio.sleep(1.0)
# Disconnect from master
await c_replica.execute_command("REPLICAOF NO ONE")
# Check replica evicts keys on its own
await asyncio.sleep(1.0)
assert len(await c_replica.keys()) == 0


@@ -174,15 +174,21 @@ class CommandGenerator:
def gen_shrink_cmd(self):
"""
Generate command that shrinks data: DEL of random keys.
Generate command that shrinks data: DEL of random keys or almost immediate <=50ms PEXPIRE.
"""
keys_gen = (self.randomize_key(pop=True)
if random.random() < 0.3:
key, _ = self.randomize_key(pop=True)
if key == None:
return None, 0
return f"PEXPIRE k{key} {random.randint(0, 50)}", -1
else:
keys_gen = (self.randomize_key(pop=True)
for _ in range(random.randint(1, self.max_multikey)))
keys = [f"k{k}" for k, _ in keys_gen if k is not None]
keys = [f"k{k}" for k, _ in keys_gen if k is not None]
if len(keys) == 0:
return None, 0
return "DEL " + " ".join(keys), -len(keys)
if len(keys) == 0:
return None, 0
return "DEL " + " ".join(keys), -len(keys)
UPDATE_ACTIONS = [
('APPEND {k} {val}', ValueType.STRING),