mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
feat(server): Implement DFLY EXPIRE command (#404)
* feat(server): Implement DFLY EXPIRE command Signed-off-by: Vladislav Oleshko <vlad@dragonflydb.io>
This commit is contained in:
parent
6ad083acae
commit
b5cbed79d7
3 changed files with 34 additions and 0 deletions
|
@ -737,6 +737,28 @@ pair<PrimeIterator, ExpireIterator> DbSlice::ExpireIfNeeded(const Context& cntx,
|
|||
return make_pair(PrimeIterator{}, ExpireIterator{});
|
||||
}
|
||||
|
||||
void DbSlice::ExpireAllIfNeeded() {
|
||||
for (DbIndex db_index = 0; db_index < db_arr_.size(); db_index++) {
|
||||
if (!db_arr_[db_index])
|
||||
continue;
|
||||
auto& db = *db_arr_[db_index];
|
||||
|
||||
auto cb = [&](ExpireTable::iterator exp_it) {
|
||||
auto prime_it = db.prime.Find(exp_it->first);
|
||||
if (!IsValid(prime_it)) {
|
||||
LOG(ERROR) << "Expire entry " << exp_it->first.ToString() << " not found in prime table";
|
||||
return;
|
||||
}
|
||||
ExpireIfNeeded(DbSlice::Context{db_index, GetCurrentTimeMs()}, prime_it);
|
||||
};
|
||||
|
||||
ExpireTable::Cursor cursor;
|
||||
do {
|
||||
cursor = db.expire.Traverse(cursor, cb);
|
||||
} while (cursor);
|
||||
}
|
||||
}
|
||||
|
||||
uint64_t DbSlice::RegisterOnChange(ChangeCallback cb) {
|
||||
uint64_t ver = NextVersion();
|
||||
change_cb_.emplace_back(ver, std::move(cb));
|
||||
|
|
|
@ -238,6 +238,9 @@ class DbSlice {
|
|||
std::pair<PrimeIterator, ExpireIterator> ExpireIfNeeded(const Context& cntx,
|
||||
PrimeIterator it) const;
|
||||
|
||||
// Iterate over all expire table entries and delete expired.
|
||||
void ExpireAllIfNeeded();
|
||||
|
||||
// Current version of this slice.
|
||||
// We maintain a shared versioning scheme for all databases in the slice.
|
||||
uint64_t version() const {
|
||||
|
|
|
@ -110,6 +110,15 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
if (sub_cmd == "EXPIRE") {
|
||||
cntx->transaction->ScheduleSingleHop([](Transaction* t, EngineShard* shard){
|
||||
shard->db_slice().ExpireAllIfNeeded();
|
||||
return OpStatus::OK;
|
||||
});
|
||||
|
||||
return rb->SendOk();
|
||||
}
|
||||
|
||||
rb->SendError(kSyntaxErr);
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue