diff --git a/src/langbot/libs/weknora_api/v1/__init__.py b/src/langbot/libs/weknora_api/v1/__init__.py
new file mode 100644
index 00000000..23926101
--- /dev/null
+++ b/src/langbot/libs/weknora_api/v1/__init__.py
@@ -0,0 +1,4 @@
+from .client import AsyncWeKnoraClient
+from .errors import WeKnoraAPIError
+
+__all__ = ['AsyncWeKnoraClient', 'WeKnoraAPIError']
diff --git a/src/langbot/libs/weknora_api/v1/client.py b/src/langbot/libs/weknora_api/v1/client.py
new file mode 100644
index 00000000..ee7c5d9d
--- /dev/null
+++ b/src/langbot/libs/weknora_api/v1/client.py
@@ -0,0 +1,184 @@
+from __future__ import annotations
+
+import httpx
+import typing
+import json
+
+from .errors import WeKnoraAPIError
+
+
+class AsyncWeKnoraClient:
+    """Asynchronous client for the WeKnora HTTP API."""
+
+    api_key: str
+    base_url: str
+
+    def __init__(
+        self,
+        api_key: str,
+        base_url: str = 'http://localhost:80/api/v1',
+    ) -> None:
+        self.api_key = api_key
+        self.base_url = base_url
+
+    def _headers(self) -> dict[str, str]:
+        """Return the headers sent with every request."""
+        return {
+            'X-API-Key': self.api_key,
+            'Content-Type': 'application/json',
+        }
+
+    async def _stream_events(
+        self,
+        path: str,
+        payload: dict[str, typing.Any],
+        timeout: float,
+    ) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
+        """POST ``payload`` to ``path`` and yield the decoded SSE ``data:`` events.
+
+        Lines that do not start with ``data:``, and payloads that are not a
+        JSON object, are skipped.
+
+        Raises:
+            WeKnoraAPIError: If the response status is not 200.
+        """
+        async with httpx.AsyncClient(
+            base_url=self.base_url,
+            trust_env=True,
+            timeout=timeout,
+        ) as client:
+            async with client.stream(
+                'POST',
+                path,
+                headers=self._headers(),
+                json=payload,
+            ) as r:
+                # Check the status before reading lines: an error response with
+                # an empty body yields no lines, so a check placed inside the
+                # loop would never run and the failure would go unnoticed.
+                if r.status_code != 200:
+                    await r.aread()
+                    raise WeKnoraAPIError(f'{r.status_code} {r.text}')
+
+                async for line in r.aiter_lines():
+                    if not line.startswith('data:'):
+                        continue
+                    try:
+                        event = json.loads(line[5:].strip())
+                    except json.JSONDecodeError:
+                        continue
+                    # Callers read events with ``.get``; drop other JSON values.
+                    if isinstance(event, dict):
+                        yield event
+
+    async def create_session(
+        self,
+        title: str = '',
+        description: str = '',
+        timeout: float = 30.0,
+    ) -> str:
+        """Create a session and return its id.
+
+        Raises:
+            WeKnoraAPIError: If the status is not 200/201, or the response
+                body does not contain ``data.id``.
+        """
+        async with httpx.AsyncClient(
+            base_url=self.base_url,
+            trust_env=True,
+            timeout=timeout,
+        ) as client:
+            payload: dict[str, typing.Any] = {}
+            if title:
+                payload['title'] = title
+            if description:
+                payload['description'] = description
+
+            response = await client.post(
+                '/sessions',
+                headers=self._headers(),
+                json=payload,
+            )
+
+            if response.status_code not in (200, 201):
+                raise WeKnoraAPIError(f'{response.status_code} {response.text}')
+
+            try:
+                return response.json()['data']['id']
+            except (ValueError, KeyError, TypeError) as e:
+                raise WeKnoraAPIError(f'unexpected response: {response.text}') from e
+
+    async def agent_chat(
+        self,
+        session_id: str,
+        query: str,
+        user: str,
+        agent_id: str = '',
+        knowledge_base_ids: list[str] | None = None,
+        web_search_enabled: bool = False,
+        timeout: float = 120.0,
+    ) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
+        """Run an agent chat turn (SSE streaming) and yield each event.
+
+        ``user`` is accepted but is not included in the request payload.
+
+        Response event types:
+        - agent_query: the agent starts processing
+        - thinking: thinking process
+        - tool_call: tool invocation
+        - tool_result: tool result
+        - references: knowledge base references
+        - answer: answer content
+        - reflection: reflection
+        - session_title: session title
+        - error: error
+
+        Raises:
+            WeKnoraAPIError: If the response status is not 200.
+        """
+        payload: dict[str, typing.Any] = {
+            'query': query,
+            'agent_enabled': True,
+            'channel': 'im',
+        }
+        if agent_id:
+            payload['agent_id'] = agent_id
+        if knowledge_base_ids:
+            payload['knowledge_base_ids'] = knowledge_base_ids
+        if web_search_enabled:
+            payload['web_search_enabled'] = True
+
+        async for event in self._stream_events(f'/agent-chat/{session_id}', payload, timeout):
+            yield event
+
+    async def knowledge_chat(
+        self,
+        session_id: str,
+        query: str,
+        user: str,
+        agent_id: str = 'builtin-quick-answer',
+        knowledge_base_ids: list[str] | None = None,
+        timeout: float = 120.0,
+    ) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
+        """Run a knowledge-base RAG chat turn (SSE streaming) and yield each event.
+
+        ``user`` is accepted but is not included in the request payload.
+
+        Response event types:
+        - references: knowledge base references
+        - answer: answer content
+
+        Raises:
+            WeKnoraAPIError: If the response status is not 200.
+        """
+        payload: dict[str, typing.Any] = {
+            'query': query,
+            'channel': 'im',
+        }
+        if agent_id:
+            payload['agent_id'] = agent_id
+        if knowledge_base_ids:
+            payload['knowledge_base_ids'] = knowledge_base_ids
+
+        async for event in self._stream_events(f'/knowledge-chat/{session_id}', payload, timeout):
+            yield event
diff --git a/src/langbot/libs/weknora_api/v1/errors.py b/src/langbot/libs/weknora_api/v1/errors.py
new file mode 100644
index 00000000..c43b724c
--- /dev/null
+++ b/src/langbot/libs/weknora_api/v1/errors.py
@@ -0,0 +1,6 @@
+class WeKnoraAPIError(Exception):
+    """Raised when a WeKnora API request fails."""
+
+    def __init__(self, message: str = ''):
+        self.message = message
+        super().__init__(self.message)
diff --git a/src/langbot/pkg/core/migrations/m042_weknora_api.py b/src/langbot/pkg/core/migrations/m042_weknora_api.py
new file mode 100644
index 00000000..c3a5bf78
--- /dev/null
+++ b/src/langbot/pkg/core/migrations/m042_weknora_api.py
@@ -0,0 +1,27 @@
+from __future__ import annotations
+
+from .. import migration
+
+
+@migration.migration_class('weknora-api-config', 42)
+class WeKnoraAPICfgMigration(migration.Migration):
+    """Migration that adds the default ``weknora-api`` provider config."""
+
+    async def need_migrate(self) -> bool:
+        """Return True when the provider config has no ``weknora-api`` entry."""
+        return 'weknora-api' not in self.ap.provider_cfg.data
+
+    async def run(self):
+        """Write the default ``weknora-api`` entry and dump the provider config."""
+        self.ap.provider_cfg.data['weknora-api'] = {
+            'base-url': 'http://localhost:8080/api/v1',
+            'app-type': 'agent',
+            'api-key': '',
+            'agent-id': 'builtin-smart-reasoning',
+            'knowledge-base-ids': [],
+            'web-search-enabled': False,
+            'timeout': 120,
+            'base-prompt': '请回答用户的问题。',
+        }
+
+        await self.ap.provider_cfg.dump_config()
diff --git a/src/langbot/pkg/provider/runners/weknoraapi.py b/src/langbot/pkg/provider/runners/weknoraapi.py
new file mode 100644
index 00000000..d5469986
--- /dev/null
+++ b/src/langbot/pkg/provider/runners/weknoraapi.py
@@ -0,0 +1,360 @@
+from __future__ import annotations
+
+import typing
+import json
+
+
+from langbot.pkg.provider import runner
+from langbot.pkg.core import app
+import langbot_plugin.api.entities.builtin.provider.message as provider_message
+import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
+from langbot.libs.weknora_api.v1 import client, errors
+
+
+@runner.runner_class('weknora-api')
+class WeKnoraAPIRunner(runner.RequestRunner):
+    """Request runner that answers pipeline queries through the WeKnora API."""
+
+    weknora_client: client.AsyncWeKnoraClient
+
+    def __init__(self, ap: app.Application, pipeline_config: dict):
+        self.ap = ap
+        self.pipeline_config = pipeline_config
+
+        valid_app_types = ['chat', 'agent']
+        if self.pipeline_config['ai']['weknora-api']['app-type'] not in valid_app_types:
+            raise errors.WeKnoraAPIError(
+                f'不支持的 WeKnora 应用类型: {self.pipeline_config["ai"]["weknora-api"]["app-type"]}'
+            )
+
+        api_key = self.pipeline_config['ai']['weknora-api'].get('api-key', '').strip()
+        if not api_key:
+            raise errors.WeKnoraAPIError(
+                'WeKnora API Key 未配置,请在流水线的 WeKnora API 配置中填入 API Key '
+                '(从 WeKnora 前端 设置 → API Keys 生成)'
+            )
+
+        base_url = self.pipeline_config['ai']['weknora-api'].get('base-url', '').strip()
+        if not base_url:
+            raise errors.WeKnoraAPIError(
+                'WeKnora Base URL 未配置,请填入服务器地址,例如 http://localhost:8080/api/v1'
+            )
+
+        self.weknora_client = client.AsyncWeKnoraClient(
+            api_key=api_key,
+            base_url=base_url,
+        )
+
+    async def _extract_plain_text(self, query: pipeline_query.Query) -> str:
+        """Return the text of the user message, or the configured base-prompt when it is empty."""
+        plain_text = ''
+        if isinstance(query.user_message.content, str):
+            plain_text = query.user_message.content
+        elif isinstance(query.user_message.content, list):
+            for ce in query.user_message.content:
+                if ce.type == 'text':
+                    plain_text += ce.text
+
+        if not plain_text:
+            plain_text = self.pipeline_config['ai']['weknora-api'].get('base-prompt', '')
+
+        return plain_text
+
+    async def _ensure_session(self, query: pipeline_query.Query) -> str:
+        """Return the conversation's session id, creating a WeKnora session when none is stored."""
+        session_id = query.session.using_conversation.uuid or ''
+
+        if not session_id:
+            user_tag = f'{query.session.launcher_type.value}_{query.session.launcher_id}'
+            session_id = await self.weknora_client.create_session(
+                title=f'IM Chat - {user_tag}'
+            )
+            query.session.using_conversation.uuid = session_id
+
+        return session_id
+
+    async def _agent_chat_messages(
+        self, query: pipeline_query.Query
+    ) -> typing.AsyncGenerator[provider_message.Message, None]:
+        """Call agent chat and aggregate the answer into one message (non-streaming output)."""
+        session_id = await self._ensure_session(query)
+        plain_text = await self._extract_plain_text(query)
+        user_tag = f'{query.session.launcher_type.value}_{query.session.launcher_id}'
+
+        config = self.pipeline_config['ai']['weknora-api']
+        agent_id = config.get('agent-id', 'builtin-smart-reasoning')
+        knowledge_base_ids = config.get('knowledge-base-ids', [])
+        web_search_enabled = config.get('web-search-enabled', False)
+        timeout = config.get('timeout', 120)
+
+        full_answer = ''
+        chunk = None
+
+        async for chunk in self.weknora_client.agent_chat(
+            session_id=session_id,
+            query=plain_text,
+            user=user_tag,
+            agent_id=agent_id,
+            knowledge_base_ids=knowledge_base_ids,
+            web_search_enabled=web_search_enabled,
+            timeout=timeout,
+        ):
+            self.ap.logger.debug('weknora-agent-chunk: ' + str(chunk))
+
+            response_type = chunk.get('response_type', '')
+            content = chunk.get('content', '')
+
+            if response_type == 'tool_call':
+                # Tool invocation
+                tool_data = chunk.get('data', {})
+                tool_name = tool_data.get('tool_name', '')
+                if tool_name:
+                    yield provider_message.Message(
+                        role='assistant',
+                        tool_calls=[
+                            provider_message.ToolCall(
+                                id=chunk.get('id', ''),
+                                type='function',
+                                function=provider_message.FunctionCall(
+                                    name=tool_name,
+                                    arguments=json.dumps(tool_data.get('arguments', {})),
+                                ),
+                            )
+                        ],
+                    )
+
+            elif response_type == 'answer':
+                if content:
+                    full_answer += content
+
+            elif response_type == 'error':
+                raise errors.WeKnoraAPIError(f'WeKnora 服务错误: {content}')
+
+        if chunk is None:
+            raise errors.WeKnoraAPIError('WeKnora API 没有返回任何响应,请检查网络连接和API配置')
+
+        if full_answer:
+            yield provider_message.Message(
+                role='assistant',
+                content=full_answer,
+            )
+
+    async def _chat_messages(
+        self, query: pipeline_query.Query
+    ) -> typing.AsyncGenerator[provider_message.Message, None]:
+        """Call knowledge-base RAG chat and aggregate the answer into one message (non-streaming output)."""
+        session_id = await self._ensure_session(query)
+        plain_text = await self._extract_plain_text(query)
+        user_tag = f'{query.session.launcher_type.value}_{query.session.launcher_id}'
+
+        config = self.pipeline_config['ai']['weknora-api']
+        agent_id = config.get('agent-id', 'builtin-quick-answer')
+        knowledge_base_ids = config.get('knowledge-base-ids', [])
+        timeout = config.get('timeout', 120)
+
+        full_answer = ''
+        chunk = None
+
+        async for chunk in self.weknora_client.knowledge_chat(
+            session_id=session_id,
+            query=plain_text,
+            user=user_tag,
+            agent_id=agent_id,
+            knowledge_base_ids=knowledge_base_ids,
+            timeout=timeout,
+        ):
+            self.ap.logger.debug('weknora-chat-chunk: ' + str(chunk))
+
+            response_type = chunk.get('response_type', '')
+            content = chunk.get('content', '')
+
+            if response_type == 'answer':
+                if content:
+                    full_answer += content
+
+            elif response_type == 'error':
+                raise errors.WeKnoraAPIError(f'WeKnora 服务错误: {content}')
+
+        if chunk is None:
+            raise errors.WeKnoraAPIError('WeKnora API 没有返回任何响应,请检查网络连接和API配置')
+
+        if full_answer:
+            yield provider_message.Message(
+                role='assistant',
+                content=full_answer,
+            )
+
+    async def _agent_chat_messages_chunk(
+        self, query: pipeline_query.Query
+    ) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
+        """Call agent chat and yield message chunks as the answer grows (streaming output)."""
+        session_id = await self._ensure_session(query)
+        plain_text = await self._extract_plain_text(query)
+        user_tag = f'{query.session.launcher_type.value}_{query.session.launcher_id}'
+
+        config = self.pipeline_config['ai']['weknora-api']
+        agent_id = config.get('agent-id', 'builtin-smart-reasoning')
+        knowledge_base_ids = config.get('knowledge-base-ids', [])
+        web_search_enabled = config.get('web-search-enabled', False)
+        timeout = config.get('timeout', 120)
+
+        pending_answer = ''
+        message_idx = 0
+        is_final = False
+        chunk = None
+
+        async for chunk in self.weknora_client.agent_chat(
+            session_id=session_id,
+            query=plain_text,
+            user=user_tag,
+            agent_id=agent_id,
+            knowledge_base_ids=knowledge_base_ids,
+            web_search_enabled=web_search_enabled,
+            timeout=timeout,
+        ):
+            self.ap.logger.debug('weknora-agent-chunk: ' + str(chunk))
+
+            response_type = chunk.get('response_type', '')
+            content = chunk.get('content', '')
+            done = chunk.get('done', False)
+
+            if response_type == 'tool_call':
+                tool_data = chunk.get('data', {})
+                tool_name = tool_data.get('tool_name', '')
+                if tool_name:
+                    message_idx += 1
+                    yield provider_message.MessageChunk(
+                        role='assistant',
+                        tool_calls=[
+                            provider_message.ToolCall(
+                                id=chunk.get('id', ''),
+                                type='function',
+                                function=provider_message.FunctionCall(
+                                    name=tool_name,
+                                    arguments=json.dumps(tool_data.get('arguments', {})),
+                                ),
+                            )
+                        ],
+                    )
+
+            elif response_type == 'answer':
+                message_idx += 1
+                if content:
+                    pending_answer += content
+
+                if done:
+                    is_final = True
+
+                # Emit once every 8 chunks, or on the final chunk
+                if message_idx % 8 == 0 or is_final:
+                    yield provider_message.MessageChunk(
+                        role='assistant',
+                        content=pending_answer,
+                        is_final=is_final,
+                    )
+
+            elif response_type == 'error':
+                raise errors.WeKnoraAPIError(f'WeKnora 服务错误: {content}')
+
+        if chunk is None:
+            raise errors.WeKnoraAPIError('WeKnora API 没有返回任何响应,请检查网络连接和API配置')
+
+        # Make sure a final message is emitted when the stream never flagged `done`
+        if not is_final and pending_answer:
+            yield provider_message.MessageChunk(
+                role='assistant',
+                content=pending_answer,
+                is_final=True,
+            )
+
+    async def _chat_messages_chunk(
+        self, query: pipeline_query.Query
+    ) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
+        """Call knowledge-base RAG chat and yield message chunks as the answer grows (streaming output)."""
+        session_id = await self._ensure_session(query)
+        plain_text = await self._extract_plain_text(query)
+        user_tag = f'{query.session.launcher_type.value}_{query.session.launcher_id}'
+
+        config = self.pipeline_config['ai']['weknora-api']
+        agent_id = config.get('agent-id', 'builtin-quick-answer')
+        knowledge_base_ids = config.get('knowledge-base-ids', [])
+        timeout = config.get('timeout', 120)
+
+        pending_answer = ''
+        message_idx = 0
+        is_final = False
+        chunk = None
+
+        async for chunk in self.weknora_client.knowledge_chat(
+            session_id=session_id,
+            query=plain_text,
+            user=user_tag,
+            agent_id=agent_id,
+            knowledge_base_ids=knowledge_base_ids,
+            timeout=timeout,
+        ):
+            self.ap.logger.debug('weknora-chat-chunk: ' + str(chunk))
+
+            response_type = chunk.get('response_type', '')
+            content = chunk.get('content', '')
+            done = chunk.get('done', False)
+
+            if response_type == 'answer':
+                message_idx += 1
+                if content:
+                    pending_answer += content
+
+                if done:
+                    is_final = True
+
+                if message_idx % 8 == 0 or is_final:
+                    yield provider_message.MessageChunk(
+                        role='assistant',
+                        content=pending_answer,
+                        is_final=is_final,
+                    )
+
+            elif response_type == 'error':
+                raise errors.WeKnoraAPIError(f'WeKnora 服务错误: {content}')
+
+        if chunk is None:
+            raise errors.WeKnoraAPIError('WeKnora API 没有返回任何响应,请检查网络连接和API配置')
+
+        if not is_final and pending_answer:
+            yield provider_message.MessageChunk(
+                role='assistant',
+                content=pending_answer,
+                is_final=True,
+            )
+
+    async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
+        """Run the request, streaming chunks when the adapter supports stream output."""
+        app_type = self.pipeline_config['ai']['weknora-api']['app-type']
+
+        if await query.adapter.is_stream_output_supported():
+            msg_idx = 0
+            if app_type == 'agent':
+                async for msg in self._agent_chat_messages_chunk(query):
+                    msg_idx += 1
+                    msg.msg_sequence = msg_idx
+                    yield msg
+            elif app_type == 'chat':
+                async for msg in self._chat_messages_chunk(query):
+                    msg_idx += 1
+                    msg.msg_sequence = msg_idx
+                    yield msg
+            else:
+                raise errors.WeKnoraAPIError(
+                    f'不支持的 WeKnora 应用类型: {app_type}'
+                )
+        else:
+            if app_type == 'agent':
+                async for msg in self._agent_chat_messages(query):
+                    yield msg
+            elif app_type == 'chat':
+                async for msg in self._chat_messages(query):
+                    yield msg
+            else:
+                raise errors.WeKnoraAPIError(
+                    f'不支持的 WeKnora 应用类型: {app_type}'
+                )
diff --git a/src/langbot/templates/metadata/pipeline/ai.yaml b/src/langbot/templates/metadata/pipeline/ai.yaml
index 32f4115f..1facc60f 100644
--- a/src/langbot/templates/metadata/pipeline/ai.yaml
+++ b/src/langbot/templates/metadata/pipeline/ai.yaml
@@ -47,6 +47,10 @@ stages:
             label:
               en_US: Langflow API
               zh_Hans: Langflow API
+          - name: weknora-api
+            label:
+              en_US: WeKnora API
+              zh_Hans: WeKnora API
       - name: expire-time
         label:
           en_US: Conversation expire time (seconds)
@@ -653,3 +657,97 @@ stages:
         type: json
         required: false
         default: '{}'
+  - name: weknora-api
+    label:
+      en_US: WeKnora API
+      zh_Hans: WeKnora API
+    description:
+      en_US: Configure the WeKnora API of the pipeline
+      zh_Hans: 配置 WeKnora API
+    config:
+      - name: base-url
+        label:
+          en_US: Base URL
+          zh_Hans: 基础 URL
+        description:
+          en_US: The base URL of the WeKnora server (with /api/v1)
+          zh_Hans: WeKnora 服务器的基础 URL(包含 /api/v1)
+        type: string
+        required: true
+        default: 'http://localhost:8080/api/v1'
+      - name: api-key
+        label:
+          en_US: API Key
+          zh_Hans: API 密钥
+        description:
+          en_US: The API key for WeKnora, generated from WeKnora frontend Settings → API Keys
+          zh_Hans: WeKnora 的 API 密钥,从 WeKnora 前端 设置 → API Keys 生成
+        type: string
+        required: true
+        default: ''
+      - name: app-type
+        label:
+          en_US: App Type
+          zh_Hans: 应用类型
+        type: select
+        required: true
+        default: agent
+        options:
+          - name: agent
+            label:
+              en_US: Agent (Smart Reasoning)
+              zh_Hans: Agent(智能推理)
+          - name: chat
+            label:
+              en_US: Chat (Knowledge Base RAG)
+              zh_Hans: 聊天(知识库 RAG)
+      - name: agent-id
+        label:
+          en_US: Agent ID
+          zh_Hans: 智能体 ID
+        description:
+          en_US: The Agent ID to use. Built-in agents include builtin-quick-answer, builtin-smart-reasoning, builtin-data-analyst
+          zh_Hans: 要使用的智能体 ID。内置智能体:builtin-quick-answer、builtin-smart-reasoning、builtin-data-analyst
+        type: string
+        required: true
+        default: 'builtin-smart-reasoning'
+      - name: knowledge-base-ids
+        label:
+          en_US: Knowledge Base IDs
+          zh_Hans: 知识库 ID 列表
+        description:
+          en_US: List of WeKnora knowledge base IDs to use (one per line)
+          zh_Hans: 要使用的 WeKnora 知识库 ID 列表(每行一个)
+        type: array
+        required: false
+        default: []
+      - name: web-search-enabled
+        label:
+          en_US: Enable Web Search
+          zh_Hans: 启用网络搜索
+        description:
+          en_US: Whether to enable web search in agent mode
+          zh_Hans: 在 Agent 模式下是否启用网络搜索
+        type: boolean
+        required: false
+        default: false
+      - name: timeout
+        label:
+          en_US: Timeout
+          zh_Hans: 超时时间
+        description:
+          en_US: Request timeout in seconds
+          zh_Hans: 请求超时时间(秒)
+        type: integer
+        required: false
+        default: 120
+      - name: base-prompt
+        label:
+          en_US: Base Prompt
+          zh_Hans: 基础提示词
+        description:
+          en_US: Default prompt when user message is empty (e.g. only images)
+          zh_Hans: 当用户消息为空(例如仅图片)时使用的默认提示词
+        type: string
+        required: false
+        default: '请回答用户的问题。'