feat: process migration data after FIN opcode #2864 (#2918)

* feat: process migration data after FIN opcode #2864
This commit is contained in:
Borys 2024-04-17 23:16:49 +03:00 committed by GitHub
parent 56965edbe1
commit 81fc57a5a9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 44 additions and 26 deletions

View file

@ -879,11 +879,7 @@ void ClusterFamily::DflyMigrateAck(CmdArgList args, ConnectionContext* cntx) {
migration->Join();
VLOG(1) << "Migration is finished for " << source_id;
if (migration->GetState() != MigrationState::C_FINISHED) {
return cntx->SendError("Migration process is not in C_FINISHED state");
}
VLOG(1) << "Migration is joined for " << source_id;
UpdateConfig(migration->GetSlots(), true);
VLOG(1) << "Config is updated for " << MyID();

View file

@ -4,6 +4,7 @@
#include "server/cluster/incoming_slot_migration.h"
#include "absl/cleanup/cleanup.h"
#include "base/logging.h"
#include "server/error.h"
#include "server/journal/executor.h"
@ -21,46 +22,62 @@ using absl::GetFlag;
// It is created per shard on the target node to initiate FLOW step.
class ClusterShardMigration {
public:
ClusterShardMigration(uint32_t shard_id, Service* service) : source_shard_id_(shard_id) {
executor_ = std::make_unique<JournalExecutor>(service);
ClusterShardMigration(uint32_t shard_id, Service* service)
: source_shard_id_(shard_id), socket_(nullptr), executor_(service) {
}
void Start(Context* cntx, util::FiberSocketBase* source) {
socket_ = source;
void Start(Context* cntx, util::FiberSocketBase* source, util::fb2::BlockingCounter bc) {
{
std::lock_guard lk(mu_);
socket_ = source;
}
absl::Cleanup cleanup([this]() {
std::lock_guard lk(mu_);
socket_ = nullptr;
});
JournalReader reader{source, 0};
TransactionReader tx_reader{false};
while (!cntx->IsCancelled()) {
if (cntx->IsCancelled())
break;
auto tx_data = tx_reader.NextTxData(&reader, cntx);
if (!tx_data) {
// TODO add error processing
VLOG(1) << "No tx data";
break;
}
if (tx_data->opcode == journal::Op::FIN) {
VLOG(2) << "Flow " << source_shard_id_ << " is finalized";
break;
} else if (tx_data->opcode == journal::Op::PING) {
while (tx_data->opcode == journal::Op::FIN) {
VLOG(2) << "Attempt to finalize flow " << source_shard_id_;
bc->Dec(); // we can Join the flow now
// if we get new data attempt is failed
if (tx_data = tx_reader.NextTxData(&reader, cntx); !tx_data) {
VLOG(1) << "Finalized flow " << source_shard_id_;
return;
}
bc->Add(); // the flow isn't finished so we lock it again
}
if (tx_data->opcode == journal::Op::PING) {
// TODO check about ping logic
} else {
ExecuteTxWithNoShardSync(std::move(*tx_data), cntx);
}
}
socket_ = nullptr;
bc->Dec(); // we should provide ability to join the flow
}
void Cancel() {
std::error_code Cancel() {
std::lock_guard lk(mu_);
if (socket_ != nullptr) {
socket_->proactor()->Await([s = socket_, sid = source_shard_id_]() {
return socket_->proactor()->Await([s = socket_, sid = source_shard_id_]() {
if (s->IsOpen()) {
s->Shutdown(SHUT_RDWR); // Does not Close(), only forbids further I/O.
return s->Shutdown(SHUT_RDWR); // Does not Close(), only forbids further I/O.
}
return std::error_code();
});
}
return {};
}
private:
@ -71,7 +88,7 @@ class ClusterShardMigration {
CHECK(tx_data.shard_cnt <= 1); // we don't support sync for multishard execution
if (!tx_data.IsGlobalCmd()) {
VLOG(3) << "Execute cmd without sync between shards. txid: " << tx_data.txid;
executor_->Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands));
executor_.Execute(tx_data.dbid, absl::MakeSpan(tx_data.commands));
} else {
// TODO check which global commands should be supported
CHECK(false) << "We don't support command: " << ToSV(tx_data.commands.front().cmd_args[0])
@ -81,8 +98,9 @@ class ClusterShardMigration {
private:
uint32_t source_shard_id_;
util::FiberSocketBase* socket_ = nullptr;
std::unique_ptr<JournalExecutor> executor_;
util::fb2::Mutex mu_;
util::FiberSocketBase* socket_ ABSL_GUARDED_BY(mu_);
JournalExecutor executor_;
};
IncomingSlotMigration::IncomingSlotMigration(string source_id, Service* se, SlotRanges slots,
@ -119,8 +137,7 @@ void IncomingSlotMigration::StartFlow(uint32_t shard, util::FiberSocketBase* sou
VLOG(1) << "Start flow for shard: " << shard;
state_.store(MigrationState::C_SYNC);
shard_flows_[shard]->Start(&cntx_, source);
bc_->Dec();
shard_flows_[shard]->Start(&cntx_, source, bc_);
}
} // namespace dfly

View file

@ -21,8 +21,13 @@ class IncomingSlotMigration {
~IncomingSlotMigration();
// process data from FDLYMIGRATE FLOW cmd
// executes until Cancel called or connection closed
void StartFlow(uint32_t shard, util::FiberSocketBase* source);
// wait untill all flows are got FIN opcode
// Waits until all flows got FIN opcode.
// Join can't be finished if after FIN opcode we get new data
// Connection can be closed by another side, or using Cancel
// After Join we still can get data due to error situation
void Join();
void Cancel();