Files
CLIProxyAPI/internal/pluginhost/host_model_stream_callbacks.go
sususu98 9f940f162f fix(pluginhost): keep stream callbacks alive until stream close
Keep RPC streaming executor callback scopes alive until async streams close, detach nested host.model.execute_stream contexts from request cancellation, and clean up the stream bridge on stream completion.
2026-06-16 17:31:11 +08:00

88 lines
2.9 KiB
Go

package pluginhost
import (
"context"
"encoding/json"
"fmt"
"github.com/router-for-me/CLIProxyAPI/v7/sdk/pluginapi"
)
func (h *Host) callHostModelExecuteStream(ctx context.Context, request []byte) ([]byte, error) {
var req rpcHostModelExecutionRequest
if errUnmarshal := json.Unmarshal(request, &req); errUnmarshal != nil {
return nil, fmt.Errorf("decode host model execution stream request: %w", errUnmarshal)
}
if !req.Stream {
return nil, fmt.Errorf("host.model.execute_stream requires stream=true")
}
executor := h.currentModelExecutor()
if executor == nil {
return nil, fmt.Errorf("host model executor is unavailable")
}
skipPluginID := h.callbackCallerPluginID(ctx, req.HostCallbackID)
callbackCtx := h.resolveCallbackContext(req.HostCallbackID, ctx)
if callbackCtx == nil {
callbackCtx = context.Background()
}
// Detach request cancellation while preserving callback values; callback cleanup owns the model stream lifetime.
streamCtx, cancel := context.WithCancel(context.WithoutCancel(callbackCtx))
stream, errMsg := executor.ExecuteModelStream(streamCtx, modelExecutionRequestFromPlugin(req.HostModelExecutionRequest, skipPluginID))
if errMsg != nil {
cancel()
return nil, modelExecutionError(errMsg)
}
streamID := ""
if h.modelStreams != nil {
streamID = h.modelStreams.open(req.HostCallbackID, stream.Chunks, cancel)
}
if streamID == "" {
cancel()
return nil, fmt.Errorf("host model stream bridge is unavailable")
}
if req.HostCallbackID != "" {
h.addCallbackCleanup(req.HostCallbackID, func() {
h.modelStreams.close(streamID)
})
}
return marshalRPCResult(pluginapi.HostModelStreamResponse{
StatusCode: stream.StatusCode,
Headers: cloneHeader(stream.Headers),
StreamID: streamID,
})
}
func (h *Host) callHostModelStreamRead(ctx context.Context, request []byte) ([]byte, error) {
var req pluginapi.HostModelStreamReadRequest
if errUnmarshal := json.Unmarshal(request, &req); errUnmarshal != nil {
return nil, fmt.Errorf("decode host model stream read request: %w", errUnmarshal)
}
if h == nil || h.modelStreams == nil {
return nil, fmt.Errorf("host model stream bridge is unavailable")
}
chunk, done, errRead := h.modelStreams.read(ctx, req.StreamID)
if errRead != nil {
return nil, errRead
}
resp := pluginapi.HostModelStreamReadResponse{
Payload: append([]byte(nil), chunk.Payload...),
Done: done,
}
if chunk.Err != nil {
resp.Error = chunk.Err.Error()
resp.Done = true
}
return marshalRPCResult(resp)
}
func (h *Host) callHostModelStreamClose(request []byte) ([]byte, error) {
var req pluginapi.HostModelStreamCloseRequest
if errUnmarshal := json.Unmarshal(request, &req); errUnmarshal != nil {
return nil, fmt.Errorf("decode host model stream close request: %w", errUnmarshal)
}
if h != nil && h.modelStreams != nil {
h.modelStreams.close(req.StreamID)
}
return marshalRPCResult(rpcEmptyResponse{})
}