diff --git a/src/server/bitops_family_test.cc b/src/server/bitops_family_test.cc index e5b680e2e..78394a49d 100644 --- a/src/server/bitops_family_test.cc +++ b/src/server/bitops_family_test.cc @@ -19,7 +19,6 @@ #include "server/error.h" #include "server/test_utils.h" #include "server/transaction.h" -#include "util/uring/uring_pool.h" using namespace testing; using namespace std; diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index 24a0be2c7..487af77da 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -19,7 +19,7 @@ #include "server/server_state.h" #include "server/string_family.h" #include "server/transaction.h" -#include "util/uring/uring_fiber_algo.h" +#include "util/fiber_sched_algo.h" using namespace std; diff --git a/src/server/dragonfly_test.cc b/src/server/dragonfly_test.cc index 8fbbade05..3d5631725 100644 --- a/src/server/dragonfly_test.cc +++ b/src/server/dragonfly_test.cc @@ -19,7 +19,6 @@ extern "C" { #include "server/conn_context.h" #include "server/main_service.h" #include "server/test_utils.h" -#include "util/uring/uring_pool.h" namespace dfly { diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index 70211db2c..f2348d8f9 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -13,7 +13,6 @@ #include "server/string_family.h" #include "server/test_utils.h" #include "server/transaction.h" -#include "util/uring/uring_pool.h" using namespace testing; using namespace std; diff --git a/src/server/list_family_test.cc b/src/server/list_family_test.cc index 5f5f522b1..761792e6f 100644 --- a/src/server/list_family_test.cc +++ b/src/server/list_family_test.cc @@ -15,7 +15,6 @@ #include "server/string_family.h" #include "server/test_utils.h" #include "server/transaction.h" -#include "util/uring/uring_pool.h" using namespace testing; using namespace std; diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 27217ff2a..ae34593df 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -36,7 +36,6 @@ extern "C" { #include "server/version.h" #include "server/zset_family.h" #include "util/html/sorted_table.h" -#include "util/uring/uring_fiber_algo.h" #include "util/varz.h" using namespace std; diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index ee51c7d86..711439c9f 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -672,15 +672,6 @@ error_code RdbSerializer::SaveLzfBlob(const io::Bytes& src, size_t uncompressed_ return error_code{}; } -io::Result LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) { - io::Result res = lf_->WriteSome(v, len, offset_, 0); - if (res) { - offset_ += *res; - } - - return res; -} - AlignedBuffer::AlignedBuffer(size_t cap, ::io::Sink* upstream) : capacity_(cap), upstream_(upstream) { aligned_buf_ = (char*)mi_malloc_aligned(kBufLen, 4_KB); diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 278fdcb93..009aa50d6 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -15,7 +15,6 @@ extern "C" { #include "io/io.h" #include "server/common.h" #include "server/table.h" -#include "util/uring/uring_file.h" typedef struct rax rax; typedef struct streamCG streamCG; @@ -24,22 +23,6 @@ namespace dfly { class EngineShard; -class LinuxWriteWrapper : public io::Sink { - public: - LinuxWriteWrapper(util::uring::LinuxFile* lf) : lf_(lf) { - } - - io::Result WriteSome(const iovec* v, uint32_t len) final; - - std::error_code Close() { - return lf_->Close(); - } - - private: - util::uring::LinuxFile* lf_; - off_t offset_ = 0; -}; - class AlignedBuffer : public ::io::Sink { public: using io::Sink::Write; diff --git a/src/server/rdb_test.cc b/src/server/rdb_test.cc index 4dcb81726..2c5d4fd1d 100644 --- a/src/server/rdb_test.cc +++ b/src/server/rdb_test.cc @@ -20,7 +20,6 @@ extern "C" { #include "server/engine_shard_set.h" #include "server/rdb_load.h" #include "server/test_utils.h" -#include "util/uring/uring_pool.h" using namespace testing; using namespace std; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index d80e7c3dc..df4db0ddb 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -150,13 +150,28 @@ bool IsValidSaveScheduleNibble(string_view time, unsigned int max) { return min_match <= max; } -class RdbSnapshot { +// takes ownership over the file. +class LinuxWriteWrapper : public io::Sink { public: - RdbSnapshot(bool single_shard, uring::LinuxFile* fl) - : file_(fl), linux_sink_(fl), saver_(&linux_sink_, single_shard, kRdbWriteFlags & O_DIRECT) { + LinuxWriteWrapper(uring::LinuxFile* lf) : lf_(lf) { } - error_code Start(const StringVec& lua_scripts); + io::Result WriteSome(const iovec* v, uint32_t len) final; + + std::error_code Close() { + return lf_->Close(); + } + + private: + std::unique_ptr lf_; + off_t offset_ = 0; +}; + +class RdbSnapshot { + public: + RdbSnapshot() {} + + error_code Start(bool single_shard, const std::string& path, const StringVec& lua_scripts); void StartInShard(EngineShard* shard); error_code SaveBody(); @@ -173,26 +188,45 @@ class RdbSnapshot { private: bool started_ = false; - unique_ptr file_; - LinuxWriteWrapper linux_sink_; - RdbSaver saver_; + std::unique_ptr io_sink_; + std::unique_ptr saver_; RdbTypeFreqMap freq_map_; }; -error_code RdbSnapshot::Start(const StringVec& lua_scripts) { - return saver_.SaveHeader(lua_scripts); + +io::Result LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) { + io::Result res = lf_->WriteSome(v, len, offset_, 0); + if (res) { + offset_ += *res; + } + + return res; +} + +error_code RdbSnapshot::Start(bool sharded_snapshot, + const std::string& path, const StringVec& lua_scripts) { + auto res = uring::OpenLinux(path, kRdbWriteFlags, 0666); + if (!res) { + return res.error(); + } + + io_sink_.reset(new LinuxWriteWrapper(res->release())); + saver_.reset(new RdbSaver(io_sink_.get(), sharded_snapshot, kRdbWriteFlags & O_DIRECT)); + + return saver_->SaveHeader(lua_scripts); } error_code RdbSnapshot::SaveBody() { - return saver_.SaveBody(&freq_map_); + return saver_->SaveBody(&freq_map_); } error_code RdbSnapshot::Close() { - return linux_sink_.Close(); + // TODO: to solve it in a more elegant way. + return static_cast(io_sink_.get())->Close(); } void RdbSnapshot::StartInShard(EngineShard* shard) { - saver_.StartSnapshotInShard(false, shard); + saver_->StartSnapshotInShard(false, shard); started_ = true; } @@ -738,28 +772,21 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er auto cb = [&](Transaction* t, EngineShard* shard) { fs::path shard_file = filename, abs_path = path; ShardId sid = shard->shard_id(); - error_code local_ec; ExtendFilename(now, sid, &shard_file); abs_path += shard_file; VLOG(1) << "Saving to " << abs_path; - auto res = uring::OpenLinux(abs_path.generic_string(), kRdbWriteFlags, 0666); - - if (res) { - snapshots[sid].reset(new RdbSnapshot{true, res.value().release()}); - auto& snapshot = *snapshots[sid]; - local_ec = snapshot.Start(lua_scripts); - if (!local_ec) { - snapshot.StartInShard(shard); - } - } else { - local_ec = res.error(); - } - + snapshots[sid].reset(new RdbSnapshot); + auto& snapshot = snapshots[sid]; + error_code local_ec = snapshot->Start(true, abs_path.generic_string(), lua_scripts); if (local_ec) { + snapshot.reset(); // Reset to make sure stages won't block on faulty snapshots. + lock_guard lk(mu); UpdateError(local_ec, &ec); + } else { + snapshot->StartInShard(shard); } return OpStatus::OK; @@ -771,15 +798,10 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er ExtendFilename(now, -1, &filename); path += filename; - - auto res = uring::OpenLinux(path.generic_string(), kRdbWriteFlags, 0666); - if (!res) { - return res.error(); - } VLOG(1) << "Saving to " << path; - snapshots[0].reset(new RdbSnapshot{false, res.value().release()}); - ec = snapshots[0]->Start(lua_scripts); + snapshots[0].reset(new RdbSnapshot); + ec = snapshots[0]->Start(false, path.generic_string(), lua_scripts); if (!ec) { auto cb = [&](Transaction* t, EngineShard* shard) { diff --git a/src/server/string_family_test.cc b/src/server/string_family_test.cc index 142552892..6e74b684e 100644 --- a/src/server/string_family_test.cc +++ b/src/server/string_family_test.cc @@ -13,7 +13,6 @@ #include "server/error.h" #include "server/test_utils.h" #include "server/transaction.h" -#include "util/uring/uring_pool.h" using namespace testing; using namespace std;