mirror of
https://github.com/router-for-me/CLIProxyAPI.git
synced 2026-06-23 00:35:09 +08:00
Keep RPC streaming executor callback scopes alive until async streams close, detach nested host.model.execute_stream contexts from request cancellation, and clean up the stream bridge on stream completion.
81 lines
2.1 KiB
Go
81 lines
2.1 KiB
Go
package pluginhost
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/router-for-me/CLIProxyAPI/v7/sdk/pluginabi"
|
|
"github.com/router-for-me/CLIProxyAPI/v7/sdk/pluginapi"
|
|
)
|
|
|
|
func (a *rpcPluginAdapter) ExecuteStream(ctx context.Context, req pluginapi.ExecutorRequest) (pluginapi.ExecutorStreamResponse, error) {
|
|
if a == nil || a.host == nil || a.host.streams == nil {
|
|
return pluginapi.ExecutorStreamResponse{}, fmt.Errorf("plugin stream bridge is unavailable")
|
|
}
|
|
streamID, chunks, cleanupStream := a.host.streams.open(ctx)
|
|
callbackID, closeCallback := a.openHostCallbackContext(ctx)
|
|
cleanup := combinedCleanup(cleanupStream, closeCallback)
|
|
rpcReq := rpcExecutorRequest{
|
|
ExecutorRequest: req,
|
|
StreamID: streamID,
|
|
HostCallbackID: callbackID,
|
|
}
|
|
resp, errCall := callPlugin[rpcExecutorStreamResponse](ctx, a.client, pluginabi.MethodExecutorExecuteStream, rpcReq)
|
|
if errCall != nil {
|
|
cleanup()
|
|
return pluginapi.ExecutorStreamResponse{}, errCall
|
|
}
|
|
if len(resp.Chunks) > 0 {
|
|
cleanup()
|
|
out := make(chan pluginapi.ExecutorStreamChunk, len(resp.Chunks))
|
|
for _, chunk := range resp.Chunks {
|
|
out <- chunk
|
|
}
|
|
close(out)
|
|
return pluginapi.ExecutorStreamResponse{Headers: resp.Headers, Chunks: out}, nil
|
|
}
|
|
// Async streaming plugins can return before they finish emitting chunks, so keep callbacks alive until the stream ends.
|
|
return pluginapi.ExecutorStreamResponse{
|
|
Headers: resp.Headers,
|
|
Chunks: cleanupWhenStreamDone(ctx, chunks, cleanup),
|
|
}, nil
|
|
}
|
|
|
|
func combinedCleanup(cleanups ...func()) func() {
|
|
var once sync.Once
|
|
return func() {
|
|
once.Do(func() {
|
|
for _, cleanup := range cleanups {
|
|
if cleanup != nil {
|
|
cleanup()
|
|
}
|
|
}
|
|
})
|
|
}
|
|
}
|
|
|
|
func cleanupWhenStreamDone(ctx context.Context, chunks <-chan pluginapi.ExecutorStreamChunk, cleanup func()) <-chan pluginapi.ExecutorStreamChunk {
|
|
out := make(chan pluginapi.ExecutorStreamChunk)
|
|
go func() {
|
|
defer func() {
|
|
if cleanup != nil {
|
|
cleanup()
|
|
}
|
|
close(out)
|
|
}()
|
|
var done <-chan struct{}
|
|
if ctx != nil {
|
|
done = ctx.Done()
|
|
}
|
|
for chunk := range chunks {
|
|
select {
|
|
case out <- chunk:
|
|
case <-done:
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
return out
|
|
}
|