ccruiの博客

ccruiの博客

openwebui 显示 R1 思维链 [更新硅基流动支持]

143
2025-01-23

v1.2.5更新 修复可能出现的思考吃字问题
v1.2.4更新 修复Hyperbolic渠道</think>后吞字的问题
v1.2.3更新 增加对api请求模型的设置
v1.2.2更新 支持硅基流动的API返回

现在官方API一直属于崩溃状态,建议使用 硅基流动API

最近在 openwebui 中使用 R1 模型,因为看不到思维过程,总觉得难受,网上找上一圈也没有发现有现成的函数可以实现,于是趁着 上班摸鱼 无聊,手搓了一个 openwebui 函数 实现了思维链的查看。

并且根据DeepSeek官方文档的要求,上次回复的思维链内容,不会参与之后的对话请求。

6791ce7e2ebcf.webp

效果

6791ce8c261ff.png

使用方式

https://openwebui.com/f/zgccrui/deepseek_r1/

函数
"""
title: DeepSeek R1
author: zgccrui
description: 在OpwenWebUI中显示DeepSeek R1模型的思维链 - 仅支持0.5.6及以上版本
version: 1.2.5
licence: MIT
"""

import json
import httpx
import re
from typing import AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field
import asyncio


class Pipe:
    class Valves(BaseModel):
        DEEPSEEK_API_BASE_URL: str = Field(
            default="https://api.deepseek.com/v1",
            description="DeepSeek API的基础请求地址",
        )
        DEEPSEEK_API_KEY: str = Field(
            default="", description="用于身份验证的DeepSeek API密钥,可从控制台获取"
        )
        DEEPSEEK_API_MODEL: str = Field(
            default="deepseek-reasoner",
            description="API请求的模型名称,默认为 deepseek-reasoner ",
        )

    def __init__(self):
        self.valves = self.Valves()
        self.data_prefix = "data: "
        self.thinking = -1  # -1:未开始 0:思考中 1:已回答
        self.emitter = None

    def pipes(self):
        return [
            {
                "id": self.valves.DEEPSEEK_API_MODEL,
                "name": self.valves.DEEPSEEK_API_MODEL,
            }
        ]

    async def pipe(
        self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> AsyncGenerator[str, None]:
        """主处理管道(已移除缓冲)"""
        self.thinking = -1
        self.emitter = __event_emitter__

        # 验证配置
        if not self.valves.DEEPSEEK_API_KEY:
            yield json.dumps({"error": "未配置API密钥"}, ensure_ascii=False)
            return

        # 准备请求参数
        headers = {
            "Authorization": f"Bearer {self.valves.DEEPSEEK_API_KEY}",
            "Content-Type": "application/json",
        }

        try:
            # 模型ID提取
            model_id = body["model"].split(".", 1)[-1]
            payload = {**body, "model": model_id}

            # 处理消息以防止连续的相同角色
            messages = payload["messages"]
            i = 0
            while i < len(messages) - 1:
                if messages[i]["role"] == messages[i + 1]["role"]:
                    # 插入具有替代角色的占位符消息
                    alternate_role = (
                        "assistant" if messages[i]["role"] == "user" else "user"
                    )
                    messages.insert(
                        i + 1,
                        {"role": alternate_role, "content": "[Unfinished thinking]"},
                    )
                i += 1

            # yield json.dumps(payload, ensure_ascii=False)

            # 发起API请求
            async with httpx.AsyncClient(http2=True) as client:
                async with client.stream(
                    "POST",
                    f"{self.valves.DEEPSEEK_API_BASE_URL}/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=300,
                ) as response:
                    # 错误处理
                    if response.status_code != 200:
                        error = await response.aread()
                        yield self._format_error(response.status_code, error)
                        return

                    # 流式处理响应
                    async for line in response.aiter_lines():
                        if not line.startswith(self.data_prefix):
                            continue

                        data = json.loads(line[len(self.data_prefix) :])
                        choice = data.get("choices", [{}])[0]

                        # 结束条件判断
                        if choice.get("finish_reason"):
                            return

                        # 状态机处理
                        state_output = await self._update_thinking_state(
                            choice.get("delta", {})
                        )
                        if state_output:
                            yield state_output  # 直接发送状态标记
                            if state_output == "<think>":
                                yield "\n"

                        # 内容处理并立即发送
                        content = self._process_content(choice["delta"])
                        if content:
                            if content.startswith("<think>"):
                                match = re.match(r"^<think>", content)
                                if match:
                                    content = re.sub(r"^<think>", "", content)
                                    yield "<think>"
                                    await asyncio.sleep(0.1)
                                    yield "\n"

                            elif content.startswith("</think>"):
                                match = re.match(r"^</think>", content)
                                if match:
                                    content = re.sub(r"^</think>", "", content)
                                    yield "</think>"
                                    await asyncio.sleep(0.1)
                                    yield "\n"
                            yield content

        except Exception as e:
            yield self._format_exception(e)

    async def _update_thinking_state(self, delta: dict) -> str:
        """更新思考状态机(简化版)"""
        state_output = ""

        # 状态转换:未开始 -> 思考中
        if self.thinking == -1 and delta.get("reasoning_content"):
            self.thinking = 0
            state_output = "<think>"

        # 状态转换:思考中 -> 已回答
        elif self.thinking == 0 and not delta.get("reasoning_content"):
            self.thinking = 1
            state_output = "\n</think>\n\n"

        return state_output

    def _process_content(self, delta: dict) -> str:
        """直接返回处理后的内容"""
        return delta.get("reasoning_content", "") or delta.get("content", "")

    def _format_error(self, status_code: int, error: bytes) -> str:
        """错误格式化保持不变"""
        try:
            err_msg = json.loads(error).get("message", error.decode(errors="ignore"))[
                :200
            ]
        except:
            err_msg = error.decode(errors="ignore")[:200]
        return json.dumps(
            {"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False
        )

    def _format_exception(self, e: Exception) -> str:
        """异常格式化保持不变"""
        err_type = type(e).__name__
        return json.dumps({"error": f"{err_type}: {str(e)}"}, ensure_ascii=False)
函数 - 低于0.5.6版本
"""
title: DeepSeek R1 (Optimized and Fixed)
author: zgccrui
description: 在OpwenWebUI中显示DeepSeek R1模型的思维链
version: 1.1.2
licence: MIT
"""

import json
import httpx
import re
from typing import AsyncGenerator, Callable, Awaitable
from pydantic import BaseModel, Field


class Pipe:
    class Valves(BaseModel):
        DEEPSEEK_API_BASE_URL: str = Field(
            default="https://api.deepseek.com/v1",
            description="DeepSeek API的基础请求地址",
        )
        DEEPSEEK_API_KEY: str = Field(
            default="", description="用于身份验证的DeepSeek API密钥,可从控制台获取"
        )

    def __init__(self):
        self.valves = self.Valves()
        self.clean_pattern = re.compile(r"<details>.*?</details>\n\n", flags=re.DOTALL)
        self.data_prefix = "data: "
        self.thinking = -1  # -1:未开始 0:思考中 1:已回答
        self.emitter = None
        self.buffer_size = 3  # 响应缓冲阈值

    def pipes(self):
        return [{"id": "deepseek-reasoner", "name": "deepseek-reasoner"}]

    async def emit_status(self, message: str = "", done: bool = False):
        """状态通知发射器"""
        if self.emitter:
            await self.emitter(
                {"type": "status", "data": {"description": message, "done": done}}
            )

    async def pipe(
        self, body: dict, __event_emitter__: Callable[[dict], Awaitable[None]] = None
    ) -> AsyncGenerator[str, None]:
        """主处理管道"""
        self.thinking = -1
        self.emitter = __event_emitter__
        response_buffer = []

        # 验证配置
        if not self.valves.DEEPSEEK_API_KEY:
            yield json.dumps({"error": "未配置API密钥"}, ensure_ascii=False)
            return

        # 准备请求参数
        headers = {
            "Authorization": f"Bearer {self.valves.DEEPSEEK_API_KEY}",
            "Content-Type": "application/json",
        }

        try:
            # 模型ID提取
            model_id = body["model"].split(".", 1)[-1]
            payload = {**body, "model": model_id}

            # 消息内容清理
            for msg in payload["messages"]:
                if msg.get("role") == "assistant":
                    msg["content"] = self.clean_pattern.sub("", msg["content"]).strip()

            # 发起API请求
            async with httpx.AsyncClient(http2=True) as client:
                async with client.stream(
                    "POST",
                    f"{self.valves.DEEPSEEK_API_BASE_URL}/chat/completions",
                    json=payload,
                    headers=headers,
                    timeout=30,
                ) as response:
                    # 错误处理
                    if response.status_code != 200:
                        error = await response.aread()
                        yield self._format_error(response.status_code, error)
                        return

                    # 流式处理响应
                    async for line in response.aiter_lines():
                        if not line.startswith(self.data_prefix):
                            continue

                        data = json.loads(line[len(self.data_prefix) :])
                        choice = data.get("choices", [{}])[0]

                        # 结束条件判断
                        if choice.get("finish_reason"):
                            if response_buffer:
                                yield "".join(response_buffer)
                            return

                        # 状态机处理
                        state_output = await self._update_thinking_state(
                            choice.get("delta", {})
                        )
                        if state_output:
                            response_buffer.append(state_output)

                        # 内容处理
                        content = self._process_content(choice["delta"])
                        if content:  # 仅当 content 不为空时添加到缓冲区
                            response_buffer.append(content)

                        # 缓冲控制
                        if len(response_buffer) >= self.buffer_size or "\n" in content:
                            yield "".join(response_buffer)
                            response_buffer.clear()

                    # 清理剩余缓冲
                    if response_buffer:
                        yield "".join(response_buffer)

        except Exception as e:
            yield self._format_exception(e)

    async def _update_thinking_state(self, delta: dict) -> str:
        """更新思考状态机(异步版本)"""
        state_output = ""

        # 状态转换:未开始 -> 思考中
        if self.thinking == -1 and delta.get("reasoning_content"):
            self.thinking = 0
            await self.emit_status("🧐 思考中……")
            state_output = "<details>\n<summary>思考过程</summary>\n"

        # 状态转换:思考中 -> 已回答
        elif self.thinking == 0 and not delta.get("reasoning_content"):
            self.thinking = 1
            await self.emit_status("", done=True)
            state_output = "\n\n----------\n\n</details>\n\n"

        return state_output

    def _process_content(self, delta: dict) -> str:
        """处理内容片段"""
        reasoning = delta.get("reasoning_content")
        content = delta.get("content")

        # 如果 reasoning 和 content 都为空,返回空字符串(不输出)
        if not reasoning and not content:
            return ""

        # 优先返回 reasoning,其次返回 content
        return str(reasoning) if reasoning else str(content)

    def _format_error(self, status_code: int, error: bytes) -> str:
        """格式化错误消息"""
        try:
            err_msg = json.loads(error).get("message", "未知错误")
        except:
            err_msg = error.decode(errors="ignore")[:200]
        return json.dumps(
            {"error": f"HTTP {status_code}: {err_msg}"}, ensure_ascii=False
        )

    def _format_exception(self, e: Exception) -> str:
        """格式化异常信息"""
        err_type = type(e).__name__
        return json.dumps({"error": f"{err_type}: {str(e)}"}, ensure_ascii=False)