diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml
index 0bf3585ba..2384c99e5 100644
--- a/.pre-commit-config.yaml
+++ b/.pre-commit-config.yaml
@@ -1,4 +1,5 @@
 default_stages: [commit]
+exclude: 'src\/redis\/.*'
 repos:
   - repo: local
     hooks:
diff --git a/src/redis/rdb.h b/src/redis/rdb.h
index 78770e59b..c9d58eb36 100644
--- a/src/redis/rdb.h
+++ b/src/redis/rdb.h
@@ -103,6 +103,8 @@
 /* Test if a type is an object type. */
 #define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18))
 
+/* Range 200-240 is used by Dragonfly specific opcodes */
+
 /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
 #define RDB_OPCODE_FUNCTION 246 /* engine data */
 #define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */
diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc
index ea45dcd16..eb1d14abe 100644
--- a/src/server/db_slice.cc
+++ b/src/server/db_slice.cc
@@ -542,7 +542,7 @@ uint32_t DbSlice::GetMCFlag(DbIndex db_ind, const PrimeKey& key) const {
 
 PrimeIterator DbSlice::AddNew(const Context& cntx, string_view key, PrimeValue obj,
                               uint64_t expire_at_ms) noexcept(false) {
-  auto [it, added] = AddEntry(cntx, key, std::move(obj), expire_at_ms);
+  auto [it, added] = AddOrSkip(cntx, key, std::move(obj), expire_at_ms);
   CHECK(added);
 
   return it;
@@ -571,12 +571,14 @@ OpStatus DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it,
   return OpStatus::OK;
 }
 
-pair<PrimeIterator, bool> DbSlice::AddEntry(const Context& cntx, string_view key, PrimeValue obj,
-                                            uint64_t expire_at_ms) noexcept(false) {
+std::pair<PrimeIterator, bool> DbSlice::AddOrUpdateInternal(const Context& cntx,
+                                                            std::string_view key, PrimeValue obj,
+                                                            uint64_t expire_at_ms,
+                                                            bool force_update) noexcept(false) {
   DCHECK(!obj.IsRef());
 
   pair<PrimeIterator, bool> res = AddOrFind(cntx, key);
-  if (!res.second)  // have not inserted.
+  if (!res.second && !force_update)  // have not inserted.
     return res;
 
   auto& db = *db_arr_[cntx.db_index];
@@ -588,12 +590,26 @@ pair<PrimeIterator, bool> DbSlice::AddEntry(const Context& cntx, string_view key
   if (expire_at_ms) {
     it->second.SetExpire(true);
     uint64_t delta = expire_at_ms - expire_base_[0];
-    CHECK(db.expire.Insert(it->first.AsRef(), ExpirePeriod(delta)).second);
+    auto [eit, inserted] = db.expire.Insert(it->first.AsRef(), ExpirePeriod(delta));
+    CHECK(inserted || force_update);
+    if (!inserted) {
+      eit->second = ExpirePeriod(delta);
+    }
   }
 
   return res;
 }
 
+pair<PrimeIterator, bool> DbSlice::AddOrUpdate(const Context& cntx, string_view key, PrimeValue obj,
+                                               uint64_t expire_at_ms) noexcept(false) {
+  return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, true);
+}
+
+pair<PrimeIterator, bool> DbSlice::AddOrSkip(const Context& cntx, string_view key, PrimeValue obj,
+                                             uint64_t expire_at_ms) noexcept(false) {
+  return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, false);
+}
+
 size_t DbSlice::DbSize(DbIndex db_ind) const {
   DCHECK_LT(db_ind, db_array_size());
 
diff --git a/src/server/db_slice.h b/src/server/db_slice.h
index 295af37ee..97c089301 100644
--- a/src/server/db_slice.h
+++ b/src/server/db_slice.h
@@ -150,11 +150,16 @@ class DbSlice {
   std::tuple<PrimeIterator, ExpireIterator, bool> AddOrFind2(const Context& cntx,
                                                              std::string_view key) noexcept(false);
 
+  // Same as AddOrSkip, but overwrites the entry in case it already exists.
+  // Returns second=true if insertion took place.
+  std::pair<PrimeIterator, bool> AddOrUpdate(const Context& cntx, std::string_view key,
+                                             PrimeValue obj, uint64_t expire_at_ms) noexcept(false);
+
   // Returns second=true if insertion took place, false otherwise.
   // expire_at_ms equal to 0 - means no expiry.
   // throws: bad_alloc if insertion could not happen due to out of memory.
-  std::pair<PrimeIterator, bool> AddEntry(const Context& cntx, std::string_view key,
-                                          PrimeValue obj, uint64_t expire_at_ms) noexcept(false);
+  std::pair<PrimeIterator, bool> AddOrSkip(const Context& cntx, std::string_view key,
+                                           PrimeValue obj, uint64_t expire_at_ms) noexcept(false);
 
   // Adds a new entry. Requires: key does not exist in this slice.
   // Returns the iterator to the newly added entry.
@@ -285,6 +290,10 @@ class DbSlice {
   void InvalidateDbWatches(DbIndex db_indx);
 
  private:
+  std::pair<PrimeIterator, bool> AddOrUpdateInternal(const Context& cntx, std::string_view key,
+                                                     PrimeValue obj, uint64_t expire_at_ms,
+                                                     bool force_update) noexcept(false);
+
   void CreateDb(DbIndex index);
   size_t EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* table);
 
diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc
index a9cdea550..6531d2799 100644
--- a/src/server/dflycmd.cc
+++ b/src/server/dflycmd.cc
@@ -87,6 +87,10 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
     return Sync(args, cntx);
   }
 
+  if (sub_cmd == "STARTSTABLE" && args.size() == 3) {
+    return StartStable(args, cntx);
+  }
+
   if (sub_cmd == "EXPIRE") {
     return Expire(args, cntx);
   }
@@ -258,16 +262,8 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
     return;
 
   unique_lock lk(sync_info->mu);
-  if (sync_info->state != SyncState::PREPARATION)
-    return rb->SendError(kInvalidState);
-
-  // Check all flows are connected.
-  // This might happen if a flow abruptly disconnected before sending the SYNC request.
-  for (const FlowInfo& flow : sync_info->flows) {
-    if (!flow.conn) {
-      return rb->SendError(kInvalidState);
-    }
-  }
+  if (!CheckReplicaStateOrReply(*sync_info, SyncState::PREPARATION, rb))
+    return;
 
   // Start full sync.
   {
@@ -288,6 +284,38 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
   return rb->SendOk();
 }
 
+void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
+  RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
+  string_view sync_id_str = ArgS(args, 2);
+
+  VLOG(1) << "Got DFLY STARTSTABLE " << sync_id_str;
+
+  auto [sync_id, sync_info] = GetSyncInfoOrReply(sync_id_str, rb);
+  if (!sync_id)
+    return;
+
+  unique_lock lk(sync_info->mu);
+  if (!CheckReplicaStateOrReply(*sync_info, SyncState::FULL_SYNC, rb))
+    return;
+
+  {
+    TransactionGuard tg{cntx->transaction};
+    AggregateStatus status;
+
+    auto cb = [this, &status, sync_info = sync_info](unsigned index, auto*) {
+      status = StartStableSyncInThread(&sync_info->flows[index], EngineShard::tlocal());
+      return OpStatus::OK;
+    };
+    shard_set->pool()->AwaitFiberOnAll(std::move(cb));
+
+    if (*status != OpStatus::OK)
+      return rb->SendError(kInvalidState);
+  }
+
+  sync_info->state = SyncState::STABLE_SYNC;
+  return rb->SendOk();
+}
+
 void DflyCmd::Expire(CmdArgList args, ConnectionContext* cntx) {
   RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
   cntx->transaction->ScheduleSingleHop([](Transaction* t, EngineShard* shard) {
@@ -304,14 +332,42 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, EngineShard* shard) {
   SaveMode save_mode = shard == nullptr ? SaveMode::SUMMARY : SaveMode::SINGLE_SHARD;
   flow->saver.reset(new RdbSaver(flow->conn->socket(), save_mode, false));
 
+  // Shard can be null for io thread.
   if (shard != nullptr) {
-    flow->saver->StartSnapshotInShard(false, shard);
+    auto ec = sf_->journal()->OpenInThread(false, string_view());
+    CHECK(!ec);
+    flow->saver->StartSnapshotInShard(true, shard);
   }
 
   flow->fb = ::boost::fibers::fiber(&DflyCmd::FullSyncFb, this, flow);
   return OpStatus::OK;
 }
 
+OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, EngineShard* shard) {
+  // Shard can be null for io thread.
+  if (shard != nullptr) {
+    flow->saver->StopSnapshotInShard(shard);
+  }
+
+  // Wait for full sync to finish.
+  if (flow->fb.joinable()) {
+    flow->fb.join();
+  }
+
+  if (shard != nullptr) {
+    flow->saver.reset();
+
+    // TODO: Add cancellation.
+    auto cb = sf_->journal()->RegisterOnChange([flow](const journal::Entry& je) {
+      // TODO: Serialize event.
+      ReqSerializer serializer{flow->conn->socket()};
+      serializer.SendCommand(absl::StrCat("SET ", je.key, " ", je.pval_ptr->ToString()));
+    });
+  }
+
+  return OpStatus::OK;
+}
+
 void DflyCmd::FullSyncFb(FlowInfo* flow) {
   error_code ec;
   RdbSaver* saver = flow->saver.get();
@@ -328,22 +384,20 @@ void DflyCmd::FullSyncFb(FlowInfo* flow) {
     return;
   }
 
-  if (saver->Mode() != SaveMode::SUMMARY) {
-    // TODO: we should be able to stop earlier if requested.
-    ec = saver->SaveBody(nullptr);
-    if (ec) {
-      LOG(ERROR) << ec;
-      return;
-    }
+  // TODO: we should be able to stop earlier if requested.
+  ec = saver->SaveBody(nullptr);
+  if (ec) {
+    LOG(ERROR) << ec;
+    return;
   }
 
+  VLOG(1) << "Sending full sync EOF";
+
   ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token));
   if (ec) {
     LOG(ERROR) << ec;
     return;
   }
-
-  ec = flow->conn->socket()->Shutdown(SHUT_RDWR);
 }
 
 uint32_t DflyCmd::CreateSyncSession() {
@@ -429,6 +483,25 @@ pair<uint32_t, shared_ptr<DflyCmd::SyncInfo>> DflyCmd::GetSyncInfoOrReply(std::s
   return {sync_id, sync_it->second};
 }
 
+bool DflyCmd::CheckReplicaStateOrReply(const SyncInfo& sync_info, SyncState expected,
+                                       RedisReplyBuilder* rb) {
+  if (sync_info.state != expected) {
+    rb->SendError(kInvalidState);
+    return false;
+  }
+
+  // Check all flows are connected.
+  // This might happen if a flow abruptly disconnected before sending the SYNC request.
+  for (const FlowInfo& flow : sync_info.flows) {
+    if (!flow.conn) {
+      rb->SendError(kInvalidState);
+      return false;
+    }
+  }
+
+  return true;
+}
+
 void DflyCmd::BreakOnShutdown() {
   VLOG(1) << "BreakOnShutdown";
 }
diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h
index 47a77f646..abbfbf5ab 100644
--- a/src/server/dflycmd.h
+++ b/src/server/dflycmd.h
@@ -31,7 +31,7 @@ class Journal;
 
 class DflyCmd {
  public:
-  enum class SyncState { PREPARATION, FULL_SYNC, CANCELLED };
+  enum class SyncState { PREPARATION, FULL_SYNC, STABLE_SYNC, CANCELLED };
 
   struct FlowInfo {
     FlowInfo() = default;
@@ -80,11 +80,14 @@ class DflyCmd {
   // Register connection as flow for sync session.
   void Flow(CmdArgList args, ConnectionContext* cntx);
 
-  // SYNC
-  // Migrate connection to required flow thread.
-  // Stub: will be replcaed with full sync.
+  // SYNC <syncid>
+  // Initiate full sync.
   void Sync(CmdArgList args, ConnectionContext* cntx);
 
+  // STARTSTABLE <syncid>
+  // Switch to stable state replication.
+  void StartStable(CmdArgList args, ConnectionContext* cntx);
+
   // EXPIRE
   // Check all keys for expiry.
   void Expire(CmdArgList args, ConnectionContext* cntx);
@@ -92,6 +95,9 @@ class DflyCmd {
   // Start full sync in thread. Start FullSyncFb. Called for each flow.
   facade::OpStatus StartFullSyncInThread(FlowInfo* flow, EngineShard* shard);
 
+  // Start stable sync in thread. Called for each flow.
+  facade::OpStatus StartStableSyncInThread(FlowInfo* flow, EngineShard* shard);
+
   // Fiber that runs full sync for each flow.
   void FullSyncFb(FlowInfo* flow);
 
@@ -108,6 +114,9 @@ class DflyCmd {
   std::pair<uint32_t, std::shared_ptr<SyncInfo>> GetSyncInfoOrReply(std::string_view id,
                                                                     facade::RedisReplyBuilder* rb);
 
+  bool CheckReplicaStateOrReply(const SyncInfo& si, SyncState expected,
+                                facade::RedisReplyBuilder* rb);
+
   ServerFamily* sf_;
 
   util::ListenerInterface* listener_;
diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc
index 5b5915ff2..5fcfcbf23 100644
--- a/src/server/generic_family.cc
+++ b/src/server/generic_family.cc
@@ -161,7 +161,7 @@ bool RdbRestoreValue::Add(std::string_view data, std::string_view key, DbSlice&
     return false;
   }
   DbContext context{.db_index = index, .time_now_ms = GetCurrentTimeMs()};
-  auto [it, added] = db_slice.AddEntry(context, key, std::move(pv), item.expire_ms);
+  auto [it, added] = db_slice.AddOrSkip(context, key, std::move(pv), item.expire_ms);
   return added;
 }
 
diff --git a/src/server/rdb_extensions.h b/src/server/rdb_extensions.h
new file mode 100644
index 000000000..2a9243fd9
--- /dev/null
+++ b/src/server/rdb_extensions.h
@@ -0,0 +1,12 @@
+// Copyright 2022, DragonflyDB authors. All rights reserved.
+// See LICENSE for licensing terms.
+//
+
+#pragma once
+
+// Range 200-240 is used by DF extensions.
+
+// This opcode is sent by the master Dragonfly instance to a replica
+// to notify that it finished streaming static data and is ready
+// to switch to the stable state replication phase.
+const uint8_t RDB_OPCODE_FULLSYNC_END = 200;
diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc
index 958089e3e..63c1a41fb 100644
--- a/src/server/rdb_load.cc
+++ b/src/server/rdb_load.cc
@@ -27,6 +27,7 @@ extern "C" {
 #include "server/engine_shard_set.h"
 #include "server/error.h"
 #include "server/hset_family.h"
+#include "server/rdb_extensions.h"
 #include "server/script_mgr.h"
 #include "server/server_state.h"
 #include "server/set_family.h"
@@ -1553,6 +1554,12 @@ error_code RdbLoader::Load(io::Source* src) {
       break;
     }
 
+    if (type == RDB_OPCODE_FULLSYNC_END) {
+      if (full_sync_cut_cb)
+        full_sync_cut_cb();
+      continue;
+    }
+
    if (type == RDB_OPCODE_SELECTDB) {
      unsigned dbid = 0;
 
@@ -1815,8 +1822,7 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
     if (item.expire_ms > 0 && db_cntx.time_now_ms >= item.expire_ms)
       continue;
 
-    auto [it, added] = db_slice.AddEntry(db_cntx, item.key, std::move(pv), item.expire_ms);
-
+    auto [it, added] = db_slice.AddOrUpdate(db_cntx, item.key, std::move(pv), item.expire_ms);
     if (!added) {
       LOG(WARNING) << "RDB has duplicated key '" << item.key << "' in DB " << db_ind;
     }
diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h
index ff1994e82..b8d04bb64 100644
--- a/src/server/rdb_load.h
+++ b/src/server/rdb_load.h
@@ -172,6 +172,13 @@ class RdbLoader : protected RdbLoaderBase {
     return load_time_;
   }
 
+  // Set a callback for receiving RDB_OPCODE_FULLSYNC_END.
+  // This opcode is used by a master instance to notify that it finished streaming static data
+  // and is ready to switch to stable state sync.
+  void SetFullSyncCutCb(std::function<void()> cb) {
+    full_sync_cut_cb = std::move(cb);
+  }
+
  private:
   struct ObjSettings;
   std::error_code LoadKeyValPair(int type, ObjSettings* settings);
@@ -194,6 +201,9 @@ class RdbLoader : protected RdbLoaderBase {
   ::boost::fibers::mutex mu_;
   std::error_code ec_;  // guarded by mu_
   std::atomic_bool stop_early_{false};
+
+  // Callback when receiving RDB_OPCODE_FULLSYNC_END
+  std::function<void()> full_sync_cut_cb;
 };
 
 }  // namespace dfly
diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc
index 4119490c2..890ea13ed 100644
--- a/src/server/rdb_save.cc
+++ b/src/server/rdb_save.cc
@@ -24,6 +24,7 @@ extern "C" {
 #include "base/logging.h"
 #include "server/engine_shard_set.h"
 #include "server/error.h"
+#include "server/rdb_extensions.h"
 #include "server/snapshot.h"
 #include "util/fibers/simple_channel.h"
 
@@ -581,6 +582,11 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
   return error_code{};
 }
 
+error_code RdbSerializer::SendFullSyncCut() {
+  RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_FULLSYNC_END));
+  return FlushMem();
+}
+
 // TODO: if buf is large enough, it makes sense to write both mem_buf and buf
 // directly to sink_.
 error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {
@@ -921,12 +927,15 @@ error_code RdbSaver::SaveHeader(const StringVec& lua_scripts) {
 error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
   RETURN_ON_ERR(impl_->serializer()->FlushMem());
 
-  VLOG(1) << "SaveBody , snapshots count: " << impl_->Size();
-
-  error_code io_error = impl_->ConsumeChannel();
-  if (io_error) {
-    LOG(ERROR) << "io error " << io_error;
-    return io_error;
+  if (save_mode_ == SaveMode::SUMMARY) {
+    impl_->serializer()->SendFullSyncCut();
+  } else {
+    VLOG(1) << "SaveBody , snapshots count: " << impl_->Size();
+    error_code io_error = impl_->ConsumeChannel();
+    if (io_error) {
+      LOG(ERROR) << "io error " << io_error;
+      return io_error;
+    }
   }
 
   RETURN_ON_ERR(SaveEpilog());
diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h
index 66d5ea6a3..02185eb3b 100644
--- a/src/server/rdb_save.h
+++ b/src/server/rdb_save.h
@@ -85,7 +85,9 @@ class RdbSaver {
   // freq_map can optionally be null.
   std::error_code SaveBody(RdbTypeFreqMap* freq_map);
 
-  SaveMode Mode() const { return save_mode_; }
+  SaveMode Mode() const {
+    return save_mode_;
+  }
 
  private:
   class Impl;
@@ -140,6 +142,8 @@ class RdbSerializer {
   // for the dump command - thus it is public function
   std::error_code SaveValue(const PrimeValue& pv);
 
+  std::error_code SendFullSyncCut();
+
  private:
   std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len);
   std::error_code SaveObject(const PrimeValue& pv);
 
diff --git a/src/server/replica.cc b/src/server/replica.cc
index f61495006..611c47eaa 100644
--- a/src/server/replica.cc
+++ b/src/server/replica.cc
@@ -204,13 +204,13 @@ void Replica::MainReplicationFb() {
       this_fiber::sleep_for(50ms);
     }
 
+    service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
     if (ec) {
       LOG(WARNING) << "Error syncing " << ec << " " << ec.message();
       state_mask_ &= R_ENABLED;  // reset all flags besides R_ENABLED
       continue;
     }
 
-    service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
     VLOG(1) << "Replica greet ok";
   }
 
@@ -422,17 +422,18 @@ error_code Replica::InitiateDflySync() {
       shard_flows_[i].reset(new Replica(master_context_, i, &service_));
   }
 
+  SyncBlock sb{num_df_flows_};
+
   AggregateError ec;
   auto partition = Partition(num_df_flows_);
   shard_set->pool()->AwaitFiberOnAll([&](unsigned index, auto*) {
     for (auto id : partition[index]) {
-      if (ec = shard_flows_[id]->StartAsDflyFlow())
+      if (ec = shard_flows_[id]->StartFullSyncFlow(&sb))
         break;
     }
   });
 
-  if (ec)
-    return *ec;
+  RETURN_ON_ERR(*ec);
 
   ReqSerializer serializer{sock_.get()};
 
@@ -447,8 +448,12 @@ error_code Replica::InitiateDflySync() {
     return make_error_code(errc::bad_message);
   }
 
-  for (auto& flow : shard_flows_)
-    flow->sync_fb_.join();
+  // Wait for all flows to receive full sync cut.
+  {
+    VLOG(1) << "Blocking before full sync cut";
+    std::unique_lock lk(sb.mu_);
+    sb.cv_.wait(lk, [&]() { return sb.flows_left == 0; });
+  }
 
   LOG(INFO) << "Full sync finished";
   state_mask_ |= R_SYNC_OK;
@@ -503,20 +508,43 @@ error_code Replica::ConsumeRedisStream() {
 }
 
 error_code Replica::ConsumeDflyStream() {
-  ReqSerializer serializer{sock_.get()};
-  // TBD
-  serializer.SendCommand("QUIT");
-  state_mask_ &= ~R_ENABLED;  // disable further - TODO: not finished.
-  RETURN_ON_ERR(serializer.ec());
+  // Request master to transition to stable sync.
+  {
+    ReqSerializer serializer{sock_.get()};
+    serializer.SendCommand(StrCat("DFLY STARTSTABLE ", master_context_.dfly_session_id));
+    RETURN_ON_ERR(serializer.ec());
+  }
 
-  base::IoBuf io_buf{128};
+  // Wait for all flows to finish full sync.
+  for (auto& sub_repl : shard_flows_)
+    sub_repl->sync_fb_.join();
 
-  RETURN_ON_ERR(Recv(sock_.get(), &io_buf));
+  AggregateError all_ec;
+  vector<vector<unsigned>> partition = Partition(num_df_flows_);
+  shard_set->pool()->AwaitFiberOnAll([&](unsigned index, auto*) {
+    const auto& local_ids = partition[index];
+    for (unsigned id : local_ids) {
+      all_ec = shard_flows_[id]->StartStableSyncFlow();
+      if (all_ec)
+        break;
+    }
+  });
+
+  RETURN_ON_ERR(*all_ec);
+
+  base::IoBuf io_buf(16_KB);
+  std::error_code ec;
+  while (!ec) {
+    io::MutableBytes buf = io_buf.AppendBuffer();
+    io::Result<size_t> size_res = sock_->Recv(buf);
+    if (!size_res)
+      return size_res.error();
+  }
 
   return error_code{};
 }
 
-error_code Replica::StartAsDflyFlow() {
+error_code Replica::StartFullSyncFlow(SyncBlock* sb) {
   CHECK(!sock_);
   DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty());
 
@@ -536,12 +564,12 @@ error_code Replica::StartAsDflyFlow() {
 
   parser_.reset(new RedisParser{false});  // client mode
 
-  std::unique_ptr<base::IoBuf> io_buf{new base::IoBuf(128)};
+  leftover_buf_.reset(new base::IoBuf(128));
   unsigned consumed = 0;
-  RETURN_ON_ERR(ReadRespReply(io_buf.get(), &consumed));  // uses parser_
+  RETURN_ON_ERR(ReadRespReply(leftover_buf_.get(), &consumed));  // uses parser_
 
   if (!CheckRespFirstTypes({RespExpr::STRING, RespExpr::STRING})) {
-    LOG(ERROR) << "Bad FLOW response " << ToSV(io_buf->InputBuffer());
+    LOG(ERROR) << "Bad FLOW response " << ToSV(leftover_buf_->InputBuffer());
     return make_error_code(errc::bad_message);
   }
 
@@ -550,40 +578,105 @@
   if (flow_directive == "FULL") {
     eof_token = ToSV(resp_args_[1].GetBuf());
   } else {
-    LOG(ERROR) << "Bad FLOW response " << ToSV(io_buf->InputBuffer());
+    LOG(ERROR) << "Bad FLOW response " << ToSV(leftover_buf_->InputBuffer());
   }
-  io_buf->ConsumeInput(consumed);
+  leftover_buf_->ConsumeInput(consumed);
 
   state_mask_ = R_ENABLED | R_TCP_CONNECTED;
 
   // We can not discard io_buf because it may contain data
   // besides the response we parsed. Therefore we pass it further to ReplicateDFFb.
-  sync_fb_ =
-      ::boost::fibers::fiber(&Replica::FullSyncDflyFb, this, std::move(io_buf), move(eof_token));
+  sync_fb_ = ::boost::fibers::fiber(&Replica::FullSyncDflyFb, this, sb, move(eof_token));
 
   return error_code{};
 }
 
-void Replica::FullSyncDflyFb(unique_ptr<base::IoBuf> io_buf, string eof_token) {
+error_code Replica::StartStableSyncFlow() {
+  DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty());
+  ProactorBase* mythread = ProactorBase::me();
+  CHECK(mythread);
+
+  CHECK(sock_->IsOpen());
+  // sock_.reset(mythread->CreateSocket());
+  // RETURN_ON_ERR(sock_->Connect(master_context_.master_ep));
+  sync_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyFb, this);
+
+  return std::error_code{};
+}
+
+void Replica::FullSyncDflyFb(SyncBlock* sb, string eof_token) {
+  DCHECK(leftover_buf_);
   SocketSource ss{sock_.get()};
-  io::PrefixSource ps{io_buf->InputBuffer(), &ss};
+  io::PrefixSource ps{leftover_buf_->InputBuffer(), &ss};
 
   RdbLoader loader(NULL);
+  loader.SetFullSyncCutCb([this, sb, ran = false]() mutable {
+    if (!ran) {
+      std::unique_lock lk(sb->mu_);
+      sb->flows_left--;
+      ran = true;
+    }
+    sb->cv_.notify_all();
+  });
   loader.Load(&ps);
 
+  // Try finding eof token.
+  io::PrefixSource chained_tail{loader.Leftover(), &ps};
   if (!eof_token.empty()) {
-    unique_ptr<uint8_t[]> buf(new uint8_t[eof_token.size()]);
-    // pass leftover data from the loader.
-    io::PrefixSource chained(loader.Leftover(), &ps);
-    VLOG(1) << "Before reading from chained stream";
-    io::Result<size_t> eof_res = chained.Read(io::MutableBytes{buf.get(), eof_token.size()});
-    if (!eof_res || *eof_res != eof_token.size()) {
+    unique_ptr<uint8_t[]> buf{new uint8_t[eof_token.size()]};
+
+    io::Result<size_t> res =
+        chained_tail.ReadAtLeast(io::MutableBytes{buf.get(), eof_token.size()}, eof_token.size());
+
+    if (!res || *res != eof_token.size()) {
       LOG(ERROR) << "Error finding eof token in the stream";
     }
-
-    // TODO - to compare tokens
   }
-  VLOG(1) << "ReplicateDFFb finished after reading " << loader.bytes_read() << " bytes";
+
+  // Keep loader leftover.
+  io::Bytes unused = chained_tail.unused_prefix();
+  if (unused.size() > 0) {
+    leftover_buf_.reset(new base::IoBuf{unused.size()});
+    auto mut_bytes = leftover_buf_->AppendBuffer();
+    memcpy(mut_bytes.data(), unused.data(), unused.size());
+    leftover_buf_->CommitWrite(unused.size());
+  } else {
+    leftover_buf_.reset();
+  }
+
+  VLOG(1) << "FullSyncDflyFb finished after reading " << loader.bytes_read() << " bytes";
+}
+
+void Replica::StableSyncDflyFb() {
+  base::IoBuf io_buf(16_KB);
+  parser_.reset(new RedisParser);
+
+  // Check for leftover data from full sync.
+  if (leftover_buf_ && leftover_buf_->InputLen() > 0) {
+    size_t len = leftover_buf_->InputLen();
+    leftover_buf_->ReadAndConsume(len, io_buf.AppendBuffer().data());
+    io_buf.CommitWrite(len);
+    leftover_buf_.reset();
+  }
+
+  error_code ec;
+  string ack_cmd;
+
+  while (!ec) {
+    io::MutableBytes buf = io_buf.AppendBuffer();
+    io::Result<size_t> size_res = sock_->Recv(buf);
+    if (!size_res)
+      return;
+
+    last_io_time_ = sock_->proactor()->GetMonotonicTimeNs();
+
+    io_buf.CommitWrite(*size_res);
+    repl_offs_ += *size_res;
+
+    ec = ParseAndExecute(&io_buf);
+  }
+
+  return;
}
 
 error_code Replica::ReadRespReply(base::IoBuf* io_buf, uint32_t* consumed) {
diff --git a/src/server/replica.h b/src/server/replica.h
index 4de130f6a..6c2500306 100644
--- a/src/server/replica.h
+++ b/src/server/replica.h
@@ -3,7 +3,9 @@
 //
 #pragma once
 
+#include <boost/fiber/condition_variable.hpp>
 #include <boost/fiber/fiber.hpp>
+#include <boost/fiber/mutex.hpp>
 #include <variant>
 
 #include "base/io_buf.h"
@@ -44,6 +46,16 @@ class Replica {
     R_SYNC_OK = 0x10,
   };
 
+  // A generic barrier that is used for waiting for
+  // flow fibers to become ready for the stable state switch.
+  struct SyncBlock {
+    SyncBlock(unsigned flows) : flows_left{flows} {
+    }
+    unsigned flows_left;
+    ::boost::fibers::mutex mu_;
+    ::boost::fibers::condition_variable cv_;
+  };
+
  public:
   Replica(std::string master_host, uint16_t port, Service* se);
   ~Replica();
@@ -75,10 +87,16 @@ class Replica {
   Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* service);
 
   // Start replica initialized as dfly flow.
-  std::error_code StartAsDflyFlow();
+  std::error_code StartFullSyncFlow(SyncBlock* block);
 
-  // Sindle flow Dragonfly full sync fiber spawned by StartAsDflyFlow.
-  void FullSyncDflyFb(std::unique_ptr<base::IoBuf> io_buf, std::string eof_token);
+  // Transition into stable state mode as dfly flow.
+  std::error_code StartStableSyncFlow();
+
+  // Single flow full sync fiber spawned by StartFullSyncFlow.
+  void FullSyncDflyFb(SyncBlock* block, std::string eof_token);
+
+  // Single flow stable state sync fiber spawned by StartStableSyncFlow.
+  void StableSyncDflyFb();
 
  private: /* Utility */
   struct PSyncResponse {
@@ -142,6 +160,7 @@ class Replica {
   ::boost::fibers::fiber sync_fb_;
   std::vector<std::unique_ptr<Replica>> shard_flows_;
 
+  std::unique_ptr<base::IoBuf> leftover_buf_;
   std::unique_ptr<facade::RedisParser> parser_;
   facade::RespVec resp_args_;
   facade::CmdArgVec cmd_str_args_;
diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc
index 7ce91ba00..66047a1bd 100644
--- a/src/server/snapshot.cc
+++ b/src/server/snapshot.cc
@@ -126,6 +126,9 @@ void SliceSnapshot::SerializeEntriesFb() {
   mu_.lock();
   mu_.unlock();
 
+  CHECK(!rdb_serializer_->SendFullSyncCut());
+  FlushSfile(true);
+
   VLOG(1) << "Exit SnapshotSerializer (serialized/side_saved/cbcalls): " << serialized_ << "/"
           << side_saved_ << "/" << savecb_calls_;
 }
@@ -250,6 +253,7 @@ void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) {
   CHECK(!ec && !sfile.val.empty());
 
   DbRecord rec = GetDbRecord(entry.db_ind, std::move(sfile.val), 1);
+
   dest_->Push(std::move(rec));
 }
 
diff --git a/src/server/transaction.cc b/src/server/transaction.cc
index eefd5ecf8..6c52e38e3 100644
--- a/src/server/transaction.cc
+++ b/src/server/transaction.cc
@@ -577,7 +577,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
   // The problematic flow is as follows: ScheduleUniqueShard schedules into TxQueue and then
   // call PollExecute that runs the callback which calls DecreaseRunCnt.
   // As a result WaitForShardCallbacks below is unblocked.
-  auto schedule_cb = [&] {
+  auto schedule_cb = [this] {
     bool run_eager = ScheduleUniqueShard(EngineShard::tlocal());
     if (run_eager) {
       // it's important to DecreaseRunCnt only for run_eager and after run_eager was assigned.
@@ -833,7 +833,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
 
   // Fast path - for uncontended keys, just run the callback.
   // That applies for single key operations like set, get, lpush etc.
-  if (shard->db_slice().CheckLock(mode, lock_args)) {
+  if (shard->db_slice().CheckLock(mode, lock_args) && shard->shard_lock()->Check(mode)) {
     RunQuickie(shard);
     return true;
   }
@@ -845,7 +845,6 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
   DCHECK_EQ(0, sd.local_mask & KEYLOCK_ACQUIRED);
   bool lock_acquired = shard->db_slice().Acquire(mode, lock_args);
   sd.local_mask |= KEYLOCK_ACQUIRED;
-  DCHECK(!lock_acquired);  // Because CheckLock above failed.
 
   DVLOG(1) << "Rescheduling into TxQueue " << DebugId();
 
diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py
index 8c55ce7c8..f53e7e704 100644
--- a/tests/dragonfly/replication_test.py
+++ b/tests/dragonfly/replication_test.py
@@ -2,8 +2,6 @@
 import pytest
 import asyncio
 import aioredis
-import redis
-import time
 
 from .utility import *
 
@@ -11,65 +9,22 @@ from .utility import *
 BASE_PORT = 1111
 
 """
-Test simple full sync on one replica without altering data during replication.
+Test the full replication pipeline: full sync with streaming changes, followed by stable state streaming.
""" -# (threads_master, threads_replica, n entries) -simple_full_sync_cases = [ - (2, 2, 100), - (8, 2, 500), - (2, 8, 500), - (6, 4, 500) -] - - -@pytest.mark.parametrize("t_master, t_replica, n_keys", simple_full_sync_cases) -def test_simple_full_sync(df_local_factory, t_master, t_replica, n_keys): - master = df_local_factory.create(port=1111, proactor_threads=t_master) - replica = df_local_factory.create(port=1112, proactor_threads=t_replica) - - # Start master and fill with test data - master.start() - c_master = redis.Redis(port=master.port) - batch_fill_data(c_master, gen_test_data(n_keys)) - - # Start replica and run REPLICAOF - replica.start() - c_replica = redis.Redis(port=replica.port) - c_replica.replicaof("localhost", str(master.port)) - - # Check replica received test data - wait_available(c_replica) - batch_check_data(c_replica, gen_test_data(n_keys)) - - # Stop replication manually - c_replica.replicaof("NO", "ONE") - assert c_replica.set("writeable", "true") - - # Check test data persisted - batch_check_data(c_replica, gen_test_data(n_keys)) - - -""" -Test simple full sync on multiple replicas without altering data during replication. -The replicas start running in parallel. -""" - -# (threads_master, threads_replicas, n entries) -simple_full_sync_multi_cases = [ - (4, [3, 2], 500), - (8, [6, 5, 4], 500), - (8, [2] * 5, 100), - (4, [1] * 20, 500) +replication_cases = [ + (8, [8], 20000, 5000), + (8, [8], 10000, 10000), + (8, [2, 2, 2, 2], 20000, 5000), + (6, [6, 6, 6], 30000, 15000), + (4, [1] * 12, 10000, 4000), ] @pytest.mark.asyncio -@pytest.mark.parametrize("t_master, t_replicas, n_keys", simple_full_sync_multi_cases) -async def test_simple_full_sync_multi(df_local_factory, t_master, t_replicas, n_keys): - def data_gen(): return gen_test_data(n_keys) - - master = df_local_factory.create(port=BASE_PORT, proactor_threads=t_master) +@pytest.mark.parametrize("t_master, t_replicas, n_keys, n_stream_keys", replication_cases) +async def test_replication_all(df_local_factory, t_master, t_replicas, n_keys, n_stream_keys): + master = df_local_factory.create(port=1111, proactor_threads=t_master) replicas = [ df_local_factory.create(port=BASE_PORT+i+1, proactor_threads=t) for i, t in enumerate(t_replicas) @@ -77,34 +32,51 @@ async def test_simple_full_sync_multi(df_local_factory, t_master, t_replicas, n_ # Start master and fill with test data master.start() - c_master = aioredis.Redis(port=master.port, single_connection_client=True) - await batch_fill_data_async(c_master, data_gen()) + c_master = aioredis.Redis(port=master.port) + await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=1)) - # Start replica tasks in parallel - tasks = [ - asyncio.create_task(run_sfs_replica( - replica, master, data_gen), name="replica-"+str(replica.port)) - for replica in replicas - ] + # Start replicas + for replica in replicas: + replica.start() - for task in tasks: - assert await task + c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas] - await c_master.connection_pool.disconnect() + async def stream_data(): + """ Stream data during stable state replication phase and afterwards """ + gen = gen_test_data(n_stream_keys, seed=2) + for chunk in grouper(3, gen): + await c_master.mset({k: v for k, v in chunk}) + async def run_replication(c_replica): + await c_replica.execute_command("REPLICAOF localhost " + str(master.port)) -async def run_sfs_replica(replica, master, data_gen): - replica.start() - c_replica = aioredis.Redis( - port=replica.port, 
-        single_connection_client=None)
+    async def check_replication(c_replica):
+        """ Check that static and streamed data arrived """
+        await wait_available_async(c_replica)
+        # Check range [n_stream_keys, n_keys] is of seed 1
+        await batch_check_data_async(c_replica, gen_test_data(n_keys, start=n_stream_keys, seed=1))
+        # Check range [0, n_stream_keys] is of seed 2
+        await asyncio.sleep(0.2)
+        await batch_check_data_async(c_replica, gen_test_data(n_stream_keys, seed=2))
 
-    await c_replica.execute_command("REPLICAOF localhost " + str(master.port))
+    # Start streaming data and run REPLICAOF in parallel
+    stream_fut = asyncio.create_task(stream_data())
+    await asyncio.gather(*(asyncio.create_task(run_replication(c))
+                           for c in c_replicas))
 
-    await wait_available_async(c_replica)
-    await batch_check_data_async(c_replica, data_gen())
+    assert not stream_fut.done(
+    ), "Weak testcase. Increase number of streamed keys to surpass full sync"
+    await stream_fut
 
-    await c_replica.connection_pool.disconnect()
-    return True
+    # Check full sync results
+    await asyncio.gather(*(check_replication(c) for c in c_replicas))
+
+    # Check stable state streaming
+    await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=3))
+
+    await asyncio.sleep(0.5)
+    await asyncio.gather(*(batch_check_data_async(c, gen_test_data(n_keys, seed=3))
+                           for c in c_replicas))
 
 
 """
diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py
index 57b0b142f..25d569e5d 100644
--- a/tests/dragonfly/utility.py
+++ b/tests/dragonfly/utility.py
@@ -18,9 +18,9 @@ def grouper(n, iterable):
 BATCH_SIZE = 100
 
 
-def gen_test_data(n):
-    for i in range(n):
-        yield "k-"+str(i), "v-"+str(i)
+def gen_test_data(n, start=0, seed=None):
+    for i in range(start, n):
+        yield "k-"+str(i), "v-"+str(i) + ("-"+str(seed) if seed else "")
 
 
 def batch_fill_data(client: redis.Redis, gen):
@@ -44,15 +44,15 @@ def as_str_val(v) -> str:
 
 def batch_check_data(client: redis.Redis, gen):
     for group in grouper(BATCH_SIZE, gen):
-        vals = client.mget(k for k, _ in group)
-        assert all(as_str_val(vals[i]) == v for i, (_, v) in enumerate(group))
-
+        vals = [as_str_val(v) for v in client.mget(k for k, _ in group)]
+        gvals = [v for _, v in group]
+        assert vals == gvals
 
 async def batch_check_data_async(client: aioredis.Redis, gen):
     for group in grouper(BATCH_SIZE, gen):
-        vals = await client.mget(k for k, _ in group)
-        assert all(as_str_val(vals[i]) == v for i, (_, v) in enumerate(group))
-
+        vals = [as_str_val(v) for v in await client.mget(k for k, _ in group)]
+        gvals = [v for _, v in group]
+        assert vals == gvals
 
 def wait_available(client: redis.Redis):
     its = 0
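
Reviewer aside: the seeding added to `gen_test_data` is what lets `test_replication_all` attribute every key to a replication phase (seed=1 for the pre-sync snapshot, seed=2 for writes streamed during full sync, seed=3 for stable state writes). Below is a minimal sketch of that layout; it uses `gen_test_data` exactly as defined in `tests/dragonfly/utility.py`, while the `static`/`streamed`/`replica_view` names are illustrative only and not part of this PR:

```python
# Sketch: reproduces the value layout test_replication_all relies on.
def gen_test_data(n, start=0, seed=None):
    for i in range(start, n):
        yield "k-" + str(i), "v-" + str(i) + ("-" + str(seed) if seed else "")

# Phase 1: n_keys static entries (seed=1) are written before REPLICAOF runs.
static = dict(gen_test_data(20, seed=1))
# Phase 2: the first n_stream_keys entries are overwritten (seed=2) while
# full sync is in progress, so they must reach the replica via streaming.
streamed = dict(gen_test_data(5, seed=2))

# What the replica should hold once full sync and the streamed changes converge:
replica_view = {**static, **streamed}
assert replica_view["k-10"] == "v-10-1"  # above n_stream_keys: from the snapshot
assert replica_view["k-3"] == "v-3-2"    # below n_stream_keys: streamed overwrite
```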