跳到主要内容

🚰 Pipe Function:创建自定义“Agent/模型”

⚠️ 关键安全警告

Pipe 函数会在你的服务器上执行任意 Python 代码。 函数创建仅限管理员。请只从可信来源安装,并在导入前审查代码。恶意函数可能访问文件系统、窃取数据,甚至危及整套系统。完整说明请参阅 插件安全警告

欢迎阅读这份关于在 Open WebUI 中创建 Pipe 的指南!你可以把 Pipe 理解为一种向 Open WebUI 添加新模型的方式。本文会拆解 Pipe 是什么、它如何工作,以及如何创建你自己的 Pipe,为 Open WebUI 模型加入自定义逻辑与处理流程。我们会用清晰的比喻逐步说明,尽量把每个细节讲透。

为了未来兼容性,请使用异步函数

Pipe 函数通常应定义为 async,以确保与未来版本的 Open WebUI 兼容。后端正在逐步转向完全异步执行,同步函数在未来版本中可能阻塞执行或引发问题。拿不准时,就把你的 pipe 函数写成异步。

Pipe 简介

你可以把 Open WebUI 想象成一个管道系统,数据在管道和阀门之间流动。在这个比喻里:

  • Pipes 就像 插件,它们为数据流动引入新的路径,让你注入自定义逻辑与处理。
  • Valves 是 Pipe 中可配置的部分,用来控制数据如何通过。

创建一个 Pipe,本质上就是在 Open WebUI 框架内打造一个具备你所需特定行为的自定义模型。


理解 Pipe 结构

先从一个最基础、最简化的 Pipe 版本开始,看看它的结构:

from pydantic import BaseModel, Field

class Pipe:
    class Valves(BaseModel):
        MODEL_ID: str = Field(default="")

    def __init__(self):
        self.valves = self.Valves()

    async def pipe(self, body: dict):
        # Logic goes here
        print(self.valves, body)  # This will print the configuration options and the input body
        return "Hello, World!"

Pipe 类

  • 定义Pipe 类是你编写自定义逻辑的地方。
  • 用途:它是你的插件蓝图,决定它在 Open WebUI 中的行为方式。

Valves:配置你的 Pipe

  • 定义ValvesPipe 中嵌套的类,继承自 BaseModel
  • 用途:它保存你的 Pipe 在使用过程中持续生效的配置项(参数)。
  • 示例:上面的代码中,MODEL_ID 就是一个默认值为空字符串的配置项。

比喻:你可以把 Valves 想成真实管道系统上的旋钮,用来控制水流。在你的 Pipe 中,Valves 允许用户调整会影响数据流动和处理方式的设置。

__init__ 方法

  • 定义Pipe 类的构造函数。
  • 用途:初始化 Pipe 的状态,并设置所需组件。
  • 最佳实践:保持简单,主要在这里初始化 self.valves
def __init__(self):
    self.valves = self.Valves()

pipe 函数

  • 定义:承载你的自定义逻辑的核心函数。
  • 参数
    • body:包含输入数据的字典。
  • 用途:用你的自定义逻辑处理输入数据并返回结果。
async def pipe(self, body: dict):
    # Logic goes here
    print(self.valves, body)  # This will print the configuration options and the input body
    return "Hello, World!"

注意:请始终将 Valves 放在 Pipe 类顶部,然后是 __init__,最后是 pipe 函数。这样的结构更清晰,也更一致。

使用 Pipes 创建多个模型

如果你希望你的 Pipe 在 Open WebUI 中创建多个模型,可以在 Pipe 类中定义一个 pipes 函数或变量。这个结构(非正式地称为 manifold)可以让你的 Pipe 表示多个模型。

Here's how you can do it:

from pydantic import BaseModel, Field

class Pipe:
    class Valves(BaseModel):
        MODEL_ID: str = Field(default="")

    def __init__(self):
        self.valves = self.Valves()

    def pipes(self):
        return [
            {"id": "model_id_1", "name": "model_1"},
            {"id": "model_id_2", "name": "model_2"},
            {"id": "model_id_3", "name": "model_3"},
        ]

    async def pipe(self, body: dict):
        # Logic goes here
        print(self.valves, body)  # Prints the configuration options and the input body
        model = body.get("model", "")
        return f"{model}: Hello, World!"

说明

pipes 函数

  • 返回一个字典列表。
  • 每个字典代表一个模型,包含唯一的 idname 键。
  • 这些模型会分别显示在 Open WebUI 的模型选择器中。

更新后的 pipe 函数

  • 根据所选模型处理输入。
  • 在这个示例中,它会把模型名包含在返回字符串中。

示例:OpenAI 代理 Pipe

下面我们来看一个实用示例:创建一个会将请求代理到 OpenAI API 的 Pipe。这个 Pipe 会从 OpenAI 获取可用模型,并允许用户通过 Open WebUI 与这些模型交互。

from pydantic import BaseModel, Field
import httpx


class Pipe:
    class Valves(BaseModel):
        NAME_PREFIX: str = Field(
            default="OPENAI/",
            description="Prefix to be added before model names.",
        )
        OPENAI_API_BASE_URL: str = Field(
            default="https://api.openai.com/v1",
            description="Base URL for accessing OpenAI API endpoints.",
        )
        OPENAI_API_KEY: str = Field(
            default="",
            description="API key for authenticating requests to the OpenAI API.",
        )

    def __init__(self):
        self.valves = self.Valves()

    async def pipes(self):
        if not self.valves.OPENAI_API_KEY:
            return [{"id": "error", "name": "API Key not provided."}]

        headers = {
            "Authorization": f"Bearer {self.valves.OPENAI_API_KEY}",
            "Content-Type": "application/json",
        }

        try:
            async with httpx.AsyncClient() as client:
                r = await client.get(
                    f"{self.valves.OPENAI_API_BASE_URL}/models", headers=headers
                )
                r.raise_for_status()
                models = r.json()

            return [
                {
                    "id": model["id"],
                    "name": f'{self.valves.NAME_PREFIX}{model.get("name", model["id"])}',
                }
                for model in models["data"]
                if "gpt" in model["id"]
            ]
        except Exception:
            return [
                {
                    "id": "error",
                    "name": "Error fetching models. Please check your API Key.",
                }
            ]

    async def pipe(self, body: dict, __user__: dict):
        print(f"pipe:{__name__}")
        headers = {
            "Authorization": f"Bearer {self.valves.OPENAI_API_KEY}",
            "Content-Type": "application/json",
        }

        # Extract model id from the model name
        model_id = body["model"][body["model"].find(".") + 1 :]

        # Update the model id in the body
        payload = {**body, "model": model_id}
        url = f"{self.valves.OPENAI_API_BASE_URL}/chat/completions"

        try:
            if body.get("stream", False):
                async def event_stream():
                    async with httpx.AsyncClient(timeout=None) as client:
                        async with client.stream(
                            "POST", url, json=payload, headers=headers
                        ) as r:
                            r.raise_for_status()
                            async for line in r.aiter_lines():
                                yield line

                return event_stream()

            async with httpx.AsyncClient(timeout=None) as client:
                r = await client.post(url, json=payload, headers=headers)
                r.raise_for_status()
                return r.json()
        except Exception as e:
            return f"Error: {e}"
使用异步 HTTP 客户端

这个示例使用 httpx.AsyncClient 而不是 requests,因为 pipes()pipe() 都运行在 Open WebUI 的异步事件循环中。直接在 async def 方法里调用同步的 requests 会在整个 HTTP 请求期间阻塞事件循环(对于流式请求,甚至会阻塞整条流),从而影响实例上的其他并发请求。httpx 原生支持异步,项目里也已经依赖它,并且能直接替代常见写法。

如果你必须在异步处理器中使用同步第三方库,请用 await anyio.to_thread.run_sync(...) 包装阻塞调用,让它在工作线程中运行,而不是阻塞事件循环。

详细拆解

Valves 配置

  • NAME_PREFIX
    • 为 Open WebUI 中显示的模型名称添加前缀。
    • 默认值:"OPENAI/"
  • OPENAI_API_BASE_URL
    • 指定 OpenAI API 的基础 URL。
    • 默认值:"https://api.openai.com/v1"
  • OPENAI_API_KEY
    • 用于认证的 OpenAI API Key。
    • 默认值:""(空字符串;必须提供)。

pipes 函数

  • 用途:获取可用的 OpenAI 模型,并让它们在 Open WebUI 中可用。

流程

  1. 检查 API Key:确保已提供 API Key。
  2. 获取模型:向 OpenAI API 发起 GET 请求以检索可用模型。
  3. 过滤模型:只返回 id 中包含 "gpt" 的模型。
  4. 错误处理:如果出错,则返回错误消息。

返回格式:一个字典列表,每个模型都包含 idname

pipe 函数

  • 用途:处理对所选 OpenAI 模型的请求并返回响应。

  • 参数

    • body:包含请求数据。
    • __user__:包含用户信息(本示例中未使用,但可用于认证或日志记录)。
  • 流程

    1. 准备请求头:使用 API Key 和内容类型设置请求头。
    2. 提取模型 ID:从所选模型名称中提取实际的模型 ID。
    3. 准备负载:将正确的模型 ID 更新到 body 中。
    4. 发起 API 请求:通过 httpx.AsyncClient 向 OpenAI API 的 chat completions 端点发送 POST 请求。
    5. 处理流式输出:如果 streamTrue,则返回一个异步生成器,从上游响应中逐行输出 SSE。
    6. 错误处理:捕获异常并返回错误消息。

扩展代理 Pipe

你可以通过调整 pipespipe 函数中的 API 端点、请求头与逻辑,把这个代理 Pipe 扩展到 Anthropic、Perplexity 等其他服务提供商。

要构建自包含 agent?不要发出 delta.tool_calls

如果你的 Pipe 包装的是一个 agent(LangChain、LlamaIndex、自定义 planner 等),并且它会在内部执行工具,然后再把最终答案流回聊天,那么在流中发出 delta.tool_calls 会触发 Open WebUI 的工具执行重试循环——中间件会把 delta.tool_calls 视为“请客户端帮我执行这个”,然后反复回到你的 pipe,导致响应最多重复 CHAT_RESPONSE_MAX_TOOL_CALL_ITERATIONS(默认 256;v0.9.6 之前为 CHAT_RESPONSE_MAX_TOOL_CALL_RETRIES,默认 30)次。

对于自包含 agent,请改用 <details type="tool_calls"> 内容块来表示工具执行——这和 Open WebUI 在内部工具执行后自己发出的内容格式一致。关于完整模式、LangChain 示例以及该怎么选路径,请参见 Pipes → Self-contained agents and delta.tool_calls 小节。


使用 Open WebUI 内部函数

有时你可能希望在 Pipe 中利用 Open WebUI 的内部函数。你可以直接从 open_webui 包中导入这些函数。需要注意的是,虽然这种情况不太常见,但内部函数可能会为了优化而发生变化,因此请始终参考最新文档。

下面示例展示了如何使用 Open WebUI 的内部函数:

from pydantic import BaseModel, Field
from fastapi import Request

from open_webui.models.users import Users
from open_webui.utils.chat import generate_chat_completion

class Pipe:
    def __init__(self):
        pass

    async def pipe(
        self,
        body: dict,
        __user__: dict,
        __request__: Request,
    ) -> str:
        # Use the unified endpoint with the updated signature
        user = await Users.get_user_by_id(__user__["id"])
        body["model"] = "llama3.2:latest"
        return await generate_chat_completion(__request__, body, user)

说明

  • 导入

    • open_webui.models.users 中的 Users:用于获取用户信息。
    • open_webui.utils.chat 中的 generate_chat_completion:用于通过内部逻辑生成聊天回复。
  • 异步 pipe 函数

    • 参数
      • body:模型输入数据。
      • __user__:包含用户信息的字典。
      • __request__:FastAPI 的 request 对象(generate_chat_completion 需要它)。
    • 流程
      1. 获取用户对象:根据用户 ID 检索用户对象。
      2. 设置模型:指定要使用的模型。
      3. 生成回复:调用 generate_chat_completion 处理输入并生成输出。

重要说明

  • 函数签名:请参考最新的 Open WebUI 代码库或文档,以获取最准确的函数签名与参数。
  • 最佳实践:始终妥善处理异常和错误,以确保流畅的用户体验。

常见问题

Q1:为什么我应该在 Open WebUI 中使用 Pipes?

A:Pipes 允许你为 Open WebUI 添加带有自定义逻辑和处理流程的新“模型”。它是一个灵活的插件系统,可以让你集成外部 API、定制模型行为,并在不改动核心代码库的情况下创造新功能。


Q2:什么是 Valves?为什么它们很重要?

A:Valves 是 Pipe 的可配置参数。它们像设置或控制项一样,决定 Pipe 的运行方式。通过调整 Valves,你可以在不修改底层代码的情况下改变 Pipe 的行为。


Q3:我可以不使用 Valves 创建 Pipe 吗?

A:可以。如果你的 Pipe 不需要持久化配置选项,你可以不定义 Valves 类,直接创建一个简单的 Pipe。不过,保留 Valves 仍然是一个好习惯,因为它更灵活,也更利于未来扩展。


Q4:使用 API key 时,如何确保我的 Pipe 安全?

A:不要把 API key 之类的敏感信息硬编码到 Pipe 里。请使用 Valves 来输入和存储 API key,并确保代码正确处理这些密钥,避免记录日志或泄露它们。


Q5:pipepipes 函数有什么区别?

A

  • pipe 函数:处理输入数据并生成输出的主要函数,负责单个模型的逻辑。

  • pipes 函数:通过返回一组模型定义,让你的 Pipe 代表多个模型。每个模型都会单独显示在 Open WebUI 中。


Q6:我该如何处理 Pipe 中的错误?

A:在 pipepipes 函数中使用 try-except 块来捕获异常。返回有意义的错误消息,或者优雅地处理错误,以便用户知道哪里出了问题。


Q7:我可以在 Pipe 中使用外部库吗?

A:可以,你可以按需导入并使用外部库。请确保相关依赖已在你的环境中正确安装和管理。


Q8:我该如何测试我的 Pipe?

A:在开发环境中运行 Open WebUI,并在界面中选择你的自定义模型来测试 Pipe。用不同的输入和配置验证它是否按预期工作。


Q9:整理 Pipe 代码时,有什么最佳实践?

A:有,建议遵循以下原则:

  • Valves 放在 Pipe 类的顶部。
  • __init__ 方法中初始化变量,主要是 self.valves
  • pipe 函数放在 __init__ 之后。
  • 使用清晰且具描述性的变量名。
  • 为代码添加注释,提升可读性。

Q10:我在哪里可以找到最新的 Open WebUI 文档?

A:请访问 Open WebUI 的官方仓库或文档站,获取最新信息,包括函数签名、示例以及迁移指南(如果有变更)。


结语

到这里,你应该已经充分了解如何在 Open WebUI 中创建和使用 Pipes。Pipe 提供了一种强大的方式,可按你的具体需求扩展和定制 Open WebUI 的能力。无论你是要集成外部 API、添加新模型,还是注入复杂逻辑,Pipe 都能为你提供实现这些功能所需的灵活性。

记住:

  • 保持清晰且一致的结构,组织你的 Pipe 类。
  • 善用 Valves 来配置可调选项。
  • 优雅地处理错误,提升用户体验。
  • 查阅最新文档,及时了解任何更新或变更。

祝你编码愉快,享受用 Pipes 扩展 Open WebUI 的过程!

本内容仅供参考,不构成任何保证、担保或合同承诺。Open WebUI 按“现状”提供。请参阅您的许可协议 以了解适用条款。