fix: make snapshotting process more responsive (#3759)

* fix: improve BreakStalledFlowsInShard heuristic

Before this change, we wrote whatever record chunks we pulled from the channel in a single call.
This is problematic for huge records: a 1GB chunk, for example, might take 10 seconds to write.

Recently we added a replication breaker on the master side that breaks the full sync after
a predefined threshold has passed (10 seconds by default). To make this breaker more robust,
we now write records in chunks of a few MBs at a time (8MB in the code below) and update
last_write_time_ns_ after every partial write.
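
The core idea, sketched here independently of Dragonfly's actual io::Sink / RdbSaver types
(WriteChunked, WriteChunk, NowNs and max_chunk are placeholder names for this sketch, not APIs
from this codebase):

#include <algorithm>
#include <atomic>
#include <cstdint>
#include <span>
#include <system_error>

// Simplified sketch: split one large buffer into bounded chunks and refresh a
// "last write" timestamp after every partial write, so a watchdog that samples
// the timestamp never observes a single multi-second gap for a huge record.
std::error_code WriteChunked(std::span<const uint8_t> src,
                             std::atomic<int64_t>& last_write_ns, size_t max_chunk,
                             std::error_code (*WriteChunk)(std::span<const uint8_t>),
                             int64_t (*NowNs)()) {
  std::error_code ec;
  last_write_ns = NowNs();
  while (!src.empty() && !ec) {
    std::span<const uint8_t> part = src.subspan(0, std::min(max_chunk, src.size()));
    src = src.subspan(part.size());
    ec = WriteChunk(part);    // one bounded write instead of a single huge one
    last_write_ns = NowNs();  // progress becomes visible after every chunk
  }
  return ec;
}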

We also added more logs to make replication delays visible on both sides, including a log on
the master side when it breaks the replication.

Unfortunately, this alone did not make BreakStalledFlowsInShard more robust, because the
bottleneck moved to the replica side, which may take 20+ seconds to parse huge values.
Therefore, I increased the default threshold for breaking the replication to 30s and made the
replication_timeout flag mutable at runtime.
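
Assuming the standard CONFIG interface applies to the newly mutable flag (see the Service::Init
hunk below), the threshold could then be adjusted at runtime roughly like this (illustrative):

CONFIG SET replication_timeout 30000
CONFIG GET replication_timeout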

Finally, instrument the GetMetrics call, as it sometimes takes more than a second.

---------

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
Roman Gershman 2024-09-22 17:05:28 +03:00 committed by GitHub
parent 2e9b133ea0
commit f1f8ee17dc
6 changed files with 56 additions and 13 deletions

View file

@@ -737,8 +737,10 @@ void DflyCmd::BreakStalledFlowsInShard() {
     int64_t timeout_ns = int64_t(absl::GetFlag(FLAGS_replication_timeout)) * 1'000'000LL;
     int64_t now = absl::GetCurrentTimeNanos();
     if (last_write_ns > 0 && last_write_ns + timeout_ns < now) {
-      VLOG(1) << "Breaking full sync for sync_id " << sync_id << " last_write_ts: " << last_write_ns
-              << ", now: " << now;
+      LOG(INFO) << "Master detected replication timeout, breaking full sync with replica, sync_id: "
+                << sync_id << " last_write_ms: " << last_write_ns / 1000'000
+                << ", now: " << now / 1000'000;
       deleted.push_back(sync_id);
       replica_lock.unlock();
       replica_ptr->Cancel();

View file

@@ -13,7 +13,7 @@
 using namespace facade;
-ABSL_FLAG(uint32_t, replication_timeout, 10000,
+ABSL_FLAG(uint32_t, replication_timeout, 30000,
           "Time in milliseconds to wait for the replication writes being stuck.");
 ABSL_FLAG(uint32_t, replication_stream_output_limit, 64_KB,

View file

@@ -927,6 +927,7 @@ void Service::Init(util::AcceptServer* acceptor, std::vector<facade::Listener*>
     return res.has_value();
   });
   config_registry.RegisterMutable("replica_partial_sync");
+  config_registry.RegisterMutable("replication_timeout");
   config_registry.RegisterMutable("table_growth_margin");
   config_registry.RegisterMutable("tcp_keepalive");

View file

@@ -2153,7 +2153,11 @@ error_code RdbLoader::Load(io::Source* src) {
     }
     ++keys_loaded;
+    int64_t start = absl::GetCurrentTimeNanos();
     RETURN_ON_ERR(LoadKeyValPair(type, &settings));
+    int delta_ms = (absl::GetCurrentTimeNanos() - start) / 1000'000;
+    LOG_IF(INFO, delta_ms > 1000) << "Took " << delta_ms << " ms to load rdb_type " << type;
     settings.Reset();
   }  // main load loop
@@ -2466,7 +2470,8 @@ void RdbLoader::FlushShardAsync(ShardId sid) {
   while (blocked_shards_.load(memory_order_relaxed) > 0)
     ThisFiber::SleepFor(100us);
-  shard_set->Add(sid, std::move(cb));
+  bool preempted = shard_set->Add(sid, std::move(cb));
+  VLOG_IF(2, preempted) << "FlushShardAsync was throttled";
 }
 void RdbLoader::FlushAllShards() {
@@ -2605,6 +2610,7 @@ error_code RdbLoader::LoadKeyValPair(int type, ObjSettings* settings) {
   constexpr size_t kBufSize = 128;
   if (out_buf.size() >= kBufSize) {
+    // Despite being async, this function can block if the shard queue is full.
     FlushShardAsync(sid);
   }

View file

@@ -1118,6 +1118,8 @@ class RdbSaver::Impl {
   }
  private:
+  error_code WriteRecord(io::Bytes src);
   unique_ptr<SliceSnapshot>& GetSnapshot(EngineShard* shard);
   io::Sink* sink_;
@@ -1201,28 +1203,56 @@ error_code RdbSaver::Impl::ConsumeChannel(const Cancellation* cll) {
         continue;
       DVLOG(2) << "Pulled " << record.id;
-      last_write_time_ns_ = absl::GetCurrentTimeNanos();
-      io_error = sink_->Write(io::Buffer(record.value));
-      stats.rdb_save_usec += (absl::GetCurrentTimeNanos() - last_write_time_ns_) / 1'000;
-      stats.rdb_save_count++;
-      last_write_time_ns_ = -1;
+      auto start = absl::GetCurrentTimeNanos();
+      io_error = WriteRecord(io::Buffer(record.value));
       if (io_error) {
         VLOG(1) << "Error writing to sink " << io_error.message();
-        break;
+        break;  // from the inner TryPop loop.
       }
+      auto delta_usec = (absl::GetCurrentTimeNanos() - start) / 1'000;
+      stats.rdb_save_usec += delta_usec;
+      stats.rdb_save_count++;
     } while ((channel_.TryPop(record)));
   }  // while (channel_.Pop())
   for (auto& ptr : shard_snapshots_) {
     ptr->Join();
   }
-  DVLOG(1) << "Finish ConsumeChannel";
+  VLOG(1) << "ConsumeChannel finished " << io_error;
   DCHECK(!channel_.TryPop(record));
   return io_error;
 }
+error_code RdbSaver::Impl::WriteRecord(io::Bytes src) {
+  // For huge values, we break them up into chunks of upto several MBs to send in a single call,
+  // so we could be more responsive.
+  error_code ec;
+  size_t start_size = src.size();
+  last_write_time_ns_ = absl::GetCurrentTimeNanos();
+  do {
+    io::Bytes part = src.subspan(0, 8_MB);
+    src.remove_prefix(part.size());
+    ec = sink_->Write(part);
+    int64_t now = absl::GetCurrentTimeNanos();
+    unsigned delta_ms = (now - last_write_time_ns_) / 1000'000;
+    last_write_time_ns_ = now;
+    // Log extreme timings into the log for visibility.
+    LOG_IF(INFO, delta_ms > 1000) << "Channel write took " << delta_ms << " ms while writing "
+                                  << part.size() << "/" << start_size;
+    if (ec) {
+      LOG(INFO) << "Error writing to rdb sink " << ec.message();
+      break;
+    }
+  } while (!src.empty());
+  last_write_time_ns_ = -1;
+  return ec;
+}
 void RdbSaver::Impl::StartSnapshotting(bool stream_journal, const Cancellation* cll,
                                        EngineShard* shard) {
   auto& s = GetSnapshot(shard);

View file

@@ -2197,7 +2197,11 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
     absl::StrAppend(&info, a1, ":", a2, "\r\n");
   };
+  uint64_t start = absl::GetCurrentTimeNanos();
   Metrics m = GetMetrics(cntx->ns);
+  uint64_t delta_ms = (absl::GetCurrentTimeNanos() - start) / 1000'000;
+  LOG_IF(INFO, delta_ms > 100) << "GetMetrics took " << delta_ms << " ms";
   DbStats total;
   for (const auto& db_stats : m.db_stats)
     total += db_stats;
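
The timing checks added in this commit (RdbLoader::Load, RdbSaver::Impl::WriteRecord and
ServerFamily::Info above) all follow the same "log only if it was slow" pattern; a minimal
hypothetical helper that captures it (not part of the change, which inlines the check with
absl::GetCurrentTimeNanos() and LOG_IF):

#include <chrono>
#include <iostream>
#include <string>

// Hypothetical scope guard: prints a warning at destruction time if the
// enclosing scope ran longer than the given threshold.
class SlowOpLogger {
 public:
  SlowOpLogger(std::string label, std::chrono::milliseconds threshold)
      : label_(std::move(label)),
        threshold_(threshold),
        start_(std::chrono::steady_clock::now()) {}

  ~SlowOpLogger() {
    auto took = std::chrono::duration_cast<std::chrono::milliseconds>(
        std::chrono::steady_clock::now() - start_);
    if (took > threshold_)
      std::cerr << label_ << " took " << took.count() << " ms\n";
  }

 private:
  std::string label_;
  std::chrono::milliseconds threshold_;
  std::chrono::steady_clock::time_point start_;
};

// Usage mirroring the Info() change above:
//   SlowOpLogger guard("GetMetrics", std::chrono::milliseconds(100));
//   Metrics m = GetMetrics(cntx->ns);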