跳到內容

IO Processor 外掛

IO Processor 外掛是一項功能,允許對池化模型的模型輸入和輸出進行預處理和後處理。其理念是允許使用者向 vLLM 傳遞自定義輸入,該輸入會被轉換為一個或多個模型提示,並饋送給模型的 encode 方法。這類外掛的一個潛在用例是使用 vLLM 生成多模態資料。例如,使用者向 vLLM 輸入一張圖片,並獲得一張圖片作為輸出。

在執行帶有 IO Processor 外掛的推理時,提示型別由外掛定義,最終請求的輸出也同樣如此。vLLM 不會執行任何輸入/輸出資料驗證,由外掛負責確保正確的資料被饋送給模型並返回給使用者。目前,這些外掛僅支援池化模型,可以透過 LLMAsyncLLM 中的 encode 方法,或在線上服務模式下透過 /pooling 端點觸發。

編寫 IO Processor 外掛

IO Processor 外掛實現了 IOProcessor 介面。

IOProcessorInput = TypeVar("IOProcessorInput")
IOProcessorOutput = TypeVar("IOProcessorOutput")

class IOProcessor(ABC, Generic[IOProcessorInput, IOProcessorOutput]):
    def __init__(self, vllm_config: VllmConfig):
        self.vllm_config = vllm_config

    @abstractmethod
    def pre_process(
        self,
        prompt: IOProcessorInput,
        request_id: str | None = None,
        **kwargs,
    ) -> PromptType | Sequence[PromptType]:
        raise NotImplementedError

    async def pre_process_async(
        self,
        prompt: IOProcessorInput,
        request_id: str | None = None,
        **kwargs,
    ) -> PromptType | Sequence[PromptType]:
        return self.pre_process(prompt, request_id, **kwargs)

    @abstractmethod
    def post_process(
        self,
        model_output: Sequence[PoolingRequestOutput],
        request_id: str | None = None,
        **kwargs,
    ) -> IOProcessorOutput:
        raise NotImplementedError

    async def post_process_async(
        self,
        model_output: AsyncGenerator[tuple[int, PoolingRequestOutput]],
        request_id: str | None = None,
        **kwargs,
    ) -> IOProcessorOutput:
        # We cannot guarantee outputs are returned in the same order they were
        # fed to vLLM.
        # Let's sort them by id before post_processing
        sorted_output = sorted(
            [(i, item) async for i, item in model_output], key=lambda output: output[0]
        )
        collected_output = [output[1] for output in sorted_output]
        return self.post_process(collected_output, request_id, **kwargs)

    @abstractmethod
    def parse_request(self, request: Any) -> IOProcessorInput:
        raise NotImplementedError

    def validate_or_generate_params(
        self, params: SamplingParams | PoolingParams | None = None
    ) -> SamplingParams | PoolingParams:
        return params or PoolingParams()

    @abstractmethod
    def output_to_response(
        self, plugin_output: IOProcessorOutput
    ) -> IOProcessorResponse:
        raise NotImplementedError

parse_request 方法用於驗證使用者提示並將其轉換為 pre_process/pre_process_async 方法所期望的輸入。pre_process* 方法接收經過驗證的外掛輸入,為常規推理生成 vLLM 的模型提示。post_process* 方法接收 PoolingRequestOutput 物件作為輸入,並生成自定義外掛輸出。validate_or_generate_params 方法用於透過外掛驗證使用者請求中收到的任何 SamplingParameters/PoolingParameters,或在未指定時生成新的引數。該函式始終返回已驗證/生成的引數。output_to_response 方法僅用於線上服務,並將外掛輸出轉換為 IOProcessorResponse 型別,然後由 API 伺服器返回。/pooling 服務端點的實現可以在這裡找到: vllm/entrypoints/openai/serving_pooling.py.

一個啟用 PrithviGeospatialMAE 模型生成 geotiff 影像的外掛示例實現可以在 這裡找到。請同時參考我們的線上( examples/pooling/plugin/prithvi_geospatial_mae_client.py)和離線( examples/pooling/plugin/prithvi_geospatial_mae_io_processor.py)推理示例。

使用 IO Processor 外掛

IO Processor 外掛在引擎啟動時載入,有兩種方法可以指定要載入的外掛名稱:

  1. 透過 vLLM 的 EngineArgs:在用於初始化 AsyncLLMEngineArgs 中設定 io_processor_plugin 引數。在離線模式下,透過將 io_processor_plugin 引數傳遞給 LLM 也可以達到相同的效果,或者在服務模式下透過傳遞 --io-processor-plugin 引數。
  2. 透過模型的 HF 配置:將 io_processor_plugin 欄位新增到模型配置 (config.json) 中。

順序也決定了方法的優先順序。即,透過 EngineArgs 設定外掛名稱將覆蓋在模型 HF 配置 (config.json) 中指定的任何外掛名稱。