Files
LangBot/src/langbot/pkg/provider/runners/langflowapi.py
Copilot e642ffa5b3 chore: Add PyPI package support for uvx/pip installation (#1764)
* Initial plan

* Add package structure and resource path utilities

- Created langbot/ package with __init__.py and __main__.py entry point
- Added paths utility to find frontend and resource files from package installation
- Updated config loading to use resource paths
- Updated frontend serving to use resource paths
- Added MANIFEST.in for package data inclusion
- Updated pyproject.toml with build system and entry points

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* Add PyPI publishing workflow and update license

- Created GitHub Actions workflow to build frontend and publish to PyPI
- Added license field to pyproject.toml to fix deprecation warning
- Updated .gitignore to exclude build artifacts
- Tested package building successfully

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* Add PyPI installation documentation

- Created PYPI_INSTALLATION.md with detailed installation and usage instructions
- Updated README.md to feature uvx/pip installation as recommended method
- Updated README_EN.md with same changes for English documentation

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* Address code review feedback

- Made package-data configuration more specific to langbot package only
- Improved path detection with caching to avoid repeated file I/O
- Removed sys.path searching which was incorrect for package data
- Removed interactive input() call for non-interactive environment compatibility
- Simplified error messages for version check

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* Fix code review issues

- Use specific exception types instead of bare except
- Fix misleading comments about directory levels
- Remove redundant existence check before makedirs with exist_ok=True
- Use context manager for file opening to ensure proper cleanup

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* Simplify package configuration and document behavioral differences

- Removed redundant package-data configuration, relying on MANIFEST.in
- Added documentation about behavioral differences between package and source installation
- Clarified that include-package-data=true uses MANIFEST.in for data files

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* chore: update pyproject.toml

* chore: try pack templates in langbot/

* chore: update

* chore: update

* chore: update

* chore: update

* chore: update

* chore: adjust dir structure

* chore: fix imports

* fix: read default-pipeline-config.json

* fix: read default-pipeline-config.json

* fix: tests

* ci: publish pypi

* chore: bump version 4.6.0-beta.1 for testing

* chore: add templates/**

* fix: send adapters and requesters icons

* chore: bump version 4.6.0b2 for testing

* chore: add platform field for docker-compose.yaml

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>
Co-authored-by: Junyan Qin <rockchinq@gmail.com>
2025-11-16 19:53:01 +08:00

182 lines
7.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from __future__ import annotations
import typing
import json
import httpx
import uuid
import traceback
from .. import runner
from ...core import app
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
@runner.runner_class('langflow-api')
class LangflowAPIRunner(runner.RequestRunner):
    """Conversation request runner that forwards queries to a Langflow flow.

    All settings are read from ``pipeline_config['ai']['langflow-api']``:
    ``base-url``, ``api-key`` and ``flow-id`` are required; ``input_type``,
    ``output_type`` and ``tweaks`` are optional.
    """

    def __init__(self, ap: app.Application, pipeline_config: dict):
        self.ap = ap
        self.pipeline_config = pipeline_config

    @staticmethod
    def _extract_message_text(data: typing.Any) -> typing.Any:
        """Pull the reply text out of a decoded Langflow response.

        Two locations are tried, in order:

        1. ``outputs[0].outputs[0].outputs.message.message``
        2. ``messages[0].message``

        Args:
            data: A decoded JSON value (normally a dict).

        Returns:
            The value found at the first matching location (returned
            unchanged), or ``''`` when neither location yields a truthy value
            or the payload has an unexpected shape.
        """
        if not isinstance(data, dict):
            return ''

        # Primary location. Any missing key, empty list or wrongly typed level
        # simply means "not found" rather than an error.
        try:
            message_text = data['outputs'][0]['outputs'][0]['outputs']['message']['message']
        except (KeyError, IndexError, TypeError):
            message_text = ''
        if message_text:
            return message_text

        # Fallback location.
        messages = data.get('messages')
        if isinstance(messages, list) and messages and isinstance(messages[0], dict):
            return messages[0].get('message', '')
        return ''

    async def _build_request_payload(self, query: pipeline_query.Query) -> dict:
        """Build the JSON payload for the Langflow run endpoint.

        Args:
            query: The user query; the text parts of ``query.user_message``
                become ``input_value``.

        Returns:
            dict: Payload with ``output_type``, ``input_type``, ``input_value``
            and a freshly generated ``session_id``; ``tweaks`` is included
            only when a non-empty value is configured.

        Raises:
            json.JSONDecodeError: If ``tweaks`` is configured as a non-empty
                string that is not valid JSON.
        """
        config = self.pipeline_config['ai']['langflow-api']

        # Collect the plain-text portion of the user message.
        content = query.user_message.content
        user_message_text = ''
        if isinstance(content, str):
            user_message_text = content
        elif isinstance(content, list):
            user_message_text = ''.join(item.text for item in content if item.type == 'text')

        payload = {
            'output_type': config.get('output_type', 'chat'),
            'input_type': config.get('input_type', 'chat'),
            'input_value': user_message_text,
            'session_id': str(uuid.uuid4()),
        }

        # ``tweaks`` is optional. It is normally stored as a JSON string, but a
        # missing value, a blank string or an already-parsed object must not
        # make the request fail.
        raw_tweaks = config.get('tweaks')
        if isinstance(raw_tweaks, (str, bytes, bytearray)):
            raw_tweaks = raw_tweaks.strip()
            tweaks = json.loads(raw_tweaks) if raw_tweaks else None
        else:
            tweaks = raw_tweaks
        if tweaks:
            payload['tweaks'] = tweaks

        return payload

    async def run(
        self, query: pipeline_query.Query
    ) -> typing.AsyncGenerator[provider_message.Message | provider_message.MessageChunk, None]:
        """Send the query to Langflow and yield the reply.

        Args:
            query: The user query.

        Yields:
            When the adapter reports stream output support: one non-final
            ``MessageChunk`` per response line that carries a message, then a
            final ``MessageChunk`` with the last content seen. Otherwise a
            single ``Message``.

        Raises:
            httpx.HTTPStatusError: If Langflow answers with an error status.
        """
        # Adapters without ``is_stream_output_supported`` are treated as
        # non-streaming.
        try:
            is_stream = await query.adapter.is_stream_output_supported()
        except AttributeError:
            is_stream = False

        config = self.pipeline_config['ai']['langflow-api']
        base_url = config['base-url']
        api_key = config['api-key']
        flow_id = config['flow-id']

        url = f'{base_url.rstrip("/")}/api/v1/run/{flow_id}'
        payload = await self._build_request_payload(query)
        headers = {'Content-Type': 'application/json', 'x-api-key': api_key}

        async with httpx.AsyncClient() as client:
            if is_stream:
                async with client.stream('POST', url, json=payload, headers=headers, timeout=120.0) as response:
                    response.raise_for_status()

                    # Each message replaces (does not extend) the previous
                    # content, so this always holds the latest text seen.
                    accumulated_content = ''

                    async for line in response.aiter_lines():
                        # Strip the SSE-style "data: " prefix when present.
                        data_str = line[6:] if line.startswith('data: ') else line

                        # Blank separator lines carry no payload.
                        if not data_str.strip():
                            continue

                        try:
                            data = json.loads(data_str)
                        except json.JSONDecodeError:
                            # Not JSON: report it and move on to the next line.
                            traceback.print_exc()
                            continue

                        message_text = self._extract_message_text(data)
                        if message_text:
                            accumulated_content = message_text
                            yield provider_message.MessageChunk(
                                role='assistant', content=accumulated_content, is_final=False
                            )

                    # Close the stream with a final chunk.
                    yield provider_message.MessageChunk(role='assistant', content=accumulated_content, is_final=True)
            else:
                response = await client.post(url, json=payload, headers=headers, timeout=120.0)
                response.raise_for_status()
                response_data = response.json()

                message_text = self._extract_message_text(response_data)

                # Nothing recognisable found: fall back to the raw response so
                # the caller still sees what Langflow returned.
                if not message_text:
                    message_text = json.dumps(response_data, ensure_ascii=False, indent=2)

                yield provider_message.Message(role='assistant', content=message_text)