nginx-ui/internal/analytic/node_record.go

205 lines
3.9 KiB
Go

package analytic
import (
"context"
"net/http"
"sync"
"time"
"github.com/0xJacky/Nginx-UI/model"
"github.com/0xJacky/Nginx-UI/query"
"github.com/gorilla/websocket"
"github.com/uozi-tech/cosy/logger"
)
// NodeRecordManager manages the node status retrieval process
type NodeRecordManager struct {
ctx context.Context
cancel context.CancelFunc
wg sync.WaitGroup
mu sync.Mutex
}
// NewNodeRecordManager creates a new NodeRecordManager with the provided context
func NewNodeRecordManager(parentCtx context.Context) *NodeRecordManager {
ctx, cancel := context.WithCancel(parentCtx)
return &NodeRecordManager{
ctx: ctx,
cancel: cancel,
}
}
// Start begins retrieving node status using the manager's context
func (m *NodeRecordManager) Start() {
m.mu.Lock()
defer m.mu.Unlock()
m.wg.Add(1)
go func() {
defer m.wg.Done()
RetrieveNodesStatus(m.ctx)
}()
}
// Stop cancels the current context and waits for operations to complete
func (m *NodeRecordManager) Stop() {
m.mu.Lock()
defer m.mu.Unlock()
m.cancel()
m.wg.Wait()
}
// Restart stops and then restarts the node status retrieval
func (m *NodeRecordManager) Restart() {
m.Stop()
// Create new context
m.ctx, m.cancel = context.WithCancel(context.Background())
// Start retrieval with new context
m.Start()
}
// For backward compatibility
var (
defaultManager *NodeRecordManager
setupOnce sync.Once
restartMu sync.Mutex
)
// InitDefaultManager initializes the default NodeRecordManager
func InitDefaultManager() {
setupOnce.Do(func() {
defaultManager = NewNodeRecordManager(context.Background())
})
}
// RestartRetrieveNodesStatus restarts the node status retrieval process
// Kept for backward compatibility
func RestartRetrieveNodesStatus() {
restartMu.Lock()
defer restartMu.Unlock()
if defaultManager == nil {
InitDefaultManager()
}
defaultManager.Restart()
}
// StartRetrieveNodesStatus starts the node status retrieval with a custom context
func StartRetrieveNodesStatus(ctx context.Context) *NodeRecordManager {
manager := NewNodeRecordManager(ctx)
manager.Start()
return manager
}
func RetrieveNodesStatus(ctx context.Context) {
logger.Info("RetrieveNodesStatus start")
defer logger.Info("RetrieveNodesStatus exited")
mutex.Lock()
if NodeMap == nil {
NodeMap = make(TNodeMap)
}
mutex.Unlock()
env := query.Environment
envs, err := env.Where(env.Enabled.Is(true)).Find()
if err != nil {
logger.Error(err)
return
}
var wg sync.WaitGroup
defer wg.Wait()
for _, env := range envs {
wg.Add(1)
go func(e *model.Environment) {
defer wg.Done()
retryTicker := time.NewTicker(5 * time.Second)
defer retryTicker.Stop()
for {
select {
case <-ctx.Done():
return
default:
if err := nodeAnalyticRecord(e, ctx); err != nil {
logger.Error(err)
mutex.Lock()
if NodeMap[env.ID] != nil {
NodeMap[env.ID].Status = false
}
mutex.Unlock()
select {
case <-retryTicker.C:
case <-ctx.Done():
return
}
}
}
}
}(env)
}
}
func nodeAnalyticRecord(env *model.Environment, ctx context.Context) error {
scopeCtx, cancel := context.WithCancel(ctx)
defer cancel()
node, err := InitNode(env)
mutex.Lock()
NodeMap[env.ID] = node
mutex.Unlock()
if err != nil {
return err
}
u, err := env.GetWebSocketURL("/api/analytic/intro")
if err != nil {
return err
}
header := http.Header{}
header.Set("X-Node-Secret", env.Token)
dial := &websocket.Dialer{
Proxy: http.ProxyFromEnvironment,
HandshakeTimeout: 5 * time.Second,
}
c, _, err := dial.Dial(u, header)
if err != nil {
return err
}
defer c.Close()
go func() {
<-scopeCtx.Done()
_ = c.Close()
}()
var nodeStat NodeStat
for {
err = c.ReadJSON(&nodeStat)
if err != nil {
return err
}
// set online
nodeStat.Status = true
nodeStat.ResponseAt = time.Now()
mutex.Lock()
NodeMap[env.ID].NodeStat = nodeStat
mutex.Unlock()
}
}