mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
chore(server): reduce iouring exposure across the codebase (#342)
Refactor RdbSnapshot and consolidate iouring linux file handling in a single place.
This commit is contained in:
parent
21feebe47b
commit
cf779c08a4
11 changed files with 56 additions and 67 deletions
|
@ -19,7 +19,6 @@
|
||||||
#include "server/error.h"
|
#include "server/error.h"
|
||||||
#include "server/test_utils.h"
|
#include "server/test_utils.h"
|
||||||
#include "server/transaction.h"
|
#include "server/transaction.h"
|
||||||
#include "util/uring/uring_pool.h"
|
|
||||||
|
|
||||||
using namespace testing;
|
using namespace testing;
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
|
@ -19,7 +19,7 @@
|
||||||
#include "server/server_state.h"
|
#include "server/server_state.h"
|
||||||
#include "server/string_family.h"
|
#include "server/string_family.h"
|
||||||
#include "server/transaction.h"
|
#include "server/transaction.h"
|
||||||
#include "util/uring/uring_fiber_algo.h"
|
#include "util/fiber_sched_algo.h"
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
||||||
|
|
|
@ -19,7 +19,6 @@ extern "C" {
|
||||||
#include "server/conn_context.h"
|
#include "server/conn_context.h"
|
||||||
#include "server/main_service.h"
|
#include "server/main_service.h"
|
||||||
#include "server/test_utils.h"
|
#include "server/test_utils.h"
|
||||||
#include "util/uring/uring_pool.h"
|
|
||||||
|
|
||||||
namespace dfly {
|
namespace dfly {
|
||||||
|
|
||||||
|
|
|
@ -13,7 +13,6 @@
|
||||||
#include "server/string_family.h"
|
#include "server/string_family.h"
|
||||||
#include "server/test_utils.h"
|
#include "server/test_utils.h"
|
||||||
#include "server/transaction.h"
|
#include "server/transaction.h"
|
||||||
#include "util/uring/uring_pool.h"
|
|
||||||
|
|
||||||
using namespace testing;
|
using namespace testing;
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
|
@ -15,7 +15,6 @@
|
||||||
#include "server/string_family.h"
|
#include "server/string_family.h"
|
||||||
#include "server/test_utils.h"
|
#include "server/test_utils.h"
|
||||||
#include "server/transaction.h"
|
#include "server/transaction.h"
|
||||||
#include "util/uring/uring_pool.h"
|
|
||||||
|
|
||||||
using namespace testing;
|
using namespace testing;
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
|
@ -36,7 +36,6 @@ extern "C" {
|
||||||
#include "server/version.h"
|
#include "server/version.h"
|
||||||
#include "server/zset_family.h"
|
#include "server/zset_family.h"
|
||||||
#include "util/html/sorted_table.h"
|
#include "util/html/sorted_table.h"
|
||||||
#include "util/uring/uring_fiber_algo.h"
|
|
||||||
#include "util/varz.h"
|
#include "util/varz.h"
|
||||||
|
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
|
@ -672,15 +672,6 @@ error_code RdbSerializer::SaveLzfBlob(const io::Bytes& src, size_t uncompressed_
|
||||||
return error_code{};
|
return error_code{};
|
||||||
}
|
}
|
||||||
|
|
||||||
io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {
|
|
||||||
io::Result<size_t> res = lf_->WriteSome(v, len, offset_, 0);
|
|
||||||
if (res) {
|
|
||||||
offset_ += *res;
|
|
||||||
}
|
|
||||||
|
|
||||||
return res;
|
|
||||||
}
|
|
||||||
|
|
||||||
AlignedBuffer::AlignedBuffer(size_t cap, ::io::Sink* upstream)
|
AlignedBuffer::AlignedBuffer(size_t cap, ::io::Sink* upstream)
|
||||||
: capacity_(cap), upstream_(upstream) {
|
: capacity_(cap), upstream_(upstream) {
|
||||||
aligned_buf_ = (char*)mi_malloc_aligned(kBufLen, 4_KB);
|
aligned_buf_ = (char*)mi_malloc_aligned(kBufLen, 4_KB);
|
||||||
|
|
|
@ -15,7 +15,6 @@ extern "C" {
|
||||||
#include "io/io.h"
|
#include "io/io.h"
|
||||||
#include "server/common.h"
|
#include "server/common.h"
|
||||||
#include "server/table.h"
|
#include "server/table.h"
|
||||||
#include "util/uring/uring_file.h"
|
|
||||||
|
|
||||||
typedef struct rax rax;
|
typedef struct rax rax;
|
||||||
typedef struct streamCG streamCG;
|
typedef struct streamCG streamCG;
|
||||||
|
@ -24,22 +23,6 @@ namespace dfly {
|
||||||
|
|
||||||
class EngineShard;
|
class EngineShard;
|
||||||
|
|
||||||
class LinuxWriteWrapper : public io::Sink {
|
|
||||||
public:
|
|
||||||
LinuxWriteWrapper(util::uring::LinuxFile* lf) : lf_(lf) {
|
|
||||||
}
|
|
||||||
|
|
||||||
io::Result<size_t> WriteSome(const iovec* v, uint32_t len) final;
|
|
||||||
|
|
||||||
std::error_code Close() {
|
|
||||||
return lf_->Close();
|
|
||||||
}
|
|
||||||
|
|
||||||
private:
|
|
||||||
util::uring::LinuxFile* lf_;
|
|
||||||
off_t offset_ = 0;
|
|
||||||
};
|
|
||||||
|
|
||||||
class AlignedBuffer : public ::io::Sink {
|
class AlignedBuffer : public ::io::Sink {
|
||||||
public:
|
public:
|
||||||
using io::Sink::Write;
|
using io::Sink::Write;
|
||||||
|
|
|
@ -20,7 +20,6 @@ extern "C" {
|
||||||
#include "server/engine_shard_set.h"
|
#include "server/engine_shard_set.h"
|
||||||
#include "server/rdb_load.h"
|
#include "server/rdb_load.h"
|
||||||
#include "server/test_utils.h"
|
#include "server/test_utils.h"
|
||||||
#include "util/uring/uring_pool.h"
|
|
||||||
|
|
||||||
using namespace testing;
|
using namespace testing;
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
|
@ -150,13 +150,28 @@ bool IsValidSaveScheduleNibble(string_view time, unsigned int max) {
|
||||||
return min_match <= max;
|
return min_match <= max;
|
||||||
}
|
}
|
||||||
|
|
||||||
class RdbSnapshot {
|
// takes ownership over the file.
|
||||||
|
class LinuxWriteWrapper : public io::Sink {
|
||||||
public:
|
public:
|
||||||
RdbSnapshot(bool single_shard, uring::LinuxFile* fl)
|
LinuxWriteWrapper(uring::LinuxFile* lf) : lf_(lf) {
|
||||||
: file_(fl), linux_sink_(fl), saver_(&linux_sink_, single_shard, kRdbWriteFlags & O_DIRECT) {
|
|
||||||
}
|
}
|
||||||
|
|
||||||
error_code Start(const StringVec& lua_scripts);
|
io::Result<size_t> WriteSome(const iovec* v, uint32_t len) final;
|
||||||
|
|
||||||
|
std::error_code Close() {
|
||||||
|
return lf_->Close();
|
||||||
|
}
|
||||||
|
|
||||||
|
private:
|
||||||
|
std::unique_ptr<uring::LinuxFile> lf_;
|
||||||
|
off_t offset_ = 0;
|
||||||
|
};
|
||||||
|
|
||||||
|
class RdbSnapshot {
|
||||||
|
public:
|
||||||
|
RdbSnapshot() {}
|
||||||
|
|
||||||
|
error_code Start(bool single_shard, const std::string& path, const StringVec& lua_scripts);
|
||||||
void StartInShard(EngineShard* shard);
|
void StartInShard(EngineShard* shard);
|
||||||
|
|
||||||
error_code SaveBody();
|
error_code SaveBody();
|
||||||
|
@ -173,26 +188,45 @@ class RdbSnapshot {
|
||||||
private:
|
private:
|
||||||
bool started_ = false;
|
bool started_ = false;
|
||||||
|
|
||||||
unique_ptr<uring::LinuxFile> file_;
|
std::unique_ptr<io::Sink> io_sink_;
|
||||||
LinuxWriteWrapper linux_sink_;
|
std::unique_ptr<RdbSaver> saver_;
|
||||||
RdbSaver saver_;
|
|
||||||
RdbTypeFreqMap freq_map_;
|
RdbTypeFreqMap freq_map_;
|
||||||
};
|
};
|
||||||
|
|
||||||
error_code RdbSnapshot::Start(const StringVec& lua_scripts) {
|
|
||||||
return saver_.SaveHeader(lua_scripts);
|
io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {
|
||||||
|
io::Result<size_t> res = lf_->WriteSome(v, len, offset_, 0);
|
||||||
|
if (res) {
|
||||||
|
offset_ += *res;
|
||||||
|
}
|
||||||
|
|
||||||
|
return res;
|
||||||
|
}
|
||||||
|
|
||||||
|
error_code RdbSnapshot::Start(bool sharded_snapshot,
|
||||||
|
const std::string& path, const StringVec& lua_scripts) {
|
||||||
|
auto res = uring::OpenLinux(path, kRdbWriteFlags, 0666);
|
||||||
|
if (!res) {
|
||||||
|
return res.error();
|
||||||
|
}
|
||||||
|
|
||||||
|
io_sink_.reset(new LinuxWriteWrapper(res->release()));
|
||||||
|
saver_.reset(new RdbSaver(io_sink_.get(), sharded_snapshot, kRdbWriteFlags & O_DIRECT));
|
||||||
|
|
||||||
|
return saver_->SaveHeader(lua_scripts);
|
||||||
}
|
}
|
||||||
|
|
||||||
error_code RdbSnapshot::SaveBody() {
|
error_code RdbSnapshot::SaveBody() {
|
||||||
return saver_.SaveBody(&freq_map_);
|
return saver_->SaveBody(&freq_map_);
|
||||||
}
|
}
|
||||||
|
|
||||||
error_code RdbSnapshot::Close() {
|
error_code RdbSnapshot::Close() {
|
||||||
return linux_sink_.Close();
|
// TODO: to solve it in a more elegant way.
|
||||||
|
return static_cast<LinuxWriteWrapper*>(io_sink_.get())->Close();
|
||||||
}
|
}
|
||||||
|
|
||||||
void RdbSnapshot::StartInShard(EngineShard* shard) {
|
void RdbSnapshot::StartInShard(EngineShard* shard) {
|
||||||
saver_.StartSnapshotInShard(false, shard);
|
saver_->StartSnapshotInShard(false, shard);
|
||||||
started_ = true;
|
started_ = true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -738,28 +772,21 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er
|
||||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||||
fs::path shard_file = filename, abs_path = path;
|
fs::path shard_file = filename, abs_path = path;
|
||||||
ShardId sid = shard->shard_id();
|
ShardId sid = shard->shard_id();
|
||||||
error_code local_ec;
|
|
||||||
|
|
||||||
ExtendFilename(now, sid, &shard_file);
|
ExtendFilename(now, sid, &shard_file);
|
||||||
abs_path += shard_file;
|
abs_path += shard_file;
|
||||||
|
|
||||||
VLOG(1) << "Saving to " << abs_path;
|
VLOG(1) << "Saving to " << abs_path;
|
||||||
auto res = uring::OpenLinux(abs_path.generic_string(), kRdbWriteFlags, 0666);
|
snapshots[sid].reset(new RdbSnapshot);
|
||||||
|
auto& snapshot = snapshots[sid];
|
||||||
if (res) {
|
error_code local_ec = snapshot->Start(true, abs_path.generic_string(), lua_scripts);
|
||||||
snapshots[sid].reset(new RdbSnapshot{true, res.value().release()});
|
|
||||||
auto& snapshot = *snapshots[sid];
|
|
||||||
local_ec = snapshot.Start(lua_scripts);
|
|
||||||
if (!local_ec) {
|
|
||||||
snapshot.StartInShard(shard);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
local_ec = res.error();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (local_ec) {
|
if (local_ec) {
|
||||||
|
snapshot.reset(); // Reset to make sure stages won't block on faulty snapshots.
|
||||||
|
|
||||||
lock_guard lk(mu);
|
lock_guard lk(mu);
|
||||||
UpdateError(local_ec, &ec);
|
UpdateError(local_ec, &ec);
|
||||||
|
} else {
|
||||||
|
snapshot->StartInShard(shard);
|
||||||
}
|
}
|
||||||
|
|
||||||
return OpStatus::OK;
|
return OpStatus::OK;
|
||||||
|
@ -771,15 +798,10 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er
|
||||||
|
|
||||||
ExtendFilename(now, -1, &filename);
|
ExtendFilename(now, -1, &filename);
|
||||||
path += filename;
|
path += filename;
|
||||||
|
|
||||||
auto res = uring::OpenLinux(path.generic_string(), kRdbWriteFlags, 0666);
|
|
||||||
if (!res) {
|
|
||||||
return res.error();
|
|
||||||
}
|
|
||||||
VLOG(1) << "Saving to " << path;
|
VLOG(1) << "Saving to " << path;
|
||||||
|
|
||||||
snapshots[0].reset(new RdbSnapshot{false, res.value().release()});
|
snapshots[0].reset(new RdbSnapshot);
|
||||||
ec = snapshots[0]->Start(lua_scripts);
|
ec = snapshots[0]->Start(false, path.generic_string(), lua_scripts);
|
||||||
|
|
||||||
if (!ec) {
|
if (!ec) {
|
||||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||||
|
|
|
@ -13,7 +13,6 @@
|
||||||
#include "server/error.h"
|
#include "server/error.h"
|
||||||
#include "server/test_utils.h"
|
#include "server/test_utils.h"
|
||||||
#include "server/transaction.h"
|
#include "server/transaction.h"
|
||||||
#include "util/uring/uring_pool.h"
|
|
||||||
|
|
||||||
using namespace testing;
|
using namespace testing;
|
||||||
using namespace std;
|
using namespace std;
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue