mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-12 19:05:47 +02:00
fix: Fix bugs around rdb save and improve memory accounting (#495)
Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
1236f92381
commit
c2294e1298
4 changed files with 24 additions and 16 deletions
|
@ -11,11 +11,11 @@
|
||||||
|
|
||||||
namespace dfly {
|
namespace dfly {
|
||||||
|
|
||||||
using facade::kWrongTypeErr;
|
using facade::kDbIndOutOfRangeErr;
|
||||||
|
using facade::kInvalidDbIndErr;
|
||||||
using facade::kInvalidIntErr;
|
using facade::kInvalidIntErr;
|
||||||
using facade::kSyntaxErr;
|
using facade::kSyntaxErr;
|
||||||
using facade::kInvalidDbIndErr;
|
using facade::kWrongTypeErr;
|
||||||
using facade::kDbIndOutOfRangeErr;
|
|
||||||
|
|
||||||
#ifndef RETURN_ON_ERR
|
#ifndef RETURN_ON_ERR
|
||||||
|
|
||||||
|
@ -23,7 +23,7 @@ using facade::kDbIndOutOfRangeErr;
|
||||||
do { \
|
do { \
|
||||||
auto __ec = (x); \
|
auto __ec = (x); \
|
||||||
if (__ec) { \
|
if (__ec) { \
|
||||||
VLOG(1) << "Error " << __ec << " while calling " #x; \
|
LOG(ERROR) << "Error " << __ec << " while calling " #x; \
|
||||||
return __ec; \
|
return __ec; \
|
||||||
} \
|
} \
|
||||||
} while (0)
|
} while (0)
|
||||||
|
|
|
@ -52,7 +52,7 @@ void MemoryCmd::Run(CmdArgList args) {
|
||||||
return (*cntx_)->SendError(kInvalidIntErr);
|
return (*cntx_)->SendError(kInvalidIntErr);
|
||||||
}
|
}
|
||||||
tid = tid % shard_set->pool()->size();
|
tid = tid % shard_set->pool()->size();
|
||||||
string res = shard_set->pool()->at(tid)->AwaitBrief([this] { return MallocStats(); });
|
string res = shard_set->pool()->at(tid)->AwaitBrief([&] { return MallocStats(tid); });
|
||||||
|
|
||||||
return (*cntx_)->SendBulkString(res);
|
return (*cntx_)->SendBulkString(res);
|
||||||
}
|
}
|
||||||
|
@ -61,28 +61,33 @@ void MemoryCmd::Run(CmdArgList args) {
|
||||||
return (*cntx_)->SendError(err, kSyntaxErrType);
|
return (*cntx_)->SendError(err, kSyntaxErrType);
|
||||||
}
|
}
|
||||||
|
|
||||||
string MemoryCmd::MallocStats() {
|
string MemoryCmd::MallocStats(unsigned tid) {
|
||||||
string str;
|
string str;
|
||||||
|
|
||||||
uint64_t start = absl::GetCurrentTimeNanos();
|
uint64_t start = absl::GetCurrentTimeNanos();
|
||||||
absl::StrAppend(&str, "___ Begin mimalloc statistics ___\n");
|
absl::StrAppend(&str, "___ Begin mimalloc statistics ___\n");
|
||||||
mi_stats_print_out(MiStatsCallback, &str);
|
mi_stats_print_out(MiStatsCallback, &str);
|
||||||
|
|
||||||
absl::StrAppend(&str, "\nArena statistics from a single thread:\n");
|
absl::StrAppend(&str, "\nArena statistics from thread:", tid, "\n");
|
||||||
absl::StrAppend(&str, "Count BlockSize Reserved Committed Used\n");
|
absl::StrAppend(&str, "Count BlockSize Reserved Committed Used\n");
|
||||||
|
|
||||||
mi_heap_t* data_heap = ServerState::tlocal()->data_heap();
|
mi_heap_t* data_heap = ServerState::tlocal()->data_heap();
|
||||||
BlockMap block_map;
|
BlockMap block_map;
|
||||||
|
|
||||||
mi_heap_visit_blocks(data_heap, false /* visit all blocks*/, MiArenaVisit, &block_map);
|
mi_heap_visit_blocks(data_heap, false /* visit all blocks*/, MiArenaVisit, &block_map);
|
||||||
|
uint64_t reserved = 0, committed = 0, used = 0;
|
||||||
for (const auto& k_v : block_map) {
|
for (const auto& k_v : block_map) {
|
||||||
absl::StrAppend(&str, k_v.second, " ", get<0>(k_v.first), " ", get<1>(k_v.first), " ",
|
absl::StrAppend(&str, k_v.second, " ", get<0>(k_v.first), " ", get<1>(k_v.first), " ",
|
||||||
get<2>(k_v.first), " ", get<3>(k_v.first), "\n");
|
get<2>(k_v.first), " ", get<3>(k_v.first), "\n");
|
||||||
|
reserved += k_v.second * get<1>(k_v.first);
|
||||||
|
committed += k_v.second * get<2>(k_v.first);
|
||||||
|
used += k_v.second * get<3>(k_v.first);
|
||||||
}
|
}
|
||||||
|
|
||||||
uint64_t delta = (absl::GetCurrentTimeNanos() - start) / 1000;
|
uint64_t delta = (absl::GetCurrentTimeNanos() - start) / 1000;
|
||||||
absl::StrAppend(&str, "--- End mimalloc statistics, took ", delta, "us ---\n");
|
absl::StrAppend(&str, "--- End mimalloc statistics, took ", delta, "us ---\n");
|
||||||
|
absl::StrAppend(&str, "total reserved: ", reserved, ", comitted: ", committed, ", used: ", used,
|
||||||
|
"\n");
|
||||||
|
|
||||||
return str;
|
return str;
|
||||||
}
|
}
|
||||||
|
|
|
@ -17,7 +17,7 @@ class MemoryCmd {
|
||||||
void Run(CmdArgList args);
|
void Run(CmdArgList args);
|
||||||
|
|
||||||
private:
|
private:
|
||||||
std::string MallocStats();
|
std::string MallocStats(unsigned tid);
|
||||||
|
|
||||||
ServerFamily& sf_;
|
ServerFamily& sf_;
|
||||||
ConnectionContext* cntx_;
|
ConnectionContext* cntx_;
|
||||||
|
|
|
@ -718,6 +718,9 @@ io::Result<size_t> AlignedBuffer::WriteSome(const iovec* v, uint32_t len) {
|
||||||
// to the nearest page boundary.
|
// to the nearest page boundary.
|
||||||
error_code AlignedBuffer::Flush() {
|
error_code AlignedBuffer::Flush() {
|
||||||
size_t len = (buf_offs_ + kAmask) & (~kAmask);
|
size_t len = (buf_offs_ + kAmask) & (~kAmask);
|
||||||
|
if (len == 0)
|
||||||
|
return error_code{};
|
||||||
|
|
||||||
iovec ivec{.iov_base = aligned_buf_, .iov_len = len};
|
iovec ivec{.iov_base = aligned_buf_, .iov_len = len};
|
||||||
buf_offs_ = 0;
|
buf_offs_ = 0;
|
||||||
|
|
||||||
|
@ -922,7 +925,7 @@ error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) {
|
||||||
|
|
||||||
error_code io_error = impl_->ConsumeChannel();
|
error_code io_error = impl_->ConsumeChannel();
|
||||||
if (io_error) {
|
if (io_error) {
|
||||||
VLOG(1) << "io error " << io_error;
|
LOG(ERROR) << "io error " << io_error;
|
||||||
return io_error;
|
return io_error;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue