refactor: simplify event bus for ws broadcasting

This commit is contained in:
Jacky
2025-07-05 11:53:59 +08:00
parent d6ecf1d51b
commit 52cf30841c
12 changed files with 390 additions and 268 deletions

View File

@@ -305,14 +305,10 @@ jobs:
with:
accountId: ${{ secrets.CF_ACCOUNT_ID }}
apiToken: ${{ secrets.CF_R2_API_TOKEN }}
wranglerVersion: "4.21.1"
wranglerVersion: "4.23.0"
command: |
r2 object put nginx-ui-dev-build/${{ env.DIST }}.tar.gz --file ./${{ env.DIST }}.tar.gz --remote
r2 object put nginx-ui-dev-build/${{ env.DIST }}.tar.gz.digest --file ./${{ env.DIST }}.tar.gz.digest --remote
if [[ "$GOOS" == "windows" ]]; then
r2 object put nginx-ui-dev-build/${{ env.DIST }}.zip --file ./${{ env.DIST }}.zip --remote
r2 object put nginx-ui-dev-build/${{ env.DIST }}.zip.digest --file ./${{ env.DIST }}.zip.digest --remote
fi
docker-build:
if: github.event_name != 'pull_request'

View File

@@ -7,12 +7,9 @@ import (
"sync"
"time"
"github.com/0xJacky/Nginx-UI/internal/cache"
"github.com/0xJacky/Nginx-UI/internal/cert"
"github.com/0xJacky/Nginx-UI/internal/event"
"github.com/0xJacky/Nginx-UI/internal/helper"
"github.com/0xJacky/Nginx-UI/internal/kernel"
"github.com/0xJacky/Nginx-UI/internal/notification"
"github.com/0xJacky/Nginx-UI/model"
"github.com/gin-gonic/gin"
"github.com/gorilla/websocket"
"github.com/uozi-tech/cosy/logger"
@@ -57,10 +54,26 @@ func GetHub() *Hub {
unregister: make(chan *Client),
}
go hub.run()
// Register this hub directly with the event bus
event.SetWebSocketHub(hub)
})
return hub
}
// BroadcastMessage implements the WebSocketHub interface
func (h *Hub) BroadcastMessage(event string, data interface{}) {
message := WebSocketMessage{
Event: event,
Data: data,
}
select {
case h.broadcast <- message:
default:
logger.Warn("Broadcast channel full, message dropped")
}
}
// run handles the main hub loop
func (h *Hub) run() {
for {
@@ -95,19 +108,6 @@ func (h *Hub) run() {
}
}
// BroadcastMessage sends a message to all connected clients
func (h *Hub) BroadcastMessage(event string, data interface{}) {
message := WebSocketMessage{
Event: event,
Data: data,
}
select {
case h.broadcast <- message:
default:
logger.Warn("Broadcast channel full, message dropped")
}
}
// WebSocket upgrader configuration
var upgrader = websocket.Upgrader{
CheckOrigin: func(r *http.Request) bool {
@@ -139,92 +139,11 @@ func EventBus(c *gin.Context) {
hub := GetHub()
hub.register <- client
// Start goroutines for handling subscriptions
go client.handleNotifications()
go client.handleProcessingStatus()
go client.handleNginxLogStatus()
// Start write and read pumps
// Start write and read pumps - no manual event subscriptions needed
go client.writePump()
client.readPump()
}
// handleNotifications subscribes to notification events
func (c *Client) handleNotifications() {
evtChan := make(chan *model.Notification, 10)
wsManager := notification.GetWebSocketManager()
wsManager.Subscribe(evtChan)
defer func() {
wsManager.Unsubscribe(evtChan)
}()
for {
select {
case n := <-evtChan:
hub.BroadcastMessage("notification", n)
case <-c.ctx.Done():
return
}
}
}
// handleProcessingStatus subscribes to processing status events
func (c *Client) handleProcessingStatus() {
indexScanning := cache.SubscribeScanningStatus()
defer cache.UnsubscribeScanningStatus(indexScanning)
autoCert := cert.SubscribeProcessingStatus()
defer cert.UnsubscribeProcessingStatus(autoCert)
status := struct {
IndexScanning bool `json:"index_scanning"`
AutoCertProcessing bool `json:"auto_cert_processing"`
}{
IndexScanning: false,
AutoCertProcessing: false,
}
for {
select {
case indexStatus, ok := <-indexScanning:
if !ok {
return
}
status.IndexScanning = indexStatus
// Send processing status event
hub.BroadcastMessage("processing_status", status)
// Also send nginx log status event for backward compatibility
hub.BroadcastMessage("nginx_log_status", gin.H{
"scanning": indexStatus,
})
case certStatus, ok := <-autoCert:
if !ok {
return
}
status.AutoCertProcessing = certStatus
hub.BroadcastMessage("processing_status", status)
case <-c.ctx.Done():
return
}
}
}
// handleNginxLogStatus subscribes to nginx log scanning status events
// Note: This uses the same cache.SubscribeScanningStatus as handleProcessingStatus
// but sends different event types for different purposes
func (c *Client) handleNginxLogStatus() {
// We don't need a separate subscription here since handleProcessingStatus
// already handles the index scanning status. This function is kept for
// potential future nginx-specific log status that might be different
// from the general index scanning status.
// For now, this is handled by handleProcessingStatus
<-c.ctx.Done()
}
// writePump pumps messages from the hub to the websocket connection
func (c *Client) writePump() {
ticker := time.NewTicker(30 * time.Second)

23
go.mod
View File

@@ -84,6 +84,7 @@ require (
github.com/AzureAD/microsoft-authentication-library-for-go v1.4.2 // indirect
github.com/Microsoft/go-winio v0.6.2 // indirect
github.com/OpenDNS/vegadns2client v0.0.0-20180418235048-a3fa4a771d87 // indirect
github.com/RoaringBitmap/roaring/v2 v2.4.5 // indirect
github.com/akamai/AkamaiOPEN-edgegrid-golang v1.2.2 // indirect
github.com/aliyun/aliyun-log-go-sdk v0.1.101 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.11 // indirect
@@ -104,6 +105,25 @@ require (
github.com/aws/smithy-go v1.22.4 // indirect
github.com/baidubce/bce-sdk-go v0.9.233 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/bits-and-blooms/bitset v1.22.0 // indirect
github.com/blevesearch/bleve/v2 v2.5.2 // indirect
github.com/blevesearch/bleve_index_api v1.2.8 // indirect
github.com/blevesearch/geo v0.2.3 // indirect
github.com/blevesearch/go-faiss v1.0.25 // indirect
github.com/blevesearch/go-porterstemmer v1.0.3 // indirect
github.com/blevesearch/gtreap v0.1.1 // indirect
github.com/blevesearch/mmap-go v1.0.4 // indirect
github.com/blevesearch/scorch_segment_api/v2 v2.3.10 // indirect
github.com/blevesearch/segment v0.9.1 // indirect
github.com/blevesearch/snowballstem v0.9.0 // indirect
github.com/blevesearch/upsidedown_store_api v1.0.2 // indirect
github.com/blevesearch/vellum v1.1.0 // indirect
github.com/blevesearch/zapx/v11 v11.4.2 // indirect
github.com/blevesearch/zapx/v12 v12.4.2 // indirect
github.com/blevesearch/zapx/v13 v13.4.2 // indirect
github.com/blevesearch/zapx/v14 v14.4.2 // indirect
github.com/blevesearch/zapx/v15 v15.4.2 // indirect
github.com/blevesearch/zapx/v16 v16.2.4 // indirect
github.com/blinkbean/dingtalk v1.1.3 // indirect
github.com/boombuler/barcode v1.0.2 // indirect
github.com/bsm/redislock v0.9.4 // indirect
@@ -151,6 +171,7 @@ require (
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v4 v4.5.2 // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v1.0.0 // indirect
github.com/google/go-querystring v1.1.0 // indirect
github.com/google/go-tpm v0.9.5 // indirect
github.com/google/s2a-go v0.1.9 // indirect
@@ -206,6 +227,7 @@ require (
github.com/moby/term v0.5.2 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/mschoch/smat v0.2.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/namedotcom/go/v4 v4.0.2 // indirect
github.com/nrdcg/auroradns v1.1.0 // indirect
@@ -272,6 +294,7 @@ require (
github.com/yosida95/uritemplate/v3 v3.0.2 // indirect
github.com/youmark/pkcs8 v0.0.0-20240726163527-a2c0da244d78 // indirect
github.com/yusufpapurcu/wmi v1.2.4 // indirect
go.etcd.io/bbolt v1.4.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 // indirect
go.opentelemetry.io/otel v1.37.0 // indirect

46
go.sum
View File

@@ -682,6 +682,8 @@ github.com/Netflix/go-env v0.0.0-20220526054621-78278af1949d/go.mod h1:9XMFaCeRy
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OpenDNS/vegadns2client v0.0.0-20180418235048-a3fa4a771d87 h1:xPMsUicZ3iosVPSIP7bW5EcGUzjiiMl1OYTe14y/R24=
github.com/OpenDNS/vegadns2client v0.0.0-20180418235048-a3fa4a771d87/go.mod h1:iGLljf5n9GjT6kc0HBvyI1nOKnGQbNB66VzSNbK5iks=
github.com/RoaringBitmap/roaring/v2 v2.4.5 h1:uGrrMreGjvAtTBobc0g5IrW1D5ldxDQYe2JW2gggRdg=
github.com/RoaringBitmap/roaring/v2 v2.4.5/go.mod h1:FiJcsfkGje/nZBZgCu0ZxCPOKD/hVXDS2dXi7/eUFE0=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
github.com/Shopify/sarama v1.30.1/go.mod h1:hGgx05L/DiW8XYBXeJdKIN6V2QUy2H6JqME5VT1NLRw=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
@@ -793,7 +795,46 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/bits-and-blooms/bitset v1.12.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bits-and-blooms/bitset v1.22.0 h1:Tquv9S8+SGaS3EhyA+up3FXzmkhxPGjQQCkcs2uw7w4=
github.com/bits-and-blooms/bitset v1.22.0/go.mod h1:7hO7Gc7Pp1vODcmWvKMRA9BNmbv6a/7QIWpPxHddWR8=
github.com/bketelsen/crypt v0.0.3-0.20200106085610-5cbc8cc4026c/go.mod h1:MKsuJmJgSg28kpZDP6UIiPt0e0Oz0kqKNGyRaWEPv84=
github.com/blevesearch/bleve/v2 v2.5.2 h1:Ab0r0MODV2C5A6BEL87GqLBySqp/s9xFgceCju6BQk8=
github.com/blevesearch/bleve/v2 v2.5.2/go.mod h1:5Dj6dUQxZM6aqYT3eutTD/GpWKGFSsV8f7LDidFbwXo=
github.com/blevesearch/bleve_index_api v1.2.8 h1:Y98Pu5/MdlkRyLM0qDHostYo7i+Vv1cDNhqTeR4Sy6Y=
github.com/blevesearch/bleve_index_api v1.2.8/go.mod h1:rKQDl4u51uwafZxFrPD1R7xFOwKnzZW7s/LSeK4lgo0=
github.com/blevesearch/geo v0.2.3 h1:K9/vbGI9ehlXdxjxDRJtoAMt7zGAsMIzc6n8zWcwnhg=
github.com/blevesearch/geo v0.2.3/go.mod h1:K56Q33AzXt2YExVHGObtmRSFYZKYGv0JEN5mdacJJR8=
github.com/blevesearch/go-faiss v1.0.25 h1:lel1rkOUGbT1CJ0YgzKwC7k+XH0XVBHnCVWahdCXk4U=
github.com/blevesearch/go-faiss v1.0.25/go.mod h1:OMGQwOaRRYxrmeNdMrXJPvVx8gBnvE5RYrr0BahNnkk=
github.com/blevesearch/go-porterstemmer v1.0.3 h1:GtmsqID0aZdCSNiY8SkuPJ12pD4jI+DdXTAn4YRcHCo=
github.com/blevesearch/go-porterstemmer v1.0.3/go.mod h1:angGc5Ht+k2xhJdZi511LtmxuEf0OVpvUUNrwmM1P7M=
github.com/blevesearch/gtreap v0.1.1 h1:2JWigFrzDMR+42WGIN/V2p0cUvn4UP3C4Q5nmaZGW8Y=
github.com/blevesearch/gtreap v0.1.1/go.mod h1:QaQyDRAT51sotthUWAH4Sj08awFSSWzgYICSZ3w0tYk=
github.com/blevesearch/mmap-go v1.0.4 h1:OVhDhT5B/M1HNPpYPBKIEJaD0F3Si+CrEKULGCDPWmc=
github.com/blevesearch/mmap-go v1.0.4/go.mod h1:EWmEAOmdAS9z/pi/+Toxu99DnsbhG1TIxUoRmJw/pSs=
github.com/blevesearch/scorch_segment_api/v2 v2.3.10 h1:Yqk0XD1mE0fDZAJXTjawJ8If/85JxnLd8v5vG/jWE/s=
github.com/blevesearch/scorch_segment_api/v2 v2.3.10/go.mod h1:Z3e6ChN3qyN35yaQpl00MfI5s8AxUJbpTR/DL8QOQ+8=
github.com/blevesearch/segment v0.9.1 h1:+dThDy+Lvgj5JMxhmOVlgFfkUtZV2kw49xax4+jTfSU=
github.com/blevesearch/segment v0.9.1/go.mod h1:zN21iLm7+GnBHWTao9I+Au/7MBiL8pPFtJBJTsk6kQw=
github.com/blevesearch/snowballstem v0.9.0 h1:lMQ189YspGP6sXvZQ4WZ+MLawfV8wOmPoD/iWeNXm8s=
github.com/blevesearch/snowballstem v0.9.0/go.mod h1:PivSj3JMc8WuaFkTSRDW2SlrulNWPl4ABg1tC/hlgLs=
github.com/blevesearch/upsidedown_store_api v1.0.2 h1:U53Q6YoWEARVLd1OYNc9kvhBMGZzVrdmaozG2MfoB+A=
github.com/blevesearch/upsidedown_store_api v1.0.2/go.mod h1:M01mh3Gpfy56Ps/UXHjEO/knbqyQ1Oamg8If49gRwrQ=
github.com/blevesearch/vellum v1.1.0 h1:CinkGyIsgVlYf8Y2LUQHvdelgXr6PYuvoDIajq6yR9w=
github.com/blevesearch/vellum v1.1.0/go.mod h1:QgwWryE8ThtNPxtgWJof5ndPfx0/YMBh+W2weHKPw8Y=
github.com/blevesearch/zapx/v11 v11.4.2 h1:l46SV+b0gFN+Rw3wUI1YdMWdSAVhskYuvxlcgpQFljs=
github.com/blevesearch/zapx/v11 v11.4.2/go.mod h1:4gdeyy9oGa/lLa6D34R9daXNUvfMPZqUYjPwiLmekwc=
github.com/blevesearch/zapx/v12 v12.4.2 h1:fzRbhllQmEMUuAQ7zBuMvKRlcPA5ESTgWlDEoB9uQNE=
github.com/blevesearch/zapx/v12 v12.4.2/go.mod h1:TdFmr7afSz1hFh/SIBCCZvcLfzYvievIH6aEISCte58=
github.com/blevesearch/zapx/v13 v13.4.2 h1:46PIZCO/ZuKZYgxI8Y7lOJqX3Irkc3N8W82QTK3MVks=
github.com/blevesearch/zapx/v13 v13.4.2/go.mod h1:knK8z2NdQHlb5ot/uj8wuvOq5PhDGjNYQQy0QDnopZk=
github.com/blevesearch/zapx/v14 v14.4.2 h1:2SGHakVKd+TrtEqpfeq8X+So5PShQ5nW6GNxT7fWYz0=
github.com/blevesearch/zapx/v14 v14.4.2/go.mod h1:rz0XNb/OZSMjNorufDGSpFpjoFKhXmppH9Hi7a877D8=
github.com/blevesearch/zapx/v15 v15.4.2 h1:sWxpDE0QQOTjyxYbAVjt3+0ieu8NCE0fDRaFxEsp31k=
github.com/blevesearch/zapx/v15 v15.4.2/go.mod h1:1pssev/59FsuWcgSnTa0OeEpOzmhtmr/0/11H0Z8+Nw=
github.com/blevesearch/zapx/v16 v16.2.4 h1:tGgfvleXTAkwsD5mEzgM3zCS/7pgocTCnO1oyAUjlww=
github.com/blevesearch/zapx/v16 v16.2.4/go.mod h1:Rti/REtuuMmzwsI8/C/qIzRaEoSK/wiFYw5e5ctUKKs=
github.com/blinkbean/dingtalk v1.1.3 h1:MbidFZYom7DTFHD/YIs+eaI7kRy52kmWE/sy0xjo6E4=
github.com/blinkbean/dingtalk v1.1.3/go.mod h1:9BaLuGSBqY3vT5hstValh48DbsKO7vaHaJnG9pXwbto=
github.com/boombuler/barcode v1.0.0/go.mod h1:paBWMcWSl3LHKBqUq+rly7CNSldXjb2rDl3JlRe0mD8=
@@ -1544,6 +1585,8 @@ github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9G
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/morikuni/aec v1.0.0 h1:nP9CBfwrvYnBRgY6qfDQkygYDmYwOilePFkwzv4dU8A=
github.com/morikuni/aec v1.0.0/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc=
github.com/mschoch/smat v0.2.0 h1:8imxQsjDm8yFEAVBe7azKmKSgzSkZXDuKkSq9374khM=
github.com/mschoch/smat v0.2.0/go.mod h1:kc9mz7DoBKqDyiRL7VZN8KvXQMWeTaVnttLRXOlotKw=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 h1:C3w9PqII01/Oq1c1nUAm88MOHcQC9l5mIlSMApZMrHA=
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822/go.mod h1:+n7T8mK8HuQTcFwEeznm/DIxMOiR9yIdICNftLE1DvQ=
github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U=
@@ -1916,6 +1959,8 @@ github.com/zeebo/assert v1.3.0/go.mod h1:Pq9JiuJQpG8JLJdtkwrJESF0Foym2/D9XMU5ciN
github.com/zeebo/xxh3 v1.0.2/go.mod h1:5NWz9Sef7zIDm2JHfFlcQvNekmcEl9ekUZQQKCYaDcA=
github.com/zenazn/goji v0.9.0/go.mod h1:7S9M489iMyHBNxwZnk9/EHS098H4/F6TATF2mIxtB1Q=
go.etcd.io/bbolt v1.3.2/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/bbolt v1.4.0 h1:TU77id3TnN/zKr7CO/uk+fBCwF2jGcMuw2B/FMAzYIk=
go.etcd.io/bbolt v1.4.0/go.mod h1:AsD+OCi/qPN1giOX1aiLAha3o1U8rAz65bvN4j0sRuk=
go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs=
go.etcd.io/etcd/api/v3 v3.5.7/go.mod h1:9qew1gCdDDLu+VwmeG+iFpL+QlpHTo7iubavdVDgCAA=
go.etcd.io/etcd/api/v3 v3.5.9/go.mod h1:uyAal843mC8uUVSLWz6eHa/d971iDGnCRpmKd2Z+X8k=
@@ -2791,6 +2836,7 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gorm.io/datatypes v1.2.6 h1:KafLdXvFUhzNeL2ncm03Gl3eTLONQfNKZ+wJ+9Y4Nck=

View File

@@ -9,6 +9,7 @@ import (
"sync"
"time"
"github.com/0xJacky/Nginx-UI/internal/event"
"github.com/0xJacky/Nginx-UI/internal/nginx"
"github.com/fsnotify/fsnotify"
"github.com/uozi-tech/cosy/logger"
@@ -20,15 +21,12 @@ type ScanCallback func(configPath string, content []byte) error
// Scanner is responsible for scanning and watching nginx config files
type Scanner struct {
ctx context.Context // Context for the scanner
watcher *fsnotify.Watcher // File system watcher
scanTicker *time.Ticker // Ticker for periodic scanning
initialized bool // Whether the scanner has been initialized
scanning bool // Whether a scan is currently in progress
scanMutex sync.RWMutex // Mutex for protecting the scanning state
statusChan chan bool // Channel to broadcast scanning status changes
subscribers map[chan bool]struct{} // Set of subscribers
subscriberMux sync.RWMutex // Mutex for protecting the subscribers map
ctx context.Context // Context for the scanner
watcher *fsnotify.Watcher // File system watcher
scanTicker *time.Ticker // Ticker for periodic scanning
initialized bool // Whether the scanner has been initialized
scanning bool // Whether a scan is currently in progress
scanMutex sync.RWMutex // Mutex for protecting the scanning state
}
// Global variables
@@ -65,13 +63,7 @@ func GetScanner() *Scanner {
defer configScannerInitMux.Unlock()
if scanner == nil {
scanner = &Scanner{
statusChan: make(chan bool, 10), // Buffer to prevent blocking
subscribers: make(map[chan bool]struct{}),
}
// Start broadcaster goroutine
go scanner.broadcastStatus()
scanner = &Scanner{}
}
return scanner
}
@@ -84,56 +76,12 @@ func RegisterCallback(callback ScanCallback) {
scanCallbacks = append(scanCallbacks, callback)
}
// broadcastStatus listens for status changes and broadcasts to all subscribers
func (s *Scanner) broadcastStatus() {
for status := range s.statusChan {
s.subscriberMux.RLock()
for ch := range s.subscribers {
// Non-blocking send to prevent slow subscribers from blocking others
select {
case ch <- status:
default:
// Skip if channel buffer is full
}
}
s.subscriberMux.RUnlock()
}
}
// SubscribeScanningStatus allows a client to subscribe to scanning status changes
func SubscribeScanningStatus() chan bool {
s := GetScanner()
ch := make(chan bool, 5) // Buffer to prevent blocking
// Add to subscribers
s.subscriberMux.Lock()
s.subscribers[ch] = struct{}{}
s.subscriberMux.Unlock()
// Send current status immediately
s.scanMutex.RLock()
currentStatus := s.scanning
s.scanMutex.RUnlock()
// Non-blocking send
select {
case ch <- currentStatus:
default:
}
return ch
}
// UnsubscribeScanningStatus removes a subscriber from receiving status updates
func UnsubscribeScanningStatus(ch chan bool) {
s := GetScanner()
s.subscriberMux.Lock()
delete(s.subscribers, ch)
s.subscriberMux.Unlock()
// Close the channel so the client knows it's unsubscribed
close(ch)
// publishScanningStatus publishes the scanning status to the event bus
func (s *Scanner) publishScanningStatus(scanning bool) {
event.Publish(event.Event{
Type: event.EventTypeIndexScanning,
Data: scanning,
})
}
// Initialize sets up the scanner and starts watching for file changes
@@ -333,8 +281,8 @@ func (s *Scanner) scanSingleFileWithDepth(filePath string, visited map[string]bo
wasScanning = s.scanning
s.scanning = true
if !wasScanning {
// Only broadcast if status changed from not scanning to scanning
s.statusChan <- true
// Only publish if status changed from not scanning to scanning
s.publishScanningStatus(true)
}
s.scanMutex.Unlock()
@@ -342,8 +290,8 @@ func (s *Scanner) scanSingleFileWithDepth(filePath string, visited map[string]bo
defer func() {
s.scanMutex.Lock()
s.scanning = false
// Broadcast the completion
s.statusChan <- false
// Publish the completion
s.publishScanningStatus(false)
s.scanMutex.Unlock()
}()
}
@@ -431,8 +379,8 @@ func (s *Scanner) ScanAllConfigs() error {
wasScanning := s.scanning
s.scanning = true
if !wasScanning {
// Only broadcast if status changed from not scanning to scanning
s.statusChan <- true
// Only publish if status changed from not scanning to scanning
s.publishScanningStatus(true)
}
s.scanMutex.Unlock()
@@ -440,8 +388,8 @@ func (s *Scanner) ScanAllConfigs() error {
defer func() {
s.scanMutex.Lock()
s.scanning = false
// Broadcast the completion
s.statusChan <- false
// Publish the completion
s.publishScanningStatus(false)
s.scanMutex.Unlock()
}()
@@ -494,19 +442,6 @@ func (s *Scanner) Shutdown() {
if s.scanTicker != nil {
s.scanTicker.Stop()
}
// Clean up subscriber resources
s.subscriberMux.Lock()
// Close all subscriber channels
for ch := range s.subscribers {
close(ch)
}
// Clear the map
s.subscribers = make(map[chan bool]struct{})
s.subscriberMux.Unlock()
// Close the status channel
close(s.statusChan)
}
// IsScanningInProgress returns whether a scan is currently in progress

View File

@@ -1,23 +1,15 @@
package cert
import (
"context"
"sync"
"github.com/0xJacky/Nginx-UI/internal/event"
)
var (
// mutex is used to control access to certificate operations
mutex sync.Mutex
// statusChan is the channel to broadcast certificate status changes
statusChan = make(chan bool, 10)
// subscribers is a map of channels that are subscribed to certificate status changes
subscribers = make(map[chan bool]struct{})
// subscriberMux protects the subscribers map from concurrent access
subscriberMux sync.RWMutex
// isProcessing indicates whether a certificate operation is in progress
isProcessing bool
@@ -25,69 +17,12 @@ var (
processingMutex sync.RWMutex
)
func initBroadcastStatus(ctx context.Context) {
// Start broadcasting goroutine
go broadcastStatus(ctx)
}
// broadcastStatus listens for status changes and broadcasts to all subscribers
func broadcastStatus(ctx context.Context) {
for {
select {
case <-ctx.Done():
// Context cancelled, clean up resources and exit
close(statusChan)
return
case status, ok := <-statusChan:
if !ok {
// Channel closed, exit
return
}
subscriberMux.RLock()
for ch := range subscribers {
// Non-blocking send to prevent slow subscribers from blocking others
select {
case ch <- status:
default:
// Skip if channel buffer is full
}
}
subscriberMux.RUnlock()
}
}
}
// SubscribeProcessingStatus allows a client to subscribe to certificate processing status changes
func SubscribeProcessingStatus() chan bool {
ch := make(chan bool, 5) // Buffer to prevent blocking
// Add to subscribers
subscriberMux.Lock()
subscribers[ch] = struct{}{}
subscriberMux.Unlock()
// Send current status immediately
processingMutex.RLock()
currentStatus := isProcessing
processingMutex.RUnlock()
// Non-blocking send
select {
case ch <- currentStatus:
default:
}
return ch
}
// UnsubscribeProcessingStatus removes a subscriber from receiving status updates
func UnsubscribeProcessingStatus(ch chan bool) {
subscriberMux.Lock()
delete(subscribers, ch)
subscriberMux.Unlock()
// Close the channel so the client knows it's unsubscribed
close(ch)
// publishProcessingStatus publishes the processing status to the event bus
func publishProcessingStatus(processing bool) {
event.Publish(event.Event{
Type: event.EventTypeAutoCertProcessing,
Data: processing,
})
}
// lock acquires the certificate mutex
@@ -109,12 +44,12 @@ func IsProcessing() bool {
return isProcessing
}
// setProcessingStatus updates the processing status and broadcasts the change
// setProcessingStatus updates the processing status and publishes the change
func setProcessingStatus(status bool) {
processingMutex.Lock()
if isProcessing != status {
isProcessing = status
statusChan <- status
publishProcessingStatus(status)
}
processingMutex.Unlock()
}

View File

@@ -13,8 +13,6 @@ import (
// InitRegister init the default user for acme
func InitRegister(ctx context.Context) {
initBroadcastStatus(ctx)
email := settings.CertSettings.Email
if settings.CertSettings.Email == "" {
return

188
internal/event/bus.go Normal file
View File

@@ -0,0 +1,188 @@
package event
import (
"context"
"sync"
"github.com/uozi-tech/cosy/logger"
)
// WebSocketHub interface for broadcasting messages
type WebSocketHub interface {
BroadcastMessage(event string, data interface{})
}
// WebSocketEventConfig holds configuration for WebSocket event forwarding
type WebSocketEventConfig struct {
EventType EventType
WSEventName string
DataTransform func(data interface{}) interface{}
}
// EventBus manages event publishing and WebSocket forwarding
type EventBus struct {
wsHub WebSocketHub
wsConfigs map[EventType]*WebSocketEventConfig
wsMutex sync.RWMutex
ctx context.Context
cancel context.CancelFunc
}
var (
globalBus *EventBus
busOnce sync.Once
)
// GetEventBus returns the global event bus instance
func GetEventBus() *EventBus {
busOnce.Do(func() {
ctx, cancel := context.WithCancel(context.Background())
globalBus = &EventBus{
wsConfigs: make(map[EventType]*WebSocketEventConfig),
ctx: ctx,
cancel: cancel,
}
})
return globalBus
}
// SetWebSocketHub sets the WebSocket hub for direct event forwarding
func (eb *EventBus) SetWebSocketHub(hub WebSocketHub) {
eb.wsMutex.Lock()
defer eb.wsMutex.Unlock()
eb.wsHub = hub
logger.Info("WebSocket hub registered with event bus")
}
// RegisterWebSocketEventForwarding registers an event type to be forwarded to WebSocket clients
func (eb *EventBus) RegisterWebSocketEventForwarding(eventType EventType, wsEventName string) {
eb.RegisterWebSocketEventForwardingWithTransform(eventType, wsEventName, func(data interface{}) interface{} {
return data // Default: no transformation
})
}
// RegisterWebSocketEventForwardingWithTransform registers an event type with custom data transformation
func (eb *EventBus) RegisterWebSocketEventForwardingWithTransform(eventType EventType, wsEventName string, transform func(data interface{}) interface{}) {
eb.wsMutex.Lock()
defer eb.wsMutex.Unlock()
// Only register if not already registered
if _, exists := eb.wsConfigs[eventType]; !exists {
config := &WebSocketEventConfig{
EventType: eventType,
WSEventName: wsEventName,
DataTransform: transform,
}
eb.wsConfigs[eventType] = config
logger.Debugf("Registered WebSocket event forwarding: %s -> %s", eventType, wsEventName)
}
}
// Publish forwards an event directly to WebSocket clients
func (eb *EventBus) Publish(event Event) {
eb.forwardToWebSocket(event)
}
// forwardToWebSocket forwards an event to WebSocket clients if configured
func (eb *EventBus) forwardToWebSocket(event Event) {
eb.wsMutex.RLock()
config, exists := eb.wsConfigs[event.Type]
hub := eb.wsHub
eb.wsMutex.RUnlock()
if !exists || hub == nil {
return
}
// Apply data transformation
wsData := config.DataTransform(event.Data)
hub.BroadcastMessage(config.WSEventName, wsData)
}
// Shutdown gracefully shuts down the event bus
func (eb *EventBus) Shutdown() {
eb.cancel()
eb.wsMutex.Lock()
defer eb.wsMutex.Unlock()
// Clear all configurations
eb.wsConfigs = make(map[EventType]*WebSocketEventConfig)
eb.wsHub = nil
logger.Info("Event bus shutdown completed")
}
// Context returns the event bus context
func (eb *EventBus) Context() context.Context {
return eb.ctx
}
// Convenience functions for global event bus
// Publish forwards an event to WebSocket clients on the global bus
func Publish(event Event) {
GetEventBus().Publish(event)
}
// SetWebSocketHub sets the WebSocket hub for the global event bus
func SetWebSocketHub(hub WebSocketHub) {
GetEventBus().SetWebSocketHub(hub)
}
// RegisterWebSocketEventForwarding registers WebSocket event forwarding on the global bus
func RegisterWebSocketEventForwarding(eventType EventType, wsEventName string) {
GetEventBus().RegisterWebSocketEventForwarding(eventType, wsEventName)
}
// RegisterWebSocketEventForwardingWithTransform registers WebSocket event forwarding with transform on the global bus
func RegisterWebSocketEventForwardingWithTransform(eventType EventType, wsEventName string, transform func(data interface{}) interface{}) {
GetEventBus().RegisterWebSocketEventForwardingWithTransform(eventType, wsEventName, transform)
}
// RegisterWebSocketEventConfigs registers multiple WebSocket event configurations
func RegisterWebSocketEventConfigs(configs []WebSocketEventConfig) {
bus := GetEventBus()
for _, config := range configs {
bus.RegisterWebSocketEventForwardingWithTransform(config.EventType, config.WSEventName, config.DataTransform)
}
}
// GetDefaultWebSocketEventConfigs returns the default WebSocket event configurations
func GetDefaultWebSocketEventConfigs() []WebSocketEventConfig {
return []WebSocketEventConfig{
{
EventType: EventTypeIndexScanning,
WSEventName: "index_scanning",
DataTransform: func(data interface{}) interface{} {
return data
},
},
{
EventType: EventTypeAutoCertProcessing,
WSEventName: "auto_cert_processing",
DataTransform: func(data interface{}) interface{} {
return data
},
},
{
EventType: EventTypeProcessingStatus,
WSEventName: "processing_status",
DataTransform: func(data interface{}) interface{} {
return data
},
},
{
EventType: EventTypeNginxLogStatus,
WSEventName: "nginx_log_status",
DataTransform: func(data interface{}) interface{} {
return data
},
},
{
EventType: EventTypeNotification,
WSEventName: "notification",
DataTransform: func(data interface{}) interface{} {
return data
},
},
}
}

37
internal/event/init.go Normal file
View File

@@ -0,0 +1,37 @@
package event
import (
"context"
"github.com/uozi-tech/cosy/logger"
)
// InitEventSystem initializes the event system and sets up WebSocket forwarding
func InitEventSystem(ctx context.Context) {
logger.Info("Initializing event system...")
// Initialize the event bus by getting the singleton instance
GetEventBus()
// Initialize WebSocket event forwarding configurations
initWebSocketEventForwarding()
logger.Info("Event system initialized successfully")
defer ShutdownEventSystem()
<-ctx.Done()
}
// initWebSocketEventForwarding initializes WebSocket event forwarding configurations
func initWebSocketEventForwarding() {
// Register default event forwarding configurations
RegisterWebSocketEventConfigs(GetDefaultWebSocketEventConfigs())
logger.Info("WebSocket event forwarding initialized")
}
// ShutdownEventSystem gracefully shuts down the event system
func ShutdownEventSystem() {
logger.Info("Shutting down event system...")
GetEventBus().Shutdown()
logger.Info("Event system shutdown completed")
}

34
internal/event/types.go Normal file
View File

@@ -0,0 +1,34 @@
package event
// EventType represents the type of event
type EventType string
const (
// Processing status events
EventTypeIndexScanning EventType = "index_scanning"
EventTypeAutoCertProcessing EventType = "auto_cert_processing"
EventTypeProcessingStatus EventType = "processing_status"
// Nginx log status events (for backward compatibility)
EventTypeNginxLogStatus EventType = "nginx_log_status"
// Notification events
EventTypeNotification EventType = "notification"
)
// Event represents a generic event structure
type Event struct {
Type EventType `json:"type"`
Data interface{} `json:"data"`
}
// ProcessingStatusData represents the data for processing status events
type ProcessingStatusData struct {
IndexScanning bool `json:"index_scanning"`
AutoCertProcessing bool `json:"auto_cert_processing"`
}
// NginxLogStatusData represents the data for nginx log status events (backward compatibility)
type NginxLogStatusData struct {
Scanning bool `json:"scanning"`
}

View File

@@ -16,6 +16,7 @@ import (
"github.com/0xJacky/Nginx-UI/internal/cluster"
"github.com/0xJacky/Nginx-UI/internal/cron"
"github.com/0xJacky/Nginx-UI/internal/docker"
"github.com/0xJacky/Nginx-UI/internal/event"
"github.com/0xJacky/Nginx-UI/internal/helper"
"github.com/0xJacky/Nginx-UI/internal/mcp"
"github.com/0xJacky/Nginx-UI/internal/passkey"
@@ -54,6 +55,7 @@ func Boot(ctx context.Context) {
syncs := []func(ctx context.Context){
analytic.RecordServerAnalytic,
event.InitEventSystem,
}
for _, v := range async {

View File

@@ -1,6 +1,7 @@
package notification
import (
"github.com/0xJacky/Nginx-UI/internal/event"
"github.com/0xJacky/Nginx-UI/model"
"github.com/0xJacky/Nginx-UI/query"
"github.com/uozi-tech/cosy/logger"
@@ -21,6 +22,14 @@ func push(nType model.NotificationType, title string, content string, details an
logger.Error(err)
return
}
// Use event system instead of direct broadcast
event.Publish(event.Event{
Type: event.EventTypeNotification,
Data: data,
})
// Keep the old broadcast for backward compatibility
broadcast(data)
extNotify := &ExternalMessage{data}