Fix bugs related to error reporting.

1. Fix #9
2. SINTER now reports error if some of its keys are of wrong type.
This commit is contained in:
Roman Gershman 2022-04-07 12:01:18 +03:00
parent d03cea5e36
commit 5ef63f41d2
5 changed files with 38 additions and 11 deletions

View file

@ -676,8 +676,8 @@ void CompactObj::SetString(std::string_view str) {
if (str.size() <= kInlineLen) { if (str.size() <= kInlineLen) {
SetMeta(str.size(), mask); SetMeta(str.size(), mask);
if (!str.empty())
memcpy(u_.inline_str, str.data(), str.size()); memcpy(u_.inline_str, str.data(), str.size());
return; return;
} }
} }

View file

@ -65,7 +65,6 @@ class CompactObjectTest : public ::testing::Test {
TEST_F(CompactObjectTest, Basic) { TEST_F(CompactObjectTest, Basic) {
robj* rv = createRawStringObject("foo", 3); robj* rv = createRawStringObject("foo", 3);
cobj_.ImportRObj(rv); cobj_.ImportRObj(rv);
return;
CompactObj a; CompactObj a;
a.SetExpire(true); a.SetExpire(true);
@ -83,6 +82,8 @@ TEST_F(CompactObjectTest, Basic) {
CompactObj c = a.AsRef(); CompactObj c = a.AsRef();
EXPECT_EQ(a, c); EXPECT_EQ(a, c);
EXPECT_TRUE(c.HasExpire()); EXPECT_TRUE(c.HasExpire());
cobj_.SetString(string_view{});
} }
TEST_F(CompactObjectTest, NonInline) { TEST_F(CompactObjectTest, NonInline) {

View file

@ -258,9 +258,12 @@ void ListFamily::LIndex(CmdArgList args, ConnectionContext* cntx) {
auto cb = [&](Transaction* t, EngineShard* shard) { auto cb = [&](Transaction* t, EngineShard* shard) {
return OpIndex(OpArgs{shard, t->db_index()}, key, index); return OpIndex(OpArgs{shard, t->db_index()}, key, index);
}; };
OpResult<string> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); OpResult<string> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
if (result) { if (result) {
(*cntx)->SendBulkString(result.value()); (*cntx)->SendBulkString(result.value());
} else if (result.status() == OpStatus::WRONG_TYPE) {
(*cntx)->SendError(result.status());
} else { } else {
(*cntx)->SendNull(); (*cntx)->SendNull();
} }

View file

@ -12,6 +12,7 @@ extern "C" {
} }
#include "base/logging.h" #include "base/logging.h"
#include "base/stl_util.h"
#include "server/command_registry.h" #include "server/command_registry.h"
#include "server/conn_context.h" #include "server/conn_context.h"
#include "server/engine_shard_set.h" #include "server/engine_shard_set.h"
@ -158,15 +159,22 @@ ResultSetView DiffResultVec(const ResultStringVec& result_vec, ShardId src_shard
OpResult<SvArray> InterResultVec(const ResultStringVec& result_vec, unsigned required_shard_cnt) { OpResult<SvArray> InterResultVec(const ResultStringVec& result_vec, unsigned required_shard_cnt) {
absl::flat_hash_map<std::string_view, unsigned> uniques; absl::flat_hash_map<std::string_view, unsigned> uniques;
for (const auto& res : result_vec) {
if (!res && !base::_in(res.status(), {OpStatus::SKIPPED, OpStatus::KEY_NOTFOUND}))
return res.status();
}
for (const auto& res : result_vec) {
if (res.status() == OpStatus::KEY_NOTFOUND)
return OpStatus::OK; // empty set.
}
bool first = true; bool first = true;
for (const auto& res : result_vec) { for (const auto& res : result_vec) {
if (res.status() == OpStatus::SKIPPED) if (res.status() == OpStatus::SKIPPED)
continue; continue;
if (res.status() == OpStatus::KEY_NOTFOUND)
return SvArray{}; DCHECK(res); // we handled it above.
if (!res) {
return res.status();
}
// I use this awkward 'first' condition instead of table[s]++ deliberately. // I use this awkward 'first' condition instead of table[s]++ deliberately.
// I do not want to add keys that I know will not stay in the set. // I do not want to add keys that I know will not stay in the set.
@ -1042,15 +1050,25 @@ OpResult<StringVec> SetFamily::OpInter(const Transaction* t, EngineShard* es, bo
// we must copy by value because AsRObj is temporary. // we must copy by value because AsRObj is temporary.
vector<SetType> sets(keys.size()); vector<SetType> sets(keys.size());
OpStatus status = OpStatus::OK;
for (size_t i = 0; i < keys.size(); ++i) { for (size_t i = 0; i < keys.size(); ++i) {
OpResult<PrimeIterator> find_res = es->db_slice().Find(t->db_index(), keys[i], OBJ_SET); OpResult<PrimeIterator> find_res = es->db_slice().Find(t->db_index(), keys[i], OBJ_SET);
if (!find_res) if (!find_res) {
return find_res.status(); if (status == OpStatus::OK || status == OpStatus::KEY_NOTFOUND ||
find_res.status() != OpStatus::KEY_NOTFOUND) {
status = find_res.status();
}
continue;
}
const PrimeValue& pv = find_res.value()->second; const PrimeValue& pv = find_res.value()->second;
void* ptr = pv.RObjPtr(); void* ptr = pv.RObjPtr();
sets[i] = make_pair(ptr, pv.Encoding()); sets[i] = make_pair(ptr, pv.Encoding());
} }
if (status != OpStatus::OK)
return status;
auto comp = [](const SetType& left, const SetType& right) { auto comp = [](const SetType& left, const SetType& right) {
return SetTypeLen(left) < SetTypeLen(right); return SetTypeLen(left) < SetTypeLen(right);
}; };

View file

@ -80,6 +80,11 @@ TEST_F(SetFamilyTest, SInter) {
EXPECT_THAT(resp[0], IntArg(2)); EXPECT_THAT(resp[0], IntArg(2));
resp = Run({"smembers", "d"}); resp = Run({"smembers", "d"});
EXPECT_THAT(resp, UnorderedElementsAre("3", "2")); EXPECT_THAT(resp, UnorderedElementsAre("3", "2"));
Run({"set", "y", ""});
resp = Run({"sinter", "x", "y"});
ASSERT_EQ(1, GetDebugInfo("IO0").shards_count);
EXPECT_THAT(resp, ElementsAre(ErrArg("WRONGTYPE Operation against a key")));
} }
TEST_F(SetFamilyTest, SMove) { TEST_F(SetFamilyTest, SMove) {