Files
Luis Pater 56df36895a feat: add configurable retention period for Redis usage queue
- Introduced `redis-usage-queue-retention-seconds` config parameter with a default of 60 seconds and a max of 3600 seconds.
- Updated logic in `redisqueue` to honor configurable retention periods for enqueued usage data.
- Modified config validation and initialization to support and enforce retention limits.
- Enhanced change tracking in `config_diff` to detect updates to this parameter.
2026-05-02 20:43:16 +08:00

156 lines
2.5 KiB
Go

package redisqueue
import (
"sync"
"sync/atomic"
"time"
)
const (
defaultRetentionSeconds int64 = 60
maxRetentionSeconds int64 = 3600
)
type queueItem struct {
enqueuedAt time.Time
payload []byte
}
type queue struct {
mu sync.Mutex
items []queueItem
head int
}
var (
enabled atomic.Bool
retentionSeconds atomic.Int64
global queue
)
func init() {
retentionSeconds.Store(defaultRetentionSeconds)
}
func SetEnabled(value bool) {
enabled.Store(value)
if !value {
global.clear()
}
}
func Enabled() bool {
return enabled.Load()
}
func SetRetentionSeconds(value int) {
normalized := int64(value)
if normalized <= 0 {
normalized = defaultRetentionSeconds
} else if normalized > maxRetentionSeconds {
normalized = maxRetentionSeconds
}
retentionSeconds.Store(normalized)
}
func Enqueue(payload []byte) {
if !Enabled() {
return
}
if len(payload) == 0 {
return
}
global.enqueue(payload)
}
func PopOldest(count int) [][]byte {
if !Enabled() {
return nil
}
if count <= 0 {
return nil
}
return global.popOldest(count)
}
func (q *queue) clear() {
q.mu.Lock()
defer q.mu.Unlock()
q.items = nil
q.head = 0
}
func (q *queue) enqueue(payload []byte) {
now := time.Now()
q.mu.Lock()
defer q.mu.Unlock()
q.pruneLocked(now)
q.items = append(q.items, queueItem{
enqueuedAt: now,
payload: append([]byte(nil), payload...),
})
q.maybeCompactLocked()
}
func (q *queue) popOldest(count int) [][]byte {
now := time.Now()
q.mu.Lock()
defer q.mu.Unlock()
q.pruneLocked(now)
available := len(q.items) - q.head
if available <= 0 {
q.items = nil
q.head = 0
return nil
}
if count > available {
count = available
}
out := make([][]byte, 0, count)
for i := 0; i < count; i++ {
item := q.items[q.head+i]
out = append(out, item.payload)
}
q.head += count
q.maybeCompactLocked()
return out
}
func (q *queue) pruneLocked(now time.Time) {
if q.head >= len(q.items) {
q.items = nil
q.head = 0
return
}
windowSeconds := retentionSeconds.Load()
if windowSeconds <= 0 {
windowSeconds = defaultRetentionSeconds
}
cutoff := now.Add(-time.Duration(windowSeconds) * time.Second)
for q.head < len(q.items) && q.items[q.head].enqueuedAt.Before(cutoff) {
q.head++
}
}
func (q *queue) maybeCompactLocked() {
if q.head == 0 {
return
}
if q.head >= len(q.items) {
q.items = nil
q.head = 0
return
}
if q.head < 1024 && q.head*2 < len(q.items) {
return
}
q.items = append([]queueItem(nil), q.items[q.head:]...)
q.head = 0
}