🔧 工具开发
Workspace Tools 会在你的服务器上执行任意 Python 代码。 只从可信来源安装,导入前先审查代码,并且只允许可信管理员访问 Workspace。赋予用户创建或导入 Tools 的能力,等同于给他们服务器的 shell 访问权。完整说明请参阅 插件安全警告。
编写自定义工具包
工具包定义在一个 Python 文件中,文件顶部使用包含元数据的文档字符串,并提供一个 Tools 类。
工具方法通常应定义为 async,以确保与未来版本的 Open WebUI 兼容。后端正在逐步转向完全异步执行,同步函数在未来版本中可能会阻塞执行或引发问题。
顶层文档字符串示例
"""
title: String Inverse
author: Your Name
author_url: https://website.com
git_url: https://github.com/username/string-reverse.git
description: This tool calculates the inverse of a string
required_open_webui_version: 0.4.0
requirements: langchain-openai, langgraph, ollama, langchain_ollama
version: 0.4.0
licence: MIT
"""当你创建新工具(同样适用于 function 和 skill)时,编辑器会在你粘贴或输入代码的过程中读取 frontmatter,并在你尚未填写的情况下,从 title 和 description 自动填充 Name、ID 和 Description 字段。它永远不会覆盖你已经输入的内容,并且在编辑已有条目时也不会重新派生这些字段——你不再需要重复敲写源代码中已经声明过的元数据。
Tools 类
Tools 必须定义为名为 Tools 的类中的方法,并可选地包含名为 Valves 和 UserValves 的子类,例如:
class Tools:
def __init__(self):
"""Initialize the Tool."""
self.valves = self.Valves()
class Valves(BaseModel):
api_key: str = Field("", description="Your API key here")
async def reverse_string(self, string: str) -> str:
"""
Reverses the input string.
:param string: The string to reverse
"""
# example usage of valves
if self.valves.api_key != "42":
return "Wrong API key"
return string[::-1]类型注解
每个工具的参数都必须提供类型注解。类型也可以是嵌套的,例如 queries_and_docs: list[tuple[str, int]]。这些类型注解会用于生成发送给模型的 JSON Schema。没有类型注解的工具也能运行,但一致性会差很多。
Valves 和 UserValves(可选,但强烈建议)
Valves 和 UserValves 用于定义工具的可配置项。你可以在专门的 Valves & UserValves 页面 了解更多。
可选参数
下面是你的工具可以依赖的可选参数列表:
__event_emitter__:发出事件(见下一节)__event_call__:与事件发射器类似,但可用于用户交互。事件调用的服务器端超时时间可通过WEBSOCKET_EVENT_CALLER_TIMEOUT配置(默认:300 秒)。__user__:包含用户信息的字典,其中还包含__user__["valves"]中的UserValves对象。__metadata__:聊天元数据字典__messages__:历史消息列表__files__:附加文件__model__:模型信息字典__oauth_token__:包含用户有效、自动刷新的 OAuth 令牌负载的字典。这是获取用户令牌并进 行认证 API 调用的 新式、推荐且安全 的方式。该字典通常包含access_token、id_token以及其他提供商相关数据。
关于 __oauth_token__ 的更多信息,以及如何配置将该令牌传递给工具,请参阅 环境变量文档页 中的 OAuth 部分,以及 SSO 文档。
只需像上面示例中的 __user__ 一样,把它们作为参数添加到你的 Tool 类任意方法中即可。
在工具中使用 OAuth 令牌
当你构建需要代表用户与外部 API 交互的工具时,现在可以直接访问他们的 OAuth 令牌。这消除了脆弱的 cookie 抓取方式,并能确保令牌始终有效。
示例: 使用用户访问令牌调用外部 API 的工具。
import httpx
from typing import Optional
class Tools:
# ... other class setup ...
async def get_user_profile_from_external_api(self, __oauth_token__: Optional[dict] = None) -> str:
"""
Fetches user profile data from a secure external API using their OAuth access token.
:param __oauth_token__: Injected by Open WebUI, contains the user's token data.
"""
if not __oauth_token__ or "access_token" not in __oauth_token__:
return "Error: User is not authenticated via OAuth or token is unavailable."
access_token = __oauth_token__["access_token"]
headers = {
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json"
}
try:
async with httpx.AsyncClient() as client:
response = await client.get("https://api.my-service.com/v1/profile", headers=headers)
response.raise_for_status() # Raise an exception for bad status codes
return f"API Response: {response.json()}"
except httpx.HTTPStatusError as e:
return f"Error: Failed to fetch data from API. Status: {e.response.status_code}"
except Exception as e:
return f"An unexpected error occurred: {e}"事件发射器
事件发射器用于向聊天界面补充额外信息。和 Filter 的 outlet 类似,事件发射器可以向聊天中追加内容;不同的是,它们不能移除信息。此外,发射器可以在工具执行的任意阶段启用。
Default Mode 属于旧版且不再支持——完整策略请参阅 Tool Calling Modes 指南。请把工具写成能在 Native(Agentic)模式 下正常工作的形式,因为这才是未来唯一支持的模式。下面的事件发射器兼容性矩阵仍保留 Default Mode 的行为,仅供历史参考以及尚未迁移的旧工具维护者使用——新工具不应依赖只在 Default Mode 中生效的事件类型。如果你的工具 UX 本质上依赖某个只在 Default Mode 下可用的事件(message、chat:message:delta、chat:message、replace 的中途流式写入),请围绕 Native 兼容事件(status、notification、citation、chat:message:files、confirmation、chat:message:follow_ups)重新设计,而不是要求用户切换到旧模式。
事件发射器在两种函数调用模式下的行为不同。函数调用模式由 function_calling 参数控制:
- Native Mode(Agentic Mode)(
function_calling = "native")—— 唯一受支持的模式。它使用模型的结构化工具调用 API。可用的事件发射器表面有限——请查看下方矩阵以了解具体支持哪些事件类型。 - Default Mode(
function_calling = "default")—— 旧版、基于 prompt injection 的模式。虽然事件发射器能力完整,但该模式本身不受支持,不应在新部署中选择。
完整的模式策略、模型要求和配置请参阅 Tool Calling Modes 指南。简单来说:请使用 Native Mode;下方矩阵会告诉你哪些事件类型在该模式下可用。
函数调用模式配置
你可以在两个位置配置函数调用模式:
- 管理员级别:进入 Admin Panel > Settings > Models > Model Specific Settings > Advanced Parameters > Function Calling(设置为
Default或Native)。 - 按请求设置:在 Chat Controls > Advanced Params 中设置
params.function_calling = "native"或"default"。
如果模型似乎无法调用工具,请确认它已被启用(可在 Model 页面或聊天输入框旁边的 + 按钮中开启)。
编写自定义工具时请注意:当启用 Native Mode 时,Open WebUI 还会提供 内置系统工具。有关内置工具、函数调用模式和模型要求的详情,请参阅 Tool Calling Modes Guide。
完整事件类型兼容性矩阵
下面是各个事件类型在不同函数调用模式下行为的完整说明:
| 事件类型 | Default Mode 行为 | Native Mode 行为 | 状态 |
|---|---|---|---|
status | ✅ 完全支持 —— 在工具执行期间更新状态历史 | ✅ 完全相同 —— 跟踪函数执行状态 | 兼容 |
message | ✅ 完全支持 —— 在流式过程中追加增量内容 | ❌ 已损坏 —— 会被原生 completion 快照覆盖 | 不兼容 |
chat:completion | ✅ 完 全支持 —— 处理流式响应和 completion 数据 | ⚠️ 受限 —— 会携带函数结果,但可能覆盖工具更新 | 部分兼容 |
chat:message:delta | ✅ 完全支持 —— 在执行期间流式输出 delta 内容 | ❌ 已损坏 —— 内容会被原生函数快照替换 | 不兼容 |
chat:message | ✅ 完全支持 —— 干净地替换整条消息内容 | ❌ 已损坏 —— 会被后续原生 completion 覆盖 | 不兼容 |
replace | ✅ 完全支持 —— 可精确控制内容替换 | ❌ 已损坏 —— 被替换的内容会立刻再次被覆盖 | 不兼容 |
chat:message:files / files | ✅ 完全支持 —— 处理消息中的文件附件 | ✅ 完全相同 —— 处理来自函数输出的文件 | 兼容 |
chat:message:error | ✅ 完全支持 —— 显示错误通知 | ✅ 完全相同 —— 显示函数调用错误 | 兼容 |
chat:message:follow_ups | ✅ 完全支持 —— 显示后续建议 | ✅ 完全相同 —— 显示函数生成的后续问题 | 兼容 |
chat:title | ✅ 完全支持 —— 动态更新聊天标题 | ✅ 完全相同 —— 根据函数交互更新标题 | 兼容 |
chat:tags | ✅ 完全支持 —— 修改聊天标签 | ✅ 完全相同 —— 管理函数输出中的标签 | 兼容 |
chat:tasks:cancel | ✅ 完全支持 —— 取消进行中的任务 | ✅ 完全相同 —— 取消原生函数执行 | 兼容 |
citation / source | ✅ 完全支持 —— 处理带完整元数据的引用 | ✅ 完全相同 —— 处理函数生成的引用 | 兼容 |
notification | ✅ 完全支持 —— 显示 toast 通知 | ✅ 完全相同 —— 显示函数执行通知 | 兼容 |
confirmation | ✅ 完全支持 —— 请求用户确认 | ✅ 完全相同 —— 确认函数执行 | 兼容 |
execute | ✅ 完全支持 —— 动态执行代码 | ✅ 完全相同 —— 运行函数生成的代码 | 兼容 |
input | ✅ 完全支持 —— 提供完整 UI 供用户输入 | ✅ 完全相同 —— 收集函数所需输入 | 兼容 |
为什么原生模式会破坏某些事件类型
在 Native Mode 下,服务器会根据流式模型输出构造内容块,并反复发出带完整序列化内容快照的 "chat:completion" 事件。客户端会把这些快照视为权威数据,并彻底替换消息内容,从而实际上覆盖掉之前由工具发出的 message、chat:message 或 replace 更新。
技术细节:
middleware.py会把工具直接加入表单数据,以便原生模型处理- 流式处理器会通过
chat:completion事件反复发送内容快照 - 客户端的
chatCompletionEventHandler会把快照当作完整替换:message.content = content - 这会导致工具发出的内容更新闪烁并消失
最佳实践与建议
对于需要实时 UI 更新的工具:
class Tools:
def __init__(self):
# 记录函数调用模式要求的说明
self.description = "此工具需要 Default function calling mode 才能完整运行"
async def interactive_tool(self, prompt: str, __event_emitter__=None) -> str:
"""
⚠️ 该工具需要 function_calling = "default" 才能正确发出事件
"""
if not __event_emitter__:
return "事件发射器不可用——请确认已启用 Default function calling mode"
# 在 Default 模式下可以安全使用 message 事件
await __event_emitter__({
"type": "message",
"data": {"content": "正在处理第 1 步..."}
})
# ... rest of tool logic对于必须在两种模式下都能工作的工具:
async def universal_tool(self, prompt: str, __event_emitter__=None, __metadata__=None) -> str:
"""
设计为可在 Default 与 Native 两种函数调用模式下工作的工具
"""
# 检查是否处于 native 模式(这只是一个粗略启发式)
is_native_mode = __metadata__ and __metadata__.get("params", {}).get("function_calling") == "native"
if __event_emitter__:
if is_native_mode:
# Native 模式下只使用兼容的事件类型
await __event_emitter__({
"type": "status",
"data": {"description": "正在 Native 模式下处理...", "done": False}
})
else:
# Default 模式下可使用完整事件能力
await __event_emitter__({
"type": "message",
"data": {"content": "正在以完整事件支持处理..."}
})
# ... tool logic here
if __event_emitter__:
await __event_emitter__({
"type": "status",
"data": {"description": "已成功完成", "done": True}
})
return "工具执行已完成"事件发射器问题排查
Native 模式冲突的症状:
- 工具发出的消息短暂出现后又消失
- 工具执行过程中内容出现闪烁
message或replace事件似乎被忽略- 状态更新有效,但内容更新不会持久保留
解决方案:
- 切换到 Default 模式:在模型设置中把
function_calling从"native"改为"default" - 使用兼容的事件类型:在 Native 模式下坚持使用
status、citation、notification和其他兼容事件类型 - 实 现模式检测:添加逻辑检测函数调用模式,并据此调整事件使用方式
- 考虑混合方案:核心功能使用兼容事件,并在必要时优雅降级
调试你的事件发射器:
async def debug_events_tool(self, __event_emitter__=None, __metadata__=None) -> str:
"""Debug tool to test event emitter functionality"""
if not __event_emitter__:
return "No event emitter available"
# Test various event types
test_events = [
{"type": "status", "data": {"description": "Testing status events", "done": False}},
{"type": "message", "data": {"content": "Testing message events (may not work in native mode)"}},
{"type": "notification", "data": {"content": "Testing notification events"}},
]
mode_info = "Unknown"
if __metadata__:
mode_info = __metadata__.get("params", {}).get("function_calling", "default")
await __event_emitter__({
"type": "status",
"data": {"description": f"Function calling mode: {mode_info}", "done": False}
})
for i, event in enumerate(test_events):
await asyncio.sleep(1) # Space out events
await __event_emitter__(event)
await __event_emitter__({
"type": "status",
"data": {"description": f"Sent event {i+1}/{len(test_events)}", "done": False}
})
await __event_emitter__({
"type": "status",
"data": {"description": "Event testing complete", "done": True}
})
return f"Event testing completed in {mode_info} mode. Check for missing or flickering content."下面这些特定事件类型的行为各不相同:
Status Events ✅ FULLY COMPATIBLE
Status 事件在 Default 和 Native 两种函数调用模式下行为完全一致。 这是在工具执行期间提供实时反馈最可靠的事件类型。
Status 事件会在消息执行步骤时添加实时状态更新,可以在工具执行的任意阶段发出。状态消息会显示在消息内容正上方,对于会延迟 LLM 响应或处理大量信息的工具来说非常关键。
Basic Status Event Structure:
await __event_emitter__({
"type": "status",
"data": {
"description": "Message that shows up in the chat",
"done": False, # False = still processing, True = completed
"hidden": False # False = visible, True = auto-hide when done
}
})Status 事件参数:
description:显示给用户的状态文本done:布尔值,表示此状态是否代表完成hidden:布尔值,当设置为done: True后是否自动隐藏该状态
基础 Status 示例
async def data_processing_tool(
self, data_file: str, __user__: dict, __event_emitter__=None
) -> str:
"""
Processes a large data file with status updates
✅ Works in both Default and Native function calling modes
"""
if not __event_emitter__:
return "Processing completed (no status updates available)"
# Step 1: Loading
await __event_emitter__({
"type": "status",
"data": {"description": "Loading data file...", "done": False}
})
# Simulate loading time
await asyncio.sleep(2)
# Step 2: Processing
await __event_emitter__({
"type": "status",
"data": {"description": "Analyzing 10,000 records...", "done": False}
})
# Simulate processing time
await asyncio.sleep(3)
# Step 3: Completion
await __event_emitter__({
"type": "status",
"data": {"description": "Analysis complete!", "done": True, "hidden": False}
})
return "Data analysis completed successfully. Found 23 anomalies."带错误处理的高级 Status 示例
async def api_integration_tool(
self, endpoint: str, __event_emitter__=None
) -> str:
"""
Integrates with external API with comprehensive status tracking
✅ Compatible with both function calling modes
"""
if not __event_emitter__:
return "API integration completed (no status available)"
try:
await __event_emitter__({
"type": "status",
"data": {"description": "Connecting to API...", "done": False}
})
# Simulate API connection
await asyncio.sleep(1.5)
await __event_emitter__({
"type": "status",
"data": {"description": "Authenticating...", "done": False}
})
# Simulate authentication
await asyncio.sleep(1)
await __event_emitter__({
"type": "status",
"data": {"description": "Fetching data...", "done": False}
})
# Simulate data fetching
await asyncio.sleep(2)
# Success status
await __event_emitter__({
"type": "status",
"data": {"description": "API integration successful", "done": True}
})
return "Successfully retrieved 150 records from the API"
except Exception as e:
# Error status - always visible for debugging
await __event_emitter__({
"type": "status",
"data": {"description": f"Error: {str(e)}", "done": True, "hidden": False}
})
return f"API integration failed: {str(e)}"多步骤进度 Status 示例
async def batch_processor_tool(
self, items: list, __event_emitter__=None
) -> str:
"""
Processes items in batches with detailed progress tracking
✅ Works perfectly in both function calling modes
"""
if not __event_emitter__ or not items:
return "Batch processing completed"
total_items = len(items)
batch_size = 10
completed = 0
for i in range(0, total_items, batch_size):
batch = items[i:i + batch_size]
batch_num = (i // batch_size) + 1
total_batches = (total_items + batch_size - 1) // batch_size
# Update status for current batch
await __event_emitter__({
"type": "status",
"data": {
"description": f"Processing batch {batch_num}/{total_batches} ({len(batch)} items)...",
"done": False
}
})
# Simulate batch processing
await asyncio.sleep(1)
completed += len(batch)
# Progress update
progress_pct = int((completed / total_items) * 100)
await __event_emitter__({
"type": "status",
"data": {
"description": f"Progress: {completed}/{total_items} items ({progress_pct}%)",
"done": False
}
})
# Final completion status
await __event_emitter__({
"type": "status",
"data": {
"description": f"Batch processing complete! Processed {total_items} items",
"done": True
}
})
return f"Successfully processed {total_items} items in {total_batches} batches"Message 事件 ⚠️ 仅限 Default 模式
🚨 关键警告:Message 事件与 Native 函数调用模式不兼容!
Message 事件(message、chat:message、chat:message:delta、replace)允许你在工具执行的任意阶段追加或修改消息内容。这使得嵌入图片、渲染网页、流式更新内容以及创建丰富交互体验成为可能。
不过,这些事件类型存在严重兼容性问题:
- ✅ Default Mode:功能完整——内容会持久保存并正常显示
- ❌ Native Mode:已损坏——内容会被 completion 快照覆盖并消失
为什么 Message 事件在 Native 模式下会失效:
Native 函数调用会反复发出带完整内容快照的 chat:completion 事件,这些快照会完全替换消息内容,导致工具发出的任何消息更新闪烁并消失。
安全的 Message 事件结构(仅限 Default 模式):
await __event_emitter__({
"type": "message", # Also: "chat:message", "chat:message:delta", "replace"
"data": {"content": "This content will be appended/replaced in the chat"},
# Note: message types do NOT require a "done" condition
})Message 事件类型:
message/chat:message:delta:向现有消息追加内容chat:message/replace:替换整条消息内容- 这两类事件在 Native 模式下都会被覆盖
安全的 Message 流式输出(Default 模式)
async def streaming_content_tool(
self, query: str, __event_emitter__=None, __metadata__=None
) -> str:
"""
Streams content updates during processing
⚠ ️ REQUIRES function_calling = "default" - Will not work in Native mode!
"""
# Check function calling mode (rough detection)
mode = "unknown"
if __metadata__:
mode = __metadata__.get("params", {}).get("function_calling", "default")
if mode == "native":
return "❌ This tool requires Default function calling mode. Message streaming is not supported in Native mode due to content overwriting issues."
if not __event_emitter__:
return "Event emitter not available"
# Stream progressive content updates
content_chunks = [
"🔍 **Phase 1: Research**\nGathering information about your query...\n\n",
"📊 **Phase 2: Analysis**\nAnalyzing gathered data patterns...\n\n",
"✨ **Phase 3: Synthesis**\nGenerating insights and recommendations...\n\n",
"📝 **Phase 4: Final Report**\nCompiling comprehensive results...\n\n"
]
accumulated_content = ""
for i, chunk in enumerate(content_chunks):
accumulated_content += chunk
# Append this chunk to the message
await __event_emitter__({
"type": "message",
"data": {"content": chunk}
})
# Show progress status
await __event_emitter__({
"type": "status",
"data": {
"description": f"Processing phase {i+1}/{len(content_chunks)}...",
"done": False
}
})
# Simulate processing time
await asyncio.sleep(2)
# Final completion
await __event_emitter__({
"type": "status",
"data": {"description": "Content streaming complete!", "done": True}
})
return "Content streaming completed successfully. All phases processed."动态内容替换(Default 模式)
async def live_dashboard_tool(
self, __event_emitter__=None, __metadata__=None
) -> str:
"""
Creates a live-updating dashboard using content replacement
⚠️ ONLY WORKS in Default function calling mode
"""
# Verify we're not in Native mode
mode = __metadata__.get("params", {}).get("function_calling", "default") if __metadata__ else "default"
if mode == "native":
return """
❌ **Native Mode Incompatibility**
This dashboard tool cannot function in Native mode because:
- Content replacement events get overwritten by completion snapshots
- Live updates will flicker and disappear
- Real-time data will not persist in the interface
**Solution:** Switch to Default function calling mode in Model Settings → Advanced Params → Function Calling = "Default"
"""
if not __event_emitter__:
return "Dashboard created (static mode - no live updates)"
# Create initial dashboard
initial_dashboard = """
# 📊 Live System Dashboard
## System Status: 🟡 Initializing...
### Current Metrics:
- **CPU Usage**: Loading...
- **Memory**: Loading...
- **Active Users**: Loading...
- **Response Time**: Loading...
---
*Last Updated: Initializing...*
"""
await __event_emitter__({
"type": "replace",
"data": {"content": initial_dashboard}
})
# Simulate live data updates
updates = [
{
"status": "🟢 Online",
"cpu": "23%",
"memory": "64%",
"users": "1,247",
"response": "145ms"
},
{
"status": "🟢 Optimal",
"cpu": "18%",
"memory": "61%",
"users": "1,352",
"response": "132ms"
},
{
"status": "🟡 Busy",
"cpu": "67%",
"memory": "78%",
"users": "1,891",
"response": "234ms"
}
]
for i, data in enumerate(updates):
await asyncio.sleep(3) # Simulate data collection delay
updated_dashboard = f"""
# 📊 Live System Dashboard
## System Status: {data['status']}
### Current Metrics:
- **CPU Usage**: {data['cpu']}
- **Memory**: {data['memory']}
- **Active Users**: {data['users']}
- **Response Time**: {data['response']}
---
*Last Updated: {datetime.now().strftime('%H:%M:%S')}*
*Update {i+1}/{len(updates)}*
"""
# Replace entire dashboard content
await __event_emitter__({
"type": "replace",
"data": {"content": updated_dashboard}
})
# Status update
await __event_emitter__({
"type": "status",
"data": {"description": f"Dashboard updated ({i+1}/{len(updates)})", "done": False}
})
await __event_emitter__({
"type": "status",
"data": {"description": "Live dashboard monitoring complete", "done": True}
})
return "Dashboard monitoring session completed."模式安全的消息工具
async def adaptive_content_tool(
self, content_type: str, __event_emitter__=None, __metadata__=None
) -> str:
"""
Adapts behavior based on function calling mode
✅ Provides best possible experience in both modes
"""
# Detect function calling mode
mode = "default" # Default assumption
if __metadata__:
mode = __metadata__.get("params", {}).get("function_calling", "default")
if not __event_emitter__:
return f"Generated {content_type} content (no real-time updates available)"
# Mode-specific behavior
if mode == "native":
# Use only compatible events in Native mode
await __event_emitter__({
"type": "status",
"data": {"description": f"Generating {content_type} content in Native mode...", "done": False}
})
await asyncio.sleep(2)
await __event_emitter__({
"type": "status",
"data": {"description": "Content generation complete", "done": True}
})
# Return content normally - no message events
return f"""
# {content_type.title()} Content
**Mode**: Native Function Calling (Limited Event Support)
Generated content here... This content is returned as the tool result rather than being streamed via message events.
*Note: Live content updates are not available in Native mode due to event compatibility limitations.*
"""
else: # Default mode
# Full message event functionality available
await __event_emitter__({
"type": "status",
"data": {"description": "Generating content with full streaming support...", "done": False}
})
# Stream content progressively
progressive_content = [
f"# {content_type.title()} Content\n\n**Mode**: Default Function Calling ✅\n\n",
"## Section 1: Introduction\nStreaming content in real-time...\n\n",
"## Section 2: Details\nAdding detailed information...\n\n",
"## Section 3: Conclusion\nFinalizing content delivery...\n\n",
"*✅ Content streaming completed successfully!*"
]
for i, chunk in enumerate(progressive_content):
await __event_emitter__({
"type": "message",
"data": {"content": chunk}
})
await __event_emitter__({
"type": "status",
"data": {"description": f"Streaming section {i+1}/{len(progressive_content)}...", "done": False}
})
await asyncio.sleep(1.5)
await __event_emitter__({
"type": "status",
"data": {"description": "Content streaming complete!", "done": True}
})
return "Content has been streamed above with full Default mode capabilities."Citations ✅ 完全兼容
Citation 事件在 Default 与 Native 两种函数调 用模式下行为完全一致。 这种事件类型会在聊天界面中提供来源引用,用户可以点击查看原始材料。
Citation 对于从外部来源、数据库或文档中检索信息的工具至关重要。它们提供透明度,并允许用户验证信息来源。
Citation 事件结构:
await __event_emitter__({
"type": "citation",
"data": {
"document": [content], # Array of content strings
"metadata": [ # Array of metadata objects
{
"date_accessed": datetime.now().isoformat(),
"source": title,
"author": "Author Name", # Optional
"publication_date": "2024-01-01", # Optional
"url": "https://source-url.com" # Optional
}
],
"source": {"name": title, "url": url} # Primary source info
}
})重要的 Citation 配置:
实现自定义引用时,你 必须 在 Tools 类中禁用自动引用:
def __init__(self):
self.citation = False # REQUIRED - prevents automatic citations from overriding custom ones⚠️ 关键引用警告:
如果你设置了 self.citation = True(或者没有将其设为 False),自动引用会替换你发送的任何自定义引用。使用自定义 citation 事件时,请始终关闭自动引用。
基础 Citation 示例
class Tools:
def __init__(self):
self.citation = False # Disable automatic citations
async def research_tool(
self, topic: str, __event_emitter__=None
) -> str:
"""
Researches a topic and provides proper citations
✅ Works identically in both Default and Native modes
"""
if not __event_emitter__:
return "Research completed (citations not available)"
# Simulate research findings
sources = [
{
"title": "Advanced AI Systems",
"url": "https://example.com/ai-systems",
"content": "Artificial intelligence systems have evolved significantly...",
"author": "Dr. Jane Smith",
"date": "2024-03-15"
},
{
"title": "Machine Learning Fundamentals",
"url": "https://example.com/ml-fundamentals",
"content": "The core principles of machine learning include...",
"author": "Prof. John Doe",
"date": "2024-02-20"
}
]
# Emit citations for each source
for source in sources:
await __event_emitter__({
"type": "citation",
"data": {
"document": [source["content"]],
"metadata": [
{
"date_accessed": datetime.now().isoformat(),
"source": source["title"],
"author": source["author"],
"publication_date": source["date"],
"url": source["url"]
}
],
"source": {
"name": source["title"],
"url": source["url"]
}
}
})
return f"Research on '{topic}' completed. Found {len(sources)} relevant sources with detailed citations."高级多来源 Citation 示例
async def comprehensive_analysis_tool(
self, query: str, __event_emitter__=None
) -> str:
"""
Performs comprehensive analysis with multiple source types
✅ Full compatibility across all function calling modes
"""
if not __event_emitter__:
return "Analysis completed"
# Multiple source types with rich metadata
research_sources = {
"academic": [
{
"title": "Neural Network Architecture in Modern AI",
"authors": ["Dr. Sarah Chen", "Prof. Michael Rodriguez"],
"journal": "Journal of AI Research",
"volume": "Vol. 45, Issue 2",
"pages": "123-145",
"doi": "10.1000/182",
"date": "2024-01-15",
"content": "This comprehensive study examines the evolution of neural network architectures..."
}
],
"web_sources": [
{
"title": "Industry AI Implementation Trends",
"url": "https://tech-insights.com/ai-trends-2024",
"site_name": "TechInsights",
"published": "2024-03-01",
"content": "Recent industry surveys show that 78% of companies are implementing AI solutions..."
}
],
"reports": [
{
"title": "Global AI Market Report 2024",
"organization": "International Tech Research Institute",
"report_number": "ITRI-2024-AI-001",
"date": "2024-02-28",
"content": "The global artificial intelligence market is projected to reach $1.8 trillion by 2030..."
}
]
}
citation_count = 0
# Process academic sources
for source in research_sources["academic"]:
citation_count += 1
await __event_emitter__({
"type": "citation",
"data": {
"document": [source["content"]],
"metadata": [
{
"date_accessed": datetime.now().isoformat(),
"source": source["title"],
"authors": source["authors"],
"journal": source["journal"],
"volume": source["volume"],
"pages": source["pages"],
"doi": source["doi"],
"publication_date": source["date"],
"type": "academic_journal"
}
],
"source": {
"name": f"{source['title']} - {source['journal']}",
"url": f"https://doi.org/{source['doi']}"
}
}
})
# Process web sources
for source in research_sources["web_sources"]:
citation_count += 1
await __event_emitter__({
"type": "citation",
"data": {
"document": [source["content"]],
"metadata": [
{
"date_accessed": datetime.now().isoformat(),
"source": source["title"],
"site_name": source["site_name"],
"publication_date": source["published"],
"url": source["url"],
"type": "web_article"
}
],
"source": {
"name": source["title"],
"url": source["url"]
}
}
})
# Process reports
for source in research_sources["reports"]:
citation_count += 1
await __event_emitter__({
"type": "citation",
"data": {
"document": [source["content"]],
"metadata": [
{
"date_accessed": datetime.now().isoformat(),
"source": source["title"],
"organization": source["organization"],
"report_number": source["report_number"],
"publication_date": source["date"],
"type": "research_report"
}
],
"source": {
"name": f"{source['title']} - {source['organization']}",
"url": f"https://reports.example.com/{source['report_number']}"
}
}
})
return f"""
# Analysis Complete
Comprehensive analysis of '{query}' has been completed using {citation_count} authoritative sources:
- **{len(research_sources['academic'])}** Academic journal articles
- **{len(research_sources['web_sources'])}** Industry web sources
- **{len(research_sources['reports'])}** Research reports
All sources have been properly cited and are available for review by clicking the citation links above.
"""数据库 Citation 工具
async def database_query_tool(
self, sql_query: str, __event_emitter__=None
) -> str:
"""
Queries database and provides data citations
✅ Works perfectly in both function calling modes
"""
if not __event_emitter__:
return "Database query executed"
# Simulate database results with citation metadata
query_results = [
{
"record_id": "USR_001247",
"data": "John Smith, Software Engineer, joined 2023-01-15",
"table": "employees",
"last_updated": "2024-03-10T14:30:00Z",
"updated_by": "admin_user"
},
{
"record_id": "USR_001248",
"data": "Jane Wilson, Product Manager, joined 2023-02-20",
"table": "employees",
"last_updated": "2024-03-08T09:15:00Z",
"updated_by": "hr_system"
}
]
# Create citations for each database record
for i, record in enumerate(query_results):
await __event_emitter__({
"type": "citation",
"data": {
"document": [f"Database Record: {record['data']}"],
"metadata": [
{
"date_accessed": datetime.now().isoformat(),
"source": f"Database Table: {record['table']}",
"record_id": record['record_id'],
"last_updated": record['last_updated'],
"updated_by": record['updated_by'],
"query": sql_query,
"type": "database_record"
}
],
"source": {
"name": f"Record {record['record_id']} - {record['table']}",
"url": f"database://internal/tables/{record['table']}/{record['record_id']}"
}
}
})
return f"""
# Database Query Results
Executed query: `{sql_query}`
Retrieved **{len(query_results)}** records with complete citation metadata. Each record includes:
- Record ID and source table
- Last modification timestamp
- Update attribution
- Full audit trail
All data sources have been properly cited for transparency and verification.
"""Additional Compatible Event Types ✅
以下事件类型在 Default 和 Native 两种函数调用模式下行为完全一致:
Notification 事件
await __event_emitter__({
"type": "notification",
"data": {"content": "Toast notification message"}
})文件事件
await __event_emitter__({
"type": "files", # or "chat:message:files"
"data": {"files": [{"name": "report.pdf", "url": "/files/report.pdf"}]}
})后续问题事件
await __event_emitter__({
"type": "chat:message:follow_ups",
"data": {"follow_ups": ["What about X?", "Tell me more about Y"]}
})标题更新事件
await __event_emitter__({
"type": "chat:title",
"data": {"title": "New Chat Title"}
})标签事件
await __event_emitter__({
"type": "chat:tags",
"data": {"tags": ["research", "analysis", "completed"]}
})错误事件
await __event_emitter__({
"type": "chat:message:error",
"data": {
"error": {
"content": "Error message to display"
}
}
})确认事件
await __event_emitter__({
"type": "confirmation",
"data": {"message": "Are you sure you want to continue?"}
})输入请求事件
await __event_emitter__({
"type": "input",
"data": {"prompt": "Please enter additional information:"}
})代码执行事件
await __event_emitter__({
"type": "execute",
"data": {"code": "print('Hello from tool-generated code!')"}
})函数调用模式综合指南
Default Mode 属于旧版且不再支持。新工具应编写为在 Native Mode 下工作。下面的模式对比细节仅保留给仍在维护旧版 Default-Mode 工具的人,帮助他们了解迁移时需要弥补的差距——这 不是 对新项目选择 Default Mode 的建议。
如果你的工具设计依赖某个只在 Default Mode 中存在的事件类型,请围绕 Native 兼容面重新设计 UX(status、notification、citation、chat:message:files、confirmation、chat:message:follow_ups,再加上从工具方法返回最终内容块)。要求用户为了运行你的工具而切换到旧模式,这种做法今后不可接受。
模式对比概览(仅供迁移参考——Native 是唯一受支持的目标):
| 方面 | Default Mode(旧版 / 不支持) | Native Mode(受支持) |
|---|---|---|
| 状态 | ❌ 旧版,已不再支持 | ✅ 新工具必须使用 |
| 延迟 | 更高 —— 完整的 prompt injection 流水线 | 更低 —— 直接工具调用 API |
| KV 缓存 | ❌ 每一轮都会失效 | ✅ 保留 |
| 事件支持 | 完整的事件类型表面 | 有限的表面(见上方矩阵) |
| 流式处理 | 中途内容重写可用 | 中途重写会被 completion 快照覆盖 |
| Status / 通知 / 引用 | ✅ | ✅ |
消息内容事件(message、chat:message、replace) | ✅ | ❌ 会被覆盖 |
| 内置系统工具(Memory、Notes、Knowledge、Web Search、Image Gen、Code Interpreter) | ❌ | ✅ |
如何面向 Native Mode 设计:
- 将工具的主要输出作为工具方法的 返回值。该内容会作为模型响应的一部分发出,不会受到事件覆盖问题影响。
- 使用
status和notification提供进度与面向用户的提醒——它们在 Native Mode 下完全受支持。 - 使用
citation/source事件提供引用——在 Native Mode 下完全受支持。 - 使用通过
__event_call__的confirmation/input处理交互流程——在 Native Mode 下完全受支持。 - 通过
chat:message:files事件附加文件——在 Native Mode 下完全受支持。 - 避免在流式过程中依赖
message、chat:message:delta、chat:message或replace——这四种事件在 Native Mode 下会失效。如果你需要渐进式输出,请连续发出一系列status更新,并从工具返回最终内容。
将 Default Mode 工具迁移到 Native Mode
将 Default-Mode 工具迁移到 Native 时,常见的改写方式如下:
- 把中途追加内容的
message事 件改成一个单独的最终返回值,返回已组装好的完整内容。 - 把基于
replace的进度条改成一系列status事件(例如"description": "第 3/5 步——正在获取...")。 - 把基于
chat:message的仪表盘改成返回渲染后的仪表盘内容(Markdown / HTML 都可以)。 - 保持
citation、notification、confirmation和文件附件不变——它们在两种模式下行为一致。
通用兼容模式:
async def mode_adaptive_tool(
self, query: str, __event_emitter__=None, __metadata__=None
) -> str:
"""
Tool that adapts its behavior based on function calling mode
✅ Provides optimal experience in both modes
"""
# Detect current mode
mode = "default"
if __metadata__:
mode = __metadata__.get("params", {}).get("function_calling", "default")
is_native_mode = (mode == "native")
if not __event_emitter__:
return "Tool executed successfully (no event support)"
# Always safe: status updates work in both modes
await __event_emitter__({
"type": "status",
"data": {"description": f"Running in {mode} mode...", "done": False}
})
# Mode-specific logic
if is_native_mode:
# Native mode: use compatible events only
await __event_emitter__({
"type": "status",
"data": {"description": "Processing with native efficiency...", "done": False}
})
# Simulate processing
await asyncio.sleep(1)
# Return results directly - no message streaming
result = f"Query '{query}' processed successfully in Native mode."
else:
# Default mode: full event capabilities
await __event_emitter__({
"type": "message",
"data": {"content": f"🔍 **Processing Query**: {query}\n\n"}
})
await __event_emitter__({
"type": "status",
"data": {"description": "Analyzing with full streaming...", "done": False}
})
await asyncio.sleep(1)
await __event_emitter__({
"type": "message",
"data": {"content": "📊 **Results**: Analysis complete with detailed findings.\n\n"}
})
result = "Query processed with full Default mode capabilities."
# Final status (works in both modes)
await __event_emitter__({
"type": "status",
"data": {"description": "Processing complete!", "done": True}
})
return result🔧 事件发射器调试问题
常见问题与解决方案:
问题:内容出现后又消失
- 原因:在 Native Mode 中使用了
message/chat:message/replace事件——completion 快照会把它们覆盖掉。 - 解决方案:从工具方法返回内容,并使用
status事件来表示进度。不要切换回 Default Mode——Default Mode 是旧版且不再支持。
问题:工具似乎没有响应
- 原因:模型未启用函数调用
- 解决方案:在 Model 设置里或通过
+按钮启用工具
问题:事件完全没有触发
- 原因:缺少
__event_emitter__参数,或者它是None - 解决方案:确保该参数包含在工具方法签名中
问题:引用被覆盖
- 原因:
self.citation = True(或者没有设为False) - 解决方案:在
__init__方法中把self.citation = False
诊断工具:
async def event_diagnostics_tool(
self, __event_emitter__=None, __metadata__=None, __user__=None
) -> str:
"""
用于事件发射器调试的综合诊断工具
"""
report = ["# 🔍 Event Emitter Diagnostic Report\n"]
# 检查事件发射器是否可用
if __event_emitter__:
report.append("✅ Event emitter is available\n")
else:
report.append("❌ Event emitter is NOT available\n")
return "".join(report)
# 检查元数据是否可用
if __metadata__:
mode = __metadata__.get("params", {}).get("function_calling", "default")
report.append(f"✅ Function calling mode: **{mode}**\n")
else:
report.append("⚠️ Metadata not available (mode unknown)\n")
mode = "unknown"
# 检查用户上下文
if __user__:
report.append("✅ User context available\n")
else:
report.append("⚠️ User context not available\n")
# 测试兼容事件(两种模式都可用)
report.append("\n## 测试兼容事件:\n")
try:
await __event_emitter__({
"type": "status",
"data": {"description": "Testing status events...", "done": False}
})
report.append("✅ Status events: WORKING\n")
except Exception as e:
report.append(f"❌ Status events: FAILED - {str(e)}\n")
try:
await __event_emitter__({
"type": "notification",
"data": {"content": "Test notification"}
})
report.append("✅ Notification events: WORKING\n")
except Exception as e:
report.append(f"❌ Notification events: FAILED - {str(e)}\n")
# 测试有模式差异的事件(在 Native 模式中会失效)
report.append("\n## 测试依赖模式的事件:\n")
try:
await __event_emitter__({
"type": "message",
"data": {"content": "**Test message event** - This should appear in Default mode only\n"}
})
report.append("✅ Message events: SENT (may disappear in Native mode)\n")
except Exception as e:
report.append(f"❌ Message events: FAILED - {str(e)}\n")
# 最终状态
await __event_emitter__({
"type": "status",
"data": {"description": "Diagnostic complete", "done": True}
})
# 按模式给出建议
report.append("\n## 建议:\n")
if mode == "native":
report.append("""
⚠️ **检测到 Native Mode**:事件支持有限
- ✅ 可使用:status、citation、notification、files 事件
- ❌ 应避免:message、replace、chat:message 事件
- 💡 如需完整功能,请切换到 Default mode
""")
elif mode == "default":
report.append("""
✅ **检测到 Default Mode**:可用完整事件支持
- 所有事件类型都应当正常工作
- 最适合交互式和流式工具
""")
else:
report.append("""
❓ **未知模式**:请检查模型配置
- 确认已启用函数调用
- 验证模型是否支持工具调用
""")
return "".join(report)📚 事件发射器速查表
始终兼容(两种模式都可用):
# Status 更新——非常适合进度跟踪
await __event_emitter__({
"type": "status",
"data": {"description": "Processing...", "done": False}
})
# Citation——来源标注的核心能力
await __event_emitter__({
"type": "citation",
"data": {
"document": ["Content"],
"source": {"name": "Source", "url": "https://example.com"}
}
})
# Notification——用户提醒
await __event_emitter__({
"type": "notification",
"data": {"content": "Task completed!"}
})仅限 Default Mode(Native 中会失效):
# ⚠️ 这些在 Native 模式中会闪烁/消失
# 渐进式内容流式输出
await __event_emitter__({
"type": "message",
"data": {"content": "Streaming content..."}
})
# 内容替换
await __event_emitter__({
"type": "replace",
"data": {"content": "New complete content"}
})
# Delta 更新
await __event_emitter__({
"type": "chat:message:delta",
"data": {"content": "Additional content"}
})模式检测模式:
def get_function_calling_mode(__metadata__):
"""用于检测当前函数调用模式的工具函数"""
if not __metadata__:
return "unknown"
return __metadata__.get("params", {}).get("function_calling", "default")
# 在工具中的使用方式:
mode = get_function_calling_mode(__metadata__)
is_native = (mode == "native")
can_stream_messages = not is_native必要导入:
import asyncio
from datetime import datetime
from typing import Optional, Callable, Awaitable富 UI 元素嵌入
Tools 和 Actions 可以返回 HTML 内容,并在聊天中直接渲染为可交互的 iframe。完整文档、示例、安全注意事项和 CORS 配置请参阅专门的 Rich UI Embedding 指南。
外部包
在 Tools 定义元数据中,你可以指定自定义包。点击 Save 后,这一行会被解析,系统会一次性对所有依赖执行 pip install。
🚨 关键警告:可能存在包版本冲突
当多个工具 为同一个包定义了不同版本(例如,Tool A 需要 pandas==1.5.0,而 Tool B 需要 pandas==2.0.0)时,Open WebUI 会以非确定性顺序安装它们。这可能导致不可预测的行为,并破坏一个或多个工具。
解决这个问题最稳妥的办法是使用 OpenAPI 工具服务器。
我们强烈建议使用 OpenAPI tool server 来避免这些依赖冲突。
不要在生产环境中依赖运行时 pip 安装。 当使用 UVICORN_WORKERS > 1 或多个副本运行时,每个 worker/replica 都会在启动时独立尝试安装包。这会导致 竞态条件:并发的 pip 进程会因为 pip 的内部锁检测到同时安装而以 AssertionError 崩溃。
在生产环境中请设置 ENABLE_PIP_INSTALL_FRONTMATTER_REQUIREMENTS=False,彻底禁用运行时 pip 安装。然后使用自定义 Dockerfile 在镜像构建阶段预先安装所有必需包:
FROM ghcr.io/open-webui/open-webui:main
RUN pip install --no-cache-dir python-docx requests beautifulsoup4运行时安装只适用于单 worker 开发或家庭实验环境,也就是你在试验新函数和工具的时候。对于任何服务多个用户的部署,都应该把依赖打包进容器镜像。
请记住,pip 与 Open WebUI 运行在同一个进程中,因此安装期间 UI 会完全无响应。
系统不会采取任何措施来处理与 Open WebUI 自身依赖之间的冲突。若不小心指定了 requirements,可能会把 Open WebUI 搞坏。你也许可以通过把 open-webui 本身也写进 requirements 来绕过这个问题。
示例
"""
title: myToolName
author: myName
funding_url: [any link here will be shown behind a `Heart` button for users to show their support to you]
version: 1.0.0
# the version is displayed in the UI to help users keep track of updates.
license: GPLv3
description: [recommended]
requirements: package1>=2.7.0,package2,package3
"""