refactor: handling context in goroutines

This commit is contained in:
Jacky 2025-05-04 13:36:54 +00:00
parent ecb2ad786b
commit 1a2758ac5b
No known key found for this signature in database
GPG key ID: 215C21B10DF38B4D
15 changed files with 161 additions and 140 deletions

View file

@ -1,6 +1,7 @@
package analytic
import (
"context"
"time"
"github.com/uozi-tech/cosy/logger"
@ -48,12 +49,18 @@ func init() {
}
}
func RecordServerAnalytic() {
func RecordServerAnalytic(ctx context.Context) {
logger.Info("RecordServerAnalytic Started")
for {
now := time.Now()
recordCpu(now) // this func will spend more than 1 second.
recordNetwork(now)
recordDiskIO(now)
select {
case <-ctx.Done():
logger.Info("RecordServerAnalytic Stopped")
return
case <-time.After(1 * time.Second):
now := time.Now()
recordCpu(now) // this func will spend more than 1 second.
recordNetwork(now)
recordDiskIO(now)
}
}
}

View file

@ -12,33 +12,90 @@ import (
"github.com/uozi-tech/cosy/logger"
)
var (
ctx, cancel = context.WithCancel(context.Background())
wg sync.WaitGroup
restartMu sync.Mutex // Add mutex to prevent concurrent restarts
)
// NodeRecordManager manages the node status retrieval process
type NodeRecordManager struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
mu sync.Mutex
}
func RestartRetrieveNodesStatus() {
restartMu.Lock() // Acquire lock before modifying shared resources
defer restartMu.Unlock()
// NewNodeRecordManager creates a new NodeRecordManager with the provided context
func NewNodeRecordManager(parentCtx context.Context) *NodeRecordManager {
ctx, cancel := context.WithCancel(parentCtx)
return &NodeRecordManager{
ctx: ctx,
cancel: cancel,
}
}
// Cancel previous context to stop all operations
cancel()
// Start begins retrieving node status using the manager's context
func (m *NodeRecordManager) Start() {
m.mu.Lock()
defer m.mu.Unlock()
// Wait for previous goroutines to finish
wg.Wait()
// Create new context for this run
ctx, cancel = context.WithCancel(context.Background())
wg.Add(1)
m.wg.Add(1)
go func() {
defer wg.Done()
RetrieveNodesStatus()
defer m.wg.Done()
RetrieveNodesStatus(m.ctx)
}()
}
func RetrieveNodesStatus() {
// Stop cancels the current context and waits for operations to complete
func (m *NodeRecordManager) Stop() {
m.mu.Lock()
defer m.mu.Unlock()
m.cancel()
m.wg.Wait()
}
// Restart stops and then restarts the node status retrieval
func (m *NodeRecordManager) Restart() {
m.Stop()
// Create new context
m.ctx, m.cancel = context.WithCancel(context.Background())
// Start retrieval with new context
m.Start()
}
// For backward compatibility
var (
defaultManager *NodeRecordManager
setupOnce sync.Once
restartMu sync.Mutex
)
// InitDefaultManager initializes the default NodeRecordManager
func InitDefaultManager() {
setupOnce.Do(func() {
defaultManager = NewNodeRecordManager(context.Background())
})
}
// RestartRetrieveNodesStatus restarts the node status retrieval process
// Kept for backward compatibility
func RestartRetrieveNodesStatus() {
restartMu.Lock()
defer restartMu.Unlock()
if defaultManager == nil {
InitDefaultManager()
}
defaultManager.Restart()
}
// StartRetrieveNodesStatus starts the node status retrieval with a custom context
func StartRetrieveNodesStatus(ctx context.Context) *NodeRecordManager {
manager := NewNodeRecordManager(ctx)
manager.Start()
return manager
}
func RetrieveNodesStatus(ctx context.Context) {
logger.Info("RetrieveNodesStatus start")
defer logger.Info("RetrieveNodesStatus exited")
@ -87,8 +144,6 @@ func RetrieveNodesStatus() {
}
}(env)
}
<-ctx.Done()
}
func nodeAnalyticRecord(env *model.Environment, ctx context.Context) error {

View file

@ -1,6 +1,8 @@
package cert
import (
"context"
"github.com/0xJacky/Nginx-UI/model"
"github.com/0xJacky/Nginx-UI/query"
"github.com/0xJacky/Nginx-UI/settings"
@ -10,7 +12,7 @@ import (
)
// InitRegister init the default user for acme
func InitRegister() {
func InitRegister(ctx context.Context) {
email := settings.CertSettings.Email
if settings.CertSettings.Email == "" {
return

View file

@ -1,16 +1,19 @@
package cluster
import (
"context"
"net/url"
"strings"
"github.com/0xJacky/Nginx-UI/model"
"github.com/0xJacky/Nginx-UI/query"
"github.com/0xJacky/Nginx-UI/settings"
"github.com/uozi-tech/cosy/logger"
"gorm.io/gen/field"
"net/url"
"strings"
)
func RegisterPredefinedNodes() {
func RegisterPredefinedNodes(ctx context.Context) {
if len(settings.ClusterSettings.Node) == 0 {
return
}

View file

@ -1,6 +1,8 @@
package cron
import (
"context"
"github.com/go-co-op/gocron/v2"
"github.com/uozi-tech/cosy/logger"
)
@ -17,7 +19,7 @@ func init() {
}
// InitCronJobs initializes and starts all cron jobs
func InitCronJobs() {
func InitCronJobs(ctx context.Context) {
// Initialize auto cert job
_, err := setupAutoCertJob(s)
if err != nil {
@ -41,6 +43,9 @@ func InitCronJobs() {
// Start the scheduler
s.Start()
<-ctx.Done()
s.Shutdown()
}
// RestartLogrotate is a public API to restart the logrotate job

View file

@ -1,6 +1,7 @@
package kernel
import (
"context"
"crypto/rand"
"encoding/hex"
"mime"
@ -28,12 +29,11 @@ import (
cSettings "github.com/uozi-tech/cosy/settings"
)
func Boot() {
func Boot(ctx context.Context) {
defer recovery()
async := []func(){
InitJsExtensionType,
InitDatabase,
InitNodeSecret,
InitCryptoSecret,
validation.Init,
@ -41,8 +41,9 @@ func Boot() {
CheckAndCleanupOTAContainers,
}
syncs := []func(){
syncs := []func(ctx context.Context){
analytic.RecordServerAnalytic,
InitDatabase,
}
for _, v := range async {
@ -50,12 +51,12 @@ func Boot() {
}
for _, v := range syncs {
go v()
go v(ctx)
}
}
func InitAfterDatabase() {
syncs := []func(){
func InitAfterDatabase(ctx context.Context) {
syncs := []func(ctx context.Context){
registerPredefinedUser,
cert.InitRegister,
cron.InitCronJobs,
@ -67,7 +68,7 @@ func InitAfterDatabase() {
}
for _, v := range syncs {
go v()
go v(ctx)
}
}
@ -79,20 +80,28 @@ func recovery() {
}
}
func InitDatabase() {
var installChan = make(chan struct{})
func PostInstall() {
installChan <- struct{}{}
}
func InitDatabase(ctx context.Context) {
cModel.ResolvedModels()
// Skip install
if settings.NodeSettings.SkipInstallation {
skipInstall()
}
if cSettings.AppSettings.JwtSecret != "" {
db := cosy.InitDB(sqlite.Open(path.Dir(cSettings.ConfPath), settings.DatabaseSettings))
model.Use(db)
query.Init(db)
InitAfterDatabase()
if cSettings.AppSettings.JwtSecret == "" {
<-installChan
}
db := cosy.InitDB(sqlite.Open(path.Dir(cSettings.ConfPath), settings.DatabaseSettings))
model.Use(db)
query.Init(db)
InitAfterDatabase(ctx)
}
func InitNodeSecret() {

View file

@ -1,11 +1,13 @@
package kernel
import (
"context"
"github.com/0xJacky/Nginx-UI/query"
"github.com/uozi-tech/cosy/logger"
)
func RegisterAcmeUser() {
func RegisterAcmeUser(ctx context.Context) {
a := query.AcmeUser
users, _ := a.Where(a.RegisterOnStartup.Is(true)).Find()
for _, user := range users {

View file

@ -1,12 +1,14 @@
package kernel
import (
"context"
"errors"
"github.com/0xJacky/Nginx-UI/model"
"github.com/0xJacky/Nginx-UI/query"
"github.com/0xJacky/Nginx-UI/settings"
"github.com/caarlos0/env/v11"
"github.com/google/uuid"
"errors"
"github.com/uozi-tech/cosy/logger"
cSettings "github.com/uozi-tech/cosy/settings"
"golang.org/x/crypto/bcrypt"
@ -36,7 +38,7 @@ func skipInstall() {
}
}
func registerPredefinedUser() {
func registerPredefinedUser(ctx context.Context) {
// when skip installation mode is enabled, the predefined user will be created
if !settings.NodeSettings.SkipInstallation {
return

View file

@ -40,9 +40,10 @@ type Tool struct {
}
var (
tools = []Tool{}
toolMutex sync.Mutex
tools = []Tool{}
toolMutex sync.Mutex
)
func AddTool(tool mcp.Tool, handler func(ctx context.Context, request mcp.CallToolRequest) (*mcp.CallToolResult, error)) {
toolMutex.Lock()
defer toolMutex.Unlock()
@ -53,7 +54,7 @@ func ServeHTTP(c *gin.Context) {
sseServer.ServeHTTP(c.Writer, c.Request)
}
func Init() {
func Init(ctx context.Context) {
for _, tool := range tools {
mcpServer.AddTool(tool.Tool, tool.Handler)
}

View file

@ -1,6 +1,8 @@
package passkey
import (
"context"
"github.com/0xJacky/Nginx-UI/settings"
"github.com/go-webauthn/webauthn/protocol"
"github.com/go-webauthn/webauthn/webauthn"
@ -9,7 +11,7 @@ import (
var instance *webauthn.WebAuthn
func Init() {
func Init(ctx context.Context) {
options := settings.WebAuthnSettings
if !Enabled() {