bug(rdb load): Rdb loader tasks stop running on failure #567 (#576)

Signed-off-by: adi_holden <adi@dragonflydb.io>

Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
adiholden 2022-12-18 18:25:13 +02:00 committed by GitHub
parent c18cb8208e
commit d11b0d10da
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 9 additions and 4 deletions

View file

@ -1696,6 +1696,8 @@ error_code RdbLoader::Load(io::Source* src) {
settings.now = mstime();
size_t keys_loaded = 0;
auto cleanup = absl::Cleanup([&] { FinishLoad(start, &keys_loaded); });
while (!stop_early_.load(memory_order_relaxed)) {
/* Read type. */
SET_OR_RETURN(FetchType(), type);
@ -1814,6 +1816,10 @@ error_code RdbLoader::Load(io::Source* src) {
/* Verify the checksum if RDB version is >= 5 */
RETURN_ON_ERR(VerifyChecksum());
return kOk;
}
void RdbLoader::FinishLoad(absl::Time start_time, size_t* keys_loaded) {
fibers_ext::BlockingCounter bc(shard_set->size());
for (unsigned i = 0; i < shard_set->size(); ++i) {
// Flush the remaining items.
@ -1824,11 +1830,9 @@ error_code RdbLoader::Load(io::Source* src) {
}
bc.Wait(); // wait for sentinels to report.
absl::Duration dur = absl::Now() - start;
absl::Duration dur = absl::Now() - start_time;
load_time_ = double(absl::ToInt64Milliseconds(dur)) / 1000;
keys_loaded_ = keys_loaded;
return kOk;
keys_loaded_ = *keys_loaded;
}
std::error_code RdbLoaderBase::EnsureRead(size_t min_sz) {

View file

@ -202,6 +202,7 @@ class RdbLoader : protected RdbLoaderBase {
std::error_code VerifyChecksum();
void FlushShardAsync(ShardId sid);
void FinishLoad(absl::Time start_time, size_t* keys_loaded);
void LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib);