Files
CLIProxyAPI/internal/redisqueue/queue.go
Luis Pater 28d78273e4 feat(api): implement protocol multiplexer and Redis queue for usage integration
- Added `protocol_multiplexer.go`, enabling support for both HTTP and Redis protocols on a single listener.
- Introduced `redis_queue_protocol.go` to handle Redis-compatible RESP commands for queue management.
- Integrated `redisqueue` package, supporting in-memory queuing with expiration pruning.
- Updated server initialization to manage a shared listener and multiplex connections.
- Adjusted `Handler` to adopt `AuthenticateManagementKey` for modular key validation, supporting both HTTP and Redis flows.
2026-04-25 18:52:24 +08:00

134 lines
1.9 KiB
Go

package redisqueue
import (
"sync"
"sync/atomic"
"time"
)
// retentionWindow is how long an enqueued item stays eligible for popping;
// pruneLocked drops items whose enqueue time is older than this.
const retentionWindow = 60 * time.Second
// queueItem is a single queued payload together with the time it was enqueued.
type queueItem struct {
	// enqueuedAt is compared against the retention cutoff when pruning.
	enqueuedAt time.Time
	// payload is the queue's own copy of the bytes handed to enqueue.
	payload []byte
}
// queue is a mutex-guarded FIFO of queueItem values. Consumed and expired
// entries are skipped by advancing head; maybeCompactLocked later drops them
// from the backing slice.
type queue struct {
	// mu guards items and head.
	mu sync.Mutex
	// items holds the entries in enqueue order; only items[head:] is live.
	items []queueItem
	// head is the index of the oldest live entry in items.
	head int
}
// Package-level state shared by the exported queue functions.
var (
	// enabled gates Enqueue and PopOldest; it is toggled through SetEnabled.
	enabled atomic.Bool
	// global is the single queue instance the exported functions operate on.
	global queue
)
// SetEnabled switches the queue on or off. Switching it off also discards
// everything currently queued.
//
// NOTE(review): the flag is stored before the queue is cleared, and Enqueue
// checks the flag without holding the queue lock, so an Enqueue that already
// passed its check may still add an item after the clear — confirm this is
// acceptable for callers.
func SetEnabled(value bool) {
	enabled.Store(value)
	if value {
		return
	}
	global.clear()
}
// Enabled reports the value most recently stored by SetEnabled.
func Enabled() bool {
	active := enabled.Load()
	return active
}
// Enqueue appends a copy of payload to the global queue. It does nothing when
// the queue is disabled or payload is empty.
func Enqueue(payload []byte) {
	if Enabled() && len(payload) > 0 {
		global.enqueue(payload)
	}
}
// PopOldest removes and returns up to count payloads from the global queue,
// oldest first. It returns nil when the queue is disabled or count is not
// positive.
func PopOldest(count int) [][]byte {
	if !Enabled() || count < 1 {
		return nil
	}
	return global.popOldest(count)
}
// clear drops every entry and releases the backing slice.
func (q *queue) clear() {
	q.mu.Lock()
	q.items, q.head = nil, 0
	q.mu.Unlock()
}
// enqueue stores a private copy of payload stamped with the current time.
// Expired entries are pruned before the append, and the backing slice is
// compacted afterwards if enough dead entries have accumulated.
func (q *queue) enqueue(payload []byte) {
	stamp := time.Now()
	// Copy so later mutation of the caller's slice cannot affect the queue.
	entry := queueItem{
		enqueuedAt: stamp,
		payload:    append([]byte(nil), payload...),
	}

	q.mu.Lock()
	defer q.mu.Unlock()

	q.pruneLocked(stamp)
	q.items = append(q.items, entry)
	q.maybeCompactLocked()
}
// popOldest removes and returns up to count of the oldest unexpired payloads,
// oldest first. Expired entries are pruned first; when nothing is left the
// backing slice is released and nil is returned.
//
// The returned slices are the queue's stored copies. Each popped slot is
// zeroed so the queue no longer references a payload it has handed out.
func (q *queue) popOldest(count int) [][]byte {
	now := time.Now()

	q.mu.Lock()
	defer q.mu.Unlock()

	q.pruneLocked(now)

	available := len(q.items) - q.head
	if available <= 0 {
		q.items = nil
		q.head = 0
		return nil
	}
	if count > available {
		count = available
	}

	out := make([][]byte, 0, count)
	for i := q.head; i < q.head+count; i++ {
		out = append(out, q.items[i].payload)
		// Without this, the backing array would keep the payload reachable
		// until the next compaction, even after the caller is done with it.
		q.items[i] = queueItem{}
	}
	q.head += count
	q.maybeCompactLocked()
	return out
}
// pruneLocked advances head past every entry enqueued more than
// retentionWindow before now. If no live entries remain on entry, the backing
// slice is released instead. The caller must hold q.mu.
//
// Each expired slot is zeroed as it is skipped so its payload can be
// garbage-collected right away rather than at the next compaction.
func (q *queue) pruneLocked(now time.Time) {
	if q.head >= len(q.items) {
		q.items = nil
		q.head = 0
		return
	}
	cutoff := now.Add(-retentionWindow)
	for q.head < len(q.items) && q.items[q.head].enqueuedAt.Before(cutoff) {
		q.items[q.head] = queueItem{}
		q.head++
	}
}
// maybeCompactLocked reclaims the dead prefix items[:head]. The caller must
// hold q.mu.
//
// Nothing happens when head is zero. When no live entries remain the backing
// slice is released. Otherwise the live entries are copied into a fresh slice
// once the dead prefix reaches 1024 entries or at least half of the slice.
func (q *queue) maybeCompactLocked() {
	dead, total := q.head, len(q.items)
	switch {
	case dead == 0:
		return
	case dead >= total:
		q.items, q.head = nil, 0
		return
	case dead < 1024 && 2*dead < total:
		// Dead prefix is still small; copying is not worth it yet.
		return
	}
	live := q.items[dead:]
	q.items = append([]queueItem(nil), live...)
	q.head = 0
}