This commit is contained in:
Typer_Body
2026-06-07 01:08:39 +08:00
parent e5b3cced1f
commit 59f20bcc73
6 changed files with 667 additions and 0 deletions

View File

@@ -0,0 +1,4 @@
from .client import AsyncWeKnoraClient
from .errors import WeKnoraAPIError
__all__ = ['AsyncWeKnoraClient', 'WeKnoraAPIError']

View File

@@ -0,0 +1,172 @@
from __future__ import annotations
import httpx
import typing
import json
from .errors import WeKnoraAPIError
class AsyncWeKnoraClient:
"""WeKnora API 客户端"""
api_key: str
base_url: str
def __init__(
self,
api_key: str,
base_url: str = 'http://localhost:80/api/v1',
) -> None:
self.api_key = api_key
self.base_url = base_url
async def create_session(
self,
title: str = '',
description: str = '',
timeout: float = 30.0,
) -> str:
"""创建会话,返回 session_id"""
async with httpx.AsyncClient(
base_url=self.base_url,
trust_env=True,
timeout=timeout,
) as client:
payload: dict[str, typing.Any] = {}
if title:
payload['title'] = title
if description:
payload['description'] = description
response = await client.post(
'/sessions',
headers={
'X-API-Key': self.api_key,
'Content-Type': 'application/json',
},
json=payload,
)
if response.status_code not in (200, 201):
raise WeKnoraAPIError(f'{response.status_code} {response.text}')
data = response.json()
return data['data']['id']
async def agent_chat(
self,
session_id: str,
query: str,
user: str,
agent_id: str = '',
knowledge_base_ids: list[str] | None = None,
web_search_enabled: bool = False,
timeout: float = 120.0,
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
"""
Agent 智能对话SSE 流式)
响应事件类型:
- agent_query: Agent 开始处理
- thinking: 思考过程
- tool_call: 工具调用
- tool_result: 工具结果
- references: 知识库引用
- answer: 回答内容
- reflection: 反思
- session_title: 会话标题
- error: 错误
"""
if knowledge_base_ids is None:
knowledge_base_ids = []
async with httpx.AsyncClient(
base_url=self.base_url,
trust_env=True,
timeout=timeout,
) as client:
payload: dict[str, typing.Any] = {
'query': query,
'agent_enabled': True,
'channel': 'im',
}
if agent_id:
payload['agent_id'] = agent_id
if knowledge_base_ids:
payload['knowledge_base_ids'] = knowledge_base_ids
if web_search_enabled:
payload['web_search_enabled'] = True
async with client.stream(
'POST',
f'/agent-chat/{session_id}',
headers={
'X-API-Key': self.api_key,
'Content-Type': 'application/json',
},
json=payload,
) as r:
async for chunk in r.aiter_lines():
if r.status_code != 200:
raise WeKnoraAPIError(f'{r.status_code} {chunk}')
if chunk.strip() == '':
continue
if chunk.startswith('data:'):
try:
yield json.loads(chunk[5:].strip())
except json.JSONDecodeError:
continue
async def knowledge_chat(
self,
session_id: str,
query: str,
user: str,
agent_id: str = 'builtin-quick-answer',
knowledge_base_ids: list[str] | None = None,
timeout: float = 120.0,
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
"""
知识库 RAG 问答SSE 流式)
响应事件类型:
- references: 知识库引用
- answer: 回答内容
"""
if knowledge_base_ids is None:
knowledge_base_ids = []
async with httpx.AsyncClient(
base_url=self.base_url,
trust_env=True,
timeout=timeout,
) as client:
payload: dict[str, typing.Any] = {
'query': query,
'channel': 'im',
}
if agent_id:
payload['agent_id'] = agent_id
if knowledge_base_ids:
payload['knowledge_base_ids'] = knowledge_base_ids
async with client.stream(
'POST',
f'/knowledge-chat/{session_id}',
headers={
'X-API-Key': self.api_key,
'Content-Type': 'application/json',
},
json=payload,
) as r:
async for chunk in r.aiter_lines():
if r.status_code != 200:
raise WeKnoraAPIError(f'{r.status_code} {chunk}')
if chunk.strip() == '':
continue
if chunk.startswith('data:'):
try:
yield json.loads(chunk[5:].strip())
except json.JSONDecodeError:
continue

View File

@@ -0,0 +1,6 @@
class WeKnoraAPIError(Exception):
"""WeKnora API 请求失败"""
def __init__(self, message: str = ''):
self.message = message
super().__init__(self.message)

View File

@@ -0,0 +1,27 @@
from __future__ import annotations
from .. import migration
@migration.migration_class('weknora-api-config', 42)
class WeKnoraAPICfgMigration(migration.Migration):
"""WeKnora API 配置迁移"""
async def need_migrate(self) -> bool:
"""判断当前环境是否需要运行此迁移"""
return 'weknora-api' not in self.ap.provider_cfg.data
async def run(self):
"""执行迁移"""
self.ap.provider_cfg.data['weknora-api'] = {
'base-url': 'http://localhost:8080/api/v1',
'app-type': 'agent',
'api-key': '',
'agent-id': 'builtin-smart-reasoning',
'knowledge-base-ids': [],
'web-search-enabled': False,
'timeout': 120,
'base-prompt': '请回答用户的问题。',
}
await self.ap.provider_cfg.dump_config()

View File

@@ -0,0 +1,360 @@
from __future__ import annotations
import typing
import json
from langbot.pkg.provider import runner
from langbot.pkg.core import app
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
from langbot.libs.weknora_api.v1 import client, errors
@runner.runner_class('weknora-api')
class WeKnoraAPIRunner(runner.RequestRunner):
"""WeKnora API 对话请求器"""
weknora_client: client.AsyncWeKnoraClient
def __init__(self, ap: app.Application, pipeline_config: dict):
self.ap = ap
self.pipeline_config = pipeline_config
valid_app_types = ['chat', 'agent']
if self.pipeline_config['ai']['weknora-api']['app-type'] not in valid_app_types:
raise errors.WeKnoraAPIError(
f'不支持的 WeKnora 应用类型: {self.pipeline_config["ai"]["weknora-api"]["app-type"]}'
)
api_key = self.pipeline_config['ai']['weknora-api'].get('api-key', '').strip()
if not api_key:
raise errors.WeKnoraAPIError(
'WeKnora API Key 未配置,请在流水线的 WeKnora API 配置中填入 API Key '
'(从 WeKnora 前端 设置 → API Keys 生成)'
)
base_url = self.pipeline_config['ai']['weknora-api'].get('base-url', '').strip()
if not base_url:
raise errors.WeKnoraAPIError(
'WeKnora Base URL 未配置,请填入服务器地址,例如 http://localhost:8080/api/v1'
)
self.weknora_client = client.AsyncWeKnoraClient(
api_key=api_key,
base_url=base_url,
)
async def _extract_plain_text(self, query: pipeline_query.Query) -> str:
"""从用户消息中提取纯文本内容"""
plain_text = ''
if isinstance(query.user_message.content, str):
plain_text = query.user_message.content
elif isinstance(query.user_message.content, list):
for ce in query.user_message.content:
if ce.type == 'text':
plain_text += ce.text
if not plain_text:
plain_text = self.pipeline_config['ai']['weknora-api'].get('base-prompt', '')
return plain_text
async def _ensure_session(self, query: pipeline_query.Query) -> str:
"""确保会话存在,如果不存在则创建"""
session_id = query.session.using_conversation.uuid or ''
if not session_id:
user_tag = f'{query.session.launcher_type.value}_{query.session.launcher_id}'
session_id = await self.weknora_client.create_session(
title=f'IM Chat - {user_tag}'
)
query.session.using_conversation.uuid = session_id
return session_id
async def _agent_chat_messages(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""调用 Agent 智能对话(非流式聚合输出)"""
session_id = await self._ensure_session(query)
plain_text = await self._extract_plain_text(query)
user_tag = f'{query.session.launcher_type.value}_{query.session.launcher_id}'
config = self.pipeline_config['ai']['weknora-api']
agent_id = config.get('agent-id', 'builtin-smart-reasoning')
knowledge_base_ids = config.get('knowledge-base-ids', [])
web_search_enabled = config.get('web-search-enabled', False)
timeout = config.get('timeout', 120)
full_answer = ''
chunk = None
async for chunk in self.weknora_client.agent_chat(
session_id=session_id,
query=plain_text,
user=user_tag,
agent_id=agent_id,
knowledge_base_ids=knowledge_base_ids,
web_search_enabled=web_search_enabled,
timeout=timeout,
):
self.ap.logger.debug('weknora-agent-chunk: ' + str(chunk))
response_type = chunk.get('response_type', '')
content = chunk.get('content', '')
if response_type == 'tool_call':
# 工具调用
tool_data = chunk.get('data', {})
tool_name = tool_data.get('tool_name', '')
if tool_name:
yield provider_message.Message(
role='assistant',
tool_calls=[
provider_message.ToolCall(
id=chunk.get('id', ''),
type='function',
function=provider_message.FunctionCall(
name=tool_name,
arguments=json.dumps(tool_data.get('arguments', {})),
),
)
],
)
elif response_type == 'answer':
if content:
full_answer += content
elif response_type == 'error':
raise errors.WeKnoraAPIError(f'WeKnora 服务错误: {content}')
if chunk is None:
raise errors.WeKnoraAPIError('WeKnora API 没有返回任何响应请检查网络连接和API配置')
if full_answer:
yield provider_message.Message(
role='assistant',
content=full_answer,
)
async def _chat_messages(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.Message, None]:
"""调用知识库 RAG 问答(非流式聚合输出)"""
session_id = await self._ensure_session(query)
plain_text = await self._extract_plain_text(query)
user_tag = f'{query.session.launcher_type.value}_{query.session.launcher_id}'
config = self.pipeline_config['ai']['weknora-api']
agent_id = config.get('agent-id', 'builtin-quick-answer')
knowledge_base_ids = config.get('knowledge-base-ids', [])
timeout = config.get('timeout', 120)
full_answer = ''
chunk = None
async for chunk in self.weknora_client.knowledge_chat(
session_id=session_id,
query=plain_text,
user=user_tag,
agent_id=agent_id,
knowledge_base_ids=knowledge_base_ids,
timeout=timeout,
):
self.ap.logger.debug('weknora-chat-chunk: ' + str(chunk))
response_type = chunk.get('response_type', '')
content = chunk.get('content', '')
if response_type == 'answer':
if content:
full_answer += content
elif response_type == 'error':
raise errors.WeKnoraAPIError(f'WeKnora 服务错误: {content}')
if chunk is None:
raise errors.WeKnoraAPIError('WeKnora API 没有返回任何响应请检查网络连接和API配置')
if full_answer:
yield provider_message.Message(
role='assistant',
content=full_answer,
)
async def _agent_chat_messages_chunk(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
"""调用 Agent 智能对话(流式输出)"""
session_id = await self._ensure_session(query)
plain_text = await self._extract_plain_text(query)
user_tag = f'{query.session.launcher_type.value}_{query.session.launcher_id}'
config = self.pipeline_config['ai']['weknora-api']
agent_id = config.get('agent-id', 'builtin-smart-reasoning')
knowledge_base_ids = config.get('knowledge-base-ids', [])
web_search_enabled = config.get('web-search-enabled', False)
timeout = config.get('timeout', 120)
pending_answer = ''
message_idx = 0
is_final = False
chunk = None
async for chunk in self.weknora_client.agent_chat(
session_id=session_id,
query=plain_text,
user=user_tag,
agent_id=agent_id,
knowledge_base_ids=knowledge_base_ids,
web_search_enabled=web_search_enabled,
timeout=timeout,
):
self.ap.logger.debug('weknora-agent-chunk: ' + str(chunk))
response_type = chunk.get('response_type', '')
content = chunk.get('content', '')
done = chunk.get('done', False)
if response_type == 'tool_call':
tool_data = chunk.get('data', {})
tool_name = tool_data.get('tool_name', '')
if tool_name:
message_idx += 1
yield provider_message.MessageChunk(
role='assistant',
tool_calls=[
provider_message.ToolCall(
id=chunk.get('id', ''),
type='function',
function=provider_message.FunctionCall(
name=tool_name,
arguments=json.dumps(tool_data.get('arguments', {})),
),
)
],
)
elif response_type == 'answer':
message_idx += 1
if content:
pending_answer += content
if done:
is_final = True
# 每 8 个 chunk 输出一次,或最终输出
if message_idx % 8 == 0 or is_final:
yield provider_message.MessageChunk(
role='assistant',
content=pending_answer,
is_final=is_final,
)
elif response_type == 'error':
raise errors.WeKnoraAPIError(f'WeKnora 服务错误: {content}')
if chunk is None:
raise errors.WeKnoraAPIError('WeKnora API 没有返回任何响应请检查网络连接和API配置')
# 确保最终消息已发出
if not is_final and pending_answer:
yield provider_message.MessageChunk(
role='assistant',
content=pending_answer,
is_final=True,
)
async def _chat_messages_chunk(
self, query: pipeline_query.Query
) -> typing.AsyncGenerator[provider_message.MessageChunk, None]:
"""调用知识库 RAG 问答(流式输出)"""
session_id = await self._ensure_session(query)
plain_text = await self._extract_plain_text(query)
user_tag = f'{query.session.launcher_type.value}_{query.session.launcher_id}'
config = self.pipeline_config['ai']['weknora-api']
agent_id = config.get('agent-id', 'builtin-quick-answer')
knowledge_base_ids = config.get('knowledge-base-ids', [])
timeout = config.get('timeout', 120)
pending_answer = ''
message_idx = 0
is_final = False
chunk = None
async for chunk in self.weknora_client.knowledge_chat(
session_id=session_id,
query=plain_text,
user=user_tag,
agent_id=agent_id,
knowledge_base_ids=knowledge_base_ids,
timeout=timeout,
):
self.ap.logger.debug('weknora-chat-chunk: ' + str(chunk))
response_type = chunk.get('response_type', '')
content = chunk.get('content', '')
done = chunk.get('done', False)
if response_type == 'answer':
message_idx += 1
if content:
pending_answer += content
if done:
is_final = True
if message_idx % 8 == 0 or is_final:
yield provider_message.MessageChunk(
role='assistant',
content=pending_answer,
is_final=is_final,
)
elif response_type == 'error':
raise errors.WeKnoraAPIError(f'WeKnora 服务错误: {content}')
if chunk is None:
raise errors.WeKnoraAPIError('WeKnora API 没有返回任何响应请检查网络连接和API配置')
if not is_final and pending_answer:
yield provider_message.MessageChunk(
role='assistant',
content=pending_answer,
is_final=True,
)
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
"""运行请求"""
app_type = self.pipeline_config['ai']['weknora-api']['app-type']
if await query.adapter.is_stream_output_supported():
msg_idx = 0
if app_type == 'agent':
async for msg in self._agent_chat_messages_chunk(query):
msg_idx += 1
msg.msg_sequence = msg_idx
yield msg
elif app_type == 'chat':
async for msg in self._chat_messages_chunk(query):
msg_idx += 1
msg.msg_sequence = msg_idx
yield msg
else:
raise errors.WeKnoraAPIError(
f'不支持的 WeKnora 应用类型: {app_type}'
)
else:
if app_type == 'agent':
async for msg in self._agent_chat_messages(query):
yield msg
elif app_type == 'chat':
async for msg in self._chat_messages(query):
yield msg
else:
raise errors.WeKnoraAPIError(
f'不支持的 WeKnora 应用类型: {app_type}'
)

View File

@@ -47,6 +47,10 @@ stages:
label:
en_US: Langflow API
zh_Hans: Langflow API
- name: weknora-api
label:
en_US: WeKnora API
zh_Hans: WeKnora API
- name: expire-time
label:
en_US: Conversation expire time (seconds)
@@ -653,3 +657,97 @@ stages:
type: json
required: false
default: '{}'
- name: weknora-api
label:
en_US: WeKnora API
zh_Hans: WeKnora API
description:
en_US: Configure the WeKnora API of the pipeline
zh_Hans: 配置 WeKnora API
config:
- name: base-url
label:
en_US: Base URL
zh_Hans: 基础 URL
description:
en_US: The base URL of the WeKnora server (with /api/v1)
zh_Hans: WeKnora 服务器的基础 URL包含 /api/v1
type: string
required: true
default: 'http://localhost:8080/api/v1'
- name: api-key
label:
en_US: API Key
zh_Hans: API 密钥
description:
en_US: The API key for WeKnora, generated from WeKnora frontend Settings → API Keys
zh_Hans: WeKnora 的 API 密钥,从 WeKnora 前端 设置 → API Keys 生成
type: string
required: true
default: ''
- name: app-type
label:
en_US: App Type
zh_Hans: 应用类型
type: select
required: true
default: agent
options:
- name: agent
label:
en_US: Agent (Smart Reasoning)
zh_Hans: Agent智能推理
- name: chat
label:
en_US: Chat (Knowledge Base RAG)
zh_Hans: 聊天(知识库 RAG
- name: agent-id
label:
en_US: Agent ID
zh_Hans: 智能体 ID
description:
en_US: The Agent ID to use. Built-in agents include builtin-quick-answer, builtin-smart-reasoning, builtin-data-analyst
zh_Hans: 要使用的智能体 ID。内置智能体builtin-quick-answer、builtin-smart-reasoning、builtin-data-analyst
type: string
required: true
default: 'builtin-smart-reasoning'
- name: knowledge-base-ids
label:
en_US: Knowledge Base IDs
zh_Hans: 知识库 ID 列表
description:
en_US: List of WeKnora knowledge base IDs to use (one per line)
zh_Hans: 要使用的 WeKnora 知识库 ID 列表(每行一个)
type: array
required: false
default: []
- name: web-search-enabled
label:
en_US: Enable Web Search
zh_Hans: 启用网络搜索
description:
en_US: Whether to enable web search in agent mode
zh_Hans: 在 Agent 模式下是否启用网络搜索
type: boolean
required: false
default: false
- name: timeout
label:
en_US: Timeout
zh_Hans: 超时时间
description:
en_US: Request timeout in seconds
zh_Hans: 请求超时时间(秒)
type: integer
required: false
default: 120
- name: base-prompt
label:
en_US: Base Prompt
zh_Hans: 基础提示词
description:
en_US: Default prompt when user message is empty (e.g. only images)
zh_Hans: 当用户消息为空(例如仅图片)时使用的默认提示词
type: string
required: false
default: '请回答用户的问题。'