diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index aae7bca8..fad7a1de 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -305,14 +305,10 @@ jobs: with: accountId: ${{ secrets.CF_ACCOUNT_ID }} apiToken: ${{ secrets.CF_R2_API_TOKEN }} - wranglerVersion: "4.21.1" + wranglerVersion: "4.23.0" command: | r2 object put nginx-ui-dev-build/${{ env.DIST }}.tar.gz --file ./${{ env.DIST }}.tar.gz --remote r2 object put nginx-ui-dev-build/${{ env.DIST }}.tar.gz.digest --file ./${{ env.DIST }}.tar.gz.digest --remote - if [[ "$GOOS" == "windows" ]]; then - r2 object put nginx-ui-dev-build/${{ env.DIST }}.zip --file ./${{ env.DIST }}.zip --remote - r2 object put nginx-ui-dev-build/${{ env.DIST }}.zip.digest --file ./${{ env.DIST }}.zip.digest --remote - fi docker-build: if: github.event_name != 'pull_request' diff --git a/api/event/websocket.go b/api/event/websocket.go index 24726849..3778ccdb 100644 --- a/api/event/websocket.go +++ b/api/event/websocket.go @@ -7,12 +7,9 @@ import ( "sync" "time" - "github.com/0xJacky/Nginx-UI/internal/cache" - "github.com/0xJacky/Nginx-UI/internal/cert" + "github.com/0xJacky/Nginx-UI/internal/event" "github.com/0xJacky/Nginx-UI/internal/helper" "github.com/0xJacky/Nginx-UI/internal/kernel" - "github.com/0xJacky/Nginx-UI/internal/notification" - "github.com/0xJacky/Nginx-UI/model" "github.com/gin-gonic/gin" "github.com/gorilla/websocket" "github.com/uozi-tech/cosy/logger" @@ -57,10 +54,26 @@ func GetHub() *Hub { unregister: make(chan *Client), } go hub.run() + + // Register this hub directly with the event bus + event.SetWebSocketHub(hub) }) return hub } +// BroadcastMessage implements the WebSocketHub interface +func (h *Hub) BroadcastMessage(event string, data interface{}) { + message := WebSocketMessage{ + Event: event, + Data: data, + } + select { + case h.broadcast <- message: + default: + logger.Warn("Broadcast channel full, message dropped") + } +} + // run handles the main hub loop func (h *Hub) run() { for { @@ -95,19 +108,6 @@ func (h *Hub) run() { } } -// BroadcastMessage sends a message to all connected clients -func (h *Hub) BroadcastMessage(event string, data interface{}) { - message := WebSocketMessage{ - Event: event, - Data: data, - } - select { - case h.broadcast <- message: - default: - logger.Warn("Broadcast channel full, message dropped") - } -} - // WebSocket upgrader configuration var upgrader = websocket.Upgrader{ CheckOrigin: func(r *http.Request) bool { @@ -139,92 +139,11 @@ func EventBus(c *gin.Context) { hub := GetHub() hub.register <- client - // Start goroutines for handling subscriptions - go client.handleNotifications() - go client.handleProcessingStatus() - go client.handleNginxLogStatus() - - // Start write and read pumps + // Start write and read pumps - no manual event subscriptions needed go client.writePump() client.readPump() } -// handleNotifications subscribes to notification events -func (c *Client) handleNotifications() { - evtChan := make(chan *model.Notification, 10) - wsManager := notification.GetWebSocketManager() - wsManager.Subscribe(evtChan) - - defer func() { - wsManager.Unsubscribe(evtChan) - }() - - for { - select { - case n := <-evtChan: - hub.BroadcastMessage("notification", n) - case <-c.ctx.Done(): - return - } - } -} - -// handleProcessingStatus subscribes to processing status events -func (c *Client) handleProcessingStatus() { - indexScanning := cache.SubscribeScanningStatus() - defer cache.UnsubscribeScanningStatus(indexScanning) - - autoCert := cert.SubscribeProcessingStatus() - defer cert.UnsubscribeProcessingStatus(autoCert) - - status := struct { - IndexScanning bool `json:"index_scanning"` - AutoCertProcessing bool `json:"auto_cert_processing"` - }{ - IndexScanning: false, - AutoCertProcessing: false, - } - - for { - select { - case indexStatus, ok := <-indexScanning: - if !ok { - return - } - status.IndexScanning = indexStatus - // Send processing status event - hub.BroadcastMessage("processing_status", status) - // Also send nginx log status event for backward compatibility - hub.BroadcastMessage("nginx_log_status", gin.H{ - "scanning": indexStatus, - }) - - case certStatus, ok := <-autoCert: - if !ok { - return - } - status.AutoCertProcessing = certStatus - hub.BroadcastMessage("processing_status", status) - - case <-c.ctx.Done(): - return - } - } -} - -// handleNginxLogStatus subscribes to nginx log scanning status events -// Note: This uses the same cache.SubscribeScanningStatus as handleProcessingStatus -// but sends different event types for different purposes -func (c *Client) handleNginxLogStatus() { - // We don't need a separate subscription here since handleProcessingStatus - // already handles the index scanning status. This function is kept for - // potential future nginx-specific log status that might be different - // from the general index scanning status. - - // For now, this is handled by handleProcessingStatus - <-c.ctx.Done() -} - // writePump pumps messages from the hub to the websocket connection func (c *Client) writePump() { ticker := time.NewTicker(30 * time.Second) diff --git a/go.mod b/go.mod index 98c867e6..b2dde9b2 100644 --- a/go.mod +++ b/go.mod @@ -84,6 +84,7 @@ require ( github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 // indirect github.com/Microsoft/go-winio v0.6.2 // indirect github.com/OpenDNS/vegadns2client v0.0.0-20180418235048-a3fa4a771d87 // indirect + github.com/RoaringBitmap/roaring/v2 v2.4.5 // indirect github.com/akamai/AkamaiOPEN-edgegrid-golang v1.2.2 // indirect github.com/aliyun/aliyun-log-go-sdk v0.1.101 // indirect github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11 // indirect @@ -104,6 +105,25 @@ require ( github.com/aws/smithy-go v1.22.4 // indirect github.com/baidubce/bce-sdk-go v0.9.233 // indirect github.com/beorn7/perks v1.0.1 // indirect + github.com/bits-and-blooms/bitset v1.22.0 // indirect + github.com/blevesearch/bleve/v2 v2.5.2 // indirect + github.com/blevesearch/bleve_index_api v1.2.8 // indirect + github.com/blevesearch/geo v0.2.3 // indirect + github.com/blevesearch/go-faiss v1.0.25 // indirect + github.com/blevesearch/go-porterstemmer v1.0.3 // indirect + github.com/blevesearch/gtreap v0.1.1 // indirect + github.com/blevesearch/mmap-go v1.0.4 // indirect + github.com/blevesearch/scorch_segment_api/v2 v2.3.10 // indirect + github.com/blevesearch/segment v0.9.1 // indirect + github.com/blevesearch/snowballstem v0.9.0 // indirect + github.com/blevesearch/upsidedown_store_api v1.0.2 // indirect + github.com/blevesearch/vellum v1.1.0 // indirect + github.com/blevesearch/zapx/v11 v11.4.2 // indirect + github.com/blevesearch/zapx/v12 v12.4.2 // indirect + github.com/blevesearch/zapx/v13 v13.4.2 // indirect + github.com/blevesearch/zapx/v14 v14.4.2 // indirect + github.com/blevesearch/zapx/v15 v15.4.2 // indirect + github.com/blevesearch/zapx/v16 v16.2.4 // indirect github.com/blinkbean/dingtalk v1.1.3 // indirect github.com/boombuler/barcode v1.0.2 // indirect github.com/bsm/redislock v0.9.4 // indirect @@ -151,6 +171,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang-jwt/jwt/v4 v4.5.2 // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/snappy v1.0.0 // indirect github.com/google/go-querystring v1.1.0 // indirect github.com/google/go-tpm v0.9.5 // indirect github.com/google/s2a-go v0.1.9 // indirect @@ -206,6 +227,7 @@ require ( github.com/moby/term v0.5.2 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect + github.com/mschoch/smat v0.2.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/namedotcom/go/v4 v4.0.2 // indirect github.com/nrdcg/auroradns v1.1.0 // indirect @@ -272,6 +294,7 @@ require ( github.com/yosida95/uritemplate/v3 v3.0.2 // indirect github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect + go.etcd.io/bbolt v1.4.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 // indirect go.opentelemetry.io/otel v1.37.0 // indirect diff --git a/go.sum b/go.sum index be19b452..67eb4254 100644 --- a/go.sum +++ b/go.sum @@ -682,6 +682,8 @@ github.com/Netflix/go-env v0.0.0-20220526054621-78278af1949d/go.mod h1:9XMFaCeRy github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OpenDNS/vegadns2client v0.0.0-20180418235048-a3fa4a771d87 h1:xPMsUicZ3iosVPSIP7bW5EcGUzjiiMl1OYTe14y/R24= github.com/OpenDNS/vegadns2client v0.0.0-20180418235048-a3fa4a771d87/go.mod h1:iGLljf5n9GjT6kc0HBvyI1nOKnGQbNB66VzSNbK5iks= +github.com/RoaringBitmap/roaring/v2 v2.4.5 h1:uGrrMreGjvAtTBobc0g5IrW1D5ldxDQYe2JW2gggRdg= +github.com/RoaringBitmap/roaring/v2 v2.4.5/go.mod h1:FiJcsfkGje/nZBZgCu0ZxCPOKD/hVXDS2dXi7/eUFE0= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/sarama v1.30.1/go.mod h1:hGgx05L/DiW8XYBXeJdKIN6V2QUy2H6JqME5VT1NLRw= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= @@ -793,7 +795,46 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs= +github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= +github.com/bits-and-blooms/bitset v1.22.0 h1:Tquv9S8+SGaS3EhyA+up3FXzmkhxPGjQQCkcs2uw7w4= +github.com/bits-and-blooms/bitset v1.22.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8= github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84= +github.com/blevesearch/bleve/v2 v2.5.2 h1:Ab0r0MODV2C5A6BEL87GqLBySqp/s9xFgceCju6BQk8= +github.com/blevesearch/bleve/v2 v2.5.2/go.mod h1:5Dj6dUQxZM6aqYT3eutTD/GpWKGFSsV8f7LDidFbwXo= +github.com/blevesearch/bleve_index_api v1.2.8 h1:Y98Pu5/MdlkRyLM0qDHostYo7i+Vv1cDNhqTeR4Sy6Y= +github.com/blevesearch/bleve_index_api v1.2.8/go.mod h1:rKQDl4u51uwafZxFrPD1R7xFOwKnzZW7s/LSeK4lgo0= +github.com/blevesearch/geo v0.2.3 h1:K9/vbGI9ehlXdxjxDRJtoAMt7zGAsMIzc6n8zWcwnhg= +github.com/blevesearch/geo v0.2.3/go.mod h1:K56Q33AzXt2YExVHGObtmRSFYZKYGv0JEN5mdacJJR8= +github.com/blevesearch/go-faiss v1.0.25 h1:lel1rkOUGbT1CJ0YgzKwC7k+XH0XVBHnCVWahdCXk4U= +github.com/blevesearch/go-faiss v1.0.25/go.mod h1:OMGQwOaRRYxrmeNdMrXJPvVx8gBnvE5RYrr0BahNnkk= +github.com/blevesearch/go-porterstemmer v1.0.3 h1:GtmsqID0aZdCSNiY8SkuPJ12pD4jI+DdXTAn4YRcHCo= +github.com/blevesearch/go-porterstemmer v1.0.3/go.mod h1:angGc5Ht+k2xhJdZi511LtmxuEf0OVpvUUNrwmM1P7M= +github.com/blevesearch/gtreap v0.1.1 h1:2JWigFrzDMR+42WGIN/V2p0cUvn4UP3C4Q5nmaZGW8Y= +github.com/blevesearch/gtreap v0.1.1/go.mod h1:QaQyDRAT51sotthUWAH4Sj08awFSSWzgYICSZ3w0tYk= +github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc= +github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs= +github.com/blevesearch/scorch_segment_api/v2 v2.3.10 h1:Yqk0XD1mE0fDZAJXTjawJ8If/85JxnLd8v5vG/jWE/s= +github.com/blevesearch/scorch_segment_api/v2 v2.3.10/go.mod h1:Z3e6ChN3qyN35yaQpl00MfI5s8AxUJbpTR/DL8QOQ+8= +github.com/blevesearch/segment v0.9.1 h1:+dThDy+Lvgj5JMxhmOVlgFfkUtZV2kw49xax4+jTfSU= +github.com/blevesearch/segment v0.9.1/go.mod h1:zN21iLm7+GnBHWTao9I+Au/7MBiL8pPFtJBJTsk6kQw= +github.com/blevesearch/snowballstem v0.9.0 h1:lMQ189YspGP6sXvZQ4WZ+MLawfV8wOmPoD/iWeNXm8s= +github.com/blevesearch/snowballstem v0.9.0/go.mod h1:PivSj3JMc8WuaFkTSRDW2SlrulNWPl4ABg1tC/hlgLs= +github.com/blevesearch/upsidedown_store_api v1.0.2 h1:U53Q6YoWEARVLd1OYNc9kvhBMGZzVrdmaozG2MfoB+A= +github.com/blevesearch/upsidedown_store_api v1.0.2/go.mod h1:M01mh3Gpfy56Ps/UXHjEO/knbqyQ1Oamg8If49gRwrQ= +github.com/blevesearch/vellum v1.1.0 h1:CinkGyIsgVlYf8Y2LUQHvdelgXr6PYuvoDIajq6yR9w= +github.com/blevesearch/vellum v1.1.0/go.mod h1:QgwWryE8ThtNPxtgWJof5ndPfx0/YMBh+W2weHKPw8Y= +github.com/blevesearch/zapx/v11 v11.4.2 h1:l46SV+b0gFN+Rw3wUI1YdMWdSAVhskYuvxlcgpQFljs= +github.com/blevesearch/zapx/v11 v11.4.2/go.mod h1:4gdeyy9oGa/lLa6D34R9daXNUvfMPZqUYjPwiLmekwc= +github.com/blevesearch/zapx/v12 v12.4.2 h1:fzRbhllQmEMUuAQ7zBuMvKRlcPA5ESTgWlDEoB9uQNE= +github.com/blevesearch/zapx/v12 v12.4.2/go.mod h1:TdFmr7afSz1hFh/SIBCCZvcLfzYvievIH6aEISCte58= +github.com/blevesearch/zapx/v13 v13.4.2 h1:46PIZCO/ZuKZYgxI8Y7lOJqX3Irkc3N8W82QTK3MVks= +github.com/blevesearch/zapx/v13 v13.4.2/go.mod h1:knK8z2NdQHlb5ot/uj8wuvOq5PhDGjNYQQy0QDnopZk= +github.com/blevesearch/zapx/v14 v14.4.2 h1:2SGHakVKd+TrtEqpfeq8X+So5PShQ5nW6GNxT7fWYz0= +github.com/blevesearch/zapx/v14 v14.4.2/go.mod h1:rz0XNb/OZSMjNorufDGSpFpjoFKhXmppH9Hi7a877D8= +github.com/blevesearch/zapx/v15 v15.4.2 h1:sWxpDE0QQOTjyxYbAVjt3+0ieu8NCE0fDRaFxEsp31k= +github.com/blevesearch/zapx/v15 v15.4.2/go.mod h1:1pssev/59FsuWcgSnTa0OeEpOzmhtmr/0/11H0Z8+Nw= +github.com/blevesearch/zapx/v16 v16.2.4 h1:tGgfvleXTAkwsD5mEzgM3zCS/7pgocTCnO1oyAUjlww= +github.com/blevesearch/zapx/v16 v16.2.4/go.mod h1:Rti/REtuuMmzwsI8/C/qIzRaEoSK/wiFYw5e5ctUKKs= github.com/blinkbean/dingtalk v1.1.3 h1:MbidFZYom7DTFHD/YIs+eaI7kRy52kmWE/sy0xjo6E4= github.com/blinkbean/dingtalk v1.1.3/go.mod h1:9BaLuGSBqY3vT5hstValh48DbsKO7vaHaJnG9pXwbto= github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8= @@ -1544,6 +1585,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A= github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= +github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM= +github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA= github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ= github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= @@ -1916,6 +1959,8 @@ github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA= github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q= go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU= +go.etcd.io/bbolt v1.4.0 h1:TU77id3TnN/zKr7CO/uk+fBCwF2jGcMuw2B/FMAzYIk= +go.etcd.io/bbolt v1.4.0/go.mod h1:AsD+OCi/qPN1giOX1aiLAha3o1U8rAz65bvN4j0sRuk= go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs= go.etcd.io/etcd/api/v3 v3.5.7/go.mod h1:9qew1gCdDDLu+VwmeG+iFpL+QlpHTo7iubavdVDgCAA= go.etcd.io/etcd/api/v3 v3.5.9/go.mod h1:uyAal843mC8uUVSLWz6eHa/d971iDGnCRpmKd2Z+X8k= @@ -2791,6 +2836,7 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gorm.io/datatypes v1.2.6 h1:KafLdXvFUhzNeL2ncm03Gl3eTLONQfNKZ+wJ+9Y4Nck= diff --git a/internal/cache/index.go b/internal/cache/index.go index 2072066d..de21b028 100644 --- a/internal/cache/index.go +++ b/internal/cache/index.go @@ -9,6 +9,7 @@ import ( "sync" "time" + "github.com/0xJacky/Nginx-UI/internal/event" "github.com/0xJacky/Nginx-UI/internal/nginx" "github.com/fsnotify/fsnotify" "github.com/uozi-tech/cosy/logger" @@ -20,15 +21,12 @@ type ScanCallback func(configPath string, content []byte) error // Scanner is responsible for scanning and watching nginx config files type Scanner struct { - ctx context.Context // Context for the scanner - watcher *fsnotify.Watcher // File system watcher - scanTicker *time.Ticker // Ticker for periodic scanning - initialized bool // Whether the scanner has been initialized - scanning bool // Whether a scan is currently in progress - scanMutex sync.RWMutex // Mutex for protecting the scanning state - statusChan chan bool // Channel to broadcast scanning status changes - subscribers map[chan bool]struct{} // Set of subscribers - subscriberMux sync.RWMutex // Mutex for protecting the subscribers map + ctx context.Context // Context for the scanner + watcher *fsnotify.Watcher // File system watcher + scanTicker *time.Ticker // Ticker for periodic scanning + initialized bool // Whether the scanner has been initialized + scanning bool // Whether a scan is currently in progress + scanMutex sync.RWMutex // Mutex for protecting the scanning state } // Global variables @@ -65,13 +63,7 @@ func GetScanner() *Scanner { defer configScannerInitMux.Unlock() if scanner == nil { - scanner = &Scanner{ - statusChan: make(chan bool, 10), // Buffer to prevent blocking - subscribers: make(map[chan bool]struct{}), - } - - // Start broadcaster goroutine - go scanner.broadcastStatus() + scanner = &Scanner{} } return scanner } @@ -84,56 +76,12 @@ func RegisterCallback(callback ScanCallback) { scanCallbacks = append(scanCallbacks, callback) } -// broadcastStatus listens for status changes and broadcasts to all subscribers -func (s *Scanner) broadcastStatus() { - for status := range s.statusChan { - s.subscriberMux.RLock() - for ch := range s.subscribers { - // Non-blocking send to prevent slow subscribers from blocking others - select { - case ch <- status: - default: - // Skip if channel buffer is full - } - } - s.subscriberMux.RUnlock() - } -} - -// SubscribeScanningStatus allows a client to subscribe to scanning status changes -func SubscribeScanningStatus() chan bool { - s := GetScanner() - ch := make(chan bool, 5) // Buffer to prevent blocking - - // Add to subscribers - s.subscriberMux.Lock() - s.subscribers[ch] = struct{}{} - s.subscriberMux.Unlock() - - // Send current status immediately - s.scanMutex.RLock() - currentStatus := s.scanning - s.scanMutex.RUnlock() - - // Non-blocking send - select { - case ch <- currentStatus: - default: - } - - return ch -} - -// UnsubscribeScanningStatus removes a subscriber from receiving status updates -func UnsubscribeScanningStatus(ch chan bool) { - s := GetScanner() - - s.subscriberMux.Lock() - delete(s.subscribers, ch) - s.subscriberMux.Unlock() - - // Close the channel so the client knows it's unsubscribed - close(ch) +// publishScanningStatus publishes the scanning status to the event bus +func (s *Scanner) publishScanningStatus(scanning bool) { + event.Publish(event.Event{ + Type: event.EventTypeIndexScanning, + Data: scanning, + }) } // Initialize sets up the scanner and starts watching for file changes @@ -333,8 +281,8 @@ func (s *Scanner) scanSingleFileWithDepth(filePath string, visited map[string]bo wasScanning = s.scanning s.scanning = true if !wasScanning { - // Only broadcast if status changed from not scanning to scanning - s.statusChan <- true + // Only publish if status changed from not scanning to scanning + s.publishScanningStatus(true) } s.scanMutex.Unlock() @@ -342,8 +290,8 @@ func (s *Scanner) scanSingleFileWithDepth(filePath string, visited map[string]bo defer func() { s.scanMutex.Lock() s.scanning = false - // Broadcast the completion - s.statusChan <- false + // Publish the completion + s.publishScanningStatus(false) s.scanMutex.Unlock() }() } @@ -431,8 +379,8 @@ func (s *Scanner) ScanAllConfigs() error { wasScanning := s.scanning s.scanning = true if !wasScanning { - // Only broadcast if status changed from not scanning to scanning - s.statusChan <- true + // Only publish if status changed from not scanning to scanning + s.publishScanningStatus(true) } s.scanMutex.Unlock() @@ -440,8 +388,8 @@ func (s *Scanner) ScanAllConfigs() error { defer func() { s.scanMutex.Lock() s.scanning = false - // Broadcast the completion - s.statusChan <- false + // Publish the completion + s.publishScanningStatus(false) s.scanMutex.Unlock() }() @@ -494,19 +442,6 @@ func (s *Scanner) Shutdown() { if s.scanTicker != nil { s.scanTicker.Stop() } - - // Clean up subscriber resources - s.subscriberMux.Lock() - // Close all subscriber channels - for ch := range s.subscribers { - close(ch) - } - // Clear the map - s.subscribers = make(map[chan bool]struct{}) - s.subscriberMux.Unlock() - - // Close the status channel - close(s.statusChan) } // IsScanningInProgress returns whether a scan is currently in progress diff --git a/internal/cert/mutex.go b/internal/cert/mutex.go index c1f406a5..cceecb63 100644 --- a/internal/cert/mutex.go +++ b/internal/cert/mutex.go @@ -1,23 +1,15 @@ package cert import ( - "context" "sync" + + "github.com/0xJacky/Nginx-UI/internal/event" ) var ( // mutex is used to control access to certificate operations mutex sync.Mutex - // statusChan is the channel to broadcast certificate status changes - statusChan = make(chan bool, 10) - - // subscribers is a map of channels that are subscribed to certificate status changes - subscribers = make(map[chan bool]struct{}) - - // subscriberMux protects the subscribers map from concurrent access - subscriberMux sync.RWMutex - // isProcessing indicates whether a certificate operation is in progress isProcessing bool @@ -25,69 +17,12 @@ var ( processingMutex sync.RWMutex ) -func initBroadcastStatus(ctx context.Context) { - // Start broadcasting goroutine - go broadcastStatus(ctx) -} - -// broadcastStatus listens for status changes and broadcasts to all subscribers -func broadcastStatus(ctx context.Context) { - for { - select { - case <-ctx.Done(): - // Context cancelled, clean up resources and exit - close(statusChan) - return - case status, ok := <-statusChan: - if !ok { - // Channel closed, exit - return - } - subscriberMux.RLock() - for ch := range subscribers { - // Non-blocking send to prevent slow subscribers from blocking others - select { - case ch <- status: - default: - // Skip if channel buffer is full - } - } - subscriberMux.RUnlock() - } - } -} - -// SubscribeProcessingStatus allows a client to subscribe to certificate processing status changes -func SubscribeProcessingStatus() chan bool { - ch := make(chan bool, 5) // Buffer to prevent blocking - - // Add to subscribers - subscriberMux.Lock() - subscribers[ch] = struct{}{} - subscriberMux.Unlock() - - // Send current status immediately - processingMutex.RLock() - currentStatus := isProcessing - processingMutex.RUnlock() - - // Non-blocking send - select { - case ch <- currentStatus: - default: - } - - return ch -} - -// UnsubscribeProcessingStatus removes a subscriber from receiving status updates -func UnsubscribeProcessingStatus(ch chan bool) { - subscriberMux.Lock() - delete(subscribers, ch) - subscriberMux.Unlock() - - // Close the channel so the client knows it's unsubscribed - close(ch) +// publishProcessingStatus publishes the processing status to the event bus +func publishProcessingStatus(processing bool) { + event.Publish(event.Event{ + Type: event.EventTypeAutoCertProcessing, + Data: processing, + }) } // lock acquires the certificate mutex @@ -109,12 +44,12 @@ func IsProcessing() bool { return isProcessing } -// setProcessingStatus updates the processing status and broadcasts the change +// setProcessingStatus updates the processing status and publishes the change func setProcessingStatus(status bool) { processingMutex.Lock() if isProcessing != status { isProcessing = status - statusChan <- status + publishProcessingStatus(status) } processingMutex.Unlock() } diff --git a/internal/cert/register.go b/internal/cert/register.go index 64e32948..1eb61436 100644 --- a/internal/cert/register.go +++ b/internal/cert/register.go @@ -13,8 +13,6 @@ import ( // InitRegister init the default user for acme func InitRegister(ctx context.Context) { - initBroadcastStatus(ctx) - email := settings.CertSettings.Email if settings.CertSettings.Email == "" { return diff --git a/internal/event/bus.go b/internal/event/bus.go new file mode 100644 index 00000000..a7b9b58c --- /dev/null +++ b/internal/event/bus.go @@ -0,0 +1,188 @@ +package event + +import ( + "context" + "sync" + + "github.com/uozi-tech/cosy/logger" +) + +// WebSocketHub interface for broadcasting messages +type WebSocketHub interface { + BroadcastMessage(event string, data interface{}) +} + +// WebSocketEventConfig holds configuration for WebSocket event forwarding +type WebSocketEventConfig struct { + EventType EventType + WSEventName string + DataTransform func(data interface{}) interface{} +} + +// EventBus manages event publishing and WebSocket forwarding +type EventBus struct { + wsHub WebSocketHub + wsConfigs map[EventType]*WebSocketEventConfig + wsMutex sync.RWMutex + ctx context.Context + cancel context.CancelFunc +} + +var ( + globalBus *EventBus + busOnce sync.Once +) + +// GetEventBus returns the global event bus instance +func GetEventBus() *EventBus { + busOnce.Do(func() { + ctx, cancel := context.WithCancel(context.Background()) + globalBus = &EventBus{ + wsConfigs: make(map[EventType]*WebSocketEventConfig), + ctx: ctx, + cancel: cancel, + } + }) + return globalBus +} + +// SetWebSocketHub sets the WebSocket hub for direct event forwarding +func (eb *EventBus) SetWebSocketHub(hub WebSocketHub) { + eb.wsMutex.Lock() + defer eb.wsMutex.Unlock() + eb.wsHub = hub + logger.Info("WebSocket hub registered with event bus") +} + +// RegisterWebSocketEventForwarding registers an event type to be forwarded to WebSocket clients +func (eb *EventBus) RegisterWebSocketEventForwarding(eventType EventType, wsEventName string) { + eb.RegisterWebSocketEventForwardingWithTransform(eventType, wsEventName, func(data interface{}) interface{} { + return data // Default: no transformation + }) +} + +// RegisterWebSocketEventForwardingWithTransform registers an event type with custom data transformation +func (eb *EventBus) RegisterWebSocketEventForwardingWithTransform(eventType EventType, wsEventName string, transform func(data interface{}) interface{}) { + eb.wsMutex.Lock() + defer eb.wsMutex.Unlock() + + // Only register if not already registered + if _, exists := eb.wsConfigs[eventType]; !exists { + config := &WebSocketEventConfig{ + EventType: eventType, + WSEventName: wsEventName, + DataTransform: transform, + } + eb.wsConfigs[eventType] = config + logger.Debugf("Registered WebSocket event forwarding: %s -> %s", eventType, wsEventName) + } +} + +// Publish forwards an event directly to WebSocket clients +func (eb *EventBus) Publish(event Event) { + eb.forwardToWebSocket(event) +} + +// forwardToWebSocket forwards an event to WebSocket clients if configured +func (eb *EventBus) forwardToWebSocket(event Event) { + eb.wsMutex.RLock() + config, exists := eb.wsConfigs[event.Type] + hub := eb.wsHub + eb.wsMutex.RUnlock() + + if !exists || hub == nil { + return + } + + // Apply data transformation + wsData := config.DataTransform(event.Data) + hub.BroadcastMessage(config.WSEventName, wsData) +} + +// Shutdown gracefully shuts down the event bus +func (eb *EventBus) Shutdown() { + eb.cancel() + eb.wsMutex.Lock() + defer eb.wsMutex.Unlock() + + // Clear all configurations + eb.wsConfigs = make(map[EventType]*WebSocketEventConfig) + eb.wsHub = nil + logger.Info("Event bus shutdown completed") +} + +// Context returns the event bus context +func (eb *EventBus) Context() context.Context { + return eb.ctx +} + +// Convenience functions for global event bus + +// Publish forwards an event to WebSocket clients on the global bus +func Publish(event Event) { + GetEventBus().Publish(event) +} + +// SetWebSocketHub sets the WebSocket hub for the global event bus +func SetWebSocketHub(hub WebSocketHub) { + GetEventBus().SetWebSocketHub(hub) +} + +// RegisterWebSocketEventForwarding registers WebSocket event forwarding on the global bus +func RegisterWebSocketEventForwarding(eventType EventType, wsEventName string) { + GetEventBus().RegisterWebSocketEventForwarding(eventType, wsEventName) +} + +// RegisterWebSocketEventForwardingWithTransform registers WebSocket event forwarding with transform on the global bus +func RegisterWebSocketEventForwardingWithTransform(eventType EventType, wsEventName string, transform func(data interface{}) interface{}) { + GetEventBus().RegisterWebSocketEventForwardingWithTransform(eventType, wsEventName, transform) +} + +// RegisterWebSocketEventConfigs registers multiple WebSocket event configurations +func RegisterWebSocketEventConfigs(configs []WebSocketEventConfig) { + bus := GetEventBus() + for _, config := range configs { + bus.RegisterWebSocketEventForwardingWithTransform(config.EventType, config.WSEventName, config.DataTransform) + } +} + +// GetDefaultWebSocketEventConfigs returns the default WebSocket event configurations +func GetDefaultWebSocketEventConfigs() []WebSocketEventConfig { + return []WebSocketEventConfig{ + { + EventType: EventTypeIndexScanning, + WSEventName: "index_scanning", + DataTransform: func(data interface{}) interface{} { + return data + }, + }, + { + EventType: EventTypeAutoCertProcessing, + WSEventName: "auto_cert_processing", + DataTransform: func(data interface{}) interface{} { + return data + }, + }, + { + EventType: EventTypeProcessingStatus, + WSEventName: "processing_status", + DataTransform: func(data interface{}) interface{} { + return data + }, + }, + { + EventType: EventTypeNginxLogStatus, + WSEventName: "nginx_log_status", + DataTransform: func(data interface{}) interface{} { + return data + }, + }, + { + EventType: EventTypeNotification, + WSEventName: "notification", + DataTransform: func(data interface{}) interface{} { + return data + }, + }, + } +} diff --git a/internal/event/init.go b/internal/event/init.go new file mode 100644 index 00000000..ee344867 --- /dev/null +++ b/internal/event/init.go @@ -0,0 +1,37 @@ +package event + +import ( + "context" + + "github.com/uozi-tech/cosy/logger" +) + +// InitEventSystem initializes the event system and sets up WebSocket forwarding +func InitEventSystem(ctx context.Context) { + logger.Info("Initializing event system...") + + // Initialize the event bus by getting the singleton instance + GetEventBus() + + // Initialize WebSocket event forwarding configurations + initWebSocketEventForwarding() + + logger.Info("Event system initialized successfully") + defer ShutdownEventSystem() + + <-ctx.Done() +} + +// initWebSocketEventForwarding initializes WebSocket event forwarding configurations +func initWebSocketEventForwarding() { + // Register default event forwarding configurations + RegisterWebSocketEventConfigs(GetDefaultWebSocketEventConfigs()) + logger.Info("WebSocket event forwarding initialized") +} + +// ShutdownEventSystem gracefully shuts down the event system +func ShutdownEventSystem() { + logger.Info("Shutting down event system...") + GetEventBus().Shutdown() + logger.Info("Event system shutdown completed") +} diff --git a/internal/event/types.go b/internal/event/types.go new file mode 100644 index 00000000..d27a13bd --- /dev/null +++ b/internal/event/types.go @@ -0,0 +1,34 @@ +package event + +// EventType represents the type of event +type EventType string + +const ( + // Processing status events + EventTypeIndexScanning EventType = "index_scanning" + EventTypeAutoCertProcessing EventType = "auto_cert_processing" + EventTypeProcessingStatus EventType = "processing_status" + + // Nginx log status events (for backward compatibility) + EventTypeNginxLogStatus EventType = "nginx_log_status" + + // Notification events + EventTypeNotification EventType = "notification" +) + +// Event represents a generic event structure +type Event struct { + Type EventType `json:"type"` + Data interface{} `json:"data"` +} + +// ProcessingStatusData represents the data for processing status events +type ProcessingStatusData struct { + IndexScanning bool `json:"index_scanning"` + AutoCertProcessing bool `json:"auto_cert_processing"` +} + +// NginxLogStatusData represents the data for nginx log status events (backward compatibility) +type NginxLogStatusData struct { + Scanning bool `json:"scanning"` +} diff --git a/internal/kernel/boot.go b/internal/kernel/boot.go index 9ab25da5..5cf5fa3b 100644 --- a/internal/kernel/boot.go +++ b/internal/kernel/boot.go @@ -16,6 +16,7 @@ import ( "github.com/0xJacky/Nginx-UI/internal/cluster" "github.com/0xJacky/Nginx-UI/internal/cron" "github.com/0xJacky/Nginx-UI/internal/docker" + "github.com/0xJacky/Nginx-UI/internal/event" "github.com/0xJacky/Nginx-UI/internal/helper" "github.com/0xJacky/Nginx-UI/internal/mcp" "github.com/0xJacky/Nginx-UI/internal/passkey" @@ -54,6 +55,7 @@ func Boot(ctx context.Context) { syncs := []func(ctx context.Context){ analytic.RecordServerAnalytic, + event.InitEventSystem, } for _, v := range async { diff --git a/internal/notification/push.go b/internal/notification/push.go index b02e8061..438f9992 100644 --- a/internal/notification/push.go +++ b/internal/notification/push.go @@ -1,6 +1,7 @@ package notification import ( + "github.com/0xJacky/Nginx-UI/internal/event" "github.com/0xJacky/Nginx-UI/model" "github.com/0xJacky/Nginx-UI/query" "github.com/uozi-tech/cosy/logger" @@ -21,6 +22,14 @@ func push(nType model.NotificationType, title string, content string, details an logger.Error(err) return } + + // Use event system instead of direct broadcast + event.Publish(event.Event{ + Type: event.EventTypeNotification, + Data: data, + }) + + // Keep the old broadcast for backward compatibility broadcast(data) extNotify := &ExternalMessage{data}