Add string_family_test.cc unit test

Add unit test helper code that we will leverage for writing more unit tests.
Numerous fixes in the transactional code. Allow controlling of expiration clock
in unit-test environment with configurable task synchronization. The latter is enabled
for prod and disable in the tests.
This commit is contained in:
Roman Gershman 2022-01-03 11:20:08 +02:00
parent d64b4e01ea
commit a4d8ded6ce
13 changed files with 605 additions and 206 deletions

2
helio

@ -1 +1 @@
Subproject commit dab0fc768494d8e7cc42d77b08fbee1a81415072
Subproject commit b4bfdd0b8e67b293e98d5d3ce92c60ed05646b61

View file

@ -14,4 +14,5 @@ add_library(dfly_test_lib test_utils.cc)
cxx_link(dfly_test_lib dragonfly_lib gtest_main_ext)
cxx_test(redis_parser_test dfly_test_lib LABELS DFLY)
cxx_test(string_family_test dfly_test_lib LABELS DFLY)
cxx_test(memcache_parser_test dfly_test_lib LABELS DFLY)

View file

@ -36,6 +36,13 @@ class ConnectionContext : public ReplyBuilder {
public:
ConnectionContext(::io::Sink* stream, Connection* owner);
struct DebugInfo {
uint32_t shards_count = 0;
TxClock clock = 0;
};
DebugInfo last_command_debug;
// TODO: to introduce proper accessors.
Transaction* transaction = nullptr;
const CommandId* cid = nullptr;

View file

@ -19,19 +19,22 @@ namespace fibers = ::boost::fibers;
thread_local EngineShard* EngineShard::shard_ = nullptr;
constexpr size_t kQueueLen = 64;
EngineShard::EngineShard(util::ProactorBase* pb)
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }), db_slice_(pb->GetIndex(), this) {
EngineShard::EngineShard(util::ProactorBase* pb, bool update_db_time)
: queue_(kQueueLen), txq_([](const Transaction* t) { return t->txid(); }),
db_slice_(pb->GetIndex(), this) {
fiber_q_ = fibers::fiber([this, index = pb->GetIndex()] {
this_fiber::properties<FiberProps>().set_name(absl::StrCat("shard_queue", index));
queue_.Run();
});
periodic_task_ = pb->AddPeriodic(1, [] {
auto* shard = EngineShard::tlocal();
DCHECK(shard);
// absl::GetCurrentTimeNanos() returns current time since the Unix Epoch.
shard->db_slice().UpdateExpireClock(absl::GetCurrentTimeNanos() / 1000000);
});
if (update_db_time) {
periodic_task_ = pb->AddPeriodic(1, [] {
auto* shard = EngineShard::tlocal();
DCHECK(shard);
// absl::GetCurrentTimeNanos() returns current time since the Unix Epoch.
shard->db_slice().UpdateExpireClock(absl::GetCurrentTimeNanos() / 1000000);
});
}
tmp_str = sdsempty();
}
@ -45,9 +48,9 @@ EngineShard::~EngineShard() {
}
}
void EngineShard::InitThreadLocal(ProactorBase* pb) {
void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
CHECK(shard_ == nullptr) << pb->GetIndex();
shard_ = new EngineShard(pb);
shard_ = new EngineShard(pb, update_db_time);
}
void EngineShard::DestroyThreadLocal() {
@ -61,33 +64,29 @@ void EngineShard::DestroyThreadLocal() {
VLOG(1) << "Shard reset " << index;
}
void EngineShard::RunContinuationTransaction() {
auto sid = shard_id();
if (continuation_trans_->IsArmedInShard(sid)) {
bool to_keep = continuation_trans_->RunInShard(sid);
DVLOG(1) << "RunContTransaction " << continuation_trans_->DebugId() << " keep: " << to_keep;
if (!to_keep) {
continuation_trans_ = nullptr;
}
}
}
// Is called by Transaction::ExecuteAsync in order to run transaction tasks.
// Only runs in its own thread.
void EngineShard::Execute(Transaction* trans) {
void EngineShard::PollExecution(Transaction* trans) {
DVLOG(1) << "PollExecution " << (trans ? trans->DebugId() : "");
ShardId sid = shard_id();
if (continuation_trans_) {
if (trans == continuation_trans_)
trans = nullptr;
RunContinuationTransaction();
// Once we start executing transaction we do not continue until it's finished.
// This preserves atomicity property of multi-hop transactions.
if (continuation_trans_)
if (continuation_trans_->IsArmedInShard(sid)) {
bool to_keep = continuation_trans_->RunInShard(this);
DVLOG(1) << "RunContTrans: " << continuation_trans_->DebugId() << " keep: " << to_keep;
if (!to_keep) {
continuation_trans_ = nullptr;
}
}
if (continuation_trans_) {
// Once we start executing transaction we do not continue until it's finished.
// This preserves atomicity property of multi-hop transactions.
return;
}
}
DCHECK(!continuation_trans_);
@ -98,6 +97,8 @@ void EngineShard::Execute(Transaction* trans) {
auto val = txq_.Front();
head = absl::get<Transaction*>(val);
// The fact that Tx is in the queue, already means that coordinator fiber will not progress,
// hence here it's enough to test for run_count and check local_mask.
bool is_armed = head->IsArmedInShard(sid);
if (!is_armed)
break;
@ -112,16 +113,18 @@ void EngineShard::Execute(Transaction* trans) {
trans = nullptr;
TxId txid = head->txid();
// Could be equal to ts in case the same transaction had few hops.
DCHECK_LE(committed_txid_, txid);
DCHECK_LT(committed_txid_, txid);
// We update committed_ts_ before calling Run() to avoid cases where a late transaction might
// try to push back this one.
// We update committed_txid_ before calling RunInShard() to avoid cases where
// a transaction stalls the execution with IO while another fiber queries this shard for
// committed_txid_ (for example during the scheduling).
committed_txid_ = txid;
if (VLOG_IS_ON(2)) {
dbg_id = head->DebugId();
}
bool keep = head->RunInShard(sid);
bool keep = head->RunInShard(this);
DCHECK(head == absl::get<Transaction*>(txq_.Front()));
// We should not access head from this point since RunInShard callback decrements refcount.
DLOG_IF(INFO, !dbg_id.empty()) << "RunHead " << dbg_id << ", keep " << keep;
txq_.PopFront();
@ -135,13 +138,12 @@ void EngineShard::Execute(Transaction* trans) {
if (!trans)
return;
if (txq_.Empty())
return;
uint16_t local_mask = trans->GetLocalMask(sid);
// If trans is out of order, i.e. locks keys that previous transactions have not locked.
// It may be that there are other transactions that touch those keys but they necessary ordered
// after trans in the queue, hence it's safe to run trans out of order.
if (trans->IsOutOfOrder() && trans->IsArmedInShard(sid)) {
if (local_mask & Transaction::OUT_OF_ORDER) {
DCHECK(trans != head);
dbg_id.clear();
@ -151,7 +153,7 @@ void EngineShard::Execute(Transaction* trans) {
dbg_id = trans->DebugId();
}
bool keep = trans->RunInShard(sid); // resets TxQueuePos, this is why we get it before.
bool keep = trans->RunInShard(this); // resets TxQueuePos, this is why we get it before.
DLOG_IF(INFO, !dbg_id.empty()) << "Eager run " << sid << ", " << dbg_id << ", keep " << keep;
// Should be enforced via Schedule(). TODO: to remove the check once the code is mature.
@ -166,10 +168,10 @@ void EngineShardSet::Init(uint32_t sz) {
shard_queue_.resize(sz);
}
void EngineShardSet::InitThreadLocal(ProactorBase* pb) {
EngineShard::InitThreadLocal(pb);
void EngineShardSet::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
EngineShard::InitThreadLocal(pb, update_db_time);
EngineShard* es = EngineShard::tlocal();
shard_queue_[es->shard_id()] = es->GetQueue();
shard_queue_[es->shard_id()] = es->GetFiberQueue();
}
} // namespace dfly

View file

@ -23,7 +23,10 @@ class EngineShard {
// EngineShard() is private down below.
~EngineShard();
static void InitThreadLocal(util::ProactorBase* pb);
// Sets up a new EngineShard in the thread.
// If update_db_time is true, initializes periodic time update for its db_slice.
static void InitThreadLocal(util::ProactorBase* pb, bool update_db_time);
static void DestroyThreadLocal();
static EngineShard* tlocal() {
@ -42,12 +45,13 @@ class EngineShard {
return db_slice_;
}
::util::fibers_ext::FiberQueue* GetQueue() {
::util::fibers_ext::FiberQueue* GetFiberQueue() {
return &queue_;
}
// Executes a transaction. This transaction is pending in the queue.
void Execute(Transaction* trans);
// Processes TxQueue, blocked transactions or any other execution state related to that
// shard. Tries executing the passed transaction if possible (does not guarantee though).
void PollExecution(Transaction* trans);
// Returns transaction queue.
TxQueue* txq() {
@ -65,9 +69,7 @@ class EngineShard {
sds tmp_str;
private:
EngineShard(util::ProactorBase* pb);
void RunContinuationTransaction();
EngineShard(util::ProactorBase* pb, bool update_db_time);
::util::fibers_ext::FiberQueue queue_;
::boost::fibers::fiber fiber_q_;
@ -98,22 +100,25 @@ class EngineShardSet {
}
void Init(uint32_t size);
void InitThreadLocal(util::ProactorBase* pb);
void InitThreadLocal(util::ProactorBase* pb, bool update_db_time);
// Uses a shard queue to dispatch. Callback runs in a dedicated fiber.
template <typename F> auto Await(ShardId sid, F&& f) {
return shard_queue_[sid]->Await(std::forward<F>(f));
}
// Uses a shard queue to dispatch. Callback runs in a dedicated fiber.
template <typename F> auto Add(ShardId sid, F&& f) {
assert(sid < shard_queue_.size());
return shard_queue_[sid]->Add(std::forward<F>(f));
}
// Runs a brief function on all shards.
// Runs a brief function on all shards. Waits for it to complete.
template <typename U> void RunBriefInParallel(U&& func) {
RunBriefInParallel(std::forward<U>(func), [](auto i) { return true; });
}
// Runs a brief function on selected shards. Waits for it to complete.
template <typename U, typename P> void RunBriefInParallel(U&& func, P&& pred);
template <typename U> void RunBlockingInParallel(U&& func);
@ -145,8 +150,8 @@ template <typename U> void EngineShardSet::RunBlockingInParallel(U&& func) {
for (uint32_t i = 0; i < size(); ++i) {
util::ProactorBase* dest = pp_->at(i);
dest->AsyncFiber([f = std::forward<U>(func), bc]() mutable {
f(EngineShard::tlocal());
dest->AsyncFiber([func, bc]() mutable {
func(EngineShard::tlocal());
bc.Dec();
});
}

View file

@ -4,6 +4,10 @@
#include "server/main_service.h"
extern "C" {
#include "redis/redis_aux.h"
}
#include <absl/strings/ascii.h>
#include <xxhash.h>
@ -42,24 +46,32 @@ DEFINE_VARZ(VarzQps, ping_qps);
std::optional<VarzFunction> engine_varz;
metrics::CounterFamily cmd_req("requests_total", "Number of served redis requests");
constexpr size_t kMaxThreadSize = 1024;
} // namespace
Service::Service(ProactorPool* pp) : shard_set_(pp), pp_(*pp) {
CHECK(pp);
// We support less than 1024 threads.
CHECK_LT(pp->size(), kMaxThreadSize);
RegisterCommands();
engine_varz.emplace("engine", [this] { return GetVarzStats(); });
}
Service::~Service() {
}
void Service::Init(util::AcceptServer* acceptor) {
void Service::Init(util::AcceptServer* acceptor, const InitOpts& opts) {
InitRedisTables();
uint32_t shard_num = pp_.size() > 1 ? pp_.size() - 1 : pp_.size();
shard_set_.Init(shard_num);
pp_.AwaitOnAll([&](uint32_t index, ProactorBase* pb) {
if (index < shard_count()) {
shard_set_.InitThreadLocal(pb);
shard_set_.InitThreadLocal(pb, !opts.disable_time_update);
}
});
@ -78,6 +90,7 @@ void Service::Shutdown() {
ping_qps.Shutdown();
StringFamily::Shutdown();
GenericFamily::Shutdown();
cmd_req.Shutdown();
shard_set_.RunBlockingInParallel([&](EngineShard*) { EngineShard::DestroyThreadLocal(); });
}
@ -115,7 +128,8 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) {
cntx->transaction = dist_trans.get();
if (cid->first_key_pos() > 0) {
dist_trans->InitByArgs(args);
dist_trans->InitByArgs(cntx->conn_state.db_index, args);
cntx->last_command_debug.shards_count = cntx->transaction->unique_shard_cnt();
}
} else {
cntx->transaction = nullptr;
@ -127,6 +141,9 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) {
end_usec = ProactorBase::GetMonotonicTimeNs();
request_latency_usec.IncBy(cmd_str, (end_usec - start_usec) / 1000);
if (dist_trans) {
cntx->last_command_debug.clock = dist_trans->txid();
}
}
void Service::DispatchMC(const MemcacheParser::Command& cmd, std::string_view value,

View file

@ -20,12 +20,19 @@ class Service {
public:
using error_code = std::error_code;
struct InitOpts {
bool disable_time_update;
InitOpts() : disable_time_update{false} {
}
};
explicit Service(util::ProactorPool* pp);
~Service();
void RegisterHttp(util::HttpListenerBase* listener);
void Init(util::AcceptServer* acceptor);
void Init(util::AcceptServer* acceptor, const InitOpts& opts = InitOpts{});
void Shutdown();

View file

@ -151,9 +151,6 @@ void ReplyBuilder::SendError(OpStatus status) {
case OpStatus::OK:
SendOk();
break;
case OpStatus::KEY_NOTFOUND:
SendError("no such key");
break;
default:
LOG(ERROR) << "Unsupported status " << status;
SendError("Internal error");

View file

@ -0,0 +1,130 @@
// Copyright 2021, Roman Gershman. All rights reserved.
// See LICENSE for licensing terms.
//
#include "server/string_family.h"
#include "base/gtest.h"
#include "base/logging.h"
#include "server/command_registry.h"
#include "server/conn_context.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/test_utils.h"
#include "server/transaction.h"
#include "util/uring/uring_pool.h"
using namespace testing;
using namespace std;
using namespace util;
using absl::StrCat;
namespace dfly {
class StringFamilyTest : public BaseFamilyTest {
protected:
};
TEST_F(StringFamilyTest, SetGet) {
auto resp = Run({"set", "key", "val"});
EXPECT_THAT(resp, RespEq("OK"));
EXPECT_THAT(Run({"get", "key"}), RespEq("val"));
EXPECT_THAT(Run({"set", "key1", "1"}), RespEq("OK"));
EXPECT_THAT(Run({"get", "key1"}), RespEq("1"));
EXPECT_THAT(Run({"set", "key", "2"}), RespEq("OK"));
EXPECT_THAT(Run({"get", "key"}), RespEq("2"));
}
TEST_F(StringFamilyTest, Expire) {
constexpr uint64_t kNow = 232279092000;
UpdateTime(kNow);
ASSERT_THAT(Run({"set", "key", "val", "PX", "20"}), RespEq("OK"));
UpdateTime(kNow + 10);
EXPECT_THAT(Run({"get", "key"}), RespEq("val"));
UpdateTime(kNow + 20);
EXPECT_THAT(Run({"get", "key"}), ElementsAre(ArgType(RespExpr::NIL)));
ASSERT_THAT(Run({"set", "i", "1", "PX", "10"}), RespEq("OK"));
}
TEST_F(StringFamilyTest, Set) {
auto resp = Run({"set", "foo", "bar", "XX"});
EXPECT_THAT(resp, ElementsAre(ArgType(RespExpr::NIL)));
resp = Run({"set", "foo", "bar", "NX"});
ASSERT_THAT(resp, RespEq("OK"));
resp = Run({"set", "foo", "bar", "NX"});
EXPECT_THAT(resp, ElementsAre(ArgType(RespExpr::NIL)));
resp = Run({"set", "foo", "bar", "xx"});
ASSERT_THAT(resp, RespEq("OK"));
resp = Run({"set", "foo", "bar", "ex", "1"});
ASSERT_THAT(resp, RespEq("OK"));
}
TEST_F(StringFamilyTest, MGetSet) {
Run({"mset", "x", "0", "b", "0"});
ASSERT_EQ(2, GetDebugInfo("IO0").shards_count);
auto mget_fb = pp_->at(0)->LaunchFiber([&] {
for (size_t i = 0; i < 1000; ++i) {
auto resp = Run({"mget", "b", "x"});
ASSERT_EQ(2, resp.size());
auto ivec = ToIntArr(resp);
ASSERT_GE(ivec[1], ivec[0]);
}
});
auto set_fb = pp_->at(1)->LaunchFiber([&] {
for (size_t i = 1; i < 2000; ++i) {
Run({"set", "x", StrCat(i)});
Run({"set", "b", StrCat(i)});
}
});
mget_fb.join();
set_fb.join();
}
TEST_F(StringFamilyTest, IntKey) {
Run({"mset", "1", "1", "-1000", "-1000"});
auto resp = Run({"get", "1"});
ASSERT_THAT(resp, RespEq("1"));
}
TEST_F(StringFamilyTest, SingleShard) {
Run({"mset", "x", "1", "y", "1"});
ASSERT_EQ(1, GetDebugInfo("IO0").shards_count);
Run({"mget", "x", "y", "b"});
ASSERT_EQ(2, GetDebugInfo("IO0").shards_count);
auto resp = Run({"mget", "x", "y"});
ASSERT_EQ(1, GetDebugInfo("IO0").shards_count);
ASSERT_THAT(ToIntArr(resp), ElementsAre(1, 1));
auto mset_fb = pp_->at(0)->LaunchFiber([&] {
for (size_t i = 0; i < 100; ++i) {
Run({"mset", "x", "0", "y", "0"});
}
});
// Specially multiple shards to avoid fast-path.
auto mget_fb = pp_->at(1)->LaunchFiber([&] {
for (size_t i = 0; i < 100; ++i) {
Run({"mget", "x", "b", "y"});
}
});
mset_fb.join();
mget_fb.join();
}
} // namespace dfly

View file

@ -7,6 +7,7 @@
#include <absl/strings/match.h>
#include "base/logging.h"
#include "server/dragonfly_connection.h"
#include "util/uring/uring_pool.h"
namespace dfly {
@ -29,7 +30,7 @@ bool RespMatcher::MatchAndExplain(const RespExpr& e, MatchResultListener* listen
*listener << "Actual does not contain '" << exp_str_ << "'";
return false;
}
if (type_ == RespExpr::STRING && exp_str_ != actual) {
if (type_ == RespExpr::STRING && exp_str_ != actual) {
*listener << "\nActual string: " << actual;
return false;
}
@ -111,4 +112,154 @@ vector<int64_t> ToIntArr(const RespVec& vec) {
return res;
}
BaseFamilyTest::TestConn::TestConn()
: dummy_conn(new Connection(Protocol::REDIS, nullptr, nullptr)),
cmd_cntx(&sink, dummy_conn.get()) {
}
BaseFamilyTest::TestConn::~TestConn() {
}
BaseFamilyTest::BaseFamilyTest() {
}
BaseFamilyTest::~BaseFamilyTest() {
}
void BaseFamilyTest::SetUp() {
pp_.reset(new uring::UringPool(16, num_threads_));
pp_->Run();
service_.reset(new Service{pp_.get()});
Service::InitOpts opts;
opts.disable_time_update = true;
service_->Init(nullptr, opts);
ess_ = &service_->shard_set();
const TestInfo* const test_info = UnitTest::GetInstance()->current_test_info();
LOG(INFO) << "Starting " << test_info->name();
}
void BaseFamilyTest::TearDown() {
service_->Shutdown();
service_.reset();
pp_->Stop();
const TestInfo* const test_info = UnitTest::GetInstance()->current_test_info();
LOG(INFO) << "Finishing " << test_info->name();
}
// ts is ms
void BaseFamilyTest::UpdateTime(uint64_t ms) {
auto cb = [ms](EngineShard* s) { s->db_slice().UpdateExpireClock(ms); };
ess_->RunBriefInParallel(cb);
}
RespVec BaseFamilyTest::Run(initializer_list<std::string_view> list) {
if (!ProactorBase::IsProactorThread()) {
return pp_->at(0)->AwaitBlocking([&] { return this->Run(list); });
}
mu_.lock();
string id = GetId();
auto [it, inserted] = connections_.emplace(id, nullptr);
if (inserted) {
it->second.reset(new TestConn);
} else {
it->second->sink.Clear();
}
TestConn* conn = it->second.get();
mu_.unlock();
CmdArgVec args = conn->Args(list);
CmdArgList cmd_arg_list{args.data(), args.size()};
auto& context = conn->cmd_cntx;
context.shard_set = ess_;
service_->DispatchCommand(cmd_arg_list, &context);
unique_lock lk(mu_);
last_cmd_dbg_info_ = context.last_command_debug;
RespVec vec = conn->ParseResp();
if (vec.size() == 1) {
auto buf = vec.front().GetBuf();
if (!buf.empty() && buf[0] == '+') {
buf.remove_prefix(1);
std::get<RespExpr::Buffer>(vec.front().u) = buf;
}
}
return vec;
}
int64_t BaseFamilyTest::CheckedInt(std::initializer_list<std::string_view> list) {
RespVec resp = Run(list);
CHECK_EQ(1u, resp.size());
if (resp.front().type == RespExpr::INT64) {
return get<int64_t>(resp.front().u);
}
if (resp.front().type == RespExpr::NIL) {
return INT64_MIN;
}
CHECK_EQ(RespExpr::STRING, int(resp.front().type));
string_view sv = ToSV(resp.front().GetBuf());
int64_t res;
CHECK(absl::SimpleAtoi(sv, &res)) << "|" << sv << "|";
return res;
}
CmdArgVec BaseFamilyTest::TestConn::Args(std::initializer_list<std::string_view> list) {
CHECK_NE(0u, list.size());
CmdArgVec res;
for (auto v : list) {
tmp_str_vec.emplace_back(new string{v});
auto& s = *tmp_str_vec.back();
res.emplace_back(s.data(), s.size());
}
return res;
}
RespVec BaseFamilyTest::TestConn::ParseResp() {
tmp_str_vec.emplace_back(new string{sink.str()});
auto& s = *tmp_str_vec.back();
auto buf = RespExpr::buffer(&s);
uint32_t consumed = 0;
parser.reset(new RedisParser);
RespVec res;
RedisParser::Result st = parser->Parse(buf, &consumed, &res);
CHECK_EQ(RedisParser::OK, st);
return res;
}
bool BaseFamilyTest::IsLocked(DbIndex db_index, std::string_view key) const {
ShardId sid = Shard(key, ess_->size());
KeyLockArgs args;
args.db_index = db_index;
args.args = ArgSlice{&key, 1};
args.key_step = 1;
bool is_open = pp_->at(sid)->AwaitBrief(
[args] { return EngineShard::tlocal()->db_slice().CheckLock(IntentLock::EXCLUSIVE, args); });
return !is_open;
}
string BaseFamilyTest::GetId() const {
int32 id = ProactorBase::GetIndex();
CHECK_GE(id, 0);
return absl::StrCat("IO", id);
}
ConnectionContext::DebugInfo BaseFamilyTest::GetDebugInfo(const std::string& id) const {
auto it = connections_.find(id);
CHECK(it != connections_.end());
return it->second->cmd_cntx.last_command_debug;
}
} // namespace dfly

View file

@ -7,6 +7,8 @@
#include <gmock/gmock.h>
#include "io/io.h"
#include "server/conn_context.h"
#include "server/main_service.h"
#include "server/redis_parser.h"
#include "util/proactor_pool.h"
@ -83,4 +85,56 @@ MATCHER_P(RespEq, val, "") {
return ::testing::ExplainMatchResult(::testing::ElementsAre(StrArg(val)), arg, result_listener);
}
std::vector<int64_t> ToIntArr(const RespVec& vec);
class BaseFamilyTest : public ::testing::Test {
protected:
BaseFamilyTest();
~BaseFamilyTest();
void SetUp() override;
void TearDown() override;
protected:
RespVec Run(std::initializer_list<std::string_view> list);
int64_t CheckedInt(std::initializer_list<std::string_view> list);
bool IsLocked(DbIndex db_index, std::string_view key) const;
ConnectionContext::DebugInfo GetDebugInfo(const std::string& id) const;
ConnectionContext::DebugInfo GetDebugInfo() const {
return GetDebugInfo("IO0");
}
// ts is ms
void UpdateTime(uint64_t ms);
std::string GetId() const;
std::unique_ptr<util::ProactorPool> pp_;
std::unique_ptr<Service> service_;
EngineShardSet* ess_ = nullptr;
unsigned num_threads_ = 3;
struct TestConn {
io::StringSink sink;
std::unique_ptr<Connection> dummy_conn;
ConnectionContext cmd_cntx;
std::vector<std::unique_ptr<std::string>> tmp_str_vec;
std::unique_ptr<RedisParser> parser;
TestConn();
~TestConn();
CmdArgVec Args(std::initializer_list<std::string_view> list);
RespVec ParseResp();
};
absl::flat_hash_map<std::string, std::unique_ptr<TestConn>> connections_;
::boost::fibers::mutex mu_;
ConnectionContext::DebugInfo last_cmd_dbg_info_;
};
} // namespace dfly

View file

@ -22,10 +22,8 @@ std::atomic_uint64_t op_seq{1};
constexpr size_t kTransSize = sizeof(Transaction);
} // namespace
IntentLock::Mode Transaction::Mode() const {
return (trans_options_ & CO::READONLY) ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
}
@ -46,10 +44,10 @@ Transaction::Transaction(const CommandId* cid, EngineShardSet* ess) : cid_(cid),
bool single_key = cid_->first_key_pos() > 0 && !cid_->is_multi_key();
if (single_key) {
dist_.shard_data.resize(1); // Single key optimization
shard_data_.resize(1); // Single key optimization
} else {
// Our shard_data is not sparse, so we must allocate for all threads :(
dist_.shard_data.resize(ess_->size());
shard_data_.resize(ess_->size());
}
}
@ -73,18 +71,18 @@ Transaction::Transaction(const CommandId* cid, EngineShardSet* ess) : cid_(cid),
*
**/
void Transaction::InitByArgs(CmdArgList args) {
void Transaction::InitByArgs(DbIndex index, CmdArgList args) {
CHECK_GT(args.size(), 1U);
CHECK_LT(size_t(cid_->first_key_pos()), args.size());
DCHECK_EQ(unique_shard_cnt_, 0u);
db_index_ = index;
if (!cid_->is_multi_key()) { // Single key optimization.
auto key = ArgS(args, cid_->first_key_pos());
args_.push_back(key);
unique_shard_cnt_ = 1;
unique_shard_id_ = Shard(key, ess_->size());
num_keys_ = 1;
return;
}
@ -93,7 +91,7 @@ void Transaction::InitByArgs(CmdArgList args) {
// Reuse thread-local temporary storage. Since this code is non-preemptive we can use it here.
auto& shard_index = tmp_space.shard_cache;
shard_index.resize(dist_.shard_data.size());
shard_index.resize(shard_data_.size());
for (auto& v : shard_index) {
v.Clear();
}
@ -102,10 +100,9 @@ void Transaction::InitByArgs(CmdArgList args) {
: (args.size() + 1 + cid_->last_key_pos());
for (size_t i = 1; i < key_end; ++i) {
std::string_view key = ArgS(args, i);
uint32_t sid = Shard(key, dist_.shard_data.size());
uint32_t sid = Shard(key, shard_data_.size());
shard_index[sid].args.push_back(key);
shard_index[sid].original_index.push_back(i - 1);
++num_keys_;
if (cid_->key_arg_step() == 2) { // value
++i;
@ -116,15 +113,15 @@ void Transaction::InitByArgs(CmdArgList args) {
}
args_.resize(key_end - 1);
dist_.reverse_index.resize(args_.size());
reverse_index_.resize(args_.size());
auto next_arg = args_.begin();
auto rev_indx_it = dist_.reverse_index.begin();
auto rev_indx_it = reverse_index_.begin();
// slice.arg_start/arg_count point to args_ array which is sorted according to shard of each key.
// reverse_index_[i] says what's the original position of args_[i] in args.
for (size_t i = 0; i < dist_.shard_data.size(); ++i) {
auto& sd = dist_.shard_data[i];
for (size_t i = 0; i < shard_data_.size(); ++i) {
auto& sd = shard_data_[i];
auto& si = shard_index[i];
CHECK_LT(si.args.size(), 1u << 15);
sd.arg_count = si.args.size();
@ -152,14 +149,14 @@ void Transaction::InitByArgs(CmdArgList args) {
if (unique_shard_cnt_ == 1) {
PerShardData* sd;
dist_.shard_data.resize(1);
sd = &dist_.shard_data.front();
shard_data_.resize(1);
sd = &shard_data_.front();
sd->arg_count = -1;
sd->arg_start = -1;
}
// Validation.
for (const auto& sd : dist_.shard_data) {
for (const auto& sd : shard_data_) {
DCHECK_EQ(sd.local_mask, 0u);
DCHECK_EQ(0, sd.local_mask & ARMED);
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
@ -171,37 +168,29 @@ string Transaction::DebugId() const {
}
// Runs in the dbslice thread. Returns true if transaction needs to be kept in the queue.
bool Transaction::RunInShard(ShardId sid) {
CHECK(cb_);
bool Transaction::RunInShard(EngineShard* shard) {
DCHECK_GT(run_count_.load(memory_order_relaxed), 0u);
CHECK(cb_) << DebugId();
DCHECK_GT(txid_, 0u);
EngineShard* shard = EngineShard::tlocal();
// Unlike with regular transactions we do not acquire locks upon scheduling
// because Scheduling is done before multi-exec batch is executed. Therefore we
// lock keys right before the execution of each statement.
DVLOG(1) << "RunInShard: " << DebugId() << " sid:" << sid;
DVLOG(1) << "RunInShard: " << DebugId() << " sid:" << shard->shard_id();
unsigned idx = SidToId(shard->shard_id());
auto& sd = shard_data_[idx];
sid = SidToId(sid);
auto& sd = dist_.shard_data[sid];
DCHECK(sd.local_mask & ARMED);
sd.local_mask &= ~ARMED;
bool concluding = dist_.is_concluding_cb;
DCHECK(sd.local_mask & KEYS_ACQUIRED);
/*************************************************************************/
// Actually running the callback.
OpStatus status = cb_(this, shard);
// If it's a final hop we should release the locks.
if (concluding) {
auto largs = GetLockArgs(sid);
shard->db_slice().Release(Mode(), largs);
sd.local_mask &= ~KEYS_ACQUIRED;
}
/*************************************************************************/
if (unique_shard_cnt_ == 1) {
cb_ = nullptr; // We can do it because only a single thread runs the callback.
@ -210,28 +199,27 @@ bool Transaction::RunInShard(ShardId sid) {
CHECK_EQ(OpStatus::OK, status);
}
// This shard should own a reference for transaction as well as coordinator thread.
DCHECK_GT(use_count(), 1u);
CHECK_GE(DecreaseRunCnt(), 1u);
// at least the coordinator thread owns the reference.
DCHECK_GE(use_count(), 1u);
// must be computed before intrusive_ptr_release call.
if (concluding) {
sd.pq_pos = TxQueue::kEnd;
// For multi-transaction we need to clear this flag to allow locking of the next set of keys
// during the next child transaction.
// If it's a final hop we should release the locks.
if (is_concluding_cb_) {
KeyLockArgs largs = GetLockArgs(idx);
shard->db_slice().Release(Mode(), largs);
sd.local_mask &= ~KEYS_ACQUIRED;
DVLOG(2) << "ptr_release " << DebugId() << " " << this->use_count();
intrusive_ptr_release(this); // Against ScheduleInternal.
}
return !concluding; // keep
CHECK_GE(DecreaseRunCnt(), 1u);
return !is_concluding_cb_; // keep
}
void Transaction::ScheduleInternal(bool single_hop) {
DCHECK_EQ(0, state_mask_.load(memory_order_acquire) & SCHEDULED);
DCHECK_EQ(0u, txid_);
bool out_of_order = false;
uint32_t num_shards;
std::function<bool(uint32_t)> is_active;
@ -239,22 +227,19 @@ void Transaction::ScheduleInternal(bool single_hop) {
DCHECK_GT(num_shards, 0u);
is_active = [&](uint32_t i) {
return num_shards == 1 ? (i == unique_shard_id_) : dist_.shard_data[i].arg_count > 0;
return num_shards == 1 ? (i == unique_shard_id_) : shard_data_[i].arg_count > 0;
};
// intrusive_ptr_add num_shards times.
use_count_.fetch_add(num_shards, memory_order_relaxed);
while (true) {
txid_ = op_seq.fetch_add(1, std::memory_order_relaxed);
std::atomic_uint32_t lock_acquire_cnt{0};
std::atomic_uint32_t lock_granted_cnt{0};
std::atomic_uint32_t success{0};
auto cb = [&](EngineShard* shard) {
pair<bool, bool> res = ScheduleInShard(shard);
success.fetch_add(res.first, memory_order_relaxed);
lock_acquire_cnt.fetch_add(res.second, memory_order_relaxed);
lock_granted_cnt.fetch_add(res.second, memory_order_relaxed);
};
ess_->RunBriefInParallel(std::move(cb), is_active);
@ -263,10 +248,10 @@ void Transaction::ScheduleInternal(bool single_hop) {
// We allow out of order execution only for single hop transactions.
// It might be possible to do it for multi-hop transactions as well but currently is
// too complicated to reason about.
if (single_hop && lock_acquire_cnt.load(memory_order_relaxed) == num_shards) {
dist_.out_of_order.store(true, memory_order_relaxed);
if (single_hop && lock_granted_cnt.load(memory_order_relaxed) == num_shards) {
out_of_order = true;
}
DVLOG(1) << "Scheduled " << DebugId() << " OutOfOrder: " << dist_.out_of_order;
DVLOG(1) << "Scheduled " << DebugId() << " OutOfOrder: " << out_of_order;
state_mask_.fetch_or(SCHEDULED, memory_order_release);
break;
@ -281,6 +266,12 @@ void Transaction::ScheduleInternal(bool single_hop) {
ess_->RunBriefInParallel(std::move(cancel), is_active);
CHECK_EQ(0u, success.load(memory_order_relaxed));
}
if (out_of_order) {
for (auto& sd : shard_data_) {
sd.local_mask |= OUT_OF_ORDER;
}
}
}
// Optimized "Schedule and execute" function for the most common use-case of a single hop
@ -291,25 +282,43 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
cb_ = std::move(cb);
bool run_eager = false;
bool schedule_fast = (unique_shard_cnt_ == 1);
if (schedule_fast) { // Single shard (local) optimization.
// We never resize shard_data because that would affect MULTI transaction correctness.
DCHECK_EQ(1u, dist_.shard_data.size());
DCHECK_EQ(1u, shard_data_.size());
dist_.shard_data[0].local_mask |= ARMED;
run_count_.fetch_add(1, memory_order_release); // Decreases in RunLocal.
auto schedule_cb = [&] { return ScheduleUniqueShard(EngineShard::tlocal()); };
run_eager = ess_->Await(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier.
(void)run_eager;
} else { // Transaction spans multiple shards or it's global (like flushdb)
shard_data_[0].local_mask |= ARMED;
// memory_order_release because we do not want it to be reordered with shard_data writes
// above.
// IsArmedInShard() first checks run_count_ before accessing shard_data.
run_count_.fetch_add(1, memory_order_release);
// Please note that schedule_cb can not update any data on ScheduleSingleHop stack
// since the latter can exit before ScheduleUniqueShard returns.
// The problematic flow is as follows: ScheduleUniqueShard schedules into TxQueue and then
// call PollExecute that runs the callback which calls DecreaseRunCnt.
// As a result WaitForShardCallbacks below is unblocked.
auto schedule_cb = [&] {
bool run_eager = ScheduleUniqueShard(EngineShard::tlocal());
if (run_eager) {
// it's important to DecreaseRunCnt only for run_eager and after run_eager was assigned.
// If DecreaseRunCnt were called before ScheduleUniqueShard finishes
// then WaitForShardCallbacks below could exit before schedule_cb assigns return value
// to run_eager and cause stack corruption.
CHECK_GE(DecreaseRunCnt(), 1u);
}
};
ess_->Add(unique_shard_id_, std::move(schedule_cb)); // serves as a barrier.
} else {
ScheduleInternal(true);
ExecuteAsync(true);
}
DVLOG(1) << "Before DoneWait " << DebugId() << " " << args_.front();
DVLOG(1) << "ScheduleSingleHop before Wait " << DebugId() << " " << run_count_.load();
WaitForShardCallbacks();
DVLOG(1) << "After DoneWait";
DVLOG(1) << "ScheduleSingleHop after Wait " << DebugId();
cb_ = nullptr;
state_mask_.fetch_or(AFTERRUN, memory_order_release);
@ -323,23 +332,24 @@ void Transaction::Execute(RunnableType cb, bool conclude) {
ExecuteAsync(conclude);
DVLOG(1) << "Wait on " << DebugId();
DVLOG(1) << "Wait on Exec " << DebugId();
WaitForShardCallbacks();
DVLOG(1) << "Wait on " << DebugId() << " completed";
DVLOG(1) << "Wait on Exec " << DebugId() << " completed";
cb_ = nullptr;
dist_.out_of_order.store(false, memory_order_relaxed);
uint32_t mask = conclude ? AFTERRUN : RUNNING;
state_mask_.fetch_or(mask, memory_order_release);
state_mask_.fetch_or(mask, memory_order_relaxed);
}
// Runs in coordinator thread.
void Transaction::ExecuteAsync(bool concluding_cb) {
DVLOG(1) << "ExecuteAsync " << DebugId() << " concluding " << concluding_cb;
dist_.is_concluding_cb = concluding_cb;
is_concluding_cb_ = concluding_cb;
DCHECK_GT(unique_shard_cnt_, 0u);
DCHECK_GT(use_count_.load(memory_order_relaxed), 0u);
// We do not necessarily Execute this transaction in 'cb' below. It well may be that it will be
// executed by the engine shard once it has been armed and coordinator thread will finish the
@ -349,10 +359,10 @@ void Transaction::ExecuteAsync(bool concluding_cb) {
use_count_.fetch_add(unique_shard_cnt_, memory_order_relaxed);
if (unique_shard_cnt_ == 1) {
dist_.shard_data[SidToId(unique_shard_id_)].local_mask |= ARMED;
shard_data_[SidToId(unique_shard_id_)].local_mask |= ARMED;
} else {
for (ShardId i = 0; i < dist_.shard_data.size(); ++i) {
auto& sd = dist_.shard_data[i];
for (ShardId i = 0; i < shard_data_.size(); ++i) {
auto& sd = shard_data_[i];
if (sd.arg_count == 0)
continue;
DCHECK_LT(sd.arg_count, 1u << 15);
@ -360,18 +370,37 @@ void Transaction::ExecuteAsync(bool concluding_cb) {
}
}
uint32_t seq = seqlock_.load(memory_order_relaxed);
// this fence prevents that a read or write operation before a release fence will be reordered
// with a write operation after a release fence. Specifically no writes below will be reordered
// upwards. Important, because it protects non-threadsafe local_mask from being accessed by
// IsArmedInShard in other threads.
run_count_.fetch_add(unique_shard_cnt_, memory_order_acq_rel);
run_count_.store(unique_shard_cnt_, memory_order_release);
auto cb = [this] {
// We verify seq lock has the same generation number. See below for more info.
auto cb = [seq, this] {
EngineShard* shard = EngineShard::tlocal();
DVLOG(2) << "TriggerExec " << DebugId() << " sid:" << shard->shard_id();
DVLOG(2) << "EngineShard::Exec " << DebugId() << " sid:" << shard->shard_id() << " "
<< run_count_.load(memory_order_relaxed);
// Everything that should be handled during the callback execution should go into RunInShard.
shard->Execute(this);
uint16_t local_mask = GetLocalMask(shard->shard_id());
// we use fetch_add with release trick to make sure that local_mask is loaded before
// we load seq_after. We could gain similar result with "atomic_thread_fence(acquire)"
uint32_t seq_after = seqlock_.fetch_add(0, memory_order_release);
// We verify that this callback is still relevant.
// If we still have the same sequence number and local_mask is ARMED it means
// the coordinator thread has not crossed WaitForShardCallbacks barrier.
// Otherwise, this callback is redundant. We may still call PollExecution but
// we should not pass this to it since it can be in undefined state for this callback.
if (seq_after == seq && (local_mask & ARMED)) {
// shard->PollExecution(this) does not necessarily execute this transaction.
// Therefore, everything that should be handled during the callback execution
// should go into RunInShard.
shard->PollExecution(this);
}
DVLOG(2) << "ptr_release " << DebugId() << " " << use_count();
intrusive_ptr_release(this); // against use_count_.fetch_add above.
@ -381,8 +410,8 @@ void Transaction::ExecuteAsync(bool concluding_cb) {
if (unique_shard_cnt_ == 1) {
ess_->Add(unique_shard_id_, std::move(cb)); // serves as a barrier.
} else {
for (ShardId i = 0; i < dist_.shard_data.size(); ++i) {
auto& sd = dist_.shard_data[i];
for (ShardId i = 0; i < shard_data_.size(); ++i) {
auto& sd = shard_data_[i];
if (sd.arg_count == 0)
continue;
ess_->Add(i, cb); // serves as a barrier.
@ -391,11 +420,11 @@ void Transaction::ExecuteAsync(bool concluding_cb) {
}
void Transaction::RunQuickie() {
DCHECK_EQ(1u, dist_.shard_data.size());
DCHECK_EQ(1u, shard_data_.size());
DCHECK_EQ(0u, txid_);
EngineShard* shard = EngineShard::tlocal();
auto& sd = dist_.shard_data[0];
auto& sd = shard_data_[0];
DCHECK_EQ(0, sd.local_mask & KEYS_ACQUIRED);
DVLOG(1) << "RunQuickSingle " << DebugId() << " " << shard->shard_id() << " " << args_[0];
@ -405,7 +434,6 @@ void Transaction::RunQuickie() {
sd.local_mask &= ~ARMED;
cb_ = nullptr; // We can do it because only a single shard runs the callback.
CHECK_GE(DecreaseRunCnt(), 1u);
}
const char* Transaction::Name() const {
@ -414,7 +442,7 @@ const char* Transaction::Name() const {
KeyLockArgs Transaction::GetLockArgs(ShardId sid) const {
KeyLockArgs res;
res.db_index = 0; // TODO
res.db_index = db_index_;
res.key_step = cid_->key_arg_step();
res.args = ShardArgsInShard(sid);
@ -426,19 +454,19 @@ KeyLockArgs Transaction::GetLockArgs(ShardId sid) const {
// Returns true if was eagerly executed, false if it was scheduled into queue.
bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
DCHECK_EQ(0u, txid_);
DCHECK_EQ(1u, dist_.shard_data.size());
DCHECK_EQ(1u, shard_data_.size());
auto mode = Mode();
auto lock_args = GetLockArgs(shard->shard_id());
auto& sd = dist_.shard_data.front();
auto& sd = shard_data_.front();
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
// Fast path - for uncontended keys, just run the callback.
// That applies for single key operations like set, get, lpush etc.
if (shard->db_slice().CheckLock(mode, lock_args)) {
RunQuickie(); // TODO: for journal - this can become multi-shard
// transaction on replica.
// transaction on replica.
return true;
}
@ -455,6 +483,9 @@ bool Transaction::ScheduleUniqueShard(EngineShard* shard) {
DCHECK(!lock_acquired); // Because CheckLock above failed.
state_mask_.fetch_or(SCHEDULED, memory_order_release);
DVLOG(1) << "Rescheduling into TxQueue " << DebugId();
shard->PollExecution(nullptr);
return false;
}
@ -475,7 +506,7 @@ pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
bool lock_granted = false;
ShardId sid = SidToId(shard->shard_id());
auto& sd = dist_.shard_data[sid];
auto& sd = shard_data_[sid];
bool shard_unlocked = true;
lock_args = GetLockArgs(shard->shard_id());
@ -495,7 +526,7 @@ pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
// before we start scheduling them we lock the shards and disable OOO.
// We may record when they disable OOO via barrier_ts so if the queue contains transactions
// that were only scheduled afterwards we know they are not free so we can still
// reorder the queue. Currently, this optimization is disabled: barrier_ts < pq->HeadRank().
// reorder the queue. Currently, this optimization is disabled: barrier_ts < pq->HeadScore().
bool to_proceed = lock_granted || pq->TailScore() < txid_;
if (!to_proceed) {
if (sd.local_mask & KEYS_ACQUIRED) { // rollback the lock.
@ -521,7 +552,7 @@ pair<bool, bool> Transaction::ScheduleInShard(EngineShard* shard) {
bool Transaction::CancelInShard(EngineShard* shard) {
ShardId sid = SidToId(shard->shard_id());
auto& sd = dist_.shard_data[sid];
auto& sd = shard_data_[sid];
auto pos = sd.pq_pos;
if (pos == TxQueue::kEnd)
@ -555,7 +586,7 @@ ArgSlice Transaction::ShardArgsInShard(ShardId sid) const {
return args_;
}
const auto& sd = dist_.shard_data[sid];
const auto& sd = shard_data_[sid];
return ArgSlice{args_.data() + sd.arg_start, sd.arg_count};
}
@ -563,12 +594,13 @@ size_t Transaction::ReverseArgIndex(ShardId shard_id, size_t arg_index) const {
if (unique_shard_cnt_ == 1)
return arg_index;
return dist_.reverse_index[dist_.shard_data[shard_id].arg_start + arg_index];
return reverse_index_[shard_data_[shard_id].arg_start + arg_index];
}
inline uint32_t Transaction::DecreaseRunCnt() {
// We use release so that no stores will be reordered after.
uint32_t res = run_count_.fetch_sub(1, std::memory_order_release);
if (res == 1)
run_ec_.notify();
return res;

View file

@ -7,15 +7,15 @@
#include <absl/container/flat_hash_map.h>
#include <absl/container/flat_hash_set.h>
#include <absl/container/inlined_vector.h>
#include <string_view>
#include <variant>
#include <boost/smart_ptr/intrusive_ptr.hpp>
#include <string_view>
#include <variant>
#include <vector>
#include "core/intent_lock.h"
#include "core/tx_queue.h"
#include "core/op_status.h"
#include "core/tx_queue.h"
#include "server/common_types.h"
#include "server/table.h"
#include "util/fibers/fibers_ext.h"
@ -32,7 +32,7 @@ class Transaction {
~Transaction();
// Transactions are reference counted.
// Transactions are reference counted.
friend void intrusive_ptr_add_ref(Transaction* trans) noexcept {
trans->use_count_.fetch_add(1, std::memory_order_relaxed);
}
@ -49,8 +49,9 @@ class Transaction {
using time_point = ::std::chrono::steady_clock::time_point;
enum LocalState : uint8_t {
ARMED = 1, // Transaction was armed with the callback
KEYS_ACQUIRED = 0x20,
ARMED = 1, // Transaction was armed with the callback
OUT_OF_ORDER = 2,
KEYS_ACQUIRED = 4,
};
enum State : uint8_t {
@ -61,7 +62,7 @@ class Transaction {
Transaction(const CommandId* cid, EngineShardSet* ess);
void InitByArgs(CmdArgList args);
void InitByArgs(DbIndex index, CmdArgList args);
std::string DebugId() const;
@ -77,35 +78,30 @@ class Transaction {
//! Runs from the coordinator thread.
bool IsActive(ShardId shard_id) const {
return unique_shard_cnt_ == 1 ? unique_shard_id_ == shard_id
: dist_.shard_data[shard_id].arg_count > 0;
: shard_data_[shard_id].arg_count > 0;
}
//! Returns true if the transaction is armed for execution on this sid (used to avoid
//! duplicate runs). Supports local transactions under multi as well.
bool IsArmedInShard(ShardId sid) const {
if (sid >= dist_.shard_data.size())
if (sid >= shard_data_.size())
sid = 0;
// We use acquire so that no reordering will move before this load.
return run_count_.load(std::memory_order_acquire) > 0 &&
dist_.shard_data[sid].local_mask & ARMED;
return run_count_.load(std::memory_order_acquire) > 0 && shard_data_[sid].local_mask & ARMED;
}
// Called from engine set shard threads.
uint16_t GetLocalMask(ShardId sid) const {
return dist_.shard_data[SidToId(sid)].local_mask;
return shard_data_[SidToId(sid)].local_mask;
}
uint32_t GetStateMask() const {
return state_mask_.load(std::memory_order_relaxed);
}
bool IsOutOfOrder() const {
return dist_.out_of_order.load(std::memory_order_relaxed);
}
// Relevant only when unique_shards_ > 1.
uint32_t TxQueuePos(ShardId sid) const {
return dist_.shard_data[sid].pq_pos;
return shard_data_[sid].pq_pos;
}
// if conclude is true, removes the transaction from the pending queue.
@ -145,16 +141,17 @@ class Transaction {
return unique_shard_cnt_;
}
EngineShardSet* shard_set() { return ess_; }
EngineShardSet* shard_set() {
return ess_;
}
// Called by EngineShard when performing Execute over the tx queue.
// Returns true if transaction should be kept in the queue.
bool RunInShard(ShardId sid);
bool RunInShard(EngineShard* shard);
private:
unsigned SidToId(ShardId sid) const {
return sid < dist_.shard_data.size() ? sid : 0;
return sid < shard_data_.size() ? sid : 0;
}
void ScheduleInternal(bool single_hop);
@ -182,9 +179,13 @@ class Transaction {
void WaitForShardCallbacks() {
run_ec_.await([this] { return 0 == run_count_.load(std::memory_order_relaxed); });
// store operations below can not be ordered above the fence
std::atomic_thread_fence(std::memory_order_release);
seqlock_.fetch_add(1, std::memory_order_relaxed);
}
// Returns the previous value of arm count.
// Returns the previous value of run count.
uint32_t DecreaseRunCnt();
uint32_t use_count() const {
@ -208,59 +209,54 @@ class Transaction {
};
enum { kPerShardSize = sizeof(PerShardData) };
struct Dist {
// shard_data spans all the shards in ess_.
// I wish we could use a dense array of size [0..uniq_shards] but since
// multiple threads access this array to synchronize between themselves using
// PerShardData.state, it can be tricky. The complication comes from multi_ transactions where
// scheduled transaction is accessed between operations as well.
absl::InlinedVector<PerShardData, 4> shard_data; // length = shard_count
// Reverse argument mapping. Allows to reconstruct responses according to the original order of
// keys.
std::vector<uint32_t> reverse_index;
// NOTE: to move to bitmask if it grows.
// Written by coordinator thread, read by shard threads but not concurrently.
// Says whether the current callback function is concluding for this operation.
bool is_concluding_cb{true};
// out_of_order true - transaction can execute before other scheduled transactions,
// not necessary according to its queue order.
std::atomic_bool out_of_order{false};
struct LockCnt {
unsigned cnt[2] = {0, 0};
};
enum { kDistSize = sizeof(Dist) };
util::fibers_ext::EventCount blocking_ec_; // used to wake blocking transactions.
util::fibers_ext::EventCount run_ec_;
// shard_data spans all the shards in ess_.
// I wish we could use a dense array of size [0..uniq_shards] but since
// multiple threads access this array to synchronize between themselves using
// PerShardData.state, it can be tricky. The complication comes from multi_ transactions where
// scheduled transaction is accessed between operations as well.
absl::InlinedVector<PerShardData, 4> shard_data_; // length = shard_count
//! Stores arguments of the transaction (i.e. keys + values) partitioned by shards.
absl::InlinedVector<std::string_view, 4> args_;
// Reverse argument mapping. Allows to reconstruct responses according to the original order of
// keys.
std::vector<uint32_t> reverse_index_;
RunnableType cb_;
const CommandId* cid_;
EngineShardSet* ess_;
TxId txid_{0};
std::atomic_uint32_t use_count_{0}, run_count_{0};
std::atomic_uint32_t use_count_{0}, run_count_{0}, seqlock_{0};
// unique_shard_cnt_ and unique_shard_id_ is accessed only by coordinator thread.
uint32_t unique_shard_cnt_{0}; // number of unique shards span by args_
ShardId unique_shard_id_{kInvalidSid};
uint32_t trans_options_ = 0;
// Written by coordination thread but may be read by Shard threads.
// A mask of State values. Mostly used for debugging and for invariant checks.
std::atomic<uint16_t> state_mask_{0};
ShardId unique_shard_id_{kInvalidSid};
DbIndex db_index_ = 0;
// For single-hop transactions with unique_shards_ == 1, hence no data-race.
OpStatus local_result_ = OpStatus::OK;
uint32_t trans_options_ = 0;
uint32_t num_keys_ = 0;
Dist dist_;
util::fibers_ext::EventCount run_ec_;
//! Stores arguments of the transaction (i.e. keys + values) ordered by shards.
absl::InlinedVector<std::string_view, 4> args_;
RunnableType cb_;
// NOTE: to move to bitmask if it grows.
// Written by coordinator thread, read by shard threads but not concurrently.
// Says whether the current callback function is concluding for this operation.
bool is_concluding_cb_{true};
struct PerShardCache {
std::vector<std::string_view> args;