diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index e5e9f129f..94d661bd2 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -16,7 +16,7 @@ endif() add_library(dfly_transaction db_slice.cc malloc_stats.cc blocking_controller.cc - command_registry.cc + command_registry.cc cluster/unique_slot_checker.cc common.cc journal/journal.cc journal/types.cc journal/journal_slice.cc server_state.cc table.cc top_keys.cc transaction.cc serializer_commons.cc journal/serializer.cc journal/executor.cc journal/streamer.cc diff --git a/src/server/cluster/cluster_config.h b/src/server/cluster/cluster_config.h index 87b55286c..ed6da436b 100644 --- a/src/server/cluster/cluster_config.h +++ b/src/server/cluster/cluster_config.h @@ -24,6 +24,7 @@ using SlotSet = absl::flat_hash_set; class ClusterConfig { public: static constexpr SlotId kMaxSlotNum = 0x3FFF; + static constexpr SlotId kInvalidSlotId = kMaxSlotNum + 1; struct Node { std::string id; diff --git a/src/server/cluster/cluster_family.cc b/src/server/cluster/cluster_family.cc index 5579227d9..ed68bd2d2 100644 --- a/src/server/cluster/cluster_family.cc +++ b/src/server/cluster/cluster_family.cc @@ -478,9 +478,10 @@ void WriteFlushSlotsToJournal(const SlotSet& slots) { } // Send journal entry + // TODO: Break slot migration upon FLUSHSLOTS journal->RecordEntry(/* txid= */ 0, journal::Op::COMMAND, /* dbid= */ 0, - /* shard_cnt= */ shard_set->size(), make_pair("DFLYCLUSTER", args_view), - false); + /* shard_cnt= */ shard_set->size(), nullopt, + make_pair("DFLYCLUSTER", args_view), false); }; shard_set->pool()->AwaitFiberOnAll(std::move(cb)); } diff --git a/src/server/cluster/unique_slot_checker.cc b/src/server/cluster/unique_slot_checker.cc new file mode 100644 index 000000000..156df23d3 --- /dev/null +++ b/src/server/cluster/unique_slot_checker.cc @@ -0,0 +1,38 @@ +#include "server/cluster/unique_slot_checker.h" + +using namespace std; + +namespace dfly { + +void UniqueSlotChecker::Add(std::string_view key) { + if (!ClusterConfig::IsEnabled()) { + return; + } + + Add(ClusterConfig::KeySlot(key)); +} + +void UniqueSlotChecker::Add(SlotId slot_id) { + if (!ClusterConfig::IsEnabled()) { + return; + } + + if (!slot_id_.has_value()) { + slot_id_ = slot_id; + return; + } + + if (*slot_id_ != slot_id) { + slot_id_ = ClusterConfig::kInvalidSlotId; + } +} + +optional UniqueSlotChecker::GetUniqueSlotId() const { + if (slot_id_.has_value() && *slot_id_ == ClusterConfig::kInvalidSlotId) { + return nullopt; + } + + return slot_id_; +} + +} // namespace dfly diff --git a/src/server/cluster/unique_slot_checker.h b/src/server/cluster/unique_slot_checker.h new file mode 100644 index 000000000..1a8740b0f --- /dev/null +++ b/src/server/cluster/unique_slot_checker.h @@ -0,0 +1,27 @@ +// Copyright 2023, DragonflyDB authors. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include +#include + +#include "server/cluster/cluster_config.h" + +namespace dfly { + +// A simple utility class that "aggregates" SlotId-s and can tell whether all inputs were the same. +// Only works when cluster is enabled. +class UniqueSlotChecker { + public: + void Add(std::string_view key); + void Add(SlotId slot_id); + + std::optional GetUniqueSlotId() const; + + private: + std::optional slot_id_; +}; + +} // namespace dfly diff --git a/src/server/common.cc b/src/server/common.cc index 3de826c4b..b9a42b23e 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -220,13 +220,14 @@ void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt) { void RecordExpiry(DbIndex dbid, string_view key) { auto journal = EngineShard::tlocal()->journal(); CHECK(journal); - journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, make_pair("DEL", ArgSlice{key}), false); + journal->RecordEntry(0, journal::Op::EXPIRED, dbid, 1, ClusterConfig::KeySlot(key), + make_pair("DEL", ArgSlice{key}), false); } void TriggerJournalWriteToSink() { auto journal = EngineShard::tlocal()->journal(); CHECK(journal); - journal->RecordEntry(0, journal::Op::NOOP, 0, 0, {}, true); + journal->RecordEntry(0, journal::Op::NOOP, 0, 0, nullopt, {}, true); } #define ADD(x) (x) += o.x diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index cbc1c1f9c..c915ef265 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -1279,12 +1279,11 @@ void DbSlice::FreeMemWithEvictionStep(DbIndex db_ind, size_t increase_goal_bytes finish: // send the deletion to the replicas. // fiber preemption could happen in this phase. - vector args(keys_to_journal.begin(), keys_to_journal.end()); - if (!args.empty()) { - ArgSlice delete_args(&args[0], args.size()); - if (auto journal = owner_->journal(); journal) { - journal->RecordEntry(0, journal::Op::EXPIRED, db_ind, 1, make_pair("DEL", delete_args), - false); + if (auto journal = owner_->journal(); journal) { + for (string_view key : keys_to_journal) { + ArgSlice delete_args(&key, 1); + journal->RecordEntry(0, journal::Op::EXPIRED, db_ind, 1, ClusterConfig::KeySlot(key), + make_pair("DEL", delete_args), false); } } diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index e5af8137f..e28318ded 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -452,7 +452,7 @@ void DflyCmd::TakeOver(CmdArgList args, ConnectionContext* cntx) { &status](EngineShard* shard) { FlowInfo* flow = &replica_ptr->flows[shard->shard_id()]; - shard->journal()->RecordEntry(0, journal::Op::PING, 0, 0, {}, true); + shard->journal()->RecordEntry(0, journal::Op::PING, 0, 0, nullopt, {}, true); while (flow->last_acked_lsn < shard->journal()->GetLsn()) { if (absl::Now() - start > timeout_dur) { LOG(WARNING) << "Couldn't synchronize with replica for takeover in time: " diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 48a2ce9d1..3a441381f 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -119,8 +119,8 @@ bool Journal::EnterLameDuck() { } void Journal::RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, - Entry::Payload payload, bool await) { - journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, std::move(payload)}, await); + std::optional slot, Entry::Payload payload, bool await) { + journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, slot, std::move(payload)}, await); } /* diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index 3d7d024f7..623ce49c2 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -57,8 +57,8 @@ class Journal { */ LSN GetLsn() const; - void RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, Entry::Payload payload, - bool await); + void RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, + std::optional slot, Entry::Payload payload, bool await); private: mutable Mutex state_mu_; diff --git a/src/server/journal/journal_slice.cc b/src/server/journal/journal_slice.cc index 452a88db5..585af26ec 100644 --- a/src/server/journal/journal_slice.cc +++ b/src/server/journal/journal_slice.cc @@ -151,6 +151,7 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) { item->lsn = -1; item->opcode = entry.opcode; item->data = ""; + item->slot = entry.slot; } else { FiberAtomicGuard fg; // GetTail gives a pointer to a new tail entry in the buffer, possibly overriding the last entry @@ -158,6 +159,7 @@ void JournalSlice::AddLogRecord(const Entry& entry, bool await) { item = ring_buffer_->GetTail(true); item->opcode = entry.opcode; item->lsn = lsn_++; + item->slot = entry.slot; io::BufSink buf_sink{&ring_serialize_buf_}; JournalWriter writer{&buf_sink}; diff --git a/src/server/journal/journal_test.cc b/src/server/journal/journal_test.cc index 1c31c448c..6f24790e1 100644 --- a/src/server/journal/journal_test.cc +++ b/src/server/journal/journal_test.cc @@ -98,15 +98,15 @@ TEST(Journal, WriteRead) { auto list = [v = &lists](auto... ss) { return StoreList(v, ss...); }; std::vector test_entries = { - {0, journal::Op::COMMAND, 0, 2, make_pair("MSET", slice("A", "1", "B", "2"))}, - {0, journal::Op::COMMAND, 0, 2, make_pair("MSET", slice("C", "3"))}, - {1, journal::Op::COMMAND, 0, 2, make_pair("DEL", list("A", "B"))}, - {2, journal::Op::COMMAND, 1, 1, make_pair("LPUSH", list("l", "v1", "v2"))}, - {3, journal::Op::COMMAND, 0, 1, make_pair("MSET", slice("D", "4"))}, - {4, journal::Op::COMMAND, 1, 1, make_pair("DEL", list("l1"))}, - {5, journal::Op::COMMAND, 2, 1, make_pair("DEL", list("E", "2"))}, - {6, journal::Op::MULTI_COMMAND, 2, 1, make_pair("SET", list("E", "2"))}, - {6, journal::Op::EXEC, 2, 1}}; + {0, journal::Op::COMMAND, 0, 2, nullopt, make_pair("MSET", slice("A", "1", "B", "2"))}, + {0, journal::Op::COMMAND, 0, 2, nullopt, make_pair("MSET", slice("C", "3"))}, + {1, journal::Op::COMMAND, 0, 2, nullopt, make_pair("DEL", list("A", "B"))}, + {2, journal::Op::COMMAND, 1, 1, nullopt, make_pair("LPUSH", list("l", "v1", "v2"))}, + {3, journal::Op::COMMAND, 0, 1, nullopt, make_pair("MSET", slice("D", "4"))}, + {4, journal::Op::COMMAND, 1, 1, nullopt, make_pair("DEL", list("l1"))}, + {5, journal::Op::COMMAND, 2, 1, nullopt, make_pair("DEL", list("E", "2"))}, + {6, journal::Op::MULTI_COMMAND, 2, 1, nullopt, make_pair("SET", list("E", "2"))}, + {6, journal::Op::EXEC, 2, 1, nullopt}}; // Write all entries to a buffer. base::IoBuf buf; diff --git a/src/server/journal/serializer.cc b/src/server/journal/serializer.cc index 08da8d954..658e8a7e5 100644 --- a/src/server/journal/serializer.cc +++ b/src/server/journal/serializer.cc @@ -64,7 +64,7 @@ void JournalWriter::Write(std::monostate) { void JournalWriter::Write(const journal::Entry& entry) { // Check if entry has a new db index and we need to emit a SELECT entry. if (entry.opcode != journal::Op::SELECT && (!cur_dbid_ || entry.dbid != *cur_dbid_)) { - Write(journal::Entry{journal::Op::SELECT, entry.dbid}); + Write(journal::Entry{journal::Op::SELECT, entry.dbid, entry.slot}); cur_dbid_ = entry.dbid; } diff --git a/src/server/journal/streamer.cc b/src/server/journal/streamer.cc index e152109ff..43c808849 100644 --- a/src/server/journal/streamer.cc +++ b/src/server/journal/streamer.cc @@ -4,6 +4,10 @@ #include "server/journal/streamer.h" +#include + +#include "base/logging.h" + namespace dfly { using namespace util; @@ -11,10 +15,15 @@ void JournalStreamer::Start(io::Sink* dest) { using namespace journal; write_fb_ = fb2::Fiber("journal_stream", &JournalStreamer::WriterFb, this, dest); journal_cb_id_ = journal_->RegisterOnChange([this](const JournalItem& item, bool allow_await) { + if (!ShouldWrite(item)) { + return; + } + if (item.opcode == Op::NOOP) { // No record to write, just await if data was written so consumer will read the data. return AwaitIfWritten(); } + Write(io::Buffer(item.data)); NotifyWritten(allow_await); }); @@ -40,4 +49,127 @@ void JournalStreamer::WriterFb(io::Sink* dest) { } } +RestoreStreamer::RestoreStreamer(DbSlice* slice, SlotSet slots, journal::Journal* journal, + Context* cntx) + : JournalStreamer(journal, cntx), db_slice_(slice), my_slots_(std::move(slots)) { + DCHECK(slice != nullptr); +} + +void RestoreStreamer::Start(io::Sink* dest) { + auto db_cb = absl::bind_front(&RestoreStreamer::OnDbChange, this); + snapshot_version_ = db_slice_->RegisterOnChange(std::move(db_cb)); + + JournalStreamer::Start(dest); + + DCHECK(!snapshot_fb_.IsJoinable()); + snapshot_fb_ = fb2::Fiber("slot-snapshot", [this] { + PrimeTable::Cursor cursor; + uint64_t last_yield = 0; + PrimeTable* pt = &db_slice_->databases()[0]->prime; + + do { + if (fiber_cancellation_.IsCancelled()) + return; + + cursor = pt->Traverse(cursor, absl::bind_front(&RestoreStreamer::WriteBucket, this)); + + if (last_yield >= 100) { + ThisFiber::Yield(); + last_yield = 0; + } + } while (cursor); + + WriteCommand(make_pair("DFLYMIGRATE", ArgSlice{"FULL-SYNC-CUT"})); + }); +} + +void RestoreStreamer::Cancel() { + fiber_cancellation_.Cancel(); + snapshot_fb_.JoinIfNeeded(); + db_slice_->UnregisterOnChange(snapshot_version_); + JournalStreamer::Cancel(); +} + +bool RestoreStreamer::ShouldWrite(const journal::JournalItem& item) const { + if (!item.slot.has_value()) { + return false; + } + + return ShouldWrite(*item.slot); +} + +bool RestoreStreamer::ShouldWrite(SlotId slot_id) const { + return my_slots_.contains(slot_id); +} + +void RestoreStreamer::WriteBucket(PrimeTable::bucket_iterator it) { + DCHECK_LT(it.GetVersion(), snapshot_version_); + it.SetVersion(snapshot_version_); + + while (!it.is_done()) { + const auto& pv = it->second; + + string key_buffer; + string_view key = it->first.GetSlice(&key_buffer); + + uint64_t expire = 0; + if (pv.HasExpire()) { + auto eit = db_slice_->databases()[0]->expire.Find(it->first); + expire = db_slice_->ExpireTime(eit); + } + + WriteEntry(key, pv, expire); + + ++it; + } +} + +void RestoreStreamer::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) { + DCHECK_EQ(db_index, 0) << "Restore migration only allowed in cluster mode in db0"; + + FiberAtomicGuard fg; + PrimeTable* table = db_slice_->GetTables(0).first; + + if (const PrimeTable::bucket_iterator* bit = req.update()) { + if (bit->GetVersion() < snapshot_version_) { + WriteBucket(*bit); + } + } else { + string_view key = get(req.change); + table->CVCUponInsert(snapshot_version_, key, [this](PrimeTable::bucket_iterator it) { + DCHECK_LT(it.GetVersion(), snapshot_version_); + WriteBucket(it); + }); + } +} + +void RestoreStreamer::WriteEntry(string_view key, const PrimeValue& pv, uint64_t expire_ms) { + absl::InlinedVector args; + + args.push_back(key); + + string expire_str = absl::StrCat(expire_ms); + args.push_back(expire_str); + + io::StringSink value_dump_sink; + SerializerBase::DumpObject(pv, &value_dump_sink); + args.push_back(value_dump_sink.str()); + + args.push_back("ABSTTL"); // Means expire string is since epoch + + WriteCommand(make_pair("RESTORE", ArgSlice{args})); +} + +void RestoreStreamer::WriteCommand(journal::Entry::Payload cmd_payload) { + journal::Entry entry(0, // txid + journal::Op::COMMAND, // single command + 0, // db index + 1, // shard count + 0, // slot-id, but it is ignored at this level + cmd_payload); + + JournalWriter writer{this}; + writer.Write(entry); +} + } // namespace dfly diff --git a/src/server/journal/streamer.h b/src/server/journal/streamer.h index 3fafb9646..f77f8f27d 100644 --- a/src/server/journal/streamer.h +++ b/src/server/journal/streamer.h @@ -4,9 +4,11 @@ #pragma once +#include "server/db_slice.h" #include "server/io_utils.h" #include "server/journal/journal.h" #include "server/journal/serializer.h" +#include "server/rdb_save.h" namespace dfly { @@ -23,17 +25,20 @@ class JournalStreamer : protected BufferedStreamerBase { JournalStreamer(JournalStreamer&& other) = delete; // Register journal listener and start writer in fiber. - void Start(io::Sink* dest); + virtual void Start(io::Sink* dest); // Must be called on context cancellation for unblocking // and manual cleanup. - void Cancel(); + virtual void Cancel(); using BufferedStreamerBase::GetTotalBufferCapacities; private: // Writer fiber that steals buffer contents and writes them to dest. void WriterFb(io::Sink* dest); + virtual bool ShouldWrite(const journal::JournalItem& item) const { + return true; + } private: Context* cntx_; @@ -44,4 +49,29 @@ class JournalStreamer : protected BufferedStreamerBase { Fiber write_fb_{}; }; +// Serializes existing DB as RESTORE commands, and sends updates as regular commands. +// Only handles relevant slots, while ignoring all others. +class RestoreStreamer : public JournalStreamer { + public: + RestoreStreamer(DbSlice* slice, SlotSet slots, journal::Journal* journal, Context* cntx); + + void Start(io::Sink* dest) override; + void Cancel() override; + + private: + void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req); + bool ShouldWrite(const journal::JournalItem& item) const override; + bool ShouldWrite(SlotId slot_id) const; + + void WriteBucket(PrimeTable::bucket_iterator it); + void WriteEntry(string_view key, const PrimeValue& pv, uint64_t expire_ms); + void WriteCommand(journal::Entry::Payload cmd_payload); + + DbSlice* db_slice_; + uint64_t snapshot_version_ = 0; + SlotSet my_slots_; + Fiber snapshot_fb_; + Cancellation fiber_cancellation_; +}; + } // namespace dfly diff --git a/src/server/journal/types.h b/src/server/journal/types.h index b4134227a..0a21ddc99 100644 --- a/src/server/journal/types.h +++ b/src/server/journal/types.h @@ -3,9 +3,11 @@ // #pragma once +#include #include #include +#include "server/cluster/cluster_config.h" #include "server/common.h" #include "server/table.h" @@ -27,6 +29,7 @@ struct EntryBase { Op opcode; DbIndex dbid; uint32_t shard_cnt; + std::optional slot; }; // This struct represents a single journal entry. @@ -39,15 +42,18 @@ struct Entry : public EntryBase { std::pair // Command and its shard parts. >; - Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt, Payload pl) - : EntryBase{txid, opcode, dbid, shard_cnt}, payload{pl} { + Entry(TxId txid, Op opcode, DbIndex dbid, uint32_t shard_cnt, std::optional slot_id, + Payload pl) + : EntryBase{txid, opcode, dbid, shard_cnt, slot_id}, payload{pl} { } - Entry(journal::Op opcode, DbIndex dbid) : EntryBase{0, opcode, dbid, 0}, payload{} { + Entry(journal::Op opcode, DbIndex dbid, std::optional slot_id) + : EntryBase{0, opcode, dbid, 0, slot_id}, payload{} { } - Entry(TxId txid, journal::Op opcode, DbIndex dbid, uint32_t shard_cnt) - : EntryBase{txid, opcode, dbid, shard_cnt}, payload{} { + Entry(TxId txid, journal::Op opcode, DbIndex dbid, uint32_t shard_cnt, + std::optional slot_id) + : EntryBase{txid, opcode, dbid, shard_cnt, slot_id}, payload{} { } bool HasPayload() const { @@ -73,6 +79,7 @@ struct JournalItem { LSN lsn; Op opcode; std::string data; + std::optional slot; }; using ChangeCallback = std::function; diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 2e5f60634..ec99ce26a 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -34,6 +34,7 @@ extern "C" { #include "server/acl/validator.h" #include "server/bitops_family.h" #include "server/cluster/cluster_family.h" +#include "server/cluster/unique_slot_checker.h" #include "server/conn_context.h" #include "server/error.h" #include "server/generic_family.h" @@ -1807,8 +1808,11 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret sinfo->keys.reserve(eval_args.keys.size()); optional sid; + + UniqueSlotChecker slot_checker; for (size_t i = 0; i < eval_args.keys.size(); ++i) { string_view key = ArgS(eval_args.keys, i); + slot_checker.Add(key); sinfo->keys.insert(KeyLockArgs::GetLockKey(key)); ShardId cur_sid = Shard(key, shard_count()); @@ -1845,7 +1849,8 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret ++ServerState::tlocal()->stats.eval_shardlocal_coordination_cnt; tx->PrepareMultiForScheduleSingleHop(*sid, tx->GetDbIndex(), args); tx->ScheduleSingleHop([&](Transaction*, EngineShard*) { - boost::intrusive_ptr stub_tx = new Transaction{tx, *sid}; + boost::intrusive_ptr stub_tx = + new Transaction{tx, *sid, slot_checker.GetUniqueSlotId()}; cntx->transaction = stub_tx.get(); result = interpreter->RunFunction(eval_args.sha, &error); diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index 752694065..4b9c92b8b 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -3,6 +3,7 @@ #include #include "facade/dragonfly_connection.h" +#include "server/cluster/unique_slot_checker.h" #include "server/command_registry.h" #include "server/conn_context.h" #include "server/engine_shard_set.h" @@ -46,14 +47,15 @@ MultiCommandSquasher::MultiCommandSquasher(absl::Span cmds, Connectio atomic_ = mode != Transaction::NON_ATOMIC; } -MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo(ShardId sid) { +MultiCommandSquasher::ShardExecInfo& MultiCommandSquasher::PrepareShardInfo( + ShardId sid, optional slot_id) { if (sharded_.empty()) sharded_.resize(shard_set->size()); auto& sinfo = sharded_[sid]; if (!sinfo.local_tx) { if (IsAtomic()) { - sinfo.local_tx = new Transaction{cntx_->transaction, sid}; + sinfo.local_tx = new Transaction{cntx_->transaction, sid, slot_id}; } else { // Non-atomic squashing does not use the transactional framework for fan out, so local // transactions have to be fully standalone, check locks and release them immediately. @@ -82,11 +84,17 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm // Check if all commands belong to one shard bool found_more = false; + UniqueSlotChecker slot_checker; ShardId last_sid = kInvalidSid; - IterateKeys(args, *keys, [&last_sid, &found_more](MutableSlice key) { + IterateKeys(args, *keys, [&last_sid, &found_more, &slot_checker](MutableSlice key) { if (found_more) return; - ShardId sid = Shard(facade::ToSV(key), shard_set->size()); + + string_view key_sv = facade::ToSV(key); + + slot_checker.Add(key_sv); + + ShardId sid = Shard(key_sv, shard_set->size()); if (last_sid == kInvalidSid || last_sid == sid) { last_sid = sid; return; @@ -97,7 +105,7 @@ MultiCommandSquasher::SquashResult MultiCommandSquasher::TrySquash(StoredCmd* cm if (found_more || last_sid == kInvalidSid) return SquashResult::NOT_SQUASHED; - auto& sinfo = PrepareShardInfo(last_sid); + auto& sinfo = PrepareShardInfo(last_sid, slot_checker.GetUniqueSlotId()); sinfo.had_writes |= cmd->Cid()->IsWriteOnly(); sinfo.cmds.push_back(cmd); diff --git a/src/server/multi_command_squasher.h b/src/server/multi_command_squasher.h index 61c150628..bcd64f678 100644 --- a/src/server/multi_command_squasher.h +++ b/src/server/multi_command_squasher.h @@ -50,7 +50,7 @@ class MultiCommandSquasher { bool verify_commands, bool error_abort); // Lazy initialize shard info. - ShardExecInfo& PrepareShardInfo(ShardId sid); + ShardExecInfo& PrepareShardInfo(ShardId sid, std::optional slot_id); // Retrun squash flags SquashResult TrySquash(StoredCmd* cmd); diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index 53467d96d..3b724e49f 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -1537,38 +1537,4 @@ void SerializerBase::CompressBlob() { ++compression_stats_->compressed_blobs; } -RestoreSerializer::RestoreSerializer(CompressionMode compression_mode) - : SerializerBase(compression_mode) { -} - -error_code RestoreSerializer::SaveEntry(const PrimeKey& pk, const PrimeValue& pv, - uint64_t expire_ms, DbIndex dbid) { - absl::InlinedVector args; - - key_buffer_.clear(); - string_view key = pk.GetSlice(&key_buffer_); - args.push_back(key); - - string expire_str = absl::StrCat(expire_ms); - args.push_back(expire_str); - - value_dump_sink_.Clear(); - DumpObject(pv, &value_dump_sink_); - args.push_back(value_dump_sink_.str()); - - args.push_back("ABSTTL"); // Means expire string is since epoch - - journal::Entry entry(0, // txid - journal::Op::COMMAND, // single command - dbid, // - 1, // shard count - make_pair("RESTORE", ArgSlice{args})); - - sink_.Clear(); - JournalWriter writer{&sink_}; - writer.Write(entry); - - return WriteRaw(io::Buffer(sink_.str())); -} - } // namespace dfly diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index dc7a09bcc..ba3c32007 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -196,9 +196,6 @@ class RdbSerializer : public SerializerBase { ~RdbSerializer(); - // Dumps `obj` in DUMP command format. Uses default compression mode. - static std::string DumpObject(const CompactObj& obj); - std::error_code FlushToSink(io::Sink* s) override; std::error_code SelectDb(uint32_t dbid); @@ -235,19 +232,4 @@ class RdbSerializer : public SerializerBase { DbIndex last_entry_db_index_ = kInvalidDbId; }; -// Serializes CompactObj as RESTORE commands. -class RestoreSerializer : public SerializerBase { - public: - explicit RestoreSerializer(CompressionMode compression_mode); - - std::error_code SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms, - DbIndex dbid); - - private: - // All members are used for saving allocations. - std::string key_buffer_; - io::StringSink value_dump_sink_; - io::StringSink sink_; -}; - } // namespace dfly diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 32a9ec369..f87425c88 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -55,7 +55,7 @@ Transaction::Transaction(const CommandId* cid) : cid_{cid} { } } -Transaction::Transaction(const Transaction* parent, ShardId shard_id) +Transaction::Transaction(const Transaction* parent, ShardId shard_id, std::optional slot_id) : multi_{make_unique()}, txid_{parent->txid()}, unique_shard_cnt_{1}, @@ -69,6 +69,10 @@ Transaction::Transaction(const Transaction* parent, ShardId shard_id) multi_->role = SQUASHED_STUB; time_now_ms_ = parent->time_now_ms_; + + if (slot_id.has_value()) { + unique_slot_checker_.Add(*slot_id); + } } Transaction::~Transaction() { @@ -105,12 +109,16 @@ void Transaction::BuildShardIndex(const KeyIndex& key_index, bool rev_mapping, if (key_index.bonus) { DCHECK(key_index.step == 1); - uint32_t sid = Shard(ArgS(args, *key_index.bonus), shard_data_.size()); + string_view key = ArgS(args, *key_index.bonus); + unique_slot_checker_.Add(key); + uint32_t sid = Shard(key, shard_data_.size()); add(sid, *key_index.bonus); } for (unsigned i = key_index.start; i < key_index.end; ++i) { - uint32_t sid = Shard(ArgS(args, i), shard_data_.size()); + string_view key = ArgS(args, i); + unique_slot_checker_.Add(key); + uint32_t sid = Shard(key, shard_data_.size()); add(sid, i); DCHECK_LE(key_index.step, 2u); @@ -1307,7 +1315,8 @@ void Transaction::UnlockMultiShardCb(const KeyList& sharded_keys, EngineShard* s auto journal = shard->journal(); if (journal != nullptr && multi_->shard_journal_write[shard->shard_id()]) { - journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_journals_cnt, {}, true); + journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_journals_cnt, + unique_slot_checker_.GetUniqueSlotId(), {}, true); } if (multi_->mode == GLOBAL) { @@ -1464,7 +1473,8 @@ void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload& bool is_multi = multi_commands || IsAtomicMulti(); auto opcode = is_multi ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND; - journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, std::move(payload), allow_await); + journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, unique_slot_checker_.GetUniqueSlotId(), + std::move(payload), allow_await); } void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const { @@ -1473,7 +1483,8 @@ void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt } auto journal = shard->journal(); CHECK(journal); - journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt, {}, false); + journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt, + unique_slot_checker_.GetUniqueSlotId(), {}, false); } void Transaction::RunOnceAsCommand(const CommandId* cid, RunnableType cb) { diff --git a/src/server/transaction.h b/src/server/transaction.h index 18e4ff466..5a8bba12b 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -16,6 +16,7 @@ #include "core/intent_lock.h" #include "core/tx_queue.h" #include "facade/op_status.h" +#include "server/cluster/unique_slot_checker.h" #include "server/common.h" #include "server/journal/types.h" #include "server/table.h" @@ -146,7 +147,7 @@ class Transaction { explicit Transaction(const CommandId* cid); // Initialize transaction for squashing placed on a specific shard with a given parent tx - explicit Transaction(const Transaction* parent, ShardId shard_id); + explicit Transaction(const Transaction* parent, ShardId shard_id, std::optional slot_id); // Initialize from command (args) on specific db. OpStatus InitByArgs(DbIndex index, CmdArgList args); @@ -574,6 +575,7 @@ class Transaction { // unique_shard_cnt_ and unique_shard_id_ are accessed only by coordinator thread. uint32_t unique_shard_cnt_{0}; // Number of unique shards active ShardId unique_shard_id_{kInvalidSid}; // Set if unique_shard_cnt_ = 1 + UniqueSlotChecker unique_slot_checker_; EventCount blocking_ec_; // Used to wake blocking transactions. EventCount run_ec_; // Used to wait for shard callbacks