mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
fix(tiering): fix tiering crash on setting expire (#2285)
fixes #2224 the bug: when updateing entry pre update will resets the PrimeValue if this is an external entries leading to crash if we insert entries with expire time. the fix: reserve the expire mask in PrimeValue on pre update Signed-off-by: adi_holden <adi@dragonflydb.io>
This commit is contained in:
parent
1ce3f983c9
commit
dc3bc9e92f
3 changed files with 58 additions and 9 deletions
|
@ -1013,7 +1013,9 @@ void DbSlice::PreUpdate(DbIndex db_ind, PrimeIterator it) {
|
||||||
TieredStorage* tiered = shard_owner()->tiered_storage();
|
TieredStorage* tiered = shard_owner()->tiered_storage();
|
||||||
auto [offset, size] = it->second.GetExternalSlice();
|
auto [offset, size] = it->second.GetExternalSlice();
|
||||||
tiered->Free(offset, size);
|
tiered->Free(offset, size);
|
||||||
|
bool has_expire = it->second.HasExpire();
|
||||||
it->second.Reset();
|
it->second.Reset();
|
||||||
|
it->second.SetExpire(has_expire); // we keep expire data
|
||||||
|
|
||||||
stats->tiered_entries -= 1;
|
stats->tiered_entries -= 1;
|
||||||
stats->tiered_size -= size;
|
stats->tiered_size -= size;
|
||||||
|
|
|
@ -277,12 +277,7 @@ unsigned TieredStorage::InflightWriteRequest::ExternalizeEntries(PerDb::BinRecor
|
||||||
if (it != bin_record->enqueued_entries.end() && it->second == this) {
|
if (it != bin_record->enqueued_entries.end() && it->second == this) {
|
||||||
PrimeIterator pit = pt->Find(pkey);
|
PrimeIterator pit = pt->Find(pkey);
|
||||||
size_t item_offset = page_index_ * 4096 + offset + i * bin_size;
|
size_t item_offset = page_index_ * 4096 + offset + i * bin_size;
|
||||||
|
CHECK(!pit.is_done());
|
||||||
// TODO: the key may be deleted or overriden. The last one is especially dangerous.
|
|
||||||
// we should update active pending request with any change we make to the entry.
|
|
||||||
// it should not be a problem since we have HasIoPending tag that mean we must
|
|
||||||
// update the inflight request (or mark the entry as cancelled).
|
|
||||||
CHECK(!pit.is_done()) << "TBD";
|
|
||||||
|
|
||||||
ExternalizeEntry(item_offset, stats, &pit->second);
|
ExternalizeEntry(item_offset, stats, &pit->second);
|
||||||
VLOG(2) << "ExternalizeEntry: " << it->first;
|
VLOG(2) << "ExternalizeEntry: " << it->first;
|
||||||
|
@ -300,9 +295,7 @@ void TieredStorage::InflightWriteRequest::Undo(PerDb::BinRecord* bin_record, DbS
|
||||||
if (it != bin_record->enqueued_entries.end() && it->second == this) {
|
if (it != bin_record->enqueued_entries.end() && it->second == this) {
|
||||||
PrimeIterator pit = pt->Find(pkey);
|
PrimeIterator pit = pt->Find(pkey);
|
||||||
|
|
||||||
// TODO: what happens when if the entry was deleted meanwhile
|
CHECK(pit->second.HasIoPending());
|
||||||
// or it has been serialized again?
|
|
||||||
CHECK(pit->second.HasIoPending()) << "TBD: fix inconsistencies";
|
|
||||||
VLOG(2) << "Undo key:" << pkey;
|
VLOG(2) << "Undo key:" << pkey;
|
||||||
pit->second.SetIoPending(false);
|
pit->second.SetIoPending(false);
|
||||||
|
|
||||||
|
|
|
@ -25,6 +25,7 @@ class TieredStorageTest : public BaseFamilyTest {
|
||||||
}
|
}
|
||||||
|
|
||||||
void FillExternalKeys(unsigned count, int val_size = 256);
|
void FillExternalKeys(unsigned count, int val_size = 256);
|
||||||
|
void FillKeysWithExpire(unsigned count, int val_size = 256, uint32_t expire = 3);
|
||||||
|
|
||||||
static void SetUpTestSuite();
|
static void SetUpTestSuite();
|
||||||
};
|
};
|
||||||
|
@ -62,6 +63,13 @@ void TieredStorageTest::FillExternalKeys(unsigned count, int val_size) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void TieredStorageTest::FillKeysWithExpire(unsigned count, int val_size, uint32_t expire) {
|
||||||
|
string val(val_size, 'a');
|
||||||
|
for (unsigned i = 0; i < count; ++i) {
|
||||||
|
Run({"set", StrCat("k", i), val, "ex", StrCat(expire)});
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(TieredStorageTest, Basic) {
|
TEST_F(TieredStorageTest, Basic) {
|
||||||
FillExternalKeys(5000);
|
FillExternalKeys(5000);
|
||||||
|
|
||||||
|
@ -198,4 +206,50 @@ TEST_F(TieredStorageTest, DelBigValues) {
|
||||||
EXPECT_GT(m.db_stats[0].tiered_entries, 0u);
|
EXPECT_GT(m.db_stats[0].tiered_entries, 0u);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(TieredStorageTest, AddBigValuesWithExpire) {
|
||||||
|
const int kKeyNum = 10;
|
||||||
|
for (int i = 0; i < 3; ++i) {
|
||||||
|
FillKeysWithExpire(kKeyNum, 8000);
|
||||||
|
usleep(20000); // 0.02 milliseconds
|
||||||
|
|
||||||
|
Metrics m = GetMetrics();
|
||||||
|
EXPECT_EQ(m.db_stats[0].tiered_entries, 10);
|
||||||
|
}
|
||||||
|
for (int i = 0; i < kKeyNum; ++i) {
|
||||||
|
auto resp = Run({"ttl", StrCat("k", i)});
|
||||||
|
EXPECT_GT(resp.GetInt(), 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(TieredStorageTest, AddSmallValuesWithExpire) {
|
||||||
|
const int kKeyNum = 100;
|
||||||
|
for (int i = 0; i < 3; ++i) {
|
||||||
|
FillKeysWithExpire(kKeyNum);
|
||||||
|
usleep(20000); // 0.02 milliseconds
|
||||||
|
|
||||||
|
Metrics m = GetMetrics();
|
||||||
|
EXPECT_GT(m.db_stats[0].tiered_entries, 0);
|
||||||
|
}
|
||||||
|
for (int i = 0; i < kKeyNum; ++i) {
|
||||||
|
auto resp = Run({"ttl", StrCat("k", i)});
|
||||||
|
EXPECT_GT(resp.GetInt(), 0);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
TEST_F(TieredStorageTest, SetAndExpire) {
|
||||||
|
string val(5000, 'a');
|
||||||
|
Run({"set", "key", val});
|
||||||
|
usleep(20000); // 0.02 milliseconds
|
||||||
|
|
||||||
|
Metrics m = GetMetrics();
|
||||||
|
EXPECT_EQ(m.db_stats[0].tiered_entries, 1);
|
||||||
|
Run({"expire", "key", "3"});
|
||||||
|
|
||||||
|
Run({"set", "key", val});
|
||||||
|
usleep(20000); // 0.02 milliseconds
|
||||||
|
|
||||||
|
m = GetMetrics();
|
||||||
|
EXPECT_EQ(m.db_stats[0].tiered_entries, 1);
|
||||||
|
Run({"expire", "key", "3"});
|
||||||
|
}
|
||||||
} // namespace dfly
|
} // namespace dfly
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue