From c77e7cc09ffb3fe0bb6d80ce6a3c9dd0c7367590 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Thu, 9 Jan 2025 12:22:47 +0200 Subject: [PATCH] fix: improve error propagation with RESTORE commands (#4428) * fix: improve error propagation with RESTORE commands Also, provide better logs if AddOrNew function fails adding a new entry --- src/server/db_slice.cc | 17 ++++-- src/server/generic_family.cc | 87 +++++++++++++++---------------- src/server/generic_family_test.cc | 27 ++++++++++ 3 files changed, 82 insertions(+), 49 deletions(-) diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 4f2b5aa43..f67cf5cda 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -128,11 +128,20 @@ bool PrimeEvictionPolicy::CanGrow(const PrimeTable& tbl) const { // we estimate how much memory we will take with the current capacity // even though we may currently use less memory. // see https://github.com/dragonflydb/dragonfly/issues/256#issuecomment-1227095503 - size_t table_free_items = (tbl.capacity() - tbl.size()) + PrimeTable::kSegCapacity; - size_t obj_bytes_estimation = - db_slice_->bytes_per_object() * table_free_items * GetFlag(FLAGS_table_growth_margin); + size_t table_free_items = ((tbl.capacity() - tbl.size()) + PrimeTable::kSegCapacity) * + GetFlag(FLAGS_table_growth_margin); + + size_t obj_bytes_estimation = db_slice_->bytes_per_object() * table_free_items; bool res = mem_available > int64_t(PrimeTable::kSegBytes + obj_bytes_estimation); - VLOG(2) << "available: " << table_free_items << ", res: " << res; + if (res) { + VLOG(1) << "free_items: " << table_free_items + << ", obj_bytes: " << db_slice_->bytes_per_object() << " " + << " mem_available: " << mem_available; + } else { + LOG_EVERY_T(INFO, 1) << "Can't grow, free_items " << table_free_items + << ", obj_bytes: " << db_slice_->bytes_per_object() << " " + << " mem_available: " << mem_available; + } return res; } diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 
3c37e5135..0f89fd8db 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -53,6 +53,7 @@ std::optional GetRdbVersion(std::string_view msg) { return std::nullopt; } + // The footer looks like this: version (2 bytes) | crc64 (8 bytes) const std::uint8_t* footer = reinterpret_cast(msg.data()) + (msg.size() - DUMP_FOOTER_SIZE); const RdbVersion version = (*(footer + 1) << 8 | (*footer)); @@ -63,9 +64,10 @@ std::optional GetRdbVersion(std::string_view msg) { return std::nullopt; } - uint64_t expected_cs = + // Compute the actual crc64 over the data up to (but excluding) the stored crc64 field. + uint64_t actual_cs = crc64(0, reinterpret_cast(msg.data()), msg.size() - sizeof(uint64_t)); - uint64_t actual_cs = absl::little_endian::Load64(footer + sizeof(version)); + uint64_t expected_cs = absl::little_endian::Load64(footer + 2); // skip the version if (actual_cs != expected_cs) { LOG(WARNING) << "CRC check failed for restore command, expecting: " << expected_cs << " got " @@ -155,12 +157,9 @@ class RdbRestoreValue : protected RdbLoaderBase { rdb_version_ = rdb_version; } - // Returns default ItAndUpdater if Add failed. - // In case a valid ItAndUpdater is returned, then second is true in case a new key is added, - // false if the existing key is updated (should not happen unless we have a bug).
- pair Add(string_view key, string_view payload, const DbContext& cntx, - const RestoreArgs& args, DbSlice* db_slice, - EngineShard* shard); + OpResult Add(string_view key, string_view payload, + const DbContext& cntx, const RestoreArgs& args, + DbSlice* db_slice); private: std::optional Parse(io::Source* source); @@ -191,17 +190,17 @@ std::optional RdbRestoreValue::Parse(io::Source* sourc return std::optional(std::move(obj)); } -pair RdbRestoreValue::Add(string_view key, string_view data, - const DbContext& cntx, - const RestoreArgs& args, DbSlice* db_slice, - EngineShard* shard) { +OpResult RdbRestoreValue::Add(string_view key, string_view data, + const DbContext& cntx, + const RestoreArgs& args, + DbSlice* db_slice) { InMemSource data_src(data); PrimeValue pv; bool first_parse = true; do { auto opaque_res = Parse(&data_src); if (!opaque_res) { - return {}; + return OpStatus::INVALID_VALUE; } LoadConfig config; @@ -217,18 +216,16 @@ pair RdbRestoreValue::Add(string_view key, string_v if (auto ec = FromOpaque(*opaque_res, config, &pv); ec) { // we failed - report and exit LOG(WARNING) << "error while trying to read data: " << ec; - return {}; + return OpStatus::INVALID_VALUE; } } while (pending_read_.remaining > 0); - if (auto res = db_slice->AddOrUpdate(cntx, key, std::move(pv), args.ExpirationTime()); res) { + auto res = db_slice->AddOrUpdate(cntx, key, std::move(pv), args.ExpirationTime()); + if (res) { res->it->first.SetSticky(args.Sticky()); - shard->search_indices()->AddDoc(key, cntx, res->it->second); - return {DbSlice::ItAndUpdater{ - .it = res->it, .exp_it = res->exp_it, .post_updater = std::move(res->post_updater)}, - res->is_new}; + db_slice->shard_owner()->search_indices()->AddDoc(key, cntx, res->it->second); } - return {}; + return res; } [[nodiscard]] bool RestoreArgs::UpdateExpiration(int64_t now_msec) { @@ -477,15 +474,17 @@ OpStatus Renamer::DeserializeDest(Transaction* t, EngineShard* shard) { restore_args.SetSticky(serialized_value_.sticky); 
RdbRestoreValue loader(serialized_value_.version.value()); - auto [restored_dest_it, is_new] = loader.Add(dest_key_, serialized_value_.value, op_args.db_cntx, - restore_args, &db_slice, shard); + auto add_res = + loader.Add(dest_key_, serialized_value_.value, op_args.db_cntx, restore_args, &db_slice); - if (restored_dest_it.IsValid()) { - LOG_IF(DFATAL, !is_new) << "Unexpected override for key " << dest_key_ << " " << dest_found_; - auto bc = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id()); - if (bc) { - bc->AwakeWatched(t->GetDbIndex(), dest_key_); - } + if (!add_res) + return add_res.status(); + + LOG_IF(DFATAL, !add_res->is_new) + << "Unexpected override for key " << dest_key_ << " " << dest_found_; + auto bc = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id()); + if (bc) { + bc->AwakeWatched(t->GetDbIndex(), dest_key_); } if (shard->journal()) { @@ -534,8 +533,8 @@ OpResult OpDump(const OpArgs& op_args, string_view key) { return OpStatus::KEY_NOTFOUND; } -OpResult OpRestore(const OpArgs& op_args, std::string_view key, std::string_view payload, - RestoreArgs restore_args, RdbVersion rdb_version) { +OpStatus OpRestore(const OpArgs& op_args, std::string_view key, std::string_view payload, + RestoreArgs restore_args, RdbVersion rdb_version) { if (!restore_args.UpdateExpiration(op_args.db_cntx.time_now_ms)) { return OpStatus::OUT_OF_RANGE; } @@ -563,18 +562,17 @@ OpResult OpRestore(const OpArgs& op_args, std::string_view key, std::strin if (restore_args.Expired()) { VLOG(1) << "the new key '" << key << "' already expired, will not save the value"; - return true; + return OpStatus::OK; } RdbRestoreValue loader(rdb_version); - auto [res_it, is_new] = - loader.Add(key, payload, op_args.db_cntx, restore_args, &db_slice, op_args.shard); - LOG_IF(DFATAL, res_it.IsValid() && !is_new) + auto add_res = loader.Add(key, payload, op_args.db_cntx, restore_args, &db_slice); + LOG_IF(DFATAL, add_res && !add_res->is_new) << "Unexpected 
override for key " << key << ", found previous " << found_prev << " override: " << restore_args.Replace() - << ", type: " << ObjTypeToString(res_it.it->second.ObjType()); + << ", type: " << ObjTypeToString(add_res->it->second.ObjType()); - return res_it.IsValid(); + return add_res.status(); } bool ScanCb(const OpArgs& op_args, PrimeIterator prime_it, const ScanOpts& opts, string* scratch, @@ -1505,18 +1503,17 @@ void GenericFamily::Restore(CmdArgList args, const CommandContext& cmd_cntx) { rdb_version.value()); }; - OpResult result = cmd_cntx.tx->ScheduleSingleHopT(std::move(cb)); + OpStatus result = cmd_cntx.tx->ScheduleSingleHop(std::move(cb)); - if (result) { - if (result.value()) { + switch (result) { + case OpStatus::OK: return builder->SendOk(); - } else { + case OpStatus::KEY_EXISTS: + return builder->SendError("BUSYKEY Target key name already exists."); + case OpStatus::INVALID_VALUE: return builder->SendError("Bad data format"); - } - } else if (result.status() == OpStatus::KEY_EXISTS) { - return builder->SendError("BUSYKEY: key name already exists."); - } else { - return builder->SendError(result.status()); + default: + return builder->SendError(result); } } diff --git a/src/server/generic_family_test.cc b/src/server/generic_family_test.cc index 487a16061..68d54ea6f 100644 --- a/src/server/generic_family_test.cc +++ b/src/server/generic_family_test.cc @@ -698,6 +698,13 @@ TEST_F(GenericFamilyTest, Restore) { EXPECT_EQ(resp.GetString(), "OK"); resp = Run({"zrange", "my-zset", "0", "-1"}); EXPECT_EQ("elon", resp.GetString()); + + // corrupt the dump file but keep the crc correct. 
+ ZSET_LISTPACK_DUMP[0] = 0x12; + uint8_t crc64[8] = {0x4e, 0xa3, 0x4c, 0x89, 0xc4, 0x8b, 0xd9, 0xe4}; + memcpy(ZSET_LISTPACK_DUMP + 19, crc64, 8); + resp = Run({"restore", "invalid", "0", ToSV(ZSET_LISTPACK_DUMP)}); + EXPECT_THAT(resp, ErrArg("ERR Bad data format")); } TEST_F(GenericFamilyTest, Info) { @@ -845,4 +852,24 @@ TEST_F(GenericFamilyTest, ExpireTime) { EXPECT_EQ(expire_time_in_ms, CheckedInt({"PEXPIRETIME", "foo"})); } +TEST_F(GenericFamilyTest, RestoreOOM) { + max_memory_limit = 20000000; + Run({"set", "src", string(5000, 'x')}); + auto resp = Run({"dump", "src"}); + + string dump = resp.GetString(); + + // Let Dragonfly propagate max_memory_limit to shards. It does not have to be precise, + // the loop should have enough time for the internal processes to progress. + usleep(10000); + unsigned i = 0; + for (; i < 10000; ++i) { + resp = Run({"restore", absl::StrCat("dst", i), "0", dump}); + if (resp != "OK") + break; + } + ASSERT_LT(i, 10000); + EXPECT_THAT(resp, ErrArg("Out of memory")); +} + } // namespace dfly