From 3bbdb8e4b1ab5cce9700b282008c1c5743c78e00 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Sun, 17 Jul 2022 21:12:50 +0300 Subject: [PATCH] feat(journal): Introduce basic journal support. (#211) 1. No entry data is written into the journal yet. 2. Introduced a state machine to start and stop the journal using a new auxiliary command "dfly". 3. string_family currently calls a universal command Journal::RecordEntry that should save the current key/value of that entry. Please note that we won't use it for all the operations because for some it's more efficient to record the action itself than the final value. 4. No locking information is recorded yet, so atomicity of multi-key operations is not preserved for now. Signed-off-by: Roman Gershman --- src/server/CMakeLists.txt | 18 +- src/server/common.h | 18 +- src/server/debugcmd.cc | 3 +- src/server/dflycmd.cc | 124 +++++++ src/server/dflycmd.h | 38 ++ src/server/engine_shard_set.cc | 7 +- src/server/engine_shard_set.h | 13 + src/server/generic_family.cc | 14 +- src/server/hset_family.cc | 22 +- src/server/journal/journal.cc | 126 +++++++ src/server/journal/journal.h | 73 ++++ src/server/journal/journal_shard.cc | 115 +++++++ src/server/journal/journal_shard.h | 56 +++ src/server/list_family.cc | 24 +- src/server/main_service.cc | 3 - src/server/replica.cc | 9 +- src/server/server_family.cc | 16 +- src/server/server_family.h | 15 +- src/server/server_state.h | 18 +- src/server/set_family.cc | 27 +- src/server/stream_family.cc | 29 +- src/server/string_family.cc | 514 +++++++++++++++------------- src/server/string_family.h | 25 +- src/server/transaction.cc | 16 +- src/server/transaction.h | 4 + src/server/zset_family.cc | 39 +-- 26 files changed, 978 insertions(+), 388 deletions(-) create mode 100644 src/server/dflycmd.cc create mode 100644 src/server/dflycmd.h create mode 100644 src/server/journal/journal.cc create mode 100644 src/server/journal/journal.h create mode 100644 src/server/journal/journal_shard.cc create mode 100644 src/server/journal/journal_shard.h diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 735b245cd..b7373b244 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -1,16 +1,20 @@ add_executable(dragonfly dfly_main.cc) cxx_link(dragonfly base dragonfly_lib) -add_library(dragonfly_lib blocking_controller.cc channel_slice.cc command_registry.cc - common.cc config_flags.cc - conn_context.cc db_slice.cc debugcmd.cc - engine_shard_set.cc generic_family.cc hset_family.cc io_mgr.cc +add_library(dfly_transaction db_slice.cc engine_shard_set.cc blocking_controller.cc common.cc + io_mgr.cc journal/journal.cc journal/journal_shard.cc table.cc + tiered_storage.cc transaction.cc) +cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib) + +add_library(dragonfly_lib channel_slice.cc command_registry.cc + config_flags.cc conn_context.cc debugcmd.cc dflycmd.cc + generic_family.cc hset_family.cc list_family.cc main_service.cc rdb_load.cc rdb_save.cc replica.cc snapshot.cc script_mgr.cc server_family.cc - set_family.cc stream_family.cc string_family.cc table.cc tiered_storage.cc - transaction.cc zset_family.cc version.cc) + set_family.cc stream_family.cc string_family.cc + zset_family.cc version.cc) -cxx_link(dragonfly_lib dfly_core dfly_facade redis_lib strings_lib html_lib) +cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib) add_library(dfly_test_lib test_utils.cc) cxx_link(dfly_test_lib dragonfly_lib facade_test gtest_main_ext) diff --git
a/src/server/common.h b/src/server/common.h index 146bd858e..0194e667a 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -21,13 +21,14 @@ constexpr int64_t kMaxExpireDeadlineSec = (1u << 27) - 1; using DbIndex = uint16_t; using ShardId = uint16_t; +using LSN = uint64_t; using TxId = uint64_t; using TxClock = uint64_t; -using facade::MutableSlice; +using facade::ArgS; using facade::CmdArgList; using facade::CmdArgVec; -using facade::ArgS; +using facade::MutableSlice; using ArgSlice = absl::Span; using StringVec = std::vector; @@ -55,8 +56,8 @@ struct KeyIndex { // relevant for for commands like ZUNIONSTORE/ZINTERSTORE for destination key. unsigned bonus = 0; unsigned start; - unsigned end; // does not include this index (open limit). - unsigned step; // 1 for commands like mget. 2 for commands like mset. + unsigned end; // does not include this index (open limit). + unsigned step; // 1 for commands like mget. 2 for commands like mset. bool HasSingleKey() const { return bonus == 0 && (start + step >= end); @@ -69,9 +70,15 @@ struct KeyIndex { struct OpArgs { EngineShard* shard; + TxId txid; DbIndex db_ind; -}; + OpArgs() : shard(nullptr), txid(0), db_ind(0) { + } + + OpArgs(EngineShard* s, TxId i, DbIndex d) : shard(s), txid(i), db_ind(d) { + } +}; struct TieredStats { size_t external_reads = 0; @@ -118,7 +125,6 @@ extern size_t max_memory_limit; // set upon server start. extern unsigned kernel_version; - const char* GlobalStateName(GlobalState gs); } // namespace dfly diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index d8dbfba3e..09b81cf1e 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -60,7 +60,8 @@ struct ObjInfo { void DoPopulateBatch(std::string_view prefix, size_t val_size, const SetCmd::SetParams& params, const PopulateBatch& batch) { - SetCmd sg(&EngineShard::tlocal()->db_slice()); + OpArgs op_args(EngineShard::tlocal(), 0, params.db_index); + SetCmd sg(op_args); for (unsigned i = 0; i < batch.sz; ++i) { string key = absl::StrCat(prefix, ":", batch.index[i]); diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc new file mode 100644 index 000000000..640fb3287 --- /dev/null +++ b/src/server/dflycmd.cc @@ -0,0 +1,124 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// +#include "server/dflycmd.h" + +#include +#include + +#include "base/flags.h" +#include "base/logging.h" +#include "facade/dragonfly_connection.h" + +#include "server/engine_shard_set.h" +#include "server/error.h" +#include "server/journal/journal.h" +#include "server/server_state.h" +#include "server/transaction.h" + +using namespace std; + +ABSL_DECLARE_FLAG(string, dir); + +namespace dfly { + +using namespace facade; +using namespace std; +using util::ProactorBase; + +DflyCmd::DflyCmd(util::ListenerInterface* listener, journal::Journal* journal) : listener_(listener), journal_(journal) { +} + +void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) { + DCHECK_GE(args.size(), 2u); + + ToUpper(&args[1]); + RedisReplyBuilder* rb = static_cast(cntx->reply_builder()); + + std::string_view sub_cmd = ArgS(args, 1); + if (sub_cmd == "JOURNAL") { + if (args.size() < 3) { + return rb->SendError(WrongNumArgsError("DFLY JOURNAL")); + } + HandleJournal(args, cntx); + return; + } + + if (sub_cmd == "THREAD") { + util::ProactorPool* pool = shard_set->pool(); + + if (args.size() == 2) { // DFLY THREAD : returns connection thread index and number of threads. 
+ rb->StartArray(2); + rb->SendLong(ProactorBase::GetIndex()); + rb->SendLong(long(pool->size())); + return; + } + + // DFLY THREAD to_thread : migrates current connection to a different thread. + std::string_view arg = ArgS(args, 2); + unsigned num_thread; + if (!absl::SimpleAtoi(arg, &num_thread)) { + return rb->SendError(kSyntaxErr); + } + + if (num_thread < pool->size()) { + if (int(num_thread) != ProactorBase::GetIndex()) { + listener_->Migrate(cntx->owner(), pool->at(num_thread)); + } + + return rb->SendOk(); + } + + rb->SendError(kInvalidIntErr); + return; + } + + rb->SendError(kSyntaxErr); +} + + +void DflyCmd::HandleJournal(CmdArgList args, ConnectionContext* cntx) { + DCHECK_GE(args.size(), 3u); + ToUpper(&args[2]); + + std::string_view sub_cmd = ArgS(args, 2); + Transaction* trans = cntx->transaction; + DCHECK(trans); + RedisReplyBuilder* rb = static_cast(cntx->reply_builder()); + + if (sub_cmd == "START") { + unique_lock lk(mu_); + journal::Journal* journal = ServerState::tlocal()->journal(); + if (!journal) { + string dir = absl::GetFlag(FLAGS_dir); + journal_->StartLogging(dir); + trans->Schedule(); + auto barrier_cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; + trans->Execute(barrier_cb, true); + + // tx id starting from which we may reliably fetch journal records. + journal_txid_ = trans->txid(); + } + + return rb->SendLong(journal_txid_); + } + + if (sub_cmd == "STOP") { + unique_lock lk(mu_); + if (journal_->EnterLameDuck()) { + auto barrier_cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; + trans->ScheduleSingleHop(std::move(barrier_cb)); + + auto ec = journal_->Close(); + LOG_IF(ERROR, ec) << "Error closing journal " << ec; + journal_txid_ = trans->txid(); + } + + return rb->SendLong(journal_txid_); + } + + string reply = UnknownSubCmd(sub_cmd, "DFLY"); + return rb->SendError(reply, kSyntaxErrType); +} + +} // namespace dfly diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h new file mode 100644 index 000000000..df99b9fba --- /dev/null +++ b/src/server/dflycmd.h @@ -0,0 +1,38 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include + +#include "server/conn_context.h" + +namespace util { +class ListenerInterface; +} // namespace util + +namespace dfly { + +class EngineShardSet; + +namespace journal { +class Journal; +} // namespace journal + +class DflyCmd { + public: + DflyCmd(util::ListenerInterface* listener, journal::Journal* journal); + + void Run(CmdArgList args, ConnectionContext* cntx); + + private: + void HandleJournal(CmdArgList args, ConnectionContext* cntx); + + util::ListenerInterface* listener_; + journal::Journal* journal_; + ::boost::fibers::mutex mu_; + TxId journal_txid_ = 0; +}; + +} // namespace dfly diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 1e6ea49b1..273be74d3 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -27,7 +27,9 @@ ABSL_FLAG(uint32_t, hz, 1000, "and performs other background tasks. 
Warning: not advised to decrease in production, " "because it can affect expiry precision for PSETEX etc."); -ABSL_DECLARE_FLAG(bool, cache_mode); +ABSL_FLAG(bool, cache_mode, false, + "If true, the backend behaves like a cache, " + "by evicting entries when getting close to maxmemory limit"); namespace dfly { @@ -42,8 +44,9 @@ vector cached_stats; // initialized in EngineShard } // namespace -thread_local EngineShard* EngineShard::shard_ = nullptr; constexpr size_t kQueueLen = 64; + +thread_local EngineShard* EngineShard::shard_ = nullptr; EngineShardSet* shard_set = nullptr; EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) { diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index e09ba02b6..5ff6ea81d 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -26,6 +26,10 @@ extern "C" { namespace dfly { +namespace journal { +class Journal; +} // namespace journal + class TieredStorage; class BlockingController; @@ -134,6 +138,14 @@ class EngineShard { return counter_[unsigned(type)].SumTail(); } + journal::Journal* journal() { + return journal_; + } + + void set_journal(journal::Journal* j) { + journal_ = j; + } + void TEST_EnableHeartbeat(); private: @@ -160,6 +172,7 @@ class EngineShard { // Logical ts used to order distributed transactions. TxId committed_txid_ = 0; Transaction* continuation_trans_ = nullptr; + journal::Journal* journal_ = nullptr; IntentLock shard_lock_; uint32_t periodic_task_ = 0; diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index d757283c8..5d46d753d 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -244,7 +244,7 @@ uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys do { ess->Await(sid, [&] { - OpArgs op_args{EngineShard::tlocal(), cntx->conn_state.db_index}; + OpArgs op_args{EngineShard::tlocal(), 0, cntx->conn_state.db_index}; OpScan(op_args, scan_opts, &cursor, keys); }); @@ -281,7 +281,7 @@ void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) { auto cb = [&result](const Transaction* t, EngineShard* shard) { ArgSlice args = t->ShardArgsInShard(shard->shard_id()); - auto res = OpDel(OpArgs{shard, t->db_index()}, args); + auto res = OpDel(t->GetOpArgs(shard), args); result.fetch_add(res.value_or(0), memory_order_relaxed); return OpStatus::OK; @@ -332,7 +332,7 @@ void GenericFamily::Exists(CmdArgList args, ConnectionContext* cntx) { auto cb = [&result](Transaction* t, EngineShard* shard) { ArgSlice args = t->ShardArgsInShard(shard->shard_id()); - auto res = OpExists(OpArgs{shard, t->db_index()}, args); + auto res = OpExists(t->GetOpArgs(shard), args); result.fetch_add(res.value_or(0), memory_order_relaxed); return OpStatus::OK; @@ -362,7 +362,7 @@ void GenericFamily::Expire(CmdArgList args, ConnectionContext* cntx) { ExpireParams params{.ts = int_arg}; auto cb = [&](Transaction* t, EngineShard* shard) { - return OpExpire(OpArgs{shard, t->db_index()}, key, params); + return OpExpire(t->GetOpArgs(shard), key, params); }; OpStatus status = cntx->transaction->ScheduleSingleHop(move(cb)); @@ -381,7 +381,7 @@ void GenericFamily::ExpireAt(CmdArgList args, ConnectionContext* cntx) { ExpireParams params{.ts = int_arg, .absolute = true}; auto cb = [&](Transaction* t, EngineShard* shard) { - return OpExpire(OpArgs{shard, t->db_index()}, key, params); + return OpExpire(t->GetOpArgs(shard), key, params); }; OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -425,7 +425,7 @@ 
void GenericFamily::PexpireAt(CmdArgList args, ConnectionContext* cntx) { ExpireParams params{.ts = int_arg, .absolute = true, .unit = MSEC}; auto cb = [&](Transaction* t, EngineShard* shard) { - return OpExpire(OpArgs{shard, t->db_index()}, key, params); + return OpExpire(t->GetOpArgs(shard), key, params); }; OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -524,7 +524,7 @@ OpResult GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des if (transaction->unique_shard_cnt() == 1) { auto cb = [&](Transaction* t, EngineShard* shard) { - return OpRen(OpArgs{shard, t->db_index()}, key[0], key[1], skip_exist_dest); + return OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest); }; OpResult result = transaction->ScheduleSingleHopT(std::move(cb)); diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index b3ed44f27..38a54a818 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -99,7 +99,7 @@ void HSetFamily::HDel(CmdArgList args, ConnectionContext* cntx) { args.remove_prefix(2); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpDel(OpArgs{shard, t->db_index()}, key, args); + return OpDel(t->GetOpArgs(shard), key, args); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -114,7 +114,7 @@ void HSetFamily::HLen(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpLen(OpArgs{shard, t->db_index()}, key); + return OpLen(t->GetOpArgs(shard), key); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -157,7 +157,7 @@ void HSetFamily::HMGet(CmdArgList args, ConnectionContext* cntx) { args.remove_prefix(2); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpMGet(OpArgs{shard, t->db_index()}, key, args); + return OpMGet(t->GetOpArgs(shard), key, args); }; OpResult> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -186,7 +186,7 @@ void HSetFamily::HGet(CmdArgList args, ConnectionContext* cntx) { string_view field = ArgS(args, 2); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpGet(OpArgs{shard, t->db_index()}, key, field); + return OpGet(t->GetOpArgs(shard), key, field); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -214,7 +214,7 @@ void HSetFamily::HIncrBy(CmdArgList args, ConnectionContext* cntx) { IncrByParam param{ival}; auto cb = [&](Transaction* t, EngineShard* shard) { - return OpIncrBy(OpArgs{shard, t->db_index()}, key, field, ¶m); + return OpIncrBy(t->GetOpArgs(shard), key, field, ¶m); }; OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -249,7 +249,7 @@ void HSetFamily::HIncrByFloat(CmdArgList args, ConnectionContext* cntx) { IncrByParam param{dval}; auto cb = [&](Transaction* t, EngineShard* shard) { - return OpIncrBy(OpArgs{shard, t->db_index()}, key, field, ¶m); + return OpIncrBy(t->GetOpArgs(shard), key, field, ¶m); }; OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -284,7 +284,7 @@ void HSetFamily::HGetGeneric(CmdArgList args, ConnectionContext* cntx, uint8_t g string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpGetAll(OpArgs{shard, t->db_index()}, key, getall_mask); + return OpGetAll(t->GetOpArgs(shard), key, getall_mask); }; OpResult> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -311,7 +311,7 @@ void HSetFamily::HScan(CmdArgList args, ConnectionContext* cntx) { } auto cb = 
[&](Transaction* t, EngineShard* shard) { - return OpScan(OpArgs{shard, t->db_index()}, key, &cursor); + return OpScan(t->GetOpArgs(shard), key, &cursor); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -339,7 +339,7 @@ void HSetFamily::HSet(CmdArgList args, ConnectionContext* cntx) { args.remove_prefix(2); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpSet(OpArgs{shard, t->db_index()}, key, args, false); + return OpSet(t->GetOpArgs(shard), key, args, false); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -355,7 +355,7 @@ void HSetFamily::HSetNx(CmdArgList args, ConnectionContext* cntx) { args.remove_prefix(2); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpSet(OpArgs{shard, t->db_index()}, key, args, true); + return OpSet(t->GetOpArgs(shard), key, args, true); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -371,7 +371,7 @@ void HSetFamily::HStrLen(CmdArgList args, ConnectionContext* cntx) { string_view field = ArgS(args, 2); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpStrLen(OpArgs{shard, t->db_index()}, key, field); + return OpStrLen(t->GetOpArgs(shard), key, field); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc new file mode 100644 index 000000000..35dac782b --- /dev/null +++ b/src/server/journal/journal.cc @@ -0,0 +1,126 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/journal/journal.h" + +#include + +#include "base/logging.h" +#include "server/engine_shard_set.h" +#include "server/journal/journal_shard.h" +#include "server/server_state.h" + +namespace dfly { +namespace journal { + +namespace fs = std::filesystem; +using namespace std; +using namespace util; +namespace fibers = boost::fibers; + +namespace { + +thread_local JournalShard journal_shard; + +} // namespace + +Journal::Journal() { +} + +error_code Journal::StartLogging(std::string_view dir) { + if (journal_shard.IsOpen()) { + return error_code{}; + } + + auto* pool = shard_set->pool(); + atomic_uint32_t created{0}; + lock_guard lk(state_mu_); + + auto open_cb = [&](auto* pb) { + auto ec = journal_shard.Open(dir, unsigned(ProactorBase::GetIndex())); + if (ec) { + LOG(FATAL) << "Could not create journal " << ec; // TODO + } else { + created.fetch_add(1, memory_order_relaxed); + ServerState::tlocal()->set_journal(this); + EngineShard* shard = EngineShard::tlocal(); + if (shard) { + shard->set_journal(this); + } + } + }; + + pool->AwaitFiberOnAll(open_cb); + + if (created.load(memory_order_acquire) != pool->size()) { + LOG(FATAL) << "TBD / revert"; + } + + return error_code{}; +} + +error_code Journal::Close() { + CHECK(lameduck_.load(memory_order_relaxed)); + + VLOG(1) << "Journal::Close"; + + fibers::mutex ec_mu; + error_code res; + + lock_guard lk(state_mu_); + auto close_cb = [&](auto*) { + ServerState::tlocal()->set_journal(nullptr); + EngineShard* shard = EngineShard::tlocal(); + if (shard) { + shard->set_journal(nullptr); + } + + auto ec = journal_shard.Close(); + + if (ec) { + lock_guard lk2(ec_mu); + res = ec; + } + }; + + shard_set->pool()->AwaitFiberOnAll(close_cb); + + return res; +} + +bool Journal::SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards) { + if (!journal_shard.IsOpen() || lameduck_.load(memory_order_relaxed)) + return false; + + journal_shard.AddLogRecord(txid, 
unsigned(Op::SCHED)); + + return true; +} + +LSN Journal::GetLsn() const { + return journal_shard.cur_lsn(); +} + +bool Journal::EnterLameDuck() { + if (!journal_shard.IsOpen()) { + return false; + } + + bool val = false; + bool res = lameduck_.compare_exchange_strong(val, true, memory_order_acq_rel); + return res; +} + +void Journal::OpArgs(TxId txid, Op opcode, Span keys) { + DCHECK(journal_shard.IsOpen()); + + journal_shard.AddLogRecord(txid, unsigned(opcode)); +} + +void Journal::RecordEntry(TxId txid, const PrimeKey& key, const PrimeValue& pval) { + journal_shard.AddLogRecord(txid, unsigned(Op::VAL)); +} + +} // namespace journal +} // namespace dfly diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h new file mode 100644 index 000000000..e26b99cbc --- /dev/null +++ b/src/server/journal/journal.h @@ -0,0 +1,73 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include "server/common.h" +#include "server/table.h" +#include "util/proactor_pool.h" + +namespace dfly { + +class Transaction; + +namespace journal { + +enum class Op : uint8_t { + NOOP = 0, + LOCK = 1, + UNLOCK = 2, + LOCK_SHARD = 3, + UNLOCK_SHARD = 4, + SCHED = 5, + VAL = 10, + DEL, + MSET, +}; + +class Journal { + public: + using Span = absl::Span; + + Journal(); + + std::error_code StartLogging(std::string_view dir); + + // Returns true if journal has been active and changed its state to lameduck mode + // and false otherwise. + bool EnterLameDuck(); // still logs ongoing transactions but refuses to start new ones. + + // Requires: journal is in lameduck mode. + std::error_code Close(); + + // Returns true if transaction was scheduled, false if journal is inactive + // or in lameduck mode and does not log new transactions. + bool SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards); + + void AddCmd(TxId txid, Op opcode, Span args) { + OpArgs(txid, opcode, args); + } + + void Lock(TxId txid, Span keys) { + OpArgs(txid, Op::LOCK, keys); + } + + void Unlock(TxId txid, Span keys) { + OpArgs(txid, Op::UNLOCK, keys); + } + + LSN GetLsn() const; + + void RecordEntry(TxId txid, const PrimeKey& key, const PrimeValue& pval); + + private: + void OpArgs(TxId id, Op opcode, Span keys); + + mutable boost::fibers::mutex state_mu_; + + std::atomic_bool lameduck_{false}; +}; + +} // namespace journal +} // namespace dfly diff --git a/src/server/journal/journal_shard.cc b/src/server/journal/journal_shard.cc new file mode 100644 index 000000000..fbd0940c3 --- /dev/null +++ b/src/server/journal/journal_shard.cc @@ -0,0 +1,115 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. 
+// + +#include "server/journal/journal_shard.h" + +#include + +#include +#include + +#include + +#include "base/logging.h" +#include "util/fibers/fibers_ext.h" + +namespace dfly { +namespace journal { +using namespace std; +using namespace util; +namespace fibers = boost::fibers; +namespace fs = std::filesystem; + +namespace { + +string ShardName(std::string_view base, unsigned index) { + return absl::StrCat(base, "-", absl::Dec(index, absl::kZeroPad4), ".log"); +} + +} // namespace + +#define CHECK_EC(x) \ + do { \ + auto __ec$ = (x); \ + CHECK(!__ec$) << "Error: " << __ec$ << " " << __ec$.message() << " for " << #x; \ + } while (false) + + + +JournalShard::JournalShard() { +} + +JournalShard::~JournalShard() { + CHECK(!shard_file_); +} + +std::error_code JournalShard::Open(const std::string_view dir, unsigned index) { + CHECK(!shard_file_); + + fs::path dir_path; + + if (dir.empty()) { + } else { + dir_path = dir; + error_code ec; + + fs::file_status dir_status = fs::status(dir_path, ec); + if (ec) { + if (ec == errc::no_such_file_or_directory) { + fs::create_directory(dir_path, ec); + dir_status = fs::status(dir_path, ec); + } + if (ec) + return ec; + } + // LOG(INFO) << int(dir_status.type()); + } + dir_path.append(ShardName("journal", index)); + shard_path_ = dir_path; + + // For file integrity guidelines see: + // https://lwn.net/Articles/457667/ + // https://www.evanjones.ca/durability-filesystem.html + // NOTE: O_DSYNC is omited. + constexpr auto kJournalFlags = O_CLOEXEC | O_CREAT | O_TRUNC | O_RDWR; + io::Result> res = + uring::OpenLinux(shard_path_, kJournalFlags, 0666); + if (!res) { + return res.error(); + } + DVLOG(1) << "Opened journal " << shard_path_; + + shard_file_ = std::move(res).value(); + shard_index_ = index; + file_offset_ = 0; + status_ec_.clear(); + + return error_code{}; +} + +error_code JournalShard::Close() { + VLOG(1) << "JournalShard::Close"; + + CHECK(shard_file_); + lameduck_ = true; + + auto ec = shard_file_->Close(); + + DVLOG(1) << "Closing " << shard_path_; + LOG_IF(ERROR, ec) << "Error closing journal file " << ec; + shard_file_.reset(); + + return ec; +} + +void JournalShard::AddLogRecord(TxId txid, unsigned opcode) { + string line = absl::StrCat(lsn_, " ", txid, " ", opcode, "\n"); + error_code ec = shard_file_->Write(io::Buffer(line), file_offset_, 0); + CHECK_EC(ec); + file_offset_ += line.size(); + ++lsn_; +} + +} // namespace journal +} // namespace dfly \ No newline at end of file diff --git a/src/server/journal/journal_shard.h b/src/server/journal/journal_shard.h new file mode 100644 index 000000000..f58c4b98b --- /dev/null +++ b/src/server/journal/journal_shard.h @@ -0,0 +1,56 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. 
+// + +#pragma once + +#include +#include +#include +#include + +#include "server/common.h" +#include "util/uring/uring_file.h" + +namespace dfly { +namespace journal { + +class JournalShard { + public: + JournalShard(); + ~JournalShard(); + + std::error_code Open(const std::string_view dir, unsigned index); + + std::error_code Close(); + + LSN cur_lsn() const { + return lsn_; + } + + std::error_code status() const { + return status_ec_; + } + + bool IsOpen() const { + return bool(shard_file_); + } + + void AddLogRecord(TxId txid, unsigned opcode); + + private: + std::string shard_path_; + std::unique_ptr shard_file_; + + size_t file_offset_ = 0; + LSN lsn_ = 1; + + unsigned shard_index_ = -1; + + std::error_code status_ec_; + + bool lameduck_ = false; +}; + +} // namespace journal +} // namespace dfly \ No newline at end of file diff --git a/src/server/list_family.cc b/src/server/list_family.cc index f5238ea4f..aea3787da 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -495,7 +495,7 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) { if (cntx->transaction->unique_shard_cnt() == 1) { auto cb = [&](Transaction* t, EngineShard* shard) { - return OpRPopLPushSingleShard(OpArgs{shard, t->db_index()}, src, dest); + return OpRPopLPushSingleShard(t->GetOpArgs(shard), src, dest); }; result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -515,7 +515,7 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) { auto args = t->ShardArgsInShard(shard->shard_id()); DCHECK_EQ(1u, args.size()); bool is_dest = args.front() == dest; - find_res[is_dest] = RPeek(OpArgs{shard, t->db_index()}, args.front(), !is_dest); + find_res[is_dest] = RPeek(t->GetOpArgs(shard), args.front(), !is_dest); return OpStatus::OK; }; @@ -530,7 +530,7 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { auto args = t->ShardArgsInShard(shard->shard_id()); bool is_dest = args.front() == dest; - OpArgs op_args{shard, t->db_index()}; + OpArgs op_args = t->GetOpArgs(shard); if (is_dest) { string_view val{find_res[0].value()}; @@ -564,7 +564,7 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) { void ListFamily::LLen(CmdArgList args, ConnectionContext* cntx) { auto key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpLen(OpArgs{shard, t->db_index()}, key); + return OpLen(t->GetOpArgs(shard), key); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); if (result) { @@ -586,7 +586,7 @@ void ListFamily::LIndex(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpIndex(OpArgs{shard, t->db_index()}, key, index); + return OpIndex(t->GetOpArgs(shard), key, index); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -617,7 +617,7 @@ void ListFamily::LInsert(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpInsert(OpArgs{shard, t->db_index()}, key, pivot, elem, where); + return OpInsert(t->GetOpArgs(shard), key, pivot, elem, where); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -640,7 +640,7 @@ void ListFamily::LTrim(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpTrim(OpArgs{shard, t->db_index()}, key, start, end); + return OpTrim(t->GetOpArgs(shard), key, start, end); }; 
cntx->transaction->ScheduleSingleHop(std::move(cb)); (*cntx)->SendOk(); @@ -658,7 +658,7 @@ void ListFamily::LRange(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpRange(OpArgs{shard, t->db_index()}, key, start, end); + return OpRange(t->GetOpArgs(shard), key, start, end); }; auto res = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -682,7 +682,7 @@ void ListFamily::LRem(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpRem(OpArgs{shard, t->db_index()}, key, elem, count); + return OpRem(t->GetOpArgs(shard), key, elem, count); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); if (result) { @@ -704,7 +704,7 @@ void ListFamily::LSet(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpSet(OpArgs{shard, t->db_index()}, key, elem, count); + return OpSet(t->GetOpArgs(shard), key, elem, count); }; OpResult result = cntx->transaction->ScheduleSingleHop(std::move(cb)); if (result) { @@ -769,7 +769,7 @@ void ListFamily::PushGeneric(ListDir dir, bool skip_notexists, CmdArgList args, } absl::Span span{vals.data(), vals.size()}; auto cb = [&](Transaction* t, EngineShard* shard) { - return OpPush(OpArgs{shard, t->db_index()}, key, dir, skip_notexists, span); + return OpPush(t->GetOpArgs(shard), key, dir, skip_notexists, span); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -803,7 +803,7 @@ void ListFamily::PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cnt } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpPop(OpArgs{shard, t->db_index()}, key, dir, count, true); + return OpPop(t->GetOpArgs(shard), key, dir, count, true); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 601cbf949..fb5b598e0 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -44,9 +44,6 @@ ABSL_FLAG(uint32_t, memcache_port, 0, "Memcached port"); ABSL_FLAG(uint64_t, maxmemory, 0, "Limit on maximum-memory that is used by the database." "0 - means the program will automatically determine its maximum memory usage"); -ABSL_FLAG(bool, cache_mode, false, - "If true, the backend behaves like a cache, " - "by evicting entries when getting close to maxmemory limit"); ABSL_DECLARE_FLAG(string, requirepass); diff --git a/src/server/replica.cc b/src/server/replica.cc index 9535a9d8e..a304bbbb4 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -79,13 +79,6 @@ error_code Recv(FiberSocketBase* input, base::IoBuf* dest) { constexpr unsigned kRdbEofMarkSize = 40; -// TODO: to remove usages of this macro and make code crash-less. -#define CHECK_EC(x) \ - do { \ - auto __ec$ = (x); \ - CHECK(!__ec$) << "Error: " << __ec$ << " " << __ec$.message() << " for " << #x; \ - } while (false) - } // namespace Replica::Replica(string host, uint16_t port, Service* se) @@ -510,7 +503,7 @@ error_code Replica::ConsumeRedisStream() { // Master waits for this command in order to start sending replication stream. 
serializer.SendCommand("REPLCONF ACK 0"); - CHECK_EC(serializer.ec()); + RETURN_ON_ERR(serializer.ec()); VLOG(1) << "Before reading repl-log"; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 5846969ca..cd50f0575 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -25,8 +25,10 @@ extern "C" { #include "server/command_registry.h" #include "server/conn_context.h" #include "server/debugcmd.h" +#include "server/dflycmd.h" #include "server/engine_shard_set.h" #include "server/error.h" +#include "server/journal/journal.h" #include "server/main_service.h" #include "server/rdb_load.h" #include "server/rdb_save.h" @@ -150,6 +152,7 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) { lsinfo_ = make_shared(); lsinfo_->save_time = start_time_; script_mgr_.reset(new ScriptMgr()); + journal_.reset(new journal::Journal); } ServerFamily::~ServerFamily() { @@ -159,6 +162,7 @@ void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* m CHECK(acceptor_ == nullptr); acceptor_ = acceptor; main_listener_ = main_listener; + dfly_cmd_.reset(new DflyCmd(main_listener, journal_.get())); pb_task_ = shard_set->pool()->GetNextProactor(); auto cache_cb = [] { @@ -207,6 +211,11 @@ void ServerFamily::Shutdown() { pb_task_->CancelPeriodic(stats_caching_task_); stats_caching_task_ = 0; + if (journal_->EnterLameDuck()) { + auto ec = journal_->Close(); + LOG_IF(ERROR, ec) << "Error closing journal " << ec; + } + unique_lock lk(replicaof_mu_); if (replica_) { replica_->Stop(); @@ -1122,6 +1131,10 @@ void ServerFamily::SyncGeneric(std::string_view repl_master_id, uint64_t offs, // TBD. } +void ServerFamily::Dfly(CmdArgList args, ConnectionContext* cntx) { + dfly_cmd_->Run(args, cntx); +} + #define HFUNC(x) SetHandler(HandlerFunc(this, &ServerFamily::x)) void ServerFamily::Register(CommandRegistry* registry) { @@ -1148,7 +1161,8 @@ void ServerFamily::Register(CommandRegistry* registry) { << CI{"ROLE", CO::LOADING | CO::FAST | CO::NOSCRIPT, 1, 0, 0, 0}.HFUNC(Role) << CI{"SYNC", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Sync) << CI{"PSYNC", CO::ADMIN | CO::GLOBAL_TRANS, 3, 0, 0, 0}.HFUNC(Psync) - << CI{"SCRIPT", CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(Script); + << CI{"SCRIPT", CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(Script) + << CI{"DFLY", CO::ADMIN | CO::GLOBAL_TRANS, -2, 0, 0, 0}.HFUNC(Dfly); } } // namespace dfly diff --git a/src/server/server_family.h b/src/server/server_family.h index 5135e84fb..43d133bbf 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -17,8 +17,13 @@ class HttpListenerBase; namespace dfly { +namespace journal { +class Journal; +} // namespace journal + class ConnectionContext; class CommandRegistry; +class DflyCmd; class Service; class Replica; class ScriptMgr; @@ -41,9 +46,9 @@ struct Metrics { }; struct LastSaveInfo { - time_t save_time; // epoch time in seconds. - std::string file_name; // - std::vector> freq_map; // RDB_TYPE_xxx -> count mapping. + time_t save_time; // epoch time in seconds. + std::string file_name; // + std::vector> freq_map; // RDB_TYPE_xxx -> count mapping. 
}; class ServerFamily { @@ -91,6 +96,7 @@ class ServerFamily { void Config(CmdArgList args, ConnectionContext* cntx); void DbSize(CmdArgList args, ConnectionContext* cntx); void Debug(CmdArgList args, ConnectionContext* cntx); + void Dfly(CmdArgList args, ConnectionContext* cntx); void Memory(CmdArgList args, ConnectionContext* cntx); void FlushDb(CmdArgList args, ConnectionContext* cntx); void FlushAll(CmdArgList args, ConnectionContext* cntx); @@ -111,7 +117,6 @@ class ServerFamily { void Load(const std::string& file_name); - boost::fibers::fiber load_fiber_; uint32_t stats_caching_task_ = 0; @@ -125,6 +130,8 @@ class ServerFamily { std::shared_ptr replica_; // protected by replica_of_mu_ std::unique_ptr script_mgr_; + std::unique_ptr journal_; + std::unique_ptr dfly_cmd_; time_t start_time_ = 0; // in seconds, epoch time. diff --git a/src/server/server_state.h b/src/server/server_state.h index 1c4bcd873..34261fb7a 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -15,6 +15,10 @@ typedef struct mi_heap_s mi_heap_t; namespace dfly { +namespace journal { +class Journal; +} // namespace journal + // Present in every server thread. This class differs from EngineShard. The latter manages // state around engine shards while the former represents coordinator/connection state. // There may be threads that handle engine shards but not IO, there may be threads that handle IO @@ -58,6 +62,7 @@ class ServerState { // public struct - to allow initialization. GlobalState gstate() const { return gstate_; } + void set_gstate(GlobalState s) { gstate_ = s; } @@ -66,7 +71,9 @@ class ServerState { // public struct - to allow initialization. // Returns sum of all requests in the last 6 seconds // (not including the current one). - uint32_t MovingSum6() const { return qps_.SumTail(); } + uint32_t MovingSum6() const { + return qps_.SumTail(); + } void RecordCmd() { ++connection_stats.command_cnt; @@ -78,9 +85,18 @@ class ServerState { // public struct - to allow initialization. return data_heap_; } + journal::Journal* journal() { + return journal_; + } + + void set_journal(journal::Journal* j) { + journal_ = j; + } + private: int64_t live_transactions_ = 0; mi_heap_t* data_heap_; + journal::Journal* journal_ = nullptr; std::optional interpreter_; GlobalState gstate_ = GlobalState::ACTIVE; diff --git a/src/server/set_family.cc b/src/server/set_family.cc index 0cca3357e..5a1c2e40d 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -452,7 +452,7 @@ OpStatus Mover::OpMutate(Transaction* t, EngineShard* es) { ArgSlice largs = t->ShardArgsInShard(es->shard_id()); DCHECK_LE(largs.size(), 2u); - OpArgs op_args{es, t->db_index()}; + OpArgs op_args = t->GetOpArgs(es); for (auto k : largs) { if (k == src_) { CHECK_EQ(1u, OpRem(op_args, k, {member_}).value()); // must succeed. 
@@ -677,8 +677,7 @@ void SetFamily::SAdd(CmdArgList args, ConnectionContext* cntx) { ArgSlice arg_slice{vals.data(), vals.size()}; auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpAdd(op_args, key, arg_slice, false); + return OpAdd(t->GetOpArgs(shard), key, arg_slice, false); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -741,7 +740,7 @@ void SetFamily::SRem(CmdArgList args, ConnectionContext* cntx) { ArgSlice span{vals.data(), vals.size()}; auto cb = [&](Transaction* t, EngineShard* shard) { - return OpRem(OpArgs{shard, t->db_index()}, key, span); + return OpRem(t->GetOpArgs(shard), key, span); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -791,7 +790,7 @@ void SetFamily::SPop(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpPop(OpArgs{shard, t->db_index()}, key, count); + return OpPop(t->GetOpArgs(shard), key, count); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -821,9 +820,9 @@ void SetFamily::SDiff(CmdArgList args, ConnectionContext* cntx) { ArgSlice largs = t->ShardArgsInShard(shard->shard_id()); if (shard->shard_id() == src_shard) { CHECK_EQ(src_key, largs.front()); - result_set[shard->shard_id()] = OpDiff(OpArgs{shard, t->db_index()}, largs); + result_set[shard->shard_id()] = OpDiff(t->GetOpArgs(shard), largs); } else { - result_set[shard->shard_id()] = OpUnion(OpArgs{shard, t->db_index()}, largs); + result_set[shard->shard_id()] = OpUnion(t->GetOpArgs(shard), largs); } return OpStatus::OK; @@ -864,7 +863,7 @@ void SetFamily::SDiffStore(CmdArgList args, ConnectionContext* cntx) { return OpStatus::OK; } - OpArgs op_args{shard, t->db_index()}; + OpArgs op_args = t->GetOpArgs(shard); if (shard->shard_id() == src_shard) { CHECK_EQ(src_key, largs.front()); result_set[shard->shard_id()] = OpDiff(op_args, largs); // Diff @@ -887,7 +886,7 @@ void SetFamily::SDiffStore(CmdArgList args, ConnectionContext* cntx) { SvArray result = ToSvArray(rsv.value()); auto store_cb = [&](Transaction* t, EngineShard* shard) { if (shard->shard_id() == dest_shard) { - OpAdd(OpArgs{shard, t->db_index()}, dest_key, result, true); + OpAdd(t->GetOpArgs(shard), dest_key, result, true); } return OpStatus::OK; @@ -966,7 +965,7 @@ void SetFamily::SInterStore(CmdArgList args, ConnectionContext* cntx) { auto store_cb = [&](Transaction* t, EngineShard* shard) { if (shard->shard_id() == dest_shard) { - OpAdd(OpArgs{shard, t->db_index()}, dest_key, result.value(), true); + OpAdd(t->GetOpArgs(shard), dest_key, result.value(), true); } return OpStatus::OK; @@ -981,7 +980,7 @@ void SetFamily::SUnion(CmdArgList args, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { ArgSlice largs = t->ShardArgsInShard(shard->shard_id()); - result_set[shard->shard_id()] = OpUnion(OpArgs{shard, t->db_index()}, largs); + result_set[shard->shard_id()] = OpUnion(t->GetOpArgs(shard), largs); return OpStatus::OK; }; @@ -1012,7 +1011,7 @@ void SetFamily::SUnionStore(CmdArgList args, ConnectionContext* cntx) { if (largs.empty()) return OpStatus::OK; } - result_set[shard->shard_id()] = OpUnion(OpArgs{shard, t->db_index()}, largs); + result_set[shard->shard_id()] = OpUnion(t->GetOpArgs(shard), largs); return OpStatus::OK; }; @@ -1030,7 +1029,7 @@ void SetFamily::SUnionStore(CmdArgList args, ConnectionContext* cntx) { auto store_cb = [&](Transaction* t, EngineShard* shard) { if (shard->shard_id() == dest_shard) { - 
OpAdd(OpArgs{shard, t->db_index()}, dest_key, result, true); + OpAdd(t->GetOpArgs(shard), dest_key, result, true); } return OpStatus::OK; @@ -1055,7 +1054,7 @@ void SetFamily::SScan(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpScan(OpArgs{shard, t->db_index()}, key, &cursor); + return OpScan(t->GetOpArgs(shard), key, &cursor); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index f8ca84583..3caa9a9c6 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -494,8 +494,7 @@ void CreateGroup(CmdArgList args, string_view key, ConnectionContext* cntx) { opts.id = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpCreate(op_args, key, opts); + return OpCreate(t->GetOpArgs(shard), key, opts); }; OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -509,8 +508,7 @@ void CreateGroup(CmdArgList args, string_view key, ConnectionContext* cntx) { void DestroyGroup(string_view key, string_view gname, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpDestroyGroup(op_args, key, gname); + return OpDestroyGroup(t->GetOpArgs(shard), key, gname); }; OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -529,8 +527,7 @@ void DestroyGroup(string_view key, string_view gname, ConnectionContext* cntx) { void DelConsumer(string_view key, string_view gname, string_view consumer, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpDelConsumer(op_args, key, gname, consumer); + return OpDelConsumer(t->GetOpArgs(shard), key, gname, consumer); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -551,8 +548,7 @@ void SetId(string_view key, string_view gname, CmdArgList args, ConnectionContex string_view id = ArgS(args, 0); auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpSetId(op_args, key, gname, id); + return OpSetId(t->GetOpArgs(shard), key, gname, id); }; OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -607,8 +603,7 @@ void StreamFamily::XAdd(CmdArgList args, ConnectionContext* cntx) { args.remove_prefix(1); auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpAdd(op_args, key, add_opts, args); + return OpAdd(t->GetOpArgs(shard), key, add_opts, args); }; OpResult add_result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -641,8 +636,7 @@ void StreamFamily::XDel(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpDel(op_args, key, absl::Span{ids.data(), ids.size()}); + return OpDel(t->GetOpArgs(shard), key, absl::Span{ids.data(), ids.size()}); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -751,8 +745,7 @@ void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) { void StreamFamily::XLen(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpLen(op_args, key); + return OpLen(t->GetOpArgs(shard), key); }; OpResult result = 
cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -781,8 +774,7 @@ void StreamFamily::XSetId(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpSetId2(op_args, key, parsed_id.val); + return OpSetId2(t->GetOpArgs(shard), key, parsed_id.val); }; OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -835,8 +827,7 @@ void StreamFamily::XRangeGeneric(CmdArgList args, bool is_rev, ConnectionContext range_opts.is_rev = is_rev; auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpRange(op_args, key, range_opts); + return OpRange(t->GetOpArgs(shard), key, range_opts); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -876,4 +867,4 @@ void StreamFamily::Register(CommandRegistry* registry) { << CI{"XSETID", CO::WRITE | CO::DENYOOM, 3, 1, 1, 1}.HFUNC(XSetId); } -} // namespace dfly \ No newline at end of file +} // namespace dfly diff --git a/src/server/string_family.cc b/src/server/string_family.cc index c0eb64a86..da340fe4f 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -17,6 +17,7 @@ extern "C" { #include "server/engine_shard_set.h" #include "server/error.h" #include "server/io_mgr.h" +#include "server/journal/journal.h" #include "server/tiered_storage.h" #include "server/transaction.h" #include "util/varz.h" @@ -60,6 +61,12 @@ string_view GetSlice(EngineShard* shard, const PrimeValue& pv, string* tmp) { return pv.GetSlice(tmp); } +inline void RecordJournal(const OpArgs& op_args, const PrimeKey& pkey, const PrimeKey& pvalue) { + if (op_args.shard->journal()) { + op_args.shard->journal()->RecordEntry(op_args.txid, pkey, pvalue); + } +} + OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t start, string_view value) { auto& db_slice = op_args.shard->db_slice(); @@ -90,9 +97,11 @@ OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t sta db_slice.PreUpdate(op_args.db_ind, it); } + memcpy(s.data() + start, value.data(), value.size()); it->second.SetString(s); db_slice.PostUpdate(op_args.db_ind, it); + RecordJournal(op_args, it->first, it->second); return it->second.Size(); } @@ -129,110 +138,300 @@ OpResult OpGetRange(const OpArgs& op_args, string_view key, int32_t star return string(slice.substr(start, end - start + 1)); }; -} // namespace +// Returns the length of the extended string. if prepend is false - appends the val. 
+OpResult ExtendOrSet(const OpArgs& op_args, std::string_view key, std::string_view val, + bool prepend) { + auto* shard = op_args.shard; + auto& db_slice = shard->db_slice(); + auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); + if (inserted) { + it->second.SetString(val); + db_slice.PostUpdate(op_args.db_ind, it); + RecordJournal(op_args, it->first, it->second); -SetCmd::SetCmd(DbSlice* db_slice) : db_slice_(*db_slice) { + return val.size(); + } + + if (it->second.ObjType() != OBJ_STRING) + return OpStatus::WRONG_TYPE; + + string tmp, new_val; + string_view slice = GetSlice(op_args.shard, it->second, &tmp); + if (prepend) + new_val = absl::StrCat(val, slice); + else + new_val = absl::StrCat(slice, val); + + db_slice.PreUpdate(op_args.db_ind, it); + it->second.SetString(new_val); + db_slice.PostUpdate(op_args.db_ind, it); + RecordJournal(op_args, it->first, it->second); + + return new_val.size(); } -SetCmd::~SetCmd() { +OpResult ExtendOrSkip(const OpArgs& op_args, std::string_view key, std::string_view val, + bool prepend) { + auto& db_slice = op_args.shard->db_slice(); + OpResult it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING); + if (!it_res) { + return false; + } + + CompactObj& cobj = (*it_res)->second; + + string tmp, new_val; + string_view slice = GetSlice(op_args.shard, cobj, &tmp); + if (prepend) + new_val = absl::StrCat(val, slice); + else + new_val = absl::StrCat(slice, val); + + db_slice.PreUpdate(op_args.db_ind, *it_res); + cobj.SetString(new_val); + db_slice.PostUpdate(op_args.db_ind, *it_res); + + return new_val.size(); } -OpResult SetCmd::Set(const SetParams& params, std::string_view key, std::string_view value) { - DCHECK_LT(params.db_index, db_slice_.db_array_size()); - DCHECK(db_slice_.IsDbValid(params.db_index)); +OpResult OpGet(const OpArgs& op_args, string_view key) { + OpResult it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_STRING); + if (!it_res.ok()) + return it_res.status(); - VLOG(2) << "Set " << key << "(" << db_slice_.shard_id() << ") "; + const PrimeValue& pv = it_res.value()->second; - if (params.how == SET_IF_EXISTS) { - auto [it, expire_it] = db_slice_.FindExt(params.db_index, key); + return GetString(op_args.shard, pv); +} - if (IsValid(it)) { // existing - return SetExisting(params, it, expire_it, value); +OpResult OpIncrFloat(const OpArgs& op_args, std::string_view key, double val) { + auto& db_slice = op_args.shard->db_slice(); + auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); + + char buf[128]; + + if (inserted) { + char* str = RedisReplyBuilder::FormatDouble(val, buf, sizeof(buf)); + it->second.SetString(str); + db_slice.PostUpdate(op_args.db_ind, it); + RecordJournal(op_args, it->first, it->second); + + return val; + } + + if (it->second.ObjType() != OBJ_STRING) + return OpStatus::WRONG_TYPE; + + if (it->second.Size() == 0) + return OpStatus::INVALID_FLOAT; + + string tmp; + string_view slice = GetSlice(op_args.shard, it->second, &tmp); + + StringToDoubleConverter stod(StringToDoubleConverter::NO_FLAGS, 0, 0, NULL, NULL); + int processed_digits = 0; + double base = stod.StringToDouble(slice.data(), slice.size(), &processed_digits); + if (unsigned(processed_digits) != slice.size()) { + return OpStatus::INVALID_FLOAT; + } + + base += val; + + if (isnan(base) || isinf(base)) { + return OpStatus::INVALID_FLOAT; + } + + char* str = RedisReplyBuilder::FormatDouble(base, buf, sizeof(buf)); + + db_slice.PreUpdate(op_args.db_ind, it); + it->second.SetString(str); + db_slice.PostUpdate(op_args.db_ind, it); + 
RecordJournal(op_args, it->first, it->second); + + return base; +} + +// if skip_on_missing - returns KEY_NOTFOUND. +OpResult OpIncrBy(const OpArgs& op_args, std::string_view key, int64_t incr, + bool skip_on_missing) { + auto& db_slice = op_args.shard->db_slice(); + + // we avoid using AddOrFind because of skip_on_missing option for memcache. + auto [it, expire_it] = db_slice.FindExt(op_args.db_ind, key); + + if (!IsValid(it)) { + if (skip_on_missing) + return OpStatus::KEY_NOTFOUND; + + CompactObj cobj; + cobj.SetInt(incr); + + // AddNew calls PostUpdate inside. + try { + it = db_slice.AddNew(op_args.db_ind, key, std::move(cobj), 0); + } catch (bad_alloc&) { + return OpStatus::OUT_OF_MEMORY; } - return OpStatus::SKIPPED; + + RecordJournal(op_args, it->first, it->second); + + return incr; } - // New entry - tuple add_res; - try { - add_res = db_slice_.AddOrFind2(params.db_index, key); - } catch (bad_alloc& e) { - return OpStatus::OUT_OF_MEMORY; + if (it->second.ObjType() != OBJ_STRING) { + return OpStatus::WRONG_TYPE; } - PrimeIterator it = get<0>(add_res); - if (!get<2>(add_res)) { - return SetExisting(params, it, get<1>(add_res), value); + auto opt_prev = it->second.TryGetInt(); + if (!opt_prev) { + return OpStatus::INVALID_VALUE; } - // adding new value. - PrimeValue tvalue{value}; - tvalue.SetFlag(params.memcache_flags != 0); - it->second = std::move(tvalue); - db_slice_.PostUpdate(params.db_index, it); - - if (params.expire_after_ms) { - db_slice_.UpdateExpire(params.db_index, it, params.expire_after_ms + db_slice_.Now()); + long long prev = *opt_prev; + if ((incr < 0 && prev < 0 && incr < (LLONG_MIN - prev)) || + (incr > 0 && prev > 0 && incr > (LLONG_MAX - prev))) { + return OpStatus::OUT_OF_RANGE; } - if (params.memcache_flags) - db_slice_.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags); + int64_t new_val = prev + incr; + DCHECK(!it->second.IsExternal()); + db_slice.PreUpdate(op_args.db_ind, it); + it->second.SetInt(new_val); + db_slice.PostUpdate(op_args.db_ind, it); + RecordJournal(op_args, it->first, it->second); - EngineShard* shard = db_slice_.shard_owner(); + return new_val; +} - if (shard->tiered_storage()) { // external storage enabled. - if (value.size() >= kMinTieredLen) { - shard->tiered_storage()->UnloadItem(params.db_index, it); +// Returns true if keys were set, false otherwise. +OpStatus OpMSet(const OpArgs& op_args, ArgSlice args) { + DCHECK(!args.empty() && args.size() % 2 == 0); + + SetCmd::SetParams params{op_args.db_ind}; + SetCmd sg(op_args); + + for (size_t i = 0; i < args.size(); i += 2) { + DVLOG(1) << "MSet " << args[i] << ":" << args[i + 1]; + OpStatus res = sg.Set(params, args[i], args[i + 1]); + if (res != OpStatus::OK) { // OOM for example. + return res; } } return OpStatus::OK; } +} // namespace + +OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value) { + EngineShard* shard = op_args_.shard; + auto& db_slice = shard->db_slice(); + + DCHECK_LT(params.db_index, db_slice.db_array_size()); + DCHECK(db_slice.IsDbValid(params.db_index)); + + VLOG(2) << "Set " << key << "(" << db_slice.shard_id() << ") "; + + if (params.how == SET_IF_EXISTS) { + auto [it, expire_it] = db_slice.FindExt(params.db_index, key); + + if (IsValid(it)) { // existing + return SetExisting(params, it, expire_it, value); + } + + return OpStatus::SKIPPED; + } + + // Trying to add a new entry. 
+ tuple add_res; + try { + add_res = db_slice.AddOrFind2(params.db_index, key); + } catch (bad_alloc& e) { + return OpStatus::OUT_OF_MEMORY; + } + + PrimeIterator it = get<0>(add_res); + if (!get<2>(add_res)) { // Existing. + return SetExisting(params, it, get<1>(add_res), value); + } + + // + // Adding new value. + PrimeValue tvalue{value}; + tvalue.SetFlag(params.memcache_flags != 0); + it->second = std::move(tvalue); + db_slice.PostUpdate(params.db_index, it); + + if (params.expire_after_ms) { + db_slice.UpdateExpire(params.db_index, it, params.expire_after_ms + db_slice.Now()); + } + + if (params.memcache_flags) + db_slice.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags); + + if (shard->tiered_storage()) { // external storage enabled. + // TODO: we may have a bug if we block the fiber inside UnloadItem - "it" may be invalid + // afterwards. + if (value.size() >= kMinTieredLen) { + shard->tiered_storage()->UnloadItem(params.db_index, it); + } + } + + RecordJournal(op_args_, it->first, it->second); + return OpStatus::OK; +} + OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIterator e_it, - std::string_view value) { + string_view value) { if (params.how == SET_IF_NOTEXIST) return OpStatus::SKIPPED; PrimeValue& prime_value = it->second; + EngineShard* shard = op_args_.shard; + if (params.prev_val) { if (prime_value.ObjType() != OBJ_STRING) return OpStatus::WRONG_TYPE; - string val = GetString(db_slice_.shard_owner(), prime_value); + string val = GetString(shard, prime_value); params.prev_val->emplace(move(val)); } - uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice_.Now() : 0; + DbSlice& db_slice = shard->db_slice(); + uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice.Now() : 0; if (IsValid(e_it) && at_ms) { - e_it->second = db_slice_.FromAbsoluteTime(at_ms); + e_it->second = db_slice.FromAbsoluteTime(at_ms); } else { - bool changed = db_slice_.UpdateExpire(params.db_index, it, at_ms); + // We need to update expiry, or maybe erase the object if it was expired. + bool changed = db_slice.UpdateExpire(params.db_index, it, at_ms); if (changed && at_ms == 0) // erased. - return OpStatus::OK; + return OpStatus::OK; // TODO: to update journal with deletion. } - db_slice_.PreUpdate(params.db_index, it); + db_slice.PreUpdate(params.db_index, it); // Check whether we need to update flags table. bool req_flag_update = (params.memcache_flags != 0) != prime_value.HasFlag(); if (req_flag_update) { prime_value.SetFlag(params.memcache_flags != 0); - db_slice_.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags); + db_slice.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags); } // overwrite existing entry. prime_value.SetString(value); if (value.size() >= kMinTieredLen) { // external storage enabled. - EngineShard* shard = db_slice_.shard_owner(); + // TODO: if UnloadItem can block the calling fiber, then we have the bug because then "it" + // can be invalid after the function returns and the functions that follow may access invalid + // entry. 
if (shard->tiered_storage()) { shard->tiered_storage()->UnloadItem(params.db_index, it); } } - db_slice_.PostUpdate(params.db_index, it); + db_slice.PostUpdate(params.db_index, it); + RecordJournal(op_args_, it->first, it->second); return OpStatus::OK; } @@ -261,7 +460,7 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) { builder->SendError(kSyntaxErr); } - std::string_view ex = ArgS(args, i); + string_view ex = ArgS(args, i); if (!absl::SimpleAtoi(ex, &int_arg)) { return builder->SendError(kInvalidIntErr); } @@ -291,9 +490,8 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) { DCHECK(cntx->transaction); auto cb = [&](Transaction* t, EngineShard* shard) { - SetCmd sg(&shard->db_slice()); - auto status = sg.Set(sparams, key, value).status(); - return status; + SetCmd sg(t->GetOpArgs(shard)); + return sg.Set(sparams, key, value); }; OpResult result = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -319,9 +517,7 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) { std::string_view key = ArgS(args, 1); - auto cb = [&](Transaction* t, EngineShard* shard) { - return OpGet(OpArgs{shard, t->db_index()}, key); - }; + auto cb = [&](Transaction* t, EngineShard* shard) { return OpGet(t->GetOpArgs(shard), key); }; DVLOG(1) << "Before Get::ScheduleSingleHopT " << key; Transaction* trans = cntx->transaction; @@ -350,16 +546,15 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) { SetCmd::SetParams sparams{cntx->db_index()}; sparams.prev_val = &prev_val; - ShardId sid = Shard(key, shard_set->size()); - OpResult result = shard_set->Await(sid, [&] { - EngineShard* es = EngineShard::tlocal(); - SetCmd cmd(&es->db_slice()); + auto cb = [&](Transaction* t, EngineShard* shard) { + SetCmd cmd(t->GetOpArgs(shard)); return cmd.Set(sparams, key, value); - }); + }; + OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); - if (!result) { - (*cntx)->SendError(result.status()); + if (status != OpStatus::OK) { + (*cntx)->SendError(status); return; } @@ -367,6 +562,7 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) { (*cntx)->SendBulkString(*prev_val); return; } + return (*cntx)->SendNull(); } @@ -398,7 +594,7 @@ void StringFamily::IncrByFloat(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpIncrFloat(OpArgs{shard, t->db_index()}, key, val); + return OpIncrFloat(t->GetOpArgs(shard), key, val); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -444,7 +640,7 @@ void StringFamily::IncrByGeneric(std::string_view key, int64_t val, ConnectionCo bool skip_on_missing = cntx->protocol() == Protocol::MEMCACHE; auto cb = [&](Transaction* t, EngineShard* shard) { - OpResult res = OpIncrBy(OpArgs{shard, t->db_index()}, key, val, skip_on_missing); + OpResult res = OpIncrBy(t->GetOpArgs(shard), key, val, skip_on_missing); return res; }; @@ -477,7 +673,7 @@ void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContex if (cntx->protocol() == Protocol::REDIS) { auto cb = [&](Transaction* t, EngineShard* shard) { - return ExtendOrSet(OpArgs{shard, t->db_index()}, key, sval, prepend); + return ExtendOrSet(t->GetOpArgs(shard), key, sval, prepend); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -489,7 +685,7 @@ void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContex DCHECK(cntx->protocol() == Protocol::MEMCACHE); auto cb = [&](Transaction* t, EngineShard* shard) { - 
return ExtendOrSkip(OpArgs{shard, t->db_index()}, key, sval, prepend); + return ExtendOrSkip(t->GetOpArgs(shard), key, sval, prepend); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -524,9 +720,8 @@ void StringFamily::SetExGeneric(bool seconds, CmdArgList args, ConnectionContext sparams.expire_after_ms = unit_vals; auto cb = [&](Transaction* t, EngineShard* shard) { - SetCmd sg(&shard->db_slice()); - auto status = sg.Set(sparams, key, value).status(); - return status; + SetCmd sg(t->GetOpArgs(shard)); + return sg.Set(sparams, key, value); }; OpResult result = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -600,9 +795,9 @@ void StringFamily::MSet(CmdArgList args, ConnectionContext* cntx) { LOG(INFO) << "MSET/" << transaction->unique_shard_cnt() << str; } - auto cb = [&](Transaction* t, EngineShard* es) { - auto args = t->ShardArgsInShard(es->shard_id()); - return OpMSet(OpArgs{es, t->db_index()}, args); + auto cb = [&](Transaction* t, EngineShard* shard) { + auto args = t->ShardArgsInShard(shard->shard_id()); + return OpMSet(t->GetOpArgs(shard), args); }; OpStatus status = transaction->ScheduleSingleHop(std::move(cb)); @@ -635,12 +830,13 @@ void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) { transaction->Execute(std::move(cb), false); bool to_skip = exists.load(memory_order_relaxed) == true; - auto epilog_cb = [&](Transaction* t, EngineShard* es) { + + auto epilog_cb = [&](Transaction* t, EngineShard* shard) { if (to_skip) return OpStatus::OK; - auto args = t->ShardArgsInShard(es->shard_id()); - return OpMSet(OpArgs{es, t->db_index()}, std::move(args)); + auto args = t->ShardArgsInShard(shard->shard_id()); + return OpMSet(t->GetOpArgs(shard), std::move(args)); }; transaction->Execute(std::move(epilog_cb), true); @@ -680,7 +876,7 @@ void StringFamily::GetRange(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpGetRange(OpArgs{shard, t->db_index()}, key, start, end); + return OpGetRange(t->GetOpArgs(shard), key, start, end); }; Transaction* trans = cntx->transaction; @@ -713,7 +909,7 @@ void StringFamily::SetRange(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - return OpSetRange(OpArgs{shard, t->db_index()}, key, start, value); + return OpSetRange(t->GetOpArgs(shard), key, start, value); }; Transaction* trans = cntx->transaction; @@ -758,176 +954,6 @@ auto StringFamily::OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction return response; } -OpStatus StringFamily::OpMSet(const OpArgs& op_args, ArgSlice args) { - DCHECK(!args.empty() && args.size() % 2 == 0); - - SetCmd::SetParams params{op_args.db_ind}; - SetCmd sg(&op_args.shard->db_slice()); - - for (size_t i = 0; i < args.size(); i += 2) { - DVLOG(1) << "MSet " << args[i] << ":" << args[i + 1]; - auto res = sg.Set(params, args[i], args[i + 1]); - if (!res) { // OOM for example. - return res.status(); - } - } - - return OpStatus::OK; -} - -OpResult StringFamily::OpIncrBy(const OpArgs& op_args, std::string_view key, int64_t incr, - bool skip_on_missing) { - auto& db_slice = op_args.shard->db_slice(); - - // we avoid using AddOrFind because of skip_on_missing option for memcache. 
- auto [it, expire_it] = db_slice.FindExt(op_args.db_ind, key); - - if (!IsValid(it)) { - if (skip_on_missing) - return OpStatus::KEY_NOTFOUND; - - CompactObj cobj; - cobj.SetInt(incr); - - try { - db_slice.AddNew(op_args.db_ind, key, std::move(cobj), 0); - } catch (bad_alloc&) { - return OpStatus::OUT_OF_MEMORY; - } - return incr; - } - - if (it->second.ObjType() != OBJ_STRING) { - return OpStatus::WRONG_TYPE; - } - - auto opt_prev = it->second.TryGetInt(); - if (!opt_prev) { - return OpStatus::INVALID_VALUE; - } - - long long prev = *opt_prev; - if ((incr < 0 && prev < 0 && incr < (LLONG_MIN - prev)) || - (incr > 0 && prev > 0 && incr > (LLONG_MAX - prev))) { - return OpStatus::OUT_OF_RANGE; - } - - int64_t new_val = prev + incr; - DCHECK(!it->second.IsExternal()); - db_slice.PreUpdate(op_args.db_ind, it); - it->second.SetInt(new_val); - db_slice.PostUpdate(op_args.db_ind, it); - return new_val; -} - -OpResult StringFamily::OpIncrFloat(const OpArgs& op_args, std::string_view key, - double val) { - auto& db_slice = op_args.shard->db_slice(); - auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); - - char buf[128]; - - if (inserted) { - char* str = RedisReplyBuilder::FormatDouble(val, buf, sizeof(buf)); - it->second.SetString(str); - db_slice.PostUpdate(op_args.db_ind, it); - - return val; - } - - if (it->second.ObjType() != OBJ_STRING) - return OpStatus::WRONG_TYPE; - - if (it->second.Size() == 0) - return OpStatus::INVALID_FLOAT; - - string tmp; - string_view slice = GetSlice(op_args.shard, it->second, &tmp); - - StringToDoubleConverter stod(StringToDoubleConverter::NO_FLAGS, 0, 0, NULL, NULL); - int processed_digits = 0; - double base = stod.StringToDouble(slice.data(), slice.size(), &processed_digits); - if (unsigned(processed_digits) != slice.size()) { - return OpStatus::INVALID_FLOAT; - } - - base += val; - - if (isnan(base) || isinf(base)) { - return OpStatus::INVALID_FLOAT; - } - - char* str = RedisReplyBuilder::FormatDouble(base, buf, sizeof(buf)); - - db_slice.PreUpdate(op_args.db_ind, it); - it->second.SetString(str); - db_slice.PostUpdate(op_args.db_ind, it); - - return base; -} - -OpResult StringFamily::ExtendOrSet(const OpArgs& op_args, std::string_view key, - std::string_view val, bool prepend) { - auto& db_slice = op_args.shard->db_slice(); - auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); - if (inserted) { - it->second.SetString(val); - db_slice.PostUpdate(op_args.db_ind, it); - - return val.size(); - } - - if (it->second.ObjType() != OBJ_STRING) - return OpStatus::WRONG_TYPE; - - string tmp, new_val; - string_view slice = GetSlice(op_args.shard, it->second, &tmp); - if (prepend) - new_val = absl::StrCat(val, slice); - else - new_val = absl::StrCat(slice, val); - - db_slice.PreUpdate(op_args.db_ind, it); - it->second.SetString(new_val); - db_slice.PostUpdate(op_args.db_ind, it); - - return new_val.size(); -} - -OpResult StringFamily::ExtendOrSkip(const OpArgs& op_args, std::string_view key, - std::string_view val, bool prepend) { - auto& db_slice = op_args.shard->db_slice(); - OpResult it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING); - if (!it_res) { - return false; - } - - CompactObj& cobj = (*it_res)->second; - - string tmp, new_val; - string_view slice = GetSlice(op_args.shard, cobj, &tmp); - if (prepend) - new_val = absl::StrCat(val, slice); - else - new_val = absl::StrCat(slice, val); - - db_slice.PreUpdate(op_args.db_ind, *it_res); - cobj.SetString(new_val); - db_slice.PostUpdate(op_args.db_ind, *it_res); - - return new_val.size(); -} 
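// Editor's note: the signed-overflow guard that OpIncrBy carries over from the removed
// static version above into the new anonymous-namespace version is easy to misread, so
// here is a minimal standalone sketch of the same check. CheckedAdd is an illustrative
// name only and is not part of this patch.
#include <climits>
#include <optional>

// Returns prev + incr, or std::nullopt when the signed addition would overflow.
// Overflow is only possible when both operands share the same sign, so each branch
// compares incr against the remaining headroom; OpIncrBy maps the overflow case to
// OpStatus::OUT_OF_RANGE.
std::optional<long long> CheckedAdd(long long prev, long long incr) {
  if ((incr < 0 && prev < 0 && incr < LLONG_MIN - prev) ||
      (incr > 0 && prev > 0 && incr > LLONG_MAX - prev)) {
    return std::nullopt;
  }
  return prev + incr;
}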
- -OpResult StringFamily::OpGet(const OpArgs& op_args, string_view key) { - OpResult it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_STRING); - if (!it_res.ok()) - return it_res.status(); - - const PrimeValue& pv = it_res.value()->second; - - return GetString(op_args.shard, pv); -} - void StringFamily::Init(util::ProactorPool* pp) { set_qps.Init(pp); get_qps.Init(pp); diff --git a/src/server/string_family.h b/src/server/string_family.h index 8e1aad17e..685d4efd3 100644 --- a/src/server/string_family.h +++ b/src/server/string_family.h @@ -16,11 +16,10 @@ using facade::OpResult; using facade::OpStatus; class SetCmd { - DbSlice& db_slice_; + const OpArgs op_args_; public: - explicit SetCmd(DbSlice* db_slice); - ~SetCmd(); + explicit SetCmd(const OpArgs& op_args) : op_args_(op_args) {} enum SetHow { SET_ALWAYS, SET_IF_NOTEXIST, SET_IF_EXISTS }; @@ -38,7 +37,7 @@ class SetCmd { } }; - OpResult Set(const SetParams& params, std::string_view key, std::string_view value); + OpStatus Set(const SetParams& params, std::string_view key, std::string_view value); private: OpStatus SetExisting(const SetParams& params, PrimeIterator it, ExpireIterator e_it, @@ -86,24 +85,6 @@ class StringFamily { using MGetResponse = std::vector>; static MGetResponse OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction* t, EngineShard* shard); - - // Returns true if keys were set, false otherwise. - static OpStatus OpMSet(const OpArgs& op_args, ArgSlice args); - - // if skip_on_missing - returns KEY_NOTFOUND. - static OpResult OpIncrBy(const OpArgs& op_args, std::string_view key, int64_t val, - bool skip_on_missing); - static OpResult OpIncrFloat(const OpArgs& op_args, std::string_view key, double val); - - // Returns the length of the extended string. if prepend is false - appends the val. - static OpResult ExtendOrSet(const OpArgs& op_args, std::string_view key, - std::string_view val, bool prepend); - - // Returns true if was extended, false if the key was not found. - static OpResult ExtendOrSkip(const OpArgs& op_args, std::string_view key, - std::string_view val, bool prepend); - - static OpResult OpGet(const OpArgs& op_args, std::string_view key); }; } // namespace dfly diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 950126729..09881009a 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -11,6 +11,8 @@ #include "server/command_registry.h" #include "server/db_slice.h" #include "server/engine_shard_set.h" +#include "server/journal/journal.h" +#include "server/server_state.h" namespace dfly { @@ -334,9 +336,14 @@ bool Transaction::RunInShard(EngineShard* shard) { /*************************************************************************/ // Actually running the callback. + // If you change the logic here, also please change the logic try { // if transaction is suspended (blocked in watched queue), then it's a noop. - OpStatus status = was_suspended ? OpStatus::OK : cb_(this, shard); + OpStatus status = OpStatus::OK; + + if (!was_suspended) { + status = cb_(this, shard); + } if (unique_shard_cnt_ == 1) { cb_ = nullptr; // We can do it because only a single thread runs the callback. @@ -467,6 +474,13 @@ void Transaction::ScheduleInternal() { VLOG(2) << "Scheduled " << DebugId() << " OutOfOrder: " << bool(coordinator_state_ & COORD_OOO) << " num_shards: " << num_shards; + + if (mode == IntentLock::EXCLUSIVE) { + journal::Journal* j = ServerState::tlocal()->journal(); + // TODO: we may want to pass custom command name into journal. 
+ if (j && j->SchedStartTx(txid_, 0, num_shards)) { + } + } coordinator_state_ |= COORD_SCHED; break; } diff --git a/src/server/transaction.h b/src/server/transaction.h index 8983586f5..24d01df26 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -179,6 +179,10 @@ class Transaction { //! Runs in the shard thread. KeyLockArgs GetLockArgs(ShardId sid) const; + OpArgs GetOpArgs(EngineShard* shard) const { + return OpArgs{shard, txid_, db_index_}; + } + private: struct LockCnt { diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 96e7c17ad..c4e6320fb 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -970,8 +970,7 @@ void ZSetFamily::ZAdd(CmdArgList args, ConnectionContext* cntx) { absl::Span memb_sp{members.data(), members.size()}; auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpAdd(op_args, zparams, key, memb_sp); + return OpAdd(t->GetOpArgs(shard), zparams, key, memb_sp); }; OpResult add_result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1031,8 +1030,7 @@ void ZSetFamily::ZCount(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpCount(op_args, key, si); + return OpCount(t->GetOpArgs(shard), key, si); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1063,8 +1061,7 @@ void ZSetFamily::ZIncrBy(CmdArgList args, ConnectionContext* cntx) { zparams.flags = ZADD_IN_INCR; auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpAdd(op_args, zparams, key, ScoredMemberSpan{&scored_member, 1}); + return OpAdd(t->GetOpArgs(shard), zparams, key, ScoredMemberSpan{&scored_member, 1}); }; OpResult add_result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1139,7 +1136,7 @@ void ZSetFamily::ZInterStore(CmdArgList args, ConnectionContext* cntx) { ZParams zparams; zparams.override = true; add_result = - OpAdd(OpArgs{shard, t->db_index()}, zparams, dest_key, ScoredMemberSpan{smvec}).value(); + OpAdd(t->GetOpArgs(shard), zparams, dest_key, ScoredMemberSpan{smvec}).value(); } return OpStatus::OK; }; @@ -1161,8 +1158,7 @@ void ZSetFamily::ZLexCount(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpLexCount(op_args, key, li); + return OpLexCount(t->GetOpArgs(shard), key, li); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1237,8 +1233,7 @@ void ZSetFamily::ZRangeByLex(CmdArgList args, ConnectionContext* cntx) { range_spec.interval = li; auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpRange(range_spec, op_args, key); + return OpRange(range_spec, t->GetOpArgs(shard), key); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1318,8 +1313,7 @@ void ZSetFamily::ZRem(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpRem(op_args, key, members); + return OpRem(t->GetOpArgs(shard), key, members); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1335,8 +1329,7 @@ void ZSetFamily::ZScore(CmdArgList args, ConnectionContext* cntx) { string_view member = ArgS(args, 2); auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return 
OpScore(op_args, key, member); + return OpScore(t->GetOpArgs(shard), key, member); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1364,7 +1357,7 @@ void ZSetFamily::ZScan(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpScan(OpArgs{shard, t->db_index()}, key, &cursor); + return OpScan(t->GetOpArgs(shard), key, &cursor); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1427,7 +1420,7 @@ void ZSetFamily::ZUnionStore(CmdArgList args, ConnectionContext* cntx) { ZParams zparams; zparams.override = true; add_result = - OpAdd(OpArgs{shard, t->db_index()}, zparams, dest_key, ScoredMemberSpan{smvec}).value(); + OpAdd(t->GetOpArgs(shard), zparams, dest_key, ScoredMemberSpan{smvec}).value(); } return OpStatus::OK; }; @@ -1449,8 +1442,7 @@ void ZSetFamily::ZRangeByScoreInternal(string_view key, string_view min_s, strin range_spec.interval = si; auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpRange(range_spec, op_args, key); + return OpRange(range_spec, t->GetOpArgs(shard), key); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1480,8 +1472,7 @@ void ZSetFamily::OutputScoredArrayResult(const OpResult& result, void ZSetFamily::ZRemRangeGeneric(string_view key, const ZRangeSpec& range_spec, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpRemRange(op_args, key, range_spec); + return OpRemRange(t->GetOpArgs(shard), key, range_spec); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1531,8 +1522,7 @@ void ZSetFamily::ZRangeGeneric(CmdArgList args, bool reverse, ConnectionContext* range_spec.interval = ii; auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpRange(range_spec, op_args, key); + return OpRange(range_spec, t->GetOpArgs(shard), key); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1544,9 +1534,8 @@ void ZSetFamily::ZRankGeneric(CmdArgList args, bool reverse, ConnectionContext* string_view member = ArgS(args, 2); auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpRank(op_args, key, member, reverse); + return OpRank(t->GetOpArgs(shard), key, member, reverse); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
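Editor's note: the zset_family.cc hunks above repeat one mechanical change, replacing a hand-built OpArgs{shard, t->db_index()} with t->GetOpArgs(shard) so that the transaction id travels with every shard-local operation (for example, into RecordJournal). Below is a minimal sketch of that plumbing under simplified stand-in types; EngineShard, Transaction and OpArgs here are toy versions for illustration, not the real Dragonfly classes.

#include <cstdint>
#include <iostream>

// Toy stand-ins (not the real Dragonfly types) to show the shape of the refactor.
struct EngineShard { uint16_t id; };

struct OpArgs {
  EngineShard* shard;
  uint64_t txid;   // carried alongside the shard so journaling can tag records
  uint16_t db_ind;
};

class Transaction {
 public:
  Transaction(uint64_t txid, uint16_t db_index) : txid_(txid), db_index_(db_index) {}

  // Same idea as Transaction::GetOpArgs in the patch: bundle everything a
  // shard-local operation needs into a single value.
  OpArgs GetOpArgs(EngineShard* shard) const { return OpArgs{shard, txid_, db_index_}; }

 private:
  uint64_t txid_;
  uint16_t db_index_;
};

int main() {
  EngineShard shard{3};
  Transaction t(42, 0);

  // Before the patch a callback built OpArgs by hand; after it, the callback
  // simply forwards t->GetOpArgs(shard):
  auto cb = [](Transaction* tx, EngineShard* es) {
    OpArgs op_args = tx->GetOpArgs(es);
    std::cout << "shard=" << op_args.shard->id << " txid=" << op_args.txid
              << " db=" << op_args.db_ind << "\n";
  };
  cb(&t, &shard);
  return 0;
}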