mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
chore: provide basic logging to catch possible command errors (#3213)
* chore: provide basic logging to catch possible command errors --------- Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
1dfb604d97
commit
55e445b511
18 changed files with 115 additions and 82 deletions
|
@ -41,4 +41,16 @@ size_t ConnectionContext::UsedMemory() const {
|
|||
return dfly::HeapSize(rbuilder_) + dfly::HeapSize(authed_username) + dfly::HeapSize(acl_commands);
|
||||
}
|
||||
|
||||
void ConnectionContext::SendError(std::string_view str, std::string_view type) {
|
||||
rbuilder_->SendError(str, type);
|
||||
}
|
||||
|
||||
void ConnectionContext::SendError(ErrorReply error) {
|
||||
rbuilder_->SendError(error);
|
||||
}
|
||||
|
||||
void ConnectionContext::SendError(OpStatus status) {
|
||||
rbuilder_->SendError(status);
|
||||
}
|
||||
|
||||
} // namespace facade
|
||||
|
|
|
@ -47,17 +47,11 @@ class ConnectionContext {
|
|||
return res;
|
||||
}
|
||||
|
||||
void SendError(std::string_view str, std::string_view type = std::string_view{}) {
|
||||
rbuilder_->SendError(str, type);
|
||||
}
|
||||
virtual void SendError(std::string_view str, std::string_view type = std::string_view{});
|
||||
|
||||
void SendError(ErrorReply error) {
|
||||
rbuilder_->SendError(error);
|
||||
}
|
||||
virtual void SendError(ErrorReply error);
|
||||
|
||||
void SendError(OpStatus status) {
|
||||
rbuilder_->SendError(status);
|
||||
}
|
||||
virtual void SendError(OpStatus status);
|
||||
|
||||
void SendStored() {
|
||||
rbuilder_->SendStored();
|
||||
|
|
|
@ -33,7 +33,7 @@ class OkService : public ServiceInterface {
|
|||
|
||||
void DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,
|
||||
ConnectionContext* cntx) final {
|
||||
cntx->reply_builder()->SendError("");
|
||||
cntx->SendError("");
|
||||
}
|
||||
|
||||
ConnectionContext* CreateContext(util::FiberSocketBase* peer, Connection* owner) final {
|
||||
|
|
|
@ -585,12 +585,12 @@ void ClusterFamily::DflyClusterGetSlotInfo(CmdArgList args, ConnectionContext* c
|
|||
do {
|
||||
auto sid = parser.Next<uint32_t>();
|
||||
if (sid > kMaxSlotNum)
|
||||
return rb->SendError("Invalid slot id");
|
||||
return cntx->SendError("Invalid slot id");
|
||||
slots_stats.emplace_back(sid, SlotStats{});
|
||||
} while (parser.HasNext());
|
||||
|
||||
if (auto err = parser.Error(); err)
|
||||
return rb->SendError(err->MakeReply());
|
||||
return cntx->SendError(err->MakeReply());
|
||||
|
||||
fb2::Mutex mu;
|
||||
|
||||
|
@ -675,7 +675,7 @@ void ClusterFamily::DflySlotMigrationStatus(CmdArgList args, ConnectionContext*
|
|||
if (parser.HasNext()) {
|
||||
node_id = parser.Next<std::string_view>();
|
||||
if (auto err = parser.Error(); err) {
|
||||
return rb->SendError(err->MakeReply());
|
||||
return cntx->SendError(err->MakeReply());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -265,6 +265,29 @@ size_t ConnectionContext::UsedMemory() const {
|
|||
return facade::ConnectionContext::UsedMemory() + dfly::HeapSize(conn_state);
|
||||
}
|
||||
|
||||
void ConnectionContext::SendError(std::string_view str, std::string_view type) {
|
||||
string_view name = cid ? cid->name() : string_view{};
|
||||
|
||||
VLOG(1) << "Sending error " << str << " " << type << " during " << name;
|
||||
facade::ConnectionContext::SendError(str, type);
|
||||
}
|
||||
|
||||
void ConnectionContext::SendError(facade::ErrorReply error) {
|
||||
string_view name = cid ? cid->name() : string_view{};
|
||||
|
||||
VLOG(1) << "Sending error " << error.ToSv() << " during " << name;
|
||||
facade::ConnectionContext::SendError(std::move(error));
|
||||
}
|
||||
|
||||
void ConnectionContext::SendError(facade::OpStatus status) {
|
||||
if (status != facade::OpStatus::OK) {
|
||||
string_view name = cid ? cid->name() : string_view{};
|
||||
VLOG(1) << "Sending error " << status << " during " << name;
|
||||
}
|
||||
|
||||
facade::ConnectionContext::SendError(status);
|
||||
}
|
||||
|
||||
void ConnectionState::ExecInfo::Clear() {
|
||||
DCHECK(!preborrowed_interpreter); // Must have been released properly
|
||||
state = EXEC_INACTIVE;
|
||||
|
|
|
@ -299,6 +299,10 @@ class ConnectionContext : public facade::ConnectionContext {
|
|||
|
||||
size_t UsedMemory() const override;
|
||||
|
||||
void SendError(std::string_view str, std::string_view type = std::string_view{}) override;
|
||||
void SendError(facade::ErrorReply error) override;
|
||||
void SendError(facade::OpStatus status) override;
|
||||
|
||||
// Whether this connection is a connection from a replica to its master.
|
||||
// This flag is true only on replica side, where we need to setup a special ConnectionContext
|
||||
// instance that helps applying commands coming from master.
|
||||
|
|
|
@ -520,10 +520,10 @@ void DebugCmd::Replica(CmdArgList args) {
|
|||
}
|
||||
return;
|
||||
} else {
|
||||
return rb->SendError("I am master");
|
||||
return cntx_->SendError("I am master");
|
||||
}
|
||||
}
|
||||
return rb->SendError(UnknownSubCmd("replica", "DEBUG"));
|
||||
return cntx_->SendError(UnknownSubCmd("replica", "DEBUG"));
|
||||
}
|
||||
|
||||
void DebugCmd::Load(string_view filename) {
|
||||
|
|
|
@ -184,7 +184,7 @@ void DflyCmd::Thread(CmdArgList args, ConnectionContext* cntx) {
|
|||
string_view arg = ArgS(args, 1);
|
||||
unsigned num_thread;
|
||||
if (!absl::SimpleAtoi(arg, &num_thread)) {
|
||||
return rb->SendError(kSyntaxErr);
|
||||
return cntx->SendError(kSyntaxErr);
|
||||
}
|
||||
|
||||
if (num_thread < pool->size()) {
|
||||
|
@ -192,7 +192,7 @@ void DflyCmd::Thread(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (!cntx->conn()->Migrate(pool->at(num_thread))) {
|
||||
// Listener::PreShutdown() triggered
|
||||
if (cntx->conn()->socket()->IsOpen()) {
|
||||
return rb->SendError(kInvalidState);
|
||||
return cntx->SendError(kInvalidState);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -201,7 +201,7 @@ void DflyCmd::Thread(CmdArgList args, ConnectionContext* cntx) {
|
|||
return rb->SendOk();
|
||||
}
|
||||
|
||||
return rb->SendError(kInvalidIntErr);
|
||||
return cntx->SendError(kInvalidIntErr);
|
||||
}
|
||||
|
||||
void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) {
|
||||
|
@ -214,7 +214,7 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (args.size() == 5) {
|
||||
seqid.emplace();
|
||||
if (!absl::SimpleAtoi(ArgS(args, 4), &seqid.value())) {
|
||||
return rb->SendError(facade::kInvalidIntErr);
|
||||
return cntx->SendError(facade::kInvalidIntErr);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -222,12 +222,12 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) {
|
|||
<< " flow: " << flow_id_str << " seq: " << seqid.value_or(-1);
|
||||
|
||||
if (master_id != sf_->master_replid()) {
|
||||
return rb->SendError(kBadMasterId);
|
||||
return cntx->SendError(kBadMasterId);
|
||||
}
|
||||
|
||||
unsigned flow_id;
|
||||
if (!absl::SimpleAtoi(flow_id_str, &flow_id) || flow_id >= shard_set->size()) {
|
||||
return rb->SendError(facade::kInvalidIntErr);
|
||||
return cntx->SendError(facade::kInvalidIntErr);
|
||||
}
|
||||
|
||||
auto [sync_id, replica_ptr] = GetReplicaInfoOrReply(sync_id_str, rb);
|
||||
|
@ -236,7 +236,7 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
unique_lock lk(replica_ptr->mu);
|
||||
if (replica_ptr->replica_state != SyncState::PREPARATION)
|
||||
return rb->SendError(kInvalidState);
|
||||
return cntx->SendError(kInvalidState);
|
||||
|
||||
// Set meta info on connection.
|
||||
cntx->conn()->SetName(absl::StrCat("repl_flow_", sync_id));
|
||||
|
@ -254,7 +254,7 @@ void DflyCmd::Flow(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (!cntx->conn()->Migrate(shard_set->pool()->at(flow_id))) {
|
||||
// Listener::PreShutdown() triggered
|
||||
if (cntx->conn()->socket()->IsOpen()) {
|
||||
return rb->SendError(kInvalidState);
|
||||
return cntx->SendError(kInvalidState);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
@ -311,7 +311,7 @@ void DflyCmd::Sync(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
// TODO: Send better error
|
||||
if (*status != OpStatus::OK)
|
||||
return rb->SendError(kInvalidState);
|
||||
return cntx->SendError(kInvalidState);
|
||||
}
|
||||
|
||||
LOG(INFO) << "Started sync with replica " << replica_ptr->address << ":"
|
||||
|
@ -349,7 +349,7 @@ void DflyCmd::StartStable(CmdArgList args, ConnectionContext* cntx) {
|
|||
shard_set->RunBlockingInParallel(std::move(cb));
|
||||
|
||||
if (*status != OpStatus::OK)
|
||||
return rb->SendError(kInvalidState);
|
||||
return cntx->SendError(kInvalidState);
|
||||
}
|
||||
|
||||
LOG(INFO) << "Transitioned into stable sync with replica " << replica_ptr->address << ":"
|
||||
|
@ -436,7 +436,7 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
if (*status != OpStatus::OK) {
|
||||
sf_->service().SwitchState(GlobalState::TAKEN_OVER, GlobalState::ACTIVE);
|
||||
return rb->SendError("Takeover failed!");
|
||||
return cntx->SendError("Takeover failed!");
|
||||
}
|
||||
cntx->SendOk();
|
||||
|
||||
|
|
|
@ -710,7 +710,7 @@ void HGetGeneric(CmdArgList args, ConnectionContext* cntx, uint8_t getall_mask)
|
|||
rb->SendStringArr(absl::Span<const string>{*result},
|
||||
is_map ? RedisReplyBuilder::MAP : RedisReplyBuilder::ARRAY);
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -822,7 +822,7 @@ void HSetFamily::HMGet(CmdArgList args, ConnectionContext* cntx) {
|
|||
rb->SendNull();
|
||||
}
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -842,7 +842,7 @@ void HSetFamily::HGet(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (result.status() == OpStatus::KEY_NOTFOUND) {
|
||||
rb->SendNull();
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -965,7 +965,7 @@ void HSetFamily::HScan(CmdArgList args, ConnectionContext* cntx) {
|
|||
rb->SendBulkString(k);
|
||||
}
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1149,7 +1149,7 @@ void HSetFamily::HRandField(CmdArgList args, ConnectionContext* cntx) {
|
|||
else
|
||||
rb->SendEmptyArray();
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -1439,7 +1439,7 @@ void JsonFamily::Set(CmdArgList args, ConnectionContext* cntx) {
|
|||
rb->SendNull();
|
||||
}
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1510,7 +1510,7 @@ void JsonFamily::Resp(CmdArgList args, ConnectionContext* cntx) {
|
|||
SendJsonValue(rb, it);
|
||||
}
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1793,7 +1793,7 @@ void JsonFamily::ArrPop(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1865,7 +1865,7 @@ void JsonFamily::ObjKeys(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1916,7 +1916,7 @@ void JsonFamily::NumIncrBy(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (result) {
|
||||
rb->SendBulkString(*result);
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1944,7 +1944,7 @@ void JsonFamily::NumMultBy(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (result) {
|
||||
rb->SendBulkString(*result);
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1992,7 +1992,7 @@ void JsonFamily::Type(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (result.status() == OpStatus::KEY_NOTFOUND) {
|
||||
rb->SendNullArray();
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -2112,7 +2112,7 @@ void JsonFamily::Get(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (result == facade::OpStatus::KEY_NOTFOUND) {
|
||||
rb->SendNull(); // Match Redis
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -735,7 +735,7 @@ void MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest, Lis
|
|||
break;
|
||||
|
||||
default:
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -776,7 +776,7 @@ void BRPopLPush(CmdArgList args, ConnectionContext* cntx) {
|
|||
break;
|
||||
|
||||
default:
|
||||
return rb->SendError(op_res.status());
|
||||
return cntx->SendError(op_res.status());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -819,7 +819,7 @@ void BLMove(CmdArgList args, ConnectionContext* cntx) {
|
|||
break;
|
||||
|
||||
default:
|
||||
return rb->SendError(op_res.status());
|
||||
return cntx->SendError(op_res.status());
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -1034,7 +1034,7 @@ void ListFamily::LIndex(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (result) {
|
||||
rb->SendBulkString(result.value());
|
||||
} else if (result.status() == OpStatus::WRONG_TYPE) {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
} else {
|
||||
rb->SendNull();
|
||||
}
|
||||
|
@ -1217,13 +1217,13 @@ void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cn
|
|||
|
||||
switch (popped_key.status()) {
|
||||
case OpStatus::WRONG_TYPE:
|
||||
return rb->SendError(kWrongTypeErr);
|
||||
return cntx->SendError(kWrongTypeErr);
|
||||
case OpStatus::CANCELLED:
|
||||
case OpStatus::TIMED_OUT:
|
||||
return rb->SendNullArray();
|
||||
case OpStatus::KEY_MOVED:
|
||||
// TODO: proper error for moved
|
||||
return rb->SendError("-MOVED");
|
||||
return cntx->SendError("-MOVED");
|
||||
default:
|
||||
LOG(ERROR) << "Unexpected error " << popped_key.status();
|
||||
}
|
||||
|
@ -1277,7 +1277,7 @@ void ListFamily::PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cnt
|
|||
case OpStatus::KEY_NOTFOUND:
|
||||
return rb->SendNull();
|
||||
case OpStatus::WRONG_TYPE:
|
||||
return rb->SendError(kWrongTypeErr);
|
||||
return cntx->SendError(kWrongTypeErr);
|
||||
default:;
|
||||
}
|
||||
|
||||
|
|
|
@ -1226,7 +1226,7 @@ void Service::DispatchCommand(CmdArgList args, facade::ConnectionContext* cntx)
|
|||
dfly_cntx->cid = cid;
|
||||
|
||||
if (!InvokeCmd(cid, args_no_cmd, dfly_cntx)) {
|
||||
dfly_cntx->reply_builder()->SendError("Internal Error");
|
||||
dfly_cntx->SendError("Internal Error");
|
||||
dfly_cntx->reply_builder()->CloseConnection();
|
||||
}
|
||||
|
||||
|
@ -1756,7 +1756,7 @@ void Service::Eval(CmdArgList args, ConnectionContext* cntx) {
|
|||
BorrowedInterpreter interpreter{cntx};
|
||||
auto res = server_family_.script_mgr()->Insert(body, interpreter);
|
||||
if (!res)
|
||||
return rb->SendError(res.error().Format(), facade::kScriptErrType);
|
||||
return cntx->SendError(res.error().Format(), facade::kScriptErrType);
|
||||
|
||||
string sha{std::move(res.value())};
|
||||
|
||||
|
@ -2035,7 +2035,7 @@ void Service::Discard(CmdArgList args, ConnectionContext* cntx) {
|
|||
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
|
||||
if (!cntx->conn_state.exec_info.IsCollecting()) {
|
||||
return rb->SendError("DISCARD without MULTI");
|
||||
return cntx->SendError("DISCARD without MULTI");
|
||||
}
|
||||
|
||||
MultiCleanup(cntx);
|
||||
|
@ -2146,15 +2146,15 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
// Check basic invariants
|
||||
if (!exec_info.IsCollecting()) {
|
||||
return rb->SendError("EXEC without MULTI");
|
||||
return cntx->SendError("EXEC without MULTI");
|
||||
}
|
||||
|
||||
if (IsWatchingOtherDbs(cntx->db_index(), exec_info)) {
|
||||
return rb->SendError("Dragonfly does not allow WATCH and EXEC on different databases");
|
||||
return cntx->SendError("Dragonfly does not allow WATCH and EXEC on different databases");
|
||||
}
|
||||
|
||||
if (exec_info.state == ConnectionState::ExecInfo::EXEC_ERROR) {
|
||||
return rb->SendError("-EXECABORT Transaction discarded because of previous errors");
|
||||
return cntx->SendError("-EXECABORT Transaction discarded because of previous errors");
|
||||
}
|
||||
|
||||
if (exec_info.watched_dirty.load(memory_order_relaxed)) {
|
||||
|
|
|
@ -146,7 +146,7 @@ void ScriptMgr::LoadCmd(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
auto res = Insert(body, interpreter);
|
||||
if (!res)
|
||||
return rb->SendError(res.error().Format());
|
||||
return cntx->SendError(res.error().Format());
|
||||
|
||||
// Schedule empty callback inorder to journal command via transaction framework.
|
||||
cntx->transaction->ScheduleSingleHop([](auto* t, auto* shard) { return OpStatus::OK; });
|
||||
|
|
|
@ -1407,7 +1407,7 @@ void ServerFamily::OnClose(ConnectionContext* cntx) {
|
|||
|
||||
void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext* cntx) {
|
||||
if (!section.empty()) {
|
||||
return cntx->reply_builder()->SendError("");
|
||||
return cntx->SendError("");
|
||||
}
|
||||
string info;
|
||||
|
||||
|
|
|
@ -1283,7 +1283,7 @@ void SRandMember(CmdArgList args, ConnectionContext* cntx) {
|
|||
rb->SendNull();
|
||||
}
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1476,7 +1476,7 @@ void SScan(CmdArgList args, ConnectionContext* cntx) {
|
|||
rb->SendBulkString(k);
|
||||
}
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -2402,7 +2402,7 @@ void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
return;
|
||||
}
|
||||
return rb->SendError(result.status());
|
||||
return cntx->SendError(result.status());
|
||||
} else if (sub_cmd == "STREAM") {
|
||||
int full = 0;
|
||||
size_t count = 10; // default count for xinfo streams
|
||||
|
@ -2560,7 +2560,7 @@ void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
return;
|
||||
}
|
||||
return rb->SendError(sinfo.status());
|
||||
return cntx->SendError(sinfo.status());
|
||||
} else if (sub_cmd == "CONSUMERS") {
|
||||
string_view stream_name = ArgS(args, 1);
|
||||
string_view group_name = ArgS(args, 2);
|
||||
|
@ -2584,12 +2584,12 @@ void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) {
|
|||
return;
|
||||
}
|
||||
if (result.status() == OpStatus::INVALID_VALUE) {
|
||||
return rb->SendError(NoGroupError(stream_name, group_name));
|
||||
return cntx->SendError(NoGroupError(stream_name, group_name));
|
||||
}
|
||||
return rb->SendError(result.status());
|
||||
return cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
return rb->SendError(UnknownSubCmd(sub_cmd, "XINFO"));
|
||||
return cntx->SendError(UnknownSubCmd(sub_cmd, "XINFO"));
|
||||
}
|
||||
|
||||
void StreamFamily::XLen(CmdArgList args, ConnectionContext* cntx) {
|
||||
|
@ -2996,7 +2996,7 @@ std::optional<vector<RecordVec>> XReadImplSingleShard(ConnectionContext* cntx, R
|
|||
});
|
||||
|
||||
if (detailed_err.has_value()) {
|
||||
rb->SendError(*detailed_err);
|
||||
cntx->SendError(*detailed_err);
|
||||
return std::nullopt;
|
||||
}
|
||||
|
||||
|
@ -3043,7 +3043,7 @@ void XReadImpl(CmdArgList args, ReadOpts* opts, ConnectionContext* cntx) {
|
|||
auto has_entries = HasEntries(opts, *last_ids);
|
||||
if (!has_entries.has_value()) {
|
||||
cntx->transaction->Conclude();
|
||||
rb->SendError(has_entries.error());
|
||||
cntx->SendError(has_entries.error());
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -3221,7 +3221,7 @@ void StreamFamily::XRangeGeneric(CmdArgList args, bool is_rev, ConnectionContext
|
|||
if (result.status() == OpStatus::KEY_NOTFOUND) {
|
||||
return rb->SendEmptyArray();
|
||||
}
|
||||
return rb->SendError(result.status());
|
||||
return cntx->SendError(result.status());
|
||||
}
|
||||
|
||||
void StreamFamily::XAck(CmdArgList args, ConnectionContext* cntx) {
|
||||
|
|
|
@ -736,18 +736,18 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
|
|||
auto [opt, int_arg] = parser.Next<string_view, int64_t>();
|
||||
|
||||
if (auto err = parser.Error(); err) {
|
||||
return builder->SendError(err->MakeReply());
|
||||
return cntx->SendError(err->MakeReply());
|
||||
}
|
||||
|
||||
// We can set expiry only once.
|
||||
if (sparams.flags & SetCmd::SET_EXPIRE_AFTER_MS)
|
||||
return builder->SendError(kSyntaxErr);
|
||||
return cntx->SendError(kSyntaxErr);
|
||||
|
||||
sparams.flags |= SetCmd::SET_EXPIRE_AFTER_MS;
|
||||
|
||||
// Since PXAT/EXAT can change this, we need to check this ahead
|
||||
if (int_arg <= 0) {
|
||||
return builder->SendError(InvalidExpireTime("set"));
|
||||
return cntx->SendError(InvalidExpireTime("set"));
|
||||
}
|
||||
|
||||
bool is_ms = (opt[0] == 'P');
|
||||
|
@ -787,14 +787,14 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
if (auto err = parser.Error(); err) {
|
||||
return builder->SendError(err->MakeReply());
|
||||
return cntx->SendError(err->MakeReply());
|
||||
}
|
||||
|
||||
auto has_mask = [&](uint16_t m) { return (sparams.flags & m) == m; };
|
||||
|
||||
if (has_mask(SetCmd::SET_IF_EXISTS | SetCmd::SET_IF_NOTEXIST) ||
|
||||
has_mask(SetCmd::SET_KEEP_EXPIRE | SetCmd::SET_EXPIRE_AFTER_MS)) {
|
||||
return builder->SendError(kSyntaxErr);
|
||||
return cntx->SendError(kSyntaxErr);
|
||||
}
|
||||
|
||||
StringValue prev;
|
||||
|
@ -816,7 +816,7 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
if (result == OpStatus::OUT_OF_MEMORY) {
|
||||
return builder->SendError(kOutOfMemory);
|
||||
return cntx->SendError(kOutOfMemory);
|
||||
}
|
||||
|
||||
DCHECK_EQ(result, OpStatus::SKIPPED); // in case of NX option
|
||||
|
@ -851,7 +851,7 @@ void StringFamily::SetNx(CmdArgList args, ConnectionContext* cntx) {
|
|||
return builder->SendLong(1); // this means that we successfully set the value
|
||||
}
|
||||
if (results == OpStatus::OUT_OF_MEMORY) {
|
||||
return builder->SendError(kOutOfMemory);
|
||||
return cntx->SendError(kOutOfMemory);
|
||||
}
|
||||
CHECK_EQ(results, OpStatus::SKIPPED); // in this case it must be skipped!
|
||||
return builder->SendLong(0); // value do exists, we need to report that we didn't change it
|
||||
|
@ -1093,10 +1093,10 @@ void StringFamily::IncrByGeneric(string_view key, int64_t val, ConnectionContext
|
|||
builder->SendLong(result.value());
|
||||
break;
|
||||
case OpStatus::INVALID_VALUE:
|
||||
builder->SendError(kInvalidIntErr);
|
||||
cntx->SendError(kInvalidIntErr);
|
||||
break;
|
||||
case OpStatus::OUT_OF_RANGE:
|
||||
builder->SendError(kIncrOverflow);
|
||||
cntx->SendError(kIncrOverflow);
|
||||
break;
|
||||
case OpStatus::KEY_NOTFOUND: // Relevant only for MC
|
||||
reinterpret_cast<MCReplyBuilder*>(builder)->SendNotFound();
|
||||
|
|
|
@ -1357,7 +1357,7 @@ void BZPopMinMax(CmdArgList args, ConnectionContext* cntx, bool is_max) {
|
|||
DVLOG(1) << "result for " << transaction->DebugId() << " is " << popped_key.status();
|
||||
switch (popped_key.status()) {
|
||||
case OpStatus::WRONG_TYPE:
|
||||
return rb->SendError(kWrongTypeErr);
|
||||
return cntx->SendError(kWrongTypeErr);
|
||||
case OpStatus::CANCELLED:
|
||||
case OpStatus::TIMED_OUT:
|
||||
return rb->SendNullArray();
|
||||
|
@ -1784,7 +1784,7 @@ void ZAddGeneric(string_view key, const ZParams& zparams, ScoredMemberSpan memb_
|
|||
} else if (add_result.status() == OpStatus::SKIPPED) {
|
||||
rb->SendNull();
|
||||
} else if (add_result->is_nan) {
|
||||
rb->SendError(kScoreNaN);
|
||||
cntx->SendError(kScoreNaN);
|
||||
} else {
|
||||
if (zparams.flags & ZADD_IN_INCR) {
|
||||
rb->SendDouble(add_result->new_score);
|
||||
|
@ -2053,7 +2053,7 @@ void ZSetFamily::ZIncrBy(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
|
||||
if (add_result->is_nan) {
|
||||
return rb->SendError(kScoreNaN);
|
||||
return cntx->SendError(kScoreNaN);
|
||||
}
|
||||
|
||||
rb->SendDouble(add_result->new_score);
|
||||
|
@ -2239,7 +2239,7 @@ void ZSetFamily::ZRange(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
i += 2;
|
||||
} else {
|
||||
return cntx->reply_builder()->SendError(absl::StrCat("unsupported option ", cur_arg));
|
||||
return cntx->SendError(absl::StrCat("unsupported option ", cur_arg));
|
||||
}
|
||||
}
|
||||
ZRangeGeneric(std::move(args), range_params, cntx);
|
||||
|
@ -2260,7 +2260,7 @@ void ZSetFamily::ZRevRange(CmdArgList args, ConnectionContext* cntx) {
|
|||
if (cur_arg == "WITHSCORES") {
|
||||
range_params.with_scores = true;
|
||||
} else {
|
||||
return cntx->reply_builder()->SendError(absl::StrCat("unsupported option ", cur_arg));
|
||||
return cntx->SendError(absl::StrCat("unsupported option ", cur_arg));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2409,7 +2409,7 @@ void ZSetFamily::ZRandMember(CmdArgList args, ConnectionContext* cntx) {
|
|||
rb->SendNull();
|
||||
}
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2424,7 +2424,7 @@ void ZSetFamily::ZScore(CmdArgList args, ConnectionContext* cntx) {
|
|||
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
OpResult<double> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
if (result.status() == OpStatus::WRONG_TYPE) {
|
||||
rb->SendError(kWrongTypeErr);
|
||||
cntx->SendError(kWrongTypeErr);
|
||||
} else if (!result) {
|
||||
rb->SendNull();
|
||||
} else {
|
||||
|
@ -2481,7 +2481,7 @@ void ZSetFamily::ZScan(CmdArgList args, ConnectionContext* cntx) {
|
|||
rb->SendBulkString(k);
|
||||
}
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -2576,7 +2576,7 @@ void ZSetFamily::ZRankGeneric(CmdArgList args, bool reverse, ConnectionContext*
|
|||
} else if (result.status() == OpStatus::KEY_NOTFOUND) {
|
||||
rb->SendNull();
|
||||
} else {
|
||||
rb->SendError(result.status());
|
||||
cntx->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue