Merge branch 'master' into rc/new-plugin

This commit is contained in:
Junyan Qin (Chin)
2025-09-12 23:02:51 +08:00
committed by GitHub
15 changed files with 693 additions and 196 deletions

View File

@@ -212,7 +212,7 @@ class DBMigrateV3Config(migration.DBMigration):
self.ap.instance_config.data['api']['port'] = self.ap.system_cfg.data['http-api']['port']
self.ap.instance_config.data['command'] = {
'prefix': self.ap.command_cfg.data['command-prefix'],
'enable': self.ap.command_cfg.data['command-enable'],
'enable': self.ap.command_cfg.data['command-enable'] if 'command-enable' in self.ap.command_cfg.data else True,
'privilege': self.ap.command_cfg.data['privilege'],
}
self.ap.instance_config.data['concurrency']['pipeline'] = self.ap.system_cfg.data['pipeline-concurrency']

View File

@@ -0,0 +1,45 @@
from .. import migration
import sqlalchemy
from ...entity.persistence import pipeline as persistence_pipeline
@migration.migration_class(6)
class DBMigrateLangflowApiConfig(migration.DBMigration):
"""Langflow API config"""
async def upgrade(self):
"""Upgrade"""
# read all pipelines
pipelines = await self.ap.persistence_mgr.execute_async(sqlalchemy.select(persistence_pipeline.LegacyPipeline))
for pipeline in pipelines:
serialized_pipeline = self.ap.persistence_mgr.serialize_model(persistence_pipeline.LegacyPipeline, pipeline)
config = serialized_pipeline['config']
if 'langflow-api' not in config['ai']:
config['ai']['langflow-api'] = {
'base-url': 'http://localhost:7860',
'api-key': 'your-api-key',
'flow-id': 'your-flow-id',
'input-type': 'chat',
'output-type': 'chat',
'tweaks': '{}',
}
await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(persistence_pipeline.LegacyPipeline)
.where(persistence_pipeline.LegacyPipeline.uuid == serialized_pipeline['uuid'])
.values(
{
'config': config,
'for_version': self.ap.ver_mgr.get_current_version(),
}
)
)
async def downgrade(self):
"""Downgrade"""
pass

View File

@@ -36,6 +36,6 @@ class RequestRunner(abc.ABC):
self.pipeline_config = pipeline_config
@abc.abstractmethod
async def run(self, query: pipeline_query.Query) -> typing.AsyncGenerator[provider_message.Message, None]:
async def run(self, query: core_entities.Query) -> typing.AsyncGenerator[llm_entities.Message | llm_entities.MessageChunk, None]:
"""运行请求"""
pass
pass

View File

@@ -0,0 +1,180 @@
from __future__ import annotations
import typing
import json
import httpx
import uuid
import traceback
from .. import runner
from ...core import app, entities as core_entities
from .. import entities as llm_entities
@runner.runner_class('langflow-api')
class LangflowAPIRunner(runner.RequestRunner):
"""Langflow API 对话请求器"""
def __init__(self, ap: app.Application, pipeline_config: dict):
self.ap = ap
self.pipeline_config = pipeline_config
async def _build_request_payload(self, query: core_entities.Query) -> dict:
"""构建请求负载
Args:
query: 用户查询对象
Returns:
dict: 请求负载
"""
# 获取用户消息文本
user_message_text = ''
if isinstance(query.user_message.content, str):
user_message_text = query.user_message.content
elif isinstance(query.user_message.content, list):
for item in query.user_message.content:
if item.type == 'text':
user_message_text += item.text
# 从配置中获取 input_type 和 output_type如果未配置则使用默认值
input_type = self.pipeline_config['ai']['langflow-api'].get('input_type', 'chat')
output_type = self.pipeline_config['ai']['langflow-api'].get('output_type', 'chat')
# 构建基本负载
payload = {
'output_type': output_type,
'input_type': input_type,
'input_value': user_message_text,
'session_id': str(uuid.uuid4()),
}
# 如果配置中有tweaks则添加到负载中
tweaks = json.loads(self.pipeline_config['ai']['langflow-api'].get('tweaks'))
if tweaks:
payload['tweaks'] = tweaks
return payload
async def run(
self, query: core_entities.Query
) -> typing.AsyncGenerator[llm_entities.Message | llm_entities.MessageChunk, None]:
"""运行请求
Args:
query: 用户查询对象
Yields:
Message: 回复消息
"""
# 检查是否支持流式输出
is_stream = False
try:
is_stream = await query.adapter.is_stream_output_supported()
except AttributeError:
is_stream = False
# 从配置中获取API参数
base_url = self.pipeline_config['ai']['langflow-api']['base-url']
api_key = self.pipeline_config['ai']['langflow-api']['api-key']
flow_id = self.pipeline_config['ai']['langflow-api']['flow-id']
# 构建API URL
url = f'{base_url.rstrip("/")}/api/v1/run/{flow_id}'
# 构建请求负载
payload = await self._build_request_payload(query)
# 设置请求头
headers = {'Content-Type': 'application/json', 'x-api-key': api_key}
# 发送请求
async with httpx.AsyncClient() as client:
if is_stream:
# 流式请求
async with client.stream('POST', url, json=payload, headers=headers, timeout=120.0) as response:
print(response)
response.raise_for_status()
accumulated_content = ''
message_count = 0
async for line in response.aiter_lines():
data_str = line
if data_str.startswith('data: '):
data_str = data_str[6:] # 移除 "data: " 前缀
try:
data = json.loads(data_str)
# 提取消息内容
message_text = ''
if 'outputs' in data and len(data['outputs']) > 0:
output = data['outputs'][0]
if 'outputs' in output and len(output['outputs']) > 0:
inner_output = output['outputs'][0]
if 'outputs' in inner_output and 'message' in inner_output['outputs']:
message_data = inner_output['outputs']['message']
if 'message' in message_data:
message_text = message_data['message']
# 如果没有找到消息,尝试其他可能的路径
if not message_text and 'messages' in data:
messages = data['messages']
if messages and len(messages) > 0:
message_text = messages[0].get('message', '')
if message_text:
# 更新累积内容
accumulated_content = message_text
message_count += 1
# 每8条消息或有新内容时生成一个chunk
if message_count % 8 == 0 or len(message_text) > 0:
yield llm_entities.MessageChunk(
role='assistant', content=accumulated_content, is_final=False
)
except json.JSONDecodeError:
# 如果不是JSON跳过这一行
traceback.print_exc()
continue
# 发送最终消息
yield llm_entities.MessageChunk(role='assistant', content=accumulated_content, is_final=True)
else:
# 非流式请求
response = await client.post(url, json=payload, headers=headers, timeout=120.0)
response.raise_for_status()
# 解析响应
response_data = response.json()
# 提取消息内容
# 根据Langflow API文档响应结构可能在outputs[0].outputs[0].outputs.message.message中
message_text = ''
if 'outputs' in response_data and len(response_data['outputs']) > 0:
output = response_data['outputs'][0]
if 'outputs' in output and len(output['outputs']) > 0:
inner_output = output['outputs'][0]
if 'outputs' in inner_output and 'message' in inner_output['outputs']:
message_data = inner_output['outputs']['message']
if 'message' in message_data:
message_text = message_data['message']
# 如果没有找到消息,尝试其他可能的路径
if not message_text and 'messages' in response_data:
messages = response_data['messages']
if messages and len(messages) > 0:
message_text = messages[0].get('message', '')
# 如果仍然没有找到消息,返回完整响应的字符串表示
if not message_text:
message_text = json.dumps(response_data, ensure_ascii=False, indent=2)
# 生成回复消息
if is_stream:
yield llm_entities.MessageChunk(role='assistant', content=message_text, is_final=True)
else:
reply_message = llm_entities.Message(role='assistant', content=message_text)
yield reply_message

View File

@@ -24,23 +24,23 @@ class Retriever(base_service.BaseService):
extra_args={}, # TODO: add extra args
)
chroma_results = await self.ap.vector_db_mgr.vector_db.search(kb_id, query_embedding[0], k)
vector_results = await self.ap.vector_db_mgr.vector_db.search(kb_id, query_embedding[0], k)
# 'ids' is always returned by ChromaDB, even if not explicitly in 'include'
matched_chroma_ids = chroma_results.get('ids', [[]])[0]
distances = chroma_results.get('distances', [[]])[0]
chroma_metadatas = chroma_results.get('metadatas', [[]])[0]
# 'ids' shape mirrors the Chroma-style response contract for compatibility
matched_vector_ids = vector_results.get('ids', [[]])[0]
distances = vector_results.get('distances', [[]])[0]
vector_metadatas = vector_results.get('metadatas', [[]])[0]
if not matched_chroma_ids:
self.ap.logger.info('No relevant chunks found in Chroma.')
if not matched_vector_ids:
self.ap.logger.info('No relevant chunks found in vector database.')
return []
result: list[retriever_entities.RetrieveResultEntry] = []
for i, id in enumerate(matched_chroma_ids):
for i, id in enumerate(matched_vector_ids):
entry = retriever_entities.RetrieveResultEntry(
id=id,
metadata=chroma_metadatas[i],
metadata=vector_metadatas[i],
distance=distances[i],
)
result.append(entry)

View File

@@ -1,7 +1,7 @@
semantic_version = 'v4.3.0.beta3'
required_database_version = 6
"""标记本版本所需要的数据库结构版本,用于判断数据库迁移"""
"""Tag the version of the database schema, used to check if the database needs to be migrated"""
debug_mode = False

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
from ..core import app
from .vdb import VectorDatabase
from .vdbs.chroma import ChromaVectorDatabase
from .vdbs.qdrant import QdrantVectorDatabase
class VectorDBManager:
@@ -13,6 +14,17 @@ class VectorDBManager:
self.ap = ap
async def initialize(self):
# 初始化 Chroma 向量数据库(可扩展为多种实现)
if self.vector_db is None:
kb_config = self.ap.instance_config.data.get('vdb')
if kb_config:
if kb_config.get('use') == 'chroma':
self.vector_db = ChromaVectorDatabase(self.ap)
self.ap.logger.info('Initialized Chroma vector database backend.')
elif kb_config.get('use') == 'qdrant':
self.vector_db = QdrantVectorDatabase(self.ap)
self.ap.logger.info('Initialized Qdrant vector database backend.')
else:
self.vector_db = ChromaVectorDatabase(self.ap)
self.ap.logger.warning('No valid vector database backend configured, defaulting to Chroma.')
else:
self.vector_db = ChromaVectorDatabase(self.ap)
self.ap.logger.warning('No vector database backend configured, defaulting to Chroma.')

View File

@@ -14,24 +14,25 @@ class VectorDatabase(abc.ABC):
metadatas: list[dict[str, Any]],
documents: list[str],
) -> None:
"""向指定 collection 添加向量数据。"""
"""Add vector data to the specified collection."""
pass
@abc.abstractmethod
async def search(self, collection: str, query_embedding: np.ndarray, k: int = 5) -> Dict[str, Any]:
"""在指定 collection 中检索最相似的向量。"""
"""Search for the most similar vectors in the specified collection."""
pass
@abc.abstractmethod
async def delete_by_file_id(self, collection: str, file_id: str) -> None:
"""根据 file_id 删除指定 collection 中的向量。"""
"""Delete vectors from the specified collection by file_id."""
pass
@abc.abstractmethod
async def get_or_create_collection(self, collection: str):
"""获取或创建 collection"""
"""Get or create collection."""
pass
@abc.abstractmethod
async def delete_collection(self, collection: str):
"""Delete collection."""
pass

104
pkg/vector/vdbs/qdrant.py Normal file
View File

@@ -0,0 +1,104 @@
from __future__ import annotations
from typing import Any, Dict, List
from qdrant_client import AsyncQdrantClient, models
from pkg.core import app
from pkg.vector.vdb import VectorDatabase
class QdrantVectorDatabase(VectorDatabase):
def __init__(self, ap: app.Application):
self.ap = ap
url = self.ap.instance_config.data['vdb']['qdrant']['url']
host = self.ap.instance_config.data['vdb']['qdrant']['host']
port = self.ap.instance_config.data['vdb']['qdrant']['port']
api_key = self.ap.instance_config.data['vdb']['qdrant']['api_key']
if url:
self.client = AsyncQdrantClient(url=url, api_key=api_key)
else:
self.client = AsyncQdrantClient(host=host, port=int(port), api_key=api_key)
self._collections: set[str] = set()
async def _ensure_collection(self, collection: str, vector_size: int) -> None:
if collection in self._collections:
return
exists = await self.client.collection_exists(collection)
if exists:
self._collections.add(collection)
return
await self.client.create_collection(
collection_name=collection,
vectors_config=models.VectorParams(size=vector_size, distance=models.Distance.COSINE),
)
self._collections.add(collection)
self.ap.logger.info(f"Qdrant collection '{collection}' created with dim={vector_size}.")
async def get_or_create_collection(self, collection: str):
# Qdrant requires vector size to create a collection; no-op here.
pass
async def add_embeddings(
self,
collection: str,
ids: List[str],
embeddings_list: List[List[float]],
metadatas: List[Dict[str, Any]],
) -> None:
if not embeddings_list:
return
await self._ensure_collection(collection, len(embeddings_list[0]))
points = [
models.PointStruct(id=ids[i], vector=embeddings_list[i], payload=metadatas[i]) for i in range(len(ids))
]
await self.client.upsert(collection_name=collection, points=points)
self.ap.logger.info(f"Added {len(ids)} embeddings to Qdrant collection '{collection}'.")
async def search(self, collection: str, query_embedding: list[float], k: int = 5) -> dict[str, Any]:
exists = await self.client.collection_exists(collection)
if not exists:
return {'ids': [[]], 'metadatas': [[]], 'distances': [[]]}
hits = (
await self.client.query_points(
collection_name=collection,
query=query_embedding,
limit=k,
with_payload=True,
)
).points
ids = [str(hit.id) for hit in hits]
metadatas = [hit.payload or {} for hit in hits]
# Qdrant's score is similarity; convert to a pseudo-distance for consistency
distances = [1 - float(hit.score) if hit.score is not None else 1.0 for hit in hits]
results = {'ids': [ids], 'metadatas': [metadatas], 'distances': [distances]}
self.ap.logger.info(f"Qdrant search in '{collection}' returned {len(results.get('ids', [[]])[0])} results.")
return results
async def delete_by_file_id(self, collection: str, file_id: str) -> None:
exists = await self.client.collection_exists(collection)
if not exists:
return
await self.client.delete(
collection_name=collection,
points_selector=models.Filter(
must=[models.FieldCondition(key='file_id', match=models.MatchValue(value=file_id))]
),
)
self.ap.logger.info(f"Deleted embeddings from Qdrant collection '{collection}' with file_id: {file_id}")
async def delete_collection(self, collection: str):
try:
await self.client.delete_collection(collection)
self._collections.discard(collection)
self.ap.logger.info(f"Qdrant collection '{collection}' deleted.")
except Exception:
self.ap.logger.warning(f"Qdrant collection '{collection}' not found.")