mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
Feat bzpopmin (#1232)
* chore: Refactor out common code from BPop ops * feat: Implement blocking ZSET commands * Add tests for blocking ZSET commands. * Refactor changes into container_utils
This commit is contained in:
parent
e8fb0b9580
commit
6697478a28
10 changed files with 356 additions and 213 deletions
|
@ -212,7 +212,9 @@ void BlockingController::NotifyPending() {
|
|||
|
||||
// Double verify we still got the item.
|
||||
auto [it, exp_it] = owner_->db_slice().FindExt(context, sv_key);
|
||||
if (!IsValid(it) || it->second.ObjType() != OBJ_LIST) // Only LIST is allowed to block.
|
||||
if (!IsValid(it) ||
|
||||
!(it->second.ObjType() == OBJ_LIST ||
|
||||
it->second.ObjType() == OBJ_ZSET)) // Only LIST and ZSET are allowed to block.
|
||||
continue;
|
||||
|
||||
NotifyWatchQueue(sv_key, &wt.queue_map);
|
||||
|
|
|
@ -20,7 +20,7 @@ namespace CO {
|
|||
|
||||
enum CommandOpt : uint32_t {
|
||||
READONLY = 1U << 0,
|
||||
FAST = 1U << 1,
|
||||
FAST = 1U << 1, // Unused?
|
||||
WRITE = 1U << 2,
|
||||
LOADING = 1U << 3, // Command allowed during LOADING state.
|
||||
DENYOOM = 1U << 4, // use-memory in redis.
|
||||
|
|
|
@ -6,6 +6,10 @@
|
|||
#include "base/logging.h"
|
||||
#include "core/string_map.h"
|
||||
#include "core/string_set.h"
|
||||
#include "server/engine_shard_set.h"
|
||||
#include "server/server_state.h"
|
||||
#include "server/transaction.h"
|
||||
#include "src/facade/op_status.h"
|
||||
|
||||
extern "C" {
|
||||
#include "redis/intset.h"
|
||||
|
@ -188,4 +192,126 @@ string_view LpGetView(uint8_t* lp_it, uint8_t int_buf[]) {
|
|||
return std::string_view{reinterpret_cast<char*>(elem), size_t(ele_len)};
|
||||
}
|
||||
|
||||
OpResult<ShardFFResult> FindFirstNonEmptyKey(Transaction* trans, int req_obj_type) {
|
||||
using FFResult = std::pair<PrimeKey, unsigned>; // key, argument index.
|
||||
VLOG(2) << "FindFirst::Find " << trans->DebugId();
|
||||
|
||||
// Holds Find results: (iterator to a found key, and its index in the passed arguments).
|
||||
// See DbSlice::FindFirst for more details.
|
||||
// spans all the shards for now.
|
||||
std::vector<OpResult<FFResult>> find_res(shard_set->size());
|
||||
std::fill(find_res.begin(), find_res.end(), OpStatus::KEY_NOTFOUND);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
auto args = t->GetShardArgs(shard->shard_id());
|
||||
OpResult<std::pair<PrimeIterator, unsigned>> ff_res =
|
||||
shard->db_slice().FindFirst(t->GetDbContext(), args, req_obj_type);
|
||||
|
||||
if (ff_res) {
|
||||
FFResult ff_result(ff_res->first->first.AsRef(), ff_res->second);
|
||||
find_res[shard->shard_id()] = std::move(ff_result);
|
||||
} else {
|
||||
find_res[shard->shard_id()] = ff_res.status();
|
||||
}
|
||||
|
||||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
trans->Execute(std::move(cb), false);
|
||||
|
||||
uint32_t min_arg_indx = UINT32_MAX;
|
||||
ShardFFResult shard_result;
|
||||
|
||||
// We iterate over all results to find the key with the minimal arg_index
|
||||
// after reversing the arg indexing permutation.
|
||||
for (size_t sid = 0; sid < find_res.size(); ++sid) {
|
||||
const auto& fr = find_res[sid];
|
||||
auto status = fr.status();
|
||||
if (status == OpStatus::KEY_NOTFOUND)
|
||||
continue;
|
||||
if (status == OpStatus::WRONG_TYPE) {
|
||||
return status;
|
||||
}
|
||||
CHECK(fr);
|
||||
|
||||
const auto& it_pos = fr.value();
|
||||
|
||||
size_t arg_indx = trans->ReverseArgIndex(sid, it_pos.second);
|
||||
if (arg_indx < min_arg_indx) {
|
||||
min_arg_indx = arg_indx;
|
||||
shard_result.sid = sid;
|
||||
|
||||
// we do not dereference the key, do not extract the string value, so it it
|
||||
// ok to just move it. We can not dereference it due to limitations of SmallString
|
||||
// that rely on thread-local data-structure for pointer translation.
|
||||
shard_result.key = it_pos.first.AsRef();
|
||||
}
|
||||
}
|
||||
|
||||
if (shard_result.sid == kInvalidSid) {
|
||||
return OpStatus::KEY_NOTFOUND;
|
||||
}
|
||||
|
||||
return OpResult<ShardFFResult>{std::move(shard_result)};
|
||||
}
|
||||
|
||||
// If OK is returned then cb was called on the first non empty key and `out_key` is set to the key.
|
||||
facade::OpStatus RunCbOnFirstNonEmptyBlocking(BlockingResultCb&& func, std::string* out_key,
|
||||
Transaction* trans, int req_obj_type,
|
||||
unsigned limit_ms) {
|
||||
auto limit_tp = limit_ms ? std::chrono::steady_clock::now() + std::chrono::milliseconds(limit_ms)
|
||||
: Transaction::time_point::max();
|
||||
bool is_multi = trans->IsMulti();
|
||||
trans->Schedule();
|
||||
|
||||
ShardFFResult ff_result;
|
||||
OpResult<ShardFFResult> result = FindFirstNonEmptyKey(trans, req_obj_type);
|
||||
|
||||
if (result.ok()) {
|
||||
ff_result = std::move(result.value());
|
||||
} else if (result.status() == OpStatus::KEY_NOTFOUND) {
|
||||
// Close transaction and return.
|
||||
if (is_multi) {
|
||||
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
|
||||
trans->Execute(std::move(cb), true);
|
||||
return OpStatus::TIMED_OUT;
|
||||
}
|
||||
|
||||
auto wcb = [](Transaction* t, EngineShard* shard) {
|
||||
return t->GetShardArgs(shard->shard_id());
|
||||
};
|
||||
|
||||
VLOG(1) << "Blocking BLPOP " << trans->DebugId();
|
||||
auto* stats = ServerState::tl_connection_stats();
|
||||
++stats->num_blocked_clients;
|
||||
bool wait_succeeded = trans->WaitOnWatch(limit_tp, std::move(wcb));
|
||||
--stats->num_blocked_clients;
|
||||
|
||||
if (!wait_succeeded)
|
||||
return OpStatus::TIMED_OUT;
|
||||
} else {
|
||||
// Could be the wrong-type error.
|
||||
// cleanups, locks removal etc.
|
||||
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
|
||||
trans->Execute(std::move(cb), true);
|
||||
|
||||
DCHECK_NE(result.status(), OpStatus::KEY_NOTFOUND);
|
||||
return result.status();
|
||||
}
|
||||
|
||||
auto cb = [&func, &ff_result, out_key](Transaction* t, EngineShard* shard) {
|
||||
if (auto wake_key = t->GetWakeKey(shard->shard_id()); wake_key) {
|
||||
*out_key = *wake_key;
|
||||
func(t, shard, *out_key);
|
||||
} else if (shard->shard_id() == ff_result.sid) {
|
||||
ff_result.key.GetString(out_key);
|
||||
func(t, shard, *out_key);
|
||||
}
|
||||
return OpStatus::OK;
|
||||
};
|
||||
trans->Execute(std::move(cb), true);
|
||||
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
} // namespace dfly::container_utils
|
||||
|
|
|
@ -81,6 +81,18 @@ std::string_view LpGetView(uint8_t* lp_it, uint8_t int_buf[]);
|
|||
// Find value by key and return stringview to it, otherwise nullopt.
|
||||
std::optional<std::string_view> LpFind(uint8_t* lp, std::string_view key, uint8_t int_buf[]);
|
||||
|
||||
struct ShardFFResult {
|
||||
PrimeKey key;
|
||||
ShardId sid = kInvalidSid;
|
||||
};
|
||||
|
||||
OpResult<ShardFFResult> FindFirstNonEmptyKey(Transaction* trans, int req_obj_type);
|
||||
|
||||
using BlockingResultCb = std::function<void(Transaction*, EngineShard*, std::string_view)>;
|
||||
facade::OpStatus RunCbOnFirstNonEmptyBlocking(BlockingResultCb&& cb, std::string* out_key,
|
||||
Transaction* trans, int req_obj_type,
|
||||
unsigned limit_ms);
|
||||
|
||||
}; // namespace container_utils
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -341,12 +341,13 @@ pair<PrimeIterator, ExpireIterator> DbSlice::FindExt(const Context& cntx, string
|
|||
return res;
|
||||
}
|
||||
|
||||
OpResult<pair<PrimeIterator, unsigned>> DbSlice::FindFirst(const Context& cntx, ArgSlice args) {
|
||||
OpResult<pair<PrimeIterator, unsigned>> DbSlice::FindFirst(const Context& cntx, ArgSlice args,
|
||||
int req_obj_type) {
|
||||
DCHECK(!args.empty());
|
||||
|
||||
for (unsigned i = 0; i < args.size(); ++i) {
|
||||
string_view s = args[i];
|
||||
OpResult<PrimeIterator> res = Find(cntx, s, OBJ_LIST);
|
||||
OpResult<PrimeIterator> res = Find(cntx, s, req_obj_type);
|
||||
if (res)
|
||||
return make_pair(res.value(), i);
|
||||
if (res.status() != OpStatus::KEY_NOTFOUND)
|
||||
|
|
|
@ -149,7 +149,8 @@ class DbSlice {
|
|||
|
||||
// Returns (iterator, args-index) if found, KEY_NOTFOUND otherwise.
|
||||
// If multiple keys are found, returns the first index in the ArgSlice.
|
||||
OpResult<std::pair<PrimeIterator, unsigned>> FindFirst(const Context& cntx, ArgSlice args);
|
||||
OpResult<std::pair<PrimeIterator, unsigned>> FindFirst(const Context& cntx, ArgSlice args,
|
||||
int req_obj_type);
|
||||
|
||||
// Return .second=true if insertion occurred, false if we return the existing key.
|
||||
// throws: bad_alloc is insertion could not happen due to out of memory.
|
||||
|
|
|
@ -128,110 +128,13 @@ bool ElemCompare(const quicklistEntry& entry, string_view elem) {
|
|||
return elem == an.Piece();
|
||||
}
|
||||
|
||||
using FFResult = pair<PrimeKey, unsigned>; // key, argument index.
|
||||
|
||||
struct ShardFFResult {
|
||||
PrimeKey key;
|
||||
ShardId sid = kInvalidSid;
|
||||
};
|
||||
|
||||
// Used by bpopper.
|
||||
OpResult<ShardFFResult> FindFirst(Transaction* trans) {
|
||||
VLOG(2) << "FindFirst::Find " << trans->DebugId();
|
||||
|
||||
// Holds Find results: (iterator to a found key, and its index in the passed arguments).
|
||||
// See DbSlice::FindFirst for more details.
|
||||
// spans all the shards for now.
|
||||
std::vector<OpResult<FFResult>> find_res(shard_set->size());
|
||||
fill(find_res.begin(), find_res.end(), OpStatus::KEY_NOTFOUND);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
auto args = t->GetShardArgs(shard->shard_id());
|
||||
|
||||
OpResult<pair<PrimeIterator, unsigned>> ff_res =
|
||||
shard->db_slice().FindFirst(t->GetDbContext(), args);
|
||||
|
||||
if (ff_res) {
|
||||
FFResult ff_result(ff_res->first->first.AsRef(), ff_res->second);
|
||||
find_res[shard->shard_id()] = move(ff_result);
|
||||
} else {
|
||||
find_res[shard->shard_id()] = ff_res.status();
|
||||
}
|
||||
|
||||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
trans->Execute(move(cb), false);
|
||||
|
||||
uint32_t min_arg_indx = UINT32_MAX;
|
||||
|
||||
ShardFFResult shard_result;
|
||||
|
||||
for (size_t sid = 0; sid < find_res.size(); ++sid) {
|
||||
const auto& fr = find_res[sid];
|
||||
auto status = fr.status();
|
||||
if (status == OpStatus::KEY_NOTFOUND)
|
||||
continue;
|
||||
|
||||
if (status == OpStatus::WRONG_TYPE) {
|
||||
return status;
|
||||
}
|
||||
|
||||
CHECK(fr);
|
||||
|
||||
const auto& it_pos = fr.value();
|
||||
|
||||
size_t arg_indx = trans->ReverseArgIndex(sid, it_pos.second);
|
||||
if (arg_indx < min_arg_indx) {
|
||||
min_arg_indx = arg_indx;
|
||||
shard_result.sid = sid;
|
||||
|
||||
// we do not dereference the key, do not extract the string value, so it it
|
||||
// ok to just move it. We can not dereference it due to limitations of SmallString
|
||||
// that rely on thread-local data-structure for pointer translation.
|
||||
shard_result.key = it_pos.first.AsRef();
|
||||
}
|
||||
}
|
||||
|
||||
if (shard_result.sid == kInvalidSid) {
|
||||
return OpStatus::KEY_NOTFOUND;
|
||||
}
|
||||
|
||||
return OpResult<ShardFFResult>{move(shard_result)};
|
||||
}
|
||||
|
||||
class BPopper {
|
||||
public:
|
||||
explicit BPopper(ListDir dir);
|
||||
|
||||
// Returns WRONG_TYPE, OK.
|
||||
// If OK is returned then use result() to fetch the value.
|
||||
OpStatus Run(Transaction* t, unsigned msec);
|
||||
|
||||
// returns (key, value) pair.
|
||||
auto result() const {
|
||||
return make_pair<string_view, string_view>(key_, value_);
|
||||
}
|
||||
|
||||
private:
|
||||
void Pop(Transaction* t, EngineShard* shard);
|
||||
void OpPop(Transaction* t, EngineShard* shard);
|
||||
|
||||
ListDir dir_;
|
||||
|
||||
ShardFFResult ff_result_;
|
||||
|
||||
string key_;
|
||||
string value_;
|
||||
};
|
||||
|
||||
class BPopPusher {
|
||||
public:
|
||||
BPopPusher(string_view pop_key, string_view push_key, ListDir popdir, ListDir pushdir);
|
||||
|
||||
// Returns WRONG_TYPE, OK.
|
||||
// If OK is returned then use result() to fetch the value.
|
||||
OpResult<string> Run(Transaction* t, unsigned msec);
|
||||
OpResult<string> Run(Transaction* t, unsigned limit_ms);
|
||||
|
||||
private:
|
||||
OpResult<string> RunSingle(Transaction* t, time_point tp);
|
||||
|
@ -241,90 +144,30 @@ class BPopPusher {
|
|||
ListDir popdir_, pushdir_;
|
||||
};
|
||||
|
||||
BPopper::BPopper(ListDir dir) : dir_(dir) {
|
||||
}
|
||||
|
||||
OpStatus BPopper::Run(Transaction* trans, unsigned msec) {
|
||||
auto tp = msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max();
|
||||
bool is_multi = trans->IsMulti();
|
||||
|
||||
trans->Schedule();
|
||||
|
||||
auto* stats = ServerState::tl_connection_stats();
|
||||
|
||||
OpResult<ShardFFResult> result = FindFirst(trans);
|
||||
|
||||
if (result.ok()) {
|
||||
ff_result_ = move(result.value());
|
||||
} else if (result.status() == OpStatus::KEY_NOTFOUND) {
|
||||
// Close transaction and return.
|
||||
if (is_multi) {
|
||||
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
|
||||
trans->Execute(std::move(cb), true);
|
||||
return OpStatus::TIMED_OUT;
|
||||
}
|
||||
|
||||
auto wcb = [](Transaction* t, EngineShard* shard) {
|
||||
return t->GetShardArgs(shard->shard_id());
|
||||
};
|
||||
|
||||
VLOG(1) << "Blocking BLPOP " << trans->DebugId();
|
||||
++stats->num_blocked_clients;
|
||||
bool wait_succeeded = trans->WaitOnWatch(tp, std::move(wcb));
|
||||
--stats->num_blocked_clients;
|
||||
|
||||
if (!wait_succeeded)
|
||||
return OpStatus::TIMED_OUT;
|
||||
} else {
|
||||
// Could be the wrong-type error.
|
||||
// cleanups, locks removal etc.
|
||||
auto cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
|
||||
trans->Execute(std::move(cb), true);
|
||||
|
||||
DCHECK_NE(result.status(), OpStatus::KEY_NOTFOUND);
|
||||
return result.status();
|
||||
}
|
||||
|
||||
auto cb = [this](Transaction* t, EngineShard* shard) {
|
||||
Pop(t, shard);
|
||||
return OpStatus::OK;
|
||||
};
|
||||
trans->Execute(std::move(cb), true);
|
||||
|
||||
return OpStatus::OK;
|
||||
}
|
||||
|
||||
void BPopper::Pop(Transaction* t, EngineShard* shard) {
|
||||
if (auto wake_key = t->GetWakeKey(shard->shard_id()); wake_key) {
|
||||
key_ = *wake_key;
|
||||
OpPop(t, shard);
|
||||
} else if (shard->shard_id() == ff_result_.sid) {
|
||||
ff_result_.key.GetString(&key_);
|
||||
OpPop(t, shard);
|
||||
}
|
||||
}
|
||||
|
||||
void BPopper::OpPop(Transaction* t, EngineShard* shard) {
|
||||
// Called as a callback from MKBlocking after we've determined which key to pop.
|
||||
std::string OpBPop(Transaction* t, EngineShard* shard, std::string_view key, ListDir dir) {
|
||||
auto& db_slice = shard->db_slice();
|
||||
auto it_res = db_slice.Find(t->GetDbContext(), key_, OBJ_LIST);
|
||||
CHECK(it_res) << t->DebugId() << " " << key_; // must exist and must be ok.
|
||||
auto it_res = db_slice.Find(t->GetDbContext(), key, OBJ_LIST);
|
||||
CHECK(it_res) << t->DebugId() << " " << key; // must exist and must be ok.
|
||||
PrimeIterator it = *it_res;
|
||||
|
||||
quicklist* ql = GetQL(it->second);
|
||||
|
||||
DVLOG(2) << "popping from " << key_ << " " << t->DebugId();
|
||||
DVLOG(2) << "popping from " << key << " " << t->DebugId();
|
||||
db_slice.PreUpdate(t->GetDbIndex(), it);
|
||||
value_ = ListPop(dir_, ql);
|
||||
db_slice.PostUpdate(t->GetDbIndex(), it, key_);
|
||||
std::string value = ListPop(dir, ql);
|
||||
db_slice.PostUpdate(t->GetDbIndex(), it, key);
|
||||
if (quicklistCount(ql) == 0) {
|
||||
DVLOG(1) << "deleting key " << key_ << " " << t->DebugId();
|
||||
DVLOG(1) << "deleting key " << key << " " << t->DebugId();
|
||||
CHECK(shard->db_slice().Del(t->GetDbIndex(), it));
|
||||
}
|
||||
OpArgs op_args = t->GetOpArgs(shard);
|
||||
if (op_args.shard->journal()) {
|
||||
string command = dir_ == ListDir::LEFT ? "LPOP" : "RPOP";
|
||||
RecordJournal(op_args, command, ArgSlice{key_}, 1);
|
||||
string command = dir == ListDir::LEFT ? "LPOP" : "RPOP";
|
||||
RecordJournal(op_args, command, ArgSlice{key}, 1);
|
||||
}
|
||||
|
||||
return value;
|
||||
}
|
||||
|
||||
OpResult<string> OpMoveSingleShard(const OpArgs& op_args, string_view src, string_view dest,
|
||||
|
@ -951,9 +794,9 @@ BPopPusher::BPopPusher(string_view pop_key, string_view push_key, ListDir popdir
|
|||
: pop_key_(pop_key), push_key_(push_key), popdir_(popdir), pushdir_(pushdir) {
|
||||
}
|
||||
|
||||
OpResult<string> BPopPusher::Run(Transaction* t, unsigned msec) {
|
||||
OpResult<string> BPopPusher::Run(Transaction* t, unsigned limit_ms) {
|
||||
time_point tp =
|
||||
msec ? chrono::steady_clock::now() + chrono::milliseconds(msec) : time_point::max();
|
||||
limit_ms ? chrono::steady_clock::now() + chrono::milliseconds(limit_ms) : time_point::max();
|
||||
|
||||
t->Schedule();
|
||||
|
||||
|
@ -1308,16 +1151,17 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn
|
|||
VLOG(1) << "BPop timeout(" << timeout << ")";
|
||||
|
||||
Transaction* transaction = cntx->transaction;
|
||||
BPopper popper(dir);
|
||||
OpStatus result = popper.Run(transaction, unsigned(timeout * 1000));
|
||||
std::string popped_key;
|
||||
std::string popped_value;
|
||||
OpStatus result = container_utils::RunCbOnFirstNonEmptyBlocking(
|
||||
[dir, &popped_value](Transaction* t, EngineShard* shard, std::string_view key) {
|
||||
popped_value = OpBPop(t, shard, key, dir);
|
||||
},
|
||||
&popped_key, transaction, OBJ_LIST, unsigned(timeout * 1000));
|
||||
|
||||
if (result == OpStatus::OK) {
|
||||
auto res = popper.result();
|
||||
|
||||
DVLOG(1) << "BPop " << transaction->DebugId() << " popped from key " << res.first; // key.
|
||||
|
||||
std::string_view str_arr[2] = {res.first, res.second};
|
||||
|
||||
DVLOG(1) << "BPop " << transaction->DebugId() << " popped from key " << popped_key; // key.
|
||||
std::string_view str_arr[2] = {popped_key, popped_value};
|
||||
return (*cntx)->SendStringArr(str_arr);
|
||||
}
|
||||
|
||||
|
|
|
@ -14,6 +14,7 @@ extern "C" {
|
|||
#include "base/logging.h"
|
||||
#include "base/stl_util.h"
|
||||
#include "facade/error.h"
|
||||
#include "server/blocking_controller.h"
|
||||
#include "server/command_registry.h"
|
||||
#include "server/conn_context.h"
|
||||
#include "server/container_utils.h"
|
||||
|
@ -119,6 +120,12 @@ OpResult<PrimeIterator> FindZEntry(const ZParams& zparams, const OpArgs& op_args
|
|||
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
|
||||
}
|
||||
|
||||
if (add_res.second && op_args.shard->blocking_controller()) {
|
||||
string tmp;
|
||||
string_view key = it->first.GetSlice(&tmp);
|
||||
op_args.shard->blocking_controller()->AwakeWatched(op_args.db_cntx.db_index, key);
|
||||
}
|
||||
|
||||
return it;
|
||||
}
|
||||
|
||||
|
@ -1111,8 +1118,99 @@ bool ParseLimit(string_view offset_str, string_view limit_str, ZSetFamily::Range
|
|||
return true;
|
||||
}
|
||||
|
||||
ZSetFamily::ScoredArray OpBZPop(Transaction* t, EngineShard* shard, std::string_view key,
|
||||
bool is_max) {
|
||||
auto& db_slice = shard->db_slice();
|
||||
auto it_res = db_slice.Find(t->GetDbContext(), key, OBJ_ZSET);
|
||||
CHECK(it_res) << t->DebugId() << " " << key; // must exist and must be ok.
|
||||
PrimeIterator it = *it_res;
|
||||
|
||||
ZSetFamily::RangeParams range_params;
|
||||
range_params.reverse = is_max;
|
||||
range_params.with_scores = true;
|
||||
ZSetFamily::ZRangeSpec range_spec;
|
||||
range_spec.params = range_params;
|
||||
range_spec.interval = ZSetFamily::TopNScored(1);
|
||||
|
||||
DVLOG(2) << "popping from " << key << " " << t->DebugId();
|
||||
db_slice.PreUpdate(t->GetDbIndex(), it);
|
||||
robj* zobj = it_res.value()->second.AsRObj();
|
||||
|
||||
IntervalVisitor iv{Action::POP, range_spec.params, zobj};
|
||||
std::visit(iv, range_spec.interval);
|
||||
|
||||
it_res.value()->second.SyncRObj();
|
||||
db_slice.PostUpdate(t->GetDbIndex(), *it_res, key);
|
||||
|
||||
auto zlen = zsetLength(zobj);
|
||||
if (zlen == 0) {
|
||||
DVLOG(1) << "deleting key " << key << " " << t->DebugId();
|
||||
CHECK(db_slice.Del(t->GetDbIndex(), *it_res));
|
||||
}
|
||||
|
||||
OpArgs op_args = t->GetOpArgs(shard);
|
||||
if (op_args.shard->journal()) {
|
||||
string command = is_max ? "ZPOPMAX" : "ZPOPMIN";
|
||||
RecordJournal(op_args, command, ArgSlice{key}, 1);
|
||||
}
|
||||
|
||||
return iv.PopResult();
|
||||
}
|
||||
|
||||
void BZPopMinMax(CmdArgList args, ConnectionContext* cntx, bool is_max) {
|
||||
DCHECK_GE(args.size(), 2u);
|
||||
|
||||
float timeout;
|
||||
auto timeout_str = ArgS(args, args.size() - 1);
|
||||
if (!absl::SimpleAtof(timeout_str, &timeout)) {
|
||||
return (*cntx)->SendError("timeout is not a float or out of range");
|
||||
}
|
||||
if (timeout < 0) {
|
||||
return (*cntx)->SendError("timeout is negative");
|
||||
}
|
||||
VLOG(1) << "BZPop timeout(" << timeout << ")";
|
||||
|
||||
Transaction* transaction = cntx->transaction;
|
||||
std::string popped_key;
|
||||
OpResult<ZSetFamily::ScoredArray> popped_array;
|
||||
OpStatus result = container_utils::RunCbOnFirstNonEmptyBlocking(
|
||||
[is_max, &popped_array](Transaction* t, EngineShard* shard, std::string_view key) {
|
||||
popped_array = OpBZPop(t, shard, key, is_max);
|
||||
},
|
||||
&popped_key, transaction, OBJ_ZSET, unsigned(timeout * 1000));
|
||||
|
||||
if (result == OpStatus::OK) {
|
||||
DVLOG(1) << "BZPop " << transaction->DebugId() << " popped from key " << popped_key; // key.
|
||||
CHECK(popped_array->size() == 1);
|
||||
(*cntx)->StartArray(3);
|
||||
(*cntx)->SendBulkString(popped_key);
|
||||
(*cntx)->SendBulkString(popped_array->front().first);
|
||||
return (*cntx)->SendDouble(popped_array->front().second);
|
||||
}
|
||||
|
||||
DVLOG(1) << "result for " << transaction->DebugId() << " is " << result;
|
||||
|
||||
switch (result) {
|
||||
case OpStatus::WRONG_TYPE:
|
||||
return (*cntx)->SendError(kWrongTypeErr);
|
||||
case OpStatus::TIMED_OUT:
|
||||
return (*cntx)->SendNullArray();
|
||||
default:
|
||||
LOG(ERROR) << "Unexpected error " << result;
|
||||
}
|
||||
return (*cntx)->SendNullArray();
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
||||
void ZSetFamily::BZPopMin(CmdArgList args, ConnectionContext* cntx) {
|
||||
BZPopMinMax(args, cntx, false);
|
||||
}
|
||||
|
||||
void ZSetFamily::BZPopMax(CmdArgList args, ConnectionContext* cntx) {
|
||||
BZPopMinMax(args, cntx, true);
|
||||
}
|
||||
|
||||
void ZSetFamily::ZAdd(CmdArgList args, ConnectionContext* cntx) {
|
||||
string_view key = ArgS(args, 0);
|
||||
|
||||
|
@ -2195,34 +2293,39 @@ OpResult<unsigned> ZSetFamily::OpLexCount(const OpArgs& op_args, string_view key
|
|||
void ZSetFamily::Register(CommandRegistry* registry) {
|
||||
constexpr uint32_t kStoreMask = CO::WRITE | CO::VARIADIC_KEYS | CO::REVERSE_MAPPING;
|
||||
|
||||
*registry << CI{"ZADD", CO::FAST | CO::WRITE | CO::DENYOOM, -4, 1, 1, 1}.HFUNC(ZAdd)
|
||||
<< CI{"ZCARD", CO::FAST | CO::READONLY, 2, 1, 1, 1}.HFUNC(ZCard)
|
||||
<< CI{"ZCOUNT", CO::FAST | CO::READONLY, 4, 1, 1, 1}.HFUNC(ZCount)
|
||||
<< CI{"ZINCRBY", CO::FAST | CO::WRITE | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(ZIncrBy)
|
||||
<< CI{"ZINTERSTORE", kStoreMask, -4, 3, 3, 1}.HFUNC(ZInterStore)
|
||||
<< CI{"ZINTERCARD", CO::READONLY | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS, -3, 2, 2, 1}
|
||||
.HFUNC(ZInterCard)
|
||||
<< CI{"ZLEXCOUNT", CO::READONLY, 4, 1, 1, 1}.HFUNC(ZLexCount)
|
||||
<< CI{"ZPOPMAX", CO::FAST | CO::WRITE, -2, 1, 1, 1}.HFUNC(ZPopMax)
|
||||
<< CI{"ZPOPMIN", CO::FAST | CO::WRITE, -2, 1, 1, 1}.HFUNC(ZPopMin)
|
||||
<< CI{"ZREM", CO::FAST | CO::WRITE, -3, 1, 1, 1}.HFUNC(ZRem)
|
||||
<< CI{"ZRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRange)
|
||||
<< CI{"ZRANK", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(ZRank)
|
||||
<< CI{"ZRANGEBYLEX", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRangeByLex)
|
||||
<< CI{"ZRANGEBYSCORE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRangeByScore)
|
||||
<< CI{"ZSCORE", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(ZScore)
|
||||
<< CI{"ZMSCORE", CO::READONLY | CO::FAST, -3, 1, 1, 1}.HFUNC(ZMScore)
|
||||
<< CI{"ZREMRANGEBYRANK", CO::WRITE, 4, 1, 1, 1}.HFUNC(ZRemRangeByRank)
|
||||
<< CI{"ZREMRANGEBYSCORE", CO::WRITE, 4, 1, 1, 1}.HFUNC(ZRemRangeByScore)
|
||||
<< CI{"ZREMRANGEBYLEX", CO::WRITE, 4, 1, 1, 1}.HFUNC(ZRemRangeByLex)
|
||||
<< CI{"ZREVRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRevRange)
|
||||
<< CI{"ZREVRANGEBYLEX", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRevRangeByLex)
|
||||
<< CI{"ZREVRANGEBYSCORE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRevRangeByScore)
|
||||
<< CI{"ZREVRANK", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(ZRevRank)
|
||||
<< CI{"ZSCAN", CO::READONLY, -3, 1, 1, 1}.HFUNC(ZScan)
|
||||
<< CI{"ZUNION", CO::READONLY | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS, -3, 2, 2, 1}
|
||||
.HFUNC(ZUnion)
|
||||
<< CI{"ZUNIONSTORE", kStoreMask, -4, 3, 3, 1}.HFUNC(ZUnionStore);
|
||||
*registry
|
||||
<< CI{"ZADD", CO::FAST | CO::WRITE | CO::DENYOOM, -4, 1, 1, 1}.HFUNC(ZAdd)
|
||||
<< CI{"BZPOPMIN", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING | CO::NO_AUTOJOURNAL, -3, 1, -2, 1}
|
||||
.HFUNC(BZPopMin)
|
||||
<< CI{"BZPOPMAX", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING | CO::NO_AUTOJOURNAL, -3, 1, -2, 1}
|
||||
.HFUNC(BZPopMax)
|
||||
<< CI{"ZCARD", CO::FAST | CO::READONLY, 2, 1, 1, 1}.HFUNC(ZCard)
|
||||
<< CI{"ZCOUNT", CO::FAST | CO::READONLY, 4, 1, 1, 1}.HFUNC(ZCount)
|
||||
<< CI{"ZINCRBY", CO::FAST | CO::WRITE | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(ZIncrBy)
|
||||
<< CI{"ZINTERSTORE", kStoreMask, -4, 3, 3, 1}.HFUNC(ZInterStore)
|
||||
<< CI{"ZINTERCARD", CO::READONLY | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS, -3, 2, 2, 1}
|
||||
.HFUNC(ZInterCard)
|
||||
<< CI{"ZLEXCOUNT", CO::READONLY, 4, 1, 1, 1}.HFUNC(ZLexCount)
|
||||
<< CI{"ZPOPMAX", CO::FAST | CO::WRITE, -2, 1, 1, 1}.HFUNC(ZPopMax)
|
||||
<< CI{"ZPOPMIN", CO::FAST | CO::WRITE, -2, 1, 1, 1}.HFUNC(ZPopMin)
|
||||
<< CI{"ZREM", CO::FAST | CO::WRITE, -3, 1, 1, 1}.HFUNC(ZRem)
|
||||
<< CI{"ZRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRange)
|
||||
<< CI{"ZRANK", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(ZRank)
|
||||
<< CI{"ZRANGEBYLEX", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRangeByLex)
|
||||
<< CI{"ZRANGEBYSCORE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRangeByScore)
|
||||
<< CI{"ZSCORE", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(ZScore)
|
||||
<< CI{"ZMSCORE", CO::READONLY | CO::FAST, -3, 1, 1, 1}.HFUNC(ZMScore)
|
||||
<< CI{"ZREMRANGEBYRANK", CO::WRITE, 4, 1, 1, 1}.HFUNC(ZRemRangeByRank)
|
||||
<< CI{"ZREMRANGEBYSCORE", CO::WRITE, 4, 1, 1, 1}.HFUNC(ZRemRangeByScore)
|
||||
<< CI{"ZREMRANGEBYLEX", CO::WRITE, 4, 1, 1, 1}.HFUNC(ZRemRangeByLex)
|
||||
<< CI{"ZREVRANGE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRevRange)
|
||||
<< CI{"ZREVRANGEBYLEX", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRevRangeByLex)
|
||||
<< CI{"ZREVRANGEBYSCORE", CO::READONLY, -4, 1, 1, 1}.HFUNC(ZRevRangeByScore)
|
||||
<< CI{"ZREVRANK", CO::READONLY | CO::FAST, 3, 1, 1, 1}.HFUNC(ZRevRank)
|
||||
<< CI{"ZSCAN", CO::READONLY, -3, 1, 1, 1}.HFUNC(ZScan)
|
||||
<< CI{"ZUNION", CO::READONLY | CO::REVERSE_MAPPING | CO::VARIADIC_KEYS, -3, 2, 2, 1}.HFUNC(
|
||||
ZUnion)
|
||||
<< CI{"ZUNIONSTORE", kStoreMask, -4, 3, 3, 1}.HFUNC(ZUnionStore);
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
|
@ -55,6 +55,9 @@ class ZSetFamily {
|
|||
private:
|
||||
template <typename T> using OpResult = facade::OpResult<T>;
|
||||
|
||||
static void BZPopMin(CmdArgList args, ConnectionContext* cntx);
|
||||
static void BZPopMax(CmdArgList args, ConnectionContext* cntx);
|
||||
|
||||
static void ZAdd(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZCard(CmdArgList args, ConnectionContext* cntx);
|
||||
static void ZCount(CmdArgList args, ConnectionContext* cntx);
|
||||
|
|
|
@ -527,4 +527,55 @@ TEST_F(ZSetFamilyTest, Resp3) {
|
|||
ASSERT_THAT(resp.GetVec()[1].GetVec(), ElementsAre("b", DoubleArg(2)));
|
||||
}
|
||||
|
||||
TEST_F(ZSetFamilyTest, BlockingIsReleased) {
|
||||
// Inputs for ZSET store commands.
|
||||
Run({"ZADD", "A", "1", "x", "2", "b"});
|
||||
Run({"ZADD", "B", "1", "x", "3", "b"});
|
||||
Run({"ZADD", "C", "1", "x", "10", "a"});
|
||||
Run({"ZADD", "D", "1", "x", "5", "c"});
|
||||
|
||||
vector<string> blocking_keys{"zset1", "zset2", "zset3"};
|
||||
for (const auto& key : blocking_keys) {
|
||||
vector<vector<string>> unblocking_commands;
|
||||
// All commands output the same set {2,x}.
|
||||
unblocking_commands.push_back({"ZADD", key, "2", "x", "10", "y"});
|
||||
unblocking_commands.push_back({"ZINCRBY", key, "2", "x"});
|
||||
unblocking_commands.push_back({"ZINTERSTORE", key, "2", "A", "B"});
|
||||
unblocking_commands.push_back({"ZUNIONSTORE", key, "2", "C", "D"});
|
||||
// unblocking_commands.push_back({"ZDIFFSTORE", key, "2", "A", "B"}); // unimplemented
|
||||
|
||||
for (auto& cmd : unblocking_commands) {
|
||||
RespExpr resp0;
|
||||
auto fb0 = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
|
||||
resp0 = Run({"BZPOPMIN", "zset1", "zset2", "zset3", "0"});
|
||||
LOG(INFO) << "BZPOPMIN";
|
||||
});
|
||||
|
||||
pp_->at(1)->Await([&] { return Run({cmd.data(), cmd.size()}); });
|
||||
fb0.Join();
|
||||
|
||||
ASSERT_THAT(resp0, ArrLen(3)) << cmd[0];
|
||||
EXPECT_THAT(resp0.GetVec(), ElementsAre(key, "x", "2")) << cmd[0];
|
||||
|
||||
Run({"DEL", key});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
TEST_F(ZSetFamilyTest, BlockingTimeout) {
|
||||
RespExpr resp0;
|
||||
|
||||
auto start = absl::Now();
|
||||
auto fb0 = pp_->at(0)->LaunchFiber(Launch::dispatch, [&] {
|
||||
resp0 = Run({"BZPOPMIN", "zset1", "1"});
|
||||
LOG(INFO) << "BZPOPMIN";
|
||||
});
|
||||
fb0.Join();
|
||||
auto dur = absl::Now() - start;
|
||||
|
||||
// Check that the timeout duration is not too crazy.
|
||||
EXPECT_LT(AbsDuration(dur - absl::Milliseconds(1000)), absl::Milliseconds(300));
|
||||
EXPECT_THAT(resp0, ArgType(RespExpr::NIL_ARRAY));
|
||||
}
|
||||
|
||||
} // namespace dfly
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue