openwebui: display the R1 chain of thought [PPLX supported] [Update: Volcengine Bots search supported]
2025-01-23
v1.2.13 update: support for pplx search reasoning models
v1.2.12 update: fixed incorrect citations from Volcengine bots
v1.2.11 update: display and cite search results from Volcengine bots
v1.2.10 update: fixed a parsing error caused by a trailing [DONE] in the streaming responses of some providers
v1.2.9 update: fixed a secondary error raised inside the error handling; thanks to @happyhome for reporting the issue and suggesting the fix
v1.2.8 update: fixed chain-of-thought ordering getting scrambled under concurrent multi-user requests
v1.2.7 update: support for Tencent Cloud
v1.2.6 update: support for Volcengine
v1.2.5 update: fixed occasional dropped characters in the thinking output
v1.2.4 update: fixed characters being swallowed after </think> on the Hyperbolic provider
v1.2.3 update: added a setting for the model name used in API requests
v1.2.2 update: support for SiliconFlow API responses
The official API has been mostly unavailable lately; using the Volcengine Ark API is recommended.
I have recently been using the R1 model in openwebui, and not being able to see the thinking process kept bothering me. A search around the web turned up no ready-made function for this, so while slacking off at work I hand-rolled an openwebui function that displays the chain of thought.
Per the requirements in the official DeepSeek documentation, the chain-of-thought content from the previous reply is not included in subsequent conversation requests.
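To make that requirement concrete: the DeepSeek streaming API delivers reasoning in a separate reasoning_content field of each delta, and the docs ask that this reasoning not be sent back on the next turn. Below is a minimal, illustrative sketch of that idea; the strip_reasoning helper and THINK_BLOCK pattern are names I made up for the example and are not part of the v1.2.13 function further down, which only emits the <think>/</think> markers.

import re

# Assumed shape of one streaming delta from the DeepSeek API:
#   {"reasoning_content": "...thinking...", "content": None}   while reasoning
#   {"reasoning_content": None, "content": "...answer..."}     while answering

THINK_BLOCK = re.compile(r"<think>.*?</think>\s*", flags=re.DOTALL)

def strip_reasoning(message: dict) -> dict:
    """Illustrative helper: drop a rendered <think> block from a previous
    assistant reply so the reasoning is not sent back to the API."""
    if message.get("role") == "assistant" and isinstance(message.get("content"), str):
        return {**message, "content": THINK_BLOCK.sub("", message["content"]).strip()}
    return message

history = [
    {"role": "user", "content": "1 + 1?"},
    {"role": "assistant", "content": "<think>\nTrivial arithmetic.\n</think>\n2"},
]
print([strip_reasoning(m) for m in history])
# -> the assistant message is re-sent as just "2", without the reasoning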
Results
Chain of thought only
PPLX web search
Volcengine Ark web search
How to use
https://openwebui.com/f/zgccrui/deepseek_r1/
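After importing the function from the link above, its three valves can point at any OpenAI-compatible endpoint that streams reasoning_content. A minimal sketch of the settings, assuming the Pipe class from the function below is available; the base URL and key are placeholders, and in practice these values are edited in the OpenWebUI function settings rather than in code.

# Minimal sketch, assuming Pipe from the function below has been imported.
# The base URL, key, and model name are placeholders, not required values.
pipe = Pipe()
pipe.valves.DEEPSEEK_API_BASE_URL = "https://api.deepseek.com/v1"  # any OpenAI-compatible base URL
pipe.valves.DEEPSEEK_API_KEY = "sk-your-key"
pipe.valves.DEEPSEEK_API_MODEL = "deepseek-reasoner"  # or your provider's R1 model ID
print(pipe.pipes())  # -> [{'id': 'deepseek-reasoner', 'name': 'deepseek-reasoner'}]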
Function
"""
title: DeepSeek R1
author: zgccrui
description: Display the DeepSeek R1 model's chain of thought in OpenWebUI - only supports version 0.5.6 and above
version: 1.2.13
licence: MIT
"""
import json
import httpx
import re
from typing import AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field
import asyncio
class Pipe:
    class Valves(BaseModel):
        DEEPSEEK_API_BASE_URL: str = Field(
            default="https://api.deepseek.com/v1",
            description="DeepSeek API的基础请求地址",
        )
        DEEPSEEK_API_KEY: str = Field(
            default="", description="用于身份验证的DeepSeek API密钥,可从控制台获取"
        )
        DEEPSEEK_API_MODEL: str = Field(
            default="deepseek-reasoner",
            description="API请求的模型名称,默认为 deepseek-reasoner ",
        )

    def __init__(self):
        self.valves = self.Valves()
        self.data_prefix = "data:"
        self.emitter = None

    def pipes(self):
        return [
            {
                "id": self.valves.DEEPSEEK_API_MODEL,
                "name": self.valves.DEEPSEEK_API_MODEL,
            }
        ]

    async def pipe(
        self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> AsyncGenerator[str, None]:
        """Main processing pipeline (buffering removed)."""
        thinking_state = {"thinking": -1}  # Tracks the thinking state
        self.emitter = __event_emitter__
        # List of references returned in web-search mode
        stored_references = []
        # Web-search provider: 0 - none, 1 - Volcengine, 2 - PPLX
        search_providers = 0
        waiting_for_reference = False
        # Validate configuration
        if not self.valves.DEEPSEEK_API_KEY:
            yield json.dumps({"error": "未配置API密钥"}, ensure_ascii=False)
            return
        # Prepare request headers
        headers = {
            "Authorization": f"Bearer {self.valves.DEEPSEEK_API_KEY}",
            "Content-Type": "application/json",
        }
        try:
            # Extract the model ID
            model_id = body["model"].split(".", 1)[-1]
            payload = {**body, "model": model_id}
            # Normalize messages to avoid consecutive messages with the same role
            messages = payload["messages"]
            i = 0
            while i < len(messages) - 1:
                if messages[i]["role"] == messages[i + 1]["role"]:
                    # Insert a placeholder message with the alternating role
                    alternate_role = (
                        "assistant" if messages[i]["role"] == "user" else "user"
                    )
                    messages.insert(
                        i + 1,
                        {"role": alternate_role, "content": "[Unfinished thinking]"},
                    )
                i += 1
            # Send the API request
            async with httpx.AsyncClient(http2=True) as client:
                async with client.stream(
                    "POST",
                    f"{self.valves.DEEPSEEK_API_BASE_URL}/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=300,
                ) as response:
                    # Error handling
                    if response.status_code != 200:
                        error = await response.aread()
                        yield self._format_error(response.status_code, error)
                        return
                    # Process the streaming response
                    async for line in response.aiter_lines():
                        if not line.startswith(self.data_prefix):
                            continue
                        # Slice out the JSON string
                        json_str = line[len(self.data_prefix) :]
                        # After stripping whitespace, check for the end-of-stream marker
                        if json_str.strip() == "[DONE]":
                            return
                        try:
                            data = json.loads(json_str)
                        except json.JSONDecodeError as e:
                            error_detail = f"解析失败 - 内容:{json_str},原因:{e}"
                            yield self._format_error("JSONDecodeError", error_detail)
                            return
                        if search_providers == 0:
                            # Collect references
                            stored_references = data.get("references", []) + data.get(
                                "citations", []
                            )
                            if stored_references:
                                ref_count = len(stored_references)
                                yield '<details type="search">\n'
                                yield f"<summary>已搜索 {ref_count} 个网站</summary>\n"
                            # "references" in the data means a Volcengine response
                            if data.get("references"):
                                for idx, reference in enumerate(stored_references, 1):
                                    yield f'> {idx}. [{reference["title"]}]({reference["url"]})\n'
                                yield "</details>\n"
                                search_providers = 1
                            # "citations" in the data means a PPLX response
                            elif data.get("citations"):
                                for idx, reference in enumerate(stored_references, 1):
                                    yield f"> {idx}. {reference}\n"
                                yield "</details>\n"
                                search_providers = 2
                        choice = data.get("choices", [{}])[0]
                        # Stop when the stream reports a finish reason
                        if choice.get("finish_reason"):
                            return
                        # State-machine handling
                        state_output = await self._update_thinking_state(
                            choice.get("delta", {}), thinking_state
                        )
                        if state_output:
                            yield state_output
                            if state_output == "<think>":
                                yield "\n"
                        # Process and immediately emit the content
                        content = self._process_content(choice["delta"])
                        if content:
                            # Handle thinking-state markers embedded in the content
                            if content.startswith("<think>"):
                                content = re.sub(r"^<think>", "", content)
                                yield "<think>"
                                await asyncio.sleep(0.1)
                                yield "\n"
                            elif content.startswith("</think>"):
                                content = re.sub(r"^</think>", "", content)
                                yield "</think>"
                                await asyncio.sleep(0.1)
                                yield "\n"
                            # Reference handling
                            if search_providers == 1:
                                # Volcengine references
                                # If the text contains "摘要" (summary), set the waiting flag
                                if "摘要" in content:
                                    waiting_for_reference = True
                                    yield content
                                    continue
                                # While waiting for a reference number
                                if waiting_for_reference:
                                    # Content that is only digits or "、"
                                    if re.match(r"^(\d+|、)$", content.strip()):
                                        numbers = re.findall(r"\d+", content)
                                        if numbers:
                                            num = numbers[0]
                                            ref_index = int(num) - 1
                                            if 0 <= ref_index < len(stored_references):
                                                ref_url = stored_references[ref_index]["url"]
                                            else:
                                                ref_url = ""
                                            content = f"[[{num}]]({ref_url})"
                                        # Stay in the waiting state to handle further numbers
                                    # Stop waiting on content that is neither a digit nor "、"
                                    # and does not contain "摘要"
                                    elif "摘要" not in content:
                                        waiting_for_reference = False
                            elif search_providers == 2:
                                # PPLX references: rewrite [n] markers into links
                                def replace_ref(m):
                                    idx = int(m.group(1)) - 1
                                    if 0 <= idx < len(stored_references):
                                        return f"[[{m.group(1)}]]({stored_references[idx]})"
                                    return f"[[{m.group(1)}]]()"

                                content = re.sub(r"\[(\d+)\]", replace_ref, content)
                            yield content
        except Exception as e:
            yield self._format_exception(e)

    async def _update_thinking_state(self, delta: dict, thinking_state: dict) -> str:
        """Update the thinking state machine (simplified version)."""
        state_output = ""
        if thinking_state["thinking"] == -1 and delta.get("reasoning_content"):
            thinking_state["thinking"] = 0
            state_output = "<think>"
        elif (
            thinking_state["thinking"] == 0
            and not delta.get("reasoning_content")
            and delta.get("content")
        ):
            thinking_state["thinking"] = 1
            state_output = "\n</think>\n\n"
        return state_output

    def _process_content(self, delta: dict) -> str:
        """Return the processed content directly."""
        return delta.get("reasoning_content", "") or delta.get("content", "")

    def _emit_status(self, description: str, done: bool = False) -> Awaitable[None]:
        """Send a status update."""
        if self.emitter:
            return self.emitter(
                {
                    "type": "status",
                    "data": {
                        "description": description,
                        "done": done,
                    },
                }
            )
        return None

    def _format_error(self, status_code: int, error: bytes) -> str:
        if isinstance(error, str):
            error_str = error
        else:
            error_str = error.decode(errors="ignore")
        try:
            err_msg = json.loads(error_str).get("message", error_str)[:200]
        except Exception:
            err_msg = error_str[:200]
        return json.dumps(
            {"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False
        )

    def _format_exception(self, e: Exception) -> str:
        err_type = type(e).__name__
        return json.dumps({"error": f"{err_type}: {str(e)}"}, ensure_ascii=False)
Function - for versions below 0.5.6
"""
title: DeepSeek R1 (Optimized and Fixed)
author: zgccrui
description: Display the DeepSeek R1 model's chain of thought in OpenWebUI
version: 1.1.2
licence: MIT
"""
import json
import httpx
import re
from typing import AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field
class Pipe:
    class Valves(BaseModel):
        DEEPSEEK_API_BASE_URL: str = Field(
            default="https://api.deepseek.com/v1",
            description="DeepSeek API的基础请求地址",
        )
        DEEPSEEK_API_KEY: str = Field(
            default="", description="用于身份验证的DeepSeek API密钥,可从控制台获取"
        )

    def __init__(self):
        self.valves = self.Valves()
        self.clean_pattern = re.compile(r"<details>.*?</details>\n\n", flags=re.DOTALL)
        self.data_prefix = "data: "
        self.thinking = -1  # -1: not started, 0: thinking, 1: answered
        self.emitter = None
        self.buffer_size = 3  # Response buffer threshold

    def pipes(self):
        return [{"id": "deepseek-reasoner", "name": "deepseek-reasoner"}]

    async def emit_status(self, message: str = "", done: bool = False):
        """Status notification emitter."""
        if self.emitter:
            await self.emitter(
                {"type": "status", "data": {"description": message, "done": done}}
            )

    async def pipe(
        self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> AsyncGenerator[str, None]:
        """Main processing pipeline."""
        self.thinking = -1
        self.emitter = __event_emitter__
        response_buffer = []
        # Validate configuration
        if not self.valves.DEEPSEEK_API_KEY:
            yield json.dumps({"error": "未配置API密钥"}, ensure_ascii=False)
            return
        # Prepare request headers
        headers = {
            "Authorization": f"Bearer {self.valves.DEEPSEEK_API_KEY}",
            "Content-Type": "application/json",
        }
        try:
            # Extract the model ID
            model_id = body["model"].split(".", 1)[-1]
            payload = {**body, "model": model_id}
            # Clean previous assistant messages
            for msg in payload["messages"]:
                if msg.get("role") == "assistant":
                    msg["content"] = self.clean_pattern.sub("", msg["content"]).strip()
            # Send the API request
            async with httpx.AsyncClient(http2=True) as client:
                async with client.stream(
                    "POST",
                    f"{self.valves.DEEPSEEK_API_BASE_URL}/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=30,
                ) as response:
                    # Error handling
                    if response.status_code != 200:
                        error = await response.aread()
                        yield self._format_error(response.status_code, error)
                        return
                    # Process the streaming response
                    async for line in response.aiter_lines():
                        if not line.startswith(self.data_prefix):
                            continue
                        data = json.loads(line[len(self.data_prefix) :])
                        choice = data.get("choices", [{}])[0]
                        # Stop when the stream reports a finish reason
                        if choice.get("finish_reason"):
                            if response_buffer:
                                yield "".join(response_buffer)
                            return
                        # State-machine handling
                        state_output = await self._update_thinking_state(
                            choice.get("delta", {})
                        )
                        if state_output:
                            response_buffer.append(state_output)
                        # Content handling
                        content = self._process_content(choice["delta"])
                        if content:  # Only buffer non-empty content
                            response_buffer.append(content)
                        # Buffer control
                        if len(response_buffer) >= self.buffer_size or "\n" in content:
                            yield "".join(response_buffer)
                            response_buffer.clear()
                    # Flush any remaining buffered content
                    if response_buffer:
                        yield "".join(response_buffer)
        except Exception as e:
            yield self._format_exception(e)

    async def _update_thinking_state(self, delta: dict) -> str:
        """Update the thinking state machine (async version)."""
        state_output = ""
        # Transition: not started -> thinking
        if self.thinking == -1 and delta.get("reasoning_content"):
            self.thinking = 0
            await self.emit_status("🧐 思考中……")
            state_output = "<details>\n<summary>思考过程</summary>\n"
        # Transition: thinking -> answered
        elif self.thinking == 0 and not delta.get("reasoning_content"):
            self.thinking = 1
            await self.emit_status("", done=True)
            state_output = "\n\n----------\n\n</details>\n\n"
        return state_output

    def _process_content(self, delta: dict) -> str:
        """Process a single content fragment."""
        reasoning = delta.get("reasoning_content")
        content = delta.get("content")
        # If both reasoning and content are empty, emit nothing
        if not reasoning and not content:
            return ""
        # Prefer reasoning, fall back to content
        return str(reasoning) if reasoning else str(content)

    def _format_error(self, status_code: int, error: bytes) -> str:
        """Format an error message."""
        try:
            err_msg = json.loads(error).get("message", "未知错误")
        except Exception:
            err_msg = error.decode(errors="ignore")[:200]
        return json.dumps(
            {"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False
        )

    def _format_exception(self, e: Exception) -> str:
        """Format exception information."""
        err_type = type(e).__name__
        return json.dumps({"error": f"{err_type}: {str(e)}"}, ensure_ascii=False)