Add HSTRLEN. Bug fixes.

This commit is contained in:
Roman Gershman 2022-04-05 08:36:00 +03:00
parent cae1403191
commit 19583ca7f2
4 changed files with 71 additions and 12 deletions

View file

@ -267,6 +267,8 @@ Design config support. ~10-20 commands overall...
Probably implement cluster-API decorators to allow cluster-configured clients to connect to a single
instance.
- [X] HSTRLEN
## Design decisions along the way
### Expiration deadlines with relative accuracy
I decided to limit the expiration range to 365 days. Moreover, expiration deadlines

View file

@ -8,6 +8,7 @@ extern "C" {
#include "redis/listpack.h"
#include "redis/object.h"
#include "redis/redis_aux.h"
#include "redis/util.h"
}
#include "base/logging.h"
@ -55,7 +56,7 @@ pair<uint8_t*, bool> LpInsert(uint8_t* lp, string_view field, string_view val, b
uint8_t* vptr;
uint8_t* fptr = lpFirst(lp);
uint8_t* fsrc = (uint8_t*)field.data();
uint8_t* fsrc = field.empty() ? lp : (uint8_t*)field.data();
// if we vsrc is NULL then lpReplace will delete the element, which is not what we want.
// therefore, for an empty val we set it to some other valid address so that lpReplace
@ -101,7 +102,7 @@ void HSetFamily::HDel(CmdArgList args, ConnectionContext* cntx) {
};
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
if (result || result.status() == OpStatus::KEY_NOTFOUND) {
(*cntx)->SendLong(*result);
} else {
(*cntx)->SendError(result.status());
@ -305,7 +306,19 @@ void HSetFamily::HSetNx(CmdArgList args, ConnectionContext* cntx) {
}
void HSetFamily::HStrLen(CmdArgList args, ConnectionContext* cntx) {
LOG(DFATAL) << "TBD";
string_view key = ArgS(args, 1);
string_view field = ArgS(args, 2);
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpStrLen(OpArgs{shard, t->db_index()}, key, field);
};
OpResult<size_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) {
(*cntx)->SendLong(*result);
} else {
(*cntx)->SendError(result.status());
}
}
void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) {
@ -637,6 +650,37 @@ OpResult<vector<string>> HSetFamily::OpGetAll(const OpArgs& op_args, string_view
return res;
}
OpResult<size_t> HSetFamily::OpStrLen(const OpArgs& op_args, string_view key, string_view field) {
auto& db_slice = op_args.shard->db_slice();
auto it_res = db_slice.Find(op_args.db_ind, key, OBJ_HASH);
if (!it_res) {
if (it_res.status() == OpStatus::KEY_NOTFOUND)
return 0;
return it_res.status();
}
robj* hset = (*it_res)->second.AsRObj();
size_t field_len = 0;
op_args.shard->tmp_str1 = sdscpylen(op_args.shard->tmp_str1, field.data(), field.size());
if (hset->encoding == OBJ_ENCODING_LISTPACK) {
unsigned char* vstr = NULL;
unsigned int vlen = UINT_MAX;
long long vll = LLONG_MAX;
if (hashTypeGetFromListpack(hset, op_args.shard->tmp_str1, &vstr, &vlen, &vll) == 0)
field_len = vstr ? vlen : sdigits10(vll);
return field_len;
}
DCHECK_EQ(hset->encoding, OBJ_ENCODING_HT);
dictEntry* de = dictFind((dict*)hset->ptr, op_args.shard->tmp_str1);
return de ? sdslen((sds)de->v.val) : 0;
}
OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_view field,
IncrByParam* param) {
auto& db_slice = op_args.shard->db_slice();
@ -681,7 +725,8 @@ OpStatus HSetFamily::OpIncrBy(const OpArgs& op_args, string_view key, string_vie
LOG(FATAL) << "TBD";
} else {
if (exist_res == C_OK && vstr) {
if (!absl::SimpleAtoi(string_view{reinterpret_cast<char*>(vstr), vlen}, &old_val)) {
const char* exist_val = reinterpret_cast<char*>(vstr);
if (!string2ll(exist_val, vlen, &old_val)) {
stats->listpack_bytes += lpb;
return OpStatus::INVALID_VALUE;

View file

@ -55,11 +55,12 @@ class HSetFamily {
static OpResult<std::vector<std::string>> OpGetAll(const OpArgs& op_args, std::string_view key,
uint8_t getall_mask);
static OpResult<size_t> OpStrLen(const OpArgs& op_args, std::string_view key,
std::string_view field);
using IncrByParam = std::variant<double, int64_t>;
static OpStatus OpIncrBy(const OpArgs& op_args, std::string_view key,
std::string_view field, IncrByParam* param);
static OpStatus OpIncrBy(const OpArgs& op_args, std::string_view key, std::string_view field,
IncrByParam* param);
};
} // namespace dfly

View file

@ -56,15 +56,18 @@ TEST_F(HSetFamilyTest, Basic) {
EXPECT_EQ(2, CheckedInt({"hset", "y", "a", "c", "d", "e"}));
EXPECT_EQ(2, CheckedInt({"hdel", "y", "a", "d"}));
EXPECT_THAT(Run({"hdel", "nokey", "a"}), ElementsAre(IntArg(0)));
}
TEST_F(HSetFamilyTest, HSetLarge) {
TEST_F(HSetFamilyTest, HSet) {
string val(1024, 'b');
auto resp = Run({"hset", "x", "a", val});
EXPECT_THAT(resp[0], IntArg(1));
resp = Run({"hlen", "x"});
EXPECT_THAT(resp[0], IntArg(1));
EXPECT_EQ(1, CheckedInt({"hset", "large", "a", val}));
EXPECT_EQ(1, CheckedInt({"hlen", "large"}));
EXPECT_EQ(1024, CheckedInt({"hstrlen", "large", "a"}));
EXPECT_EQ(1, CheckedInt({"hset", "small", "", "565323349817"}));
}
TEST_F(HSetFamilyTest, Get) {
@ -102,4 +105,12 @@ TEST_F(HSetFamilyTest, HSetNx) {
EXPECT_THAT(Run({"hget", "key", "field2"}), RespEq("val2"));
}
TEST_F(HSetFamilyTest, HIncr) {
EXPECT_EQ(10, CheckedInt({"hincrby", "key", "field", "10"}));
Run({"hset", "key", "a", " 1"});
auto resp = Run({"hincrby", "key", "a", "10"});
EXPECT_THAT(resp[0], ErrArg("hash value is not an integer"));
}
} // namespace dfly