Files
CLIProxyAPI/internal/pluginhost/rpc_client_stream.go
sususu98 9f940f162f fix(pluginhost): keep stream callbacks alive until stream close
Keep RPC streaming executor callback scopes alive until async streams close, detach nested host.model.execute_stream contexts from request cancellation, and clean up the stream bridge on stream completion.
2026-06-16 17:31:11 +08:00

81 lines
2.1 KiB
Go

package pluginhost
import (
"context"
"fmt"
"sync"
"github.com/router-for-me/CLIProxyAPI/v7/sdk/pluginabi"
"github.com/router-for-me/CLIProxyAPI/v7/sdk/pluginapi"
)
func (a *rpcPluginAdapter) ExecuteStream(ctx context.Context, req pluginapi.ExecutorRequest) (pluginapi.ExecutorStreamResponse, error) {
if a == nil || a.host == nil || a.host.streams == nil {
return pluginapi.ExecutorStreamResponse{}, fmt.Errorf("plugin stream bridge is unavailable")
}
streamID, chunks, cleanupStream := a.host.streams.open(ctx)
callbackID, closeCallback := a.openHostCallbackContext(ctx)
cleanup := combinedCleanup(cleanupStream, closeCallback)
rpcReq := rpcExecutorRequest{
ExecutorRequest: req,
StreamID: streamID,
HostCallbackID: callbackID,
}
resp, errCall := callPlugin[rpcExecutorStreamResponse](ctx, a.client, pluginabi.MethodExecutorExecuteStream, rpcReq)
if errCall != nil {
cleanup()
return pluginapi.ExecutorStreamResponse{}, errCall
}
if len(resp.Chunks) > 0 {
cleanup()
out := make(chan pluginapi.ExecutorStreamChunk, len(resp.Chunks))
for _, chunk := range resp.Chunks {
out <- chunk
}
close(out)
return pluginapi.ExecutorStreamResponse{Headers: resp.Headers, Chunks: out}, nil
}
// Async streaming plugins can return before they finish emitting chunks, so keep callbacks alive until the stream ends.
return pluginapi.ExecutorStreamResponse{
Headers: resp.Headers,
Chunks: cleanupWhenStreamDone(ctx, chunks, cleanup),
}, nil
}
func combinedCleanup(cleanups ...func()) func() {
var once sync.Once
return func() {
once.Do(func() {
for _, cleanup := range cleanups {
if cleanup != nil {
cleanup()
}
}
})
}
}
func cleanupWhenStreamDone(ctx context.Context, chunks <-chan pluginapi.ExecutorStreamChunk, cleanup func()) <-chan pluginapi.ExecutorStreamChunk {
out := make(chan pluginapi.ExecutorStreamChunk)
go func() {
defer func() {
if cleanup != nil {
cleanup()
}
close(out)
}()
var done <-chan struct{}
if ctx != nil {
done = ctx.Done()
}
for chunk := range chunks {
select {
case out <- chunk:
case <-done:
return
}
}
}()
return out
}