diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 735b245cd..b7373b244 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -1,16 +1,20 @@ add_executable(dragonfly dfly_main.cc) cxx_link(dragonfly base dragonfly_lib) -add_library(dragonfly_lib blocking_controller.cc channel_slice.cc command_registry.cc - common.cc config_flags.cc - conn_context.cc db_slice.cc debugcmd.cc - engine_shard_set.cc generic_family.cc hset_family.cc io_mgr.cc +add_library(dfly_transaction db_slice.cc engine_shard_set.cc blocking_controller.cc common.cc + io_mgr.cc journal/journal.cc journal/journal_shard.cc table.cc + tiered_storage.cc transaction.cc) +cxx_link(dfly_transaction uring_fiber_lib dfly_core strings_lib) + +add_library(dragonfly_lib channel_slice.cc command_registry.cc + config_flags.cc conn_context.cc debugcmd.cc dflycmd.cc + generic_family.cc hset_family.cc list_family.cc main_service.cc rdb_load.cc rdb_save.cc replica.cc snapshot.cc script_mgr.cc server_family.cc - set_family.cc stream_family.cc string_family.cc table.cc tiered_storage.cc - transaction.cc zset_family.cc version.cc) + set_family.cc stream_family.cc string_family.cc + zset_family.cc version.cc) -cxx_link(dragonfly_lib dfly_core dfly_facade redis_lib strings_lib html_lib) +cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib strings_lib html_lib) add_library(dfly_test_lib test_utils.cc) cxx_link(dfly_test_lib dragonfly_lib facade_test gtest_main_ext) diff --git a/src/server/common.h b/src/server/common.h index 146bd858e..0194e667a 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -21,13 +21,14 @@ constexpr int64_t kMaxExpireDeadlineSec = (1u << 27) - 1; using DbIndex = uint16_t; using ShardId = uint16_t; +using LSN = uint64_t; using TxId = uint64_t; using TxClock = uint64_t; -using facade::MutableSlice; +using facade::ArgS; using facade::CmdArgList; using facade::CmdArgVec; -using facade::ArgS; +using facade::MutableSlice; using ArgSlice = absl::Span; using StringVec = std::vector; @@ -55,8 +56,8 @@ struct KeyIndex { // relevant for for commands like ZUNIONSTORE/ZINTERSTORE for destination key. unsigned bonus = 0; unsigned start; - unsigned end; // does not include this index (open limit). - unsigned step; // 1 for commands like mget. 2 for commands like mset. + unsigned end; // does not include this index (open limit). + unsigned step; // 1 for commands like mget. 2 for commands like mset. bool HasSingleKey() const { return bonus == 0 && (start + step >= end); @@ -69,9 +70,15 @@ struct KeyIndex { struct OpArgs { EngineShard* shard; + TxId txid; DbIndex db_ind; -}; + OpArgs() : shard(nullptr), txid(0), db_ind(0) { + } + + OpArgs(EngineShard* s, TxId i, DbIndex d) : shard(s), txid(i), db_ind(d) { + } +}; struct TieredStats { size_t external_reads = 0; @@ -118,7 +125,6 @@ extern size_t max_memory_limit; // set upon server start. extern unsigned kernel_version; - const char* GlobalStateName(GlobalState gs); } // namespace dfly diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index d8dbfba3e..09b81cf1e 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -60,7 +60,8 @@ struct ObjInfo { void DoPopulateBatch(std::string_view prefix, size_t val_size, const SetCmd::SetParams& params, const PopulateBatch& batch) { - SetCmd sg(&EngineShard::tlocal()->db_slice()); + OpArgs op_args(EngineShard::tlocal(), 0, params.db_index); + SetCmd sg(op_args); for (unsigned i = 0; i < batch.sz; ++i) { string key = absl::StrCat(prefix, ":", batch.index[i]); diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc new file mode 100644 index 000000000..640fb3287 --- /dev/null +++ b/src/server/dflycmd.cc @@ -0,0 +1,124 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// +#include "server/dflycmd.h" + +#include +#include + +#include "base/flags.h" +#include "base/logging.h" +#include "facade/dragonfly_connection.h" + +#include "server/engine_shard_set.h" +#include "server/error.h" +#include "server/journal/journal.h" +#include "server/server_state.h" +#include "server/transaction.h" + +using namespace std; + +ABSL_DECLARE_FLAG(string, dir); + +namespace dfly { + +using namespace facade; +using namespace std; +using util::ProactorBase; + +DflyCmd::DflyCmd(util::ListenerInterface* listener, journal::Journal* journal) : listener_(listener), journal_(journal) { +} + +void DflyCmd::Run(CmdArgList args, ConnectionContext* cntx) { + DCHECK_GE(args.size(), 2u); + + ToUpper(&args[1]); + RedisReplyBuilder* rb = static_cast(cntx->reply_builder()); + + std::string_view sub_cmd = ArgS(args, 1); + if (sub_cmd == "JOURNAL") { + if (args.size() < 3) { + return rb->SendError(WrongNumArgsError("DFLY JOURNAL")); + } + HandleJournal(args, cntx); + return; + } + + if (sub_cmd == "THREAD") { + util::ProactorPool* pool = shard_set->pool(); + + if (args.size() == 2) { // DFLY THREAD : returns connection thread index and number of threads. + rb->StartArray(2); + rb->SendLong(ProactorBase::GetIndex()); + rb->SendLong(long(pool->size())); + return; + } + + // DFLY THREAD to_thread : migrates current connection to a different thread. + std::string_view arg = ArgS(args, 2); + unsigned num_thread; + if (!absl::SimpleAtoi(arg, &num_thread)) { + return rb->SendError(kSyntaxErr); + } + + if (num_thread < pool->size()) { + if (int(num_thread) != ProactorBase::GetIndex()) { + listener_->Migrate(cntx->owner(), pool->at(num_thread)); + } + + return rb->SendOk(); + } + + rb->SendError(kInvalidIntErr); + return; + } + + rb->SendError(kSyntaxErr); +} + + +void DflyCmd::HandleJournal(CmdArgList args, ConnectionContext* cntx) { + DCHECK_GE(args.size(), 3u); + ToUpper(&args[2]); + + std::string_view sub_cmd = ArgS(args, 2); + Transaction* trans = cntx->transaction; + DCHECK(trans); + RedisReplyBuilder* rb = static_cast(cntx->reply_builder()); + + if (sub_cmd == "START") { + unique_lock lk(mu_); + journal::Journal* journal = ServerState::tlocal()->journal(); + if (!journal) { + string dir = absl::GetFlag(FLAGS_dir); + journal_->StartLogging(dir); + trans->Schedule(); + auto barrier_cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; + trans->Execute(barrier_cb, true); + + // tx id starting from which we may reliably fetch journal records. + journal_txid_ = trans->txid(); + } + + return rb->SendLong(journal_txid_); + } + + if (sub_cmd == "STOP") { + unique_lock lk(mu_); + if (journal_->EnterLameDuck()) { + auto barrier_cb = [](Transaction* t, EngineShard* shard) { return OpStatus::OK; }; + trans->ScheduleSingleHop(std::move(barrier_cb)); + + auto ec = journal_->Close(); + LOG_IF(ERROR, ec) << "Error closing journal " << ec; + journal_txid_ = trans->txid(); + } + + return rb->SendLong(journal_txid_); + } + + string reply = UnknownSubCmd(sub_cmd, "DFLY"); + return rb->SendError(reply, kSyntaxErrType); +} + +} // namespace dfly diff --git a/src/server/dflycmd.h b/src/server/dflycmd.h new file mode 100644 index 000000000..df99b9fba --- /dev/null +++ b/src/server/dflycmd.h @@ -0,0 +1,38 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include + +#include "server/conn_context.h" + +namespace util { +class ListenerInterface; +} // namespace util + +namespace dfly { + +class EngineShardSet; + +namespace journal { +class Journal; +} // namespace journal + +class DflyCmd { + public: + DflyCmd(util::ListenerInterface* listener, journal::Journal* journal); + + void Run(CmdArgList args, ConnectionContext* cntx); + + private: + void HandleJournal(CmdArgList args, ConnectionContext* cntx); + + util::ListenerInterface* listener_; + journal::Journal* journal_; + ::boost::fibers::mutex mu_; + TxId journal_txid_ = 0; +}; + +} // namespace dfly diff --git a/src/server/engine_shard_set.cc b/src/server/engine_shard_set.cc index 1e6ea49b1..273be74d3 100644 --- a/src/server/engine_shard_set.cc +++ b/src/server/engine_shard_set.cc @@ -27,7 +27,9 @@ ABSL_FLAG(uint32_t, hz, 1000, "and performs other background tasks. Warning: not advised to decrease in production, " "because it can affect expiry precision for PSETEX etc."); -ABSL_DECLARE_FLAG(bool, cache_mode); +ABSL_FLAG(bool, cache_mode, false, + "If true, the backend behaves like a cache, " + "by evicting entries when getting close to maxmemory limit"); namespace dfly { @@ -42,8 +44,9 @@ vector cached_stats; // initialized in EngineShard } // namespace -thread_local EngineShard* EngineShard::shard_ = nullptr; constexpr size_t kQueueLen = 64; + +thread_local EngineShard* EngineShard::shard_ = nullptr; EngineShardSet* shard_set = nullptr; EngineShard::Stats& EngineShard::Stats::operator+=(const EngineShard::Stats& o) { diff --git a/src/server/engine_shard_set.h b/src/server/engine_shard_set.h index e09ba02b6..5ff6ea81d 100644 --- a/src/server/engine_shard_set.h +++ b/src/server/engine_shard_set.h @@ -26,6 +26,10 @@ extern "C" { namespace dfly { +namespace journal { +class Journal; +} // namespace journal + class TieredStorage; class BlockingController; @@ -134,6 +138,14 @@ class EngineShard { return counter_[unsigned(type)].SumTail(); } + journal::Journal* journal() { + return journal_; + } + + void set_journal(journal::Journal* j) { + journal_ = j; + } + void TEST_EnableHeartbeat(); private: @@ -160,6 +172,7 @@ class EngineShard { // Logical ts used to order distributed transactions. TxId committed_txid_ = 0; Transaction* continuation_trans_ = nullptr; + journal::Journal* journal_ = nullptr; IntentLock shard_lock_; uint32_t periodic_task_ = 0; diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index d757283c8..5d46d753d 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -244,7 +244,7 @@ uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys do { ess->Await(sid, [&] { - OpArgs op_args{EngineShard::tlocal(), cntx->conn_state.db_index}; + OpArgs op_args{EngineShard::tlocal(), 0, cntx->conn_state.db_index}; OpScan(op_args, scan_opts, &cursor, keys); }); @@ -281,7 +281,7 @@ void GenericFamily::Del(CmdArgList args, ConnectionContext* cntx) { auto cb = [&result](const Transaction* t, EngineShard* shard) { ArgSlice args = t->ShardArgsInShard(shard->shard_id()); - auto res = OpDel(OpArgs{shard, t->db_index()}, args); + auto res = OpDel(t->GetOpArgs(shard), args); result.fetch_add(res.value_or(0), memory_order_relaxed); return OpStatus::OK; @@ -332,7 +332,7 @@ void GenericFamily::Exists(CmdArgList args, ConnectionContext* cntx) { auto cb = [&result](Transaction* t, EngineShard* shard) { ArgSlice args = t->ShardArgsInShard(shard->shard_id()); - auto res = OpExists(OpArgs{shard, t->db_index()}, args); + auto res = OpExists(t->GetOpArgs(shard), args); result.fetch_add(res.value_or(0), memory_order_relaxed); return OpStatus::OK; @@ -362,7 +362,7 @@ void GenericFamily::Expire(CmdArgList args, ConnectionContext* cntx) { ExpireParams params{.ts = int_arg}; auto cb = [&](Transaction* t, EngineShard* shard) { - return OpExpire(OpArgs{shard, t->db_index()}, key, params); + return OpExpire(t->GetOpArgs(shard), key, params); }; OpStatus status = cntx->transaction->ScheduleSingleHop(move(cb)); @@ -381,7 +381,7 @@ void GenericFamily::ExpireAt(CmdArgList args, ConnectionContext* cntx) { ExpireParams params{.ts = int_arg, .absolute = true}; auto cb = [&](Transaction* t, EngineShard* shard) { - return OpExpire(OpArgs{shard, t->db_index()}, key, params); + return OpExpire(t->GetOpArgs(shard), key, params); }; OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -425,7 +425,7 @@ void GenericFamily::PexpireAt(CmdArgList args, ConnectionContext* cntx) { ExpireParams params{.ts = int_arg, .absolute = true, .unit = MSEC}; auto cb = [&](Transaction* t, EngineShard* shard) { - return OpExpire(OpArgs{shard, t->db_index()}, key, params); + return OpExpire(t->GetOpArgs(shard), key, params); }; OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -524,7 +524,7 @@ OpResult GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des if (transaction->unique_shard_cnt() == 1) { auto cb = [&](Transaction* t, EngineShard* shard) { - return OpRen(OpArgs{shard, t->db_index()}, key[0], key[1], skip_exist_dest); + return OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest); }; OpResult result = transaction->ScheduleSingleHopT(std::move(cb)); diff --git a/src/server/hset_family.cc b/src/server/hset_family.cc index b3ed44f27..38a54a818 100644 --- a/src/server/hset_family.cc +++ b/src/server/hset_family.cc @@ -99,7 +99,7 @@ void HSetFamily::HDel(CmdArgList args, ConnectionContext* cntx) { args.remove_prefix(2); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpDel(OpArgs{shard, t->db_index()}, key, args); + return OpDel(t->GetOpArgs(shard), key, args); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -114,7 +114,7 @@ void HSetFamily::HLen(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpLen(OpArgs{shard, t->db_index()}, key); + return OpLen(t->GetOpArgs(shard), key); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -157,7 +157,7 @@ void HSetFamily::HMGet(CmdArgList args, ConnectionContext* cntx) { args.remove_prefix(2); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpMGet(OpArgs{shard, t->db_index()}, key, args); + return OpMGet(t->GetOpArgs(shard), key, args); }; OpResult> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -186,7 +186,7 @@ void HSetFamily::HGet(CmdArgList args, ConnectionContext* cntx) { string_view field = ArgS(args, 2); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpGet(OpArgs{shard, t->db_index()}, key, field); + return OpGet(t->GetOpArgs(shard), key, field); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -214,7 +214,7 @@ void HSetFamily::HIncrBy(CmdArgList args, ConnectionContext* cntx) { IncrByParam param{ival}; auto cb = [&](Transaction* t, EngineShard* shard) { - return OpIncrBy(OpArgs{shard, t->db_index()}, key, field, ¶m); + return OpIncrBy(t->GetOpArgs(shard), key, field, ¶m); }; OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -249,7 +249,7 @@ void HSetFamily::HIncrByFloat(CmdArgList args, ConnectionContext* cntx) { IncrByParam param{dval}; auto cb = [&](Transaction* t, EngineShard* shard) { - return OpIncrBy(OpArgs{shard, t->db_index()}, key, field, ¶m); + return OpIncrBy(t->GetOpArgs(shard), key, field, ¶m); }; OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -284,7 +284,7 @@ void HSetFamily::HGetGeneric(CmdArgList args, ConnectionContext* cntx, uint8_t g string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpGetAll(OpArgs{shard, t->db_index()}, key, getall_mask); + return OpGetAll(t->GetOpArgs(shard), key, getall_mask); }; OpResult> result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -311,7 +311,7 @@ void HSetFamily::HScan(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpScan(OpArgs{shard, t->db_index()}, key, &cursor); + return OpScan(t->GetOpArgs(shard), key, &cursor); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -339,7 +339,7 @@ void HSetFamily::HSet(CmdArgList args, ConnectionContext* cntx) { args.remove_prefix(2); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpSet(OpArgs{shard, t->db_index()}, key, args, false); + return OpSet(t->GetOpArgs(shard), key, args, false); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -355,7 +355,7 @@ void HSetFamily::HSetNx(CmdArgList args, ConnectionContext* cntx) { args.remove_prefix(2); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpSet(OpArgs{shard, t->db_index()}, key, args, true); + return OpSet(t->GetOpArgs(shard), key, args, true); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -371,7 +371,7 @@ void HSetFamily::HStrLen(CmdArgList args, ConnectionContext* cntx) { string_view field = ArgS(args, 2); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpStrLen(OpArgs{shard, t->db_index()}, key, field); + return OpStrLen(t->GetOpArgs(shard), key, field); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc new file mode 100644 index 000000000..35dac782b --- /dev/null +++ b/src/server/journal/journal.cc @@ -0,0 +1,126 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/journal/journal.h" + +#include + +#include "base/logging.h" +#include "server/engine_shard_set.h" +#include "server/journal/journal_shard.h" +#include "server/server_state.h" + +namespace dfly { +namespace journal { + +namespace fs = std::filesystem; +using namespace std; +using namespace util; +namespace fibers = boost::fibers; + +namespace { + +thread_local JournalShard journal_shard; + +} // namespace + +Journal::Journal() { +} + +error_code Journal::StartLogging(std::string_view dir) { + if (journal_shard.IsOpen()) { + return error_code{}; + } + + auto* pool = shard_set->pool(); + atomic_uint32_t created{0}; + lock_guard lk(state_mu_); + + auto open_cb = [&](auto* pb) { + auto ec = journal_shard.Open(dir, unsigned(ProactorBase::GetIndex())); + if (ec) { + LOG(FATAL) << "Could not create journal " << ec; // TODO + } else { + created.fetch_add(1, memory_order_relaxed); + ServerState::tlocal()->set_journal(this); + EngineShard* shard = EngineShard::tlocal(); + if (shard) { + shard->set_journal(this); + } + } + }; + + pool->AwaitFiberOnAll(open_cb); + + if (created.load(memory_order_acquire) != pool->size()) { + LOG(FATAL) << "TBD / revert"; + } + + return error_code{}; +} + +error_code Journal::Close() { + CHECK(lameduck_.load(memory_order_relaxed)); + + VLOG(1) << "Journal::Close"; + + fibers::mutex ec_mu; + error_code res; + + lock_guard lk(state_mu_); + auto close_cb = [&](auto*) { + ServerState::tlocal()->set_journal(nullptr); + EngineShard* shard = EngineShard::tlocal(); + if (shard) { + shard->set_journal(nullptr); + } + + auto ec = journal_shard.Close(); + + if (ec) { + lock_guard lk2(ec_mu); + res = ec; + } + }; + + shard_set->pool()->AwaitFiberOnAll(close_cb); + + return res; +} + +bool Journal::SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards) { + if (!journal_shard.IsOpen() || lameduck_.load(memory_order_relaxed)) + return false; + + journal_shard.AddLogRecord(txid, unsigned(Op::SCHED)); + + return true; +} + +LSN Journal::GetLsn() const { + return journal_shard.cur_lsn(); +} + +bool Journal::EnterLameDuck() { + if (!journal_shard.IsOpen()) { + return false; + } + + bool val = false; + bool res = lameduck_.compare_exchange_strong(val, true, memory_order_acq_rel); + return res; +} + +void Journal::OpArgs(TxId txid, Op opcode, Span keys) { + DCHECK(journal_shard.IsOpen()); + + journal_shard.AddLogRecord(txid, unsigned(opcode)); +} + +void Journal::RecordEntry(TxId txid, const PrimeKey& key, const PrimeValue& pval) { + journal_shard.AddLogRecord(txid, unsigned(Op::VAL)); +} + +} // namespace journal +} // namespace dfly diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h new file mode 100644 index 000000000..e26b99cbc --- /dev/null +++ b/src/server/journal/journal.h @@ -0,0 +1,73 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include "server/common.h" +#include "server/table.h" +#include "util/proactor_pool.h" + +namespace dfly { + +class Transaction; + +namespace journal { + +enum class Op : uint8_t { + NOOP = 0, + LOCK = 1, + UNLOCK = 2, + LOCK_SHARD = 3, + UNLOCK_SHARD = 4, + SCHED = 5, + VAL = 10, + DEL, + MSET, +}; + +class Journal { + public: + using Span = absl::Span; + + Journal(); + + std::error_code StartLogging(std::string_view dir); + + // Returns true if journal has been active and changed its state to lameduck mode + // and false otherwise. + bool EnterLameDuck(); // still logs ongoing transactions but refuses to start new ones. + + // Requires: journal is in lameduck mode. + std::error_code Close(); + + // Returns true if transaction was scheduled, false if journal is inactive + // or in lameduck mode and does not log new transactions. + bool SchedStartTx(TxId txid, unsigned num_keys, unsigned num_shards); + + void AddCmd(TxId txid, Op opcode, Span args) { + OpArgs(txid, opcode, args); + } + + void Lock(TxId txid, Span keys) { + OpArgs(txid, Op::LOCK, keys); + } + + void Unlock(TxId txid, Span keys) { + OpArgs(txid, Op::UNLOCK, keys); + } + + LSN GetLsn() const; + + void RecordEntry(TxId txid, const PrimeKey& key, const PrimeValue& pval); + + private: + void OpArgs(TxId id, Op opcode, Span keys); + + mutable boost::fibers::mutex state_mu_; + + std::atomic_bool lameduck_{false}; +}; + +} // namespace journal +} // namespace dfly diff --git a/src/server/journal/journal_shard.cc b/src/server/journal/journal_shard.cc new file mode 100644 index 000000000..fbd0940c3 --- /dev/null +++ b/src/server/journal/journal_shard.cc @@ -0,0 +1,115 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#include "server/journal/journal_shard.h" + +#include + +#include +#include + +#include + +#include "base/logging.h" +#include "util/fibers/fibers_ext.h" + +namespace dfly { +namespace journal { +using namespace std; +using namespace util; +namespace fibers = boost::fibers; +namespace fs = std::filesystem; + +namespace { + +string ShardName(std::string_view base, unsigned index) { + return absl::StrCat(base, "-", absl::Dec(index, absl::kZeroPad4), ".log"); +} + +} // namespace + +#define CHECK_EC(x) \ + do { \ + auto __ec$ = (x); \ + CHECK(!__ec$) << "Error: " << __ec$ << " " << __ec$.message() << " for " << #x; \ + } while (false) + + + +JournalShard::JournalShard() { +} + +JournalShard::~JournalShard() { + CHECK(!shard_file_); +} + +std::error_code JournalShard::Open(const std::string_view dir, unsigned index) { + CHECK(!shard_file_); + + fs::path dir_path; + + if (dir.empty()) { + } else { + dir_path = dir; + error_code ec; + + fs::file_status dir_status = fs::status(dir_path, ec); + if (ec) { + if (ec == errc::no_such_file_or_directory) { + fs::create_directory(dir_path, ec); + dir_status = fs::status(dir_path, ec); + } + if (ec) + return ec; + } + // LOG(INFO) << int(dir_status.type()); + } + dir_path.append(ShardName("journal", index)); + shard_path_ = dir_path; + + // For file integrity guidelines see: + // https://lwn.net/Articles/457667/ + // https://www.evanjones.ca/durability-filesystem.html + // NOTE: O_DSYNC is omited. + constexpr auto kJournalFlags = O_CLOEXEC | O_CREAT | O_TRUNC | O_RDWR; + io::Result> res = + uring::OpenLinux(shard_path_, kJournalFlags, 0666); + if (!res) { + return res.error(); + } + DVLOG(1) << "Opened journal " << shard_path_; + + shard_file_ = std::move(res).value(); + shard_index_ = index; + file_offset_ = 0; + status_ec_.clear(); + + return error_code{}; +} + +error_code JournalShard::Close() { + VLOG(1) << "JournalShard::Close"; + + CHECK(shard_file_); + lameduck_ = true; + + auto ec = shard_file_->Close(); + + DVLOG(1) << "Closing " << shard_path_; + LOG_IF(ERROR, ec) << "Error closing journal file " << ec; + shard_file_.reset(); + + return ec; +} + +void JournalShard::AddLogRecord(TxId txid, unsigned opcode) { + string line = absl::StrCat(lsn_, " ", txid, " ", opcode, "\n"); + error_code ec = shard_file_->Write(io::Buffer(line), file_offset_, 0); + CHECK_EC(ec); + file_offset_ += line.size(); + ++lsn_; +} + +} // namespace journal +} // namespace dfly \ No newline at end of file diff --git a/src/server/journal/journal_shard.h b/src/server/journal/journal_shard.h new file mode 100644 index 000000000..f58c4b98b --- /dev/null +++ b/src/server/journal/journal_shard.h @@ -0,0 +1,56 @@ +// Copyright 2022, Roman Gershman. All rights reserved. +// See LICENSE for licensing terms. +// + +#pragma once + +#include +#include +#include +#include + +#include "server/common.h" +#include "util/uring/uring_file.h" + +namespace dfly { +namespace journal { + +class JournalShard { + public: + JournalShard(); + ~JournalShard(); + + std::error_code Open(const std::string_view dir, unsigned index); + + std::error_code Close(); + + LSN cur_lsn() const { + return lsn_; + } + + std::error_code status() const { + return status_ec_; + } + + bool IsOpen() const { + return bool(shard_file_); + } + + void AddLogRecord(TxId txid, unsigned opcode); + + private: + std::string shard_path_; + std::unique_ptr shard_file_; + + size_t file_offset_ = 0; + LSN lsn_ = 1; + + unsigned shard_index_ = -1; + + std::error_code status_ec_; + + bool lameduck_ = false; +}; + +} // namespace journal +} // namespace dfly \ No newline at end of file diff --git a/src/server/list_family.cc b/src/server/list_family.cc index f5238ea4f..aea3787da 100644 --- a/src/server/list_family.cc +++ b/src/server/list_family.cc @@ -495,7 +495,7 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) { if (cntx->transaction->unique_shard_cnt() == 1) { auto cb = [&](Transaction* t, EngineShard* shard) { - return OpRPopLPushSingleShard(OpArgs{shard, t->db_index()}, src, dest); + return OpRPopLPushSingleShard(t->GetOpArgs(shard), src, dest); }; result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -515,7 +515,7 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) { auto args = t->ShardArgsInShard(shard->shard_id()); DCHECK_EQ(1u, args.size()); bool is_dest = args.front() == dest; - find_res[is_dest] = RPeek(OpArgs{shard, t->db_index()}, args.front(), !is_dest); + find_res[is_dest] = RPeek(t->GetOpArgs(shard), args.front(), !is_dest); return OpStatus::OK; }; @@ -530,7 +530,7 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { auto args = t->ShardArgsInShard(shard->shard_id()); bool is_dest = args.front() == dest; - OpArgs op_args{shard, t->db_index()}; + OpArgs op_args = t->GetOpArgs(shard); if (is_dest) { string_view val{find_res[0].value()}; @@ -564,7 +564,7 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) { void ListFamily::LLen(CmdArgList args, ConnectionContext* cntx) { auto key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { - return OpLen(OpArgs{shard, t->db_index()}, key); + return OpLen(t->GetOpArgs(shard), key); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); if (result) { @@ -586,7 +586,7 @@ void ListFamily::LIndex(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpIndex(OpArgs{shard, t->db_index()}, key, index); + return OpIndex(t->GetOpArgs(shard), key, index); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -617,7 +617,7 @@ void ListFamily::LInsert(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpInsert(OpArgs{shard, t->db_index()}, key, pivot, elem, where); + return OpInsert(t->GetOpArgs(shard), key, pivot, elem, where); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -640,7 +640,7 @@ void ListFamily::LTrim(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpTrim(OpArgs{shard, t->db_index()}, key, start, end); + return OpTrim(t->GetOpArgs(shard), key, start, end); }; cntx->transaction->ScheduleSingleHop(std::move(cb)); (*cntx)->SendOk(); @@ -658,7 +658,7 @@ void ListFamily::LRange(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpRange(OpArgs{shard, t->db_index()}, key, start, end); + return OpRange(t->GetOpArgs(shard), key, start, end); }; auto res = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -682,7 +682,7 @@ void ListFamily::LRem(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpRem(OpArgs{shard, t->db_index()}, key, elem, count); + return OpRem(t->GetOpArgs(shard), key, elem, count); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); if (result) { @@ -704,7 +704,7 @@ void ListFamily::LSet(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpSet(OpArgs{shard, t->db_index()}, key, elem, count); + return OpSet(t->GetOpArgs(shard), key, elem, count); }; OpResult result = cntx->transaction->ScheduleSingleHop(std::move(cb)); if (result) { @@ -769,7 +769,7 @@ void ListFamily::PushGeneric(ListDir dir, bool skip_notexists, CmdArgList args, } absl::Span span{vals.data(), vals.size()}; auto cb = [&](Transaction* t, EngineShard* shard) { - return OpPush(OpArgs{shard, t->db_index()}, key, dir, skip_notexists, span); + return OpPush(t->GetOpArgs(shard), key, dir, skip_notexists, span); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -803,7 +803,7 @@ void ListFamily::PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cnt } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpPop(OpArgs{shard, t->db_index()}, key, dir, count, true); + return OpPop(t->GetOpArgs(shard), key, dir, count, true); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 601cbf949..fb5b598e0 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -44,9 +44,6 @@ ABSL_FLAG(uint32_t, memcache_port, 0, "Memcached port"); ABSL_FLAG(uint64_t, maxmemory, 0, "Limit on maximum-memory that is used by the database." "0 - means the program will automatically determine its maximum memory usage"); -ABSL_FLAG(bool, cache_mode, false, - "If true, the backend behaves like a cache, " - "by evicting entries when getting close to maxmemory limit"); ABSL_DECLARE_FLAG(string, requirepass); diff --git a/src/server/replica.cc b/src/server/replica.cc index 9535a9d8e..a304bbbb4 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -79,13 +79,6 @@ error_code Recv(FiberSocketBase* input, base::IoBuf* dest) { constexpr unsigned kRdbEofMarkSize = 40; -// TODO: to remove usages of this macro and make code crash-less. -#define CHECK_EC(x) \ - do { \ - auto __ec$ = (x); \ - CHECK(!__ec$) << "Error: " << __ec$ << " " << __ec$.message() << " for " << #x; \ - } while (false) - } // namespace Replica::Replica(string host, uint16_t port, Service* se) @@ -510,7 +503,7 @@ error_code Replica::ConsumeRedisStream() { // Master waits for this command in order to start sending replication stream. serializer.SendCommand("REPLCONF ACK 0"); - CHECK_EC(serializer.ec()); + RETURN_ON_ERR(serializer.ec()); VLOG(1) << "Before reading repl-log"; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 5846969ca..cd50f0575 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -25,8 +25,10 @@ extern "C" { #include "server/command_registry.h" #include "server/conn_context.h" #include "server/debugcmd.h" +#include "server/dflycmd.h" #include "server/engine_shard_set.h" #include "server/error.h" +#include "server/journal/journal.h" #include "server/main_service.h" #include "server/rdb_load.h" #include "server/rdb_save.h" @@ -150,6 +152,7 @@ ServerFamily::ServerFamily(Service* service) : service_(*service) { lsinfo_ = make_shared(); lsinfo_->save_time = start_time_; script_mgr_.reset(new ScriptMgr()); + journal_.reset(new journal::Journal); } ServerFamily::~ServerFamily() { @@ -159,6 +162,7 @@ void ServerFamily::Init(util::AcceptServer* acceptor, util::ListenerInterface* m CHECK(acceptor_ == nullptr); acceptor_ = acceptor; main_listener_ = main_listener; + dfly_cmd_.reset(new DflyCmd(main_listener, journal_.get())); pb_task_ = shard_set->pool()->GetNextProactor(); auto cache_cb = [] { @@ -207,6 +211,11 @@ void ServerFamily::Shutdown() { pb_task_->CancelPeriodic(stats_caching_task_); stats_caching_task_ = 0; + if (journal_->EnterLameDuck()) { + auto ec = journal_->Close(); + LOG_IF(ERROR, ec) << "Error closing journal " << ec; + } + unique_lock lk(replicaof_mu_); if (replica_) { replica_->Stop(); @@ -1122,6 +1131,10 @@ void ServerFamily::SyncGeneric(std::string_view repl_master_id, uint64_t offs, // TBD. } +void ServerFamily::Dfly(CmdArgList args, ConnectionContext* cntx) { + dfly_cmd_->Run(args, cntx); +} + #define HFUNC(x) SetHandler(HandlerFunc(this, &ServerFamily::x)) void ServerFamily::Register(CommandRegistry* registry) { @@ -1148,7 +1161,8 @@ void ServerFamily::Register(CommandRegistry* registry) { << CI{"ROLE", CO::LOADING | CO::FAST | CO::NOSCRIPT, 1, 0, 0, 0}.HFUNC(Role) << CI{"SYNC", CO::ADMIN | CO::GLOBAL_TRANS, 1, 0, 0, 0}.HFUNC(Sync) << CI{"PSYNC", CO::ADMIN | CO::GLOBAL_TRANS, 3, 0, 0, 0}.HFUNC(Psync) - << CI{"SCRIPT", CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(Script); + << CI{"SCRIPT", CO::NOSCRIPT, -2, 0, 0, 0}.HFUNC(Script) + << CI{"DFLY", CO::ADMIN | CO::GLOBAL_TRANS, -2, 0, 0, 0}.HFUNC(Dfly); } } // namespace dfly diff --git a/src/server/server_family.h b/src/server/server_family.h index 5135e84fb..43d133bbf 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -17,8 +17,13 @@ class HttpListenerBase; namespace dfly { +namespace journal { +class Journal; +} // namespace journal + class ConnectionContext; class CommandRegistry; +class DflyCmd; class Service; class Replica; class ScriptMgr; @@ -41,9 +46,9 @@ struct Metrics { }; struct LastSaveInfo { - time_t save_time; // epoch time in seconds. - std::string file_name; // - std::vector> freq_map; // RDB_TYPE_xxx -> count mapping. + time_t save_time; // epoch time in seconds. + std::string file_name; // + std::vector> freq_map; // RDB_TYPE_xxx -> count mapping. }; class ServerFamily { @@ -91,6 +96,7 @@ class ServerFamily { void Config(CmdArgList args, ConnectionContext* cntx); void DbSize(CmdArgList args, ConnectionContext* cntx); void Debug(CmdArgList args, ConnectionContext* cntx); + void Dfly(CmdArgList args, ConnectionContext* cntx); void Memory(CmdArgList args, ConnectionContext* cntx); void FlushDb(CmdArgList args, ConnectionContext* cntx); void FlushAll(CmdArgList args, ConnectionContext* cntx); @@ -111,7 +117,6 @@ class ServerFamily { void Load(const std::string& file_name); - boost::fibers::fiber load_fiber_; uint32_t stats_caching_task_ = 0; @@ -125,6 +130,8 @@ class ServerFamily { std::shared_ptr replica_; // protected by replica_of_mu_ std::unique_ptr script_mgr_; + std::unique_ptr journal_; + std::unique_ptr dfly_cmd_; time_t start_time_ = 0; // in seconds, epoch time. diff --git a/src/server/server_state.h b/src/server/server_state.h index 1c4bcd873..34261fb7a 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -15,6 +15,10 @@ typedef struct mi_heap_s mi_heap_t; namespace dfly { +namespace journal { +class Journal; +} // namespace journal + // Present in every server thread. This class differs from EngineShard. The latter manages // state around engine shards while the former represents coordinator/connection state. // There may be threads that handle engine shards but not IO, there may be threads that handle IO @@ -58,6 +62,7 @@ class ServerState { // public struct - to allow initialization. GlobalState gstate() const { return gstate_; } + void set_gstate(GlobalState s) { gstate_ = s; } @@ -66,7 +71,9 @@ class ServerState { // public struct - to allow initialization. // Returns sum of all requests in the last 6 seconds // (not including the current one). - uint32_t MovingSum6() const { return qps_.SumTail(); } + uint32_t MovingSum6() const { + return qps_.SumTail(); + } void RecordCmd() { ++connection_stats.command_cnt; @@ -78,9 +85,18 @@ class ServerState { // public struct - to allow initialization. return data_heap_; } + journal::Journal* journal() { + return journal_; + } + + void set_journal(journal::Journal* j) { + journal_ = j; + } + private: int64_t live_transactions_ = 0; mi_heap_t* data_heap_; + journal::Journal* journal_ = nullptr; std::optional interpreter_; GlobalState gstate_ = GlobalState::ACTIVE; diff --git a/src/server/set_family.cc b/src/server/set_family.cc index 0cca3357e..5a1c2e40d 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -452,7 +452,7 @@ OpStatus Mover::OpMutate(Transaction* t, EngineShard* es) { ArgSlice largs = t->ShardArgsInShard(es->shard_id()); DCHECK_LE(largs.size(), 2u); - OpArgs op_args{es, t->db_index()}; + OpArgs op_args = t->GetOpArgs(es); for (auto k : largs) { if (k == src_) { CHECK_EQ(1u, OpRem(op_args, k, {member_}).value()); // must succeed. @@ -677,8 +677,7 @@ void SetFamily::SAdd(CmdArgList args, ConnectionContext* cntx) { ArgSlice arg_slice{vals.data(), vals.size()}; auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpAdd(op_args, key, arg_slice, false); + return OpAdd(t->GetOpArgs(shard), key, arg_slice, false); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -741,7 +740,7 @@ void SetFamily::SRem(CmdArgList args, ConnectionContext* cntx) { ArgSlice span{vals.data(), vals.size()}; auto cb = [&](Transaction* t, EngineShard* shard) { - return OpRem(OpArgs{shard, t->db_index()}, key, span); + return OpRem(t->GetOpArgs(shard), key, span); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -791,7 +790,7 @@ void SetFamily::SPop(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpPop(OpArgs{shard, t->db_index()}, key, count); + return OpPop(t->GetOpArgs(shard), key, count); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -821,9 +820,9 @@ void SetFamily::SDiff(CmdArgList args, ConnectionContext* cntx) { ArgSlice largs = t->ShardArgsInShard(shard->shard_id()); if (shard->shard_id() == src_shard) { CHECK_EQ(src_key, largs.front()); - result_set[shard->shard_id()] = OpDiff(OpArgs{shard, t->db_index()}, largs); + result_set[shard->shard_id()] = OpDiff(t->GetOpArgs(shard), largs); } else { - result_set[shard->shard_id()] = OpUnion(OpArgs{shard, t->db_index()}, largs); + result_set[shard->shard_id()] = OpUnion(t->GetOpArgs(shard), largs); } return OpStatus::OK; @@ -864,7 +863,7 @@ void SetFamily::SDiffStore(CmdArgList args, ConnectionContext* cntx) { return OpStatus::OK; } - OpArgs op_args{shard, t->db_index()}; + OpArgs op_args = t->GetOpArgs(shard); if (shard->shard_id() == src_shard) { CHECK_EQ(src_key, largs.front()); result_set[shard->shard_id()] = OpDiff(op_args, largs); // Diff @@ -887,7 +886,7 @@ void SetFamily::SDiffStore(CmdArgList args, ConnectionContext* cntx) { SvArray result = ToSvArray(rsv.value()); auto store_cb = [&](Transaction* t, EngineShard* shard) { if (shard->shard_id() == dest_shard) { - OpAdd(OpArgs{shard, t->db_index()}, dest_key, result, true); + OpAdd(t->GetOpArgs(shard), dest_key, result, true); } return OpStatus::OK; @@ -966,7 +965,7 @@ void SetFamily::SInterStore(CmdArgList args, ConnectionContext* cntx) { auto store_cb = [&](Transaction* t, EngineShard* shard) { if (shard->shard_id() == dest_shard) { - OpAdd(OpArgs{shard, t->db_index()}, dest_key, result.value(), true); + OpAdd(t->GetOpArgs(shard), dest_key, result.value(), true); } return OpStatus::OK; @@ -981,7 +980,7 @@ void SetFamily::SUnion(CmdArgList args, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { ArgSlice largs = t->ShardArgsInShard(shard->shard_id()); - result_set[shard->shard_id()] = OpUnion(OpArgs{shard, t->db_index()}, largs); + result_set[shard->shard_id()] = OpUnion(t->GetOpArgs(shard), largs); return OpStatus::OK; }; @@ -1012,7 +1011,7 @@ void SetFamily::SUnionStore(CmdArgList args, ConnectionContext* cntx) { if (largs.empty()) return OpStatus::OK; } - result_set[shard->shard_id()] = OpUnion(OpArgs{shard, t->db_index()}, largs); + result_set[shard->shard_id()] = OpUnion(t->GetOpArgs(shard), largs); return OpStatus::OK; }; @@ -1030,7 +1029,7 @@ void SetFamily::SUnionStore(CmdArgList args, ConnectionContext* cntx) { auto store_cb = [&](Transaction* t, EngineShard* shard) { if (shard->shard_id() == dest_shard) { - OpAdd(OpArgs{shard, t->db_index()}, dest_key, result, true); + OpAdd(t->GetOpArgs(shard), dest_key, result, true); } return OpStatus::OK; @@ -1055,7 +1054,7 @@ void SetFamily::SScan(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpScan(OpArgs{shard, t->db_index()}, key, &cursor); + return OpScan(t->GetOpArgs(shard), key, &cursor); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index f8ca84583..3caa9a9c6 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -494,8 +494,7 @@ void CreateGroup(CmdArgList args, string_view key, ConnectionContext* cntx) { opts.id = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpCreate(op_args, key, opts); + return OpCreate(t->GetOpArgs(shard), key, opts); }; OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -509,8 +508,7 @@ void CreateGroup(CmdArgList args, string_view key, ConnectionContext* cntx) { void DestroyGroup(string_view key, string_view gname, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpDestroyGroup(op_args, key, gname); + return OpDestroyGroup(t->GetOpArgs(shard), key, gname); }; OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -529,8 +527,7 @@ void DestroyGroup(string_view key, string_view gname, ConnectionContext* cntx) { void DelConsumer(string_view key, string_view gname, string_view consumer, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpDelConsumer(op_args, key, gname, consumer); + return OpDelConsumer(t->GetOpArgs(shard), key, gname, consumer); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -551,8 +548,7 @@ void SetId(string_view key, string_view gname, CmdArgList args, ConnectionContex string_view id = ArgS(args, 0); auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpSetId(op_args, key, gname, id); + return OpSetId(t->GetOpArgs(shard), key, gname, id); }; OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -607,8 +603,7 @@ void StreamFamily::XAdd(CmdArgList args, ConnectionContext* cntx) { args.remove_prefix(1); auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpAdd(op_args, key, add_opts, args); + return OpAdd(t->GetOpArgs(shard), key, add_opts, args); }; OpResult add_result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -641,8 +636,7 @@ void StreamFamily::XDel(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpDel(op_args, key, absl::Span{ids.data(), ids.size()}); + return OpDel(t->GetOpArgs(shard), key, absl::Span{ids.data(), ids.size()}); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -751,8 +745,7 @@ void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) { void StreamFamily::XLen(CmdArgList args, ConnectionContext* cntx) { string_view key = ArgS(args, 1); auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpLen(op_args, key); + return OpLen(t->GetOpArgs(shard), key); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -781,8 +774,7 @@ void StreamFamily::XSetId(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpSetId2(op_args, key, parsed_id.val); + return OpSetId2(t->GetOpArgs(shard), key, parsed_id.val); }; OpStatus result = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -835,8 +827,7 @@ void StreamFamily::XRangeGeneric(CmdArgList args, bool is_rev, ConnectionContext range_opts.is_rev = is_rev; auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpRange(op_args, key, range_opts); + return OpRange(t->GetOpArgs(shard), key, range_opts); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -876,4 +867,4 @@ void StreamFamily::Register(CommandRegistry* registry) { << CI{"XSETID", CO::WRITE | CO::DENYOOM, 3, 1, 1, 1}.HFUNC(XSetId); } -} // namespace dfly \ No newline at end of file +} // namespace dfly diff --git a/src/server/string_family.cc b/src/server/string_family.cc index c0eb64a86..da340fe4f 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -17,6 +17,7 @@ extern "C" { #include "server/engine_shard_set.h" #include "server/error.h" #include "server/io_mgr.h" +#include "server/journal/journal.h" #include "server/tiered_storage.h" #include "server/transaction.h" #include "util/varz.h" @@ -60,6 +61,12 @@ string_view GetSlice(EngineShard* shard, const PrimeValue& pv, string* tmp) { return pv.GetSlice(tmp); } +inline void RecordJournal(const OpArgs& op_args, const PrimeKey& pkey, const PrimeKey& pvalue) { + if (op_args.shard->journal()) { + op_args.shard->journal()->RecordEntry(op_args.txid, pkey, pvalue); + } +} + OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t start, string_view value) { auto& db_slice = op_args.shard->db_slice(); @@ -90,9 +97,11 @@ OpResult OpSetRange(const OpArgs& op_args, string_view key, size_t sta db_slice.PreUpdate(op_args.db_ind, it); } + memcpy(s.data() + start, value.data(), value.size()); it->second.SetString(s); db_slice.PostUpdate(op_args.db_ind, it); + RecordJournal(op_args, it->first, it->second); return it->second.Size(); } @@ -129,110 +138,300 @@ OpResult OpGetRange(const OpArgs& op_args, string_view key, int32_t star return string(slice.substr(start, end - start + 1)); }; -} // namespace +// Returns the length of the extended string. if prepend is false - appends the val. +OpResult ExtendOrSet(const OpArgs& op_args, std::string_view key, std::string_view val, + bool prepend) { + auto* shard = op_args.shard; + auto& db_slice = shard->db_slice(); + auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); + if (inserted) { + it->second.SetString(val); + db_slice.PostUpdate(op_args.db_ind, it); + RecordJournal(op_args, it->first, it->second); -SetCmd::SetCmd(DbSlice* db_slice) : db_slice_(*db_slice) { + return val.size(); + } + + if (it->second.ObjType() != OBJ_STRING) + return OpStatus::WRONG_TYPE; + + string tmp, new_val; + string_view slice = GetSlice(op_args.shard, it->second, &tmp); + if (prepend) + new_val = absl::StrCat(val, slice); + else + new_val = absl::StrCat(slice, val); + + db_slice.PreUpdate(op_args.db_ind, it); + it->second.SetString(new_val); + db_slice.PostUpdate(op_args.db_ind, it); + RecordJournal(op_args, it->first, it->second); + + return new_val.size(); } -SetCmd::~SetCmd() { +OpResult ExtendOrSkip(const OpArgs& op_args, std::string_view key, std::string_view val, + bool prepend) { + auto& db_slice = op_args.shard->db_slice(); + OpResult it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING); + if (!it_res) { + return false; + } + + CompactObj& cobj = (*it_res)->second; + + string tmp, new_val; + string_view slice = GetSlice(op_args.shard, cobj, &tmp); + if (prepend) + new_val = absl::StrCat(val, slice); + else + new_val = absl::StrCat(slice, val); + + db_slice.PreUpdate(op_args.db_ind, *it_res); + cobj.SetString(new_val); + db_slice.PostUpdate(op_args.db_ind, *it_res); + + return new_val.size(); } -OpResult SetCmd::Set(const SetParams& params, std::string_view key, std::string_view value) { - DCHECK_LT(params.db_index, db_slice_.db_array_size()); - DCHECK(db_slice_.IsDbValid(params.db_index)); +OpResult OpGet(const OpArgs& op_args, string_view key) { + OpResult it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_STRING); + if (!it_res.ok()) + return it_res.status(); - VLOG(2) << "Set " << key << "(" << db_slice_.shard_id() << ") "; + const PrimeValue& pv = it_res.value()->second; - if (params.how == SET_IF_EXISTS) { - auto [it, expire_it] = db_slice_.FindExt(params.db_index, key); + return GetString(op_args.shard, pv); +} - if (IsValid(it)) { // existing - return SetExisting(params, it, expire_it, value); +OpResult OpIncrFloat(const OpArgs& op_args, std::string_view key, double val) { + auto& db_slice = op_args.shard->db_slice(); + auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); + + char buf[128]; + + if (inserted) { + char* str = RedisReplyBuilder::FormatDouble(val, buf, sizeof(buf)); + it->second.SetString(str); + db_slice.PostUpdate(op_args.db_ind, it); + RecordJournal(op_args, it->first, it->second); + + return val; + } + + if (it->second.ObjType() != OBJ_STRING) + return OpStatus::WRONG_TYPE; + + if (it->second.Size() == 0) + return OpStatus::INVALID_FLOAT; + + string tmp; + string_view slice = GetSlice(op_args.shard, it->second, &tmp); + + StringToDoubleConverter stod(StringToDoubleConverter::NO_FLAGS, 0, 0, NULL, NULL); + int processed_digits = 0; + double base = stod.StringToDouble(slice.data(), slice.size(), &processed_digits); + if (unsigned(processed_digits) != slice.size()) { + return OpStatus::INVALID_FLOAT; + } + + base += val; + + if (isnan(base) || isinf(base)) { + return OpStatus::INVALID_FLOAT; + } + + char* str = RedisReplyBuilder::FormatDouble(base, buf, sizeof(buf)); + + db_slice.PreUpdate(op_args.db_ind, it); + it->second.SetString(str); + db_slice.PostUpdate(op_args.db_ind, it); + RecordJournal(op_args, it->first, it->second); + + return base; +} + +// if skip_on_missing - returns KEY_NOTFOUND. +OpResult OpIncrBy(const OpArgs& op_args, std::string_view key, int64_t incr, + bool skip_on_missing) { + auto& db_slice = op_args.shard->db_slice(); + + // we avoid using AddOrFind because of skip_on_missing option for memcache. + auto [it, expire_it] = db_slice.FindExt(op_args.db_ind, key); + + if (!IsValid(it)) { + if (skip_on_missing) + return OpStatus::KEY_NOTFOUND; + + CompactObj cobj; + cobj.SetInt(incr); + + // AddNew calls PostUpdate inside. + try { + it = db_slice.AddNew(op_args.db_ind, key, std::move(cobj), 0); + } catch (bad_alloc&) { + return OpStatus::OUT_OF_MEMORY; } - return OpStatus::SKIPPED; + + RecordJournal(op_args, it->first, it->second); + + return incr; } - // New entry - tuple add_res; - try { - add_res = db_slice_.AddOrFind2(params.db_index, key); - } catch (bad_alloc& e) { - return OpStatus::OUT_OF_MEMORY; + if (it->second.ObjType() != OBJ_STRING) { + return OpStatus::WRONG_TYPE; } - PrimeIterator it = get<0>(add_res); - if (!get<2>(add_res)) { - return SetExisting(params, it, get<1>(add_res), value); + auto opt_prev = it->second.TryGetInt(); + if (!opt_prev) { + return OpStatus::INVALID_VALUE; } - // adding new value. - PrimeValue tvalue{value}; - tvalue.SetFlag(params.memcache_flags != 0); - it->second = std::move(tvalue); - db_slice_.PostUpdate(params.db_index, it); - - if (params.expire_after_ms) { - db_slice_.UpdateExpire(params.db_index, it, params.expire_after_ms + db_slice_.Now()); + long long prev = *opt_prev; + if ((incr < 0 && prev < 0 && incr < (LLONG_MIN - prev)) || + (incr > 0 && prev > 0 && incr > (LLONG_MAX - prev))) { + return OpStatus::OUT_OF_RANGE; } - if (params.memcache_flags) - db_slice_.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags); + int64_t new_val = prev + incr; + DCHECK(!it->second.IsExternal()); + db_slice.PreUpdate(op_args.db_ind, it); + it->second.SetInt(new_val); + db_slice.PostUpdate(op_args.db_ind, it); + RecordJournal(op_args, it->first, it->second); - EngineShard* shard = db_slice_.shard_owner(); + return new_val; +} - if (shard->tiered_storage()) { // external storage enabled. - if (value.size() >= kMinTieredLen) { - shard->tiered_storage()->UnloadItem(params.db_index, it); +// Returns true if keys were set, false otherwise. +OpStatus OpMSet(const OpArgs& op_args, ArgSlice args) { + DCHECK(!args.empty() && args.size() % 2 == 0); + + SetCmd::SetParams params{op_args.db_ind}; + SetCmd sg(op_args); + + for (size_t i = 0; i < args.size(); i += 2) { + DVLOG(1) << "MSet " << args[i] << ":" << args[i + 1]; + OpStatus res = sg.Set(params, args[i], args[i + 1]); + if (res != OpStatus::OK) { // OOM for example. + return res; } } return OpStatus::OK; } +} // namespace + +OpStatus SetCmd::Set(const SetParams& params, string_view key, string_view value) { + EngineShard* shard = op_args_.shard; + auto& db_slice = shard->db_slice(); + + DCHECK_LT(params.db_index, db_slice.db_array_size()); + DCHECK(db_slice.IsDbValid(params.db_index)); + + VLOG(2) << "Set " << key << "(" << db_slice.shard_id() << ") "; + + if (params.how == SET_IF_EXISTS) { + auto [it, expire_it] = db_slice.FindExt(params.db_index, key); + + if (IsValid(it)) { // existing + return SetExisting(params, it, expire_it, value); + } + + return OpStatus::SKIPPED; + } + + // Trying to add a new entry. + tuple add_res; + try { + add_res = db_slice.AddOrFind2(params.db_index, key); + } catch (bad_alloc& e) { + return OpStatus::OUT_OF_MEMORY; + } + + PrimeIterator it = get<0>(add_res); + if (!get<2>(add_res)) { // Existing. + return SetExisting(params, it, get<1>(add_res), value); + } + + // + // Adding new value. + PrimeValue tvalue{value}; + tvalue.SetFlag(params.memcache_flags != 0); + it->second = std::move(tvalue); + db_slice.PostUpdate(params.db_index, it); + + if (params.expire_after_ms) { + db_slice.UpdateExpire(params.db_index, it, params.expire_after_ms + db_slice.Now()); + } + + if (params.memcache_flags) + db_slice.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags); + + if (shard->tiered_storage()) { // external storage enabled. + // TODO: we may have a bug if we block the fiber inside UnloadItem - "it" may be invalid + // afterwards. + if (value.size() >= kMinTieredLen) { + shard->tiered_storage()->UnloadItem(params.db_index, it); + } + } + + RecordJournal(op_args_, it->first, it->second); + return OpStatus::OK; +} + OpStatus SetCmd::SetExisting(const SetParams& params, PrimeIterator it, ExpireIterator e_it, - std::string_view value) { + string_view value) { if (params.how == SET_IF_NOTEXIST) return OpStatus::SKIPPED; PrimeValue& prime_value = it->second; + EngineShard* shard = op_args_.shard; + if (params.prev_val) { if (prime_value.ObjType() != OBJ_STRING) return OpStatus::WRONG_TYPE; - string val = GetString(db_slice_.shard_owner(), prime_value); + string val = GetString(shard, prime_value); params.prev_val->emplace(move(val)); } - uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice_.Now() : 0; + DbSlice& db_slice = shard->db_slice(); + uint64_t at_ms = params.expire_after_ms ? params.expire_after_ms + db_slice.Now() : 0; if (IsValid(e_it) && at_ms) { - e_it->second = db_slice_.FromAbsoluteTime(at_ms); + e_it->second = db_slice.FromAbsoluteTime(at_ms); } else { - bool changed = db_slice_.UpdateExpire(params.db_index, it, at_ms); + // We need to update expiry, or maybe erase the object if it was expired. + bool changed = db_slice.UpdateExpire(params.db_index, it, at_ms); if (changed && at_ms == 0) // erased. - return OpStatus::OK; + return OpStatus::OK; // TODO: to update journal with deletion. } - db_slice_.PreUpdate(params.db_index, it); + db_slice.PreUpdate(params.db_index, it); // Check whether we need to update flags table. bool req_flag_update = (params.memcache_flags != 0) != prime_value.HasFlag(); if (req_flag_update) { prime_value.SetFlag(params.memcache_flags != 0); - db_slice_.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags); + db_slice.SetMCFlag(params.db_index, it->first.AsRef(), params.memcache_flags); } // overwrite existing entry. prime_value.SetString(value); if (value.size() >= kMinTieredLen) { // external storage enabled. - EngineShard* shard = db_slice_.shard_owner(); + // TODO: if UnloadItem can block the calling fiber, then we have the bug because then "it" + // can be invalid after the function returns and the functions that follow may access invalid + // entry. if (shard->tiered_storage()) { shard->tiered_storage()->UnloadItem(params.db_index, it); } } - db_slice_.PostUpdate(params.db_index, it); + db_slice.PostUpdate(params.db_index, it); + RecordJournal(op_args_, it->first, it->second); return OpStatus::OK; } @@ -261,7 +460,7 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) { builder->SendError(kSyntaxErr); } - std::string_view ex = ArgS(args, i); + string_view ex = ArgS(args, i); if (!absl::SimpleAtoi(ex, &int_arg)) { return builder->SendError(kInvalidIntErr); } @@ -291,9 +490,8 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) { DCHECK(cntx->transaction); auto cb = [&](Transaction* t, EngineShard* shard) { - SetCmd sg(&shard->db_slice()); - auto status = sg.Set(sparams, key, value).status(); - return status; + SetCmd sg(t->GetOpArgs(shard)); + return sg.Set(sparams, key, value); }; OpResult result = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -319,9 +517,7 @@ void StringFamily::Get(CmdArgList args, ConnectionContext* cntx) { std::string_view key = ArgS(args, 1); - auto cb = [&](Transaction* t, EngineShard* shard) { - return OpGet(OpArgs{shard, t->db_index()}, key); - }; + auto cb = [&](Transaction* t, EngineShard* shard) { return OpGet(t->GetOpArgs(shard), key); }; DVLOG(1) << "Before Get::ScheduleSingleHopT " << key; Transaction* trans = cntx->transaction; @@ -350,16 +546,15 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) { SetCmd::SetParams sparams{cntx->db_index()}; sparams.prev_val = &prev_val; - ShardId sid = Shard(key, shard_set->size()); - OpResult result = shard_set->Await(sid, [&] { - EngineShard* es = EngineShard::tlocal(); - SetCmd cmd(&es->db_slice()); + auto cb = [&](Transaction* t, EngineShard* shard) { + SetCmd cmd(t->GetOpArgs(shard)); return cmd.Set(sparams, key, value); - }); + }; + OpStatus status = cntx->transaction->ScheduleSingleHop(std::move(cb)); - if (!result) { - (*cntx)->SendError(result.status()); + if (status != OpStatus::OK) { + (*cntx)->SendError(status); return; } @@ -367,6 +562,7 @@ void StringFamily::GetSet(CmdArgList args, ConnectionContext* cntx) { (*cntx)->SendBulkString(*prev_val); return; } + return (*cntx)->SendNull(); } @@ -398,7 +594,7 @@ void StringFamily::IncrByFloat(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpIncrFloat(OpArgs{shard, t->db_index()}, key, val); + return OpIncrFloat(t->GetOpArgs(shard), key, val); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -444,7 +640,7 @@ void StringFamily::IncrByGeneric(std::string_view key, int64_t val, ConnectionCo bool skip_on_missing = cntx->protocol() == Protocol::MEMCACHE; auto cb = [&](Transaction* t, EngineShard* shard) { - OpResult res = OpIncrBy(OpArgs{shard, t->db_index()}, key, val, skip_on_missing); + OpResult res = OpIncrBy(t->GetOpArgs(shard), key, val, skip_on_missing); return res; }; @@ -477,7 +673,7 @@ void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContex if (cntx->protocol() == Protocol::REDIS) { auto cb = [&](Transaction* t, EngineShard* shard) { - return ExtendOrSet(OpArgs{shard, t->db_index()}, key, sval, prepend); + return ExtendOrSet(t->GetOpArgs(shard), key, sval, prepend); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -489,7 +685,7 @@ void StringFamily::ExtendGeneric(CmdArgList args, bool prepend, ConnectionContex DCHECK(cntx->protocol() == Protocol::MEMCACHE); auto cb = [&](Transaction* t, EngineShard* shard) { - return ExtendOrSkip(OpArgs{shard, t->db_index()}, key, sval, prepend); + return ExtendOrSkip(t->GetOpArgs(shard), key, sval, prepend); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -524,9 +720,8 @@ void StringFamily::SetExGeneric(bool seconds, CmdArgList args, ConnectionContext sparams.expire_after_ms = unit_vals; auto cb = [&](Transaction* t, EngineShard* shard) { - SetCmd sg(&shard->db_slice()); - auto status = sg.Set(sparams, key, value).status(); - return status; + SetCmd sg(t->GetOpArgs(shard)); + return sg.Set(sparams, key, value); }; OpResult result = cntx->transaction->ScheduleSingleHop(std::move(cb)); @@ -600,9 +795,9 @@ void StringFamily::MSet(CmdArgList args, ConnectionContext* cntx) { LOG(INFO) << "MSET/" << transaction->unique_shard_cnt() << str; } - auto cb = [&](Transaction* t, EngineShard* es) { - auto args = t->ShardArgsInShard(es->shard_id()); - return OpMSet(OpArgs{es, t->db_index()}, args); + auto cb = [&](Transaction* t, EngineShard* shard) { + auto args = t->ShardArgsInShard(shard->shard_id()); + return OpMSet(t->GetOpArgs(shard), args); }; OpStatus status = transaction->ScheduleSingleHop(std::move(cb)); @@ -635,12 +830,13 @@ void StringFamily::MSetNx(CmdArgList args, ConnectionContext* cntx) { transaction->Execute(std::move(cb), false); bool to_skip = exists.load(memory_order_relaxed) == true; - auto epilog_cb = [&](Transaction* t, EngineShard* es) { + + auto epilog_cb = [&](Transaction* t, EngineShard* shard) { if (to_skip) return OpStatus::OK; - auto args = t->ShardArgsInShard(es->shard_id()); - return OpMSet(OpArgs{es, t->db_index()}, std::move(args)); + auto args = t->ShardArgsInShard(shard->shard_id()); + return OpMSet(t->GetOpArgs(shard), std::move(args)); }; transaction->Execute(std::move(epilog_cb), true); @@ -680,7 +876,7 @@ void StringFamily::GetRange(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpGetRange(OpArgs{shard, t->db_index()}, key, start, end); + return OpGetRange(t->GetOpArgs(shard), key, start, end); }; Transaction* trans = cntx->transaction; @@ -713,7 +909,7 @@ void StringFamily::SetRange(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) -> OpResult { - return OpSetRange(OpArgs{shard, t->db_index()}, key, start, value); + return OpSetRange(t->GetOpArgs(shard), key, start, value); }; Transaction* trans = cntx->transaction; @@ -758,176 +954,6 @@ auto StringFamily::OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction return response; } -OpStatus StringFamily::OpMSet(const OpArgs& op_args, ArgSlice args) { - DCHECK(!args.empty() && args.size() % 2 == 0); - - SetCmd::SetParams params{op_args.db_ind}; - SetCmd sg(&op_args.shard->db_slice()); - - for (size_t i = 0; i < args.size(); i += 2) { - DVLOG(1) << "MSet " << args[i] << ":" << args[i + 1]; - auto res = sg.Set(params, args[i], args[i + 1]); - if (!res) { // OOM for example. - return res.status(); - } - } - - return OpStatus::OK; -} - -OpResult StringFamily::OpIncrBy(const OpArgs& op_args, std::string_view key, int64_t incr, - bool skip_on_missing) { - auto& db_slice = op_args.shard->db_slice(); - - // we avoid using AddOrFind because of skip_on_missing option for memcache. - auto [it, expire_it] = db_slice.FindExt(op_args.db_ind, key); - - if (!IsValid(it)) { - if (skip_on_missing) - return OpStatus::KEY_NOTFOUND; - - CompactObj cobj; - cobj.SetInt(incr); - - try { - db_slice.AddNew(op_args.db_ind, key, std::move(cobj), 0); - } catch (bad_alloc&) { - return OpStatus::OUT_OF_MEMORY; - } - return incr; - } - - if (it->second.ObjType() != OBJ_STRING) { - return OpStatus::WRONG_TYPE; - } - - auto opt_prev = it->second.TryGetInt(); - if (!opt_prev) { - return OpStatus::INVALID_VALUE; - } - - long long prev = *opt_prev; - if ((incr < 0 && prev < 0 && incr < (LLONG_MIN - prev)) || - (incr > 0 && prev > 0 && incr > (LLONG_MAX - prev))) { - return OpStatus::OUT_OF_RANGE; - } - - int64_t new_val = prev + incr; - DCHECK(!it->second.IsExternal()); - db_slice.PreUpdate(op_args.db_ind, it); - it->second.SetInt(new_val); - db_slice.PostUpdate(op_args.db_ind, it); - return new_val; -} - -OpResult StringFamily::OpIncrFloat(const OpArgs& op_args, std::string_view key, - double val) { - auto& db_slice = op_args.shard->db_slice(); - auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); - - char buf[128]; - - if (inserted) { - char* str = RedisReplyBuilder::FormatDouble(val, buf, sizeof(buf)); - it->second.SetString(str); - db_slice.PostUpdate(op_args.db_ind, it); - - return val; - } - - if (it->second.ObjType() != OBJ_STRING) - return OpStatus::WRONG_TYPE; - - if (it->second.Size() == 0) - return OpStatus::INVALID_FLOAT; - - string tmp; - string_view slice = GetSlice(op_args.shard, it->second, &tmp); - - StringToDoubleConverter stod(StringToDoubleConverter::NO_FLAGS, 0, 0, NULL, NULL); - int processed_digits = 0; - double base = stod.StringToDouble(slice.data(), slice.size(), &processed_digits); - if (unsigned(processed_digits) != slice.size()) { - return OpStatus::INVALID_FLOAT; - } - - base += val; - - if (isnan(base) || isinf(base)) { - return OpStatus::INVALID_FLOAT; - } - - char* str = RedisReplyBuilder::FormatDouble(base, buf, sizeof(buf)); - - db_slice.PreUpdate(op_args.db_ind, it); - it->second.SetString(str); - db_slice.PostUpdate(op_args.db_ind, it); - - return base; -} - -OpResult StringFamily::ExtendOrSet(const OpArgs& op_args, std::string_view key, - std::string_view val, bool prepend) { - auto& db_slice = op_args.shard->db_slice(); - auto [it, inserted] = db_slice.AddOrFind(op_args.db_ind, key); - if (inserted) { - it->second.SetString(val); - db_slice.PostUpdate(op_args.db_ind, it); - - return val.size(); - } - - if (it->second.ObjType() != OBJ_STRING) - return OpStatus::WRONG_TYPE; - - string tmp, new_val; - string_view slice = GetSlice(op_args.shard, it->second, &tmp); - if (prepend) - new_val = absl::StrCat(val, slice); - else - new_val = absl::StrCat(slice, val); - - db_slice.PreUpdate(op_args.db_ind, it); - it->second.SetString(new_val); - db_slice.PostUpdate(op_args.db_ind, it); - - return new_val.size(); -} - -OpResult StringFamily::ExtendOrSkip(const OpArgs& op_args, std::string_view key, - std::string_view val, bool prepend) { - auto& db_slice = op_args.shard->db_slice(); - OpResult it_res = db_slice.Find(op_args.db_ind, key, OBJ_STRING); - if (!it_res) { - return false; - } - - CompactObj& cobj = (*it_res)->second; - - string tmp, new_val; - string_view slice = GetSlice(op_args.shard, cobj, &tmp); - if (prepend) - new_val = absl::StrCat(val, slice); - else - new_val = absl::StrCat(slice, val); - - db_slice.PreUpdate(op_args.db_ind, *it_res); - cobj.SetString(new_val); - db_slice.PostUpdate(op_args.db_ind, *it_res); - - return new_val.size(); -} - -OpResult StringFamily::OpGet(const OpArgs& op_args, string_view key) { - OpResult it_res = op_args.shard->db_slice().Find(op_args.db_ind, key, OBJ_STRING); - if (!it_res.ok()) - return it_res.status(); - - const PrimeValue& pv = it_res.value()->second; - - return GetString(op_args.shard, pv); -} - void StringFamily::Init(util::ProactorPool* pp) { set_qps.Init(pp); get_qps.Init(pp); diff --git a/src/server/string_family.h b/src/server/string_family.h index 8e1aad17e..685d4efd3 100644 --- a/src/server/string_family.h +++ b/src/server/string_family.h @@ -16,11 +16,10 @@ using facade::OpResult; using facade::OpStatus; class SetCmd { - DbSlice& db_slice_; + const OpArgs op_args_; public: - explicit SetCmd(DbSlice* db_slice); - ~SetCmd(); + explicit SetCmd(const OpArgs& op_args) : op_args_(op_args) {} enum SetHow { SET_ALWAYS, SET_IF_NOTEXIST, SET_IF_EXISTS }; @@ -38,7 +37,7 @@ class SetCmd { } }; - OpResult Set(const SetParams& params, std::string_view key, std::string_view value); + OpStatus Set(const SetParams& params, std::string_view key, std::string_view value); private: OpStatus SetExisting(const SetParams& params, PrimeIterator it, ExpireIterator e_it, @@ -86,24 +85,6 @@ class StringFamily { using MGetResponse = std::vector>; static MGetResponse OpMGet(bool fetch_mcflag, bool fetch_mcver, const Transaction* t, EngineShard* shard); - - // Returns true if keys were set, false otherwise. - static OpStatus OpMSet(const OpArgs& op_args, ArgSlice args); - - // if skip_on_missing - returns KEY_NOTFOUND. - static OpResult OpIncrBy(const OpArgs& op_args, std::string_view key, int64_t val, - bool skip_on_missing); - static OpResult OpIncrFloat(const OpArgs& op_args, std::string_view key, double val); - - // Returns the length of the extended string. if prepend is false - appends the val. - static OpResult ExtendOrSet(const OpArgs& op_args, std::string_view key, - std::string_view val, bool prepend); - - // Returns true if was extended, false if the key was not found. - static OpResult ExtendOrSkip(const OpArgs& op_args, std::string_view key, - std::string_view val, bool prepend); - - static OpResult OpGet(const OpArgs& op_args, std::string_view key); }; } // namespace dfly diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 950126729..09881009a 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -11,6 +11,8 @@ #include "server/command_registry.h" #include "server/db_slice.h" #include "server/engine_shard_set.h" +#include "server/journal/journal.h" +#include "server/server_state.h" namespace dfly { @@ -334,9 +336,14 @@ bool Transaction::RunInShard(EngineShard* shard) { /*************************************************************************/ // Actually running the callback. + // If you change the logic here, also please change the logic try { // if transaction is suspended (blocked in watched queue), then it's a noop. - OpStatus status = was_suspended ? OpStatus::OK : cb_(this, shard); + OpStatus status = OpStatus::OK; + + if (!was_suspended) { + status = cb_(this, shard); + } if (unique_shard_cnt_ == 1) { cb_ = nullptr; // We can do it because only a single thread runs the callback. @@ -467,6 +474,13 @@ void Transaction::ScheduleInternal() { VLOG(2) << "Scheduled " << DebugId() << " OutOfOrder: " << bool(coordinator_state_ & COORD_OOO) << " num_shards: " << num_shards; + + if (mode == IntentLock::EXCLUSIVE) { + journal::Journal* j = ServerState::tlocal()->journal(); + // TODO: we may want to pass custom command name into journal. + if (j && j->SchedStartTx(txid_, 0, num_shards)) { + } + } coordinator_state_ |= COORD_SCHED; break; } diff --git a/src/server/transaction.h b/src/server/transaction.h index 8983586f5..24d01df26 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -179,6 +179,10 @@ class Transaction { //! Runs in the shard thread. KeyLockArgs GetLockArgs(ShardId sid) const; + OpArgs GetOpArgs(EngineShard* shard) const { + return OpArgs{shard, txid_, db_index_}; + } + private: struct LockCnt { diff --git a/src/server/zset_family.cc b/src/server/zset_family.cc index 96e7c17ad..c4e6320fb 100644 --- a/src/server/zset_family.cc +++ b/src/server/zset_family.cc @@ -970,8 +970,7 @@ void ZSetFamily::ZAdd(CmdArgList args, ConnectionContext* cntx) { absl::Span memb_sp{members.data(), members.size()}; auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpAdd(op_args, zparams, key, memb_sp); + return OpAdd(t->GetOpArgs(shard), zparams, key, memb_sp); }; OpResult add_result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1031,8 +1030,7 @@ void ZSetFamily::ZCount(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpCount(op_args, key, si); + return OpCount(t->GetOpArgs(shard), key, si); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1063,8 +1061,7 @@ void ZSetFamily::ZIncrBy(CmdArgList args, ConnectionContext* cntx) { zparams.flags = ZADD_IN_INCR; auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpAdd(op_args, zparams, key, ScoredMemberSpan{&scored_member, 1}); + return OpAdd(t->GetOpArgs(shard), zparams, key, ScoredMemberSpan{&scored_member, 1}); }; OpResult add_result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1139,7 +1136,7 @@ void ZSetFamily::ZInterStore(CmdArgList args, ConnectionContext* cntx) { ZParams zparams; zparams.override = true; add_result = - OpAdd(OpArgs{shard, t->db_index()}, zparams, dest_key, ScoredMemberSpan{smvec}).value(); + OpAdd(t->GetOpArgs(shard), zparams, dest_key, ScoredMemberSpan{smvec}).value(); } return OpStatus::OK; }; @@ -1161,8 +1158,7 @@ void ZSetFamily::ZLexCount(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpLexCount(op_args, key, li); + return OpLexCount(t->GetOpArgs(shard), key, li); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1237,8 +1233,7 @@ void ZSetFamily::ZRangeByLex(CmdArgList args, ConnectionContext* cntx) { range_spec.interval = li; auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpRange(range_spec, op_args, key); + return OpRange(range_spec, t->GetOpArgs(shard), key); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1318,8 +1313,7 @@ void ZSetFamily::ZRem(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpRem(op_args, key, members); + return OpRem(t->GetOpArgs(shard), key, members); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1335,8 +1329,7 @@ void ZSetFamily::ZScore(CmdArgList args, ConnectionContext* cntx) { string_view member = ArgS(args, 2); auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpScore(op_args, key, member); + return OpScore(t->GetOpArgs(shard), key, member); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1364,7 +1357,7 @@ void ZSetFamily::ZScan(CmdArgList args, ConnectionContext* cntx) { } auto cb = [&](Transaction* t, EngineShard* shard) { - return OpScan(OpArgs{shard, t->db_index()}, key, &cursor); + return OpScan(t->GetOpArgs(shard), key, &cursor); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1427,7 +1420,7 @@ void ZSetFamily::ZUnionStore(CmdArgList args, ConnectionContext* cntx) { ZParams zparams; zparams.override = true; add_result = - OpAdd(OpArgs{shard, t->db_index()}, zparams, dest_key, ScoredMemberSpan{smvec}).value(); + OpAdd(t->GetOpArgs(shard), zparams, dest_key, ScoredMemberSpan{smvec}).value(); } return OpStatus::OK; }; @@ -1449,8 +1442,7 @@ void ZSetFamily::ZRangeByScoreInternal(string_view key, string_view min_s, strin range_spec.interval = si; auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpRange(range_spec, op_args, key); + return OpRange(range_spec, t->GetOpArgs(shard), key); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1480,8 +1472,7 @@ void ZSetFamily::OutputScoredArrayResult(const OpResult& result, void ZSetFamily::ZRemRangeGeneric(string_view key, const ZRangeSpec& range_spec, ConnectionContext* cntx) { auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpRemRange(op_args, key, range_spec); + return OpRemRange(t->GetOpArgs(shard), key, range_spec); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1531,8 +1522,7 @@ void ZSetFamily::ZRangeGeneric(CmdArgList args, bool reverse, ConnectionContext* range_spec.interval = ii; auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpRange(range_spec, op_args, key); + return OpRange(range_spec, t->GetOpArgs(shard), key); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb)); @@ -1544,9 +1534,8 @@ void ZSetFamily::ZRankGeneric(CmdArgList args, bool reverse, ConnectionContext* string_view member = ArgS(args, 2); auto cb = [&](Transaction* t, EngineShard* shard) { - OpArgs op_args{shard, t->db_index()}; - return OpRank(op_args, key, member, reverse); + return OpRank(t->GetOpArgs(shard), key, member, reverse); }; OpResult result = cntx->transaction->ScheduleSingleHopT(std::move(cb));