mirror of
https://github.com/0xJacky/nginx-ui.git
synced 2026-05-07 14:32:42 +08:00
- Introduced SafeWebSocketWriter to ensure thread-safe writes to WebSocket connections. - Updated WebSocket handling in certificate issuance, revocation, Nginx log tailing, and system upgrades to use the new writer. - Enhanced WebSocket client management in the site navigation module for improved message handling and connection stability.
239 lines
5.3 KiB
Go
239 lines
5.3 KiB
Go
package sites
|
|
|
|
import (
|
|
"errors"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/0xJacky/Nginx-UI/internal/helper"
|
|
"github.com/0xJacky/Nginx-UI/internal/middleware"
|
|
"github.com/0xJacky/Nginx-UI/internal/sitecheck"
|
|
"github.com/gin-gonic/gin"
|
|
"github.com/gorilla/websocket"
|
|
"github.com/uozi-tech/cosy/logger"
|
|
)
|
|
|
|
// WebSocket message types
|
|
const (
|
|
MessageTypeInitial = "initial"
|
|
MessageTypeUpdate = "update"
|
|
MessageTypeRefresh = "refresh"
|
|
MessageTypePing = "ping"
|
|
MessageTypePong = "pong"
|
|
)
|
|
|
|
// ClientMessage represents incoming WebSocket messages from client
|
|
type ClientMessage struct {
|
|
Type string `json:"type"`
|
|
}
|
|
|
|
// ServerMessage represents outgoing WebSocket messages to client
|
|
type ServerMessage struct {
|
|
Type string `json:"type"`
|
|
Data []*sitecheck.SiteInfo `json:"data,omitempty"`
|
|
}
|
|
|
|
// PongMessage represents a pong response
|
|
type PongMessage struct {
|
|
Type string `json:"type"`
|
|
}
|
|
|
|
var upgrader = websocket.Upgrader{
|
|
CheckOrigin: middleware.CheckWebSocketOrigin,
|
|
}
|
|
|
|
// WSManager WebSocket connection manager
|
|
type WSManager struct {
|
|
connections map[*websocket.Conn]*WSClient
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
var errClientUnavailable = errors.New("websocket client unavailable")
|
|
|
|
// WSClient wraps a websocket connection and handles serialized writes.
|
|
type WSClient struct {
|
|
conn *websocket.Conn
|
|
send chan interface{}
|
|
mutex sync.RWMutex
|
|
closed bool
|
|
}
|
|
|
|
func (c *WSClient) trySend(v interface{}) bool {
|
|
c.mutex.RLock()
|
|
if c.closed {
|
|
c.mutex.RUnlock()
|
|
return false
|
|
}
|
|
|
|
select {
|
|
case c.send <- v:
|
|
c.mutex.RUnlock()
|
|
return true
|
|
default:
|
|
c.mutex.RUnlock()
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (c *WSClient) closeSendChannel() {
|
|
c.mutex.Lock()
|
|
if c.closed {
|
|
c.mutex.Unlock()
|
|
return
|
|
}
|
|
|
|
close(c.send)
|
|
c.closed = true
|
|
c.mutex.Unlock()
|
|
}
|
|
|
|
func (c *WSClient) writePump() {
|
|
for message := range c.send {
|
|
c.conn.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
|
if err := c.conn.WriteJSON(message); err != nil {
|
|
logger.Error("Failed to write site websocket message:", err)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
var wsManager = &WSManager{
|
|
connections: make(map[*websocket.Conn]*WSClient),
|
|
}
|
|
|
|
// AddConnection adds a WebSocket connection to the manager
|
|
func (wm *WSManager) AddConnection(conn *websocket.Conn) *WSClient {
|
|
wm.mutex.Lock()
|
|
defer wm.mutex.Unlock()
|
|
client := &WSClient{
|
|
conn: conn,
|
|
send: make(chan interface{}, 16),
|
|
}
|
|
wm.connections[conn] = client
|
|
return client
|
|
}
|
|
|
|
// RemoveConnection removes a WebSocket connection from the manager
|
|
func (wm *WSManager) RemoveConnection(conn *websocket.Conn) {
|
|
wm.mutex.Lock()
|
|
client, ok := wm.connections[conn]
|
|
if ok {
|
|
delete(wm.connections, conn)
|
|
}
|
|
wm.mutex.Unlock()
|
|
|
|
if ok {
|
|
client.closeSendChannel()
|
|
}
|
|
}
|
|
|
|
func (wm *WSManager) activeClients() []*WSClient {
|
|
wm.mutex.RLock()
|
|
if len(wm.connections) == 0 {
|
|
wm.mutex.RUnlock()
|
|
return nil
|
|
}
|
|
|
|
clients := make([]*WSClient, 0, len(wm.connections))
|
|
for _, client := range wm.connections {
|
|
clients = append(clients, client)
|
|
}
|
|
wm.mutex.RUnlock()
|
|
|
|
return clients
|
|
}
|
|
|
|
// BroadcastUpdate sends updates to all connected WebSocket clients
|
|
func (wm *WSManager) BroadcastUpdate(sites []*sitecheck.SiteInfo) {
|
|
for _, client := range wm.activeClients() {
|
|
if err := sendSiteData(client, MessageTypeUpdate, sites); err == nil {
|
|
continue
|
|
}
|
|
|
|
wm.RemoveConnection(client.conn)
|
|
client.conn.Close()
|
|
}
|
|
}
|
|
|
|
// GetManager returns the global WebSocket manager instance
|
|
func GetManager() *WSManager {
|
|
return wsManager
|
|
}
|
|
|
|
// InitWebSocketNotifications sets up the callback for site check updates
|
|
func InitWebSocketNotifications() {
|
|
service := sitecheck.GetService()
|
|
service.SetUpdateCallback(func(sites []*sitecheck.SiteInfo) {
|
|
wsManager.BroadcastUpdate(sites)
|
|
})
|
|
}
|
|
|
|
// SiteNavigationWebSocket handles WebSocket connections for real-time site status updates
|
|
func SiteNavigationWebSocket(c *gin.Context) {
|
|
conn, err := upgrader.Upgrade(c.Writer, c.Request, nil)
|
|
if err != nil {
|
|
logger.Error("WebSocket upgrade failed:", err)
|
|
return
|
|
}
|
|
|
|
client := wsManager.AddConnection(conn)
|
|
defer func() {
|
|
wsManager.RemoveConnection(conn)
|
|
conn.Close()
|
|
}()
|
|
|
|
logger.Info("Site navigation WebSocket connection established")
|
|
|
|
service := sitecheck.GetService()
|
|
|
|
go client.writePump()
|
|
|
|
// Send initial data
|
|
if err := sendSiteData(client, MessageTypeInitial, service.GetSites()); err != nil {
|
|
logger.Error("Failed to queue initial site data:", err)
|
|
return
|
|
}
|
|
|
|
handleClientMessages(client, service)
|
|
logger.Info("Site navigation WebSocket connection closed")
|
|
}
|
|
|
|
// sendSiteData sends site data via WebSocket
|
|
func sendSiteData(client *WSClient, msgType string, sites []*sitecheck.SiteInfo) error {
|
|
message := ServerMessage{
|
|
Type: msgType,
|
|
Data: sites,
|
|
}
|
|
|
|
if !client.trySend(message) {
|
|
return errClientUnavailable
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// handleClientMessages handles incoming WebSocket messages
|
|
func handleClientMessages(client *WSClient, service *sitecheck.Service) {
|
|
for {
|
|
var msg ClientMessage
|
|
if err := client.conn.ReadJSON(&msg); err != nil {
|
|
if helper.IsUnexpectedWebsocketError(err) {
|
|
logger.Error("WebSocket read error:", err)
|
|
}
|
|
return
|
|
}
|
|
|
|
switch msg.Type {
|
|
case MessageTypeRefresh:
|
|
logger.Info("Client requested site refresh")
|
|
service.RefreshSites()
|
|
case MessageTypePing:
|
|
pongMsg := PongMessage{Type: MessageTypePong}
|
|
if !client.trySend(pongMsg) {
|
|
logger.Error("Failed to queue pong response:", errClientUnavailable)
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|