fix: do not check-fail in OpRestore (#4332)

fix: do not check-fail in OpRestore

In some rare cases we reach an inconsistent state inside OpRestore, where a key already exists even though it should not.
In that case, log the error instead of crashing the server. In addition, update the existing entry to the latest restored value.

Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
Roman Gershman 2024-12-18 09:53:03 +02:00 committed by GitHub
parent bf410b6e0b
commit c22c9448b5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 52 additions and 33 deletions

View file

@ -281,6 +281,10 @@ class DbSlice {
Iterator it; Iterator it;
ExpIterator exp_it; ExpIterator exp_it;
AutoUpdater post_updater; AutoUpdater post_updater;
bool IsValid() const {
return !it.is_done();
}
}; };
ItAndUpdater FindMutable(const Context& cntx, std::string_view key); ItAndUpdater FindMutable(const Context& cntx, std::string_view key);

View file

@ -120,24 +120,28 @@ class RestoreArgs {
: expiration_(expiration), abs_time_(abs_time), replace_(replace) { : expiration_(expiration), abs_time_(abs_time), replace_(replace) {
} }
constexpr bool Replace() const { bool Replace() const {
return replace_; return replace_;
} }
constexpr bool Sticky() const { bool Sticky() const {
return sticky_; return sticky_;
} }
void SetSticky(bool sticky) {
sticky_ = sticky;
}
uint64_t ExpirationTime() const { uint64_t ExpirationTime() const {
DCHECK_GE(expiration_, 0); DCHECK_GE(expiration_, 0);
return expiration_; return expiration_;
} }
[[nodiscard]] constexpr bool Expired() const { bool Expired() const {
return expiration_ < 0; return expiration_ < 0;
} }
[[nodiscard]] constexpr bool HasExpiration() const { bool HasExpiration() const {
return expiration_ != NO_EXPIRATION; return expiration_ != NO_EXPIRATION;
} }
@ -152,9 +156,12 @@ class RdbRestoreValue : protected RdbLoaderBase {
rdb_version_ = rdb_version; rdb_version_ = rdb_version;
} }
std::optional<DbSlice::ItAndUpdater> Add(std::string_view payload, std::string_view key, // Returns default ItAndUpdater if Add failed.
DbSlice& db_slice, const DbContext& cntx, // In case a valid ItAndUpdater is returned, then second is true in case a new key is added,
const RestoreArgs& args, EngineShard* shard); // false if the existing key is updated (should not happen unless we have a bug).
pair<DbSlice::ItAndUpdater, bool> Add(string_view key, string_view payload, const DbContext& cntx,
const RestoreArgs& args, DbSlice* db_slice,
EngineShard* shard);
private: private:
std::optional<OpaqueObj> Parse(io::Source* source); std::optional<OpaqueObj> Parse(io::Source* source);
@ -185,10 +192,9 @@ std::optional<RdbLoaderBase::OpaqueObj> RdbRestoreValue::Parse(io::Source* sourc
return std::optional<OpaqueObj>(std::move(obj)); return std::optional<OpaqueObj>(std::move(obj));
} }
std::optional<DbSlice::ItAndUpdater> RdbRestoreValue::Add(std::string_view data, pair<DbSlice::ItAndUpdater, bool> RdbRestoreValue::Add(string_view key, string_view data,
std::string_view key, DbSlice& db_slice,
const DbContext& cntx, const DbContext& cntx,
const RestoreArgs& args, const RestoreArgs& args, DbSlice* db_slice,
EngineShard* shard) { EngineShard* shard) {
InMemSource data_src(data); InMemSource data_src(data);
PrimeValue pv; PrimeValue pv;
@ -196,7 +202,7 @@ std::optional<DbSlice::ItAndUpdater> RdbRestoreValue::Add(std::string_view data,
do { do {
auto opaque_res = Parse(&data_src); auto opaque_res = Parse(&data_src);
if (!opaque_res) { if (!opaque_res) {
return std::nullopt; return {};
} }
LoadConfig config; LoadConfig config;
@ -212,16 +218,18 @@ std::optional<DbSlice::ItAndUpdater> RdbRestoreValue::Add(std::string_view data,
if (auto ec = FromOpaque(*opaque_res, config, &pv); ec) { if (auto ec = FromOpaque(*opaque_res, config, &pv); ec) {
// we failed - report and exit // we failed - report and exit
LOG(WARNING) << "error while trying to read data: " << ec; LOG(WARNING) << "error while trying to read data: " << ec;
return std::nullopt; return {};
} }
} while (pending_read_.remaining > 0); } while (pending_read_.remaining > 0);
if (auto res = db_slice.AddNew(cntx, key, std::move(pv), args.ExpirationTime()); res) { if (auto res = db_slice->AddOrUpdate(cntx, key, std::move(pv), args.ExpirationTime()); res) {
res->it->first.SetSticky(args.Sticky()); res->it->first.SetSticky(args.Sticky());
shard->search_indices()->AddDoc(key, cntx, res->it->second); shard->search_indices()->AddDoc(key, cntx, res->it->second);
return std::move(res.value()); return {DbSlice::ItAndUpdater{
.it = res->it, .exp_it = res->exp_it, .post_updater = std::move(res->post_updater)},
res->is_new};
} }
return std::nullopt; return {};
} }
[[nodiscard]] bool RestoreArgs::UpdateExpiration(int64_t now_msec) { [[nodiscard]] bool RestoreArgs::UpdateExpiration(int64_t now_msec) {
@ -467,14 +475,14 @@ OpStatus Renamer::DeserializeDest(Transaction* t, EngineShard* shard) {
return OpStatus::OK; return OpStatus::OK;
} }
restore_args.SetSticky(serialized_value_.sticky);
RdbRestoreValue loader(serialized_value_.version.value()); RdbRestoreValue loader(serialized_value_.version.value());
auto restored_dest_it = loader.Add(serialized_value_.value, dest_key_, db_slice, op_args.db_cntx, auto [restored_dest_it, is_new] = loader.Add(dest_key_, serialized_value_.value, op_args.db_cntx,
restore_args, shard); restore_args, &db_slice, shard);
if (restored_dest_it) {
auto& dest_it = restored_dest_it->it;
dest_it->first.SetSticky(serialized_value_.sticky);
if (restored_dest_it.IsValid()) {
LOG_IF(DFATAL, !is_new) << "Unexpected override for key " << dest_key_ << " " << dest_found_;
auto bc = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id()); auto bc = op_args.db_cntx.ns->GetBlockingController(op_args.shard->shard_id());
if (bc) { if (bc) {
bc->AwakeWatched(t->GetDbIndex(), dest_key_); bc->AwakeWatched(t->GetDbIndex(), dest_key_);
@ -527,27 +535,28 @@ OpResult<std::string> OpDump(const OpArgs& op_args, string_view key) {
return OpStatus::KEY_NOTFOUND; return OpStatus::KEY_NOTFOUND;
} }
OpResult<bool> OnRestore(const OpArgs& op_args, std::string_view key, std::string_view payload, OpResult<bool> OpRestore(const OpArgs& op_args, std::string_view key, std::string_view payload,
RestoreArgs restore_args, RdbVersion rdb_version) { RestoreArgs restore_args, RdbVersion rdb_version) {
if (!restore_args.UpdateExpiration(op_args.db_cntx.time_now_ms)) { if (!restore_args.UpdateExpiration(op_args.db_cntx.time_now_ms)) {
return OpStatus::OUT_OF_RANGE; return OpStatus::OUT_OF_RANGE;
} }
auto& db_slice = op_args.GetDbSlice(); auto& db_slice = op_args.GetDbSlice();
bool found_prev = false;
// The redis impl (see cluster.c function restoreCommand), remove the old key if // The redis impl (see cluster.c function restoreCommand), remove the old key if
// the replace option is set, so lets do the same here // the replace option is set, so lets do the same here
{ {
auto res = db_slice.FindMutable(op_args.db_cntx, key); auto res = db_slice.FindMutable(op_args.db_cntx, key);
if (restore_args.Replace()) {
if (IsValid(res.it)) { if (IsValid(res.it)) {
found_prev = true;
if (restore_args.Replace()) {
VLOG(1) << "restore command is running with replace, found old key '" << key VLOG(1) << "restore command is running with replace, found old key '" << key
<< "' and removing it"; << "' and removing it";
res.post_updater.Run(); res.post_updater.Run();
CHECK(db_slice.Del(op_args.db_cntx, res.it)); CHECK(db_slice.Del(op_args.db_cntx, res.it));
}
} else { } else {
// we are not allowed to replace it, so make sure it doesn't exist // we are not allowed to replace it.
if (IsValid(res.it)) {
return OpStatus::KEY_EXISTS; return OpStatus::KEY_EXISTS;
} }
} }
@ -559,8 +568,14 @@ OpResult<bool> OnRestore(const OpArgs& op_args, std::string_view key, std::strin
} }
RdbRestoreValue loader(rdb_version); RdbRestoreValue loader(rdb_version);
auto res = loader.Add(payload, key, db_slice, op_args.db_cntx, restore_args, op_args.shard); auto [res_it, is_new] =
return res.has_value(); loader.Add(key, payload, op_args.db_cntx, restore_args, &db_slice, op_args.shard);
LOG_IF(DFATAL, res_it.IsValid() && !is_new)
<< "Unexpected override for key " << key << ", found previous " << found_prev
<< " override: " << restore_args.Replace()
<< ", type: " << ObjTypeToString(res_it.it->second.ObjType());
return res_it.IsValid();
} }
bool ScanCb(const OpArgs& op_args, PrimeIterator prime_it, const ScanOpts& opts, string* scratch, bool ScanCb(const OpArgs& op_args, PrimeIterator prime_it, const ScanOpts& opts, string* scratch,
@ -1476,7 +1491,7 @@ void GenericFamily::Restore(CmdArgList args, const CommandContext& cmd_cntx) {
} }
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return OnRestore(t->GetOpArgs(shard), key, serialized_value, restore_args.value(), return OpRestore(t->GetOpArgs(shard), key, serialized_value, restore_args.value(),
rdb_version.value()); rdb_version.value());
}; };