mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-10 18:05:44 +02:00
758 lines
21 KiB
C++
758 lines
21 KiB
C++
// Copyright 2022, DragonflyDB authors. All rights reserved.
|
|
// See LICENSE for licensing terms.
|
|
//
|
|
|
|
#include "server/test_utils.h"
|
|
|
|
#include "server/acl/acl_commands_def.h"
|
|
#include "server/acl/acl_family.h"
|
|
#include "util/fibers/fibers.h"
|
|
|
|
extern "C" {
|
|
#include "redis/zmalloc.h"
|
|
}
|
|
|
|
#include <absl/flags/reflection.h>
|
|
#include <absl/strings/match.h>
|
|
#include <absl/strings/str_split.h>
|
|
#include <mimalloc.h>
|
|
|
|
#include "base/flags.h"
|
|
#include "base/logging.h"
|
|
#include "base/stl_util.h"
|
|
#include "facade/dragonfly_connection.h"
|
|
#include "io/file_util.h"
|
|
#include "server/acl/acl_log.h"
|
|
#include "util/fibers/pool.h"
|
|
|
|
using namespace std;
|
|
|
|
ABSL_DECLARE_FLAG(string, dbfilename);
|
|
ABSL_DECLARE_FLAG(double, rss_oom_deny_ratio);
|
|
ABSL_DECLARE_FLAG(uint32_t, num_shards);
|
|
ABSL_FLAG(bool, force_epoll, false, "If true, uses epoll api instead iouring to run tests");
|
|
ABSL_DECLARE_FLAG(uint32_t, acllog_max_len);
|
|
namespace dfly {
|
|
|
|
namespace {
|
|
|
|
// Default stack size for fibers. We decrease it by 16 bytes because some allocators
|
|
// need additional 8-16 bytes for their internal structures, thus over reserving additional
|
|
// memory pages if using round sizes.
|
|
#ifdef NDEBUG
|
|
constexpr size_t kFiberDefaultStackSize = 32_KB - 16;
|
|
#elif defined SANITIZERS
|
|
// Increase stack size for sanitizers builds.
|
|
constexpr size_t kFiberDefaultStackSize = 64_KB - 16;
|
|
#else
|
|
// Increase stack size for debug builds.
|
|
constexpr size_t kFiberDefaultStackSize = 50_KB - 16;
|
|
#endif
|
|
|
|
} // namespace
|
|
|
|
std::ostream& operator<<(std::ostream& os, const DbStats& stats) {
|
|
os << "keycount: " << stats.key_count << ", tiered_size: " << stats.tiered_used_bytes
|
|
<< ", tiered_entries: " << stats.tiered_entries << "\n";
|
|
|
|
return os;
|
|
}
|
|
|
|
extern unsigned kInitSegmentLog;
|
|
|
|
using MP = MemcacheParser;
|
|
using namespace util;
|
|
using namespace testing;
|
|
|
|
static vector<string> SplitLines(const std::string& src) {
|
|
vector<string> res = absl::StrSplit(src, "\r\n");
|
|
if (res.back().empty())
|
|
res.pop_back();
|
|
for (auto& v : res) {
|
|
absl::StripAsciiWhitespace(&v);
|
|
}
|
|
return res;
|
|
}
|
|
|
|
TestConnection::TestConnection(Protocol protocol)
|
|
: facade::Connection(protocol, nullptr, nullptr, nullptr) {
|
|
cc_.reset(new dfly::ConnectionContext(this, {}));
|
|
cc_->skip_acl_validation = true;
|
|
SetSocket(ProactorBase::me()->CreateSocket());
|
|
OnConnectionStart();
|
|
}
|
|
|
|
void TestConnection::SendPubMessageAsync(PubMessage pmsg) {
|
|
messages.push_back(std::move(pmsg));
|
|
}
|
|
|
|
void TestConnection::SendInvalidationMessageAsync(InvalidationMessage msg) {
|
|
invalidate_messages.push_back(std::move(msg));
|
|
}
|
|
|
|
std::string TestConnection::RemoteEndpointStr() const {
|
|
return "";
|
|
}
|
|
|
|
void TransactionSuspension::Start() {
|
|
static CommandId cid{"TEST", CO::WRITE | CO::GLOBAL_TRANS, -1, 0, 0, acl::NONE};
|
|
|
|
transaction_ = new dfly::Transaction{&cid};
|
|
|
|
auto st = transaction_->InitByArgs(&namespaces->GetDefaultNamespace(), 0, {});
|
|
CHECK_EQ(st, OpStatus::OK);
|
|
|
|
transaction_->Execute([](Transaction* t, EngineShard* shard) { return OpStatus::OK; }, false);
|
|
}
|
|
|
|
void TransactionSuspension::Terminate() {
|
|
transaction_->Conclude();
|
|
transaction_ = nullptr;
|
|
}
|
|
|
|
class BaseFamilyTest::TestConnWrapper {
|
|
public:
|
|
TestConnWrapper(Protocol proto);
|
|
~TestConnWrapper();
|
|
|
|
CmdArgVec Args(ArgSlice list);
|
|
|
|
RespVec ParseResponse(bool fully_consumed);
|
|
|
|
// returns: type(pmessage), pattern, channel, message.
|
|
const facade::Connection::PubMessage& GetPubMessage(size_t index) const;
|
|
|
|
const facade::Connection::InvalidationMessage& GetInvalidationMessage(size_t index) const;
|
|
|
|
ConnectionContext* cmd_cntx() {
|
|
auto cntx = static_cast<ConnectionContext*>(dummy_conn_->cntx());
|
|
cntx->ns = &namespaces->GetDefaultNamespace();
|
|
return cntx;
|
|
}
|
|
|
|
StringVec SplitLines() const {
|
|
return dfly::SplitLines(sink_.str());
|
|
}
|
|
|
|
void ClearSink() {
|
|
sink_.Clear();
|
|
}
|
|
|
|
TestConnection* conn() {
|
|
return dummy_conn_.get();
|
|
}
|
|
|
|
SinkReplyBuilder* builder() {
|
|
return builder_.get();
|
|
}
|
|
|
|
private:
|
|
::io::StringSink sink_; // holds the response blob
|
|
|
|
std::unique_ptr<TestConnection> dummy_conn_;
|
|
|
|
std::vector<std::unique_ptr<std::string>> tmp_str_vec_;
|
|
|
|
std::unique_ptr<RedisParser> parser_;
|
|
std::unique_ptr<SinkReplyBuilder> builder_;
|
|
};
|
|
|
|
BaseFamilyTest::TestConnWrapper::TestConnWrapper(Protocol proto)
|
|
: dummy_conn_(new TestConnection(proto)) {
|
|
switch (proto) {
|
|
case Protocol::REDIS:
|
|
builder_.reset(new RedisReplyBuilder{&sink_});
|
|
break;
|
|
case Protocol::MEMCACHE:
|
|
builder_.reset(new MCReplyBuilder{&sink_});
|
|
break;
|
|
}
|
|
}
|
|
|
|
BaseFamilyTest::TestConnWrapper::~TestConnWrapper() {
|
|
}
|
|
|
|
BaseFamilyTest::BaseFamilyTest() {
|
|
}
|
|
|
|
BaseFamilyTest::~BaseFamilyTest() {
|
|
for (auto* v : resp_vec_)
|
|
delete v;
|
|
}
|
|
|
|
void BaseFamilyTest::SetUpTestSuite() {
|
|
kInitSegmentLog = 1;
|
|
|
|
absl::SetFlag(&FLAGS_rss_oom_deny_ratio, -1);
|
|
absl::SetFlag(&FLAGS_dbfilename, "");
|
|
|
|
static bool init = true;
|
|
if (exchange(init, false)) {
|
|
fb2::SetDefaultStackResource(&fb2::std_malloc_resource, kFiberDefaultStackSize);
|
|
}
|
|
|
|
init_zmalloc_threadlocal(mi_heap_get_backing());
|
|
|
|
// TODO: go over all env variables starting with FLAGS_ and make sure they are in the below list.
|
|
static constexpr const char* kEnvFlags[] = {
|
|
"cluster_mode",
|
|
"lock_on_hashtags",
|
|
"force_epoll",
|
|
};
|
|
for (string_view flag : kEnvFlags) {
|
|
const char* value = getenv(absl::StrCat("FLAGS_", flag).data());
|
|
if (value != nullptr) {
|
|
SetTestFlag(flag, value);
|
|
}
|
|
}
|
|
}
|
|
|
|
void BaseFamilyTest::SetUp() {
|
|
max_memory_limit = INT_MAX;
|
|
ResetService();
|
|
}
|
|
|
|
void BaseFamilyTest::TearDown() {
|
|
CHECK_EQ(NumLocked(), 0U);
|
|
|
|
ShutdownService();
|
|
|
|
const TestInfo* const test_info = UnitTest::GetInstance()->current_test_info();
|
|
LOG(INFO) << "Finishing " << test_info->name();
|
|
}
|
|
|
|
void BaseFamilyTest::ResetService() {
|
|
if (service_ != nullptr) {
|
|
TEST_InvalidateLockTagOptions();
|
|
|
|
ShutdownService();
|
|
}
|
|
|
|
#ifdef __linux__
|
|
if (absl::GetFlag(FLAGS_force_epoll)) {
|
|
pp_.reset(fb2::Pool::Epoll(num_threads_));
|
|
} else {
|
|
pp_.reset(fb2::Pool::IOUring(16, num_threads_));
|
|
}
|
|
#else
|
|
pp_.reset(fb2::Pool::Epoll(num_threads_));
|
|
#endif
|
|
|
|
// Using a different default than production could expose bugs
|
|
if (absl::GetFlag(FLAGS_num_shards) == 0) {
|
|
absl::SetFlag(&FLAGS_num_shards, num_threads_ - 1);
|
|
}
|
|
pp_->Run();
|
|
service_ = std::make_unique<Service>(pp_.get());
|
|
|
|
service_->Init(nullptr, {});
|
|
used_mem_current = 0;
|
|
|
|
TEST_current_time_ms = absl::GetCurrentTimeNanos() / 1000000;
|
|
auto default_ns = &namespaces->GetDefaultNamespace();
|
|
auto cb = [&](EngineShard* s) {
|
|
default_ns->GetDbSlice(s->shard_id()).UpdateExpireBase(TEST_current_time_ms - 1000, 0);
|
|
};
|
|
shard_set->RunBriefInParallel(cb);
|
|
|
|
const TestInfo* const test_info = UnitTest::GetInstance()->current_test_info();
|
|
LOG(INFO) << "Starting " << test_info->name();
|
|
|
|
watchdog_fiber_ = pp_->GetNextProactor()->LaunchFiber([this] {
|
|
ThisFiber::SetName("Watchdog");
|
|
|
|
if (!watchdog_done_.WaitFor(20s)) {
|
|
LOG(ERROR) << "Deadlock detected!!!!";
|
|
absl::SetFlag(&FLAGS_alsologtostderr, true);
|
|
fb2::Mutex m;
|
|
shard_set->pool()->AwaitFiberOnAll([&m, this](unsigned index, ProactorBase* base) {
|
|
ThisFiber::SetName("Watchdog");
|
|
std::unique_lock lk(m);
|
|
LOG(ERROR) << "Proactor " << index << ":\n";
|
|
fb2::detail::FiberInterface::PrintAllFiberStackTraces();
|
|
EngineShard* es = EngineShard::tlocal();
|
|
|
|
if (es != nullptr) {
|
|
TxQueue* txq = es->txq();
|
|
if (!txq->Empty()) {
|
|
LOG(ERROR) << "TxQueue for shard " << es->shard_id();
|
|
|
|
auto head = txq->Head();
|
|
auto it = head;
|
|
do {
|
|
Transaction* trans = std::get<Transaction*>(es->txq()->At(it));
|
|
LOG(ERROR) << "Transaction " << trans->DebugId(es->shard_id());
|
|
it = txq->Next(it);
|
|
} while (it != head);
|
|
}
|
|
|
|
LOG(ERROR) << "TxLocks for shard " << es->shard_id();
|
|
for (const auto& k_v : namespaces->GetDefaultNamespace()
|
|
.GetDbSlice(es->shard_id())
|
|
.GetDBTable(0)
|
|
->trans_locks) {
|
|
LOG(ERROR) << "Key " << k_v.first << " " << k_v.second;
|
|
}
|
|
|
|
LOG(ERROR) << "Transaction for shard " << es->shard_id();
|
|
for (auto& conn : connections_) {
|
|
auto* context = conn.second->cmd_cntx();
|
|
if (context->transaction && context->transaction->IsActive(es->shard_id())) {
|
|
LOG(ERROR) << context->transaction->DebugId(es->shard_id());
|
|
}
|
|
}
|
|
}
|
|
});
|
|
}
|
|
});
|
|
}
|
|
|
|
void BaseFamilyTest::ShutdownService() {
|
|
if (service_ == nullptr) {
|
|
return;
|
|
}
|
|
|
|
// Don't save files during shutdown
|
|
CleanupSnapshots();
|
|
absl::SetFlag(&FLAGS_dbfilename, "");
|
|
|
|
service_->Shutdown();
|
|
service_.reset();
|
|
|
|
delete shard_set;
|
|
shard_set = nullptr;
|
|
|
|
watchdog_done_.Notify();
|
|
watchdog_fiber_.Join();
|
|
|
|
pp_->Stop();
|
|
}
|
|
|
|
void BaseFamilyTest::InitWithDbFilename() {
|
|
ShutdownService();
|
|
|
|
absl::SetFlag(&FLAGS_dbfilename, "rdbtestdump");
|
|
CleanupSnapshots();
|
|
ResetService();
|
|
}
|
|
|
|
void BaseFamilyTest::CleanupSnapshots() {
|
|
string dbfilename = absl::GetFlag(FLAGS_dbfilename);
|
|
if (dbfilename.empty())
|
|
return;
|
|
|
|
auto rdb_files = io::StatFiles(absl::StrCat(dbfilename, "*"));
|
|
CHECK(rdb_files);
|
|
for (const auto& fl : *rdb_files) {
|
|
unlink(fl.name.c_str());
|
|
}
|
|
}
|
|
|
|
unsigned BaseFamilyTest::NumLocked() {
|
|
atomic_uint count = 0;
|
|
auto default_ns = &namespaces->GetDefaultNamespace();
|
|
shard_set->RunBriefInParallel([&](EngineShard* shard) {
|
|
for (const auto& db : default_ns->GetDbSlice(shard->shard_id()).databases()) {
|
|
if (db == nullptr) {
|
|
continue;
|
|
}
|
|
count += db->trans_locks.Size();
|
|
}
|
|
});
|
|
return count;
|
|
}
|
|
|
|
void BaseFamilyTest::ClearMetrics() {
|
|
shard_set->pool()->AwaitBrief([](unsigned, auto*) {
|
|
ServerState::tlocal()->stats = ServerState::Stats(shard_set->size());
|
|
});
|
|
}
|
|
|
|
void BaseFamilyTest::WaitUntilLocked(DbIndex db_index, string_view key, double timeout) {
|
|
auto step = 50us;
|
|
auto timeout_micro = chrono::duration_cast<chrono::microseconds>(1000ms * timeout);
|
|
int64_t steps = timeout_micro.count() / step.count();
|
|
do {
|
|
ThisFiber::SleepFor(step);
|
|
} while (!IsLocked(db_index, key) && --steps > 0);
|
|
CHECK(IsLocked(db_index, key));
|
|
}
|
|
|
|
bool BaseFamilyTest::WaitUntilCondition(std::function<bool()> condition_cb,
|
|
std::chrono::milliseconds timeout_ms) {
|
|
auto step = 50us;
|
|
auto timeout_micro = chrono::duration_cast<chrono::microseconds>(timeout_ms);
|
|
int64_t steps = timeout_micro.count() / step.count();
|
|
do {
|
|
ThisFiber::SleepFor(step);
|
|
} while (!condition_cb() && --steps > 0);
|
|
return condition_cb();
|
|
}
|
|
|
|
RespExpr BaseFamilyTest::Run(ArgSlice list) {
|
|
if (!ProactorBase::IsProactorThread()) {
|
|
return pp_->at(0)->Await([&] {
|
|
ThisFiber::SetName("Test::Run");
|
|
return this->Run(list);
|
|
});
|
|
}
|
|
|
|
return Run(GetId(), list);
|
|
}
|
|
|
|
RespExpr BaseFamilyTest::RunPrivileged(std::initializer_list<const std::string_view> list) {
|
|
if (!ProactorBase::IsProactorThread()) {
|
|
return pp_->at(0)->Await([&] { return this->RunPrivileged(list); });
|
|
}
|
|
string id = GetId();
|
|
TestConnWrapper* conn_wrapper = AddFindConn(Protocol::REDIS, id);
|
|
// Before running the command set the connection as admin connection
|
|
conn_wrapper->conn()->SetPrivileged(true);
|
|
auto res = Run(id, ArgSlice{list.begin(), list.size()});
|
|
// After running the command set the connection as non admin connection
|
|
// because the connction is returned to the poll. This way the next call to Run from the same
|
|
// thread will not have the connection set as admin.
|
|
conn_wrapper->conn()->SetPrivileged(false);
|
|
return res;
|
|
}
|
|
|
|
RespExpr BaseFamilyTest::Run(absl::Span<std::string> span) {
|
|
vector<string_view> sv_vec(span.size());
|
|
for (unsigned i = 0; i < span.size(); ++i) {
|
|
sv_vec[i] = span[i];
|
|
}
|
|
return Run(sv_vec);
|
|
}
|
|
|
|
RespExpr BaseFamilyTest::Run(std::string_view id, ArgSlice slice) {
|
|
if (!ProactorBase::IsProactorThread()) {
|
|
return pp_->at(0)->Await([&] { return this->Run(id, slice); });
|
|
}
|
|
|
|
TestConnWrapper* conn_wrapper = AddFindConn(Protocol::REDIS, id);
|
|
|
|
CmdArgVec args = conn_wrapper->Args(slice);
|
|
|
|
auto* context = conn_wrapper->cmd_cntx();
|
|
context->ns = &namespaces->GetDefaultNamespace();
|
|
|
|
DCHECK(context->transaction == nullptr) << id;
|
|
|
|
service_->DispatchCommand(CmdArgList{args}, conn_wrapper->builder(), context);
|
|
|
|
DCHECK(context->transaction == nullptr);
|
|
|
|
auto cmd = absl::AsciiStrToUpper(slice.front());
|
|
if (cmd == "EVAL" || cmd == "EVALSHA" || cmd == "EVAL_RO" || cmd == "EVALSHA_RO" ||
|
|
cmd == "EXEC") {
|
|
shard_set->AwaitRunningOnShardQueue([](auto*) {}); // Wait for async UnlockMulti.
|
|
}
|
|
|
|
unique_lock lk(mu_);
|
|
last_cmd_dbg_info_ = context->last_command_debug;
|
|
|
|
RespVec vec = conn_wrapper->ParseResponse(single_response_);
|
|
if (vec.size() == 1)
|
|
return vec.front();
|
|
RespVec* new_vec = new RespVec(vec);
|
|
resp_vec_.push_back(new_vec);
|
|
RespExpr e;
|
|
e.type = RespExpr::ARRAY;
|
|
e.u = new_vec;
|
|
|
|
return e;
|
|
}
|
|
|
|
auto BaseFamilyTest::RunMC(MP::CmdType cmd_type, string_view key, string_view value, uint32_t flags,
|
|
chrono::seconds ttl) -> MCResponse {
|
|
if (!ProactorBase::IsProactorThread()) {
|
|
return pp_->at(0)->Await([&] { return this->RunMC(cmd_type, key, value, flags, ttl); });
|
|
}
|
|
|
|
MP::Command cmd;
|
|
cmd.type = cmd_type;
|
|
cmd.key = key;
|
|
cmd.flags = flags;
|
|
cmd.bytes_len = value.size();
|
|
cmd.expire_ts = ttl.count();
|
|
|
|
TestConnWrapper* conn = AddFindConn(Protocol::MEMCACHE, GetId());
|
|
|
|
auto* context = conn->cmd_cntx();
|
|
|
|
DCHECK(context->transaction == nullptr);
|
|
|
|
service_->DispatchMC(cmd, value, static_cast<MCReplyBuilder*>(conn->builder()), context);
|
|
|
|
DCHECK(context->transaction == nullptr);
|
|
|
|
return conn->SplitLines();
|
|
}
|
|
|
|
auto BaseFamilyTest::RunMC(MP::CmdType cmd_type, std::string_view key) -> MCResponse {
|
|
if (!ProactorBase::IsProactorThread()) {
|
|
return pp_->at(0)->Await([&] { return this->RunMC(cmd_type, key); });
|
|
}
|
|
|
|
return RunMC(cmd_type, key, string_view{}, 0, chrono::seconds{});
|
|
}
|
|
|
|
auto BaseFamilyTest::GetMC(MP::CmdType cmd_type, std::initializer_list<std::string_view> list)
|
|
-> MCResponse {
|
|
CHECK_GT(list.size(), 0u);
|
|
CHECK(base::_in(cmd_type, {MP::GET, MP::GAT, MP::GETS, MP::GATS}));
|
|
|
|
if (!ProactorBase::IsProactorThread()) {
|
|
return pp_->at(0)->Await([&] { return this->GetMC(cmd_type, list); });
|
|
}
|
|
|
|
MP::Command cmd;
|
|
cmd.type = cmd_type;
|
|
auto src = list.begin();
|
|
cmd.key = *src++;
|
|
for (; src != list.end(); ++src) {
|
|
cmd.keys_ext.push_back(*src);
|
|
}
|
|
|
|
TestConnWrapper* conn = AddFindConn(Protocol::MEMCACHE, GetId());
|
|
|
|
auto* context = conn->cmd_cntx();
|
|
service_->DispatchMC(cmd, string_view{}, static_cast<MCReplyBuilder*>(conn->builder()), context);
|
|
|
|
return conn->SplitLines();
|
|
}
|
|
|
|
int64_t BaseFamilyTest::CheckedInt(ArgSlice list) {
|
|
RespExpr resp = Run(list);
|
|
if (resp.type == RespExpr::INT64) {
|
|
return get<int64_t>(resp.u);
|
|
}
|
|
if (resp.type == RespExpr::NIL) {
|
|
return INT64_MIN;
|
|
}
|
|
|
|
CHECK_EQ(RespExpr::STRING, int(resp.type)) << list;
|
|
string_view sv = ToSV(resp.GetBuf());
|
|
int64_t res;
|
|
CHECK(absl::SimpleAtoi(sv, &res)) << "|" << sv << "|";
|
|
return res;
|
|
}
|
|
|
|
string BaseFamilyTest::CheckedString(ArgSlice list) {
|
|
RespExpr resp = Run(list);
|
|
CHECK_EQ(RespExpr::STRING, int(resp.type)) << list;
|
|
return string{ToSV(resp.GetBuf())};
|
|
}
|
|
|
|
CmdArgVec BaseFamilyTest::TestConnWrapper::Args(ArgSlice list) {
|
|
CHECK_NE(0u, list.size());
|
|
|
|
CmdArgVec res;
|
|
string* str = new string;
|
|
|
|
// I compact all the arguments together on purpose.
|
|
// This way I check that arguments handling works well without c-string endings.
|
|
for (auto v : list) {
|
|
str->append(v);
|
|
}
|
|
tmp_str_vec_.emplace_back(str);
|
|
size_t offset = 0;
|
|
for (auto v : list) {
|
|
if (v.empty()) {
|
|
res.push_back(MutableSlice{});
|
|
} else {
|
|
res.emplace_back(str->data() + offset, v.size());
|
|
offset += v.size();
|
|
}
|
|
}
|
|
|
|
return res;
|
|
}
|
|
|
|
RespVec BaseFamilyTest::TestConnWrapper::ParseResponse(bool fully_consumed) {
|
|
tmp_str_vec_.emplace_back(new string{sink_.str()});
|
|
auto& s = *tmp_str_vec_.back();
|
|
auto buf = RespExpr::buffer(&s);
|
|
|
|
auto s_copy = s;
|
|
|
|
uint32_t consumed = 0;
|
|
parser_.reset(new RedisParser{UINT32_MAX, false}); // Client mode.
|
|
RespVec res;
|
|
RedisParser::Result st = parser_->Parse(buf, &consumed, &res);
|
|
|
|
CHECK_EQ(RedisParser::OK, st) << " response: \"" << s_copy << "\" (" << s_copy.size()
|
|
<< " chars)";
|
|
if (fully_consumed) {
|
|
DCHECK_EQ(consumed, s.size()) << s;
|
|
}
|
|
return res;
|
|
}
|
|
|
|
const facade::Connection::PubMessage& BaseFamilyTest::TestConnWrapper::GetPubMessage(
|
|
size_t index) const {
|
|
CHECK_LT(index, dummy_conn_->messages.size());
|
|
return dummy_conn_->messages[index];
|
|
}
|
|
|
|
const facade::Connection::InvalidationMessage&
|
|
BaseFamilyTest::TestConnWrapper::GetInvalidationMessage(size_t index) const {
|
|
CHECK_LT(index, dummy_conn_->invalidate_messages.size());
|
|
return dummy_conn_->invalidate_messages[index];
|
|
}
|
|
|
|
bool BaseFamilyTest::IsLocked(DbIndex db_index, std::string_view key) const {
|
|
return service_->IsLocked(&namespaces->GetDefaultNamespace(), db_index, key);
|
|
}
|
|
|
|
string BaseFamilyTest::GetId() const {
|
|
int32 id = ProactorBase::me()->GetPoolIndex();
|
|
CHECK_GE(id, 0);
|
|
return absl::StrCat("IO", id);
|
|
}
|
|
|
|
size_t BaseFamilyTest::SubscriberMessagesLen(string_view conn_id) const {
|
|
auto it = connections_.find(conn_id);
|
|
if (it == connections_.end())
|
|
return 0;
|
|
|
|
return it->second->conn()->messages.size();
|
|
}
|
|
|
|
size_t BaseFamilyTest::InvalidationMessagesLen(string_view conn_id) const {
|
|
auto it = connections_.find(conn_id);
|
|
if (it == connections_.end())
|
|
return 0;
|
|
|
|
return it->second->conn()->invalidate_messages.size();
|
|
}
|
|
|
|
const facade::Connection::PubMessage& BaseFamilyTest::GetPublishedMessage(string_view conn_id,
|
|
size_t index) const {
|
|
auto it = connections_.find(conn_id);
|
|
CHECK(it != connections_.end());
|
|
|
|
return it->second->GetPubMessage(index);
|
|
}
|
|
|
|
const facade::Connection::InvalidationMessage& BaseFamilyTest::GetInvalidationMessage(
|
|
string_view conn_id, size_t index) const {
|
|
auto it = connections_.find(conn_id);
|
|
CHECK(it != connections_.end());
|
|
return it->second->GetInvalidationMessage(index);
|
|
}
|
|
|
|
ConnectionContext::DebugInfo BaseFamilyTest::GetDebugInfo(const std::string& id) const {
|
|
auto it = connections_.find(id);
|
|
CHECK(it != connections_.end());
|
|
|
|
return it->second->cmd_cntx()->last_command_debug;
|
|
}
|
|
|
|
auto BaseFamilyTest::AddFindConn(Protocol proto, std::string_view id) -> TestConnWrapper* {
|
|
DCHECK(ProactorBase::IsProactorThread());
|
|
|
|
unique_lock lk(mu_);
|
|
|
|
auto [it, inserted] = connections_.emplace(id, nullptr);
|
|
|
|
if (inserted) {
|
|
it->second.reset(new TestConnWrapper(proto));
|
|
} else {
|
|
it->second->ClearSink();
|
|
}
|
|
return it->second.get();
|
|
}
|
|
|
|
vector<string> BaseFamilyTest::StrArray(const RespExpr& expr) {
|
|
CHECK(expr.type == RespExpr::ARRAY || expr.type == RespExpr::NIL_ARRAY);
|
|
if (expr.type == RespExpr::NIL_ARRAY)
|
|
return vector<string>{};
|
|
|
|
const RespVec* src = get<RespVec*>(expr.u);
|
|
vector<string> res(src->size());
|
|
for (size_t i = 0; i < src->size(); ++i) {
|
|
res[i] = ToSV(src->at(i).GetBuf());
|
|
}
|
|
|
|
return res;
|
|
}
|
|
|
|
vector<LockFp> BaseFamilyTest::GetLastFps() {
|
|
fb2::Mutex mu;
|
|
vector<LockFp> result;
|
|
|
|
auto add_keys = [&](ProactorBase* proactor) {
|
|
EngineShard* shard = EngineShard::tlocal();
|
|
if (shard == nullptr) {
|
|
return;
|
|
}
|
|
|
|
lock_guard lk(mu);
|
|
for (auto fp :
|
|
namespaces->GetDefaultNamespace().GetDbSlice(shard->shard_id()).TEST_GetLastLockedFps()) {
|
|
result.push_back(fp);
|
|
}
|
|
};
|
|
shard_set->pool()->AwaitFiberOnAll(add_keys);
|
|
|
|
return result;
|
|
}
|
|
|
|
void BaseFamilyTest::ExpectConditionWithinTimeout(const std::function<bool()>& condition,
|
|
absl::Duration timeout) {
|
|
absl::Time deadline = absl::Now() + timeout;
|
|
|
|
while (deadline > absl::Now()) {
|
|
if (condition()) {
|
|
break;
|
|
}
|
|
ThisFiber::SleepFor(5ms);
|
|
}
|
|
|
|
EXPECT_LE(absl::Now(), deadline)
|
|
<< "Timeout of " << timeout << " reached when expecting condition";
|
|
}
|
|
|
|
fb2::Fiber BaseFamilyTest::ExpectConditionWithSuspension(const std::function<bool()>& condition) {
|
|
TransactionSuspension tx;
|
|
pp_->at(0)->Await([&] { tx.Start(); });
|
|
|
|
auto fb =
|
|
pp_->at(0)->LaunchFiber(fb2::Launch::dispatch, [condition, tx = std::move(tx)]() mutable {
|
|
ExpectConditionWithinTimeout(condition);
|
|
tx.Terminate();
|
|
});
|
|
return fb;
|
|
}
|
|
|
|
util::fb2::Fiber BaseFamilyTest::ExpectUsedKeys(const std::vector<std::string_view>& keys) {
|
|
vector<LockFp> key_fps;
|
|
for (const auto& k : keys) {
|
|
key_fps.push_back(LockTag(k).Fingerprint());
|
|
}
|
|
sort(key_fps.begin(), key_fps.end());
|
|
auto cb = [=] {
|
|
auto last_fps = GetLastFps();
|
|
sort(last_fps.begin(), last_fps.end());
|
|
return last_fps == key_fps;
|
|
};
|
|
|
|
return ExpectConditionWithSuspension(std::move(cb));
|
|
}
|
|
|
|
void BaseFamilyTest::SetTestFlag(string_view flag_name, string_view new_value) {
|
|
auto* flag = absl::FindCommandLineFlag(flag_name);
|
|
CHECK_NE(flag, nullptr);
|
|
VLOG(1) << "Changing flag " << flag_name << " from " << flag->CurrentValue() << " to "
|
|
<< new_value;
|
|
string error;
|
|
CHECK(flag->ParseFrom(new_value, &error)) << "Error: " << error;
|
|
}
|
|
|
|
const acl::AclFamily* BaseFamilyTest::TestInitAclFam() {
|
|
absl::SetFlag(&FLAGS_acllog_max_len, 0);
|
|
return service_->TestInit();
|
|
}
|
|
|
|
} // namespace dfly
|