mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 18:35:46 +02:00
chore: pass SinkReplyBuilder and Transaction explicitly. Part3 (#3966)
This commit is contained in:
parent
e24f697bb3
commit
bf42eb0330
3 changed files with 120 additions and 125 deletions
|
@ -28,25 +28,26 @@ using namespace facade;
|
|||
|
||||
namespace {
|
||||
|
||||
template <typename T> void HandleOpValueResult(const OpResult<T>& result, ConnectionContext* cntx) {
|
||||
template <typename T>
|
||||
void HandleOpValueResult(const OpResult<T>& result, SinkReplyBuilder* builder) {
|
||||
static_assert(std::is_integral<T>::value,
|
||||
"we are only handling types that are integral types in the return types from "
|
||||
"here");
|
||||
if (result) {
|
||||
cntx->SendLong(result.value());
|
||||
builder->SendLong(result.value());
|
||||
} else {
|
||||
switch (result.status()) {
|
||||
case OpStatus::WRONG_TYPE:
|
||||
cntx->SendError(kWrongTypeErr);
|
||||
builder->SendError(kWrongTypeErr);
|
||||
break;
|
||||
case OpStatus::OUT_OF_MEMORY:
|
||||
cntx->SendError(kOutOfMemory);
|
||||
builder->SendError(kOutOfMemory);
|
||||
break;
|
||||
case OpStatus::INVALID_VALUE:
|
||||
cntx->SendError(HllFamily::kInvalidHllErr);
|
||||
builder->SendError(HllFamily::kInvalidHllErr);
|
||||
break;
|
||||
default:
|
||||
cntx->SendLong(0); // in case we don't have the value we should just send 0
|
||||
builder->SendLong(0); // in case we don't have the value we should just send 0
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -124,7 +125,7 @@ OpResult<int> AddToHll(const OpArgs& op_args, string_view key, CmdArgList values
|
|||
return std::min(updated, 1);
|
||||
}
|
||||
|
||||
void PFAdd(CmdArgList args, ConnectionContext* cntx) {
|
||||
void PFAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
string_view key = ArgS(args, 0);
|
||||
args.remove_prefix(1);
|
||||
|
||||
|
@ -132,9 +133,8 @@ void PFAdd(CmdArgList args, ConnectionContext* cntx) {
|
|||
return AddToHll(t->GetOpArgs(shard), key, args);
|
||||
};
|
||||
|
||||
Transaction* trans = cntx->transaction;
|
||||
OpResult<int> res = trans->ScheduleSingleHopT(std::move(cb));
|
||||
HandleOpValueResult(res, cntx);
|
||||
OpResult<int> res = tx->ScheduleSingleHopT(std::move(cb));
|
||||
HandleOpValueResult(res, builder);
|
||||
}
|
||||
|
||||
OpResult<int64_t> CountHllsSingle(const OpArgs& op_args, string_view key) {
|
||||
|
@ -204,7 +204,7 @@ vector<HllBufferPtr> ConvertShardVector(const vector<vector<string>>& hlls) {
|
|||
return ptrs;
|
||||
}
|
||||
|
||||
OpResult<int64_t> PFCountMulti(CmdArgList args, ConnectionContext* cntx) {
|
||||
OpResult<int64_t> PFCountMulti(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
vector<vector<string>> hlls;
|
||||
hlls.resize(shard_set->size());
|
||||
|
||||
|
@ -218,8 +218,7 @@ OpResult<int64_t> PFCountMulti(CmdArgList args, ConnectionContext* cntx) {
|
|||
return result.status();
|
||||
};
|
||||
|
||||
Transaction* trans = cntx->transaction;
|
||||
trans->ScheduleSingleHop(std::move(cb));
|
||||
tx->ScheduleSingleHop(std::move(cb));
|
||||
|
||||
vector<HllBufferPtr> ptrs = ConvertShardVector(hlls);
|
||||
int64_t pf_count = pfcountMulti(ptrs.data(), ptrs.size());
|
||||
|
@ -230,22 +229,21 @@ OpResult<int64_t> PFCountMulti(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
}
|
||||
|
||||
void PFCount(CmdArgList args, ConnectionContext* cntx) {
|
||||
void PFCount(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
if (args.size() == 1) {
|
||||
string_view key = ArgS(args, 0);
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return CountHllsSingle(t->GetOpArgs(shard), key);
|
||||
};
|
||||
|
||||
Transaction* trans = cntx->transaction;
|
||||
OpResult<int64_t> res = trans->ScheduleSingleHopT(std::move(cb));
|
||||
HandleOpValueResult(res, cntx);
|
||||
OpResult<int64_t> res = tx->ScheduleSingleHopT(std::move(cb));
|
||||
HandleOpValueResult(res, builder);
|
||||
} else {
|
||||
HandleOpValueResult(PFCountMulti(args, cntx), cntx);
|
||||
HandleOpValueResult(PFCountMulti(args, tx, builder), builder);
|
||||
}
|
||||
}
|
||||
|
||||
OpResult<int> PFMergeInternal(CmdArgList args, ConnectionContext* cntx) {
|
||||
OpResult<int> PFMergeInternal(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
vector<vector<string>> hlls;
|
||||
hlls.resize(shard_set->size());
|
||||
|
||||
|
@ -262,11 +260,10 @@ OpResult<int> PFMergeInternal(CmdArgList args, ConnectionContext* cntx) {
|
|||
return result.status();
|
||||
};
|
||||
|
||||
Transaction* trans = cntx->transaction;
|
||||
trans->Execute(std::move(cb), false);
|
||||
tx->Execute(std::move(cb), false);
|
||||
|
||||
if (!success) {
|
||||
trans->Conclude();
|
||||
tx->Conclude();
|
||||
return OpStatus::INVALID_VALUE;
|
||||
}
|
||||
|
||||
|
@ -292,21 +289,21 @@ OpResult<int> PFMergeInternal(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
return OpStatus::OK;
|
||||
};
|
||||
trans->Execute(std::move(set_cb), true);
|
||||
tx->Execute(std::move(set_cb), true);
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
void PFMerge(CmdArgList args, ConnectionContext* cntx) {
|
||||
OpResult<int> result = PFMergeInternal(args, cntx);
|
||||
void PFMerge(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
OpResult<int> result = PFMergeInternal(args, tx, builder);
|
||||
if (result.ok()) {
|
||||
if (result.value() == 0) {
|
||||
cntx->SendOk();
|
||||
builder->SendOk();
|
||||
} else {
|
||||
cntx->SendError(HllFamily::kInvalidHllErr);
|
||||
builder->SendError(HllFamily::kInvalidHllErr);
|
||||
}
|
||||
} else {
|
||||
HandleOpValueResult(result, cntx);
|
||||
HandleOpValueResult(result, builder);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
@ -950,6 +950,17 @@ OpResult<StringVec> OpScan(const OpArgs& op_args, string_view key, uint64_t* cur
|
|||
return res;
|
||||
}
|
||||
|
||||
void SendNumeric(OpResult<uint32_t> result, SinkReplyBuilder* builder) {
|
||||
switch (result.status()) {
|
||||
case OpStatus::OK:
|
||||
return builder->SendLong(result.value());
|
||||
case OpStatus::WRONG_TYPE:
|
||||
return builder->SendError(kWrongTypeErr);
|
||||
default:
|
||||
return builder->SendLong(0);
|
||||
}
|
||||
}
|
||||
|
||||
struct SetReplies {
|
||||
SetReplies(ConnectionContext* cntx)
|
||||
: rb{static_cast<RedisReplyBuilder*>(cntx->reply_builder())},
|
||||
|
@ -957,17 +968,6 @@ struct SetReplies {
|
|||
DCHECK(dynamic_cast<RedisReplyBuilder*>(cntx->reply_builder()));
|
||||
}
|
||||
|
||||
void SendNumeric(OpResult<uint32_t> result) const {
|
||||
switch (result.status()) {
|
||||
case OpStatus::OK:
|
||||
return rb->SendLong(result.value());
|
||||
case OpStatus::WRONG_TYPE:
|
||||
return rb->SendError(kWrongTypeErr);
|
||||
default:
|
||||
return rb->SendLong(0);
|
||||
}
|
||||
}
|
||||
|
||||
template <typename T> void Send(vector<T>* sv) {
|
||||
if (script) // output is sorted under scripts
|
||||
sort(sv->begin(), sv->end());
|
||||
|
@ -987,7 +987,7 @@ struct SetReplies {
|
|||
bool script;
|
||||
};
|
||||
|
||||
void SAdd(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SAdd(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
string_view key = ArgS(args, 0);
|
||||
auto values = args.subspan(1);
|
||||
|
||||
|
@ -995,15 +995,15 @@ void SAdd(CmdArgList args, ConnectionContext* cntx) {
|
|||
return OpAdd(t->GetOpArgs(shard), key, values, false, false);
|
||||
};
|
||||
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb));
|
||||
if (result) {
|
||||
return cntx->SendLong(result.value());
|
||||
return builder->SendLong(result.value());
|
||||
}
|
||||
|
||||
cntx->SendError(result.status());
|
||||
builder->SendError(result.status());
|
||||
}
|
||||
|
||||
void SIsMember(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SIsMember(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
string_view key = ArgS(args, 0);
|
||||
string_view val = ArgS(args, 1);
|
||||
|
||||
|
@ -1018,11 +1018,11 @@ void SIsMember(CmdArgList args, ConnectionContext* cntx) {
|
|||
return find_res.status();
|
||||
};
|
||||
|
||||
OpResult<void> result = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
SetReplies{cntx}.SendNumeric(result ? OpResult<uint32_t>(1) : result.status());
|
||||
OpResult<void> result = tx->ScheduleSingleHop(std::move(cb));
|
||||
SendNumeric(result ? OpResult<uint32_t>(1) : result.status(), builder);
|
||||
}
|
||||
|
||||
void SMIsMember(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SMIsMember(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
string_view key = ArgS(args, 0);
|
||||
auto members = args.subspan(1);
|
||||
|
||||
|
@ -1043,35 +1043,35 @@ void SMIsMember(CmdArgList args, ConnectionContext* cntx) {
|
|||
return find_res.status();
|
||||
};
|
||||
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
OpResult<void> result = cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(builder);
|
||||
OpResult<void> result = tx->ScheduleSingleHop(std::move(cb));
|
||||
if (result || result == OpStatus::KEY_NOTFOUND) {
|
||||
rb->StartArray(memberships.size());
|
||||
for (bool b : memberships)
|
||||
rb->SendLong(int(b));
|
||||
return;
|
||||
}
|
||||
cntx->SendError(result.status());
|
||||
builder->SendError(result.status());
|
||||
}
|
||||
|
||||
void SMove(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SMove(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
string_view src = ArgS(args, 0);
|
||||
string_view dest = ArgS(args, 1);
|
||||
string_view member = ArgS(args, 2);
|
||||
|
||||
Mover mover{src, dest, member, true};
|
||||
mover.Find(cntx->transaction);
|
||||
mover.Find(tx);
|
||||
|
||||
OpResult<unsigned> result = mover.Commit(cntx->transaction);
|
||||
OpResult<unsigned> result = mover.Commit(tx);
|
||||
if (!result) {
|
||||
return cntx->SendError(result.status());
|
||||
return builder->SendError(result.status());
|
||||
return;
|
||||
}
|
||||
|
||||
cntx->SendLong(result.value());
|
||||
builder->SendLong(result.value());
|
||||
}
|
||||
|
||||
void SRem(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SRem(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
string_view key = ArgS(args, 0);
|
||||
auto vals = args.subspan(1);
|
||||
|
||||
|
@ -1079,11 +1079,11 @@ void SRem(CmdArgList args, ConnectionContext* cntx) {
|
|||
return OpRem(t->GetOpArgs(shard), key, vals, false);
|
||||
};
|
||||
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
SetReplies{cntx}.SendNumeric(result);
|
||||
OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb));
|
||||
SendNumeric(result, builder);
|
||||
}
|
||||
|
||||
void SCard(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SCard(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
string_view key = ArgS(args, 0);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<uint32_t> {
|
||||
|
@ -1095,17 +1095,17 @@ void SCard(CmdArgList args, ConnectionContext* cntx) {
|
|||
return find_res.value()->second.Size();
|
||||
};
|
||||
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
SetReplies{cntx}.SendNumeric(result);
|
||||
OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb));
|
||||
SendNumeric(result, builder);
|
||||
}
|
||||
|
||||
void SPop(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SPop(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
string_view key = ArgS(args, 0);
|
||||
unsigned count = 1;
|
||||
if (args.size() > 1) {
|
||||
string_view arg = ArgS(args, 1);
|
||||
if (!absl::SimpleAtoi(arg, &count)) {
|
||||
cntx->SendError(kInvalidIntErr);
|
||||
builder->SendError(kInvalidIntErr);
|
||||
return;
|
||||
}
|
||||
}
|
||||
|
@ -1114,8 +1114,8 @@ void SPop(CmdArgList args, ConnectionContext* cntx) {
|
|||
return OpPop(t->GetOpArgs(shard), key, count);
|
||||
};
|
||||
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(builder);
|
||||
OpResult<StringVec> result = tx->ScheduleSingleHopT(std::move(cb));
|
||||
if (result || result.status() == OpStatus::KEY_NOTFOUND) {
|
||||
if (args.size() == 1) { // SPOP key
|
||||
if (result.status() == OpStatus::KEY_NOTFOUND) {
|
||||
|
@ -1130,10 +1130,10 @@ void SPop(CmdArgList args, ConnectionContext* cntx) {
|
|||
return;
|
||||
}
|
||||
|
||||
cntx->SendError(result.status());
|
||||
builder->SendError(result.status());
|
||||
}
|
||||
|
||||
void SDiff(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SDiff(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) {
|
||||
ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED);
|
||||
string_view src_key = ArgS(args, 0);
|
||||
ShardId src_shard = Shard(src_key, result_set.size());
|
||||
|
@ -1150,12 +1150,12 @@ void SDiff(CmdArgList args, ConnectionContext* cntx) {
|
|||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
tx->ScheduleSingleHop(std::move(cb));
|
||||
ResultSetView rsv = DiffResultVec(result_set, src_shard);
|
||||
SetReplies{cntx}.Send(rsv);
|
||||
}
|
||||
|
||||
void SDiffStore(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SDiffStore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED);
|
||||
string_view dest_key = ArgS(args, 0);
|
||||
ShardId dest_shard = Shard(dest_key, result_set.size());
|
||||
|
@ -1188,11 +1188,11 @@ void SDiffStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
cntx->transaction->Execute(std::move(diff_cb), false);
|
||||
tx->Execute(std::move(diff_cb), false);
|
||||
ResultSetView rsv = DiffResultVec(result_set, src_shard);
|
||||
if (!rsv) {
|
||||
cntx->transaction->Conclude();
|
||||
cntx->SendError(rsv.status());
|
||||
tx->Conclude();
|
||||
builder->SendError(rsv.status());
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1205,23 +1205,24 @@ void SDiffStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
cntx->transaction->Execute(std::move(store_cb), true);
|
||||
cntx->SendLong(result_size);
|
||||
tx->Execute(std::move(store_cb), true);
|
||||
builder->SendLong(result_size);
|
||||
}
|
||||
|
||||
void SMembers(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SMembers(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder,
|
||||
ConnectionContext* cntx) {
|
||||
auto cb = [](Transaction* t, EngineShard* shard) { return OpInter(t, shard, false); };
|
||||
|
||||
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
OpResult<StringVec> result = tx->ScheduleSingleHopT(std::move(cb));
|
||||
|
||||
if (result || result.status() == OpStatus::KEY_NOTFOUND) {
|
||||
SetReplies{cntx}.Send(&result.value());
|
||||
} else {
|
||||
cntx->SendError(result.status());
|
||||
builder->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
void SRandMember(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SRandMember(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
CmdArgParser parser{args};
|
||||
string_view key = parser.Next();
|
||||
|
||||
|
@ -1229,17 +1230,17 @@ void SRandMember(CmdArgList args, ConnectionContext* cntx) {
|
|||
int count = is_count ? parser.Next<int>() : 1;
|
||||
|
||||
if (parser.HasNext())
|
||||
return cntx->SendError(WrongNumArgsError("SRANDMEMBER"));
|
||||
return builder->SendError(WrongNumArgsError("SRANDMEMBER"));
|
||||
|
||||
if (auto err = parser.Error(); err)
|
||||
return cntx->SendError(err->MakeReply());
|
||||
return builder->SendError(err->MakeReply());
|
||||
|
||||
const auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult<StringVec> {
|
||||
return OpRandMember(t->GetOpArgs(shard), key, count);
|
||||
};
|
||||
|
||||
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(cb);
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
OpResult<StringVec> result = tx->ScheduleSingleHopT(cb);
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(builder);
|
||||
if (result || result == OpStatus::KEY_NOTFOUND) {
|
||||
if (is_count) {
|
||||
rb->SendStringArr(*result, RedisReplyBuilder::SET);
|
||||
|
@ -1250,10 +1251,10 @@ void SRandMember(CmdArgList args, ConnectionContext* cntx) {
|
|||
}
|
||||
return;
|
||||
}
|
||||
cntx->SendError(result.status());
|
||||
builder->SendError(result.status());
|
||||
}
|
||||
|
||||
void SInter(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SInter(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) {
|
||||
ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED);
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
|
@ -1262,16 +1263,16 @@ void SInter(CmdArgList args, ConnectionContext* cntx) {
|
|||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
OpResult<SvArray> result = InterResultVec(result_set, cntx->transaction->GetUniqueShardCnt());
|
||||
tx->ScheduleSingleHop(std::move(cb));
|
||||
OpResult<SvArray> result = InterResultVec(result_set, tx->GetUniqueShardCnt());
|
||||
if (result) {
|
||||
SetReplies{cntx}.Send(&*result);
|
||||
} else {
|
||||
cntx->SendError(result.status());
|
||||
builder->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
void SInterStore(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SInterStore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED);
|
||||
string_view dest_key = ArgS(args, 0);
|
||||
ShardId dest_shard = Shard(dest_key, result_set.size());
|
||||
|
@ -1289,12 +1290,12 @@ void SInterStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
cntx->transaction->Execute(std::move(inter_cb), false);
|
||||
tx->Execute(std::move(inter_cb), false);
|
||||
|
||||
OpResult<SvArray> result = InterResultVec(result_set, inter_shard_cnt.load(memory_order_relaxed));
|
||||
if (!result) {
|
||||
cntx->transaction->Conclude();
|
||||
cntx->SendError(result.status());
|
||||
tx->Conclude();
|
||||
builder->SendError(result.status());
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1306,21 +1307,21 @@ void SInterStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
cntx->transaction->Execute(std::move(store_cb), true);
|
||||
cntx->SendLong(result->size());
|
||||
tx->Execute(std::move(store_cb), true);
|
||||
builder->SendLong(result->size());
|
||||
}
|
||||
|
||||
void SInterCard(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SInterCard(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
unsigned num_keys;
|
||||
if (!absl::SimpleAtoi(ArgS(args, 0), &num_keys))
|
||||
return cntx->SendError(kSyntaxErr);
|
||||
return builder->SendError(kSyntaxErr);
|
||||
|
||||
unsigned limit = 0;
|
||||
if (args.size() == (num_keys + 3) && ArgS(args, 1 + num_keys) == "LIMIT") {
|
||||
if (!absl::SimpleAtoi(ArgS(args, num_keys + 2), &limit))
|
||||
return cntx->SendError("limit can't be negative");
|
||||
return builder->SendError("limit can't be negative");
|
||||
} else if (args.size() > (num_keys + 1))
|
||||
return cntx->SendError(kSyntaxErr);
|
||||
return builder->SendError(kSyntaxErr);
|
||||
|
||||
ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED);
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
|
@ -1328,17 +1329,16 @@ void SInterCard(CmdArgList args, ConnectionContext* cntx) {
|
|||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
OpResult<SvArray> result =
|
||||
InterResultVec(result_set, cntx->transaction->GetUniqueShardCnt(), limit);
|
||||
tx->ScheduleSingleHop(std::move(cb));
|
||||
OpResult<SvArray> result = InterResultVec(result_set, tx->GetUniqueShardCnt(), limit);
|
||||
|
||||
if (result) {
|
||||
return cntx->SendLong(result->size());
|
||||
return builder->SendLong(result->size());
|
||||
}
|
||||
cntx->SendError(result.status());
|
||||
builder->SendError(result.status());
|
||||
}
|
||||
|
||||
void SUnion(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SUnion(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder, ConnectionContext* cntx) {
|
||||
ResultStringVec result_set(shard_set->size());
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
|
@ -1347,13 +1347,13 @@ void SUnion(CmdArgList args, ConnectionContext* cntx) {
|
|||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
cntx->transaction->ScheduleSingleHop(std::move(cb));
|
||||
tx->ScheduleSingleHop(std::move(cb));
|
||||
|
||||
ResultSetView unionset = UnionResultVec(result_set);
|
||||
SetReplies{cntx}.Send(unionset);
|
||||
}
|
||||
|
||||
void SUnionStore(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SUnionStore(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
ResultStringVec result_set(shard_set->size(), OpStatus::SKIPPED);
|
||||
string_view dest_key = ArgS(args, 0);
|
||||
ShardId dest_shard = Shard(dest_key, result_set.size());
|
||||
|
@ -1371,12 +1371,12 @@ void SUnionStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
cntx->transaction->Execute(std::move(union_cb), false);
|
||||
tx->Execute(std::move(union_cb), false);
|
||||
|
||||
ResultSetView unionset = UnionResultVec(result_set);
|
||||
if (!unionset) {
|
||||
cntx->transaction->Conclude();
|
||||
cntx->SendError(unionset.status());
|
||||
tx->Conclude();
|
||||
builder->SendError(unionset.status());
|
||||
return;
|
||||
}
|
||||
|
||||
|
@ -1389,30 +1389,30 @@ void SUnionStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
return OpStatus::OK;
|
||||
};
|
||||
|
||||
cntx->transaction->Execute(std::move(store_cb), true);
|
||||
cntx->SendLong(result_size);
|
||||
tx->Execute(std::move(store_cb), true);
|
||||
builder->SendLong(result_size);
|
||||
}
|
||||
|
||||
void SScan(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SScan(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
string_view key = ArgS(args, 0);
|
||||
string_view token = ArgS(args, 1);
|
||||
|
||||
uint64_t cursor = 0;
|
||||
|
||||
if (!absl::SimpleAtoi(token, &cursor)) {
|
||||
return cntx->SendError("invalid cursor");
|
||||
return builder->SendError("invalid cursor");
|
||||
}
|
||||
|
||||
// SSCAN key cursor [MATCH pattern] [COUNT count]
|
||||
if (args.size() > 6) {
|
||||
DVLOG(1) << "got " << args.size() << " this is more than it should be";
|
||||
return cntx->SendError(kSyntaxErr);
|
||||
return builder->SendError(kSyntaxErr);
|
||||
}
|
||||
|
||||
OpResult<ScanOpts> ops = ScanOpts::TryFrom(args.subspan(2));
|
||||
if (!ops) {
|
||||
DVLOG(1) << "SScan invalid args - return " << ops << " to the user";
|
||||
return cntx->SendError(ops.status());
|
||||
return builder->SendError(ops.status());
|
||||
}
|
||||
|
||||
ScanOpts scan_op = ops.value();
|
||||
|
@ -1421,8 +1421,8 @@ void SScan(CmdArgList args, ConnectionContext* cntx) {
|
|||
return OpScan(t->GetOpArgs(shard), key, &cursor, scan_op);
|
||||
};
|
||||
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
|
||||
OpResult<StringVec> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
auto* rb = static_cast<RedisReplyBuilder*>(builder);
|
||||
OpResult<StringVec> result = tx->ScheduleSingleHopT(std::move(cb));
|
||||
if (result.status() != OpStatus::WRONG_TYPE) {
|
||||
rb->StartArray(2);
|
||||
rb->SendBulkString(absl::StrCat(cursor));
|
||||
|
@ -1431,19 +1431,19 @@ void SScan(CmdArgList args, ConnectionContext* cntx) {
|
|||
rb->SendBulkString(k);
|
||||
}
|
||||
} else {
|
||||
cntx->SendError(result.status());
|
||||
builder->SendError(result.status());
|
||||
}
|
||||
}
|
||||
|
||||
// Syntax: saddex key ttl_sec member [member...]
|
||||
void SAddEx(CmdArgList args, ConnectionContext* cntx) {
|
||||
void SAddEx(CmdArgList args, Transaction* tx, SinkReplyBuilder* builder) {
|
||||
string_view key = ArgS(args, 0);
|
||||
string_view ttl_str = ArgS(args, 1);
|
||||
uint32_t ttl_sec;
|
||||
constexpr uint32_t kMaxTtl = (1UL << 26);
|
||||
|
||||
if (!absl::SimpleAtoi(ttl_str, &ttl_sec) || ttl_sec == 0 || ttl_sec > kMaxTtl) {
|
||||
return cntx->SendError(kInvalidIntErr);
|
||||
return builder->SendError(kInvalidIntErr);
|
||||
}
|
||||
|
||||
auto vals = args.subspan(2);
|
||||
|
@ -1451,12 +1451,12 @@ void SAddEx(CmdArgList args, ConnectionContext* cntx) {
|
|||
return OpAddEx(t->GetOpArgs(shard), key, ttl_sec, vals);
|
||||
};
|
||||
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
OpResult<uint32_t> result = tx->ScheduleSingleHopT(std::move(cb));
|
||||
if (result) {
|
||||
return cntx->SendLong(result.value());
|
||||
return builder->SendLong(result.value());
|
||||
}
|
||||
|
||||
cntx->SendError(result.status());
|
||||
builder->SendError(result.status());
|
||||
}
|
||||
|
||||
} // namespace
|
||||
|
|
|
@ -5,8 +5,8 @@
|
|||
#pragma once
|
||||
|
||||
#include "facade/op_status.h"
|
||||
#include "server/common.h"
|
||||
#include "server/table.h"
|
||||
#include "server/tx_base.h"
|
||||
|
||||
typedef struct intset intset;
|
||||
|
||||
|
@ -14,9 +14,7 @@ namespace dfly {
|
|||
|
||||
using facade::OpResult;
|
||||
|
||||
class ConnectionContext;
|
||||
class CommandRegistry;
|
||||
class EngineShard;
|
||||
class StringSet;
|
||||
|
||||
class SetFamily {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue