mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-06-23 07:24:45 +08:00
- Added an example plugin `host-model-callback` in Go to summarize host model callbacks. - Implemented `cliproxy_plugin_init`, `cliproxyPluginCall`, and other plugin functions for callback handling. - Introduced API handlers for `ModelExecution` and `ModelExecutionStream` with support for both streaming and non-streaming requests. - Included unit tests (`model_execution_test.go`) to validate execution logic and streaming responses.
92 lines
2.0 KiB
Go
92 lines
2.0 KiB
Go
package pluginhost
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/router-for-me/CLIProxyAPI/v7/sdk/api/handlers"
|
|
)
|
|
|
|
type modelStreamBridge struct {
|
|
next atomic.Uint64
|
|
mu sync.Mutex
|
|
streams map[string]modelStreamEntry
|
|
}
|
|
|
|
type modelStreamEntry struct {
|
|
ownerCallbackID string
|
|
chunks <-chan handlers.ModelExecutionChunk
|
|
cancel context.CancelFunc
|
|
}
|
|
|
|
func newModelStreamBridge() *modelStreamBridge {
|
|
return &modelStreamBridge{streams: make(map[string]modelStreamEntry)}
|
|
}
|
|
|
|
func (b *modelStreamBridge) open(ownerCallbackID string, chunks <-chan handlers.ModelExecutionChunk, cancel context.CancelFunc) string {
|
|
if b == nil || chunks == nil {
|
|
if cancel != nil {
|
|
cancel()
|
|
}
|
|
return ""
|
|
}
|
|
id := strconv.FormatUint(b.next.Add(1), 10)
|
|
b.mu.Lock()
|
|
b.streams[id] = modelStreamEntry{
|
|
ownerCallbackID: ownerCallbackID,
|
|
chunks: chunks,
|
|
cancel: cancel,
|
|
}
|
|
b.mu.Unlock()
|
|
return id
|
|
}
|
|
|
|
func (b *modelStreamBridge) read(ctx context.Context, id string) (handlers.ModelExecutionChunk, bool, error) {
|
|
if b == nil {
|
|
return handlers.ModelExecutionChunk{}, true, fmt.Errorf("model stream bridge is unavailable")
|
|
}
|
|
if id == "" {
|
|
return handlers.ModelExecutionChunk{}, true, fmt.Errorf("model stream id is required")
|
|
}
|
|
b.mu.Lock()
|
|
entry, ok := b.streams[id]
|
|
b.mu.Unlock()
|
|
if !ok || entry.chunks == nil {
|
|
return handlers.ModelExecutionChunk{}, true, nil
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
b.close(id)
|
|
return handlers.ModelExecutionChunk{}, true, ctx.Err()
|
|
case chunk, okRead := <-entry.chunks:
|
|
if !okRead {
|
|
b.close(id)
|
|
return handlers.ModelExecutionChunk{}, true, nil
|
|
}
|
|
if chunk.Err != nil {
|
|
b.close(id)
|
|
return chunk, true, nil
|
|
}
|
|
return chunk, false, nil
|
|
}
|
|
}
|
|
|
|
func (b *modelStreamBridge) close(id string) {
|
|
if b == nil || id == "" {
|
|
return
|
|
}
|
|
b.mu.Lock()
|
|
entry := b.streams[id]
|
|
delete(b.streams, id)
|
|
b.mu.Unlock()
|
|
if entry.cancel != nil {
|
|
entry.cancel()
|
|
}
|
|
}
|