package redisqueue import ( "sync" "sync/atomic" "time" ) const ( defaultRetentionSeconds int64 = 60 maxRetentionSeconds int64 = 3600 usageSubscriberBuffer = 256 ) type queueItem struct { enqueuedAt time.Time payload []byte } type queue struct { mu sync.Mutex items []queueItem head int subscribers map[uint64]chan []byte nextSubscriberID uint64 } var ( enabled atomic.Bool retentionSeconds atomic.Int64 global queue ) func init() { retentionSeconds.Store(defaultRetentionSeconds) } func SetEnabled(value bool) { enabled.Store(value) if !value { global.clear() } } func Enabled() bool { return enabled.Load() } func SetRetentionSeconds(value int) { normalized := int64(value) if normalized <= 0 { normalized = defaultRetentionSeconds } else if normalized > maxRetentionSeconds { normalized = maxRetentionSeconds } retentionSeconds.Store(normalized) } func Enqueue(payload []byte) { if !Enabled() { return } if len(payload) == 0 { return } if global.publishToSubscribers(payload) { return } global.enqueue(payload) } func PopOldest(count int) [][]byte { if !Enabled() { return nil } if count <= 0 { return nil } return global.popOldest(count) } func SubscribeUsage() (<-chan []byte, func()) { return global.subscribeUsage() } func (q *queue) clear() { q.mu.Lock() subscribers := make([]chan []byte, 0, len(q.subscribers)) for _, subscriber := range q.subscribers { subscribers = append(subscribers, subscriber) } q.items = nil q.head = 0 q.subscribers = nil q.mu.Unlock() for _, subscriber := range subscribers { close(subscriber) } } func (q *queue) enqueue(payload []byte) { now := time.Now() q.mu.Lock() defer q.mu.Unlock() q.pruneLocked(now) q.items = append(q.items, queueItem{ enqueuedAt: now, payload: append([]byte(nil), payload...), }) q.maybeCompactLocked() } func (q *queue) publishToSubscribers(payload []byte) bool { q.mu.Lock() defer q.mu.Unlock() if len(q.subscribers) == 0 { return false } for id, subscriber := range q.subscribers { cloned := append([]byte(nil), payload...) select { case subscriber <- cloned: default: delete(q.subscribers, id) close(subscriber) } } return true } func (q *queue) subscribeUsage() (<-chan []byte, func()) { subscriber := make(chan []byte, usageSubscriberBuffer) q.mu.Lock() if q.subscribers == nil { q.subscribers = make(map[uint64]chan []byte) } q.nextSubscriberID++ id := q.nextSubscriberID q.subscribers[id] = subscriber q.mu.Unlock() var once sync.Once unsubscribe := func() { once.Do(func() { q.unsubscribeUsage(id) }) } return subscriber, unsubscribe } func (q *queue) unsubscribeUsage(id uint64) { q.mu.Lock() subscriber, ok := q.subscribers[id] if ok { delete(q.subscribers, id) } q.mu.Unlock() if ok { close(subscriber) } } func (q *queue) popOldest(count int) [][]byte { now := time.Now() q.mu.Lock() defer q.mu.Unlock() q.pruneLocked(now) available := len(q.items) - q.head if available <= 0 { q.items = nil q.head = 0 return nil } if count > available { count = available } out := make([][]byte, 0, count) for i := 0; i < count; i++ { item := q.items[q.head+i] out = append(out, item.payload) } q.head += count q.maybeCompactLocked() return out } func (q *queue) pruneLocked(now time.Time) { if q.head >= len(q.items) { q.items = nil q.head = 0 return } windowSeconds := retentionSeconds.Load() if windowSeconds <= 0 { windowSeconds = defaultRetentionSeconds } cutoff := now.Add(-time.Duration(windowSeconds) * time.Second) for q.head < len(q.items) && q.items[q.head].enqueuedAt.Before(cutoff) { q.head++ } } func (q *queue) maybeCompactLocked() { if q.head == 0 { return } if q.head >= len(q.items) { q.items = nil q.head = 0 return } if q.head < 1024 && q.head*2 < len(q.items) { return } q.items = append([]queueItem(nil), q.items[q.head:]...) q.head = 0 }