mirror of
https://github.com/0xJacky/nginx-ui.git
synced 2026-05-06 22:12:23 +08:00
- Introduced `WebSocketTrustedOrigins` setting in `app.example.ini` and corresponding documentation. - Refactored WebSocket origin checks across multiple API endpoints to utilize the new middleware for improved security. - Added tests for the new origin validation logic to ensure proper handling of trusted origins and node secret requests.
288 lines
6.5 KiB
Go
288 lines
6.5 KiB
Go
package nginx
|
|
|
|
import (
|
|
"context"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/0xJacky/Nginx-UI/internal/helper"
|
|
"github.com/0xJacky/Nginx-UI/internal/kernel"
|
|
"github.com/0xJacky/Nginx-UI/internal/middleware"
|
|
"github.com/0xJacky/Nginx-UI/internal/performance"
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/uozi-tech/cosy/logger"
|
|
)
|
|
|
|
// PerformanceClient represents a WebSocket client for Nginx performance monitoring
|
|
type PerformanceClient struct {
|
|
conn *websocket.Conn
|
|
send chan interface{}
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
mutex sync.RWMutex
|
|
closed bool
|
|
}
|
|
|
|
func (c *PerformanceClient) trySend(message interface{}) bool {
|
|
c.mutex.RLock()
|
|
if c.closed {
|
|
c.mutex.RUnlock()
|
|
return false
|
|
}
|
|
|
|
select {
|
|
case c.send <- message:
|
|
c.mutex.RUnlock()
|
|
return true
|
|
default:
|
|
c.mutex.RUnlock()
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (c *PerformanceClient) closeSendChannel() {
|
|
c.mutex.Lock()
|
|
if c.closed {
|
|
c.mutex.Unlock()
|
|
return
|
|
}
|
|
|
|
close(c.send)
|
|
c.closed = true
|
|
c.mutex.Unlock()
|
|
}
|
|
|
|
// PerformanceHub manages WebSocket connections for Nginx performance monitoring
|
|
type PerformanceHub struct {
|
|
clients map[*PerformanceClient]bool
|
|
register chan *PerformanceClient
|
|
unregister chan *PerformanceClient
|
|
mutex sync.RWMutex
|
|
ticker *time.Ticker
|
|
}
|
|
|
|
var (
|
|
performanceHub *PerformanceHub
|
|
performanceHubOnce sync.Once
|
|
)
|
|
|
|
// GetNginxPerformanceHub returns the singleton hub instance
|
|
func GetNginxPerformanceHub() *PerformanceHub {
|
|
performanceHubOnce.Do(func() {
|
|
performanceHub = &PerformanceHub{
|
|
clients: make(map[*PerformanceClient]bool),
|
|
register: make(chan *PerformanceClient),
|
|
unregister: make(chan *PerformanceClient),
|
|
ticker: time.NewTicker(5 * time.Second),
|
|
}
|
|
go performanceHub.run()
|
|
})
|
|
return performanceHub
|
|
}
|
|
|
|
// run handles the main hub loop
|
|
func (h *PerformanceHub) run() {
|
|
defer h.ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case client := <-h.register:
|
|
h.mutex.Lock()
|
|
h.clients[client] = true
|
|
currentClients := len(h.clients)
|
|
h.mutex.Unlock()
|
|
logger.Debug("Nginx performance client connected, total clients:", currentClients)
|
|
|
|
// Send initial data to the new client
|
|
go h.sendPerformanceDataToClient(client)
|
|
|
|
case client := <-h.unregister:
|
|
currentClients, removed := h.removeClient(client)
|
|
if removed {
|
|
logger.Debug("Nginx performance client disconnected, total clients:", currentClients)
|
|
}
|
|
|
|
case <-h.ticker.C:
|
|
// Send performance data to all connected clients
|
|
h.broadcastPerformanceData()
|
|
|
|
case <-kernel.Context.Done():
|
|
logger.Debug("PerformanceHub: Context cancelled, closing WebSocket")
|
|
// Shutdown all clients
|
|
for _, client := range h.activeClients() {
|
|
h.removeClient(client)
|
|
}
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *PerformanceHub) activeClients() []*PerformanceClient {
|
|
h.mutex.RLock()
|
|
if len(h.clients) == 0 {
|
|
h.mutex.RUnlock()
|
|
return nil
|
|
}
|
|
|
|
clients := make([]*PerformanceClient, 0, len(h.clients))
|
|
for client := range h.clients {
|
|
clients = append(clients, client)
|
|
}
|
|
h.mutex.RUnlock()
|
|
return clients
|
|
}
|
|
|
|
func (h *PerformanceHub) removeClient(client *PerformanceClient) (remaining int, removed bool) {
|
|
h.mutex.Lock()
|
|
_, removed = h.clients[client]
|
|
if removed {
|
|
delete(h.clients, client)
|
|
}
|
|
remaining = len(h.clients)
|
|
h.mutex.Unlock()
|
|
|
|
if removed {
|
|
client.closeSendChannel()
|
|
}
|
|
return remaining, removed
|
|
}
|
|
|
|
// sendPerformanceDataToClient sends performance data to a specific client
|
|
func (h *PerformanceHub) sendPerformanceDataToClient(client *PerformanceClient) {
|
|
select {
|
|
case <-client.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
|
|
response := performance.GetPerformanceData()
|
|
|
|
if !client.trySend(response) {
|
|
// Channel is full, remove client
|
|
h.unregister <- client
|
|
}
|
|
}
|
|
|
|
// broadcastPerformanceData sends performance data to all connected clients
|
|
func (h *PerformanceHub) broadcastPerformanceData() {
|
|
clients := h.activeClients()
|
|
if len(clients) == 0 {
|
|
return
|
|
}
|
|
|
|
response := performance.GetPerformanceData()
|
|
|
|
for _, client := range clients {
|
|
if client.trySend(response) {
|
|
continue
|
|
}
|
|
|
|
h.removeClient(client)
|
|
}
|
|
}
|
|
|
|
// WebSocket upgrader configuration
|
|
var nginxPerformanceUpgrader = websocket.Upgrader{
|
|
CheckOrigin: middleware.CheckWebSocketOrigin,
|
|
ReadBufferSize: 1024,
|
|
WriteBufferSize: 1024,
|
|
}
|
|
|
|
// StreamDetailStatusWS handles WebSocket connection for Nginx performance monitoring
|
|
func StreamDetailStatusWS(c *gin.Context) {
|
|
ws, err := nginxPerformanceUpgrader.Upgrade(c.Writer, c.Request, nil)
|
|
if err != nil {
|
|
logger.Error("Failed to upgrade connection:", err)
|
|
return
|
|
}
|
|
defer ws.Close()
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
client := &PerformanceClient{
|
|
conn: ws,
|
|
send: make(chan interface{}, 1024), // Increased buffer size
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
}
|
|
|
|
hub := GetNginxPerformanceHub()
|
|
hub.register <- client
|
|
|
|
// Start write and read pumps
|
|
go client.writePump()
|
|
client.readPump()
|
|
}
|
|
|
|
// writePump pumps messages from the hub to the websocket connection
|
|
func (c *PerformanceClient) writePump() {
|
|
ticker := time.NewTicker(30 * time.Second)
|
|
defer func() {
|
|
ticker.Stop()
|
|
c.conn.Close()
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case message, ok := <-c.send:
|
|
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
|
if !ok {
|
|
c.conn.WriteMessage(websocket.CloseMessage, []byte{})
|
|
return
|
|
}
|
|
|
|
if err := c.conn.WriteJSON(message); err != nil {
|
|
logger.Error("Failed to write message:", err)
|
|
if helper.IsUnexpectedWebsocketError(err) {
|
|
return
|
|
}
|
|
}
|
|
|
|
case <-ticker.C:
|
|
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
|
if err := c.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
|
|
logger.Error("Failed to write ping:", err)
|
|
return
|
|
}
|
|
|
|
case <-c.ctx.Done():
|
|
return
|
|
|
|
case <-kernel.Context.Done():
|
|
logger.Debug("PerformanceClient: Context cancelled, closing WebSocket")
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
// readPump pumps messages from the websocket connection to the hub
|
|
func (c *PerformanceClient) readPump() {
|
|
defer func() {
|
|
hub := GetNginxPerformanceHub()
|
|
hub.unregister <- c
|
|
c.conn.Close()
|
|
c.cancel()
|
|
}()
|
|
|
|
c.conn.SetReadLimit(512)
|
|
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
|
c.conn.SetPongHandler(func(string) error {
|
|
c.conn.SetReadDeadline(time.Now().Add(60 * time.Second))
|
|
return nil
|
|
})
|
|
|
|
for {
|
|
_, _, err := c.conn.ReadMessage()
|
|
if err != nil {
|
|
if helper.IsUnexpectedWebsocketError(err) {
|
|
logger.Error("Unexpected WebSocket error:", err)
|
|
}
|
|
break
|
|
}
|
|
// Handle incoming messages if needed
|
|
// For now, this is a one-way communication (server to client)
|
|
}
|
|
}
|