From eb0e6aff6883f255096ff5d3f5ca0e12cb4e279f Mon Sep 17 00:00:00 2001 From: fdc310 <82008029+fdc310@users.noreply.github.com> Date: Fri, 9 Jan 2026 15:50:44 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20add=20telemetry=20support=20for=20query?= =?UTF-8?q?=20execution=20tracking=20and=20configur=E2=80=A6=20(#1900)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat: add telemetry support for query execution tracking and configuration * feat: integrate telemetry manager and enable telemetry data sending * feat: integrate telemetry manager and enhance error handling for telemetry sending * feat: update telemetry configuration to use 'space' instead of 'telemetry' and adjust related parameters * feat: integrate telemetry manager and enable telemetry data sending * feat: integrate telemetry manager and enhance error handling for telemetry sending * feat: add instance id * feat: enhance telemetry management with asynchronous task handling and improve model retrieval caching --------- Co-authored-by: Junyan Qin --- src/langbot/pkg/core/app.py | 3 + src/langbot/pkg/core/stages/build_app.py | 7 + .../pkg/pipeline/process/handlers/chat.py | 56 +++++++- src/langbot/pkg/provider/modelmgr/modelmgr.py | 3 + src/langbot/pkg/telemetry/__init__.py | 0 src/langbot/pkg/telemetry/telemetry.py | 120 ++++++++++++++++++ src/langbot/templates/config.yaml | 1 + 7 files changed, 186 insertions(+), 4 deletions(-) create mode 100644 src/langbot/pkg/telemetry/__init__.py create mode 100644 src/langbot/pkg/telemetry/telemetry.py diff --git a/src/langbot/pkg/core/app.py b/src/langbot/pkg/core/app.py index 4dad80a3..c23d114f 100644 --- a/src/langbot/pkg/core/app.py +++ b/src/langbot/pkg/core/app.py @@ -36,6 +36,7 @@ from . import taskmgr from . import entities as core_entities from ..rag.knowledge import kbmgr as rag_mgr from ..vector import mgr as vectordb_mgr +from ..telemetry import telemetry as telemetry_module class Application: @@ -140,6 +141,8 @@ class Application: webhook_service: webhook_service.WebhookService = None + telemetry: telemetry_module.TelemetryManager = None + def __init__(self): pass diff --git a/src/langbot/pkg/core/stages/build_app.py b/src/langbot/pkg/core/stages/build_app.py index 1ce4ddbd..e7226041 100644 --- a/src/langbot/pkg/core/stages/build_app.py +++ b/src/langbot/pkg/core/stages/build_app.py @@ -31,6 +31,8 @@ from ...storage import mgr as storagemgr from ...utils import logcache from ...vector import mgr as vectordb_mgr from .. import taskmgr +from ...telemetry import telemetry as telemetry_module + @stage.stage_class('BuildAppStage') @@ -102,6 +104,11 @@ class BuildAppStage(stage.BootingStage): ap.persistence_mgr = persistence_mgr_inst await persistence_mgr_inst.initialize() + # Telemetry manager: attach to app so other components can call via self.ap.telemetry + telemetry_inst = telemetry_module.TelemetryManager(ap) + await telemetry_inst.initialize() + ap.telemetry = telemetry_inst + cmd_mgr_inst = cmdmgr.CommandManager(ap) await cmd_mgr_inst.initialize() ap.cmd_mgr = cmd_mgr_inst diff --git a/src/langbot/pkg/pipeline/process/handlers/chat.py b/src/langbot/pkg/pipeline/process/handlers/chat.py index b1496260..1e07cb76 100644 --- a/src/langbot/pkg/pipeline/process/handlers/chat.py +++ b/src/langbot/pkg/pipeline/process/handlers/chat.py @@ -3,6 +3,8 @@ from __future__ import annotations import uuid import typing import traceback +import time +from datetime import datetime from .. import handler @@ -10,7 +12,7 @@ from ... import entities from ....provider import runner as runner_module import langbot_plugin.api.entities.events as events -from ....utils import importutil +from ....utils import importutil, constants from ....provider import runners import langbot_plugin.api.entities.builtin.provider.session as provider_session import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query @@ -84,6 +86,9 @@ class ChatMessageHandler(handler.MessageHandler): break else: raise ValueError(f'Request Runner not found: {query.pipeline_config["ai"]["runner"]["runner"]}') + # Mark start time for telemetry + start_ts = time.time() + if is_stream: resp_message_id = uuid.uuid4() chunk_count = 0 # Track streaming chunks to reduce excessive logging @@ -140,7 +145,8 @@ class ChatMessageHandler(handler.MessageHandler): query.session.using_conversation.messages.extend(query.resp_messages) except Exception as e: - self.ap.logger.error(f'Conversation({query.query_id}) Request Failed: {type(e).__name__} {str(e)}') + error_info = f'{type(e).__name__} {str(e)}' + self.ap.logger.error(f'Conversation({query.query_id}) Request Failed: {error_info}') traceback.print_exc() hide_exception_info = query.pipeline_config['output']['misc']['hide-exception'] @@ -153,5 +159,47 @@ class ChatMessageHandler(handler.MessageHandler): debug_notice=traceback.format_exc(), ) finally: - # TODO statistics - pass + # Telemetry reporting: collect minimal per-query execution info and send asynchronously + try: + end_ts = time.time() + duration_ms = None + if 'start_ts' in locals(): + duration_ms = int((end_ts - start_ts) * 1000) + + adapter_name = query.adapter.__class__.__name__ if hasattr(query, 'adapter') else None + runner_name = ( + query.pipeline_config.get('ai', {}).get('runner', {}).get('runner') + if query.pipeline_config + else None + ) + + # Model name if using localagent + model_name = None + try: + if runner_name == 'local-agent' and getattr(query, 'use_llm_model_uuid', None): + m = await self.ap.model_mgr.get_model_by_uuid(query.use_llm_model_uuid) + if m and getattr(m, 'model_entity', None): + model_name = getattr(m.model_entity, 'name', None) + except Exception: + model_name = None + + pipeline_plugins = query.variables.get('_pipeline_bound_plugins', None) + + payload = { + 'query_id': query.query_id, + 'adapter': adapter_name, + 'runner': runner_name, + 'duration_ms': duration_ms, + 'model_name': model_name, + 'version': constants.semantic_version, + 'instance_id': constants.instance_id, + 'pipeline_plugins': pipeline_plugins, + 'error': locals().get('error_info', None), + 'timestamp': datetime.utcnow().isoformat(), + } + + # Send telemetry asynchronously and do not block pipeline via app's telemetry manager + await self.ap.telemetry.start_send_task(payload) + except Exception as ex: + # Ensure telemetry issues do not affect normal flow + self.ap.logger.warning(f'Failed to send telemetry: {ex}') diff --git a/src/langbot/pkg/provider/modelmgr/modelmgr.py b/src/langbot/pkg/provider/modelmgr/modelmgr.py index b24bff77..41b401de 100644 --- a/src/langbot/pkg/provider/modelmgr/modelmgr.py +++ b/src/langbot/pkg/provider/modelmgr/modelmgr.py @@ -9,6 +9,7 @@ from ...discover import engine from . import token from ...entity.persistence import model as persistence_model from ...entity.errors import provider as provider_errors +from async_lru import alru_cache class ModelManager: @@ -349,6 +350,7 @@ class ModelManager: await self.load_embedding_model_with_provider(model_entity, provider_entity) + @alru_cache(ttl=60 * 5) async def get_model_by_uuid(self, uuid: str) -> requester.RuntimeLLMModel: """Get LLM model by uuid""" for model in self.llm_models: @@ -356,6 +358,7 @@ class ModelManager: return model raise ValueError(f'LLM model {uuid} not found') + @alru_cache(ttl=60 * 5) async def get_embedding_model_by_uuid(self, uuid: str) -> requester.RuntimeEmbeddingModel: """Get embedding model by uuid""" for model in self.embedding_models: diff --git a/src/langbot/pkg/telemetry/__init__.py b/src/langbot/pkg/telemetry/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/src/langbot/pkg/telemetry/telemetry.py b/src/langbot/pkg/telemetry/telemetry.py new file mode 100644 index 00000000..4d38aa83 --- /dev/null +++ b/src/langbot/pkg/telemetry/telemetry.py @@ -0,0 +1,120 @@ +from __future__ import annotations + +import asyncio +import httpx + + +class TelemetryManager: + """TelemetryManager handles sending telemetry for a given application instance. + + Usage: + telemetry = TelemetryManager(ap) + await telemetry.send({ ... }) + """ + + send_tasks: list[asyncio.Task] = [] + + def __init__(self, ap): + self.ap = ap + + self.telemetry_config = {} + + async def initialize(self): + self.telemetry_config = self.ap.instance_config.data.get('space', {}) + + async def start_send_task(self, payload: dict): + task = asyncio.create_task(self.send(payload)) + self.send_tasks.append(task) + + async def send(self, payload: dict): + """Send telemetry payload to configured telemetry server (non-blocking). + + Expects ap.instance_config.data.telemetry to have: + - enabled: bool + - server: str (base URL, e.g. https://space.example.com) + - timeout_seconds: optional int, overall request timeout (default 10) + + Posts to {server.rstrip('/')}/api/v1/telemetry as JSON. Failures are logged but do not raise. + """ + + try: + cfg = self.telemetry_config + if not cfg: + return + if cfg.get('disable_telemetry', False): + return + server = cfg.get('url', '') + if not server: + return + + # Normalize URL + url = server.rstrip('/') + '/api/v1/telemetry' + + try: + # Sanitize payload so string fields are strings and not nulls + sanitized = dict(payload) + if 'query_id' in sanitized: + try: + sanitized['query_id'] = '' if sanitized['query_id'] is None else str(sanitized['query_id']) + except Exception: + sanitized['query_id'] = str(sanitized.get('query_id', '')) + + for sfield in ('adapter', 'runner', 'model_name', 'version', 'error', 'timestamp'): + v = sanitized.get(sfield) + sanitized[sfield] = '' if v is None else str(v) + + if 'duration_ms' in sanitized: + try: + sanitized['duration_ms'] = ( + int(sanitized['duration_ms']) if sanitized['duration_ms'] is not None else 0 + ) + except Exception: + sanitized['duration_ms'] = 0 + + async with httpx.AsyncClient(timeout=httpx.Timeout(10)) as client: + try: + # Use asyncio.wait_for to ensure we always bound the total time + resp = await asyncio.wait_for(client.post(url, json=sanitized), timeout=10 + 1) + + if resp.status_code >= 400: + self.ap.logger.warning( + f'Telemetry post to {url} returned status {resp.status_code} - {resp.text}' + ) + else: + # Detect application-level errors inside HTTP 200 responses + app_err = False + try: + j = resp.json() + if isinstance(j, dict) and j.get('code') is not None and int(j.get('code')) >= 400: + app_err = True + self.ap.logger.warning( + f'Telemetry post to {url} returned application error code {j.get("code")} - {j.get("msg")}' + ) + except Exception: + pass + + if app_err: + self.ap.logger.warning( + f'Telemetry post to {url} returned app-level error - response: {resp.text[:200]}' + ) + else: + self.ap.logger.debug( + f'Telemetry posted to {url}, status {resp.status_code} - response: {resp.text[:200]}' + ) + except asyncio.TimeoutError: + self.ap.logger.warning(f'Telemetry post to {url} timed out') + except Exception as e: + self.ap.logger.warning(f'Failed to post telemetry to {url}: {e}', exc_info=True) + except Exception as e: + try: + self.ap.logger.warning( + f'Failed to create HTTP client for telemetry or sanitize payload: {e}', exc_info=True + ) + except Exception: + pass + except Exception as e: + # Never raise from telemetry; surface as warning for visibility + try: + self.ap.logger.warning(f'Unexpected telemetry error: {e}', exc_info=True) + except Exception: + pass diff --git a/src/langbot/templates/config.yaml b/src/langbot/templates/config.yaml index e1f7d576..590102ed 100644 --- a/src/langbot/templates/config.yaml +++ b/src/langbot/templates/config.yaml @@ -78,3 +78,4 @@ space: # OAuth authorization page URL (user will be redirected here) oauth_authorize_url: 'https://space.langbot.app/auth/authorize' disable_models_service: false + disable_telemetry: false