Skip to content

vllm.entrypoints.serve.rpc.api_router

logger module-attribute

# Module-level logger keyed by this module's import name
# (presumably a project logging helper — confirm against init_logger).
logger = init_logger(__name__)

router module-attribute

# Router that the endpoints in this module register on; attach_router()
# mounts it on the application.
router = APIRouter()

attach_router

attach_router(app: FastAPI)
Source code in vllm/entrypoints/serve/rpc/api_router.py
def attach_router(app: FastAPI):
    """Mount this module's router on ``app`` when server dev mode is on.

    Does nothing when ``envs.VLLM_SERVER_DEV_MODE`` is falsy, so the
    routes defined on ``router`` are not exposed in that case.
    """
    if envs.VLLM_SERVER_DEV_MODE:
        app.include_router(router)

collective_rpc async

collective_rpc(raw_request: Request)
Source code in vllm/entrypoints/serve/rpc/api_router.py
def _bad_request(detail: str) -> HTTPException:
    """Build the 400 error raised for every malformed-request case."""
    return HTTPException(
        status_code=HTTPStatus.BAD_REQUEST.value,
        detail=detail,
    )


@router.post("/collective_rpc")
async def collective_rpc(raw_request: Request):
    """Forward a collective RPC described by the JSON request body.

    The body must be a JSON object with:
      * ``method`` (required): the method to invoke.
      * ``args`` (optional): a JSON array of positional arguments.
      * ``kwargs`` (optional): a JSON object of keyword arguments.
      * ``timeout`` (optional): a number, or null for no explicit timeout.

    Returns:
        An empty 200 response when the engine call returns ``None``;
        otherwise a JSON object ``{"results": [...]}`` in which ``None``,
        dict and list results are passed through unchanged and every other
        result is converted with ``str()``.

    Raises:
        HTTPException: 400 when the body is not decodable JSON, is not a
            JSON object, lacks ``method``, or carries ``args``/``kwargs``/
            ``timeout`` of the wrong JSON type.
    """
    try:
        body = await raw_request.json()
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        # Bytes that are not valid UTF-8 fail in decoding before the JSON
        # parser runs, so both error types mean "unparseable body".
        raise _bad_request(f"JSON decode error: {e}") from e

    # Valid JSON need not be an object; a list or scalar has no `.get`.
    if not isinstance(body, dict):
        raise _bad_request("Request body must be a JSON object")

    method = body.get("method")
    if method is None:
        raise _bad_request("Missing 'method' in request body")

    # For security reasons, only serialized string args/kwargs are passed.
    # User-defined `method` is responsible for deserialization if needed.
    args: list[str] | None = body.get("args")
    if args is None:
        args = []
    elif not isinstance(args, list):
        # A bare string would otherwise be split into characters by tuple().
        raise _bad_request("'args' must be a JSON array")

    # An explicit null is forwarded unchanged, as it was before validation
    # was added; only wrong container types are rejected.
    kwargs: dict[str, str] | None = body.get("kwargs", {})
    if kwargs is not None and not isinstance(kwargs, dict):
        raise _bad_request("'kwargs' must be a JSON object")

    timeout: float | None = body.get("timeout")
    if timeout is not None and not isinstance(timeout, (int, float)):
        raise _bad_request("'timeout' must be a number or null")

    results = await engine_client(raw_request).collective_rpc(
        method=method, timeout=timeout, args=tuple(args), kwargs=kwargs
    )
    if results is None:
        return Response(status_code=200)

    # Only None/dict/list are passed through as-is; anything else is
    # stringified so the response is always JSON-encodable at the top level.
    response: list[Any] = [
        result
        if result is None or isinstance(result, dict | list)
        else str(result)
        for result in results
    ]
    return JSONResponse(content={"results": response})

engine_client

engine_client(request: Request) -> EngineClient
Source code in vllm/entrypoints/serve/rpc/api_router.py
def engine_client(request: Request) -> EngineClient:
    """Return the engine client stored on the request's application state."""
    app_state = request.app.state
    return app_state.engine_client