mirror of
https://github.com/dragonflydb/dragonfly.git
synced 2025-05-11 10:25:47 +02:00
fix: regression test failures (#2226)
Signed-off-by: Roman Gershman <roman@dragonflydb.io>
This commit is contained in:
parent
b853b2ab00
commit
0c5bb7b894
10 changed files with 18 additions and 18 deletions
2
helio
2
helio
|
@ -1 +1 @@
|
||||||
Subproject commit b786e8270fe2a02cdc259d3ea14f93cff0b8b72b
|
Subproject commit 1695b324146b1b1341fa46fa3e35c694cd04e14c
|
|
@ -287,7 +287,7 @@ void Listener::PostShutdown() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void Listener::OnConnectionStart(util::Connection* conn) {
|
void Listener::OnConnectionStart(util::Connection* conn) {
|
||||||
unsigned id = conn->socket()->proactor()->GetIndex();
|
unsigned id = conn->socket()->proactor()->GetPoolIndex();
|
||||||
DCHECK_LT(id, per_thread_.size());
|
DCHECK_LT(id, per_thread_.size());
|
||||||
|
|
||||||
facade::Connection* facade_conn = static_cast<facade::Connection*>(conn);
|
facade::Connection* facade_conn = static_cast<facade::Connection*>(conn);
|
||||||
|
@ -313,7 +313,7 @@ void Listener::OnConnectionStart(util::Connection* conn) {
|
||||||
|
|
||||||
void Listener::OnConnectionClose(util::Connection* conn) {
|
void Listener::OnConnectionClose(util::Connection* conn) {
|
||||||
// TODO: We do not account for connections migrated to other threads. This is a rare case.
|
// TODO: We do not account for connections migrated to other threads. This is a rare case.
|
||||||
unsigned id = conn->socket()->proactor()->GetIndex();
|
unsigned id = conn->socket()->proactor()->GetPoolIndex();
|
||||||
DCHECK_LT(id, per_thread_.size());
|
DCHECK_LT(id, per_thread_.size());
|
||||||
auto& pth = per_thread_[id];
|
auto& pth = per_thread_[id];
|
||||||
|
|
||||||
|
|
|
@ -141,7 +141,7 @@ vector<unsigned> ChangeSubscriptions(bool pattern, CmdArgList args, bool to_add,
|
||||||
auto& sinfo = *conn->conn_state.subscribe_info.get();
|
auto& sinfo = *conn->conn_state.subscribe_info.get();
|
||||||
auto& local_store = pattern ? sinfo.patterns : sinfo.channels;
|
auto& local_store = pattern ? sinfo.patterns : sinfo.channels;
|
||||||
|
|
||||||
int32_t tid = util::ProactorBase::GetIndex();
|
int32_t tid = util::ProactorBase::me()->GetPoolIndex();
|
||||||
DCHECK_GE(tid, 0);
|
DCHECK_GE(tid, 0);
|
||||||
|
|
||||||
ChannelStoreUpdater csu{pattern, to_add, conn, uint32_t(tid)};
|
ChannelStoreUpdater csu{pattern, to_add, conn, uint32_t(tid)};
|
||||||
|
|
|
@ -217,7 +217,7 @@ void DflyCmd::Thread(CmdArgList args, ConnectionContext* cntx) {
|
||||||
|
|
||||||
if (args.size() == 1) { // DFLY THREAD : returns connection thread index and number of threads.
|
if (args.size() == 1) { // DFLY THREAD : returns connection thread index and number of threads.
|
||||||
rb->StartArray(2);
|
rb->StartArray(2);
|
||||||
rb->SendLong(ProactorBase::GetIndex());
|
rb->SendLong(ProactorBase::me()->GetPoolIndex());
|
||||||
rb->SendLong(long(pool->size()));
|
rb->SendLong(long(pool->size()));
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
@ -230,7 +230,7 @@ void DflyCmd::Thread(CmdArgList args, ConnectionContext* cntx) {
|
||||||
}
|
}
|
||||||
|
|
||||||
if (num_thread < pool->size()) {
|
if (num_thread < pool->size()) {
|
||||||
if (int(num_thread) != ProactorBase::GetIndex()) {
|
if (int(num_thread) != ProactorBase::me()->GetPoolIndex()) {
|
||||||
cntx->conn()->Migrate(pool->at(num_thread));
|
cntx->conn()->Migrate(pool->at(num_thread));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -317,8 +317,8 @@ EngineShard::EngineShard(util::ProactorBase* pb, mi_heap_t* heap)
|
||||||
: queue_(kQueueLen),
|
: queue_(kQueueLen),
|
||||||
txq_([](const Transaction* t) { return t->txid(); }),
|
txq_([](const Transaction* t) { return t->txid(); }),
|
||||||
mi_resource_(heap),
|
mi_resource_(heap),
|
||||||
db_slice_(pb->GetIndex(), GetFlag(FLAGS_cache_mode), this) {
|
db_slice_(pb->GetPoolIndex(), GetFlag(FLAGS_cache_mode), this) {
|
||||||
fiber_q_ = MakeFiber([this, index = pb->GetIndex()] {
|
fiber_q_ = MakeFiber([this, index = pb->GetPoolIndex()] {
|
||||||
ThisFiber::SetName(absl::StrCat("shard_queue", index));
|
ThisFiber::SetName(absl::StrCat("shard_queue", index));
|
||||||
queue_.Run();
|
queue_.Run();
|
||||||
});
|
});
|
||||||
|
@ -355,14 +355,14 @@ void EngineShard::StartPeriodicFiber(util::ProactorBase* pb) {
|
||||||
if (clock_cycle_ms == 0)
|
if (clock_cycle_ms == 0)
|
||||||
clock_cycle_ms = 1;
|
clock_cycle_ms = 1;
|
||||||
|
|
||||||
fiber_periodic_ = MakeFiber([this, index = pb->GetIndex(), period_ms = clock_cycle_ms] {
|
fiber_periodic_ = MakeFiber([this, index = pb->GetPoolIndex(), period_ms = clock_cycle_ms] {
|
||||||
ThisFiber::SetName(absl::StrCat("shard_periodic", index));
|
ThisFiber::SetName(absl::StrCat("shard_periodic", index));
|
||||||
RunPeriodic(std::chrono::milliseconds(period_ms));
|
RunPeriodic(std::chrono::milliseconds(period_ms));
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
|
void EngineShard::InitThreadLocal(ProactorBase* pb, bool update_db_time) {
|
||||||
CHECK(shard_ == nullptr) << pb->GetIndex();
|
CHECK(shard_ == nullptr) << pb->GetPoolIndex();
|
||||||
|
|
||||||
mi_heap_t* data_heap = ServerState::tlocal()->data_heap();
|
mi_heap_t* data_heap = ServerState::tlocal()->data_heap();
|
||||||
void* ptr = mi_heap_malloc_aligned(data_heap, sizeof(EngineShard), alignof(EngineShard));
|
void* ptr = mi_heap_malloc_aligned(data_heap, sizeof(EngineShard), alignof(EngineShard));
|
||||||
|
|
|
@ -29,7 +29,7 @@ Journal::Journal() {
|
||||||
}
|
}
|
||||||
|
|
||||||
void Journal::StartInThread() {
|
void Journal::StartInThread() {
|
||||||
journal_slice.Init(unsigned(ProactorBase::GetIndex()));
|
journal_slice.Init(unsigned(ProactorBase::me()->GetPoolIndex()));
|
||||||
|
|
||||||
ServerState::tlocal()->set_journal(this);
|
ServerState::tlocal()->set_journal(this);
|
||||||
EngineShard* shard = EngineShard::tlocal();
|
EngineShard* shard = EngineShard::tlocal();
|
||||||
|
|
|
@ -251,8 +251,8 @@ void SendMonitor(const std::string& msg) {
|
||||||
const auto& monitor_repo = ServerState::tlocal()->Monitors();
|
const auto& monitor_repo = ServerState::tlocal()->Monitors();
|
||||||
const auto& monitors = monitor_repo.monitors();
|
const auto& monitors = monitor_repo.monitors();
|
||||||
if (!monitors.empty()) {
|
if (!monitors.empty()) {
|
||||||
VLOG(1) << "thread " << util::ProactorBase::GetIndex() << " sending monitor message '" << msg
|
VLOG(1) << "thread " << ProactorBase::me()->GetPoolIndex() << " sending monitor message '"
|
||||||
<< "' for " << monitors.size();
|
<< msg << "' for " << monitors.size();
|
||||||
|
|
||||||
for (auto monitor_conn : monitors) {
|
for (auto monitor_conn : monitors) {
|
||||||
// never preempts, so we can iterate safely.
|
// never preempts, so we can iterate safely.
|
||||||
|
@ -1815,8 +1815,8 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
|
||||||
});
|
});
|
||||||
|
|
||||||
if (*sid != ServerState::tlocal()->thread_index()) {
|
if (*sid != ServerState::tlocal()->thread_index()) {
|
||||||
VLOG(1) << "Migrating connection " << cntx->conn() << " from " << ProactorBase::GetIndex()
|
VLOG(1) << "Migrating connection " << cntx->conn() << " from "
|
||||||
<< " to " << *sid;
|
<< ProactorBase::me()->GetPoolIndex() << " to " << *sid;
|
||||||
cntx->conn()->RequestAsyncMigration(shard_set->pool()->at(*sid));
|
cntx->conn()->RequestAsyncMigration(shard_set->pool()->at(*sid));
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
|
|
@ -37,7 +37,7 @@ ServerState::Stats& ServerState::Stats::operator+=(const ServerState::Stats& oth
|
||||||
void MonitorsRepo::Add(facade::Connection* connection) {
|
void MonitorsRepo::Add(facade::Connection* connection) {
|
||||||
VLOG(1) << "register connection "
|
VLOG(1) << "register connection "
|
||||||
<< " at address 0x" << std::hex << (const void*)connection << " for thread "
|
<< " at address 0x" << std::hex << (const void*)connection << " for thread "
|
||||||
<< util::ProactorBase::GetIndex();
|
<< util::ProactorBase::me()->GetPoolIndex();
|
||||||
|
|
||||||
monitors_.push_back(connection);
|
monitors_.push_back(connection);
|
||||||
}
|
}
|
||||||
|
|
|
@ -176,7 +176,7 @@ void SliceSnapshot::Join() {
|
||||||
// Serializes all the entries with version less than snapshot_version_.
|
// Serializes all the entries with version less than snapshot_version_.
|
||||||
void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
|
void SliceSnapshot::IterateBucketsFb(const Cancellation* cll) {
|
||||||
{
|
{
|
||||||
auto fiber_name = absl::StrCat("SliceSnapshot-", ProactorBase::GetIndex());
|
auto fiber_name = absl::StrCat("SliceSnapshot-", ProactorBase::me()->GetPoolIndex());
|
||||||
ThisFiber::SetName(std::move(fiber_name));
|
ThisFiber::SetName(std::move(fiber_name));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -539,7 +539,7 @@ bool BaseFamilyTest::IsLocked(DbIndex db_index, std::string_view key) const {
|
||||||
}
|
}
|
||||||
|
|
||||||
string BaseFamilyTest::GetId() const {
|
string BaseFamilyTest::GetId() const {
|
||||||
int32 id = ProactorBase::GetIndex();
|
int32 id = ProactorBase::me()->GetPoolIndex();
|
||||||
CHECK_GE(id, 0);
|
CHECK_GE(id, 0);
|
||||||
return absl::StrCat("IO", id);
|
return absl::StrCat("IO", id);
|
||||||
}
|
}
|
||||||
|
|
Loading…
Add table
Add a link
Reference in a new issue