Tail management logs with cursors

This commit is contained in:
LTbinglingfeng
2026-06-15 02:08:06 +08:00
parent c61453a807
commit 95a72a47c8
2 changed files with 277 additions and 12 deletions

View File

@@ -2,9 +2,11 @@ package management
import (
"bufio"
"bytes"
"crypto/sha256"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
"io"
"math"
@@ -53,11 +55,7 @@ func (h *Handler) GetLogs(c *gin.Context) {
if err != nil {
if os.IsNotExist(err) {
cutoff := parseCutoff(c.Query("after"))
c.JSON(http.StatusOK, gin.H{
"lines": []string{},
"line-count": 0,
"latest-timestamp": cutoff,
})
writeLogsResponse(c, []string{}, 0, cutoff, "", false)
return
}
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to list log files: %v", err)})
@@ -71,10 +69,20 @@ func (h *Handler) GetLogs(c *gin.Context) {
}
cutoff := parseCutoff(c.Query("after"))
if strings.TrimSpace(c.Query("cursor")) == "" && cutoff == 0 && limit > 0 {
result, errTail := tailLogFiles(files, limit, 0)
if errTail != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to read log files: %v", errTail)})
return
}
writeLogsResponse(c, result.lines, len(result.lines), result.latest, result.nextCursor, false)
return
}
acc := newLogAccumulator(cutoff, limit)
for i := range files {
if errProcess := acc.consumeFile(files[i]); errProcess != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to read log file %s: %v", files[i], errProcess)})
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to read log file: %v", errProcess)})
return
}
}
@@ -83,11 +91,12 @@ func (h *Handler) GetLogs(c *gin.Context) {
if latest == 0 || latest < cutoff {
latest = cutoff
}
c.JSON(http.StatusOK, gin.H{
"lines": lines,
"line-count": total,
"latest-timestamp": latest,
})
nextCursor, errCursor := cursorForLatestLogFile(files, latest)
if errCursor != nil {
c.JSON(http.StatusInternalServerError, gin.H{"error": fmt.Sprintf("failed to prepare log cursor: %v", errCursor)})
return
}
writeLogsResponse(c, lines, total, latest, nextCursor, false)
}
// DeleteLogs removes all rotated log files and truncates the active log.
@@ -498,6 +507,140 @@ type completeLogRead struct {
hitLimit bool
}
type logReadResult struct {
lines []string
latest int64
nextCursor string
}
func writeLogsResponse(c *gin.Context, lines []string, lineCount int, latest int64, nextCursor string, cursorReset bool) {
if lines == nil {
lines = []string{}
}
payload := gin.H{
"lines": lines,
"line-count": lineCount,
"latest-timestamp": latest,
"next-cursor": nextCursor,
}
if cursorReset {
payload["cursor-reset"] = true
}
c.JSON(http.StatusOK, payload)
}
func tailLogFiles(files []string, limit int, fallbackLatest int64) (logReadResult, error) {
result := logReadResult{
lines: []string{},
latest: fallbackLatest,
}
for i := len(files) - 1; i >= 0; i-- {
remaining := 0
if limit > 0 {
remaining = limit - len(result.lines)
if remaining <= 0 {
break
}
}
read, errRead := readTailLogLines(files[i], remaining)
if errRead != nil {
if errors.Is(errRead, os.ErrNotExist) {
continue
}
return logReadResult{}, errRead
}
if len(read.lines) == 0 {
continue
}
result.lines = append(append([]string{}, read.lines...), result.lines...)
if read.latest > result.latest {
result.latest = read.latest
}
}
nextCursor, errCursor := cursorForLatestLogFile(files, result.latest)
if errCursor != nil {
return logReadResult{}, errCursor
}
result.nextCursor = nextCursor
return result, nil
}
func readTailLogLines(path string, limit int) (completeLogRead, error) {
boundary, errBoundary := completeLogBoundary(path)
if errBoundary != nil {
return completeLogRead{}, errBoundary
}
if boundary == 0 {
return completeLogRead{lines: []string{}}, nil
}
start, errStart := tailStartOffset(path, boundary, limit)
if errStart != nil {
return completeLogRead{}, errStart
}
return readCompleteLogLines(path, start, boundary, limit)
}
func tailStartOffset(path string, boundary int64, limit int) (int64, error) {
if limit <= 0 {
return 0, nil
}
file, errOpen := os.Open(path)
if errOpen != nil {
return 0, errOpen
}
defer func() {
_ = file.Close()
}()
buf := make([]byte, 32*1024)
pos := boundary
lineBreaks := 0
for pos > 0 {
chunk := minInt64(int64(len(buf)), pos)
pos -= chunk
n, errRead := file.ReadAt(buf[:chunk], pos)
if errRead != nil && errRead != io.EOF {
return 0, errRead
}
if n <= 0 {
continue
}
data := buf[:n]
for len(data) > 0 {
idx := bytes.LastIndexByte(data, '\n')
if idx < 0 {
break
}
lineBreaks++
if lineBreaks > limit {
return pos + int64(idx) + 1, nil
}
data = data[:idx]
}
}
return 0, nil
}
func cursorForLatestLogFile(files []string, latest int64) (string, error) {
for i := len(files) - 1; i >= 0; i-- {
boundary, errBoundary := completeLogBoundary(files[i])
if errBoundary != nil {
if errors.Is(errBoundary, os.ErrNotExist) {
continue
}
return "", errBoundary
}
cursor, errCursor := newLogCursor(files[i], boundary, latest)
if errCursor != nil {
if errors.Is(errCursor, os.ErrNotExist) {
continue
}
return "", errCursor
}
return cursor, nil
}
return "", nil
}
func encodeLogCursor(cursor logCursor) (string, error) {
raw, err := json.Marshal(cursor)
if err != nil {
@@ -760,7 +903,7 @@ func completeLogBoundary(path string) (int64, error) {
if n <= 0 {
continue
}
if idx := strings.LastIndexByte(string(buf[:n]), '\n'); idx >= 0 {
if idx := bytes.LastIndexByte(buf[:n], '\n'); idx >= 0 {
return pos + int64(idx) + 1, nil
}
}

View File

@@ -3,11 +3,18 @@ package management
import (
"encoding/base64"
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"reflect"
"strconv"
"strings"
"testing"
"time"
"github.com/gin-gonic/gin"
"github.com/router-for-me/CLIProxyAPI/v7/internal/config"
)
func TestDecodeLogCursorRejectsUnsafeFiles(t *testing.T) {
@@ -126,6 +133,80 @@ func TestReadCompleteLogLinesSkipsTrailingPartial(t *testing.T) {
}
}
func TestGetLogsTailLimitReturnsRecentLinesWithCursor(t *testing.T) {
dir := t.TempDir()
lines := []string{
"[2026-06-15 10:00:00] first",
"[2026-06-15 10:00:01] second",
"[2026-06-15 10:00:02] third",
"[2026-06-15 10:00:03] fourth",
}
writeMainLog(t, dir, strings.Join(lines, "\n")+"\n")
resp := performGetLogs(t, newLogsTestHandler(dir, true), "/v0/management/logs?limit=2")
wantLines := []string{lines[2], lines[3]}
if !reflect.DeepEqual(resp.Lines, wantLines) {
t.Fatalf("lines = %#v, want %#v", resp.Lines, wantLines)
}
if resp.LineCount != 2 {
t.Fatalf("line-count = %d, want 2", resp.LineCount)
}
if resp.NextCursor == "" {
t.Fatal("next-cursor is empty")
}
wantLatest := time.Date(2026, 6, 15, 10, 0, 3, 0, time.Local).Unix()
if resp.LatestTimestamp != wantLatest {
t.Fatalf("latest-timestamp = %d, want %d", resp.LatestTimestamp, wantLatest)
}
}
func TestGetLogsNoLimitKeepsFullScanBehavior(t *testing.T) {
dir := t.TempDir()
writeMainLog(t, dir, "complete\npartial")
resp := performGetLogs(t, newLogsTestHandler(dir, true), "/v0/management/logs")
wantLines := []string{"complete", "partial"}
if !reflect.DeepEqual(resp.Lines, wantLines) {
t.Fatalf("lines = %#v, want %#v", resp.Lines, wantLines)
}
if resp.LineCount != 2 {
t.Fatalf("line-count = %d, want full scan count 2", resp.LineCount)
}
if resp.NextCursor == "" {
t.Fatal("next-cursor is empty")
}
cursor, errCursor := decodeLogCursor(resp.NextCursor)
if errCursor != nil {
t.Fatalf("decode next-cursor: %v", errCursor)
}
if cursor.Offset != int64(len("complete\n")) {
t.Fatalf("cursor offset = %d, want complete-line boundary", cursor.Offset)
}
}
func TestGetLogsAfterKeepsTimestampScanAndReturnsCursor(t *testing.T) {
dir := t.TempDir()
lines := []string{
"[2026-06-15 10:00:00] first",
"[2026-06-15 10:00:01] second",
"[2026-06-15 10:00:02] third",
}
writeMainLog(t, dir, strings.Join(lines, "\n")+"\n")
cutoff := time.Date(2026, 6, 15, 10, 0, 0, 0, time.Local).Unix()
resp := performGetLogs(t, newLogsTestHandler(dir, true), "/v0/management/logs?after="+strconv.FormatInt(cutoff, 10))
wantLines := []string{lines[1], lines[2]}
if !reflect.DeepEqual(resp.Lines, wantLines) {
t.Fatalf("lines = %#v, want %#v", resp.Lines, wantLines)
}
if resp.LineCount != 3 {
t.Fatalf("line-count = %d, want full scan count 3", resp.LineCount)
}
if resp.NextCursor == "" {
t.Fatal("next-cursor is empty")
}
}
func mustEncodeRawCursor(t *testing.T, cursor logCursor) string {
t.Helper()
raw, err := json.Marshal(cursor)
@@ -134,3 +215,44 @@ func mustEncodeRawCursor(t *testing.T, cursor logCursor) string {
}
return base64.RawURLEncoding.EncodeToString(raw)
}
type logsAPIResponse struct {
Lines []string `json:"lines"`
LineCount int `json:"line-count"`
LatestTimestamp int64 `json:"latest-timestamp"`
NextCursor string `json:"next-cursor"`
CursorReset bool `json:"cursor-reset"`
}
func newLogsTestHandler(dir string, loggingToFile bool) *Handler {
h := NewHandlerWithoutConfigFilePath(&config.Config{LoggingToFile: loggingToFile}, nil)
h.SetLogDirectory(dir)
return h
}
func performGetLogs(t *testing.T, h *Handler, target string) logsAPIResponse {
t.Helper()
gin.SetMode(gin.TestMode)
rec := httptest.NewRecorder()
c, _ := gin.CreateTestContext(rec)
c.Request = httptest.NewRequest(http.MethodGet, target, nil)
h.GetLogs(c)
if rec.Code != http.StatusOK {
t.Fatalf("GetLogs status = %d, body = %s", rec.Code, rec.Body.String())
}
var resp logsAPIResponse
if err := json.Unmarshal(rec.Body.Bytes(), &resp); err != nil {
t.Fatalf("decode response: %v", err)
}
if resp.Lines == nil {
resp.Lines = []string{}
}
return resp
}
func writeMainLog(t *testing.T, dir, content string) {
t.Helper()
if err := os.WriteFile(filepath.Join(dir, defaultLogFileName), []byte(content), 0o644); err != nil {
t.Fatalf("write main log: %v", err)
}
}