mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-06-12 01:02:41 +08:00
- Removed `examples/plugin/main.go` and `internal/pluginhost/loader_plugin.go` after migrating to a more modular system. - Introduced `streamBridge` in `internal/pluginhost/stream_bridge.go` for efficient stream handling and communication. - Added examples of `thinking` plugins written in both Rust and Go under `examples/plugin/thinking`. - Enhanced test coverage for plugin host system changes, including stream chunk translation and thinking logic. - Improved API compatibility and ensured backward-compatible upgrades for plugin execution.
94 lines
2.0 KiB
Go
94 lines
2.0 KiB
Go
package pluginhost
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"strconv"
|
|
"sync"
|
|
"sync/atomic"
|
|
|
|
"github.com/router-for-me/CLIProxyAPI/v7/sdk/pluginapi"
|
|
)
|
|
|
|
type streamBridge struct {
|
|
next atomic.Uint64
|
|
mu sync.Mutex
|
|
streams map[string]chan pluginapi.ExecutorStreamChunk
|
|
}
|
|
|
|
// rpcStreamEmitRequest is the JSON shape of a "stream emit" message.
// NOTE(review): presumably sent by a plugin to push one chunk into an open
// stream; the handler that decodes it is not in this file.
type rpcStreamEmitRequest struct {
	// StreamID names the target stream.
	StreamID string `json:"stream_id"`
	// Payload is the raw chunk data; left out of the JSON when empty.
	Payload []byte `json:"payload,omitempty"`
	// Error is an optional error message; left out of the JSON when empty.
	Error string `json:"error,omitempty"`
}
|
|
|
|
// rpcStreamCloseRequest is the JSON shape of a "stream close" message.
// NOTE(review): presumably sent by a plugin to end a stream, optionally with
// an error; the handler that decodes it is not in this file.
type rpcStreamCloseRequest struct {
	// StreamID names the stream to close.
	StreamID string `json:"stream_id"`
	// Error is an optional error message; left out of the JSON when empty.
	Error string `json:"error,omitempty"`
}
|
|
|
|
func newStreamBridge() *streamBridge {
|
|
return &streamBridge{streams: make(map[string]chan pluginapi.ExecutorStreamChunk)}
|
|
}
|
|
|
|
func (b *streamBridge) open(ctx context.Context) (string, <-chan pluginapi.ExecutorStreamChunk, func()) {
|
|
if b == nil {
|
|
chunks := make(chan pluginapi.ExecutorStreamChunk)
|
|
close(chunks)
|
|
return "", chunks, func() {}
|
|
}
|
|
id := strconv.FormatUint(b.next.Add(1), 10)
|
|
chunks := make(chan pluginapi.ExecutorStreamChunk, 16)
|
|
b.mu.Lock()
|
|
b.streams[id] = chunks
|
|
b.mu.Unlock()
|
|
cleanup := func() {
|
|
b.close(id, "")
|
|
}
|
|
if ctx != nil && ctx.Done() != nil {
|
|
go func() {
|
|
<-ctx.Done()
|
|
b.close(id, ctx.Err().Error())
|
|
}()
|
|
}
|
|
return id, chunks, cleanup
|
|
}
|
|
|
|
func (b *streamBridge) emit(ctx context.Context, id string, chunk pluginapi.ExecutorStreamChunk) error {
|
|
if b == nil || id == "" {
|
|
return fmt.Errorf("stream id is required")
|
|
}
|
|
b.mu.Lock()
|
|
chunks := b.streams[id]
|
|
b.mu.Unlock()
|
|
if chunks == nil {
|
|
return fmt.Errorf("stream %s is not open", id)
|
|
}
|
|
if ctx == nil {
|
|
ctx = context.Background()
|
|
}
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case chunks <- chunk:
|
|
return nil
|
|
}
|
|
}
|
|
|
|
func (b *streamBridge) close(id string, errorMessage string) {
|
|
if b == nil || id == "" {
|
|
return
|
|
}
|
|
b.mu.Lock()
|
|
chunks := b.streams[id]
|
|
delete(b.streams, id)
|
|
b.mu.Unlock()
|
|
if chunks == nil {
|
|
return
|
|
}
|
|
if errorMessage != "" {
|
|
chunks <- pluginapi.ExecutorStreamChunk{Err: fmt.Errorf("%s", errorMessage)}
|
|
}
|
|
close(chunks)
|
|
}
|