Files
CLIProxyAPI/internal/redisqueue/queue.go
Luis Pater 1d529c3ce4 feat(redis): implement Pub/Sub support for usage tracking
- Added Redis Pub/Sub capability to broadcast usage updates to subscribed clients.
- Enhanced `redisqueue` with subscriber management and message broadcasting.
- Updated tests to validate Pub/Sub message handling, subscription behavior, and fallback to the queue after unsubscribing.
- Integrated `project_id` parsing into auth-files logic to include project identifiers in metadata.
2026-05-15 21:59:43 +08:00

231 lines
3.9 KiB
Go

package redisqueue
import (
"sync"
"sync/atomic"
"time"
)
const (
defaultRetentionSeconds int64 = 60
maxRetentionSeconds int64 = 3600
usageSubscriberBuffer = 256
)
type queueItem struct {
enqueuedAt time.Time
payload []byte
}
type queue struct {
mu sync.Mutex
items []queueItem
head int
subscribers map[uint64]chan []byte
nextSubscriberID uint64
}
var (
enabled atomic.Bool
retentionSeconds atomic.Int64
global queue
)
func init() {
retentionSeconds.Store(defaultRetentionSeconds)
}
func SetEnabled(value bool) {
enabled.Store(value)
if !value {
global.clear()
}
}
func Enabled() bool {
return enabled.Load()
}
func SetRetentionSeconds(value int) {
normalized := int64(value)
if normalized <= 0 {
normalized = defaultRetentionSeconds
} else if normalized > maxRetentionSeconds {
normalized = maxRetentionSeconds
}
retentionSeconds.Store(normalized)
}
func Enqueue(payload []byte) {
if !Enabled() {
return
}
if len(payload) == 0 {
return
}
if global.publishToSubscribers(payload) {
return
}
global.enqueue(payload)
}
func PopOldest(count int) [][]byte {
if !Enabled() {
return nil
}
if count <= 0 {
return nil
}
return global.popOldest(count)
}
func SubscribeUsage() (<-chan []byte, func()) {
return global.subscribeUsage()
}
func (q *queue) clear() {
q.mu.Lock()
subscribers := make([]chan []byte, 0, len(q.subscribers))
for _, subscriber := range q.subscribers {
subscribers = append(subscribers, subscriber)
}
q.items = nil
q.head = 0
q.subscribers = nil
q.mu.Unlock()
for _, subscriber := range subscribers {
close(subscriber)
}
}
func (q *queue) enqueue(payload []byte) {
now := time.Now()
q.mu.Lock()
defer q.mu.Unlock()
q.pruneLocked(now)
q.items = append(q.items, queueItem{
enqueuedAt: now,
payload: append([]byte(nil), payload...),
})
q.maybeCompactLocked()
}
func (q *queue) publishToSubscribers(payload []byte) bool {
q.mu.Lock()
defer q.mu.Unlock()
if len(q.subscribers) == 0 {
return false
}
for id, subscriber := range q.subscribers {
cloned := append([]byte(nil), payload...)
select {
case subscriber <- cloned:
default:
delete(q.subscribers, id)
close(subscriber)
}
}
return true
}
func (q *queue) subscribeUsage() (<-chan []byte, func()) {
subscriber := make(chan []byte, usageSubscriberBuffer)
q.mu.Lock()
if q.subscribers == nil {
q.subscribers = make(map[uint64]chan []byte)
}
q.nextSubscriberID++
id := q.nextSubscriberID
q.subscribers[id] = subscriber
q.mu.Unlock()
var once sync.Once
unsubscribe := func() {
once.Do(func() {
q.unsubscribeUsage(id)
})
}
return subscriber, unsubscribe
}
func (q *queue) unsubscribeUsage(id uint64) {
q.mu.Lock()
subscriber, ok := q.subscribers[id]
if ok {
delete(q.subscribers, id)
}
q.mu.Unlock()
if ok {
close(subscriber)
}
}
func (q *queue) popOldest(count int) [][]byte {
now := time.Now()
q.mu.Lock()
defer q.mu.Unlock()
q.pruneLocked(now)
available := len(q.items) - q.head
if available <= 0 {
q.items = nil
q.head = 0
return nil
}
if count > available {
count = available
}
out := make([][]byte, 0, count)
for i := 0; i < count; i++ {
item := q.items[q.head+i]
out = append(out, item.payload)
}
q.head += count
q.maybeCompactLocked()
return out
}
func (q *queue) pruneLocked(now time.Time) {
if q.head >= len(q.items) {
q.items = nil
q.head = 0
return
}
windowSeconds := retentionSeconds.Load()
if windowSeconds <= 0 {
windowSeconds = defaultRetentionSeconds
}
cutoff := now.Add(-time.Duration(windowSeconds) * time.Second)
for q.head < len(q.items) && q.items[q.head].enqueuedAt.Before(cutoff) {
q.head++
}
}
func (q *queue) maybeCompactLocked() {
if q.head == 0 {
return
}
if q.head >= len(q.items) {
q.items = nil
q.head = 0
return
}
if q.head < 1024 && q.head*2 < len(q.items) {
return
}
q.items = append([]queueItem(nil), q.items[q.head:]...)
q.head = 0
}