refactor: nodes analytics (#847)

* refactor: nodes analytics

* feat(debug): add pprof in debug mode

* refactor: websocket error handler
This commit is contained in:
Jacky 2025-02-05 17:25:29 +08:00
parent b1ba719cb1
commit cb4977e5ab
No known key found for this signature in database
GPG key ID: 215C21B10DF38B4D
17 changed files with 276 additions and 155 deletions

View file

@ -2,6 +2,7 @@ package analytic
import (
"encoding/json"
"errors"
"github.com/0xJacky/Nginx-UI/internal/transport"
"github.com/0xJacky/Nginx-UI/internal/upgrader"
"github.com/0xJacky/Nginx-UI/model"
@ -69,15 +70,14 @@ func GetNode(env *model.Environment) (n *Node) {
return n
}
func InitNode(env *model.Environment) (n *Node) {
func InitNode(env *model.Environment) (n *Node, err error) {
n = &Node{
Environment: env,
}
u, err := url.JoinPath(env.URL, "/api/node")
if err != nil {
logger.Error(err)
return
return
}
t, err := transport.NewTransport()
@ -90,7 +90,6 @@ func InitNode(env *model.Environment) (n *Node) {
req, err := http.NewRequest("GET", u, nil)
if err != nil {
logger.Error(err)
return
}
@ -98,7 +97,6 @@ func InitNode(env *model.Environment) (n *Node) {
resp, err := client.Do(req)
if err != nil {
logger.Error(err)
return
}
@ -106,13 +104,11 @@ func InitNode(env *model.Environment) (n *Node) {
bytes, _ := io.ReadAll(resp.Body)
if resp.StatusCode != http.StatusOK {
logger.Error(string(bytes))
return
return n, errors.New(string(bytes))
}
err = json.Unmarshal(bytes, &n.NodeInfo)
if err != nil {
logger.Error(err)
return
}

View file

@ -2,81 +2,112 @@ package analytic
import (
"context"
"encoding/json"
"net/http"
"sync"
"time"
"github.com/0xJacky/Nginx-UI/model"
"github.com/0xJacky/Nginx-UI/query"
"github.com/gorilla/websocket"
"github.com/uozi-tech/cosy/logger"
"net/http"
"time"
)
var stopNodeRecordChan = make(chan struct{})
var (
ctx, cancel = context.WithCancel(context.Background())
wg sync.WaitGroup
restartMu sync.Mutex // Add mutex to prevent concurrent restarts
)
func RestartRetrieveNodesStatus() {
stopNodeRecordChan <- struct{}{}
time.Sleep(10 * time.Second)
go RetrieveNodesStatus()
restartMu.Lock() // Acquire lock before modifying shared resources
defer restartMu.Unlock()
// Cancel previous context to stop all operations
cancel()
// Wait for previous goroutines to finish
wg.Wait()
// Create new context for this run
ctx, cancel = context.WithCancel(context.Background())
wg.Add(1)
go func() {
defer wg.Done()
RetrieveNodesStatus()
}()
}
func RetrieveNodesStatus() {
NodeMap = make(TNodeMap)
errChan := make(chan error)
logger.Info("RetrieveNodesStatus start")
defer logger.Info("RetrieveNodesStatus exited")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
mutex.Lock()
if NodeMap == nil {
NodeMap = make(TNodeMap)
}
mutex.Unlock()
env := query.Environment
envs, err := env.Where(env.Enabled.Is(true)).Find()
if err != nil {
logger.Error(err)
return
}
for _, v := range envs {
go nodeAnalyticLive(v, errChan, ctx)
}
var wg sync.WaitGroup
defer wg.Wait()
for {
select {
case err = <-errChan:
logger.Error(err)
case <-stopNodeRecordChan:
logger.Info("RetrieveNodesStatus exited normally")
return // will execute defer cancel()
}
}
}
for _, env := range envs {
wg.Add(1)
go func(e *model.Environment) {
defer wg.Done()
retryTicker := time.NewTicker(5 * time.Second)
defer retryTicker.Stop()
func nodeAnalyticLive(env *model.Environment, errChan chan error, ctx context.Context) {
for {
err := nodeAnalyticRecord(env, ctx)
if err != nil {
// set node offline
if NodeMap[env.ID] != nil {
mutex.Lock()
NodeMap[env.ID].Status = false
mutex.Unlock()
for {
select {
case <-ctx.Done():
return
default:
if err := nodeAnalyticRecord(e, ctx); err != nil {
logger.Error(err)
if NodeMap[env.ID] != nil {
mutex.Lock()
NodeMap[env.ID].Status = false
mutex.Unlock()
}
select {
case <-retryTicker.C:
case <-ctx.Done():
return
}
}
}
}
logger.Error(err)
errChan <- err
// wait 5s then reconnect
time.Sleep(5 * time.Second)
}
}(env)
}
<-ctx.Done()
}
func nodeAnalyticRecord(env *model.Environment, ctx context.Context) (err error) {
func nodeAnalyticRecord(env *model.Environment, ctx context.Context) error {
scopeCtx, cancel := context.WithCancel(ctx)
defer cancel()
node, err := InitNode(env)
mutex.Lock()
NodeMap[env.ID] = InitNode(env)
NodeMap[env.ID] = node
mutex.Unlock()
if err != nil {
return err
}
u, err := env.GetWebSocketURL("/api/analytic/intro")
if err != nil {
return
return err
}
header := http.Header{}
@ -90,28 +121,20 @@ func nodeAnalyticRecord(env *model.Environment, ctx context.Context) (err error)
c, _, err := dial.Dial(u, header)
if err != nil {
return
return err
}
defer c.Close()
var nodeStat NodeStat
go func() {
// shutdown
<-ctx.Done()
<-scopeCtx.Done()
_ = c.Close()
}()
var nodeStat NodeStat
for {
_, message, err := c.ReadMessage()
if err != nil || websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNoStatusReceived,
websocket.CloseNormalClosure) {
return err
}
err = json.Unmarshal(message, &nodeStat)
err = c.ReadJSON(&nodeStat)
if err != nil {
return err
}

View file

@ -26,21 +26,18 @@ func GetNodeStat() (data NodeStat) {
cpuSystemUsage := (cpuTimesAfter[0].System - cpuTimesBefore[0].System) / (float64(1000*threadNum) / 1000)
loadAvg, err := load.Avg()
if err != nil {
logger.Error(err)
return
}
diskStat, err := GetDiskStat()
if err != nil {
logger.Error(err)
return
}
netIO, err := net.IOCounters(false)
if err != nil {
logger.Error(err)
return

View file

@ -1,24 +1,21 @@
package helper
import (
"strings"
"github.com/gorilla/websocket"
"errors"
"github.com/gorilla/websocket"
"strings"
"syscall"
)
// IsUnexpectedWebsocketError checks if the error is an unexpected websocket error
func IsUnexpectedWebsocketError(err error) bool {
// nil error is an expected error
if err == nil {
return false
}
// ignore: write: broken pipe
if errors.Is(err, syscall.EPIPE) {
return false
}
// client closed error: *net.OpErr
if strings.Contains(err.Error(), "An existing connection was forcibly closed by the remote host") {
return true
return false
}
return websocket.IsUnexpectedCloseError(err,

View file

@ -12,6 +12,34 @@ import (
"github.com/uozi-tech/cosy/logger"
)
// getToken from header, cookie or query
func getToken(c *gin.Context) (token string) {
if token = c.GetHeader("Authorization"); token != "" {
return
}
if token, _ = c.Cookie("token"); token != "" {
return token
}
if token = c.Query("token"); token != "" {
tokenBytes, _ := base64.StdEncoding.DecodeString(token)
return string(tokenBytes)
}
return ""
}
// getXNodeID from header or query
func getXNodeID(c *gin.Context) (xNodeID string) {
if xNodeID = c.GetHeader("X-Node-ID"); xNodeID != "" {
return xNodeID
}
return c.Query("x_node_id")
}
// AuthRequired is a middleware that checks if the user is authenticated
func AuthRequired() gin.HandlerFunc {
return func(c *gin.Context) {
abortWithAuthFailure := func() {
@ -20,20 +48,20 @@ func AuthRequired() gin.HandlerFunc {
})
}
token := c.GetHeader("Authorization")
xNodeID := getXNodeID(c)
if xNodeID != "" {
c.Set("ProxyNodeID", xNodeID)
}
token := getToken(c)
if token == "" {
if token = c.GetHeader("X-Node-Secret"); token != "" && token == settings.NodeSettings.Secret {
c.Set("Secret", token)
c.Next()
return
} else {
c.Set("ProxyNodeID", c.Query("x_node_id"))
tokenBytes, _ := base64.StdEncoding.DecodeString(c.Query("token"))
token = string(tokenBytes)
if token == "" {
abortWithAuthFailure()
return
}
abortWithAuthFailure()
return
}
}
@ -44,11 +72,6 @@ func AuthRequired() gin.HandlerFunc {
}
c.Set("user", u)
if nodeID := c.GetHeader("X-Node-ID"); nodeID != "" {
c.Set("ProxyNodeID", nodeID)
}
c.Next()
}
}
@ -70,6 +93,7 @@ func (f ServerFileSystemType) Exists(prefix string, _path string) bool {
return err == nil
}
// CacheJs is a middleware that send header to client to cache js file
func CacheJs() gin.HandlerFunc {
return func(c *gin.Context) {
if strings.Contains(c.Request.URL.String(), "js") {

View file

@ -2,6 +2,7 @@ package pty
import (
"encoding/json"
"github.com/0xJacky/Nginx-UI/internal/helper"
"github.com/0xJacky/Nginx-UI/settings"
"github.com/creack/pty"
"github.com/gorilla/websocket"
@ -46,9 +47,10 @@ func NewPipeLine(conn *websocket.Conn) (p *Pipeline, err error) {
func (p *Pipeline) ReadWsAndWritePty(errorChan chan error) {
for {
msgType, payload, err := p.ws.ReadMessage()
if err != nil && websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseNoStatusReceived,
websocket.CloseNormalClosure) {
errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty unexpected close")
if err != nil {
if helper.IsUnexpectedWebsocketError(err) {
errorChan <- errors.Wrap(err, "Error ReadWsAndWritePty unexpected close")
}
return
}
if msgType != websocket.TextMessage {
@ -117,8 +119,10 @@ func (p *Pipeline) ReadPtyAndWriteWs(errorChan chan error) {
}
processedOutput := validString(string(buf[:n]))
err = p.ws.WriteMessage(websocket.TextMessage, []byte(processedOutput))
if err != nil && websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
errorChan <- errors.Wrap(err, "Error ReadPtyAndWriteWs websocket write")
if err != nil {
if helper.IsUnexpectedWebsocketError(err) {
errorChan <- errors.Wrap(err, "Error ReadPtyAndWriteWs websocket write")
}
return
}
}