mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
feat(set family): rewrite set store commands to journal (#691)
This commit is contained in:
parent
daf5473ce1
commit
1f5811fb78
4 changed files with 45 additions and 24 deletions
|
@ -194,7 +194,7 @@ bool ParseDouble(string_view src, double* value) {
|
|||
}
|
||||
|
||||
void OpArgs::RecordJournal(string_view cmd, ArgSlice args) const {
|
||||
tx->LogJournalOnShard(shard, make_pair(cmd, args));
|
||||
tx->LogJournalOnShard(shard, make_pair(cmd, args), 1);
|
||||
}
|
||||
|
||||
#define ADD(x) (x) += o.x
|
||||
|
|
|
@ -527,8 +527,8 @@ OpStatus NoOpCb(Transaction* t, EngineShard* shard) {
|
|||
};
|
||||
|
||||
// if overwrite is true then OpAdd writes vals into the key and discards its previous value.
|
||||
OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice vals,
|
||||
bool overwrite) {
|
||||
OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice vals, bool overwrite,
|
||||
bool journal_update) {
|
||||
auto* es = op_args.shard;
|
||||
auto& db_slice = es->db_slice();
|
||||
|
||||
|
@ -538,7 +538,9 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
|
|||
if (overwrite && vals.empty()) {
|
||||
auto it = db_slice.FindExt(op_args.db_cntx, key).first;
|
||||
db_slice.Del(op_args.db_cntx.db_index, it);
|
||||
|
||||
if (journal_update && op_args.shard->journal()) {
|
||||
op_args.RecordJournal("DEL"sv, ArgSlice{key});
|
||||
}
|
||||
return 0;
|
||||
}
|
||||
|
||||
|
@ -609,7 +611,13 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
|
|||
}
|
||||
|
||||
db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !new_key);
|
||||
|
||||
if (journal_update && op_args.shard->journal()) {
|
||||
op_args.RecordJournal("DEL"sv, ArgSlice{key});
|
||||
vector<string_view> mapped(vals.size() + 1);
|
||||
mapped[0] = key;
|
||||
std::copy(vals.begin(), vals.end(), mapped.begin() + 1);
|
||||
op_args.RecordJournal("SADD"sv, mapped);
|
||||
}
|
||||
return res;
|
||||
}
|
||||
|
||||
|
@ -658,7 +666,8 @@ OpResult<uint32_t> OpAddEx(const OpArgs& op_args, string_view key, uint32_t ttl_
|
|||
return res;
|
||||
}
|
||||
|
||||
OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, const ArgSlice& vals) {
|
||||
OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, const ArgSlice& vals,
|
||||
bool journal_rewrite) {
|
||||
auto* es = op_args.shard;
|
||||
auto& db_slice = es->db_slice();
|
||||
OpResult<PrimeIterator> find_res = db_slice.Find(op_args.db_cntx, key, OBJ_SET);
|
||||
|
@ -676,6 +685,12 @@ OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, const ArgSlice&
|
|||
if (isempty) {
|
||||
CHECK(db_slice.Del(op_args.db_cntx.db_index, find_res.value()));
|
||||
}
|
||||
if (journal_rewrite && op_args.shard->journal()) {
|
||||
vector<string_view> mapped(vals.size() + 1);
|
||||
mapped[0] = key;
|
||||
std::copy(vals.begin(), vals.end(), mapped.begin() + 1);
|
||||
op_args.RecordJournal("SREM"sv, mapped);
|
||||
}
|
||||
|
||||
return removed;
|
||||
}
|
||||
|
@ -685,8 +700,8 @@ OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, const ArgSlice&
|
|||
// and reports the result.
|
||||
class Mover {
|
||||
public:
|
||||
Mover(string_view src, string_view dest, string_view member)
|
||||
: src_(src), dest_(dest), member_(member) {
|
||||
Mover(string_view src, string_view dest, string_view member, bool journal_rewrite)
|
||||
: src_(src), dest_(dest), member_(member), journal_rewrite_(journal_rewrite) {
|
||||
}
|
||||
|
||||
void Find(Transaction* t);
|
||||
|
@ -698,6 +713,7 @@ class Mover {
|
|||
|
||||
string_view src_, dest_, member_;
|
||||
OpResult<bool> found_[2];
|
||||
bool journal_rewrite_;
|
||||
};
|
||||
|
||||
OpStatus Mover::OpFind(Transaction* t, EngineShard* es) {
|
||||
|
@ -729,10 +745,10 @@ OpStatus Mover::OpMutate(Transaction* t, EngineShard* es) {
|
|||
OpArgs op_args = t->GetOpArgs(es);
|
||||
for (auto k : largs) {
|
||||
if (k == src_) {
|
||||
CHECK_EQ(1u, OpRem(op_args, k, {member_}).value()); // must succeed.
|
||||
CHECK_EQ(1u, OpRem(op_args, k, {member_}, journal_rewrite_).value()); // must succeed.
|
||||
} else {
|
||||
DCHECK_EQ(k, dest_);
|
||||
OpAdd(op_args, k, {member_}, false);
|
||||
OpAdd(op_args, k, {member_}, false, journal_rewrite_);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -1036,7 +1052,7 @@ void SAdd(CmdArgList args, ConnectionContext* cntx) {
|
|||
ArgSlice arg_slice{vals.data(), vals.size()};
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpAdd(t->GetOpArgs(shard), key, arg_slice, false);
|
||||
return OpAdd(t->GetOpArgs(shard), key, arg_slice, false, false);
|
||||
};
|
||||
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
@ -1107,7 +1123,7 @@ void SMove(CmdArgList args, ConnectionContext* cntx) {
|
|||
string_view dest = ArgS(args, 2);
|
||||
string_view member = ArgS(args, 3);
|
||||
|
||||
Mover mover{src, dest, member};
|
||||
Mover mover{src, dest, member, true};
|
||||
cntx->transaction->Schedule();
|
||||
|
||||
mover.Find(cntx->transaction);
|
||||
|
@ -1130,7 +1146,7 @@ void SRem(CmdArgList args, ConnectionContext* cntx) {
|
|||
ArgSlice span{vals.data(), vals.size()};
|
||||
|
||||
auto cb = [&](Transaction* t, EngineShard* shard) {
|
||||
return OpRem(t->GetOpArgs(shard), key, span);
|
||||
return OpRem(t->GetOpArgs(shard), key, span, false);
|
||||
};
|
||||
OpResult<uint32_t> result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
|
||||
|
||||
|
@ -1276,7 +1292,7 @@ void SDiffStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
SvArray result = ToSvArray(rsv.value());
|
||||
auto store_cb = [&](Transaction* t, EngineShard* shard) {
|
||||
if (shard->shard_id() == dest_shard) {
|
||||
OpAdd(t->GetOpArgs(shard), dest_key, result, true);
|
||||
OpAdd(t->GetOpArgs(shard), dest_key, result, true, true);
|
||||
}
|
||||
|
||||
return OpStatus::OK;
|
||||
|
@ -1355,7 +1371,7 @@ void SInterStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
auto store_cb = [&](Transaction* t, EngineShard* shard) {
|
||||
if (shard->shard_id() == dest_shard) {
|
||||
OpAdd(t->GetOpArgs(shard), dest_key, result.value(), true);
|
||||
OpAdd(t->GetOpArgs(shard), dest_key, result.value(), true, true);
|
||||
}
|
||||
|
||||
return OpStatus::OK;
|
||||
|
@ -1419,7 +1435,7 @@ void SUnionStore(CmdArgList args, ConnectionContext* cntx) {
|
|||
|
||||
auto store_cb = [&](Transaction* t, EngineShard* shard) {
|
||||
if (shard->shard_id() == dest_shard) {
|
||||
OpAdd(t->GetOpArgs(shard), dest_key, result, true);
|
||||
OpAdd(t->GetOpArgs(shard), dest_key, result, true, true);
|
||||
}
|
||||
|
||||
return OpStatus::OK;
|
||||
|
@ -1551,18 +1567,21 @@ using CI = CommandId;
|
|||
void SetFamily::Register(CommandRegistry* registry) {
|
||||
*registry << CI{"SADD", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(SAdd)
|
||||
<< CI{"SDIFF", CO::READONLY, -2, 1, -1, 1}.HFUNC(SDiff)
|
||||
<< CI{"SDIFFSTORE", CO::WRITE | CO::DENYOOM, -3, 1, -1, 1}.HFUNC(SDiffStore)
|
||||
<< CI{"SDIFFSTORE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, -3, 1, -1, 1}.HFUNC(
|
||||
SDiffStore)
|
||||
<< CI{"SINTER", CO::READONLY, -2, 1, -1, 1}.HFUNC(SInter)
|
||||
<< CI{"SINTERSTORE", CO::WRITE | CO::DENYOOM, -3, 1, -1, 1}.HFUNC(SInterStore)
|
||||
<< CI{"SINTERSTORE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, -3, 1, -1, 1}.HFUNC(
|
||||
SInterStore)
|
||||
<< CI{"SMEMBERS", CO::READONLY, 2, 1, 1, 1}.HFUNC(SMembers)
|
||||
<< CI{"SISMEMBER", CO::FAST | CO::READONLY, 3, 1, 1, 1}.HFUNC(SIsMember)
|
||||
<< CI{"SMISMEMBER", CO::READONLY, -3, 1, 1, 1}.HFUNC(SMIsMember)
|
||||
<< CI{"SMOVE", CO::FAST | CO::WRITE, 4, 1, 2, 1}.HFUNC(SMove)
|
||||
<< CI{"SMOVE", CO::FAST | CO::WRITE | CO::NO_AUTOJOURNAL, 4, 1, 2, 1}.HFUNC(SMove)
|
||||
<< CI{"SREM", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(SRem)
|
||||
<< CI{"SCARD", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(SCard)
|
||||
<< CI{"SPOP", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, -2, 1, 1, 1}.HFUNC(SPop)
|
||||
<< CI{"SUNION", CO::READONLY, -2, 1, -1, 1}.HFUNC(SUnion)
|
||||
<< CI{"SUNIONSTORE", CO::WRITE | CO::DENYOOM, -3, 1, -1, 1}.HFUNC(SUnionStore)
|
||||
<< CI{"SUNIONSTORE", CO::WRITE | CO::DENYOOM | CO::NO_AUTOJOURNAL, -3, 1, -1, 1}.HFUNC(
|
||||
SUnionStore)
|
||||
<< CI{"SSCAN", CO::READONLY, -3, 1, 1, 1}.HFUNC(SScan);
|
||||
|
||||
if (absl::GetFlag(FLAGS_use_set2)) {
|
||||
|
|
|
@ -1239,14 +1239,15 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) {
|
|||
entry_payload = make_pair(cmd, ShardArgsInShard(shard->shard_id()));
|
||||
}
|
||||
|
||||
LogJournalOnShard(shard, std::move(entry_payload));
|
||||
LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_);
|
||||
}
|
||||
|
||||
void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload) const {
|
||||
void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload,
|
||||
uint32_t shard_cnt) const {
|
||||
auto journal = shard->journal();
|
||||
CHECK(journal);
|
||||
auto opcode = multi_ ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND;
|
||||
journal->RecordEntry(txid_, opcode, db_index_, unique_shard_cnt_, std::move(payload));
|
||||
journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, std::move(payload));
|
||||
}
|
||||
|
||||
void Transaction::BreakOnShutdown() {
|
||||
|
|
|
@ -194,7 +194,8 @@ class Transaction {
|
|||
}
|
||||
|
||||
// Log a journal entry on shard with payload.
|
||||
void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload) const;
|
||||
void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload,
|
||||
uint32_t shard_cnt) const;
|
||||
|
||||
private:
|
||||
struct LockCnt {
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue