mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
fix(tiering): Throttle snapshot load (#3249)
This commit is contained in:
parent
d8946247df
commit
2bf4451ec7
8 changed files with 118 additions and 36 deletions
|
@ -5,6 +5,7 @@
|
|||
#include "server/rdb_load.h"
|
||||
|
||||
#include "absl/strings/escaping.h"
|
||||
#include "server/tiered_storage.h"
|
||||
|
||||
extern "C" {
|
||||
#include "redis/intset.h"
|
||||
|
@ -1917,8 +1918,9 @@ struct RdbLoader::ObjSettings {
|
|||
};
|
||||
|
||||
RdbLoader::RdbLoader(Service* service)
|
||||
: service_{service}, script_mgr_{service == nullptr ? nullptr : service->script_mgr()} {
|
||||
shard_buf_.reset(new ItemsBuf[shard_set->size()]);
|
||||
: service_{service},
|
||||
script_mgr_{service == nullptr ? nullptr : service->script_mgr()},
|
||||
shard_buf_{shard_set->size()} {
|
||||
}
|
||||
|
||||
RdbLoader::~RdbLoader() {
|
||||
|
@ -2421,8 +2423,17 @@ void RdbLoader::FlushShardAsync(ShardId sid) {
|
|||
|
||||
auto cb = [indx = this->cur_db_index_, this, ib = std::move(out_buf)] {
|
||||
this->LoadItemsBuffer(indx, ib);
|
||||
|
||||
// Block, if tiered storage is active, but can't keep up
|
||||
while (EngineShard::tlocal()->ShouldThrottleForTiering()) {
|
||||
this->blocked_shards_.fetch_add(1, memory_order_relaxed); // stop adding items to shard queue
|
||||
ThisFiber::SleepFor(100us);
|
||||
this->blocked_shards_.fetch_sub(1, memory_order_relaxed);
|
||||
}
|
||||
};
|
||||
|
||||
while (blocked_shards_.load(memory_order_relaxed) > 0)
|
||||
ThisFiber::SleepFor(100us);
|
||||
shard_set->Add(sid, std::move(cb));
|
||||
}
|
||||
|
||||
|
@ -2439,7 +2450,8 @@ std::error_code RdbLoaderBase::FromOpaque(const OpaqueObj& opaque, CompactObj* p
|
|||
}
|
||||
|
||||
void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
|
||||
DbSlice& db_slice = EngineShard::tlocal()->db_slice();
|
||||
EngineShard* es = EngineShard::tlocal();
|
||||
DbSlice& db_slice = es->db_slice();
|
||||
DbContext db_cntx{db_ind, GetCurrentTimeMs()};
|
||||
|
||||
for (const auto* item : ib) {
|
||||
|
@ -2466,6 +2478,9 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {
|
|||
if (!res.is_new) {
|
||||
LOG(WARNING) << "RDB has duplicated key '" << item->key << "' in DB " << db_ind;
|
||||
}
|
||||
|
||||
if (auto* ts = es->tiered_storage(); ts)
|
||||
ts->TryStash(db_cntx.db_index, item->key, &res.it->second);
|
||||
}
|
||||
|
||||
for (auto* item : ib) {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue