mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-12 02:45:45 +02:00
Initial support for lua transactions.
Extend multi-transactions to scripts. Differentiate between incremental and instant locking for multi-transactions.
This commit is contained in:
parent
b1829c3fe0
commit
e1c852dfcc
10 changed files with 173 additions and 67 deletions
|
@ -157,6 +157,18 @@ TEST_F(InterpreterTest, Basic) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
TEST_F(InterpreterTest, UnknownFunc) {
|
||||||
|
string_view code(R"(
|
||||||
|
function foo(n)
|
||||||
|
return myunknownfunc(1, n)
|
||||||
|
end)");
|
||||||
|
|
||||||
|
CHECK_EQ(0, luaL_loadbuffer(lua(), code.data(), code.size(), "code1"));
|
||||||
|
CHECK_EQ(0, lua_pcall(lua(), 0, 0, 0));
|
||||||
|
int type = lua_getglobal(lua(), "myunknownfunc");
|
||||||
|
ASSERT_EQ(LUA_TNIL, type);
|
||||||
|
}
|
||||||
|
|
||||||
TEST_F(InterpreterTest, Stack) {
|
TEST_F(InterpreterTest, Stack) {
|
||||||
RunInline(R"(
|
RunInline(R"(
|
||||||
local x = {}
|
local x = {}
|
||||||
|
|
|
@ -33,6 +33,7 @@ CommandRegistry::CommandRegistry() {
|
||||||
CommandId cd("COMMAND", CO::RANDOM | CO::LOADING, 0, 0, 0, 0);
|
CommandId cd("COMMAND", CO::RANDOM | CO::LOADING, 0, 0, 0, 0);
|
||||||
|
|
||||||
cd.SetHandler([this](const auto& args, auto* cntx) { return Command(args, cntx); });
|
cd.SetHandler([this](const auto& args, auto* cntx) { return Command(args, cntx); });
|
||||||
|
|
||||||
const char* nm = cd.name();
|
const char* nm = cd.name();
|
||||||
cmd_map_.emplace(nm, std::move(cd));
|
cmd_map_.emplace(nm, std::move(cd));
|
||||||
}
|
}
|
||||||
|
@ -70,6 +71,38 @@ CommandRegistry& CommandRegistry::operator<<(CommandId cmd) {
|
||||||
return *this;
|
return *this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
KeyIndex DetermineKeys(const CommandId* cid, const CmdArgList& args) {
|
||||||
|
DCHECK_EQ(0u, cid->opt_mask() & CO::GLOBAL_TRANS);
|
||||||
|
|
||||||
|
KeyIndex key_index;
|
||||||
|
|
||||||
|
if (cid->first_key_pos() > 0) {
|
||||||
|
key_index.start = cid->first_key_pos();
|
||||||
|
int last = cid->last_key_pos();
|
||||||
|
key_index.end = last > 0 ? last + 1 : (int(args.size()) + 1 + last);
|
||||||
|
key_index.step = cid->key_arg_step();
|
||||||
|
|
||||||
|
return key_index;
|
||||||
|
}
|
||||||
|
|
||||||
|
string_view name{cid->name()};
|
||||||
|
if (name == "EVAL" || name == "EVALSHA") {
|
||||||
|
DCHECK_GE(args.size(), 3u);
|
||||||
|
uint32_t num_keys;
|
||||||
|
|
||||||
|
CHECK(absl::SimpleAtoi(ArgS(args, 2), &num_keys));
|
||||||
|
key_index.start = 3;
|
||||||
|
key_index.end = 3 + num_keys;
|
||||||
|
key_index.step = 1;
|
||||||
|
|
||||||
|
return key_index;
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG(FATAL) << "TBD: Not supported";
|
||||||
|
|
||||||
|
return key_index;
|
||||||
|
}
|
||||||
|
|
||||||
namespace CO {
|
namespace CO {
|
||||||
|
|
||||||
const char* OptName(CO::CommandOpt fl) {
|
const char* OptName(CO::CommandOpt fl) {
|
||||||
|
|
|
@ -37,8 +37,13 @@ const char* OptName(CommandOpt fl);
|
||||||
|
|
||||||
class CommandId {
|
class CommandId {
|
||||||
public:
|
public:
|
||||||
using Handler = std::function<void(CmdArgList, ConnectionContext*)>;
|
using Handler =
|
||||||
using ArgValidator = std::function<bool(CmdArgList, ConnectionContext*)>;
|
fu2::function_base<true /*owns*/, true /*copyable*/, fu2::capacity_default,
|
||||||
|
false /* non-throwing*/, false /* strong exceptions guarantees*/,
|
||||||
|
void(CmdArgList, ConnectionContext*) const>;
|
||||||
|
|
||||||
|
using ArgValidator = fu2::function_base<true, true, fu2::capacity_default, false, false,
|
||||||
|
bool(CmdArgList, ConnectionContext*) const>;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @brief Construct a new Command Id object
|
* @brief Construct a new Command Id object
|
||||||
|
@ -152,4 +157,8 @@ class CommandRegistry {
|
||||||
void Command(CmdArgList args, ConnectionContext* cntx);
|
void Command(CmdArgList args, ConnectionContext* cntx);
|
||||||
};
|
};
|
||||||
|
|
||||||
|
|
||||||
|
// Given the command and the arguments determines the keys range (index).
|
||||||
|
KeyIndex DetermineKeys(const CommandId* cid, const CmdArgList& args);
|
||||||
|
|
||||||
} // namespace dfly
|
} // namespace dfly
|
||||||
|
|
|
@ -43,6 +43,12 @@ struct KeyLockArgs {
|
||||||
unsigned key_step;
|
unsigned key_step;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Describes key indices.
|
||||||
|
struct KeyIndex {
|
||||||
|
unsigned start;
|
||||||
|
unsigned end; // does not include this index (open limit).
|
||||||
|
unsigned step; // 1 for commands like mget. 2 for commands like mset.
|
||||||
|
};
|
||||||
|
|
||||||
struct ConnectionStats {
|
struct ConnectionStats {
|
||||||
uint32_t num_conns = 0;
|
uint32_t num_conns = 0;
|
||||||
|
|
|
@ -4,6 +4,8 @@
|
||||||
|
|
||||||
#pragma once
|
#pragma once
|
||||||
|
|
||||||
|
#include <absl/container/flat_hash_set.h>
|
||||||
|
|
||||||
#include "server/common_types.h"
|
#include "server/common_types.h"
|
||||||
#include "server/reply_builder.h"
|
#include "server/reply_builder.h"
|
||||||
|
|
||||||
|
@ -50,6 +52,8 @@ struct ConnectionState {
|
||||||
// Lua-script related data.
|
// Lua-script related data.
|
||||||
struct Script {
|
struct Script {
|
||||||
bool is_write = true;
|
bool is_write = true;
|
||||||
|
|
||||||
|
absl::flat_hash_set<std::string_view> keys;
|
||||||
};
|
};
|
||||||
std::optional<Script> script_info;
|
std::optional<Script> script_info;
|
||||||
};
|
};
|
||||||
|
|
|
@ -335,6 +335,9 @@ bool DbSlice::Acquire(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
||||||
|
DCHECK(!lock_args.args.empty());
|
||||||
|
|
||||||
|
DVLOG(2) << "Release " << IntentLock::ModeName(mode) << " for " << lock_args.args[0];
|
||||||
if (lock_args.args.size() == 1) {
|
if (lock_args.args.size() == 1) {
|
||||||
Release(mode, lock_args.db_index, lock_args.args.front(), 1);
|
Release(mode, lock_args.db_index, lock_args.args.front(), 1);
|
||||||
} else {
|
} else {
|
||||||
|
@ -351,7 +354,7 @@ void DbSlice::Release(IntentLock::Mode mode, const KeyLockArgs& lock_args) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
DVLOG(1) << "Release " << IntentLock::ModeName(mode) << " for " << lock_args.args[0];
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -221,8 +221,22 @@ TEST_F(DflyEngineTest, Eval) {
|
||||||
resp = Run({"incrby", "foo", "42"});
|
resp = Run({"incrby", "foo", "42"});
|
||||||
EXPECT_THAT(resp[0], IntArg(42));
|
EXPECT_THAT(resp[0], IntArg(42));
|
||||||
|
|
||||||
// resp = Run({"eval", "return redis.call('get', 'foo')", "0"});
|
resp = Run({"eval", "return redis.call('get', 'foo')", "0"});
|
||||||
// EXPECT_THAT(resp[0], StrArg("42"));
|
EXPECT_THAT(resp[0], ErrArg("undeclared"));
|
||||||
|
|
||||||
|
resp = Run({"eval", "return redis.call('get', 'foo')", "1", "bar"});
|
||||||
|
EXPECT_THAT(resp[0], ErrArg("undeclared"));
|
||||||
|
|
||||||
|
ASSERT_FALSE(service_->IsLocked(0, "foo"));
|
||||||
|
|
||||||
|
resp = Run({"eval", "return redis.call('get', 'foo')", "1", "foo"});
|
||||||
|
EXPECT_THAT(resp[0], StrArg("42"));
|
||||||
|
|
||||||
|
resp = Run({"eval", "return redis.call('get', KEYS[1])", "1", "foo"});
|
||||||
|
EXPECT_THAT(resp[0], StrArg("42"));
|
||||||
|
|
||||||
|
ASSERT_FALSE(service_->IsLocked(0, "foo"));
|
||||||
|
ASSERT_FALSE(service_->IsShardSetLocked());
|
||||||
}
|
}
|
||||||
|
|
||||||
TEST_F(DflyEngineTest, EvalSha) {
|
TEST_F(DflyEngineTest, EvalSha) {
|
||||||
|
|
|
@ -430,7 +430,18 @@ void Service::DispatchCommand(CmdArgList args, ConnectionContext* cntx) {
|
||||||
// Create command transaction
|
// Create command transaction
|
||||||
intrusive_ptr<Transaction> dist_trans;
|
intrusive_ptr<Transaction> dist_trans;
|
||||||
|
|
||||||
if (!under_script) {
|
if (under_script) {
|
||||||
|
DCHECK(cntx->transaction);
|
||||||
|
KeyIndex key_index = DetermineKeys(cid, args);
|
||||||
|
for (unsigned i = key_index.start; i < key_index.end; ++i) {
|
||||||
|
string_view key = ArgS(args, i);
|
||||||
|
if (!cntx->conn_state.script_info->keys.contains(key)) {
|
||||||
|
return (*cntx)->SendError("script tried accessing undeclared key");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
cntx->transaction->SetExecCmd(cid);
|
||||||
|
cntx->transaction->InitByArgs(cntx->conn_state.db_index, args);
|
||||||
|
} else {
|
||||||
DCHECK(cntx->transaction == nullptr);
|
DCHECK(cntx->transaction == nullptr);
|
||||||
|
|
||||||
if (IsTransactional(cid)) {
|
if (IsTransactional(cid)) {
|
||||||
|
@ -545,6 +556,7 @@ void Service::Multi(CmdArgList args, ConnectionContext* cntx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
void Service::CallFromScript(CmdArgList args, ObjectExplorer* reply, ConnectionContext* cntx) {
|
void Service::CallFromScript(CmdArgList args, ObjectExplorer* reply, ConnectionContext* cntx) {
|
||||||
|
DCHECK(cntx->transaction);
|
||||||
InterpreterReplier replier(reply);
|
InterpreterReplier replier(reply);
|
||||||
ReplyBuilderInterface* orig = cntx->Inject(&replier);
|
ReplyBuilderInterface* orig = cntx->Inject(&replier);
|
||||||
|
|
||||||
|
@ -647,6 +659,13 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter,
|
||||||
// and checking whether all invocations consist of RO commands.
|
// and checking whether all invocations consist of RO commands.
|
||||||
// we can do it once during script insertion into script mgr.
|
// we can do it once during script insertion into script mgr.
|
||||||
cntx->conn_state.script_info.emplace();
|
cntx->conn_state.script_info.emplace();
|
||||||
|
for (size_t i = 0; i < eval_args.keys.size(); ++i) {
|
||||||
|
cntx->conn_state.script_info->keys.insert(ArgS(eval_args.keys, i));
|
||||||
|
}
|
||||||
|
DCHECK(cntx->transaction);
|
||||||
|
|
||||||
|
if (!eval_args.keys.empty())
|
||||||
|
cntx->transaction->Schedule();
|
||||||
|
|
||||||
auto lk = interpreter->Lock();
|
auto lk = interpreter->Lock();
|
||||||
|
|
||||||
|
@ -659,6 +678,10 @@ void Service::EvalInternal(const EvalArgs& eval_args, Interpreter* interpreter,
|
||||||
|
|
||||||
cntx->conn_state.script_info.reset(); // reset script_info
|
cntx->conn_state.script_info.reset(); // reset script_info
|
||||||
|
|
||||||
|
// Conclude the transaction.
|
||||||
|
if (!eval_args.keys.empty())
|
||||||
|
cntx->transaction->UnlockMulti();
|
||||||
|
|
||||||
if (result == Interpreter::RUN_ERR) {
|
if (result == Interpreter::RUN_ERR) {
|
||||||
string resp = absl::StrCat("Error running script (call to ", eval_args.sha, "): ", error);
|
string resp = absl::StrCat("Error running script (call to ", eval_args.sha, "): ", error);
|
||||||
return (*cntx)->SendError(resp);
|
return (*cntx)->SendError(resp);
|
||||||
|
|
|
@ -24,42 +24,6 @@ std::atomic_uint64_t op_seq{1};
|
||||||
|
|
||||||
[[maybe_unused]] constexpr size_t kTransSize = sizeof(Transaction);
|
[[maybe_unused]] constexpr size_t kTransSize = sizeof(Transaction);
|
||||||
|
|
||||||
struct KeyIndex {
|
|
||||||
unsigned start;
|
|
||||||
unsigned end; // not including
|
|
||||||
unsigned step;
|
|
||||||
};
|
|
||||||
|
|
||||||
KeyIndex DetermineKeys(const CommandId* cid, const CmdArgList& args) {
|
|
||||||
DCHECK_EQ(0u, cid->opt_mask() & CO::GLOBAL_TRANS);
|
|
||||||
|
|
||||||
KeyIndex key_index;
|
|
||||||
|
|
||||||
if (cid->first_key_pos() > 0) {
|
|
||||||
key_index.start = cid->first_key_pos();
|
|
||||||
int last = cid->last_key_pos();
|
|
||||||
key_index.end = last > 0 ? last + 1 : (int(args.size()) + 1 + last);
|
|
||||||
key_index.step = cid->key_arg_step();
|
|
||||||
|
|
||||||
return key_index;
|
|
||||||
}
|
|
||||||
|
|
||||||
string_view name{cid->name()};
|
|
||||||
if (name == "EVAL" || name == "EVALSHA") {
|
|
||||||
DCHECK_GE(args.size(), 3u);
|
|
||||||
uint32_t num_keys;
|
|
||||||
|
|
||||||
CHECK(absl::SimpleAtoi(ArgS(args, 2), &num_keys));
|
|
||||||
key_index.start = 3;
|
|
||||||
key_index.end = 3 + num_keys;
|
|
||||||
key_index.step = 1;
|
|
||||||
|
|
||||||
return key_index;
|
|
||||||
}
|
|
||||||
LOG(FATAL) << "Not supported";
|
|
||||||
|
|
||||||
return key_index;
|
|
||||||
}
|
|
||||||
|
|
||||||
} // namespace
|
} // namespace
|
||||||
|
|
||||||
|
@ -130,7 +94,7 @@ OpResult<Transaction::FindFirstResult> Transaction::FindFirstProcessor::Process(
|
||||||
}
|
}
|
||||||
|
|
||||||
IntentLock::Mode Transaction::Mode() const {
|
IntentLock::Mode Transaction::Mode() const {
|
||||||
return (trans_options_ & CO::READONLY) ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
|
return (cid_->opt_mask() & CO::READONLY) ? IntentLock::SHARED : IntentLock::EXCLUSIVE;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -141,10 +105,15 @@ IntentLock::Mode Transaction::Mode() const {
|
||||||
* @param cs
|
* @param cs
|
||||||
*/
|
*/
|
||||||
Transaction::Transaction(const CommandId* cid, EngineShardSet* ess) : cid_(cid), ess_(ess) {
|
Transaction::Transaction(const CommandId* cid, EngineShardSet* ess) : cid_(cid), ess_(ess) {
|
||||||
if (strcmp(cid_->name(), "EXEC") == 0) {
|
string_view cmd_name(cid_->name());
|
||||||
|
if (cmd_name == "EXEC" || cmd_name == "EVAL" || cmd_name == "EVALSHA") {
|
||||||
multi_.reset(new Multi);
|
multi_.reset(new Multi);
|
||||||
|
multi_->multi_opts = cid->opt_mask();
|
||||||
|
|
||||||
|
if (cmd_name == "EVAL" || cmd_name == "EVALSHA") {
|
||||||
|
multi_->incremental = false; // we lock all the keys at once.
|
||||||
|
}
|
||||||
}
|
}
|
||||||
trans_options_ = cid_->opt_mask();
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Transaction::~Transaction() {
|
Transaction::~Transaction() {
|
||||||
|
@ -185,22 +154,19 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) {
|
||||||
|
|
||||||
KeyIndex key_index = DetermineKeys(cid_, args);
|
KeyIndex key_index = DetermineKeys(cid_, args);
|
||||||
|
|
||||||
if (key_index.start == args.size()) {
|
if (key_index.start == args.size()) { // eval with 0 keys.
|
||||||
CHECK(absl::StartsWith(cid_->name(), "EVAL"));
|
CHECK(absl::StartsWith(cid_->name(), "EVAL"));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
DCHECK_LT(key_index.start, args.size());
|
DCHECK_LT(key_index.start, args.size());
|
||||||
|
DCHECK_GT(key_index.start, 0u);
|
||||||
|
|
||||||
bool single_key = key_index.start > 0 && !cid_->is_multi_key();
|
bool incremental_locking = multi_ && multi_->incremental;
|
||||||
if (!multi_ && single_key) {
|
bool single_key = !multi_ && (key_index.start + key_index.step) >= key_index.end;
|
||||||
|
|
||||||
|
if (single_key) {
|
||||||
shard_data_.resize(1); // Single key optimization
|
shard_data_.resize(1); // Single key optimization
|
||||||
} else {
|
|
||||||
// Our shard_data is not sparse, so we must allocate for all threads :(
|
|
||||||
shard_data_.resize(ess_->size());
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!multi_ && single_key) { // Single key optimization.
|
|
||||||
auto key = ArgS(args, key_index.start);
|
auto key = ArgS(args, key_index.start);
|
||||||
args_.push_back(key);
|
args_.push_back(key);
|
||||||
|
|
||||||
|
@ -210,10 +176,12 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Our shard_data is not sparse, so we must allocate for all threads :(
|
||||||
|
shard_data_.resize(ess_->size());
|
||||||
CHECK(key_index.step == 1 || key_index.step == 2);
|
CHECK(key_index.step == 1 || key_index.step == 2);
|
||||||
DCHECK(key_index.step == 1 || (args.size() % 2) == 1);
|
DCHECK(key_index.step == 1 || (args.size() % 2) == 1);
|
||||||
|
|
||||||
// Reuse thread-local temporary storage. Since this code is non-preemptive we can use it here.
|
// Reuse thread-local temporary storage. Since this code is atomic we can use it here.
|
||||||
auto& shard_index = tmp_space.shard_cache;
|
auto& shard_index = tmp_space.shard_cache;
|
||||||
shard_index.resize(shard_data_.size());
|
shard_index.resize(shard_data_.size());
|
||||||
for (auto& v : shard_index) {
|
for (auto& v : shard_index) {
|
||||||
|
@ -223,10 +191,12 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) {
|
||||||
// TODO: to determine correctly locking mode for transactions, scripts
|
// TODO: to determine correctly locking mode for transactions, scripts
|
||||||
// and regular commands.
|
// and regular commands.
|
||||||
IntentLock::Mode mode = IntentLock::EXCLUSIVE;
|
IntentLock::Mode mode = IntentLock::EXCLUSIVE;
|
||||||
|
bool should_record_locks = false;
|
||||||
if (multi_) {
|
if (multi_) {
|
||||||
mode = Mode();
|
mode = Mode();
|
||||||
tmp_space.uniq_keys.clear();
|
tmp_space.uniq_keys.clear();
|
||||||
DCHECK_LT(int(mode), 2);
|
DCHECK_LT(int(mode), 2);
|
||||||
|
should_record_locks = incremental_locking || !multi_->locks_recorded;
|
||||||
}
|
}
|
||||||
|
|
||||||
for (unsigned i = key_index.start; i < key_index.end; ++i) {
|
for (unsigned i = key_index.start; i < key_index.end; ++i) {
|
||||||
|
@ -235,7 +205,7 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) {
|
||||||
shard_index[sid].args.push_back(key);
|
shard_index[sid].args.push_back(key);
|
||||||
shard_index[sid].original_index.push_back(i - 1);
|
shard_index[sid].original_index.push_back(i - 1);
|
||||||
|
|
||||||
if (multi_ && tmp_space.uniq_keys.insert(key).second) {
|
if (should_record_locks && tmp_space.uniq_keys.insert(key).second) {
|
||||||
multi_->locks[key].cnt[int(mode)]++;
|
multi_->locks[key].cnt[int(mode)]++;
|
||||||
};
|
};
|
||||||
|
|
||||||
|
@ -247,6 +217,10 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (multi_) {
|
||||||
|
multi_->locks_recorded = true;
|
||||||
|
}
|
||||||
|
|
||||||
args_.resize(key_index.end - key_index.start);
|
args_.resize(key_index.end - key_index.start);
|
||||||
reverse_index_.resize(args_.size());
|
reverse_index_.resize(args_.size());
|
||||||
|
|
||||||
|
@ -263,7 +237,14 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) {
|
||||||
|
|
||||||
sd.arg_count = si.args.size();
|
sd.arg_count = si.args.size();
|
||||||
sd.arg_start = next_arg - args_.begin();
|
sd.arg_start = next_arg - args_.begin();
|
||||||
sd.local_mask = 0;
|
|
||||||
|
// We reset the local_mask for incremental locking to allow locking of arguments
|
||||||
|
// for each operation within the same transaction. For instant locking we lock at
|
||||||
|
// the beginning all the keys so we must preserve the mask to avoid double locking.
|
||||||
|
if (incremental_locking) {
|
||||||
|
sd.local_mask = 0;
|
||||||
|
}
|
||||||
|
|
||||||
if (!sd.arg_count)
|
if (!sd.arg_count)
|
||||||
continue;
|
continue;
|
||||||
|
|
||||||
|
@ -297,7 +278,8 @@ void Transaction::InitByArgs(DbIndex index, CmdArgList args) {
|
||||||
|
|
||||||
// Validation.
|
// Validation.
|
||||||
for (const auto& sd : shard_data_) {
|
for (const auto& sd : shard_data_) {
|
||||||
DCHECK_EQ(sd.local_mask, 0u);
|
// sd.local_mask may be non-zero for multi transactions with instant locking.
|
||||||
|
// Specifically EVALs may maintain state between calls.
|
||||||
DCHECK_EQ(0, sd.local_mask & ARMED);
|
DCHECK_EQ(0, sd.local_mask & ARMED);
|
||||||
if (!multi_) {
|
if (!multi_) {
|
||||||
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
|
DCHECK_EQ(TxQueue::kEnd, sd.pq_pos);
|
||||||
|
@ -309,13 +291,17 @@ void Transaction::SetExecCmd(const CommandId* cid) {
|
||||||
DCHECK(multi_);
|
DCHECK(multi_);
|
||||||
DCHECK(!cb_);
|
DCHECK(!cb_);
|
||||||
|
|
||||||
|
// The order is important, we are Schedule() for multi transaction before overriding cid_.
|
||||||
|
// TODO: The flow is ugly. I should introduce a proper interface for Multi transactions
|
||||||
|
// like SetupMulti/TurndownMulti. We already have UnlockMulti that should be part of
|
||||||
|
// TurndownMulti.
|
||||||
|
|
||||||
if (txid_ == 0) {
|
if (txid_ == 0) {
|
||||||
Schedule();
|
Schedule();
|
||||||
}
|
}
|
||||||
|
|
||||||
unique_shard_cnt_ = 0;
|
unique_shard_cnt_ = 0;
|
||||||
cid_ = cid;
|
cid_ = cid;
|
||||||
trans_options_ = cid->opt_mask();
|
|
||||||
cb_ = nullptr;
|
cb_ = nullptr;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -344,6 +330,7 @@ bool Transaction::RunInShard(EngineShard* shard) {
|
||||||
DCHECK_EQ(sd.local_mask & (SUSPENDED_Q | EXPIRED_Q), 0);
|
DCHECK_EQ(sd.local_mask & (SUSPENDED_Q | EXPIRED_Q), 0);
|
||||||
|
|
||||||
bool awaked_prerun = (sd.local_mask & AWAKED_Q) != 0;
|
bool awaked_prerun = (sd.local_mask & AWAKED_Q) != 0;
|
||||||
|
bool incremental_lock = multi_ && multi_->incremental;
|
||||||
|
|
||||||
// For multi we unlock transaction (i.e. its keys) in UnlockMulti() call.
|
// For multi we unlock transaction (i.e. its keys) in UnlockMulti() call.
|
||||||
// Therefore we differentiate between concluding, which says that this specific
|
// Therefore we differentiate between concluding, which says that this specific
|
||||||
|
@ -354,8 +341,8 @@ bool Transaction::RunInShard(EngineShard* shard) {
|
||||||
IntentLock::Mode mode = Mode();
|
IntentLock::Mode mode = Mode();
|
||||||
|
|
||||||
// We make sure that we lock exactly once for each (multi-hop) transaction inside
|
// We make sure that we lock exactly once for each (multi-hop) transaction inside
|
||||||
// multi-transactions.
|
// transactions that lock incrementally.
|
||||||
if (multi_ && ((sd.local_mask & KEYLOCK_ACQUIRED) == 0)) {
|
if (incremental_lock && ((sd.local_mask & KEYLOCK_ACQUIRED) == 0)) {
|
||||||
DCHECK(!awaked_prerun); // we should not have blocking transaction inside multi block.
|
DCHECK(!awaked_prerun); // we should not have blocking transaction inside multi block.
|
||||||
|
|
||||||
sd.local_mask |= KEYLOCK_ACQUIRED;
|
sd.local_mask |= KEYLOCK_ACQUIRED;
|
||||||
|
@ -458,6 +445,11 @@ void Transaction::ScheduleInternal() {
|
||||||
|
|
||||||
uint32_t num_shards;
|
uint32_t num_shards;
|
||||||
std::function<bool(uint32_t)> is_active;
|
std::function<bool(uint32_t)> is_active;
|
||||||
|
|
||||||
|
// TODO: For multi-transactions we should be able to deduce mode() at run-time based
|
||||||
|
// on the context. For regular multi-transactions we can actually inspect all commands.
|
||||||
|
// For eval-like transactions - we can decided based on the command flavor (EVAL/EVALRO) or
|
||||||
|
// auto-tune based on the static analysis (by identifying commands with hardcoded command names).
|
||||||
IntentLock::Mode mode = Mode();
|
IntentLock::Mode mode = Mode();
|
||||||
|
|
||||||
if (span_all) {
|
if (span_all) {
|
||||||
|
@ -534,6 +526,10 @@ OpStatus Transaction::ScheduleSingleHop(RunnableType cb) {
|
||||||
// single hop -> concluding.
|
// single hop -> concluding.
|
||||||
coordinator_state_ |= (COORD_EXEC | COORD_EXEC_CONCLUDING);
|
coordinator_state_ |= (COORD_EXEC | COORD_EXEC_CONCLUDING);
|
||||||
|
|
||||||
|
if (!multi_) { // for non-multi transactions we schedule exactly once.
|
||||||
|
DCHECK_EQ(0, coordinator_state_ & COORD_SCHED);
|
||||||
|
}
|
||||||
|
|
||||||
bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !multi_;
|
bool schedule_fast = (unique_shard_cnt_ == 1) && !IsGlobal() && !multi_;
|
||||||
if (schedule_fast) { // Single shard (local) optimization.
|
if (schedule_fast) { // Single shard (local) optimization.
|
||||||
// We never resize shard_data because that would affect MULTI transaction correctness.
|
// We never resize shard_data because that would affect MULTI transaction correctness.
|
||||||
|
@ -598,7 +594,10 @@ void Transaction::UnlockMulti() {
|
||||||
auto cb = [&] {
|
auto cb = [&] {
|
||||||
EngineShard* shard = EngineShard::tlocal();
|
EngineShard* shard = EngineShard::tlocal();
|
||||||
|
|
||||||
shard->shard_lock()->Release(IntentLock::EXCLUSIVE);
|
if (multi_->multi_opts & CO::GLOBAL_TRANS) {
|
||||||
|
shard->shard_lock()->Release(IntentLock::EXCLUSIVE);
|
||||||
|
}
|
||||||
|
|
||||||
ShardId sid = shard->shard_id();
|
ShardId sid = shard->shard_id();
|
||||||
for (const auto& k_v : sharded_keys[sid]) {
|
for (const auto& k_v : sharded_keys[sid]) {
|
||||||
auto release = [&](IntentLock::Mode mode) {
|
auto release = [&](IntentLock::Mode mode) {
|
||||||
|
@ -606,6 +605,7 @@ void Transaction::UnlockMulti() {
|
||||||
shard->db_slice().Release(mode, this->db_index_, k_v.first, k_v.second.cnt[mode]);
|
shard->db_slice().Release(mode, this->db_index_, k_v.first, k_v.second.cnt[mode]);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
release(IntentLock::SHARED);
|
release(IntentLock::SHARED);
|
||||||
release(IntentLock::EXCLUSIVE);
|
release(IntentLock::EXCLUSIVE);
|
||||||
}
|
}
|
||||||
|
@ -1102,7 +1102,7 @@ inline uint32_t Transaction::DecreaseRunCnt() {
|
||||||
}
|
}
|
||||||
|
|
||||||
bool Transaction::IsGlobal() const {
|
bool Transaction::IsGlobal() const {
|
||||||
return (trans_options_ & CO::GLOBAL_TRANS) != 0;
|
return (cid_->opt_mask() & CO::GLOBAL_TRANS) != 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
// Runs only in the shard thread.
|
// Runs only in the shard thread.
|
||||||
|
|
|
@ -127,8 +127,7 @@ class Transaction {
|
||||||
return txid_;
|
return txid_;
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO: for multi trans_options_ changes with every operation.
|
// based on cid_->opt_mask.
|
||||||
// Does it mean we lock every key differently during the same transaction?
|
|
||||||
IntentLock::Mode Mode() const;
|
IntentLock::Mode Mode() const;
|
||||||
|
|
||||||
const char* Name() const;
|
const char* Name() const;
|
||||||
|
@ -266,6 +265,10 @@ class Transaction {
|
||||||
|
|
||||||
struct Multi {
|
struct Multi {
|
||||||
absl::flat_hash_map<std::string_view, LockCnt> locks;
|
absl::flat_hash_map<std::string_view, LockCnt> locks;
|
||||||
|
uint32_t multi_opts = 0; // options of the parent transaction.
|
||||||
|
|
||||||
|
bool incremental = true;
|
||||||
|
bool locks_recorded = false;
|
||||||
};
|
};
|
||||||
|
|
||||||
util::fibers_ext::EventCount blocking_ec_; // used to wake blocking transactions.
|
util::fibers_ext::EventCount blocking_ec_; // used to wake blocking transactions.
|
||||||
|
@ -298,8 +301,6 @@ class Transaction {
|
||||||
// unique_shard_cnt_ and unique_shard_id_ is accessed only by coordinator thread.
|
// unique_shard_cnt_ and unique_shard_id_ is accessed only by coordinator thread.
|
||||||
uint32_t unique_shard_cnt_{0}; // number of unique shards span by args_
|
uint32_t unique_shard_cnt_{0}; // number of unique shards span by args_
|
||||||
|
|
||||||
uint32_t trans_options_ = 0;
|
|
||||||
|
|
||||||
ShardId unique_shard_id_{kInvalidSid};
|
ShardId unique_shard_id_{kInvalidSid};
|
||||||
DbIndex db_index_ = 0;
|
DbIndex db_index_ = 0;
|
||||||
|
|
||||||
|
@ -309,6 +310,7 @@ class Transaction {
|
||||||
enum CoordinatorState : uint8_t {
|
enum CoordinatorState : uint8_t {
|
||||||
COORD_SCHED = 1,
|
COORD_SCHED = 1,
|
||||||
COORD_EXEC = 2,
|
COORD_EXEC = 2,
|
||||||
|
|
||||||
// We are running the last execution step in multi-hop operation.
|
// We are running the last execution step in multi-hop operation.
|
||||||
COORD_EXEC_CONCLUDING = 4,
|
COORD_EXEC_CONCLUDING = 4,
|
||||||
COORD_BLOCKED = 8,
|
COORD_BLOCKED = 8,
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue