diff --git a/src/facade/dragonfly_connection.cc b/src/facade/dragonfly_connection.cc index af73de75d..3e753e905 100644 --- a/src/facade/dragonfly_connection.cc +++ b/src/facade/dragonfly_connection.cc @@ -518,6 +518,16 @@ void Connection::OnPostMigrateThread() { socket_->RegisterOnErrorCb([this](int32_t mask) { this->OnBreakCb(mask); }); break_cb_engaged_ = true; } + + // Update tl variables + queue_backpressure_ = &tl_queue_backpressure_; + + stats_ = &tl_facade_stats->conn_stats; + ++stats_->num_conns; + stats_->read_buf_capacity += io_buf_.Capacity(); + if (cc_->replica_conn) { + ++stats_->num_replicas; + } } auto Connection::RegisterShutdownHook(ShutdownCb cb) -> ShutdownHandle { @@ -992,25 +1002,6 @@ void Connection::HandleMigrateRequest() { DecreaseStatsOnClose(); this->Migrate(dest); - - auto update_tl_vars = [this] [[gnu::noinline]] { - // The compiler barrier that does not allow reordering memory accesses - // to before this function starts. See https://stackoverflow.com/a/75622732 - asm volatile(""); - - queue_backpressure_ = &tl_queue_backpressure_; - - stats_ = &tl_facade_stats->conn_stats; - ++stats_->num_conns; - stats_->read_buf_capacity += io_buf_.Capacity(); - if (cc_->replica_conn) { - ++stats_->num_replicas; - } - }; - - // We're now running in `dest` thread. We use non-inline lambda to force reading new thread's - // thread local vars. 
- update_tl_vars(); } DCHECK(dispatch_q_.empty()); diff --git a/src/server/main_service.cc b/src/server/main_service.cc index c20c116cc..0c85e19bf 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1204,13 +1204,10 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo return true; // return false only for internal error aborts } - ServerState& etl = *ServerState::tlocal(); - // We are not sending any admin command in the monitor, and we do not want to // do any processing if we don't have any waiting connections with monitor // enabled on them - see https://redis.io/commands/monitor/ - const MonitorsRepo& monitors = etl.Monitors(); - if (!monitors.Empty() && (cid->opt_mask() & CO::ADMIN) == 0) { + if (!ServerState::tlocal()->Monitors().Empty() && (cid->opt_mask() & CO::ADMIN) == 0) { DispatchMonitor(cntx, cid, tail_args); } @@ -1229,8 +1226,9 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo // TODO: we should probably discard more commands here, // not just the blocking ones const auto* conn = cntx->conn(); - if (!(cid->opt_mask() & CO::BLOCKING) && conn != nullptr && etl.GetSlowLog().IsEnabled() && - invoke_time_usec >= etl.log_slower_than_usec) { + if (!(cid->opt_mask() & CO::BLOCKING) && conn != nullptr && + // Use SafeTLocal() to avoid accessing the wrong thread local instance + ServerState::SafeTLocal()->ShouldLogSlowCmd(invoke_time_usec)) { vector aux_params; CmdArgVec aux_slices; @@ -1240,8 +1238,9 @@ bool Service::InvokeCmd(const CommandId* cid, CmdArgList tail_args, ConnectionCo aux_slices.emplace_back(aux_params.back()); tail_args = absl::MakeSpan(aux_slices); } - etl.GetSlowLog().Add(cid->name(), tail_args, conn->GetName(), conn->RemoteEndpointStr(), - invoke_time_usec, absl::GetCurrentTimeNanos() / 1000); + ServerState::SafeTLocal()->GetSlowLog().Add(cid->name(), tail_args, conn->GetName(), + conn->RemoteEndpointStr(), invoke_time_usec, + 
absl::GetCurrentTimeNanos() / 1000); } if (cntx->transaction && !cntx->conn_state.exec_info.IsRunning() && @@ -2065,14 +2064,13 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { SinkReplyBuilder::ReplyAggregator agg(rb); rb->StartArray(exec_info.body.size()); - ServerState* ss = ServerState::tlocal(); if (state != ExecEvalState::NONE) exec_info.preborrowed_interpreter = ServerState::tlocal()->BorrowInterpreter(); if (!exec_info.body.empty()) { if (GetFlag(FLAGS_track_exec_frequencies)) { string descr = CreateExecDescriptor(exec_info.body, cntx->transaction->GetUniqueShardCnt()); - ss->exec_freq_count[descr]++; + ServerState::tlocal()->exec_freq_count[descr]++; } if (absl::GetFlag(FLAGS_multi_exec_squash) && state == ExecEvalState::NONE) { @@ -2105,7 +2103,8 @@ void Service::Exec(CmdArgList args, ConnectionContext* cntx) { } if (exec_info.preborrowed_interpreter) { - ServerState::tlocal()->ReturnInterpreter(exec_info.preborrowed_interpreter); + // Use SafeTLocal() to avoid accessing the wrong thread local instance + ServerState::SafeTLocal()->ReturnInterpreter(exec_info.preborrowed_interpreter); exec_info.preborrowed_interpreter = nullptr; } diff --git a/src/server/multi_command_squasher.cc b/src/server/multi_command_squasher.cc index 4b9c92b8b..c52ed1e16 100644 --- a/src/server/multi_command_squasher.cc +++ b/src/server/multi_command_squasher.cc @@ -202,8 +202,7 @@ bool MultiCommandSquasher::ExecuteSquashed() { sd.replies.reserve(sd.cmds.size()); Transaction* tx = cntx_->transaction; - ServerState* ss = ServerState::tlocal(); - ss->stats.multi_squash_executions++; + ServerState::tlocal()->stats.multi_squash_executions++; ProactorBase* proactor = ProactorBase::me(); uint64_t start = proactor->GetMonotonicTimeNs(); @@ -236,8 +235,8 @@ bool MultiCommandSquasher::ExecuteSquashed() { break; } uint64_t after_reply = proactor->GetMonotonicTimeNs(); - ss->stats.multi_squash_exec_hop_usec += (after_hop - start) / 1000; - 
ss->stats.multi_squash_exec_reply_usec += (after_reply - after_hop) / 1000; + ServerState::SafeTLocal()->stats.multi_squash_exec_hop_usec += (after_hop - start) / 1000; + ServerState::SafeTLocal()->stats.multi_squash_exec_reply_usec += (after_reply - after_hop) / 1000; for (auto& sinfo : sharded_) sinfo.cmds.clear(); diff --git a/src/server/server_state.cc b/src/server/server_state.cc index 5b92ce911..b12ca7aa7 100644 --- a/src/server/server_state.cc +++ b/src/server/server_state.cc @@ -181,4 +181,13 @@ void ServerState::ReturnInterpreter(Interpreter* ir) { interpreter_mgr_.Return(ir); } +ServerState* ServerState::SafeTLocal() { + // https://stackoverflow.com/a/75622732 + asm volatile(""); + return state_; +} + +bool ServerState::ShouldLogSlowCmd(unsigned latency_usec) const { + return slow_log_shard_.IsEnabled() && latency_usec >= log_slower_than_usec; +} } // end of namespace dfly diff --git a/src/server/server_state.h b/src/server/server_state.h index fcd0756b5..4b6c52f70 100644 --- a/src/server/server_state.h +++ b/src/server/server_state.h @@ -125,10 +125,18 @@ class ServerState { // public struct - to allow initialization. Stats& operator=(const Stats&) = delete; }; + // Unsafe version. + // Do not use after fiber migration because it can cause a data race. static ServerState* tlocal() { return state_; } + // Safe version. + // Calls to tlocal() before and after a fiber migrates to a different thread may both + // return the thread local of the thread that ran the fiber before the migration. Use this + // function to avoid this and access the correct thread local after the migration. + static ServerState* __attribute__((noinline)) SafeTLocal(); + static facade::ConnectionStats* tl_connection_stats() { return &facade::tl_facade_stats->conn_stats; } @@ -230,6 +238,8 @@ class ServerState { // public struct - to allow initialization. channel_store_ = replacement; } + bool ShouldLogSlowCmd(unsigned latency_usec) const; + Stats stats; bool is_master = true;