Compare commits

...

7 Commits

Author SHA1 Message Date
Junyan Qin
a05cdcac50 chore: update project version to 4.7.1 2026-01-09 21:52:08 +08:00
Junyan Qin
ecfb2bfb34 chore: add type hints for ap in telemetry.py 2026-01-09 21:50:43 +08:00
Guanchao Wang
e17dba0a98 fix: testing mcp server (#1912) 2026-01-09 18:39:40 +08:00
Hadong
6b138943ce feat(milvus): milvus related updates (#1908)
- Add Milvus db_name configuration and client parameter support.
- change kb_data uuid for Milvus. 3. add MAX_BATCH_SIZE for openai.
- support more vector_size.
2026-01-09 16:03:43 +08:00
fdc310
eb0e6aff68 feat: add telemetry support for query execution tracking and configur… (#1900)
* feat: add telemetry support for query execution tracking and configuration

* feat: integrate telemetry manager and enable telemetry data sending

* feat: integrate telemetry manager and enhance error handling for telemetry sending

* feat: update telemetry configuration to use 'space' instead of 'telemetry' and adjust related parameters

* feat: integrate telemetry manager and enable telemetry data sending

* feat: integrate telemetry manager and enhance error handling for telemetry sending

* feat: add instance id

* feat: enhance telemetry management with asynchronous task handling and improve model retrieval caching

---------

Co-authored-by: Junyan Qin <rockchinq@gmail.com>
2026-01-09 15:50:44 +08:00
Junyan Qin
4d0095626a fix: update docker-compose command to include --no-sync option for improved runtime behavior 2026-01-08 11:30:25 +08:00
Junyan Qin
aa0a501ade fix: bug in bind space account in models dialog 2026-01-05 20:53:35 +08:00
15 changed files with 334 additions and 46 deletions

View File

@@ -14,7 +14,7 @@ services:
restart: on-failure
environment:
- TZ=Asia/Shanghai
command: ["uv", "run", "-m", "langbot_plugin.cli.__init__", "rt"]
command: ["uv", "run", "--no-sync", "-m", "langbot_plugin.cli.__init__", "rt"]
networks:
- langbot_network

View File

@@ -1,6 +1,6 @@
[project]
name = "langbot"
version = "4.7.0"
version = "4.7.1"
description = "Production-grade platform for building IM bots"
readme = "README.md"
license-files = ["LICENSE"]

View File

@@ -1,3 +1,3 @@
"""LangBot - Production-grade platform for building IM bots"""
__version__ = '4.7.0'
__version__ = '4.7.1'

View File

@@ -36,6 +36,7 @@ from . import taskmgr
from . import entities as core_entities
from ..rag.knowledge import kbmgr as rag_mgr
from ..vector import mgr as vectordb_mgr
from ..telemetry import telemetry as telemetry_module
class Application:
@@ -140,6 +141,8 @@ class Application:
webhook_service: webhook_service.WebhookService = None
telemetry: telemetry_module.TelemetryManager = None
def __init__(self):
pass

View File

@@ -31,6 +31,8 @@ from ...storage import mgr as storagemgr
from ...utils import logcache
from ...vector import mgr as vectordb_mgr
from .. import taskmgr
from ...telemetry import telemetry as telemetry_module
@stage.stage_class('BuildAppStage')
@@ -102,6 +104,11 @@ class BuildAppStage(stage.BootingStage):
ap.persistence_mgr = persistence_mgr_inst
await persistence_mgr_inst.initialize()
# Telemetry manager: attach to app so other components can call via self.ap.telemetry
telemetry_inst = telemetry_module.TelemetryManager(ap)
await telemetry_inst.initialize()
ap.telemetry = telemetry_inst
cmd_mgr_inst = cmdmgr.CommandManager(ap)
await cmd_mgr_inst.initialize()
ap.cmd_mgr = cmd_mgr_inst

View File

@@ -3,6 +3,8 @@ from __future__ import annotations
import uuid
import typing
import traceback
import time
from datetime import datetime
from .. import handler
@@ -10,7 +12,7 @@ from ... import entities
from ....provider import runner as runner_module
import langbot_plugin.api.entities.events as events
from ....utils import importutil
from ....utils import importutil, constants
from ....provider import runners
import langbot_plugin.api.entities.builtin.provider.session as provider_session
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
@@ -84,6 +86,9 @@ class ChatMessageHandler(handler.MessageHandler):
break
else:
raise ValueError(f'Request Runner not found: {query.pipeline_config["ai"]["runner"]["runner"]}')
# Mark start time for telemetry
start_ts = time.time()
if is_stream:
resp_message_id = uuid.uuid4()
chunk_count = 0 # Track streaming chunks to reduce excessive logging
@@ -140,7 +145,8 @@ class ChatMessageHandler(handler.MessageHandler):
query.session.using_conversation.messages.extend(query.resp_messages)
except Exception as e:
self.ap.logger.error(f'Conversation({query.query_id}) Request Failed: {type(e).__name__} {str(e)}')
error_info = f'{type(e).__name__} {str(e)}'
self.ap.logger.error(f'Conversation({query.query_id}) Request Failed: {error_info}')
traceback.print_exc()
hide_exception_info = query.pipeline_config['output']['misc']['hide-exception']
@@ -153,5 +159,47 @@ class ChatMessageHandler(handler.MessageHandler):
debug_notice=traceback.format_exc(),
)
finally:
# TODO statistics
pass
# Telemetry reporting: collect minimal per-query execution info and send asynchronously
try:
end_ts = time.time()
duration_ms = None
if 'start_ts' in locals():
duration_ms = int((end_ts - start_ts) * 1000)
adapter_name = query.adapter.__class__.__name__ if hasattr(query, 'adapter') else None
runner_name = (
query.pipeline_config.get('ai', {}).get('runner', {}).get('runner')
if query.pipeline_config
else None
)
# Model name if using localagent
model_name = None
try:
if runner_name == 'local-agent' and getattr(query, 'use_llm_model_uuid', None):
m = await self.ap.model_mgr.get_model_by_uuid(query.use_llm_model_uuid)
if m and getattr(m, 'model_entity', None):
model_name = getattr(m.model_entity, 'name', None)
except Exception:
model_name = None
pipeline_plugins = query.variables.get('_pipeline_bound_plugins', None)
payload = {
'query_id': query.query_id,
'adapter': adapter_name,
'runner': runner_name,
'duration_ms': duration_ms,
'model_name': model_name,
'version': constants.semantic_version,
'instance_id': constants.instance_id,
'pipeline_plugins': pipeline_plugins,
'error': locals().get('error_info', None),
'timestamp': datetime.utcnow().isoformat(),
}
# Send telemetry asynchronously and do not block pipeline via app's telemetry manager
await self.ap.telemetry.start_send_task(payload)
except Exception as ex:
# Ensure telemetry issues do not affect normal flow
self.ap.logger.warning(f'Failed to send telemetry: {ex}')

View File

@@ -9,6 +9,7 @@ from ...discover import engine
from . import token
from ...entity.persistence import model as persistence_model
from ...entity.errors import provider as provider_errors
from async_lru import alru_cache
class ModelManager:
@@ -349,6 +350,7 @@ class ModelManager:
await self.load_embedding_model_with_provider(model_entity, provider_entity)
@alru_cache(ttl=60 * 5)
async def get_model_by_uuid(self, uuid: str) -> requester.RuntimeLLMModel:
"""Get LLM model by uuid"""
for model in self.llm_models:
@@ -356,6 +358,7 @@ class ModelManager:
return model
raise ValueError(f'LLM model {uuid} not found')
@alru_cache(ttl=60 * 5)
async def get_embedding_model_by_uuid(self, uuid: str) -> requester.RuntimeEmbeddingModel:
"""Get embedding model by uuid"""
for model in self.embedding_models:

View File

@@ -7,7 +7,7 @@ import traceback
from langbot_plugin.api.entities.events import pipeline_query
import sqlalchemy
import asyncio
import uuid as uuid_module
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from mcp.client.sse import sse_client
@@ -287,6 +287,14 @@ class MCPLoader(loader.ToolLoader):
- enable: 是否启用
- extra_args: 额外的配置参数 (可选)
"""
uuid_ = server_config.get('uuid')
if not uuid_:
self.ap.logger.warning(
'Server UUID is None for MCP server, maybe testing in the config page.'
)
uuid_ = str(uuid_module.uuid4())
server_config['uuid'] = uuid_
name = server_config['name']
uuid = server_config['uuid']

View File

@@ -32,12 +32,18 @@ class Embedder(BaseService):
await self.ap.persistence_mgr.execute_async(sqlalchemy.insert(persistence_rag.Chunk).values(chunk_dicts))
# get embeddings
embeddings_list: list[list[float]] = await embedding_model.provider.requester.invoke_embedding(
model=embedding_model,
input_text=chunks,
extra_args={}, # TODO: add extra args
)
# get embeddings (batch size limit: 64 for OpenAI)
MAX_BATCH_SIZE = 64
embeddings_list: list[list[float]] = []
for i in range(0, len(chunks), MAX_BATCH_SIZE):
batch = chunks[i:i + MAX_BATCH_SIZE]
batch_embeddings = await embedding_model.provider.requester.invoke_embedding(
model=embedding_model,
input_text=batch,
extra_args={}, # TODO: add extra args
)
embeddings_list.extend(batch_embeddings)
# save embeddings to vdb
await self.ap.vector_db_mgr.vector_db.add_embeddings(kb_id, chunk_ids, embeddings_list, chunk_dicts)

View File

View File

@@ -0,0 +1,121 @@
from __future__ import annotations
import asyncio
import httpx
from ..core import app as core_app
class TelemetryManager:
"""TelemetryManager handles sending telemetry for a given application instance.
Usage:
telemetry = TelemetryManager(ap)
await telemetry.send({ ... })
"""
send_tasks: list[asyncio.Task] = []
def __init__(self, ap: core_app.Application):
self.ap = ap
self.telemetry_config = {}
async def initialize(self):
self.telemetry_config = self.ap.instance_config.data.get('space', {})
async def start_send_task(self, payload: dict):
task = asyncio.create_task(self.send(payload))
self.send_tasks.append(task)
async def send(self, payload: dict):
"""Send telemetry payload to configured telemetry server (non-blocking).
Expects ap.instance_config.data.telemetry to have:
- enabled: bool
- server: str (base URL, e.g. https://space.example.com)
- timeout_seconds: optional int, overall request timeout (default 10)
Posts to {server.rstrip('/')}/api/v1/telemetry as JSON. Failures are logged but do not raise.
"""
try:
cfg = self.telemetry_config
if not cfg:
return
if cfg.get('disable_telemetry', False):
return
server = cfg.get('url', '')
if not server:
return
# Normalize URL
url = server.rstrip('/') + '/api/v1/telemetry'
try:
# Sanitize payload so string fields are strings and not nulls
sanitized = dict(payload)
if 'query_id' in sanitized:
try:
sanitized['query_id'] = '' if sanitized['query_id'] is None else str(sanitized['query_id'])
except Exception:
sanitized['query_id'] = str(sanitized.get('query_id', ''))
for sfield in ('adapter', 'runner', 'model_name', 'version', 'error', 'timestamp'):
v = sanitized.get(sfield)
sanitized[sfield] = '' if v is None else str(v)
if 'duration_ms' in sanitized:
try:
sanitized['duration_ms'] = (
int(sanitized['duration_ms']) if sanitized['duration_ms'] is not None else 0
)
except Exception:
sanitized['duration_ms'] = 0
async with httpx.AsyncClient(timeout=httpx.Timeout(10)) as client:
try:
# Use asyncio.wait_for to ensure we always bound the total time
resp = await asyncio.wait_for(client.post(url, json=sanitized), timeout=10 + 1)
if resp.status_code >= 400:
self.ap.logger.warning(
f'Telemetry post to {url} returned status {resp.status_code} - {resp.text}'
)
else:
# Detect application-level errors inside HTTP 200 responses
app_err = False
try:
j = resp.json()
if isinstance(j, dict) and j.get('code') is not None and int(j.get('code')) >= 400:
app_err = True
self.ap.logger.warning(
f'Telemetry post to {url} returned application error code {j.get("code")} - {j.get("msg")}'
)
except Exception:
pass
if app_err:
self.ap.logger.warning(
f'Telemetry post to {url} returned app-level error - response: {resp.text[:200]}'
)
else:
self.ap.logger.debug(
f'Telemetry posted to {url}, status {resp.status_code} - response: {resp.text[:200]}'
)
except asyncio.TimeoutError:
self.ap.logger.warning(f'Telemetry post to {url} timed out')
except Exception as e:
self.ap.logger.warning(f'Failed to post telemetry to {url}: {e}', exc_info=True)
except Exception as e:
try:
self.ap.logger.warning(
f'Failed to create HTTP client for telemetry or sanitize payload: {e}', exc_info=True
)
except Exception:
pass
except Exception as e:
# Never raise from telemetry; surface as warning for visibility
try:
self.ap.logger.warning(f'Unexpected telemetry error: {e}', exc_info=True)
except Exception:
pass

View File

@@ -37,7 +37,8 @@ class VectorDBManager:
milvus_config = kb_config.get('milvus', {})
uri = milvus_config.get('uri', './data/milvus.db')
token = milvus_config.get('token')
self.vector_db = MilvusVectorDatabase(self.ap, uri=uri, token=token)
db_name = milvus_config.get('db_name', 'default')
self.vector_db = MilvusVectorDatabase(self.ap, uri=uri, token=token, db_name=db_name)
self.ap.logger.info('Initialized Milvus vector database backend.')
elif vdb_type == 'pgvector':

View File

@@ -1,7 +1,8 @@
from __future__ import annotations
import asyncio
from typing import Any, Dict
from pymilvus import MilvusClient, DataType
from pymilvus import MilvusClient, DataType, CollectionSchema, FieldSchema
from pymilvus.milvus_client.index import IndexParams
from langbot.pkg.vector.vdb import VectorDatabase
from langbot.pkg.core import app
@@ -9,7 +10,7 @@ from langbot.pkg.core import app
class MilvusVectorDatabase(VectorDatabase):
"""Milvus vector database implementation"""
def __init__(self, ap: app.Application, uri: str = "milvus.db", token: str = None):
def __init__(self, ap: app.Application, uri: str = "milvus.db", token: str = None, db_name: str = None):
"""Initialize Milvus vector database
Args:
@@ -21,30 +22,76 @@ class MilvusVectorDatabase(VectorDatabase):
self.ap = ap
self.uri = uri
self.token = token
self.db_name = db_name
self.client = None
self._collections = {}
self._collections: set[str] = set()
self._initialize_client()
def _initialize_client(self):
"""Initialize Milvus client connection"""
try:
if self.token:
self.client = MilvusClient(uri=self.uri, token=self.token)
self.client = MilvusClient(uri=self.uri, token=self.token, db_name=self.db_name)
else:
self.client = MilvusClient(uri=self.uri)
self.client = MilvusClient(uri=self.uri, db_name=self.db_name)
self.ap.logger.info(f"Connected to Milvus at {self.uri}")
except Exception as e:
self.ap.logger.error(f"Failed to connect to Milvus: {e}")
raise
async def get_or_create_collection(self, collection: str):
"""Get or create a Milvus collection
@staticmethod
def _normalize_collection_name(collection: str) -> str:
"""Normalize collection name to comply with Milvus naming requirements.
Milvus requirements:
- First character must be an underscore or letter
- Can only contain numbers, letters and underscores
Args:
collection: Original collection name (e.g., UUID with hyphens)
Returns:
Normalized collection name that complies with Milvus requirements
"""
# Replace hyphens with underscores
normalized = collection.replace('-', '_')
# If first character is not a letter or underscore, prepend 'kb_'
if normalized and not (normalized[0].isalpha() or normalized[0] == '_'):
normalized = 'kb_' + normalized
return normalized
async def _ensure_vector_index(self, collection: str) -> None:
"""Ensure the vector field has an index.
Args:
collection: Normalized collection name
"""
index_params = IndexParams()
index_params.add_index(
field_name="vector",
index_type="AUTOINDEX",
metric_type="COSINE",
)
await asyncio.to_thread(
self.client.create_index,
collection_name=collection,
index_params=index_params
)
async def _get_or_create_collection_internal(self, collection: str, vector_size: int = None):
"""Internal method to get or create a Milvus collection with proper configuration.
Args:
collection: Collection name (corresponds to knowledge base UUID)
vector_size: Dimension of the vectors (if None, defaults to 1536)
"""
# Normalize collection name for Milvus compatibility
collection = self._normalize_collection_name(collection)
if collection in self._collections:
return self._collections[collection]
return collection
# Check if collection exists
has_collection = await asyncio.to_thread(
@@ -52,12 +99,13 @@ class MilvusVectorDatabase(VectorDatabase):
)
if not has_collection:
# Create collection with custom schema to support string IDs
from pymilvus import CollectionSchema, FieldSchema, DataType
# Default dimension if not specified (for backward compatibility)
if vector_size is None:
vector_size = 1536
fields = [
FieldSchema(name="id", dtype=DataType.VARCHAR, is_primary=True, max_length=255),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1536),
FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=vector_size),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=65535),
FieldSchema(name="file_id", dtype=DataType.VARCHAR, max_length=255),
FieldSchema(name="chunk_uuid", dtype=DataType.VARCHAR, max_length=255),
@@ -72,26 +120,42 @@ class MilvusVectorDatabase(VectorDatabase):
metric_type="COSINE",
)
# Create index for vector field (required for loading/searching)
index_params = {
"metric_type": "COSINE",
"index_type": "AUTOINDEX",
"params": {}
}
await asyncio.to_thread(
self.client.create_index,
collection_name=collection,
field_name="vector",
index_params=index_params
)
self.ap.logger.info(f"Created Milvus collection '{collection}' with index")
await self._ensure_vector_index(collection)
self.ap.logger.info(f"Created Milvus collection '{collection}' with dimension={vector_size}, index=AUTOINDEX")
else:
# Ensure index exists for existing collection
await self._ensure_index_if_missing(collection)
self.ap.logger.info(f"Milvus collection '{collection}' already exists")
self._collections[collection] = collection
self._collections.add(collection)
return collection
async def _ensure_index_if_missing(self, collection: str) -> None:
"""Check if index exists for collection and create if missing.
Args:
collection: Normalized collection name
"""
try:
indexes = await asyncio.to_thread(
self.client.list_indexes,
collection_name=collection
)
if "vector" not in indexes:
await self._ensure_vector_index(collection)
self.ap.logger.info(f"Created index for existing Milvus collection '{collection}'")
except Exception as e:
self.ap.logger.warning(f"Could not verify/create index for collection '{collection}': {e}")
async def get_or_create_collection(self, collection: str):
"""Get or create a Milvus collection (without vector size - will use default).
Args:
collection: Collection name (corresponds to knowledge base UUID)
"""
collection = self._normalize_collection_name(collection)
return await self._get_or_create_collection_internal(collection)
async def add_embeddings(
self,
collection: str,
@@ -107,7 +171,14 @@ class MilvusVectorDatabase(VectorDatabase):
embeddings_list: List of embedding vectors
metadatas: List of metadata dictionaries for each vector
"""
await self.get_or_create_collection(collection)
collection = self._normalize_collection_name(collection)
if not embeddings_list:
return
# Ensure collection exists with correct dimension
vector_size = len(embeddings_list[0])
await self._get_or_create_collection_internal(collection, vector_size)
# Prepare data in Milvus format
data = []
@@ -156,6 +227,7 @@ class MilvusVectorDatabase(VectorDatabase):
Returns:
Dictionary with search results in Chroma-compatible format
"""
collection = self._normalize_collection_name(collection)
await self.get_or_create_collection(collection)
# Perform search
@@ -214,6 +286,7 @@ class MilvusVectorDatabase(VectorDatabase):
collection: Collection name
file_id: File ID to filter deletion
"""
collection = self._normalize_collection_name(collection)
await self.get_or_create_collection(collection)
# Delete entities matching the file_id
@@ -232,8 +305,9 @@ class MilvusVectorDatabase(VectorDatabase):
Args:
collection: Collection name to delete
"""
if collection in self._collections:
del self._collections[collection]
collection = self._normalize_collection_name(collection)
self._collections.discard(collection)
# Check if collection exists before attempting deletion
has_collection = await asyncio.to_thread(

View File

@@ -51,6 +51,7 @@ vdb:
milvus:
uri: 'http://127.0.0.1:19530'
token: ''
db_name: ''
pgvector:
host: '127.0.0.1'
port: 5433
@@ -78,3 +79,4 @@ space:
# OAuth authorization page URL (user will be redirected here)
oauth_authorize_url: 'https://space.langbot.app/auth/authorize'
disable_models_service: false
disable_telemetry: false

View File

@@ -206,8 +206,23 @@ export default function ModelsDialog({
}
}
function handleSpaceLogin() {
window.location.href = '/auth/space';
async function handleSpaceLogin() {
try {
const token = localStorage.getItem('token');
if (!token) {
toast.error(t('common.error'));
return;
}
const currentOrigin = window.location.origin;
const redirectUri = `${currentOrigin}/auth/space/callback?mode=bind`;
const response = await httpClient.getSpaceAuthorizeUrl(
redirectUri,
token,
);
window.location.href = response.authorize_url;
} catch {
toast.error(t('common.spaceLoginFailed'));
}
}
async function handleAddModel(