feat(server): Switch to stable state replication (#473)

* feat(server): Switch to stable state replication

Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
Authored by Vladislav, 2022-11-17 21:41:33 +03:00; committed via GitHub.
parent 96989b2124
commit 96c9332297
18 changed files with 401 additions and 163 deletions

View file

@@ -1,4 +1,5 @@
 default_stages: [commit]
+exclude: 'src\/redis\/.*'
 repos:
 - repo: local
   hooks:

View file

@@ -103,6 +103,8 @@
 /* Test if a type is an object type. */
 #define rdbIsObjectType(t) ((t >= 0 && t <= 7) || (t >= 9 && t <= 18))
 
+/* Range 200-240 is used by Dragonfly specific opcodes */
+
 /* Special RDB opcodes (saved/loaded with rdbSaveType/rdbLoadType). */
 #define RDB_OPCODE_FUNCTION 246 /* engine data */
 #define RDB_OPCODE_MODULE_AUX 247 /* Module auxiliary data. */

View file

@@ -542,7 +542,7 @@ uint32_t DbSlice::GetMCFlag(DbIndex db_ind, const PrimeKey& key) const {
 PrimeIterator DbSlice::AddNew(const Context& cntx, string_view key, PrimeValue obj,
                               uint64_t expire_at_ms) noexcept(false) {
-  auto [it, added] = AddEntry(cntx, key, std::move(obj), expire_at_ms);
+  auto [it, added] = AddOrSkip(cntx, key, std::move(obj), expire_at_ms);
   CHECK(added);
   return it;
@@ -571,12 +571,14 @@ OpStatus DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it,
   return OpStatus::OK;
 }
 
-pair<PrimeIterator, bool> DbSlice::AddEntry(const Context& cntx, string_view key, PrimeValue obj,
-                                            uint64_t expire_at_ms) noexcept(false) {
+std::pair<PrimeIterator, bool> DbSlice::AddOrUpdateInternal(const Context& cntx,
+                                                            std::string_view key, PrimeValue obj,
+                                                            uint64_t expire_at_ms,
+                                                            bool force_update) noexcept(false) {
   DCHECK(!obj.IsRef());
 
   pair<PrimeIterator, bool> res = AddOrFind(cntx, key);
-  if (!res.second)  // have not inserted.
+  if (!res.second && !force_update)  // have not inserted.
     return res;
 
   auto& db = *db_arr_[cntx.db_index];
@@ -588,12 +590,26 @@ pair<PrimeIterator, bool> DbSlice::AddEntry(const Context& cntx, string_view key
   if (expire_at_ms) {
     it->second.SetExpire(true);
     uint64_t delta = expire_at_ms - expire_base_[0];
-    CHECK(db.expire.Insert(it->first.AsRef(), ExpirePeriod(delta)).second);
+    auto [eit, inserted] = db.expire.Insert(it->first.AsRef(), ExpirePeriod(delta));
+    CHECK(inserted || force_update);
+    if (!inserted) {
+      eit->second = ExpirePeriod(delta);
+    }
   }
 
   return res;
 }
 
+pair<PrimeIterator, bool> DbSlice::AddOrUpdate(const Context& cntx, string_view key, PrimeValue obj,
+                                               uint64_t expire_at_ms) noexcept(false) {
+  return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, true);
+}
+
+pair<PrimeIterator, bool> DbSlice::AddOrSkip(const Context& cntx, string_view key, PrimeValue obj,
+                                             uint64_t expire_at_ms) noexcept(false) {
+  return AddOrUpdateInternal(cntx, key, std::move(obj), expire_at_ms, false);
+}
+
 size_t DbSlice::DbSize(DbIndex db_ind) const {
   DCHECK_LT(db_ind, db_array_size());

View file

@@ -150,11 +150,16 @@ class DbSlice {
   std::tuple<PrimeIterator, ExpireIterator, bool> AddOrFind2(const Context& cntx,
                                                              std::string_view key) noexcept(false);
 
+  // Same as AddOrSkip, but overwrites in case entry exists.
+  // Returns second=true if insertion took place.
+  std::pair<PrimeIterator, bool> AddOrUpdate(const Context& cntx, std::string_view key,
+                                             PrimeValue obj, uint64_t expire_at_ms) noexcept(false);
+
   // Returns second=true if insertion took place, false otherwise.
   // expire_at_ms equal to 0 - means no expiry.
   // throws: bad_alloc is insertion could not happen due to out of memory.
-  std::pair<PrimeIterator, bool> AddEntry(const Context& cntx, std::string_view key, PrimeValue obj,
-                                          uint64_t expire_at_ms) noexcept(false);
+  std::pair<PrimeIterator, bool> AddOrSkip(const Context& cntx, std::string_view key,
+                                           PrimeValue obj, uint64_t expire_at_ms) noexcept(false);
 
   // Adds a new entry. Requires: key does not exist in this slice.
   // Returns the iterator to the newly added entry.
@@ -285,6 +290,10 @@ class DbSlice {
   void InvalidateDbWatches(DbIndex db_indx);
 
  private:
+  std::pair<PrimeIterator, bool> AddOrUpdateInternal(const Context& cntx, std::string_view key,
+                                                     PrimeValue obj, uint64_t expire_at_ms,
+                                                     bool force_update) noexcept(false);
+
   void CreateDb(DbIndex index);
   size_t EvictObjects(size_t memory_to_free, PrimeIterator it, DbTable* table);
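
Note (illustrative, not part of the diff): a minimal sketch contrasting the two helpers declared above. The DbContext value, the key/value literals and the PrimeValue-from-string construction are assumptions made for this example only.

    DbContext cntx{.db_index = 0, .time_now_ms = GetCurrentTimeMs()};

    // AddOrSkip keeps the existing entry when the key is already present.
    auto [it1, added1] = db_slice.AddOrSkip(cntx, "foo", PrimeValue{"v1"}, /*expire_at_ms=*/0);

    // AddOrUpdate overwrites the value (and its expiry) when the key exists;
    // added2 is false in that case, but it2 points at the freshly written entry.
    auto [it2, added2] = db_slice.AddOrUpdate(cntx, "foo", PrimeValue{"v2"}, /*expire_at_ms=*/5000);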

View file

@@ -87,6 +87,10 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
     return Sync(args, cntx);
   }
 
+  if (sub_cmd == "STARTSTABLE" && args.size() == 3) {
+    return StartStable(args, cntx);
+  }
+
   if (sub_cmd == "EXPIRE") {
     return Expire(args, cntx);
   }
@@ -258,16 +262,8 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
     return;
 
   unique_lock lk(sync_info->mu);
-  if (sync_info->state != SyncState::PREPARATION)
-    return rb->SendError(kInvalidState);
-
-  // Check all flows are connected.
-  // This might happen if a flow abruptly disconnected before sending the SYNC request.
-  for (const FlowInfo& flow : sync_info->flows) {
-    if (!flow.conn) {
-      return rb->SendError(kInvalidState);
-    }
-  }
+  if (!CheckReplicaStateOrReply(*sync_info, SyncState::PREPARATION, rb))
+    return;
 
   // Start full sync.
   {
@@ -288,6 +284,38 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
   return rb->SendOk();
 }
 
+void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
+  RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
+  string_view sync_id_str = ArgS(args, 2);
+
+  VLOG(1) << "Got DFLY STARTSTABLE " << sync_id_str;
+
+  auto [sync_id, sync_info] = GetSyncInfoOrReply(sync_id_str, rb);
+  if (!sync_id)
+    return;
+
+  unique_lock lk(sync_info->mu);
+  if (!CheckReplicaStateOrReply(*sync_info, SyncState::FULL_SYNC, rb))
+    return;
+
+  {
+    TransactionGuard tg{cntx->transaction};
+    AggregateStatus status;
+
+    auto cb = [this, &status, sync_info = sync_info](unsigned index, auto*) {
+      status = StartStableSyncInThread(&sync_info->flows[index], EngineShard::tlocal());
+      return OpStatus::OK;
+    };
+    shard_set->pool()->AwaitFiberOnAll(std::move(cb));
+
+    if (*status != OpStatus::OK)
+      return rb->SendError(kInvalidState);
+  }
+
+  sync_info->state = SyncState::STABLE_SYNC;
+
+  return rb->SendOk();
+}
+
 void DflyCmd::Expire(CmdArgList args, ConnectionContext* cntx) {
   RedisReplyBuilder* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
   cntx->transaction->ScheduleSingleHop([](Transaction* t, EngineShard* shard) {
@@ -304,14 +332,42 @@ OpStatus DflyCmd::StartFullSyncInThread(FlowInfo* flow, EngineShard* shard) {
   SaveMode save_mode = shard == nullptr ? SaveMode::SUMMARY : SaveMode::SINGLE_SHARD;
   flow->saver.reset(new RdbSaver(flow->conn->socket(), save_mode, false));
 
+  // Shard can be null for io thread.
   if (shard != nullptr) {
-    flow->saver->StartSnapshotInShard(false, shard);
+    auto ec = sf_->journal()->OpenInThread(false, string_view());
+    CHECK(!ec);
+    flow->saver->StartSnapshotInShard(true, shard);
   }
 
   flow->fb = ::boost::fibers::fiber(&DflyCmd::FullSyncFb, this, flow);
   return OpStatus::OK;
 }
 
+OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, EngineShard* shard) {
+  // Shard can be null for io thread.
+  if (shard != nullptr) {
+    flow->saver->StopSnapshotInShard(shard);
+  }
+
+  // Wait for full sync to finish.
+  if (flow->fb.joinable()) {
+    flow->fb.join();
+  }
+
+  if (shard != nullptr) {
+    flow->saver.reset();
+
+    // TODO: Add cancellation.
+    auto cb = sf_->journal()->RegisterOnChange([flow](const journal::Entry& je) {
+      // TODO: Serialize event.
+      ReqSerializer serializer{flow->conn->socket()};
+      serializer.SendCommand(absl::StrCat("SET ", je.key, " ", je.pval_ptr->ToString()));
+    });
+  }
+
+  return OpStatus::OK;
+}
+
 void DflyCmd::FullSyncFb(FlowInfo* flow) {
   error_code ec;
   RdbSaver* saver = flow->saver.get();
@@ -328,22 +384,20 @@ void DflyCmd::FullSyncFb(FlowInfo* flow) {
     return;
   }
 
-  if (saver->Mode() != SaveMode::SUMMARY) {
-    // TODO: we should be able to stop earlier if requested.
-    ec = saver->SaveBody(nullptr);
-    if (ec) {
-      LOG(ERROR) << ec;
-      return;
-    }
+  // TODO: we should be able to stop earlier if requested.
+  ec = saver->SaveBody(nullptr);
+  if (ec) {
+    LOG(ERROR) << ec;
+    return;
   }
 
+  VLOG(1) << "Sending full sync EOF";
+
   ec = flow->conn->socket()->Write(io::Buffer(flow->eof_token));
   if (ec) {
     LOG(ERROR) << ec;
     return;
   }
-
-  ec = flow->conn->socket()->Shutdown(SHUT_RDWR);
 }
 
 uint32_t DflyCmd::CreateSyncSession() {
@@ -429,6 +483,25 @@ pair<uint32_t, shared_ptr<DflyCmd::SyncInfo>> DflyCmd::GetSyncInfoOrReply(std::s
   return {sync_id, sync_it->second};
 }
 
+bool DflyCmd::CheckReplicaStateOrReply(const SyncInfo& sync_info, SyncState expected,
+                                       RedisReplyBuilder* rb) {
+  if (sync_info.state != expected) {
+    rb->SendError(kInvalidState);
+    return false;
+  }
+
+  // Check all flows are connected.
+  // This might happen if a flow abruptly disconnected before sending the SYNC request.
+  for (const FlowInfo& flow : sync_info.flows) {
+    if (!flow.conn) {
+      rb->SendError(kInvalidState);
+      return false;
+    }
+  }
+
+  return true;
+}
+
 void DflyCmd::BreakOnShutdown() {
   VLOG(1) << "BreakOnShutdown";
 }

View file

@@ -31,7 +31,7 @@ class Journal;
 
 class DflyCmd {
  public:
-  enum class SyncState { PREPARATION, FULL_SYNC, CANCELLED };
+  enum class SyncState { PREPARATION, FULL_SYNC, STABLE_SYNC, CANCELLED };
 
   struct FlowInfo {
     FlowInfo() = default;
@@ -80,11 +80,14 @@ class DflyCmd {
   // Register connection as flow for sync session.
   void Flow(CmdArgList args, ConnectionContext* cntx);
 
-  // SYNC <masterid> <syncid> <flowid>
-  // Migrate connection to required flow thread.
-  // Stub: will be replcaed with full sync.
+  // SYNC <syncid>
+  // Initiate full sync.
   void Sync(CmdArgList args, ConnectionContext* cntx);
 
+  // STARTSTABLE <syncid>
+  // Switch to stable state replication.
+  void StartStable(CmdArgList args, ConnectionContext* cntx);
+
   // EXPIRE
   // Check all keys for expiry.
   void Expire(CmdArgList args, ConnectionContext* cntx);
@@ -92,6 +95,9 @@ class DflyCmd {
   // Start full sync in thread. Start FullSyncFb. Called for each flow.
   facade::OpStatus StartFullSyncInThread(FlowInfo* flow, EngineShard* shard);
 
+  // Start stable sync in thread. Called for each flow.
+  facade::OpStatus StartStableSyncInThread(FlowInfo* flow, EngineShard* shard);
+
   // Fiber that runs full sync for each flow.
   void FullSyncFb(FlowInfo* flow);
 
@@ -108,6 +114,9 @@ class DflyCmd {
   std::pair<uint32_t, std::shared_ptr<SyncInfo>> GetSyncInfoOrReply(std::string_view id,
                                                                     facade::RedisReplyBuilder* rb);
 
+  bool CheckReplicaStateOrReply(const SyncInfo& si, SyncState expected,
+                                facade::RedisReplyBuilder* rb);
+
   ServerFamily* sf_;
 
   util::ListenerInterface* listener_;
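
Note (illustrative, not part of the diff): roughly the order in which a replica drives these handlers after this change, pieced together from the replica-side code later in this commit; sock and sync_id are placeholders.

    // Each shard connection first registers itself with DFLY FLOW.
    // Then, on the coordinating connection:
    ReqSerializer serializer{sock.get()};

    // Kick off the full sync snapshot on every registered flow.
    serializer.SendCommand(absl::StrCat("DFLY SYNC ", sync_id));

    // Once every flow has received RDB_OPCODE_FULLSYNC_END, ask the master
    // to switch to stable state (journal) replication.
    serializer.SendCommand(absl::StrCat("DFLY STARTSTABLE ", sync_id));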

View file

@@ -161,7 +161,7 @@ bool RdbRestoreValue::Add(std::string_view data, std::string_view key, DbSlice&
     return false;
   }
 
   DbContext context{.db_index = index, .time_now_ms = GetCurrentTimeMs()};
-  auto [it, added] = db_slice.AddEntry(context, key, std::move(pv), item.expire_ms);
+  auto [it, added] = db_slice.AddOrSkip(context, key, std::move(pv), item.expire_ms);
   return added;
 }

View file

@@ -0,0 +1,12 @@
+// Copyright 2022, DragonflyDB authors.  All rights reserved.
+// See LICENSE for licensing terms.
+//
+
+#pragma once
+
+// Range 200-240 is used by DF extensions.
+
+// This opcode is sent by the master Dragonfly instance to a replica
+// to notify that it finished streaming static data and is ready
+// to switch to the stable state replication phase.
+const uint8_t RDB_OPCODE_FULLSYNC_END = 200;
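
Note (illustrative, not part of the diff): a small self-contained sketch of the invariant the comment above states; the helper and the range constants are hypothetical, only the value 200 (RDB_OPCODE_FULLSYNC_END) comes from this file.

    #include <cstdint>

    // Dragonfly-specific opcodes live in the reserved 200-240 range so they can
    // never collide with the upstream Redis special opcodes shown earlier (246 and up).
    constexpr uint8_t kDflyOpcodeMin = 200;
    constexpr uint8_t kDflyOpcodeMax = 240;

    constexpr bool IsDflyOpcode(uint8_t op) {
      return op >= kDflyOpcodeMin && op <= kDflyOpcodeMax;
    }

    static_assert(IsDflyOpcode(200), "FULLSYNC_END must stay inside the extension range");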

View file

@ -27,6 +27,7 @@ extern "C" {
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
#include "server/error.h" #include "server/error.h"
#include "server/hset_family.h" #include "server/hset_family.h"
#include "server/rdb_extensions.h"
#include "server/script_mgr.h" #include "server/script_mgr.h"
#include "server/server_state.h" #include "server/server_state.h"
#include "server/set_family.h" #include "server/set_family.h"
@ -1553,6 +1554,12 @@ error_code RdbLoader::Load(io::Source* src) {
break; break;
} }
if (type == RDB_OPCODE_FULLSYNC_END) {
if (full_sync_cut_cb)
full_sync_cut_cb();
continue;
}
if (type == RDB_OPCODE_SELECTDB) { if (type == RDB_OPCODE_SELECTDB) {
unsigned dbid = 0; unsigned dbid = 0;
@ -1815,8 +1822,7 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
if (item.expire_ms > 0 && db_cntx.time_now_ms >= item.expire_ms) if (item.expire_ms > 0 && db_cntx.time_now_ms >= item.expire_ms)
continue; continue;
auto [it, added] = db_slice.AddEntry(db_cntx, item.key, std::move(pv), item.expire_ms); auto [it, added] = db_slice.AddOrUpdate(db_cntx, item.key, std::move(pv), item.expire_ms);
if (!added) { if (!added) {
LOG(WARNING) << "RDB has duplicated key '" << item.key << "' in DB " << db_ind; LOG(WARNING) << "RDB has duplicated key '" << item.key << "' in DB " << db_ind;
} }

View file

@@ -172,6 +172,13 @@ class RdbLoader : protected RdbLoaderBase {
     return load_time_;
   }
 
+  // Set callback for receiving RDB_OPCODE_FULLSYNC_END.
+  // This opcode is used by a master instance to notify it finished streaming static data
+  // and is ready to switch to stable state sync.
+  void SetFullSyncCutCb(std::function<void()> cb) {
+    full_sync_cut_cb = std::move(cb);
+  }
+
  private:
   struct ObjSettings;
   std::error_code LoadKeyValPair(int type, ObjSettings* settings);
@@ -194,6 +201,9 @@ class RdbLoader : protected RdbLoaderBase {
   ::boost::fibers::mutex mu_;
   std::error_code ec_;  // guarded by mu_
   std::atomic_bool stop_early_{false};
+
+  // Callback when receiving RDB_OPCODE_FULLSYNC_END
+  std::function<void()> full_sync_cut_cb;
 };
 
 }  // namespace dfly

View file

@ -24,6 +24,7 @@ extern "C" {
#include "base/logging.h" #include "base/logging.h"
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
#include "server/error.h" #include "server/error.h"
#include "server/rdb_extensions.h"
#include "server/snapshot.h" #include "server/snapshot.h"
#include "util/fibers/simple_channel.h" #include "util/fibers/simple_channel.h"
@ -581,6 +582,11 @@ error_code RdbSerializer::SaveStreamConsumers(streamCG* cg) {
return error_code{}; return error_code{};
} }
error_code RdbSerializer::SendFullSyncCut() {
RETURN_ON_ERR(WriteOpcode(RDB_OPCODE_FULLSYNC_END));
return FlushMem();
}
// TODO: if buf is large enough, it makes sense to write both mem_buf and buf // TODO: if buf is large enough, it makes sense to write both mem_buf and buf
// directly to sink_. // directly to sink_.
error_code RdbSerializer::WriteRaw(const io::Bytes& buf) { error_code RdbSerializer::WriteRaw(const io::Bytes& buf) {
@ -921,12 +927,15 @@ error_code RdbSaver::SaveHeader(const StringVec& lua_scripts) {
error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) { error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
RETURN_ON_ERR(impl_->serializer()->FlushMem()); RETURN_ON_ERR(impl_->serializer()->FlushMem());
VLOG(1) << "SaveBody , snapshots count: " << impl_->Size(); if (save_mode_ == SaveMode::SUMMARY) {
impl_->serializer()->SendFullSyncCut();
error_code io_error = impl_->ConsumeChannel(); } else {
if (io_error) { VLOG(1) << "SaveBody , snapshots count: " << impl_->Size();
LOG(ERROR) << "io error " << io_error; error_code io_error = impl_->ConsumeChannel();
return io_error; if (io_error) {
LOG(ERROR) << "io error " << io_error;
return io_error;
}
} }
RETURN_ON_ERR(SaveEpilog()); RETURN_ON_ERR(SaveEpilog());

View file

@@ -85,7 +85,9 @@ class RdbSaver {
   // freq_map can optionally be null.
   std::error_code SaveBody(RdbTypeFreqMap* freq_map);
 
-  SaveMode Mode() const { return save_mode_; }
+  SaveMode Mode() const {
+    return save_mode_;
+  }
 
  private:
   class Impl;
@@ -140,6 +142,8 @@ class RdbSerializer {
   // for the dump command - thus it is public function
   std::error_code SaveValue(const PrimeValue& pv);
 
+  std::error_code SendFullSyncCut();
+
  private:
   std::error_code SaveLzfBlob(const ::io::Bytes& src, size_t uncompressed_len);
   std::error_code SaveObject(const PrimeValue& pv);

View file

@@ -204,13 +204,13 @@ void Replica::MainReplicationFb() {
       this_fiber::sleep_for(50ms);
     }
 
-    service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
-
     if (ec) {
       LOG(WARNING) << "Error syncing " << ec << " " << ec.message();
       state_mask_ &= R_ENABLED;  // reset all flags besides R_ENABLED
       continue;
     }
 
+    service_.SwitchState(GlobalState::LOADING, GlobalState::ACTIVE);
+
     VLOG(1) << "Replica greet ok";
   }
@@ -422,17 +422,18 @@ error_code Replica::InitiateDflySync() {
     shard_flows_[i].reset(new Replica(master_context_, i, &service_));
   }
 
+  SyncBlock sb{num_df_flows_};
+
   AggregateError ec;
   auto partition = Partition(num_df_flows_);
   shard_set->pool()->AwaitFiberOnAll([&](unsigned index, auto*) {
     for (auto id : partition[index]) {
-      if (ec = shard_flows_[id]->StartAsDflyFlow())
+      if (ec = shard_flows_[id]->StartFullSyncFlow(&sb))
         break;
     }
   });
 
-  if (ec)
-    return *ec;
+  RETURN_ON_ERR(*ec);
 
   ReqSerializer serializer{sock_.get()};
@@ -447,8 +448,12 @@ error_code Replica::InitiateDflySync() {
     return make_error_code(errc::bad_message);
   }
 
-  for (auto& flow : shard_flows_)
-    flow->sync_fb_.join();
+  // Wait for all flows to receive full sync cut.
+  {
+    VLOG(1) << "Blocking before full sync cut";
+    std::unique_lock lk(sb.mu_);
+    sb.cv_.wait(lk, [&]() { return sb.flows_left == 0; });
+  }
 
   LOG(INFO) << "Full sync finished";
   state_mask_ |= R_SYNC_OK;
@@ -503,20 +508,43 @@ error_code Replica::ConsumeRedisStream() {
 }
 
 error_code Replica::ConsumeDflyStream() {
-  ReqSerializer serializer{sock_.get()};
-  // TBD
-  serializer.SendCommand("QUIT");
-  state_mask_ &= ~R_ENABLED;  // disable further - TODO: not finished.
-  RETURN_ON_ERR(serializer.ec());
+  // Request master to transition to stable sync.
+  {
+    ReqSerializer serializer{sock_.get()};
+    serializer.SendCommand(StrCat("DFLY STARTSTABLE ", master_context_.dfly_session_id));
+    RETURN_ON_ERR(serializer.ec());
+  }
 
-  base::IoBuf io_buf{128};
+  // Wait for all flows to finish full sync.
+  for (auto& sub_repl : shard_flows_)
+    sub_repl->sync_fb_.join();
 
-  RETURN_ON_ERR(Recv(sock_.get(), &io_buf));
+  AggregateError all_ec;
+  vector<vector<unsigned>> partition = Partition(num_df_flows_);
+
+  shard_set->pool()->AwaitFiberOnAll([&](unsigned index, auto*) {
+    const auto& local_ids = partition[index];
+    for (unsigned id : local_ids) {
+      all_ec = shard_flows_[id]->StartStableSyncFlow();
+      if (all_ec)
+        break;
+    }
+  });
+
+  RETURN_ON_ERR(*all_ec);
+
+  base::IoBuf io_buf(16_KB);
+  std::error_code ec;
+  while (!ec) {
+    io::MutableBytes buf = io_buf.AppendBuffer();
+    io::Result<size_t> size_res = sock_->Recv(buf);
+    if (!size_res)
+      return size_res.error();
+  }
 
   return error_code{};
 }
 
-error_code Replica::StartAsDflyFlow() {
+error_code Replica::StartFullSyncFlow(SyncBlock* sb) {
   CHECK(!sock_);
   DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty());
@@ -536,12 +564,12 @@ error_code Replica::StartAsDflyFlow() {
 
   parser_.reset(new RedisParser{false});  // client mode
 
-  std::unique_ptr<base::IoBuf> io_buf{new base::IoBuf(128)};
+  leftover_buf_.reset(new base::IoBuf(128));
   unsigned consumed = 0;
-  RETURN_ON_ERR(ReadRespReply(io_buf.get(), &consumed));  // uses parser_
+  RETURN_ON_ERR(ReadRespReply(leftover_buf_.get(), &consumed));  // uses parser_
 
   if (!CheckRespFirstTypes({RespExpr::STRING, RespExpr::STRING})) {
-    LOG(ERROR) << "Bad FLOW response " << ToSV(io_buf->InputBuffer());
+    LOG(ERROR) << "Bad FLOW response " << ToSV(leftover_buf_->InputBuffer());
     return make_error_code(errc::bad_message);
   }
@@ -550,40 +578,105 @@ error_code Replica::StartAsDflyFlow() {
   if (flow_directive == "FULL") {
     eof_token = ToSV(resp_args_[1].GetBuf());
   } else {
-    LOG(ERROR) << "Bad FLOW response " << ToSV(io_buf->InputBuffer());
+    LOG(ERROR) << "Bad FLOW response " << ToSV(leftover_buf_->InputBuffer());
   }
-  io_buf->ConsumeInput(consumed);
+  leftover_buf_->ConsumeInput(consumed);
 
   state_mask_ = R_ENABLED | R_TCP_CONNECTED;
 
   // We can not discard io_buf because it may contain data
   // besides the response we parsed. Therefore we pass it further to ReplicateDFFb.
-  sync_fb_ =
-      ::boost::fibers::fiber(&Replica::FullSyncDflyFb, this, std::move(io_buf), move(eof_token));
+  sync_fb_ = ::boost::fibers::fiber(&Replica::FullSyncDflyFb, this, sb, move(eof_token));
 
   return error_code{};
 }
 
-void Replica::FullSyncDflyFb(unique_ptr<base::IoBuf> io_buf, string eof_token) {
+error_code Replica::StartStableSyncFlow() {
+  DCHECK(!master_context_.master_repl_id.empty() && !master_context_.dfly_session_id.empty());
+
+  ProactorBase* mythread = ProactorBase::me();
+  CHECK(mythread);
+
+  CHECK(sock_->IsOpen());
+  // sock_.reset(mythread->CreateSocket());
+  // RETURN_ON_ERR(sock_->Connect(master_context_.master_ep));
+  sync_fb_ = ::boost::fibers::fiber(&Replica::StableSyncDflyFb, this);
+
+  return std::error_code{};
+}
+
+void Replica::FullSyncDflyFb(SyncBlock* sb, string eof_token) {
+  DCHECK(leftover_buf_);
   SocketSource ss{sock_.get()};
-  io::PrefixSource ps{io_buf->InputBuffer(), &ss};
+  io::PrefixSource ps{leftover_buf_->InputBuffer(), &ss};
 
   RdbLoader loader(NULL);
+  loader.SetFullSyncCutCb([this, sb, ran = false]() mutable {
+    if (!ran) {
+      std::unique_lock lk(sb->mu_);
+      sb->flows_left--;
+      ran = true;
+    }
+    sb->cv_.notify_all();
+  });
   loader.Load(&ps);
 
+  // Try finding eof token.
+  io::PrefixSource chained_tail{loader.Leftover(), &ps};
   if (!eof_token.empty()) {
-    unique_ptr<uint8_t[]> buf(new uint8_t[eof_token.size()]);
-    // pass leftover data from the loader.
-    io::PrefixSource chained(loader.Leftover(), &ps);
-    VLOG(1) << "Before reading from chained stream";
-    io::Result<size_t> eof_res = chained.Read(io::MutableBytes{buf.get(), eof_token.size()});
-    if (!eof_res || *eof_res != eof_token.size()) {
+    unique_ptr<uint8_t[]> buf{new uint8_t[eof_token.size()]};
+
+    io::Result<size_t> res =
+        chained_tail.ReadAtLeast(io::MutableBytes{buf.get(), eof_token.size()}, eof_token.size());
+
+    if (!res || *res != eof_token.size()) {
       LOG(ERROR) << "Error finding eof token in the stream";
     }
-    // TODO - to compare tokens
   }
-  VLOG(1) << "ReplicateDFFb finished after reading " << loader.bytes_read() << " bytes";
+
+  // Keep loader leftover.
+  io::Bytes unused = chained_tail.unused_prefix();
+  if (unused.size() > 0) {
+    leftover_buf_.reset(new base::IoBuf{unused.size()});
+    auto mut_bytes = leftover_buf_->AppendBuffer();
+    memcpy(mut_bytes.data(), unused.data(), unused.size());
+    leftover_buf_->CommitWrite(unused.size());
+  } else {
+    leftover_buf_.reset();
+  }
+
+  VLOG(1) << "FullSyncDflyFb finished after reading " << loader.bytes_read() << " bytes";
+}
+
+void Replica::StableSyncDflyFb() {
+  base::IoBuf io_buf(16_KB);
+  parser_.reset(new RedisParser);
+
+  // Check leftover from stable state.
+  if (leftover_buf_ && leftover_buf_->InputLen() > 0) {
+    size_t len = leftover_buf_->InputLen();
+    leftover_buf_->ReadAndConsume(len, io_buf.AppendBuffer().data());
+    io_buf.CommitWrite(len);
+    leftover_buf_.reset();
+  }
+
+  error_code ec;
+  string ack_cmd;
+
+  while (!ec) {
+    io::MutableBytes buf = io_buf.AppendBuffer();
+    io::Result<size_t> size_res = sock_->Recv(buf);
+    if (!size_res)
+      return;
+
+    last_io_time_ = sock_->proactor()->GetMonotonicTimeNs();
+
+    io_buf.CommitWrite(*size_res);
+    repl_offs_ += *size_res;
+
+    ec = ParseAndExecute(&io_buf);
+  }
+
+  return;
 }
 
 error_code Replica::ReadRespReply(base::IoBuf* io_buf, uint32_t* consumed) {

View file

@@ -3,7 +3,9 @@
 //
 #pragma once
 
+#include <boost/fiber/condition_variable.hpp>
 #include <boost/fiber/fiber.hpp>
+#include <boost/fiber/mutex.hpp>
 
 #include <variant>
 
 #include "base/io_buf.h"
@@ -44,6 +46,16 @@ class Replica {
     R_SYNC_OK = 0x10,
   };
 
+  // A generic barrier that is used for waiting for
+  // flow fibers to become ready for the stable state switch.
+  struct SyncBlock {
+    SyncBlock(unsigned flows) : flows_left{flows} {
+    }
+    unsigned flows_left;
+    ::boost::fibers::mutex mu_;
+    ::boost::fibers::condition_variable cv_;
+  };
+
  public:
   Replica(std::string master_host, uint16_t port, Service* se);
   ~Replica();
@@ -75,10 +87,16 @@ class Replica {
   Replica(const MasterContext& context, uint32_t dfly_flow_id, Service* service);
 
   // Start replica initialized as dfly flow.
-  std::error_code StartAsDflyFlow();
+  std::error_code StartFullSyncFlow(SyncBlock* block);
 
-  // Sindle flow Dragonfly full sync fiber spawned by StartAsDflyFlow.
-  void FullSyncDflyFb(std::unique_ptr<base::IoBuf> io_buf, std::string eof_token);
+  // Transition into stable state mode as dfly flow.
+  std::error_code StartStableSyncFlow();
+
+  // Single flow full sync fiber spawned by StartFullSyncFlow.
+  void FullSyncDflyFb(SyncBlock* block, std::string eof_token);
+
+  // Single flow stable state sync fiber spawned by StartStableSyncFlow.
+  void StableSyncDflyFb();
 
  private: /* Utility */
   struct PSyncResponse {
@@ -142,6 +160,7 @@ class Replica {
   ::boost::fibers::fiber sync_fb_;
   std::vector<std::unique_ptr<Replica>> shard_flows_;
+  std::unique_ptr<base::IoBuf> leftover_buf_;
 
   std::unique_ptr<facade::RedisParser> parser_;
   facade::RespVec resp_args_;
   facade::CmdArgVec cmd_str_args_;
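
Note (illustrative, not part of the diff): SyncBlock above is effectively a one-shot counting barrier. A stand-alone sketch of the same hand-off, with renamed members to make clear it is not the patch's code:

    #include <boost/fiber/condition_variable.hpp>
    #include <boost/fiber/mutex.hpp>
    #include <mutex>  // std::unique_lock

    struct SyncBarrier {
      explicit SyncBarrier(unsigned flows) : flows_left{flows} {
      }

      // Called once by every flow fiber when it observes RDB_OPCODE_FULLSYNC_END.
      void Arrive() {
        {
          std::unique_lock lk(mu);
          --flows_left;
        }
        cv.notify_all();
      }

      // Called by the coordinating fiber before it sends DFLY STARTSTABLE.
      void Wait() {
        std::unique_lock lk(mu);
        cv.wait(lk, [this] { return flows_left == 0; });
      }

      unsigned flows_left;
      ::boost::fibers::mutex mu;
      ::boost::fibers::condition_variable cv;
    };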

View file

@@ -126,6 +126,9 @@ void SliceSnapshot::SerializeEntriesFb() {
   mu_.lock();
   mu_.unlock();
 
+  CHECK(!rdb_serializer_->SendFullSyncCut());
+  FlushSfile(true);
+
   VLOG(1) << "Exit SnapshotSerializer (serialized/side_saved/cbcalls): " << serialized_ << "/"
           << side_saved_ << "/" << savecb_calls_;
 }
@@ -250,6 +253,7 @@ void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) {
     CHECK(!ec && !sfile.val.empty());
     DbRecord rec = GetDbRecord(entry.db_ind, std::move(sfile.val), 1);
     dest_->Push(std::move(rec));
+
   }
 }

View file

@@ -577,7 +577,7 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
   // The problematic flow is as follows: ScheduleUniqueShard schedules into TxQueue and then
   // call PollExecute that runs the callback which calls DecreaseRunCnt.
   // As a result WaitForShardCallbacks below is unblocked.
-  auto schedule_cb = [&] {
+  auto schedule_cb = [this] {
     bool run_eager = ScheduleUniqueShard(EngineShard::tlocal());
     if (run_eager) {
       // it's important to DecreaseRunCnt only for run_eager and after run_eager was assigned.
@@ -833,7 +833,7 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
   // Fast path - for uncontended keys, just run the callback.
   // That applies for single key operations like set, get, lpush etc.
-  if (shard->db_slice().CheckLock(mode, lock_args)) {
+  if (shard->db_slice().CheckLock(mode, lock_args) && shard->shard_lock()->Check(mode)) {
     RunQuickie(shard);
     return true;
   }
@@ -845,7 +845,6 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
   DCHECK_EQ(0, sd.local_mask & KEYLOCK_ACQUIRED);
   bool lock_acquired = shard->db_slice().Acquire(mode, lock_args);
   sd.local_mask |= KEYLOCK_ACQUIRED;
-  DCHECK(!lock_acquired);  // Because CheckLock above failed.
 
   DVLOG(1) << "Rescheduling into TxQueue " << DebugId();

View file

@@ -2,8 +2,6 @@
 import pytest
 import asyncio
 import aioredis
-import redis
-import time
 
 from .utility import *
@@ -11,65 +9,22 @@ from .utility import *
 BASE_PORT = 1111
 
 """
-Test simple full sync on one replica without altering data during replication.
+Test full replication pipeline. Test full sync with streaming changes and stable state streaming.
 """
 
-# (threads_master, threads_replica, n entries)
-simple_full_sync_cases = [
-    (2, 2, 100),
-    (8, 2, 500),
-    (2, 8, 500),
-    (6, 4, 500)
-]
-
-
-@pytest.mark.parametrize("t_master, t_replica, n_keys", simple_full_sync_cases)
-def test_simple_full_sync(df_local_factory, t_master, t_replica, n_keys):
-    master = df_local_factory.create(port=1111, proactor_threads=t_master)
-    replica = df_local_factory.create(port=1112, proactor_threads=t_replica)
-
-    # Start master and fill with test data
-    master.start()
-    c_master = redis.Redis(port=master.port)
-    batch_fill_data(c_master, gen_test_data(n_keys))
-
-    # Start replica and run REPLICAOF
-    replica.start()
-    c_replica = redis.Redis(port=replica.port)
-    c_replica.replicaof("localhost", str(master.port))
-
-    # Check replica received test data
-    wait_available(c_replica)
-    batch_check_data(c_replica, gen_test_data(n_keys))
-
-    # Stop replication manually
-    c_replica.replicaof("NO", "ONE")
-    assert c_replica.set("writeable", "true")
-
-    # Check test data persisted
-    batch_check_data(c_replica, gen_test_data(n_keys))
-
-
-"""
-Test simple full sync on multiple replicas without altering data during replication.
-The replicas start running in parallel.
-"""
-
-# (threads_master, threads_replicas, n entries)
-simple_full_sync_multi_cases = [
-    (4, [3, 2], 500),
-    (8, [6, 5, 4], 500),
-    (8, [2] * 5, 100),
-    (4, [1] * 20, 500)
-]
+replication_cases = [
+    (8, [8], 20000, 5000),
+    (8, [8], 10000, 10000),
+    (8, [2, 2, 2, 2], 20000, 5000),
+    (6, [6, 6, 6], 30000, 15000),
+    (4, [1] * 12, 10000, 4000),
+]
 
 
 @pytest.mark.asyncio
-@pytest.mark.parametrize("t_master, t_replicas, n_keys", simple_full_sync_multi_cases)
-async def test_simple_full_sync_multi(df_local_factory, t_master, t_replicas, n_keys):
-    def data_gen(): return gen_test_data(n_keys)
-
-    master = df_local_factory.create(port=BASE_PORT, proactor_threads=t_master)
+@pytest.mark.parametrize("t_master, t_replicas, n_keys, n_stream_keys", replication_cases)
+async def test_replication_all(df_local_factory, t_master, t_replicas, n_keys, n_stream_keys):
+    master = df_local_factory.create(port=1111, proactor_threads=t_master)
     replicas = [
         df_local_factory.create(port=BASE_PORT+i+1, proactor_threads=t)
         for i, t in enumerate(t_replicas)
@@ -77,34 +32,51 @@ async def test_simple_full_sync_multi(df_local_factory, t_master, t_replicas, n_
     # Start master and fill with test data
     master.start()
-    c_master = aioredis.Redis(port=master.port, single_connection_client=True)
-    await batch_fill_data_async(c_master, data_gen())
-
-    # Start replica tasks in parallel
-    tasks = [
-        asyncio.create_task(run_sfs_replica(
-            replica, master, data_gen), name="replica-"+str(replica.port))
-        for replica in replicas
-    ]
-
-    for task in tasks:
-        assert await task
-
-    await c_master.connection_pool.disconnect()
-
-
-async def run_sfs_replica(replica, master, data_gen):
-    replica.start()
-    c_replica = aioredis.Redis(
-        port=replica.port, single_connection_client=None)
-
-    await c_replica.execute_command("REPLICAOF localhost " + str(master.port))
-
-    await wait_available_async(c_replica)
-    await batch_check_data_async(c_replica, data_gen())
-
-    await c_replica.connection_pool.disconnect()
-    return True
+    c_master = aioredis.Redis(port=master.port)
+    await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=1))
+
+    # Start replicas
+    for replica in replicas:
+        replica.start()
+
+    c_replicas = [aioredis.Redis(port=replica.port) for replica in replicas]
+
+    async def stream_data():
+        """ Stream data during stable state replication phase and afterwards """
+        gen = gen_test_data(n_stream_keys, seed=2)
+        for chunk in grouper(3, gen):
+            await c_master.mset({k: v for k, v in chunk})
+
+    async def run_replication(c_replica):
+        await c_replica.execute_command("REPLICAOF localhost " + str(master.port))
+
+    async def check_replication(c_replica):
+        """ Check that static and streamed data arrived """
+        await wait_available_async(c_replica)
+        # Check range [n_stream_keys, n_keys] is of seed 1
+        await batch_check_data_async(c_replica, gen_test_data(n_keys, start=n_stream_keys, seed=1))
+        # Check range [0, n_stream_keys] is of seed 2
+        await asyncio.sleep(0.2)
+        await batch_check_data_async(c_replica, gen_test_data(n_stream_keys, seed=2))
+
+    # Start streaming data and run REPLICAOF in parallel
+    stream_fut = asyncio.create_task(stream_data())
+    await asyncio.gather(*(asyncio.create_task(run_replication(c))
+                           for c in c_replicas))
+
+    assert not stream_fut.done(
+    ), "Weak testcase. Increase number of streamed keys to surpass full sync"
+    await stream_fut
+
+    # Check full sync results
+    await asyncio.gather(*(check_replication(c) for c in c_replicas))
+
+    # Check stable state streaming
+    await batch_fill_data_async(c_master, gen_test_data(n_keys, seed=3))
+    await asyncio.sleep(0.5)
+    await asyncio.gather(*(batch_check_data_async(c, gen_test_data(n_keys, seed=3))
+                           for c in c_replicas))
 
 
 """

View file

@@ -18,9 +18,9 @@ def grouper(n, iterable):
 BATCH_SIZE = 100
 
 
-def gen_test_data(n):
-    for i in range(n):
-        yield "k-"+str(i), "v-"+str(i)
+def gen_test_data(n, start=0, seed=None):
+    for i in range(start, n):
+        yield "k-"+str(i), "v-"+str(i) + ("-"+str(seed) if seed else "")
 
 
 def batch_fill_data(client: redis.Redis, gen):
@@ -44,15 +44,15 @@ def as_str_val(v) -> str:
 
 def batch_check_data(client: redis.Redis, gen):
     for group in grouper(BATCH_SIZE, gen):
-        vals = client.mget(k for k, _ in group)
-        assert all(as_str_val(vals[i]) == v for i, (_, v) in enumerate(group))
+        vals = [as_str_val(v) for v in client.mget(k for k, _ in group)]
+        gvals = [v for _, v in group]
+        assert vals == gvals
 
 
 async def batch_check_data_async(client: aioredis.Redis, gen):
     for group in grouper(BATCH_SIZE, gen):
-        vals = await client.mget(k for k, _ in group)
-        assert all(as_str_val(vals[i]) == v for i, (_, v) in enumerate(group))
+        vals = [as_str_val(v) for v in await client.mget(k for k, _ in group)]
+        gvals = [v for _, v in group]
+        assert vals == gvals
 
 
 def wait_available(client: redis.Redis):
     its = 0