fix(migration): Use transactions! (#3266)

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Authored by Vladislav on 2024-07-16 13:06:34 +02:00, committed by GitHub
parent cdd8d50e70
commit 22756eeb81
10 changed files with 101 additions and 117 deletions

src/server/cluster/outgoing_slot_migration.cc

@@ -16,6 +16,7 @@
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/streamer.h"
#include "server/main_service.h"
#include "server/server_family.h"
ABSL_FLAG(int, slot_migration_connection_timeout_ms, 2000, "Timeout for network operations");
@@ -33,7 +34,10 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
: ProtocolClient(server_context), streamer_(slice, std::move(slots), journal, &cntx_) {
}
void Sync(const std::string& node_id, uint32_t shard_id) {
// Send DFLYMIGRATE FLOW
void PrepareFlow(const std::string& node_id) {
uint32_t shard_id = EngineShard::tlocal()->shard_id();
VLOG(1) << "Connecting to source node_id " << node_id << " shard_id " << shard_id;
auto timeout = absl::GetFlag(FLAGS_slot_migration_connection_timeout_ms) * 1ms;
if (auto ec = ConnectAndAuth(timeout, &cntx_); ec) {
@@ -57,10 +61,18 @@ class OutgoingMigration::SliceSlotMigration : private ProtocolClient {
cntx_.ReportError("Incorrect response for FLOW cmd");
return;
}
}
// Register db_slice and journal change listeners
void PrepareSync() {
streamer_.Start(Sock());
}
// Run restore streamer
void RunSync() {
streamer_.Run();
}
void Cancel() {
streamer_.Cancel();
}
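The old single Sync() call is now split into phases: PrepareFlow does the slow network handshake and can be retried, PrepareSync is cheap and can therefore run inside a global transactional cut, and RunSync does the heavy streaming outside the cut. A sketch of the intended call order (illustrative only; the real driver is OutgoingMigration::SyncFb below, and node_id/tx_/OnAllShards are the names used in this diff):

// Illustrative only -- retries and error checks omitted.
OnAllShards([&](auto& flow) { flow->PrepareFlow(node_id); });  // slow: network
{
  Transaction::Guard tg{tx_.get()};        // park every shard on a no-op hop
  OnAllShards([](auto& flow) { flow->PrepareSync(); });  // fast: register cbs
}                                          // cut released here
OnAllShards([](auto& flow) { flow->RunSync(); });        // slow: stream data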
@@ -82,18 +94,17 @@ OutgoingMigration::OutgoingMigration(MigrationInfo info, ClusterFamily* cf, Serv
migration_info_(std::move(info)),
slot_migrations_(shard_set->size()),
server_family_(sf),
cf_(cf) {
cf_(cf),
tx_(new Transaction{sf->service().FindCmd("DFLYCLUSTER")}) {
tx_->InitByArgs(0, {});
}
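The new tx_ member exists only to feed Transaction::Guard (added in transaction.cc below), which DCHECKs that the wrapped command carries the CO::GLOBAL_TRANS flag; DFLYCLUSTER is borrowed purely because it is such a command. A minimal sketch of that relationship:

// Sketch: any registered CommandId with CO::GLOBAL_TRANS would work here.
boost::intrusive_ptr<Transaction> tx{
    new Transaction{sf->service().FindCmd("DFLYCLUSTER")}};
tx->InitByArgs(0, {});           // no keys, no args: a pure synchronization tx
Transaction::Guard g{tx.get()};  // would DCHECK-fail for a non-global command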
OutgoingMigration::~OutgoingMigration() {
main_sync_fb_.JoinIfNeeded();
// Destroy each flow in its dedicated thread, because we could be the last owner of the db tables
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
slot_migrations_[shard->shard_id()].reset();
}
});
// Destroy each flow in its dedicated thread, because we could be the last
// owner of the db tables
OnAllShards([](auto& migration) { migration.reset(); });
}
bool OutgoingMigration::ChangeState(MigrationState new_state) {
@@ -106,6 +117,15 @@ bool OutgoingMigration::ChangeState(MigrationState new_state) {
return true;
}
void OutgoingMigration::OnAllShards(
std::function<void(std::unique_ptr<SliceSlotMigration>&)> func) {
shard_set->pool()->AwaitFiberOnAll([this, &func](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
func(slot_migrations_[shard->shard_id()]);
}
});
}
void OutgoingMigration::Finish(bool is_error) {
VLOG(1) << "Finish outgoing migration for " << cf_->MyID() << " : "
<< migration_info_.node_info.id;
@@ -132,12 +152,9 @@ void OutgoingMigration::Finish(bool is_error) {
}
if (should_cancel_flows) {
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
auto& flow = slot_migrations_[shard->shard_id()];
CHECK(flow != nullptr);
flow->Cancel();
}
OnAllShards([](auto& migration) {
CHECK(migration != nullptr);
migration->Cancel();
});
}
}
@@ -161,8 +178,7 @@ void OutgoingMigration::SyncFb() {
if (last_error_) {
LOG(ERROR) << last_error_.Format();
// if error is happened on the previous attempt we wait for some time and try again
ThisFiber::SleepFor(1000ms);
ThisFiber::SleepFor(1000ms); // wait some time before next retry
}
VLOG(2) << "Connecting to source";
@@ -195,27 +211,34 @@ void OutgoingMigration::SyncFb() {
continue;
}
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (auto* shard = EngineShard::tlocal(); shard) {
OnAllShards([this](auto& migration) {
auto* shard = EngineShard::tlocal();
server_family_->journal()->StartInThread();
slot_migrations_[shard->shard_id()] = std::make_unique<SliceSlotMigration>(
migration = std::make_unique<SliceSlotMigration>(
&shard->db_slice(), server(), migration_info_.slot_ranges, server_family_->journal());
}
});
if (!ChangeState(MigrationState::C_SYNC)) {
break;
}
shard_set->pool()->AwaitFiberOnAll([this](util::ProactorBase* pb) {
if (auto* shard = EngineShard::tlocal(); shard) {
auto& migration = slot_migrations_[shard->shard_id()];
CHECK(migration != nullptr);
migration->Sync(cf_->MyID(), shard->shard_id());
if (migration->GetError()) {
OnAllShards([this](auto& migration) { migration->PrepareFlow(cf_->MyID()); });
if (CheckFlowsForErrors()) {
LOG(WARNING) << "Preparation error detected, retrying outgoing migration";
continue;
}
// Global transactional cut that lets the migration register its db_slice and
// journal listeners at a single consistent point
{
Transaction::Guard tg{tx_.get()};
OnAllShards([](auto& migration) { migration->PrepareSync(); });
}
OnAllShards([this](auto& migration) {
migration->RunSync();
if (migration->GetError())
Finish(true);
}
}
});
if (CheckFlowsForErrors()) {
@@ -240,8 +263,8 @@ void OutgoingMigration::SyncFb() {
}
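The Transaction::Guard scope above is the heart of the fix: while the guard transaction occupies every shard, no other command's callback can run, so registering the db_slice and journal listeners can never interleave with a command's execution and journaling; this is exactly the invariant the removed cb_mu_ shared mutex used to provide (see db_slice.cc below). Annotated shape of the pattern, using names from this diff:

{
  Transaction::Guard tg{tx_.get()};   // no command callbacks run on any shard
  OnAllShards([](auto& m) { m->PrepareSync(); });   // streamer_.Start(Sock())
}  // ~Guard: Conclude() frees the shards, Refurbish() readies tx_ for reuse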
bool OutgoingMigration::FinalizeMigration(long attempt) {
// if it's not the 1st attempt and flows are work correctly we try to reconnect and ACK one more
// time
// if it's not the 1st attempt and the flows work correctly, we try to
// reconnect and ACK one more time
VLOG(1) << "FinalizeMigration for " << cf_->MyID() << " : " << migration_info_.node_info.id;
if (attempt > 1) {
if (CheckFlowsForErrors()) {
@@ -255,6 +278,9 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
return false;
}
}
// Migration finalization has to be done via client pause, because commands need
// to be blocked at the coordinator level to avoid initializing transactions with
// stale cluster slot info
// TODO implement blocking on migrated slots only
bool is_block_active = true;
auto is_pause_in_progress = [&is_block_active] { return is_block_active; };
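The pause machinery itself lies outside this hunk; for context, here is a hedged sketch of how is_pause_in_progress is typically consumed. Pause(), GetNonPriviligedListeners() and ClientPause::WRITE are assumptions about the surrounding server_family helpers, not lines from this diff:

// Sketch only: block new write commands while the flows are finalized.
auto pause_fb_opt = Pause(server_family_->GetNonPriviligedListeners(), nullptr,
                          ClientPause::WRITE, is_pause_in_progress);
if (!pause_fb_opt)
  return false;                    // clients could not be paused, bail out
absl::Cleanup cleanup([&] {
  is_block_active = false;         // tell the pause fiber to stop...
  pause_fb_opt->JoinIfNeeded();    // ...and join it (matches the hunk below)
});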
@@ -270,14 +296,8 @@ bool OutgoingMigration::FinalizeMigration(long attempt) {
pause_fb_opt->JoinIfNeeded();
});
auto cb = [this, attempt](util::ProactorBase* pb) {
if (const auto* shard = EngineShard::tlocal(); shard) {
slot_migrations_[shard->shard_id()]->Finalize(attempt);
}
};
VLOG(1) << "FINALIZE flows for " << cf_->MyID() << " : " << migration_info_.node_info.id;
shard_set->pool()->AwaitFiberOnAll(std::move(cb));
OnAllShards([attempt](auto& migration) { migration->Finalize(attempt); });
auto cmd = absl::StrCat("DFLYMIGRATE ACK ", cf_->MyID(), " ", attempt);
VLOG(1) << "send " << cmd;

src/server/cluster/outgoing_slot_migration.h

@@ -3,9 +3,12 @@
//
#pragma once
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include "io/io.h"
#include "server/cluster/cluster_defs.h"
#include "server/protocol_client.h"
#include "server/transaction.h"
namespace dfly {
class DbSlice;
@@ -75,6 +78,8 @@ class OutgoingMigration : private ProtocolClient {
bool ChangeState(MigrationState new_state) ABSL_LOCKS_EXCLUDED(state_mu_);
void OnAllShards(std::function<void(std::unique_ptr<SliceSlotMigration>&)>);
private:
MigrationInfo migration_info_;
std::vector<std::unique_ptr<SliceSlotMigration>> slot_migrations_;
@@ -87,6 +92,8 @@ class OutgoingMigration : private ProtocolClient {
mutable util::fb2::Mutex state_mu_;
MigrationState state_ ABSL_GUARDED_BY(state_mu_) = MigrationState::C_NO_STATE;
boost::intrusive_ptr<Transaction> tx_;
// when the migration is finished we need to store the number of migrated keys,
// because new requests can add or remove keys and make the statistic incorrect
size_t keys_number_ = 0;

src/server/db_slice.cc

@@ -1089,19 +1089,7 @@ void DbSlice::ExpireAllIfNeeded() {
}
uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
// TODO rewrite this logic to be more clear
// this mutex lock is needed to check that this method is not called simultaneously with
// change_cb_ calls and journal_slice::change_cb_arr_ calls.
// It can be unlocked anytime because DbSlice::RegisterOnChange
// and journal_slice::RegisterOnChange calls without preemption
std::lock_guard lk(cb_mu_);
uint64_t ver = NextVersion();
change_cb_.emplace_back(ver, std::move(cb));
DCHECK(std::is_sorted(change_cb_.begin(), change_cb_.end(),
[](auto& a, auto& b) { return a.first < b.first; }));
return ver;
return change_cb_.emplace_back(NextVersion(), std::move(cb)).first;
}
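Dropping the mutex is safe because RegisterOnChange (and the journal's equivalent) run without preemption, so NextVersion() stays monotonic and change_cb_ stays sorted with no locking at all. A hypothetical usage sketch, with the callback signature assumed from DbSlice::ChangeCallback:

// Hypothetical listener; RestoreStreamer::Start does the real equivalent.
uint64_t id = db_slice->RegisterOnChange(
    [](DbIndex db_ind, const DbSlice::ChangeReq& req) {
      // serialize the bucket that is about to be modified
    });
// ... stream entries ...
db_slice->UnregisterOnChange(id);  // now CHECK-fails for an unknown id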
void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_t upper_bound) {
@@ -1125,14 +1113,10 @@ void DbSlice::FlushChangeToEarlierCallbacks(DbIndex db_ind, Iterator it, uint64_
//! Unregisters the callback.
void DbSlice::UnregisterOnChange(uint64_t id) {
lock_guard lk(cb_mu_); // we need to wait until callback is finished before remove it
for (auto it = change_cb_.begin(); it != change_cb_.end(); ++it) {
if (it->first == id) {
auto it = find_if(change_cb_.begin(), change_cb_.end(),
[id](const auto& cb) { return cb.first == id; });
CHECK(it != change_cb_.end());
change_cb_.erase(it);
return;
}
}
LOG(DFATAL) << "Could not find " << id << " to unregister";
}
auto DbSlice::DeleteExpiredStep(const Context& cntx, unsigned count) -> DeleteExpiredStats {

src/server/db_slice.h

@@ -469,14 +469,6 @@ class DbSlice {
void PerformDeletion(Iterator del_it, DbTable* table);
void PerformDeletion(PrimeIterator del_it, DbTable* table);
void LockChangeCb() const {
return cb_mu_.lock_shared();
}
void UnlockChangeCb() const {
return cb_mu_.unlock_shared();
}
private:
void PreUpdate(DbIndex db_ind, Iterator it, std::string_view key);
void PostUpdate(DbIndex db_ind, Iterator it, std::string_view key, size_t orig_size);
@@ -552,12 +544,6 @@ class DbSlice {
// Used in temporary computations in Acquire/Release.
mutable absl::flat_hash_set<uint64_t> uniq_fps_;
// To ensure correct data replication, we must serialize the buckets that each running command
// will modify, followed by serializing the command to the journal. We use a mutex to prevent
// interleaving between bucket and journal registrations, and the command execution with its
// journaling. LockChangeCb is called before the callback, and UnlockChangeCb is called after
// journaling is completed. Register to bucket and journal changes is also does without preemption
mutable util::fb2::SharedMutex cb_mu_;
// ordered from the smallest to largest version.
std::vector<std::pair<uint64_t, ChangeCallback>> change_cb_;

src/server/detail/save_stages_controller.cc

@@ -254,11 +254,7 @@ void SaveStagesController::SaveDfs() {
// Save shard files.
auto cb = [this](Transaction* t, EngineShard* shard) {
auto& db_slice = shard->db_slice();
// a hack to avoid deadlock in Transaction::RunCallback(...)
db_slice.UnlockChangeCb();
SaveDfsSingle(shard);
db_slice.LockChangeCb();
return OpStatus::OK;
};
trans_->ScheduleSingleHop(std::move(cb));
@@ -298,11 +294,7 @@ void SaveStagesController::SaveRdb() {
}
auto cb = [snapshot = snapshot.get()](Transaction* t, EngineShard* shard) {
// a hack to avoid deadlock in Transaction::RunCallback(...)
auto& db_slice = shard->db_slice();
db_slice.UnlockChangeCb();
snapshot->StartInShard(shard);
db_slice.LockChangeCb();
return OpStatus::OK;
};
trans_->ScheduleSingleHop(std::move(cb));
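Why the hacks existed: Transaction::RunCallback() used to hold cb_mu_ in shared mode around the callback (see transaction.cc below), while snapshot->StartInShard() and SaveDfsSingle() register change callbacks through DbSlice::RegisterOnChange(), which took the same mutex exclusively on the same thread, hence the unlock/lock dance. With cb_mu_ gone the callbacks shrink to their essence; a sketch of the cleaned-up shape:

// After the fix: no lock juggling around starting the snapshot.
auto cb = [snapshot = snapshot.get()](Transaction* t, EngineShard* shard) {
  snapshot->StartInShard(shard);   // may register change callbacks freely
  return OpStatus::OK;
};
trans_->ScheduleSingleHop(std::move(cb));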

src/server/dflycmd.cc

@@ -71,32 +71,6 @@ std::string_view SyncStateName(DflyCmd::SyncState sync_state) {
return "unsupported";
}
struct TransactionGuard {
static OpStatus ExitGuardCb(Transaction* t, EngineShard* shard) {
t->GetDbSlice(shard->shard_id()).SetExpireAllowed(true);
return OpStatus::OK;
};
explicit TransactionGuard(Transaction* t, bool disable_expirations = false) : t(t) {
t->Execute(
[disable_expirations](Transaction* t, EngineShard* shard) {
if (disable_expirations) {
t->GetDbSlice(shard->shard_id()).SetExpireAllowed(!disable_expirations);
}
return OpStatus::OK;
},
false);
VLOG(2) << "Transaction guard engaged";
}
~TransactionGuard() {
VLOG(2) << "Releasing transaction guard";
t->Execute(ExitGuardCb, true);
}
Transaction* t;
};
OpStatus WaitReplicaFlowToCatchup(absl::Time end_time, shared_ptr<DflyCmd::ReplicaInfo> replica,
EngineShard* shard) {
// We don't want any writes to the journal after we send the `PING`,
@@ -299,7 +273,7 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
// Start full sync.
{
TransactionGuard tg{cntx->transaction};
Transaction::Guard tg{cntx->transaction};
AggregateStatus status;
// Use explicit assignment for replica_ptr, because capturing structured bindings is C++20.
@@ -337,7 +311,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
return;
{
TransactionGuard tg{cntx->transaction};
Transaction::Guard tg{cntx->transaction};
AggregateStatus status;
auto cb = [this, &status, replica_ptr = replica_ptr](EngineShard* shard) {

src/server/journal/streamer.cc

@@ -201,6 +201,10 @@ void RestoreStreamer::Start(util::FiberSocketBase* dest, bool send_lsn) {
snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb));
JournalStreamer::Start(dest, send_lsn);
}
void RestoreStreamer::Run() {
VLOG(1) << "RestoreStreamer run";
PrimeTable::Cursor cursor;
uint64_t last_yield = 0;
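Run() now owns the cursor scan that previously ran as part of Start(); only its opening lines appear in this hunk. A hedged sketch of the loop shape that follows, where pt_, WriteBucket and the yield cadence are assumptions rather than quoted code:

// Sketch of the scan loop (assumed, abridged).
do {
  if (fiber_cancelled_)                 // flipped by Cancel()
    return;
  cursor = pt_->Traverse(cursor, [&](PrimeTable::bucket_iterator it) {
    WriteBucket(it);                    // serialize the bucket for the target
  });
  if (++last_yield >= 100) {            // stay cooperative with other fibers
    ThisFiber::Yield();
    last_yield = 0;
  }
} while (cursor);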

src/server/journal/streamer.h

@@ -77,6 +77,9 @@ class RestoreStreamer : public JournalStreamer {
~RestoreStreamer() override;
void Start(util::FiberSocketBase* dest, bool send_lsn = false) override;
void Run();
// Cancel() must be called if Start() was called
void Cancel() override;

src/server/transaction.cc

@@ -127,6 +127,16 @@ cv_status Transaction::BatonBarrier::Wait(time_point tp) {
return cv_status::no_timeout;
}
Transaction::Guard::Guard(Transaction* tx) : tx(tx) {
DCHECK(tx->cid_->opt_mask() & CO::GLOBAL_TRANS);
tx->Execute([](auto*, auto*) { return OpStatus::OK; }, false);
}
Transaction::Guard::~Guard() {
tx->Conclude();
tx->Refurbish();
}
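Guard in a nutshell: the constructor runs an empty hop without concluding, so the global transaction stays scheduled on every shard and everything queued behind it must wait; the destructor concludes the hop and Refurbish() resets the object so the same transaction can back the next guard. Usage sketch, assuming tx wraps a GLOBAL_TRANS command:

{
  Transaction::Guard g{tx.get()};
  // ... code here cannot interleave with other transactions' callbacks ...
}
{
  Transaction::Guard g{tx.get()};   // fine: Refurbish() made tx reusable
}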
Transaction::Transaction(const CommandId* cid) : cid_{cid} {
InitTxTime();
string_view cmd_name(cid_->name());
@@ -620,7 +630,6 @@ void Transaction::RunCallback(EngineShard* shard) {
RunnableResult result;
auto& db_slice = GetDbSlice(shard->shard_id());
db_slice.LockChangeCb();
try {
result = (*cb_ptr_)(this, shard);
@@ -658,10 +667,7 @@ void Transaction::RunCallback(EngineShard* shard) {
// Log to journal only once the command finished running
if ((coordinator_state_ & COORD_CONCLUDING) || (multi_ && multi_->concluding)) {
LogAutoJournalOnShard(shard, result);
db_slice.UnlockChangeCb();
MaybeInvokeTrackingCb();
} else {
db_slice.UnlockChangeCb();
}
}
@@ -1223,11 +1229,11 @@ OpStatus Transaction::RunSquashedMultiCb(RunnableType cb) {
auto* shard = EngineShard::tlocal();
auto& db_slice = GetDbSlice(shard->shard_id());
db_slice.LockChangeCb();
auto result = cb(this, shard);
db_slice.OnCbFinish();
LogAutoJournalOnShard(shard, result);
db_slice.UnlockChangeCb();
MaybeInvokeTrackingCb();
DCHECK_EQ(result.flags, 0); // if it's sophisticated, we shouldn't squash it

src/server/transaction.h

@@ -170,6 +170,14 @@ class Transaction {
RAN_IMMEDIATELY = 1 << 7, // Whether the shard executed immediately (during schedule)
};
struct Guard {
explicit Guard(Transaction* tx);
~Guard();
private:
Transaction* tx;
};
explicit Transaction(const CommandId* cid);
// Initialize transaction for squashing placed on a specific shard with a given parent tx