feat: sync streams

This commit is contained in:
Jacky 2025-02-25 15:53:08 +08:00
parent 082ccc18bc
commit 34fa4eb204
No known key found for this signature in database
GPG key ID: 215C21B10DF38B4D
27 changed files with 788 additions and 557 deletions

79
internal/stream/delete.go Normal file
View file

@ -0,0 +1,79 @@
package stream
import (
"fmt"
"net/http"
"os"
"runtime"
"github.com/0xJacky/Nginx-UI/internal/helper"
"github.com/0xJacky/Nginx-UI/internal/nginx"
"github.com/0xJacky/Nginx-UI/internal/notification"
"github.com/0xJacky/Nginx-UI/model"
"github.com/0xJacky/Nginx-UI/query"
"github.com/go-resty/resty/v2"
"github.com/uozi-tech/cosy/logger"
)
// Delete deletes a site by removing the file in sites-available
func Delete(name string) (err error) {
availablePath := nginx.GetConfPath("streams-available", name)
syncDelete(name)
s := query.Site
_, err = s.Where(s.Path.Eq(availablePath)).Unscoped().Delete(&model.Site{})
if err != nil {
return
}
enabledPath := nginx.GetConfPath("streams-enabled", name)
if !helper.FileExists(availablePath) {
return ErrStreamNotFound
}
if helper.FileExists(enabledPath) {
return ErrStreamIsEnabled
}
certModel := model.Cert{Filename: name}
_ = certModel.Remove()
err = os.Remove(availablePath)
if err != nil {
return
}
return
}
func syncDelete(name string) {
nodes := getSyncNodes(name)
for _, node := range nodes {
go func() {
defer func() {
if err := recover(); err != nil {
buf := make([]byte, 1024)
runtime.Stack(buf, false)
logger.Error(err)
}
}()
client := resty.New()
client.SetBaseURL(node.URL)
resp, err := client.R().
SetHeader("X-Node-Secret", node.Token).
Delete(fmt.Sprintf("/api/streams/%s", name))
if err != nil {
notification.Error("Delete Remote Stream Error", err.Error())
return
}
if resp.StatusCode() != http.StatusOK {
notification.Error("Delete Remote Stream Error", NewSyncResult(node.Name, name, resp).String())
return
}
notification.Success("Delete Remote Stream Success", NewSyncResult(node.Name, name, resp).String())
}()
}
}

View file

@ -0,0 +1,81 @@
package stream
import (
"fmt"
"github.com/0xJacky/Nginx-UI/internal/nginx"
"github.com/0xJacky/Nginx-UI/internal/notification"
"github.com/0xJacky/Nginx-UI/model"
"github.com/go-resty/resty/v2"
"github.com/uozi-tech/cosy/logger"
"net/http"
"os"
"runtime"
"sync"
)
// Disable disables a site by removing the symlink in sites-enabled
func Disable(name string) (err error) {
enabledConfigFilePath := nginx.GetConfPath("streams-enabled", name)
_, err = os.Stat(enabledConfigFilePath)
if err != nil {
return
}
err = os.Remove(enabledConfigFilePath)
if err != nil {
return
}
// delete auto cert record
certModel := model.Cert{Filename: name}
err = certModel.Remove()
if err != nil {
return
}
output := nginx.Reload()
if nginx.GetLogLevel(output) > nginx.Warn {
return fmt.Errorf("%s", output)
}
go syncDisable(name)
return
}
func syncDisable(name string) {
nodes := getSyncNodes(name)
wg := &sync.WaitGroup{}
wg.Add(len(nodes))
for _, node := range nodes {
go func() {
defer func() {
if err := recover(); err != nil {
buf := make([]byte, 1024)
runtime.Stack(buf, false)
logger.Error(err)
}
}()
defer wg.Done()
client := resty.New()
client.SetBaseURL(node.URL)
resp, err := client.R().
SetHeader("X-Node-Secret", node.Token).
Post(fmt.Sprintf("/api/streams/%s/disable", name))
if err != nil {
notification.Error("Disable Remote Stream Error", err.Error())
return
}
if resp.StatusCode() != http.StatusOK {
notification.Error("Disable Remote Stream Error", NewSyncResult(node.Name, name, resp).String())
return
}
notification.Success("Disable Remote Stream Success", NewSyncResult(node.Name, name, resp).String())
}()
}
wg.Wait()
}

View file

@ -0,0 +1,23 @@
package stream
import (
"github.com/0xJacky/Nginx-UI/internal/helper"
"github.com/0xJacky/Nginx-UI/internal/nginx"
)
// Duplicate duplicates a site by copying the file
func Duplicate(src, dst string) (err error) {
src = nginx.GetConfPath("streams-available", src)
dst = nginx.GetConfPath("streams-available", dst)
if helper.FileExists(dst) {
return ErrDstFileExists
}
_, err = helper.CopyFile(src, dst)
if err != nil {
return
}
return
}

87
internal/stream/enable.go Normal file
View file

@ -0,0 +1,87 @@
package stream
import (
"fmt"
"github.com/0xJacky/Nginx-UI/internal/helper"
"github.com/0xJacky/Nginx-UI/internal/nginx"
"github.com/0xJacky/Nginx-UI/internal/notification"
"github.com/go-resty/resty/v2"
"github.com/uozi-tech/cosy/logger"
"net/http"
"os"
"runtime"
"sync"
)
// Enable enables a site by creating a symlink in sites-enabled
func Enable(name string) (err error) {
configFilePath := nginx.GetConfPath("streams-available", name)
enabledConfigFilePath := nginx.GetConfPath("streams-enabled", name)
_, err = os.Stat(configFilePath)
if err != nil {
return
}
if helper.FileExists(enabledConfigFilePath) {
return
}
err = os.Symlink(configFilePath, enabledConfigFilePath)
if err != nil {
return
}
// Test nginx config, if not pass, then disable the site.
output := nginx.TestConf()
if nginx.GetLogLevel(output) > nginx.Warn {
_ = os.Remove(enabledConfigFilePath)
return fmt.Errorf("%s", output)
}
output = nginx.Reload()
if nginx.GetLogLevel(output) > nginx.Warn {
return fmt.Errorf("%s", output)
}
go syncEnable(name)
return
}
func syncEnable(name string) {
nodes := getSyncNodes(name)
wg := &sync.WaitGroup{}
wg.Add(len(nodes))
for _, node := range nodes {
go func() {
defer func() {
if err := recover(); err != nil {
buf := make([]byte, 1024)
runtime.Stack(buf, false)
logger.Error(err)
}
}()
defer wg.Done()
client := resty.New()
client.SetBaseURL(node.URL)
resp, err := client.R().
SetHeader("X-Node-Secret", node.Token).
Post(fmt.Sprintf("/api/streams/%s/enable", name))
if err != nil {
notification.Error("Enable Remote Stream Error", err.Error())
return
}
if resp.StatusCode() != http.StatusOK {
notification.Error("Enable Remote Stream Error", NewSyncResult(node.Name, name, resp).String())
return
}
notification.Success("Enable Remote Stream Success", NewSyncResult(node.Name, name, resp).String())
}()
}
wg.Wait()
}

10
internal/stream/errors.go Normal file
View file

@ -0,0 +1,10 @@
package stream
import "github.com/uozi-tech/cosy"
var (
e = cosy.NewErrorScope("stream")
ErrStreamNotFound = e.New(40401, "stream not found")
ErrDstFileExists = e.New(50001, "destination file already exists")
ErrStreamIsEnabled = e.New(50002, "stream is enabled")
)

108
internal/stream/rename.go Normal file
View file

@ -0,0 +1,108 @@
package stream
import (
"fmt"
"github.com/0xJacky/Nginx-UI/internal/helper"
"github.com/0xJacky/Nginx-UI/internal/nginx"
"github.com/0xJacky/Nginx-UI/internal/notification"
"github.com/0xJacky/Nginx-UI/query"
"github.com/go-resty/resty/v2"
"github.com/uozi-tech/cosy/logger"
"net/http"
"os"
"runtime"
"sync"
)
func Rename(oldName string, newName string) (err error) {
oldPath := nginx.GetConfPath("streams-available", oldName)
newPath := nginx.GetConfPath("streams-available", newName)
if oldPath == newPath {
return
}
// check if dst file exists, do not rename
if helper.FileExists(newPath) {
return ErrDstFileExists
}
s := query.Site
_, _ = s.Where(s.Path.Eq(oldPath)).Update(s.Path, newPath)
err = os.Rename(oldPath, newPath)
if err != nil {
return
}
// recreate a soft link
oldEnabledConfigFilePath := nginx.GetConfPath("streams-enabled", oldName)
if helper.SymbolLinkExists(oldEnabledConfigFilePath) {
_ = os.Remove(oldEnabledConfigFilePath)
newEnabledConfigFilePath := nginx.GetConfPath("streams-enabled", newName)
err = os.Symlink(newPath, newEnabledConfigFilePath)
if err != nil {
return
}
}
// test nginx configuration
output := nginx.TestConf()
if nginx.GetLogLevel(output) > nginx.Warn {
return fmt.Errorf("%s", output)
}
// reload nginx
output = nginx.Reload()
if nginx.GetLogLevel(output) > nginx.Warn {
return fmt.Errorf("%s", output)
}
go syncRename(oldName, newName)
return
}
func syncRename(oldName, newName string) {
nodes := getSyncNodes(newName)
wg := &sync.WaitGroup{}
wg.Add(len(nodes))
for _, node := range nodes {
go func() {
defer func() {
if err := recover(); err != nil {
buf := make([]byte, 1024)
runtime.Stack(buf, false)
logger.Error(err)
}
}()
defer wg.Done()
client := resty.New()
client.SetBaseURL(node.URL)
resp, err := client.R().
SetHeader("X-Node-Secret", node.Token).
SetBody(map[string]string{
"new_name": newName,
}).
Post(fmt.Sprintf("/api/streams/%s/rename", oldName))
if err != nil {
notification.Error("Rename Remote Stream Error", err.Error())
return
}
if resp.StatusCode() != http.StatusOK {
notification.Error("Rename Remote Stream Error",
NewSyncResult(node.Name, oldName, resp).
SetNewName(newName).String())
return
}
notification.Success("Rename Remote Stream Success",
NewSyncResult(node.Name, oldName, resp).
SetNewName(newName).String())
}()
}
wg.Wait()
}

107
internal/stream/save.go Normal file
View file

@ -0,0 +1,107 @@
package stream
import (
"fmt"
"net/http"
"os"
"runtime"
"sync"
"github.com/0xJacky/Nginx-UI/internal/helper"
"github.com/0xJacky/Nginx-UI/internal/nginx"
"github.com/0xJacky/Nginx-UI/internal/notification"
"github.com/0xJacky/Nginx-UI/model"
"github.com/0xJacky/Nginx-UI/query"
"github.com/go-resty/resty/v2"
"github.com/uozi-tech/cosy/logger"
)
// Save saves a site configuration file
func Save(name string, content string, overwrite bool, syncNodeIds []uint64) (err error) {
path := nginx.GetConfPath("streams-available", name)
if !overwrite && helper.FileExists(path) {
return ErrDstFileExists
}
err = os.WriteFile(path, []byte(content), 0644)
if err != nil {
return
}
enabledConfigFilePath := nginx.GetConfPath("streams-enabled", name)
if helper.FileExists(enabledConfigFilePath) {
// Test nginx configuration
output := nginx.TestConf()
if nginx.GetLogLevel(output) > nginx.Warn {
return fmt.Errorf("%s", output)
}
output = nginx.Reload()
if nginx.GetLogLevel(output) > nginx.Warn {
return fmt.Errorf("%s", output)
}
}
s := query.Stream
_, err = s.Where(s.Path.Eq(path)).
Select(s.SyncNodeIDs).
Updates(&model.Site{
SyncNodeIDs: syncNodeIds,
})
if err != nil {
return
}
go syncSave(name, content)
return
}
func syncSave(name string, content string) {
nodes := getSyncNodes(name)
wg := &sync.WaitGroup{}
wg.Add(len(nodes))
for _, node := range nodes {
go func() {
defer func() {
if err := recover(); err != nil {
buf := make([]byte, 1024)
runtime.Stack(buf, false)
logger.Error(err)
}
}()
defer wg.Done()
client := resty.New()
client.SetBaseURL(node.URL)
resp, err := client.R().
SetHeader("X-Node-Secret", node.Token).
SetBody(map[string]interface{}{
"content": content,
"overwrite": true,
}).
Post(fmt.Sprintf("/api/streams/%s", name))
if err != nil {
notification.Error("Save Remote Stream Error", err.Error())
return
}
if resp.StatusCode() != http.StatusOK {
notification.Error("Save Remote Stream Error", NewSyncResult(node.Name, name, resp).String())
return
}
notification.Success("Save Remote Stream Success", NewSyncResult(node.Name, name, resp).String())
// Check if the site is enabled, if so then enable it on the remote node
enabledConfigFilePath := nginx.GetConfPath("streams-enabled", name)
if helper.FileExists(enabledConfigFilePath) {
syncEnable(name)
}
}()
}
wg.Wait()
}

74
internal/stream/sync.go Normal file
View file

@ -0,0 +1,74 @@
package stream
import (
"encoding/json"
"github.com/0xJacky/Nginx-UI/internal/nginx"
"github.com/0xJacky/Nginx-UI/model"
"github.com/0xJacky/Nginx-UI/query"
"github.com/gin-gonic/gin"
"github.com/go-resty/resty/v2"
"github.com/samber/lo"
"github.com/uozi-tech/cosy/logger"
)
// getSyncNodes returns the nodes that need to be synchronized by site name
func getSyncNodes(name string) (nodes []*model.Environment) {
configFilePath := nginx.GetConfPath("streams-available", name)
s := query.Site
site, err := s.Where(s.Path.Eq(configFilePath)).
Preload(s.SiteCategory).First()
if err != nil {
logger.Error(err)
return
}
syncNodeIds := site.SyncNodeIDs
// inherit sync node ids from site category
if site.SiteCategory != nil {
syncNodeIds = append(syncNodeIds, site.SiteCategory.SyncNodeIds...)
}
syncNodeIds = lo.Uniq(syncNodeIds)
e := query.Environment
nodes, err = e.Where(e.ID.In(syncNodeIds...)).Find()
if err != nil {
logger.Error(err)
return
}
return
}
type SyncResult struct {
StatusCode int `json:"status_code"`
Node string `json:"node"`
Name string `json:"name"`
NewName string `json:"new_name,omitempty"`
Response gin.H `json:"response"`
Error string `json:"error"`
}
func NewSyncResult(node string, siteName string, resp *resty.Response) (s *SyncResult) {
s = &SyncResult{
StatusCode: resp.StatusCode(),
Node: node,
Name: siteName,
}
err := json.Unmarshal(resp.Body(), &s.Response)
if err != nil {
logger.Error(err)
}
return
}
func (s *SyncResult) SetNewName(name string) *SyncResult {
s.NewName = name
return s
}
func (s *SyncResult) String() string {
b, err := json.Marshal(s)
if err != nil {
logger.Error(err)
}
return string(b)
}