From 0a1b5eb297f6cb510a64e04b88e71740e9d873a9 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Tue, 20 Sep 2022 01:09:03 -0700 Subject: [PATCH] chore(server): rdb save can now save into tcp socket directly. (#317) In more detail, RdbSaver uses AlignedBuffer that writes into io::Sink in chunks of 4KB. It's great for the direct file I/O, but bad for sockets that receive blocks of 4KB with garbage at the end. I improved the code around this and actually simplified the logic, so now AlignedBuffer is just another Sink that is passed into serializer when writing into files. When sending to sockets a socket sink is passed instead. Also many other unrelated changes grouped into this pretty big cr. 1. dashtable readability improvements. 2. Move methods from facade::ConnectionContext - into facade::Service, make ConnectionContext a dumb object. 3. Optionally allow journal to be memory only (not backed up by a disk) by using a ring buffer to store last k entries in each journal slice. Also renamed journal_shard into journal_slice because journal has presence in each DF thread and not only in its shards. 4. Introduce journal::Entry that will consolidate any store change that happens in the thread. 5. Introduce GetRandomHex utility function. 6. Introduce two hooks: ServerFamily::OnClose that is called when a connection is closed, and ServerFamily::BreakOnShutdown that is called when process exits and any background fibers neet to break early. 7. Pull some noisy info logs out of rdb_load class. 8. Snapshot class now has the ability to subscribe to journal changes, thus it can include concurrent changes into the snapshot. Currently only journal::Op::VAL is supported (it's part of RDB format anyway). Signed-off-by: Roman Gershman --- src/core/dash.h | 22 +-- src/core/dash_internal.h | 2 + src/facade/conn_context.h | 8 +- src/facade/dragonfly_connection.cc | 4 +- src/facade/service_interface.h | 7 + src/redis/redis_aux.h | 2 +- src/server/CMakeLists.txt | 2 +- src/server/common.h | 25 +++ src/server/conn_context.cc | 39 ----- src/server/conn_context.h | 11 +- src/server/db_slice.cc | 1 + src/server/dflycmd.cc | 42 ++++- src/server/dflycmd.h | 5 + src/server/journal/journal.cc | 71 ++++----- src/server/journal/journal.h | 31 ++-- .../{journal_shard.cc => journal_slice.cc} | 75 +++++++-- .../{journal_shard.h => journal_slice.h} | 27 +++- src/server/journal/types.h | 50 ++++++ src/server/json_family.cc | 7 +- src/server/main_service.cc | 46 +++++- src/server/main_service.h | 2 + src/server/rdb_load.cc | 5 +- src/server/rdb_load.h | 17 +- src/server/rdb_save.cc | 145 +++++++++--------- src/server/rdb_save.h | 29 ++-- src/server/server_family.cc | 59 ++++--- src/server/server_family.h | 4 + src/server/server_state.h | 5 +- src/server/snapshot.cc | 74 ++++++--- src/server/snapshot.h | 9 +- src/server/string_family.cc | 61 ++++---- src/server/transaction.cc | 2 +- src/server/transaction.h | 2 +- 33 files changed, 573 insertions(+), 318 deletions(-) rename src/server/journal/{journal_shard.cc => journal_slice.cc} (59%) rename src/server/journal/{journal_shard.h => journal_slice.h} (55%) create mode 100644 src/server/journal/types.h diff --git a/src/core/dash.h b/src/core/dash.h index 490baeb8d..df3aacd0f 100644 --- a/src/core/dash.h +++ b/src/core/dash.h @@ -716,12 +716,12 @@ template auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, EvictionPolicy& ev) -> std::pair { uint64_t key_hash = DoHash(key); - uint32_t seg_id = SegmentId(key_hash); + uint32_t target_seg_id = SegmentId(key_hash); while (true) { // Keep last global_depth_ msb bits of the hash. - assert(seg_id < segment_.size()); - SegmentType* target = segment_[seg_id]; + assert(target_seg_id < segment_.size()); + SegmentType* target = segment_[target_seg_id]; // Load heap allocated segment data - to avoid TLB miss when accessing the bucket. __builtin_prefetch(target, 0, 1); @@ -731,12 +731,12 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio if (res) { // success ++size_; - return std::make_pair(iterator{this, seg_id, it.index, it.slot}, true); + return std::make_pair(iterator{this, target_seg_id, it.index, it.slot}, true); } /*duplicate insert, insertion failure*/ if (it.found()) { - return std::make_pair(iterator{this, seg_id, it.index, it.slot}, false); + return std::make_pair(iterator{this, target_seg_id, it.index, it.slot}, false); } // At this point we must split the segment. @@ -749,12 +749,12 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio hotspot.key_hash = key_hash; for (unsigned j = 0; j < HotspotBuckets::kRegularBuckets; ++j) { - hotspot.probes.by_type.regular_buckets[j] = bucket_iterator{this, seg_id, bid[j]}; + hotspot.probes.by_type.regular_buckets[j] = bucket_iterator{this, target_seg_id, bid[j]}; } for (unsigned i = 0; i < Policy::kStashBucketNum; ++i) { hotspot.probes.by_type.stash_buckets[i] = - bucket_iterator{this, seg_id, uint8_t(kLogicalBucketNum + i), 0}; + bucket_iterator{this, target_seg_id, uint8_t(kLogicalBucketNum + i), 0}; } hotspot.num_buckets = HotspotBuckets::kNumBuckets; @@ -770,7 +770,7 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio /*unsigned start = (bid[HotspotBuckets::kNumBuckets - 1] + 1) % kLogicalBucketNum; for (unsigned i = 0; i < HotspotBuckets::kNumBuckets; ++i) { uint8_t id = (start + i) % kLogicalBucketNum; - buckets.probes.arr[i] = bucket_iterator{this, seg_id, id}; + buckets.probes.arr[i] = bucket_iterator{this, target_seg_id, id}; } garbage_collected_ += ev.GarbageCollect(buckets, this); */ @@ -804,12 +804,12 @@ auto DashTable<_Key, _Value, Policy>::InsertInternal(U&& key, V&& value, Evictio if (target->local_depth() == global_depth_) { IncreaseDepth(global_depth_ + 1); - seg_id = SegmentId(key_hash); - assert(seg_id < segment_.size() && segment_[seg_id] == target); + target_seg_id = SegmentId(key_hash); + assert(target_seg_id < segment_.size() && segment_[target_seg_id] == target); } ev.RecordSplit(target); - Split(seg_id); + Split(target_seg_id); } return std::make_pair(iterator{}, false); diff --git a/src/core/dash_internal.h b/src/core/dash_internal.h index 8f3e9fc33..7aff1dcfd 100644 --- a/src/core/dash_internal.h +++ b/src/core/dash_internal.h @@ -1220,6 +1220,8 @@ void Segment::Split(HFunc&& hfn, Segment* dest_right) { auto it = dest_right->InsertUniq(std::forward(Key(bid, slot)), std::forward(Value(bid, slot)), hash); (void)it; + assert(it.index != kNanBid); + if constexpr (USE_VERSION) { // Update the version in the destination bucket. uint64_t ver = stash.GetVersion(); diff --git a/src/facade/conn_context.h b/src/facade/conn_context.h index 55639a155..968059aee 100644 --- a/src/facade/conn_context.h +++ b/src/facade/conn_context.h @@ -17,8 +17,8 @@ class ConnectionContext { public: ConnectionContext(::io::Sink* stream, Connection* owner); - // We won't have any virtual methods, probably. However, since we allocate derived class, - // we need to declare a virtual d-tor so we could delete them inside Connection. + // We won't have any virtual methods, probably. However, since we allocate a derived class, + // we need to declare a virtual d-tor, so we could properly delete it from Connection code. virtual ~ConnectionContext() {} Connection* owner() { @@ -51,10 +51,6 @@ class ConnectionContext { bool authenticated: 1; bool force_dispatch: 1; // whether we should route all requests to the dispatch fiber. - virtual void OnClose() {} - - virtual std::string GetContextInfo() const { return std::string{}; } - private: Connection* owner_; Protocol protocol_ = Protocol::REDIS; diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index d2ec3a3c2..50ed77baa 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -300,7 +300,7 @@ string Connection::GetClientInfo() const { absl::StrAppend(&res, " age=", now - creation_time_, " idle=", now - last_interaction_); absl::StrAppend(&res, " phase=", phase_, " "); if (cc_) { - absl::StrAppend(&res, cc_->GetContextInfo()); + absl::StrAppend(&res, service_->GetContextInfo(cc_.get())); } return res; @@ -374,7 +374,7 @@ void Connection::ConnectionFlow(FiberSocketBase* peer) { VLOG(1) << "Before dispatch_fb.join()"; dispatch_fb.join(); VLOG(1) << "After dispatch_fb.join()"; - cc_->OnClose(); + service_->OnClose(cc_.get()); stats->read_buf_capacity -= io_buf_.Capacity(); diff --git a/src/facade/service_interface.h b/src/facade/service_interface.h index c38ad26e4..75445a6e6 100644 --- a/src/facade/service_interface.h +++ b/src/facade/service_interface.h @@ -32,6 +32,13 @@ class ServiceInterface { virtual void ConfigureHttpHandlers(util::HttpListenerBase* base) { } + + virtual void OnClose(ConnectionContext* cntx) { + } + + virtual std::string GetContextInfo(ConnectionContext* cntx) { + return {}; + } }; } // namespace facade diff --git a/src/redis/redis_aux.h b/src/redis/redis_aux.h index 0614b3898..9d794f994 100644 --- a/src/redis/redis_aux.h +++ b/src/redis/redis_aux.h @@ -27,7 +27,7 @@ #define MAXMEMORY_NO_EVICTION (7<<8) -#define CONFIG_RUN_ID_SIZE 40 +#define CONFIG_RUN_ID_SIZE 40U #define EVPOOL_CACHED_SDS_SIZE 255 #define EVPOOL_SIZE 16 diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 6a00fdd0f..9fd26161a 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -2,7 +2,7 @@ add_executable(dragonfly dfly_main.cc) cxx_link(dragonfly base dragonfly_lib) add_library(dfly_transaction db_slice.cc engine_shard_set.cc blocking_controller.cc common.cc - io_mgr.cc journal/journal.cc journal/journal_shard.cc table.cc + io_mgr.cc journal/journal.cc journal/journal_slice.cc table.cc tiered_storage.cc transaction.cc) cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib) diff --git a/src/server/common.h b/src/server/common.h index dc3d692ee..7be5b7eaa 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -5,6 +5,7 @@ #pragma once #include +#include #include #include @@ -131,4 +132,28 @@ extern unsigned kernel_version; const char* GlobalStateName(GlobalState gs); +template std::string GetRandomHex(RandGen& gen, size_t len) { + static_assert(std::is_same::value); + std::string res(len, '\0'); + size_t indx = 0; + + for (size_t i = 0; i < len / 16; ++i) { // 2 chars per byte + absl::AlphaNum an(absl::Hex(gen(), absl::kZeroPad16)); + + for (unsigned j = 0; j < 16; ++j) { + res[indx++] = an.Piece()[j]; + } + } + + if (indx < res.size()) { + absl::AlphaNum an(absl::Hex(gen(), absl::kZeroPad16)); + + for (unsigned j = 0; indx < res.size(); indx++, j++) { + res[indx] = an.Piece()[j]; + } + } + + return res; +} + } // namespace dfly diff --git a/src/server/conn_context.cc b/src/server/conn_context.cc index 10454dd99..be80718d1 100644 --- a/src/server/conn_context.cc +++ b/src/server/conn_context.cc @@ -211,45 +211,6 @@ void ConnectionContext::SendSubscriptionChangedResponse(string_view action, (*this)->SendLong(count); } -void ConnectionContext::OnClose() { - if (!conn_state.exec_info.watched_keys.empty()) { - shard_set->RunBriefInParallel([this](EngineShard* shard) { - return shard->db_slice().UnregisterConnectionWatches(&conn_state.exec_info); - }); - } - - if (!conn_state.subscribe_info) - return; - - if (!conn_state.subscribe_info->channels.empty()) { - auto token = conn_state.subscribe_info->borrow_token; - UnsubscribeAll(false); - // Check that all borrowers finished processing - token.Wait(); - } - - if (conn_state.subscribe_info) { - DCHECK(!conn_state.subscribe_info->patterns.empty()); - auto token = conn_state.subscribe_info->borrow_token; - PUnsubscribeAll(false); - // Check that all borrowers finished processing - token.Wait(); - DCHECK(!conn_state.subscribe_info); - } -} - -string ConnectionContext::GetContextInfo() const { - char buf[16] = {0}; - unsigned index = 0; - if (async_dispatch) - buf[index++] = 'a'; - - if (conn_closing) - buf[index++] = 't'; - - return index ? absl::StrCat("flags:", buf) : string(); -} - void ConnectionState::ExecInfo::Clear() { state = EXEC_INACTIVE; body.clear(); diff --git a/src/server/conn_context.h b/src/server/conn_context.h index ab3e5c269..4cc1979ee 100644 --- a/src/server/conn_context.h +++ b/src/server/conn_context.h @@ -83,8 +83,11 @@ struct ConnectionState { // For set op - it's the flag value we are storing along with the value. // For get op - we use it as a mask of MCGetMask values. uint32_t memcache_flag = 0; - // If it's a replication client - then it holds positive sync session id. - uint32_t sync_session_id = 0; + + // If this server is master, and this connection is from a secondary replica, + // then it holds positive sync session id. + uint32_t repl_session_id = 0; + uint32_t repl_threadid = kuint32max; ExecInfo exec_info; std::optional script_info; @@ -97,8 +100,6 @@ class ConnectionContext : public facade::ConnectionContext { : facade::ConnectionContext(stream, owner) { } - void OnClose() override; - struct DebugInfo { uint32_t shards_count = 0; TxClock clock = 0; @@ -123,8 +124,6 @@ class ConnectionContext : public facade::ConnectionContext { bool is_replicating = false; - std::string GetContextInfo() const override; - private: void SendSubscriptionChangedResponse(std::string_view action, std::optional topic, diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index bb81db6bd..f045df5f8 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -67,6 +67,7 @@ class PrimeEvictionPolicy { can_evict_(can_evict) { } + // A hook function that is called every time a segment is full and requires splitting. void RecordSplit(PrimeTable::Segment_t* segment) { mem_budget_ -= PrimeTable::kSegBytes; DVLOG(1) << "split: " << segment->SlowSize() << "/" << segment->capacity(); diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index e178dd8ac..1167e392e 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -113,6 +113,23 @@ void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) { rb->SendError(kSyntaxErr); } +void DflyCmd::OnClose(ConnectionContext* cntx) { + boost::fibers::fiber repl_fb; + + if (cntx->conn_state.repl_session_id > 0 && cntx->conn_state.repl_threadid != kuint32max) { + unique_lock lk(mu_); + + auto it = sync_info_.find(cntx->conn_state.repl_session_id); + if (it != sync_info_.end()) { + VLOG(1) << "Found tbd: " << cntx->conn_state.repl_session_id; + } + } + + if (repl_fb.joinable()) { + repl_fb.join(); + } +} + void DflyCmd::HandleJournal(CmdArgList args, ConnectionContext* cntx) { DCHECK_GE(args.size(), 3u); ToUpper(&args[2]); @@ -127,7 +144,26 @@ void DflyCmd::HandleJournal(CmdArgList args, ConnectionContext* cntx) { journal::Journal* journal = ServerState::tlocal()->journal(); if (!journal) { string dir = absl::GetFlag(FLAGS_dir); - sf_->journal()->StartLogging(dir); + + atomic_uint32_t created{0}; + auto* pool = shard_set->pool(); + + auto open_cb = [&](auto* pb) { + auto ec = sf_->journal()->OpenInThread(true, dir); + if (ec) { + LOG(ERROR) << "Could not create journal " << ec; + } else { + created.fetch_add(1, memory_order_relaxed); + } + }; + + pool->AwaitFiberOnAll(open_cb); + if (created.load(memory_order_acquire) != pool->size()) { + LOG(FATAL) << "TBD / revert"; + } + + // We can not use transaction distribution mechanism because we must open journal for all + // threads and not only for shards. trans->Schedule(); auto barrier_cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; trans->Execute(barrier_cb, true); @@ -165,4 +201,8 @@ uint32_t DflyCmd::AllocateSyncSession() { return it->first; } +void DflyCmd::BreakOnShutdown() { + VLOG(1) << "BreakOnShutdown"; +} + } // namespace dfly diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h index 36140e070..60841e70d 100644 --- a/src/server/dflycmd.h +++ b/src/server/dflycmd.h @@ -29,6 +29,11 @@ class DflyCmd { uint32_t AllocateSyncSession(); + void OnClose(ConnectionContext* cntx); + + // stops all background processes so we could exit in orderly manner. + void BreakOnShutdown(); + private: void HandleJournal(CmdArgList args, ConnectionContext* cntx); diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 3e461ca53..4ed090dea 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -8,7 +8,7 @@ #include "base/logging.h" #include "server/engine_shard_set.h" -#include "server/journal/journal_shard.h" +#include "server/journal/journal_slice.h" #include "server/server_state.h" namespace dfly { @@ -21,43 +21,33 @@ namespace fibers = boost::fibers; namespace { -thread_local JournalShard journal_shard; +// Present in all threads (not only in shard threads). +thread_local JournalSlice journal_slice; } // namespace Journal::Journal() { } -error_code Journal::StartLogging(std::string_view dir) { - if (journal_shard.IsOpen()) { - return error_code{}; +error_code Journal::OpenInThread(bool persistent, string_view dir) { + journal_slice.Init(unsigned(ProactorBase::GetIndex())); + + error_code ec; + + if (persistent) { + ec = journal_slice.Open(dir); + if (ec) { + return ec; + } } - auto* pool = shard_set->pool(); - atomic_uint32_t created{0}; - lock_guard lk(state_mu_); - - auto open_cb = [&](auto* pb) { - auto ec = journal_shard.Open(dir, unsigned(ProactorBase::GetIndex())); - if (ec) { - LOG(FATAL) << "Could not create journal " << ec; // TODO - } else { - created.fetch_add(1, memory_order_relaxed); ServerState::tlocal()->set_journal(this); EngineShard* shard = EngineShard::tlocal(); if (shard) { shard->set_journal(this); } - } - }; - pool->AwaitFiberOnAll(open_cb); - - if (created.load(memory_order_acquire) != pool->size()) { - LOG(FATAL) << "TBD / revert"; - } - - return error_code{}; + return ec; } error_code Journal::Close() { @@ -76,7 +66,7 @@ error_code Journal::Close() { shard->set_journal(nullptr); } - auto ec = journal_shard.Close(); + auto ec = journal_slice.Close(); if (ec) { lock_guard lk2(ec_mu); @@ -89,21 +79,30 @@ error_code Journal::Close() { return res; } +uint32_t Journal::RegisterOnChange(ChangeCallback cb) { + return journal_slice.RegisterOnChange(cb); +} + +void Journal::Unregister(uint32_t id) { + journal_slice.Unregister(id); +} + bool Journal::SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards) { - if (!journal_shard.IsOpen() || lameduck_.load(memory_order_relaxed)) + if (!journal_slice.IsOpen() || lameduck_.load(memory_order_relaxed)) return false; - journal_shard.AddLogRecord(txid, unsigned(Op::SCHED)); + // TODO: to complete the metadata. + journal_slice.AddLogRecord(Entry::Sched(txid)); return true; } LSN Journal::GetLsn() const { - return journal_shard.cur_lsn(); + return journal_slice.cur_lsn(); } bool Journal::EnterLameDuck() { - if (!journal_shard.IsOpen()) { + if (!journal_slice.IsOpen()) { return false; } @@ -112,15 +111,17 @@ bool Journal::EnterLameDuck() { return res; } +void Journal::RecordEntry(const Entry& entry) { + journal_slice.AddLogRecord(entry); +} + +/* void Journal::OpArgs(TxId txid, Op opcode, Span keys) { - DCHECK(journal_shard.IsOpen()); + DCHECK(journal_slice.IsOpen()); - journal_shard.AddLogRecord(txid, unsigned(opcode)); -} - -void Journal::RecordEntry(TxId txid, const PrimeKey& key, const PrimeValue& pval) { - journal_shard.AddLogRecord(txid, unsigned(Op::VAL)); + journal_slice.AddLogRecord(txid, opcode); } +*/ } // namespace journal } // namespace dfly diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index 14b71bde5..162787812 100644 --- a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -4,8 +4,7 @@ #pragma once -#include "server/common.h" -#include "server/table.h" +#include "server/journal/types.h" #include "util/proactor_pool.h" namespace dfly { @@ -14,17 +13,6 @@ class Transaction; namespace journal { -enum class Op : uint8_t { - NOOP = 0, - LOCK = 1, - UNLOCK = 2, - LOCK_SHARD = 3, - UNLOCK_SHARD = 4, - SCHED = 5, - VAL = 10, - DEL, - MSET, -}; class Journal { public: @@ -32,8 +20,6 @@ class Journal { Journal(); - std::error_code StartLogging(std::string_view dir); - // Returns true if journal has been active and changed its state to lameduck mode // and false otherwise. bool EnterLameDuck(); // still logs ongoing transactions but refuses to start new ones. @@ -41,10 +27,20 @@ class Journal { // Requires: journal is in lameduck mode. std::error_code Close(); + // Opens journal inside a Dragonfly thread. Must be called in each thread. + std::error_code OpenInThread(bool persistent, std::string_view dir); + + //******* The following functions must be called in the context of the owning shard *********// + + + uint32_t RegisterOnChange(ChangeCallback cb); + void Unregister(uint32_t id); + // Returns true if transaction was scheduled, false if journal is inactive // or in lameduck mode and does not log new transactions. bool SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards); + /* void AddCmd(TxId txid, Op opcode, Span args) { OpArgs(txid, opcode, args); } @@ -56,13 +52,12 @@ class Journal { void Unlock(TxId txid, Span keys) { OpArgs(txid, Op::UNLOCK, keys); } - +*/ LSN GetLsn() const; - void RecordEntry(TxId txid, const PrimeKey& key, const PrimeValue& pval); + void RecordEntry(const Entry& entry); private: - void OpArgs(TxId id, Op opcode, Span keys); mutable boost::fibers::mutex state_mu_; diff --git a/src/server/journal/journal_shard.cc b/src/server/journal/journal_slice.cc similarity index 59% rename from src/server/journal/journal_shard.cc rename to src/server/journal/journal_slice.cc index 73d11df3a..af11226f4 100644 --- a/src/server/journal/journal_shard.cc +++ b/src/server/journal/journal_slice.cc @@ -2,12 +2,11 @@ // See LICENSE for licensing terms. // -#include "server/journal/journal_shard.h" - -#include +#include "server/journal/journal_slice.h" #include #include +#include #include @@ -35,17 +34,30 @@ string ShardName(std::string_view base, unsigned index) { CHECK(!__ec$) << "Error: " << __ec$ << " " << __ec$.message() << " for " << #x; \ } while (false) +struct JournalSlice::RingItem { + LSN lsn; + TxId txid; + Op opcode; +}; - -JournalShard::JournalShard() { +JournalSlice::JournalSlice() { } -JournalShard::~JournalShard() { +JournalSlice::~JournalSlice() { CHECK(!shard_file_); } -std::error_code JournalShard::Open(const std::string_view dir, unsigned index) { +void JournalSlice::Init(unsigned index) { + if (ring_buffer_) // calling this function multiple times is allowed and it's a no-op. + return; + + slice_index_ = index; + ring_buffer_.emplace(128); // TODO: to make it configurable +} + +std::error_code JournalSlice::Open(std::string_view dir) { CHECK(!shard_file_); + DCHECK_NE(slice_index_, UINT32_MAX); fs::path dir_path; @@ -65,7 +77,8 @@ std::error_code JournalShard::Open(const std::string_view dir, unsigned index) { } // LOG(INFO) << int(dir_status.type()); } - dir_path.append(ShardName("journal", index)); + + dir_path.append(ShardName("journal", slice_index_)); shard_path_ = dir_path; // For file integrity guidelines see: @@ -81,15 +94,14 @@ std::error_code JournalShard::Open(const std::string_view dir, unsigned index) { DVLOG(1) << "Opened journal " << shard_path_; shard_file_ = std::move(res).value(); - shard_index_ = index; file_offset_ = 0; status_ec_.clear(); return error_code{}; } -error_code JournalShard::Close() { - VLOG(1) << "JournalShard::Close"; +error_code JournalSlice::Close() { + VLOG(1) << "JournalSlice::Close"; CHECK(shard_file_); lameduck_ = true; @@ -103,13 +115,44 @@ error_code JournalShard::Close() { return ec; } -void JournalShard::AddLogRecord(TxId txid, unsigned opcode) { - string line = absl::StrCat(lsn_, " ", txid, " ", opcode, "\n"); - error_code ec = shard_file_->Write(io::Buffer(line), file_offset_, 0); - CHECK_EC(ec); - file_offset_ += line.size(); +void JournalSlice::AddLogRecord(const Entry& entry) { + DCHECK(ring_buffer_); + + for (const auto& k_v : change_cb_arr_) { + k_v.second(entry); + } + + RingItem item; + item.lsn = lsn_; + item.opcode = entry.opcode; + item.txid = entry.txid; + VLOG(1) << "Writing item " << item.lsn; + ring_buffer_->EmplaceOrOverride(move(item)); + + if (shard_file_) { + string line = absl::StrCat(lsn_, " ", entry.txid, " ", entry.opcode, "\n"); + error_code ec = shard_file_->Write(io::Buffer(line), file_offset_, 0); + CHECK_EC(ec); + file_offset_ += line.size(); + } + ++lsn_; } +uint32_t JournalSlice::RegisterOnChange(ChangeCallback cb) { + uint32_t id = next_cb_id_++; + change_cb_arr_.emplace_back(id, std::move(cb)); + return id; +} + +void JournalSlice::Unregister(uint32_t id) { + for (auto it = change_cb_arr_.begin(); it != change_cb_arr_.end(); ++it) { + if (it->first == id) { + change_cb_arr_.erase(it); + break; + } + } +} + } // namespace journal } // namespace dfly diff --git a/src/server/journal/journal_shard.h b/src/server/journal/journal_slice.h similarity index 55% rename from src/server/journal/journal_shard.h rename to src/server/journal/journal_slice.h index 9e4860ef3..e52133704 100644 --- a/src/server/journal/journal_shard.h +++ b/src/server/journal/journal_slice.h @@ -9,18 +9,23 @@ #include #include +#include "base/ring_buffer.h" #include "server/common.h" +#include "server/journal/types.h" #include "util/uring/uring_file.h" namespace dfly { namespace journal { -class JournalShard { +// Journal slice is present for both shards and io threads. +class JournalSlice { public: - JournalShard(); - ~JournalShard(); + JournalSlice(); + ~JournalSlice(); - std::error_code Open(const std::string_view dir, unsigned index); + void Init(unsigned index); + + std::error_code Open(std::string_view dir); std::error_code Close(); @@ -32,20 +37,30 @@ class JournalShard { return status_ec_; } + // Whether the file-based journaling is open. bool IsOpen() const { return bool(shard_file_); } - void AddLogRecord(TxId txid, unsigned opcode); + void AddLogRecord(const Entry& entry); + + uint32_t RegisterOnChange(ChangeCallback cb); + void Unregister(uint32_t); private: + + struct RingItem; + std::string shard_path_; std::unique_ptr shard_file_; + std::optional> ring_buffer_; + std::vector> change_cb_arr_; size_t file_offset_ = 0; LSN lsn_ = 1; - unsigned shard_index_ = -1; + uint32_t slice_index_ = UINT32_MAX; + uint32_t next_cb_id_ = 1; std::error_code status_ec_; diff --git a/src/server/journal/types.h b/src/server/journal/types.h new file mode 100644 index 000000000..bd307e48a --- /dev/null +++ b/src/server/journal/types.h @@ -0,0 +1,50 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// +#pragma once + +#include "server/common.h" +#include "server/table.h" + +namespace dfly { +namespace journal { + +enum class Op : uint8_t { + NOOP = 0, + LOCK = 1, + UNLOCK = 2, + LOCK_SHARD = 3, + UNLOCK_SHARD = 4, + SCHED = 5, + VAL = 10, + DEL, + MSET, +}; + +// TODO: to pass all the attributes like ttl, stickiness etc. +struct Entry { + Entry(Op op, DbIndex did, TxId tid, std::string_view skey) + : opcode(op), db_ind(did), txid(tid), key(skey) { + } + + Entry(DbIndex did, TxId tid, std::string_view skey, const PrimeValue& pval) + : Entry(Op::VAL, did, tid, skey) { + pval_ptr = &pval; + } + + static Entry Sched(TxId tid) { + return Entry{Op::SCHED, 0, tid, {}}; + } + + Op opcode; + DbIndex db_ind; + TxId txid; + std::string_view key; + const PrimeValue* pval_ptr = nullptr; + uint64_t expire_ms = 0; // 0 means no expiry. +}; + +using ChangeCallback = std::function; + +} // namespace journal +} // namespace dfly diff --git a/src/server/json_family.cc b/src/server/json_family.cc index 3b3b87c60..5b5994250 100644 --- a/src/server/json_family.cc +++ b/src/server/json_family.cc @@ -51,9 +51,10 @@ string GetString(EngineShard* shard, const PrimeValue& pv) { return res; } -inline void RecordJournal(const OpArgs& op_args, const PrimeKey& pkey, const PrimeKey& pvalue) { +inline void RecordJournal(const OpArgs& op_args, string_view key, const PrimeKey& pvalue) { if (op_args.shard->journal()) { - op_args.shard->journal()->RecordEntry(op_args.txid, pkey, pvalue); + journal::Entry entry{op_args.db_ind, op_args.txid, key, pvalue}; + op_args.shard->journal()->RecordEntry(entry); } } @@ -63,7 +64,7 @@ void SetString(const OpArgs& op_args, string_view key, const string& value) { db_slice.PreUpdate(op_args.db_ind, it_output); it_output->second.SetString(value); db_slice.PostUpdate(op_args.db_ind, it_output, key); - RecordJournal(op_args, it_output->first, it_output->second); + RecordJournal(op_args, key, it_output->second); } string JsonType(const json& val) { diff --git a/src/server/main_service.cc b/src/server/main_service.cc index d6f5cd148..27217ff2a 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -675,10 +675,11 @@ facade::ConnectionContext* Service::CreateContext(util::FiberSocketBase* peer, // a bit of a hack. I set up breaker callback here for the owner. // Should work though it's confusing to have it here. - owner->RegisterOnBreak([res](uint32_t) { + owner->RegisterOnBreak([res, this](uint32_t) { if (res->transaction) { - res->transaction->BreakOnClose(); + res->transaction->BreakOnShutdown(); } + this->server_family().BreakOnShutdown(); }); return res; @@ -1060,7 +1061,7 @@ void Service::Publish(CmdArgList args, ConnectionContext* cntx) { // How do we know that subsribers did not disappear after we fetched them? // Each subscriber object hold a borrow_token. - // ConnectionContext::OnClose does not reset subscribe_info before all tokens are returned. + // OnClose does not reset subscribe_info before all tokens are returned. vector subscriber_arr = shard_set->Await(sid, std::move(cb)); atomic_uint32_t published{0}; @@ -1249,6 +1250,45 @@ void Service::ConfigureHttpHandlers(util::HttpListenerBase* base) { base->RegisterCb("/txz", TxTable); } +void Service::OnClose(facade::ConnectionContext* cntx) { + ConnectionContext* server_cntx = static_cast(cntx); + ConnectionState& conn_state = server_cntx->conn_state; + + if (conn_state.subscribe_info) { // Clean-ups related to PUBSUB + if (!conn_state.subscribe_info->channels.empty()) { + auto token = conn_state.subscribe_info->borrow_token; + server_cntx->UnsubscribeAll(false); + + // Check that all borrowers finished processing. + // token is increased in channel_slice (the publisher side). + token.Wait(); + } + + if (conn_state.subscribe_info) { + DCHECK(!conn_state.subscribe_info->patterns.empty()); + auto token = conn_state.subscribe_info->borrow_token; + server_cntx->PUnsubscribeAll(false); + // Check that all borrowers finished processing + token.Wait(); + DCHECK(!conn_state.subscribe_info); + } + } + + server_family_.OnClose(server_cntx); +} + +string Service::GetContextInfo(facade::ConnectionContext* cntx) { + char buf[16] = {0}; + unsigned index = 0; + if (cntx->async_dispatch) + buf[index++] = 'a'; + + if (cntx->conn_closing) + buf[index++] = 't'; + + return index ? absl::StrCat("flags:", buf) : string(); +} + using ServiceFunc = void (Service::*)(CmdArgList, ConnectionContext* cntx); #define HFUNC(x) SetHandler(&Service::x) diff --git a/src/server/main_service.h b/src/server/main_service.h index 09f70ffaf..07a07e8d9 100644 --- a/src/server/main_service.h +++ b/src/server/main_service.h @@ -82,6 +82,8 @@ class Service : public facade::ServiceInterface { GlobalState SwitchState(GlobalState from , GlobalState to); void ConfigureHttpHandlers(util::HttpListenerBase* base) final; + void OnClose(facade::ConnectionContext* cntx) final; + std::string GetContextInfo(facade::ConnectionContext* cntx) final; private: static void Quit(CmdArgList args, ConnectionContext* cntx); diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index ce0b8f6b8..f3bda0f9d 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1004,9 +1004,8 @@ error_code RdbLoader::Load(io::Source* src) { bc.Wait(); // wait for sentinels to report. absl::Duration dur = absl::Now() - start; - double seconds = double(absl::ToInt64Milliseconds(dur)) / 1000; - LOG(INFO) << "Done loading RDB, keys loaded: " << keys_loaded; - LOG(INFO) << "Loading finished after " << strings::HumanReadableElapsedTime(seconds); + load_time_ = double(absl::ToInt64Milliseconds(dur)) / 1000; + keys_loaded_ = keys_loaded; return kOk; } diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index f05baa0cf..93685c030 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -34,10 +34,20 @@ class RdbLoader { ::io::Bytes Leftover() const { return mem_buf_.InputBuffer(); } + size_t bytes_read() const { return bytes_read_; } + size_t keys_loaded() const { + return keys_loaded_; + } + + // returns time in seconds. + double load_time() const { + return load_time_; + } + private: using MutableBytes = ::io::MutableBytes; struct ObjSettings; @@ -49,8 +59,8 @@ class RdbLoader { struct LoadTrace; - using RdbVariant = std::variant, LzfString, - std::unique_ptr>; + using RdbVariant = + std::variant, LzfString, std::unique_ptr>; struct OpaqueObj { RdbVariant obj; int rdb_type; @@ -164,6 +174,9 @@ class RdbLoader { ::io::Source* src_ = nullptr; size_t bytes_read_ = 0; size_t source_limit_ = SIZE_MAX; + size_t keys_loaded_ = 0; + double load_time_ = 0; + DbIndex cur_db_index_ = 0; ::boost::fibers::mutex mu_; diff --git a/src/server/rdb_save.cc b/src/server/rdb_save.cc index f7370e8d8..ee51c7d86 100644 --- a/src/server/rdb_save.cc +++ b/src/server/rdb_save.cc @@ -2,13 +2,14 @@ // See LICENSE for licensing terms. // -#include "core/string_set.h" #include "server/rdb_save.h" #include #include #include +#include "core/string_set.h" + extern "C" { #include "redis/intset.h" #include "redis/listpack.h" @@ -159,10 +160,6 @@ constexpr size_t kAmask = 4_KB - 1; RdbSerializer::RdbSerializer(io::Sink* s) : sink_(s), mem_buf_{4_KB}, tmp_buf_(nullptr) { } -RdbSerializer::RdbSerializer(AlignedBuffer* aligned_buf) : RdbSerializer((io::Sink*)nullptr) { - aligned_buf_ = aligned_buf; -} - RdbSerializer::~RdbSerializer() { } @@ -311,7 +308,7 @@ error_code RdbSerializer::SaveSetObject(const PrimeValue& obj) { RETURN_ON_ERR(SaveString(string_view{ele, sdslen(ele)})); } } else if (obj.Encoding() == kEncodingStrMap2) { - StringSet *set = (StringSet*)obj.RObjPtr(); + StringSet* set = (StringSet*)obj.RObjPtr(); RETURN_ON_ERR(SaveLen(set->Size())); @@ -593,22 +590,14 @@ error_code RdbSerializer::WriteRaw(const io::Bytes& buf) { io::Bytes ib = mem_buf_.InputBuffer(); if (ib.empty()) { - if (sink_) { - return sink_->Write(buf); - } else { - return aligned_buf_->Write(buf); - } - } else { - if (sink_) { - iovec v[2] = {{.iov_base = const_cast(ib.data()), .iov_len = ib.size()}, - {.iov_base = const_cast(buf.data()), .iov_len = buf.size()}}; - RETURN_ON_ERR(sink_->Write(v, ABSL_ARRAYSIZE(v))); - } else { - RETURN_ON_ERR(aligned_buf_->Write(ib)); - RETURN_ON_ERR(aligned_buf_->Write(buf)); - } - mem_buf_.ConsumeInput(ib.size()); + return sink_->Write(buf); } + // else + iovec v[2] = {{.iov_base = const_cast(ib.data()), .iov_len = ib.size()}, + {.iov_base = const_cast(buf.data()), .iov_len = buf.size()}}; + RETURN_ON_ERR(sink_->Write(v, ABSL_ARRAYSIZE(v))); + mem_buf_.ConsumeInput(ib.size()); + return error_code{}; } @@ -620,11 +609,7 @@ error_code RdbSerializer::FlushMem() { DVLOG(2) << "FlushMem " << sz << " bytes"; // interrupt point. - if (sink_) { - RETURN_ON_ERR(sink_->Write(mem_buf_.InputBuffer())); - } else { - RETURN_ON_ERR(aligned_buf_->Write(mem_buf_.InputBuffer())); - } + RETURN_ON_ERR(sink_->Write(mem_buf_.InputBuffer())); mem_buf_.ConsumeInput(sz); return error_code{}; @@ -705,37 +690,37 @@ AlignedBuffer::~AlignedBuffer() { mi_free(aligned_buf_); } -// TODO: maybe to derive AlignedBuffer from Sink? -std::error_code AlignedBuffer::Write(io::Bytes record) { - if (buf_offs_ + record.size() < capacity_) { - memcpy(aligned_buf_ + buf_offs_, record.data(), record.size()); - buf_offs_ += record.size(); - return error_code{}; +io::Result AlignedBuffer::WriteSome(const iovec* v, uint32_t len) { + size_t total_len = 0; + uint32_t vindx = 0; + + for (; vindx < len; ++vindx) { + auto item = v[vindx]; + total_len += item.iov_len; + + while (buf_offs_ + item.iov_len > capacity_) { + size_t to_write = capacity_ - buf_offs_; + memcpy(aligned_buf_ + buf_offs_, item.iov_base, to_write); + iovec ivec{.iov_base = aligned_buf_, .iov_len = capacity_}; + error_code ec = upstream_->Write(&ivec, 1); + if (ec) + return nonstd::make_unexpected(ec); + + item.iov_len -= to_write; + item.iov_base = reinterpret_cast(item.iov_base) + to_write; + buf_offs_ = 0; + } + + DCHECK_GT(item.iov_len, 0u); + memcpy(aligned_buf_ + buf_offs_, item.iov_base, item.iov_len); + buf_offs_ += item.iov_len; } - memcpy(aligned_buf_ + buf_offs_, record.data(), capacity_ - buf_offs_); - size_t record_offs = capacity_ - buf_offs_; - buf_offs_ = 0; - size_t needed; - do { - iovec ivec{.iov_base = aligned_buf_, .iov_len = capacity_}; - RETURN_ON_ERR(upstream_->Write(&ivec, 1)); - needed = record.size() - record_offs; - if (needed < capacity_) - break; - - memcpy(aligned_buf_, record.data() + record_offs, capacity_); - record_offs += capacity_; - } while (true); - - if (needed) { - memcpy(aligned_buf_, record.data() + record_offs, needed); - buf_offs_ = needed; - } - - return error_code{}; + return total_len; } +// Note that it may write more than AlignedBuffer has at this point since it rounds up the length +// to the nearest page boundary. error_code AlignedBuffer::Flush() { size_t len = (buf_offs_ + kAmask) & (~kAmask); iovec ivec{.iov_base = aligned_buf_, .iov_len = len}; @@ -748,7 +733,7 @@ class RdbSaver::Impl { public: // We pass K=sz to say how many producers are pushing data in order to maintain // correct closing semantics - channel is closing when K producers marked it as closed. - Impl(unsigned producers_len, io::Sink* sink); + Impl(bool align_writes, unsigned producers_len, io::Sink* sink); error_code SaveAuxFieldStrStr(string_view key, string_view val); @@ -758,10 +743,13 @@ class RdbSaver::Impl { error_code ConsumeChannel(); - void StartSnapshotting(EngineShard* shard); + void StartSnapshotting(bool include_journal_changes, EngineShard* shard); error_code Flush() { - return aligned_buf_.Flush(); + if (aligned_buf_) + return aligned_buf_->Flush(); + + return error_code{}; } size_t Size() const { @@ -771,20 +759,23 @@ class RdbSaver::Impl { void FillFreqMap(RdbTypeFreqMap* dest) const; private: - AlignedBuffer aligned_buf_; - + io::Sink* sink_; // used for serializing non-body components in the calling fiber. RdbSerializer meta_serializer_; vector> shard_snapshots_; SliceSnapshot::RecordChannel channel_; + std::optional aligned_buf_; }; // We pass K=sz to say how many producers are pushing data in order to maintain // correct closing semantics - channel is closing when K producers marked it as closed. -RdbSaver::Impl::Impl(unsigned producers_len, io::Sink* sink) - : aligned_buf_(kBufLen, sink), meta_serializer_(&aligned_buf_), +RdbSaver::Impl::Impl(bool align_writes, unsigned producers_len, io::Sink* sink) + : sink_(sink), meta_serializer_(sink), shard_snapshots_(producers_len), channel_{128, producers_len} { - + if (align_writes) { + aligned_buf_.emplace(kBufLen, sink); + meta_serializer_.set_sink(&aligned_buf_.value()); + } } error_code RdbSaver::Impl::SaveAuxFieldStrStr(string_view key, string_view val) { @@ -799,8 +790,6 @@ error_code RdbSaver::Impl::SaveAuxFieldStrStr(string_view key, string_view val) error_code RdbSaver::Impl::ConsumeChannel() { error_code io_error; - // we can not exit on io-error since we spawn fibers that push data. - // TODO: we may signal them to stop processing and exit asap in case of the error. uint8_t buf[16]; size_t channel_bytes = 0; SliceSnapshot::DbRecord record; @@ -808,6 +797,9 @@ error_code RdbSaver::Impl::ConsumeChannel() { buf[0] = RDB_OPCODE_SELECTDB; + // we can not exit on io-error since we spawn fibers that push data. + // TODO: we may signal them to stop processing and exit asap in case of the error. + auto& channel = channel_; while (channel.Pop(record)) { if (io_error) @@ -816,9 +808,13 @@ error_code RdbSaver::Impl::ConsumeChannel() { do { if (record.db_index != last_db_index) { unsigned enclen = SerializeLen(record.db_index, buf + 1); - char* str = (char*)buf; + string_view str{(char*)buf, enclen + 1}; - io_error = aligned_buf_.Write(string_view{str, enclen + 1}); + if (aligned_buf_) { + io_error = aligned_buf_->Write(str); + } else { + io_error = sink_->Write(io::Buffer(str)); + } if (io_error) break; last_db_index = record.db_index; @@ -826,7 +822,12 @@ error_code RdbSaver::Impl::ConsumeChannel() { DVLOG(2) << "Pulled " << record.id; channel_bytes += record.value.size(); - io_error = aligned_buf_.Write(record.value); + + if (aligned_buf_) { + io_error = aligned_buf_->Write(record.value); + } else { + io_error = sink_->Write(io::Buffer(record.value)); + } record.value.clear(); } while (!io_error && channel.TryPop(record)); } // while (channel.pop) @@ -844,10 +845,10 @@ error_code RdbSaver::Impl::ConsumeChannel() { return io_error; } -void RdbSaver::Impl::StartSnapshotting(EngineShard* shard) { +void RdbSaver::Impl::StartSnapshotting(bool include_journal_changes, EngineShard* shard) { auto s = make_unique(&shard->db_slice(), &channel_); - s->Start(); + s->Start(include_journal_changes); // For single shard configuration, we maintain only one snapshot, // so we do not have to map it via shard_id. @@ -863,10 +864,10 @@ void RdbSaver::Impl::FillFreqMap(RdbTypeFreqMap* dest) const { } } -RdbSaver::RdbSaver(::io::Sink* sink, bool single_shard) { +RdbSaver::RdbSaver(::io::Sink* sink, bool single_shard, bool align_writes) { CHECK_NOTNULL(sink); - impl_.reset(new Impl(single_shard ? 1 : shard_set->size(), sink)); + impl_.reset(new Impl(align_writes, single_shard ? 1 : shard_set->size(), sink)); } RdbSaver::~RdbSaver() { @@ -904,8 +905,8 @@ error_code RdbSaver::SaveBody(RdbTypeFreqMap* freq_map) { return error_code{}; } -void RdbSaver::StartSnapshotInShard(EngineShard* shard) { - impl_->StartSnapshotting(shard); +void RdbSaver::StartSnapshotInShard(bool include_journal_changes, EngineShard* shard) { + impl_->StartSnapshotting(include_journal_changes, shard); } error_code RdbSaver::SaveAux(const StringVec& lua_scripts) { diff --git a/src/server/rdb_save.h b/src/server/rdb_save.h index 153079079..278fdcb93 100644 --- a/src/server/rdb_save.h +++ b/src/server/rdb_save.h @@ -40,21 +40,24 @@ class LinuxWriteWrapper : public io::Sink { off_t offset_ = 0; }; -class AlignedBuffer { +class AlignedBuffer : public ::io::Sink { public: + using io::Sink::Write; + AlignedBuffer(size_t cap, ::io::Sink* upstream); ~AlignedBuffer(); - // TODO: maybe to derive AlignedBuffer from Sink? std::error_code Write(std::string_view buf) { return Write(io::Buffer(buf)); } - std::error_code Write(io::Bytes buf); + io::Result WriteSome(const iovec* v, uint32_t len) final; std::error_code Flush(); - ::io::Sink* upstream() { return upstream_;} + ::io::Sink* upstream() { + return upstream_; + } private: size_t capacity_; @@ -70,7 +73,9 @@ class RdbSaver { // to snapshot all the datastore shards. // single_shard - false, means we capture all the data using a single RdbSaver instance // (corresponds to legacy, redis compatible mode) - explicit RdbSaver(::io::Sink* sink, bool single_shard); + // if align_writes is true - writes data in aligned chunks of 4KB to fit direct I/O requirements. + explicit RdbSaver(::io::Sink* sink, bool single_shard, bool align_writes); + ~RdbSaver(); std::error_code SaveHeader(const StringVec& lua_scripts); @@ -81,7 +86,8 @@ class RdbSaver { std::error_code SaveBody(RdbTypeFreqMap* freq_map); // Initiates the serialization in the shard's thread. - void StartSnapshotInShard(EngineShard* shard); + // TODO: to implement break functionality to allow stopping early. + void StartSnapshotInShard(bool include_journal_changes, EngineShard* shard); private: class Impl; @@ -94,13 +100,12 @@ class RdbSaver { std::unique_ptr impl_; }; -// TODO: it does not make sense that RdbSerializer will buffer into unaligned -// mem_buf_ and then flush it into the next level. We should probably use AlignedBuffer -// directly. class RdbSerializer { public: + // TODO: for aligned cased, it does not make sense that RdbSerializer buffers into unaligned + // mem_buf_ and then flush it into the next level. We should probably use AlignedBuffer + // directly. RdbSerializer(::io::Sink* s); - RdbSerializer(AlignedBuffer* aligned_buf); ~RdbSerializer(); @@ -117,6 +122,7 @@ class RdbSerializer { // Must be called in the thread to which `it` belongs. // Returns the serialized rdb_type or the error. + // expire_ms = 0 means no expiry. io::Result SaveEntry(const PrimeKey& pk, const PrimeValue& pv, uint64_t expire_ms); std::error_code WriteRaw(const ::io::Bytes& buf); std::error_code SaveString(std::string_view val); @@ -143,8 +149,7 @@ class RdbSerializer { std::error_code SaveStreamPEL(rax* pel, bool nacks); std::error_code SaveStreamConsumers(streamCG* cg); - ::io::Sink* sink_ = nullptr; - AlignedBuffer* aligned_buf_ = nullptr; + ::io::Sink* sink_; std::unique_ptr lzf_; base::IoBuf mem_buf_; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 50f51ffaf..acf8226a6 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -8,7 +8,6 @@ #include // for master_id_ generation. #include #include - #include #include @@ -71,6 +70,8 @@ using util::http::StringResponse; namespace { +const auto kRdbWriteFlags = O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC | O_DIRECT; + using EngineFunc = void (ServerFamily::*)(CmdArgList args, ConnectionContext* cntx); inline CommandId::Handler HandlerFunc(ServerFamily* se, EngineFunc f) { @@ -152,7 +153,7 @@ bool IsValidSaveScheduleNibble(string_view time, unsigned int max) { class RdbSnapshot { public: RdbSnapshot(bool single_shard, uring::LinuxFile* fl) - : file_(fl), linux_sink_(fl), saver_(&linux_sink_, single_shard) { + : file_(fl), linux_sink_(fl), saver_(&linux_sink_, single_shard, kRdbWriteFlags & O_DIRECT) { } error_code Start(const StringVec& lua_scripts); @@ -191,7 +192,7 @@ error_code RdbSnapshot::Close() { } void RdbSnapshot::StartInShard(EngineShard* shard) { - saver_.StartSnapshotInShard(shard); + saver_.StartSnapshotInShard(false, shard); started_ = true; } @@ -279,19 +280,9 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) { journal_.reset(new journal::Journal); { - // TODO: if we start using random generator in more places, we should probably - // refactor this code. - absl::InsecureBitGen eng; - absl::uniform_int_distribution ud; - - absl::AlphaNum a1(absl::Hex(eng(), absl::kZeroPad16)); - absl::AlphaNum a2(absl::Hex(eng(), absl::kZeroPad16)); - absl::AlphaNum a3(absl::Hex(ud(eng), absl::kZeroPad8)); - absl::StrAppend(&master_id_, a1, a2, a3); - - size_t constexpr kConfigRunIdSize = CONFIG_RUN_ID_SIZE; - DCHECK_EQ(kConfigRunIdSize, master_id_.size()); + master_id_ = GetRandomHex(eng, CONFIG_RUN_ID_SIZE); + DCHECK_EQ(CONFIG_RUN_ID_SIZE, master_id_.size()); } } @@ -472,6 +463,11 @@ error_code ServerFamily::LoadRdb(const std::string& rdb_file) { RdbLoader loader(script_mgr()); ec = loader.Load(&fs); + if (!ec) { + LOG(INFO) << "Done loading RDB, keys loaded: " << loader.keys_loaded(); + LOG(INFO) << "Loading finished after " + << strings::HumanReadableElapsedTime(loader.load_time()); + } } else { ec = res.error(); } @@ -556,8 +552,8 @@ void PrintPrometheusMetrics(const Metrics& m, StringResponse* resp) { &resp->body()); AppendMetricWithoutLabels("memory_used_peak_bytes", "", used_mem_peak.load(memory_order_relaxed), MetricType::GAUGE, &resp->body()); - AppendMetricWithoutLabels("comitted_memory", "", GetMallocCurrentCommitted(), - MetricType::GAUGE, &resp->body()); + AppendMetricWithoutLabels("comitted_memory", "", GetMallocCurrentCommitted(), MetricType::GAUGE, + &resp->body()); AppendMetricWithoutLabels("memory_max_bytes", "", max_memory_limit, MetricType::GAUGE, &resp->body()); @@ -618,6 +614,10 @@ void ServerFamily::PauseReplication(bool pause) { } } +void ServerFamily::OnClose(ConnectionContext* cntx) { + dfly_cmd_->OnClose(cntx); +} + void ServerFamily::StatsMC(std::string_view section, facade::ConnectionContext* cntx) { if (!section.empty()) { return cntx->reply_builder()->SendError(""); @@ -697,7 +697,6 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er service_.SwitchState(GlobalState::SAVING, GlobalState::ACTIVE); }; - const auto kFlags = O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC | O_DIRECT; auto start = absl::Now(); shared_ptr save_info; StringVec lua_scripts = script_mgr_->GetLuaScripts(); @@ -745,7 +744,7 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er abs_path += shard_file; VLOG(1) << "Saving to " << abs_path; - auto res = uring::OpenLinux(abs_path.generic_string(), kFlags, 0666); + auto res = uring::OpenLinux(abs_path.generic_string(), kRdbWriteFlags, 0666); if (res) { snapshots[sid].reset(new RdbSnapshot{true, res.value().release()}); @@ -773,7 +772,7 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er ExtendFilename(now, -1, &filename); path += filename; - auto res = uring::OpenLinux(path.generic_string(), kFlags, 0666); + auto res = uring::OpenLinux(path.generic_string(), kRdbWriteFlags, 0666); if (!res) { return res.error(); } @@ -862,6 +861,10 @@ void ServerFamily::DbSize(CmdArgList args, ConnectionContext* cntx) { return (*cntx)->SendLong(num_keys.load(memory_order_relaxed)); } +void ServerFamily::BreakOnShutdown() { + dfly_cmd_->BreakOnShutdown(); +} + void ServerFamily::FlushDb(CmdArgList args, ConnectionContext* cntx) { DCHECK(cntx->transaction); DoFlush(cntx->transaction, cntx->transaction->db_index()); @@ -910,7 +913,9 @@ void ServerFamily::Client(CmdArgList args, ConnectionContext* cntx) { if (sub_cmd == "SETNAME" && args.size() == 3) { cntx->owner()->SetName(ArgS(args, 2)); return (*cntx)->SendOk(); - } else if (sub_cmd == "LIST") { + } + + if (sub_cmd == "LIST") { vector client_info; fibers::mutex mu; auto cb = [&](util::Connection* conn) { @@ -1377,14 +1382,16 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) { if (cmd == "CAPA") { if (arg == "dragonfly" && args.size() == 3 && i == 1) { uint32_t sid = dfly_cmd_->AllocateSyncSession(); + cntx->owner()->SetName(absl::StrCat("repl_ctrl_", sid)); + string sync_id = absl::StrCat("SYNC", sid); - cntx->conn_state.sync_session_id = sid; + cntx->conn_state.repl_session_id = sid; // The response for 'capa dragonfly' is: (*cntx)->StartArray(3); (*cntx)->SendSimpleString(master_id_); (*cntx)->SendSimpleString(sync_id); - (*cntx)->SendLong(shard_set->size()); + (*cntx)->SendLong(shard_set->pool()->size()); return; } } else { @@ -1487,8 +1494,10 @@ void ServerFamily::Register(CommandRegistry* registry) { << CI{"REPLICAOF", kReplicaOpts, 3, 0, 0, 0}.HFUNC(ReplicaOf) << CI{"REPLCONF", CO::ADMIN | CO::LOADING, -1, 0, 0, 0}.HFUNC(ReplConf) << CI{"ROLE", CO::LOADING | CO::FAST | CO::NOSCRIPT, 1, 0, 0, 0}.HFUNC(Role) - << CI{"SYNC", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Sync) - << CI{"PSYNC", CO::ADMIN | CO::GLOBAL_TRANS, 3, 0, 0, 0}.HFUNC(Psync) + // We won't support DF->REDIS replication for now, hence we do not need to support + // these commands. + // << CI{"SYNC", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Sync) + // << CI{"PSYNC", CO::ADMIN | CO::GLOBAL_TRANS, 3, 0, 0, 0}.HFUNC(Psync) << CI{"SCRIPT", CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(Script) << CI{"DFLY", CO::ADMIN | CO::GLOBAL_TRANS, -2, 0, 0, 0}.HFUNC(Dfly); } diff --git a/src/server/server_family.h b/src/server/server_family.h index b97449b6f..2e65efe41 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -102,6 +102,10 @@ class ServerFamily { return journal_.get(); } + void OnClose(ConnectionContext* cntx); + + void BreakOnShutdown(); + private: uint32_t shard_count() const { return shard_set->size(); diff --git a/src/server/server_state.h b/src/server/server_state.h index f06763353..fb58a869d 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -22,8 +22,9 @@ class Journal; // Present in every server thread. This class differs from EngineShard. The latter manages // state around engine shards while the former represents coordinator/connection state. // There may be threads that handle engine shards but not IO, there may be threads that handle IO -// but not engine shards and there can be threads that handle both. This class is present only -// for threads that handle IO and manage incoming connections. +// but not engine shards and there can be threads that handle both. +// Instances of ServerState are present only for threads that handle +// IO and manage incoming connections. class ServerState { // public struct - to allow initialization. ServerState(const ServerState&) = delete; void operator=(const ServerState&) = delete; diff --git a/src/server/snapshot.cc b/src/server/snapshot.cc index efd19adce..1b57c15df 100644 --- a/src/server/snapshot.cc +++ b/src/server/snapshot.cc @@ -13,6 +13,8 @@ extern "C" { #include "base/logging.h" #include "server/db_slice.h" +#include "server/engine_shard_set.h" +#include "server/journal/journal.h" #include "server/rdb_save.h" #include "util/fiber_sched_algo.h" #include "util/proactor_base.h" @@ -25,15 +27,14 @@ using namespace chrono_literals; namespace this_fiber = ::boost::this_fiber; using boost::fibers::fiber; -SliceSnapshot::SliceSnapshot(DbSlice* slice, RecordChannel* dest) - : db_slice_(slice), dest_(dest) { +SliceSnapshot::SliceSnapshot(DbSlice* slice, RecordChannel* dest) : db_slice_(slice), dest_(dest) { db_array_ = slice->databases(); } SliceSnapshot::~SliceSnapshot() { } -void SliceSnapshot::Start() { +void SliceSnapshot::Start(bool include_journal_changes) { DCHECK(!fb_.joinable()); auto on_change = [this](DbIndex db_index, const DbSlice::ChangeReq& req) { @@ -42,6 +43,14 @@ void SliceSnapshot::Start() { snapshot_version_ = db_slice_->RegisterOnChange(move(on_change)); VLOG(1) << "DbSaver::Start - saving entries with version less than " << snapshot_version_; + + if (include_journal_changes) { + auto* journal = db_slice_->shard_owner()->journal(); + DCHECK(journal); + journal_cb_id_ = journal->RegisterOnChange( + [this](const journal::Entry& e) { OnJournalEntry(e); }); + } + sfile_.reset(new io::StringFile); rdb_serializer_.reset(new RdbSerializer(sfile_.get())); @@ -49,6 +58,8 @@ void SliceSnapshot::Start() { fb_ = fiber([this] { FiberFunc(); db_slice_->UnregisterOnChange(snapshot_version_); + if (journal_cb_id_) + db_slice_->shard_owner()->journal()->Unregister(journal_cb_id_); }); } @@ -141,15 +152,8 @@ bool SliceSnapshot::FlushSfile(bool force) { } VLOG(2) << "FlushSfile " << sfile_->val.size() << " bytes"; - string tmp = std::move(sfile_->val); // important to move before pushing! - channel_bytes_ += tmp.size(); - DbRecord rec{.db_index = savecb_current_db_, - .id = rec_id_, - .num_records = num_records_in_blob_, - .value = std::move(tmp)}; - DVLOG(2) << "Pushed " << rec_id_; - ++rec_id_; - num_records_in_blob_ = 0; + DbRecord rec = GetDbRecord(savecb_current_db_, std::move(sfile_->val), num_records_in_blob_); + num_records_in_blob_ = 0; // We can not move this line after the push, because Push is blocking. dest_->Push(std::move(rec)); return true; @@ -206,6 +210,32 @@ void SliceSnapshot::OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req) } } +void SliceSnapshot::OnJournalEntry(const journal::Entry& entry) { + CHECK(journal::Op::VAL == entry.opcode); + + PrimeKey pkey{entry.key}; + + if (entry.db_ind == savecb_current_db_) { + ++num_records_in_blob_; + io::Result res = + rdb_serializer_->SaveEntry(pkey, *entry.pval_ptr, entry.expire_ms); + CHECK(res); // we write to StringFile. + } else { + io::StringFile sfile; + RdbSerializer tmp_serializer(&sfile); + + io::Result res = + tmp_serializer.SaveEntry(pkey, *entry.pval_ptr, entry.expire_ms); + CHECK(res); // we write to StringFile. + + error_code ec = tmp_serializer.FlushMem(); + CHECK(!ec && !sfile.val.empty()); + + DbRecord rec = GetDbRecord(entry.db_ind, std::move(sfile.val), 1); + dest_->Push(std::move(rec)); + } +} + unsigned SliceSnapshot::SerializePhysicalBucket(DbIndex db_index, PrimeTable::bucket_iterator it) { DCHECK_LT(it.GetVersion(), snapshot_version_); @@ -234,17 +264,19 @@ unsigned SliceSnapshot::SerializePhysicalBucket(DbIndex db_index, PrimeTable::bu error_code ec = tmp_serializer.FlushMem(); CHECK(!ec && !sfile.val.empty()); - string tmp = std::move(sfile.val); - channel_bytes_ += tmp.size(); - - DbRecord rec{ - .db_index = db_index, .id = rec_id_, .num_records = result, .value = std::move(tmp)}; - DVLOG(2) << "Pushed " << rec_id_; - ++rec_id_; - - dest_->Push(std::move(rec)); + dest_->Push(GetDbRecord(db_index, std::move(sfile.val), result)); } return result; } +auto SliceSnapshot::GetDbRecord(DbIndex db_index, std::string value, unsigned num_records) + -> DbRecord { + channel_bytes_ += value.size(); + auto id = rec_id_++; + DVLOG(2) << "Pushed " << id; + + return DbRecord{ + .db_index = db_index, .id = id, .num_records = num_records, .value = std::move(value)}; +} + } // namespace dfly diff --git a/src/server/snapshot.h b/src/server/snapshot.h index 42af9114d..4684d3683 100644 --- a/src/server/snapshot.h +++ b/src/server/snapshot.h @@ -13,6 +13,10 @@ namespace dfly { +namespace journal { +struct Entry; +} // namespace journal + class RdbSerializer; class SliceSnapshot { @@ -32,7 +36,7 @@ class SliceSnapshot { SliceSnapshot(DbSlice* slice, RecordChannel* dest); ~SliceSnapshot(); - void Start(); + void Start(bool include_journal_changes); void Join(); uint64_t snapshot_version() const { @@ -59,10 +63,12 @@ class SliceSnapshot { bool SaveCb(PrimeIterator it); void OnDbChange(DbIndex db_index, const DbSlice::ChangeReq& req); + void OnJournalEntry(const journal::Entry& entry); // Returns number of entries serialized. // Updates the version of the bucket to snapshot version. unsigned SerializePhysicalBucket(DbIndex db_index, PrimeTable::bucket_iterator it); + DbRecord GetDbRecord(DbIndex db_index, std::string value, unsigned num_records); ::boost::fibers::fiber fb_; @@ -82,6 +88,7 @@ class SliceSnapshot { size_t serialized_ = 0, skipped_ = 0, side_saved_ = 0, savecb_calls_ = 0; uint64_t rec_id_ = 0; uint32_t num_records_in_blob_ = 0; + uint32_t journal_cb_id_ = 0; }; } // namespace dfly diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 50570c89a..ca590d529 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -64,9 +64,10 @@ string_view GetSlice(EngineShard* shard, const PrimeValue& pv, string* tmp) { return pv.GetSlice(tmp); } -inline void RecordJournal(const OpArgs& op_args, const PrimeKey& pkey, const PrimeKey& pvalue) { +inline void RecordJournal(const OpArgs& op_args, string_view key, const PrimeKey& pvalue) { if (op_args.shard->journal()) { - op_args.shard->journal()->RecordEntry(op_args.txid, pkey, pvalue); + journal::Entry entry{op_args.db_ind, op_args.txid, key, pvalue}; + op_args.shard->journal()->RecordEntry(entry); } } @@ -104,7 +105,7 @@ OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t sta memcpy(s.data() + start, value.data(), value.size()); it->second.SetString(s); db_slice.PostUpdate(op_args.db_ind, it, key, !added); - RecordJournal(op_args, it->first, it->second); + RecordJournal(op_args, key, it->second); return it->second.Size(); } @@ -141,8 +142,8 @@ OpResult OpGetRange(const OpArgs& op_args, string_view key, int32_t star return string(slice.substr(start, end - start + 1)); }; -size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key, string_view val, - bool prepend) { +size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key, + string_view val, bool prepend) { string tmp, new_val; auto* shard = op_args.shard; string_view slice = GetSlice(shard, it->second, &tmp); @@ -155,7 +156,7 @@ size_t ExtendExisting(const OpArgs& op_args, PrimeIterator it, string_view key, db_slice.PreUpdate(op_args.db_ind, it); it->second.SetString(new_val); db_slice.PostUpdate(op_args.db_ind, it, key, true); - RecordJournal(op_args, it->first, it->second); + RecordJournal(op_args, key, it->second); return new_val.size(); } @@ -169,7 +170,7 @@ OpResult ExtendOrSet(const OpArgs& op_args, string_view key, string_vi if (inserted) { it->second.SetString(val); db_slice.PostUpdate(op_args.db_ind, it, key, false); - RecordJournal(op_args, it->first, it->second); + RecordJournal(op_args, key, it->second); return val.size(); } @@ -180,7 +181,7 @@ OpResult ExtendOrSet(const OpArgs& op_args, string_view key, string_vi return ExtendExisting(op_args, it, key, val, prepend); } -OpResult ExtendOrSkip(const OpArgs& op_args, std::string_view key, std::string_view val, +OpResult ExtendOrSkip(const OpArgs& op_args, string_view key, string_view val, bool prepend) { auto& db_slice = op_args.shard->db_slice(); OpResult it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING); @@ -201,7 +202,7 @@ OpResult OpGet(const OpArgs& op_args, string_view key) { return GetString(op_args.shard, pv); } -OpResult OpIncrFloat(const OpArgs& op_args, std::string_view key, double val) { +OpResult OpIncrFloat(const OpArgs& op_args, string_view key, double val) { auto& db_slice = op_args.shard->db_slice(); auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); @@ -211,7 +212,7 @@ OpResult OpIncrFloat(const OpArgs& op_args, std::string_view key, double char* str = RedisReplyBuilder::FormatDouble(val, buf, sizeof(buf)); it->second.SetString(str); db_slice.PostUpdate(op_args.db_ind, it, key, false); - RecordJournal(op_args, it->first, it->second); + RecordJournal(op_args, key, it->second); return val; } @@ -243,13 +244,13 @@ OpResult OpIncrFloat(const OpArgs& op_args, std::string_view key, double db_slice.PreUpdate(op_args.db_ind, it); it->second.SetString(str); db_slice.PostUpdate(op_args.db_ind, it, key, true); - RecordJournal(op_args, it->first, it->second); + RecordJournal(op_args, key, it->second); return base; } // if skip_on_missing - returns KEY_NOTFOUND. -OpResult OpIncrBy(const OpArgs& op_args, std::string_view key, int64_t incr, +OpResult OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr, bool skip_on_missing) { auto& db_slice = op_args.shard->db_slice(); @@ -270,7 +271,7 @@ OpResult OpIncrBy(const OpArgs& op_args, std::string_view key, int64_t return OpStatus::OUT_OF_MEMORY; } - RecordJournal(op_args, it->first, it->second); + RecordJournal(op_args, key, it->second); return incr; } @@ -295,7 +296,7 @@ OpResult OpIncrBy(const OpArgs& op_args, std::string_view key, int64_t db_slice.PreUpdate(op_args.db_ind, it); it->second.SetInt(new_val); db_slice.PostUpdate(op_args.db_ind, it, key); - RecordJournal(op_args, it->first, it->second); + RecordJournal(op_args, key, it->second); return new_val; } @@ -393,7 +394,7 @@ OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value } } - RecordJournal(op_args_, it->first, it->second); + RecordJournal(op_args_, key, it->second); return OpStatus::OK; } @@ -447,7 +448,7 @@ OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIt } db_slice.PostUpdate(params.db_index, it, key); - RecordJournal(op_args_, it->first, it->second); + RecordJournal(op_args_, key, it->second); return OpStatus::OK; } @@ -572,7 +573,7 @@ void StringFamily::SetNx(CmdArgList args, ConnectionContext* cntx) { void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) { get_qps.Inc(); - std::string_view key = ArgS(args, 1); + string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { return OpGet(t->GetOpArgs(shard), key); }; @@ -596,8 +597,8 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) { } void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) { - std::string_view key = ArgS(args, 1); - std::string_view value = ArgS(args, 2); + string_view key = ArgS(args, 1); + string_view value = ArgS(args, 2); std::optional prev_val; SetCmd::SetParams sparams{cntx->db_index()}; @@ -624,15 +625,15 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) { } void StringFamily::Incr(CmdArgList args, ConnectionContext* cntx) { - std::string_view key = ArgS(args, 1); + string_view key = ArgS(args, 1); return IncrByGeneric(key, 1, cntx); } void StringFamily::IncrBy(CmdArgList args, ConnectionContext* cntx) { DCHECK_EQ(3u, args.size()); - std::string_view key = ArgS(args, 1); - std::string_view sval = ArgS(args, 2); + string_view key = ArgS(args, 1); + string_view sval = ArgS(args, 2); int64_t val; if (!absl::SimpleAtoi(sval, &val)) { @@ -642,8 +643,8 @@ void StringFamily::IncrBy(CmdArgList args, ConnectionContext* cntx) { } void StringFamily::IncrByFloat(CmdArgList args, ConnectionContext* cntx) { - std::string_view key = ArgS(args, 1); - std::string_view sval = ArgS(args, 2); + string_view key = ArgS(args, 1); + string_view sval = ArgS(args, 2); double val; if (!absl::SimpleAtod(sval, &val)) { @@ -666,13 +667,13 @@ void StringFamily::IncrByFloat(CmdArgList args, ConnectionContext* cntx) { } void StringFamily::Decr(CmdArgList args, ConnectionContext* cntx) { - std::string_view key = ArgS(args, 1); + string_view key = ArgS(args, 1); return IncrByGeneric(key, -1, cntx); } void StringFamily::DecrBy(CmdArgList args, ConnectionContext* cntx) { - std::string_view key = ArgS(args, 1); - std::string_view sval = ArgS(args, 2); + string_view key = ArgS(args, 1); + string_view sval = ArgS(args, 2); int64_t val; if (!absl::SimpleAtoi(sval, &val)) { @@ -693,7 +694,7 @@ void StringFamily::Prepend(CmdArgList args, ConnectionContext* cntx) { ExtendGeneric(std::move(args), true, cntx); } -void StringFamily::IncrByGeneric(std::string_view key, int64_t val, ConnectionContext* cntx) { +void StringFamily::IncrByGeneric(string_view key, int64_t val, ConnectionContext* cntx) { bool skip_on_missing = cntx->protocol() == Protocol::MEMCACHE; auto cb = [&](Transaction* t, EngineShard* shard) { @@ -725,8 +726,8 @@ void StringFamily::IncrByGeneric(std::string_view key, int64_t val, ConnectionCo } void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContext* cntx) { - std::string_view key = ArgS(args, 1); - std::string_view sval = ArgS(args, 2); + string_view key = ArgS(args, 1); + string_view sval = ArgS(args, 2); if (cntx->protocol() == Protocol::REDIS) { auto cb = [&](Transaction* t, EngineShard* shard) { diff --git a/src/server/transaction.cc b/src/server/transaction.cc index f0790e294..1d81af00b 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -1160,7 +1160,7 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) { return false; } -void Transaction::BreakOnClose() { +void Transaction::BreakOnShutdown() { if (coordinator_state_ & COORD_BLOCKED) { coordinator_state_ |= COORD_CANCELLED; blocking_ec_.notify(); diff --git a/src/server/transaction.h b/src/server/transaction.h index b3fa6d462..d222d874f 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -169,7 +169,7 @@ class Transaction { // this transaction has been awaked. bool NotifySuspended(TxId committed_ts, ShardId sid); - void BreakOnClose(); + void BreakOnShutdown(); // Called by EngineShard when performing Execute over the tx queue. // Returns true if transaction should be kept in the queue.