mirror of
https://github.com/xxnuo/MTranServer.git
synced 2026-06-01 11:30:29 +08:00
538 lines
13 KiB
Go
538 lines
13 KiB
Go
package services
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
"path/filepath"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/xxnuo/MTranServer/internal/config"
|
|
"github.com/xxnuo/MTranServer/internal/logger"
|
|
"github.com/xxnuo/MTranServer/internal/manager"
|
|
"github.com/xxnuo/MTranServer/internal/models"
|
|
"github.com/xxnuo/MTranServer/internal/utils"
|
|
)
|
|
|
|
type EngineInfo struct {
|
|
Managers []*manager.Manager
|
|
LastUsed time.Time
|
|
FromLang string
|
|
ToLang string
|
|
stopTimer *time.Timer
|
|
mu sync.Mutex
|
|
nextIdx int
|
|
}
|
|
|
|
var (
|
|
engines = make(map[string]*EngineInfo)
|
|
engMu sync.RWMutex
|
|
)
|
|
|
|
const (
|
|
workerMemoryMB = 2048
|
|
reservedMemoryMB = 4096
|
|
)
|
|
|
|
var ErrInsufficientMemory = errors.New("insufficient memory to create new worker")
|
|
|
|
func canCreateNewWorker() bool {
|
|
availableMB := getAvailableMemoryMB()
|
|
if availableMB == 0 {
|
|
logger.Debug("Cannot determine available memory, allowing worker creation")
|
|
return true
|
|
}
|
|
|
|
requiredMB := uint64(workerMemoryMB + reservedMemoryMB)
|
|
canCreate := availableMB >= requiredMB
|
|
|
|
logger.Debug("Memory check: available=%dMB, required=%dMB, canCreate=%v",
|
|
availableMB, requiredMB, canCreate)
|
|
|
|
return canCreate
|
|
}
|
|
|
|
func (ei *EngineInfo) resetIdleTimer() {
|
|
ei.mu.Lock()
|
|
defer ei.mu.Unlock()
|
|
|
|
ei.LastUsed = time.Now()
|
|
|
|
if ei.stopTimer != nil {
|
|
ei.stopTimer.Stop()
|
|
}
|
|
|
|
cfg := config.GetConfig()
|
|
timeout := time.Duration(cfg.WorkerIdleTimeout) * time.Second
|
|
|
|
ei.stopTimer = time.AfterFunc(timeout, func() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
logger.Error("Panic during engine cleanup: %v", r)
|
|
}
|
|
}()
|
|
|
|
key := fmt.Sprintf("%s-%s", ei.FromLang, ei.ToLang)
|
|
logger.Info("Engine %s idle timeout, stopping...", key)
|
|
|
|
engMu.Lock()
|
|
defer engMu.Unlock()
|
|
|
|
if info, ok := engines[key]; ok {
|
|
for _, m := range info.Managers {
|
|
if m != nil {
|
|
if err := m.Cleanup(); err != nil {
|
|
logger.Error("Failed to cleanup manager: %v", err)
|
|
}
|
|
}
|
|
}
|
|
delete(engines, key)
|
|
logger.Info("Engine %s stopped due to idle timeout", key)
|
|
}
|
|
})
|
|
}
|
|
|
|
func (ei *EngineInfo) getNextManager() *manager.Manager {
|
|
ei.mu.Lock()
|
|
defer ei.mu.Unlock()
|
|
|
|
if len(ei.Managers) == 0 {
|
|
return nil
|
|
}
|
|
|
|
// Try to find a running manager first
|
|
startIdx := ei.nextIdx
|
|
for i := 0; i < len(ei.Managers); i++ {
|
|
idx := (startIdx + i) % len(ei.Managers)
|
|
m := ei.Managers[idx]
|
|
if m != nil && m.IsRunning() {
|
|
ei.nextIdx = idx + 1 // Start from next one next time
|
|
return m
|
|
}
|
|
}
|
|
|
|
// If no running manager found, just return the next one in round-robin fashion
|
|
// (it will likely fail, but that's expected if all are restarting)
|
|
idx := ei.nextIdx % len(ei.Managers)
|
|
ei.nextIdx++
|
|
return ei.Managers[idx]
|
|
}
|
|
|
|
// Helper to get engine info without creating one if not exists
|
|
func getEngineInfo(fromLang, toLang string) *EngineInfo {
|
|
key := fmt.Sprintf("%s-%s", fromLang, toLang)
|
|
engMu.RLock()
|
|
defer engMu.RUnlock()
|
|
return engines[key]
|
|
}
|
|
|
|
func getOrCreateSingleEngine(fromLang, toLang string) (*manager.Manager, error) {
|
|
key := fmt.Sprintf("%s-%s", fromLang, toLang)
|
|
|
|
engMu.RLock()
|
|
if info, ok := engines[key]; ok && info != nil {
|
|
m := info.getNextManager()
|
|
if m != nil {
|
|
engMu.RUnlock()
|
|
info.resetIdleTimer()
|
|
return m, nil
|
|
}
|
|
}
|
|
engMu.RUnlock()
|
|
|
|
engMu.Lock()
|
|
defer engMu.Unlock()
|
|
|
|
if info, ok := engines[key]; ok && info != nil {
|
|
m := info.getNextManager()
|
|
if m != nil {
|
|
info.resetIdleTimer()
|
|
return m, nil
|
|
}
|
|
}
|
|
|
|
if !canCreateNewWorker() {
|
|
availableMB := getAvailableMemoryMB()
|
|
return nil, fmt.Errorf("%w: available memory %dMB, need at least %dMB",
|
|
ErrInsufficientMemory, availableMB, workerMemoryMB+reservedMemoryMB)
|
|
}
|
|
|
|
logger.Info("Creating new engine pool for %s -> %s", fromLang, toLang)
|
|
|
|
cfg := config.GetConfig()
|
|
if cfg.EnableOfflineMode {
|
|
logger.Info("Offline mode enabled, skipping model download")
|
|
} else {
|
|
logger.Info("Downloading model for %s -> %s", fromLang, toLang)
|
|
if err := models.DownloadModel(toLang, fromLang, ""); err != nil {
|
|
return nil, fmt.Errorf("failed to download model: %w", err)
|
|
}
|
|
}
|
|
|
|
langPairDir := filepath.Join(cfg.ModelDir, fmt.Sprintf("%s_%s", fromLang, toLang))
|
|
if err := os.MkdirAll(langPairDir, 0755); err != nil {
|
|
return nil, fmt.Errorf("failed to create work directory: %w", err)
|
|
}
|
|
|
|
numWorkers := cfg.WorkersPerLanguage
|
|
if numWorkers <= 0 {
|
|
numWorkers = 1
|
|
}
|
|
|
|
managers := make([]*manager.Manager, 0, numWorkers)
|
|
for i := 0; i < numWorkers; i++ {
|
|
port, err := utils.GetFreePort()
|
|
if err != nil {
|
|
for _, m := range managers {
|
|
m.Cleanup()
|
|
}
|
|
return nil, fmt.Errorf("failed to allocate port: %w", err)
|
|
}
|
|
|
|
args := manager.NewWorkerArgs()
|
|
args.Port = port
|
|
args.LogLevel = cfg.LogLevel
|
|
args.WorkDir = langPairDir
|
|
args.ModelDir = langPairDir
|
|
|
|
m := manager.NewManager(args)
|
|
|
|
if err := m.Start(); err != nil {
|
|
for _, m := range managers {
|
|
m.Cleanup()
|
|
}
|
|
return nil, fmt.Errorf("failed to start manager: %w", err)
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
ready := false
|
|
for j := 0; j < 30; j++ {
|
|
var err error
|
|
ready, err = m.Health(ctx)
|
|
logger.Debug("Worker %d health check %d: ready=%v, err=%v", i+1, j+1, ready, err)
|
|
if err == nil && ready {
|
|
break
|
|
}
|
|
time.Sleep(100 * time.Millisecond)
|
|
}
|
|
cancel()
|
|
|
|
if !ready {
|
|
m.Cleanup()
|
|
for _, m := range managers {
|
|
m.Cleanup()
|
|
}
|
|
return nil, fmt.Errorf("worker %d failed to become ready", i+1)
|
|
}
|
|
|
|
managers = append(managers, m)
|
|
logger.Info("Worker %d/%d created for %s -> %s on port %d", i+1, numWorkers, fromLang, toLang, port)
|
|
}
|
|
|
|
info := &EngineInfo{
|
|
Managers: managers,
|
|
LastUsed: time.Now(),
|
|
FromLang: fromLang,
|
|
ToLang: toLang,
|
|
nextIdx: 0,
|
|
}
|
|
info.resetIdleTimer()
|
|
|
|
engines[key] = info
|
|
logger.Info("Engine pool created successfully for %s -> %s with %d workers", fromLang, toLang, numWorkers)
|
|
|
|
return managers[0], nil
|
|
}
|
|
|
|
func needsPivotTranslation(fromLang, toLang string) bool {
|
|
|
|
if fromLang == "en" || toLang == "en" {
|
|
return false
|
|
}
|
|
|
|
if models.GlobalRecords != nil && models.GlobalRecords.HasLanguagePair(fromLang, toLang) {
|
|
return false
|
|
}
|
|
|
|
return true
|
|
}
|
|
|
|
func GetOrCreateEngine(fromLang, toLang string) (*manager.Manager, error) {
|
|
|
|
if !needsPivotTranslation(fromLang, toLang) {
|
|
return getOrCreateSingleEngine(fromLang, toLang)
|
|
}
|
|
|
|
logger.Debug("Translation %s -> %s requires pivot through English", fromLang, toLang)
|
|
return getOrCreateSingleEngine(fromLang, "en")
|
|
}
|
|
|
|
func translateSegment(ctx context.Context, fromLang, toLang, text string, isHTML bool) (string, error) {
|
|
if fromLang == toLang {
|
|
return text, nil
|
|
}
|
|
|
|
if !needsPivotTranslation(fromLang, toLang) {
|
|
return translateSingleLanguageText(ctx, fromLang, toLang, text, isHTML)
|
|
}
|
|
|
|
// Pivot Translation
|
|
|
|
// Step 1: from -> en
|
|
intermediateText, err := translateSingleLanguageText(ctx, fromLang, "en", text, isHTML)
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
|
|
// Step 2: en -> to
|
|
return translateSingleLanguageText(ctx, "en", toLang, intermediateText, isHTML)
|
|
}
|
|
|
|
func TranslateWithPivot(ctx context.Context, fromLang, toLang, text string, isHTML bool) (string, error) {
|
|
logger.Debug("TranslateWithPivot: %s -> %s, text length: %d, isHTML: %v", fromLang, toLang, len(text), isHTML)
|
|
|
|
if fromLang != "auto" && len(text) <= 128 {
|
|
if fromLang == toLang {
|
|
return text, nil
|
|
}
|
|
return translateSegment(ctx, fromLang, toLang, text, isHTML)
|
|
}
|
|
|
|
segments := DetectMultipleLanguages(text)
|
|
if len(segments) <= 1 {
|
|
var effectiveFromLang string
|
|
if len(segments) == 1 {
|
|
effectiveFromLang = segments[0].Language
|
|
} else if fromLang == "auto" {
|
|
detected := DetectLanguage(text)
|
|
if detected == "" {
|
|
return "", fmt.Errorf("failed to detect source language")
|
|
}
|
|
effectiveFromLang = detected
|
|
} else {
|
|
effectiveFromLang = fromLang
|
|
}
|
|
if effectiveFromLang == toLang {
|
|
return text, nil
|
|
}
|
|
return translateSegment(ctx, effectiveFromLang, toLang, text, isHTML)
|
|
}
|
|
|
|
logger.Debug("Detected %d language segments", len(segments))
|
|
var result strings.Builder
|
|
lastEnd := 0
|
|
|
|
for _, seg := range segments {
|
|
if seg.Start > lastEnd {
|
|
result.WriteString(text[lastEnd:seg.Start])
|
|
}
|
|
|
|
if seg.Language == toLang {
|
|
result.WriteString(seg.Text)
|
|
} else {
|
|
translated, err := translateSegment(ctx, seg.Language, toLang, seg.Text, isHTML)
|
|
if err != nil {
|
|
logger.Error("Failed to translate segment: %v", err)
|
|
result.WriteString(seg.Text)
|
|
} else {
|
|
result.WriteString(translated)
|
|
}
|
|
}
|
|
lastEnd = seg.End
|
|
}
|
|
|
|
if lastEnd < len(text) {
|
|
result.WriteString(text[lastEnd:])
|
|
}
|
|
|
|
return result.String(), nil
|
|
}
|
|
|
|
func translateSingleLanguageText(ctx context.Context, fromLang, toLang, text string, isHTML bool) (string, error) {
|
|
// 1. Get initial manager (will ensure pool is created)
|
|
m, err := getOrCreateSingleEngine(fromLang, toLang)
|
|
if err != nil {
|
|
logger.Error("translateSingleLanguageText: failed to get engine: %v", err)
|
|
return "", err
|
|
}
|
|
|
|
// Get engine info to check concurrency/count
|
|
info := getEngineInfo(fromLang, toLang)
|
|
var maxRetries int = 1
|
|
if info != nil {
|
|
maxRetries = len(info.Managers) * 2 // Try twice per manager on average
|
|
if maxRetries < 3 {
|
|
maxRetries = 3
|
|
}
|
|
}
|
|
|
|
var lastErr error
|
|
|
|
for i := 0; i < maxRetries; i++ {
|
|
// If retrying, get a potentially new manager (load balanced)
|
|
if i > 0 && info != nil {
|
|
m = info.getNextManager()
|
|
if m == nil {
|
|
// Should not happen if pool is alive
|
|
return "", fmt.Errorf("no managers available")
|
|
}
|
|
}
|
|
|
|
logger.Debug("translateSingleLanguageText: attempting translation (try %d/%d)", i+1, maxRetries)
|
|
var result string
|
|
if isHTML {
|
|
result, err = m.TranslateHTML(ctx, text)
|
|
} else {
|
|
result, err = m.Translate(ctx, text)
|
|
}
|
|
|
|
if err == nil {
|
|
return result, nil
|
|
}
|
|
|
|
// Check if error is retryable (worker failure)
|
|
if isConnectionError(err) {
|
|
logger.Debug("Translation attempt %d failed (connection error): %v. Retrying with next manager...", i+1, err)
|
|
lastErr = err
|
|
// Backoff: 500ms, 1000ms, 2000ms...
|
|
backoff := time.Duration(500*(1<<i)) * time.Millisecond
|
|
if backoff > 3*time.Second {
|
|
backoff = 3 * time.Second
|
|
}
|
|
time.Sleep(backoff)
|
|
continue
|
|
}
|
|
|
|
// If it's not a connection error (e.g. invalid request), return immediately
|
|
return "", err
|
|
}
|
|
|
|
// If all retries failed, fallback to segmented translation if applicable?
|
|
// The original code did that. Let's preserve it if appropriate.
|
|
if lastErr != nil {
|
|
logger.Warn("All translation attempts failed. Last error: %v. Trying segmented translation.", lastErr)
|
|
segResult, segErr := translateWithSegments(ctx, fromLang, toLang, text, isHTML)
|
|
if segErr != nil {
|
|
return "", lastErr // Return the main error
|
|
}
|
|
return segResult, nil
|
|
}
|
|
|
|
return "", lastErr
|
|
}
|
|
|
|
func CleanupAllEngines() {
|
|
engMu.Lock()
|
|
defer engMu.Unlock()
|
|
|
|
if len(engines) == 0 {
|
|
logger.Debug("No engines to cleanup")
|
|
return
|
|
}
|
|
|
|
logger.Info("Cleaning up %d engine(s)...", len(engines))
|
|
|
|
var wg sync.WaitGroup
|
|
for key, info := range engines {
|
|
wg.Add(1)
|
|
go func(k string, ei *EngineInfo) {
|
|
defer wg.Done()
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
logger.Error("Panic while cleaning up engine %s: %v", k, r)
|
|
}
|
|
}()
|
|
|
|
logger.Debug("Stopping engine: %s", k)
|
|
|
|
ei.mu.Lock()
|
|
if ei.stopTimer != nil {
|
|
ei.stopTimer.Stop()
|
|
}
|
|
ei.mu.Unlock()
|
|
|
|
for _, m := range ei.Managers {
|
|
if m != nil {
|
|
if err := m.Cleanup(); err != nil {
|
|
logger.Error("Failed to cleanup manager: %v", err)
|
|
} else {
|
|
logger.Debug("Manager cleaned up successfully")
|
|
}
|
|
}
|
|
}
|
|
}(key, info)
|
|
}
|
|
|
|
done := make(chan struct{})
|
|
go func() {
|
|
wg.Wait()
|
|
close(done)
|
|
}()
|
|
|
|
select {
|
|
case <-done:
|
|
logger.Info("All engines cleaned up successfully")
|
|
case <-time.After(15 * time.Second):
|
|
logger.Warn("Engine cleanup timeout after 15 seconds")
|
|
}
|
|
|
|
engines = make(map[string]*EngineInfo)
|
|
}
|
|
|
|
func isConnectionError(err error) bool {
|
|
if err == nil {
|
|
return false
|
|
}
|
|
errMsg := err.Error()
|
|
return strings.Contains(errMsg, "module closed") ||
|
|
strings.Contains(errMsg, "exit_code") ||
|
|
strings.Contains(errMsg, "not connected") ||
|
|
strings.Contains(errMsg, "failed to send message") ||
|
|
strings.Contains(errMsg, "failed to read response") ||
|
|
strings.Contains(errMsg, "wasm error") ||
|
|
strings.Contains(errMsg, "invalid table access") ||
|
|
strings.Contains(errMsg, "manager not running") ||
|
|
strings.Contains(errMsg, "worker connection failed")
|
|
}
|
|
|
|
func translateWithSegments(ctx context.Context, fromLang, toLang, text string, isHTML bool) (string, error) {
|
|
segments := DetectMultipleLanguages(text)
|
|
if len(segments) <= 1 {
|
|
return "", fmt.Errorf("segmented translation not applicable")
|
|
}
|
|
|
|
logger.Debug("Attempting segmented translation with %d segments", len(segments))
|
|
var result strings.Builder
|
|
lastEnd := 0
|
|
|
|
for _, seg := range segments {
|
|
if seg.Start > lastEnd {
|
|
result.WriteString(text[lastEnd:seg.Start])
|
|
}
|
|
|
|
if seg.Language == toLang {
|
|
result.WriteString(seg.Text)
|
|
} else {
|
|
segFromLang := seg.Language
|
|
if fromLang != "auto" && fromLang != "" {
|
|
segFromLang = fromLang
|
|
}
|
|
translated, err := translateSegment(ctx, segFromLang, toLang, seg.Text, isHTML)
|
|
if err != nil {
|
|
return "", fmt.Errorf("segmented translation failed: %w", err)
|
|
}
|
|
result.WriteString(translated)
|
|
}
|
|
lastEnd = seg.End
|
|
}
|
|
|
|
if lastEnd < len(text) {
|
|
result.WriteString(text[lastEnd:])
|
|
}
|
|
|
|
return result.String(), nil
|
|
}
|