mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-27 16:04:21 +00:00
feat: add telemetry support for query execution tracking and configur… (#1900)
* feat: add telemetry support for query execution tracking and configuration * feat: integrate telemetry manager and enable telemetry data sending * feat: integrate telemetry manager and enhance error handling for telemetry sending * feat: update telemetry configuration to use 'space' instead of 'telemetry' and adjust related parameters * feat: integrate telemetry manager and enable telemetry data sending * feat: integrate telemetry manager and enhance error handling for telemetry sending * feat: add instance id * feat: enhance telemetry management with asynchronous task handling and improve model retrieval caching --------- Co-authored-by: Junyan Qin <rockchinq@gmail.com>
This commit is contained in:
@@ -36,6 +36,7 @@ from . import taskmgr
|
|||||||
from . import entities as core_entities
|
from . import entities as core_entities
|
||||||
from ..rag.knowledge import kbmgr as rag_mgr
|
from ..rag.knowledge import kbmgr as rag_mgr
|
||||||
from ..vector import mgr as vectordb_mgr
|
from ..vector import mgr as vectordb_mgr
|
||||||
|
from ..telemetry import telemetry as telemetry_module
|
||||||
|
|
||||||
|
|
||||||
class Application:
|
class Application:
|
||||||
@@ -140,6 +141,8 @@ class Application:
|
|||||||
|
|
||||||
webhook_service: webhook_service.WebhookService = None
|
webhook_service: webhook_service.WebhookService = None
|
||||||
|
|
||||||
|
telemetry: telemetry_module.TelemetryManager = None
|
||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
pass
|
pass
|
||||||
|
|
||||||
|
|||||||
@@ -31,6 +31,8 @@ from ...storage import mgr as storagemgr
|
|||||||
from ...utils import logcache
|
from ...utils import logcache
|
||||||
from ...vector import mgr as vectordb_mgr
|
from ...vector import mgr as vectordb_mgr
|
||||||
from .. import taskmgr
|
from .. import taskmgr
|
||||||
|
from ...telemetry import telemetry as telemetry_module
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
@stage.stage_class('BuildAppStage')
|
@stage.stage_class('BuildAppStage')
|
||||||
@@ -102,6 +104,11 @@ class BuildAppStage(stage.BootingStage):
|
|||||||
ap.persistence_mgr = persistence_mgr_inst
|
ap.persistence_mgr = persistence_mgr_inst
|
||||||
await persistence_mgr_inst.initialize()
|
await persistence_mgr_inst.initialize()
|
||||||
|
|
||||||
|
# Telemetry manager: attach to app so other components can call via self.ap.telemetry
|
||||||
|
telemetry_inst = telemetry_module.TelemetryManager(ap)
|
||||||
|
await telemetry_inst.initialize()
|
||||||
|
ap.telemetry = telemetry_inst
|
||||||
|
|
||||||
cmd_mgr_inst = cmdmgr.CommandManager(ap)
|
cmd_mgr_inst = cmdmgr.CommandManager(ap)
|
||||||
await cmd_mgr_inst.initialize()
|
await cmd_mgr_inst.initialize()
|
||||||
ap.cmd_mgr = cmd_mgr_inst
|
ap.cmd_mgr = cmd_mgr_inst
|
||||||
|
|||||||
@@ -3,6 +3,8 @@ from __future__ import annotations
|
|||||||
import uuid
|
import uuid
|
||||||
import typing
|
import typing
|
||||||
import traceback
|
import traceback
|
||||||
|
import time
|
||||||
|
from datetime import datetime
|
||||||
|
|
||||||
|
|
||||||
from .. import handler
|
from .. import handler
|
||||||
@@ -10,7 +12,7 @@ from ... import entities
|
|||||||
from ....provider import runner as runner_module
|
from ....provider import runner as runner_module
|
||||||
|
|
||||||
import langbot_plugin.api.entities.events as events
|
import langbot_plugin.api.entities.events as events
|
||||||
from ....utils import importutil
|
from ....utils import importutil, constants
|
||||||
from ....provider import runners
|
from ....provider import runners
|
||||||
import langbot_plugin.api.entities.builtin.provider.session as provider_session
|
import langbot_plugin.api.entities.builtin.provider.session as provider_session
|
||||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||||
@@ -84,6 +86,9 @@ class ChatMessageHandler(handler.MessageHandler):
|
|||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
raise ValueError(f'Request Runner not found: {query.pipeline_config["ai"]["runner"]["runner"]}')
|
raise ValueError(f'Request Runner not found: {query.pipeline_config["ai"]["runner"]["runner"]}')
|
||||||
|
# Mark start time for telemetry
|
||||||
|
start_ts = time.time()
|
||||||
|
|
||||||
if is_stream:
|
if is_stream:
|
||||||
resp_message_id = uuid.uuid4()
|
resp_message_id = uuid.uuid4()
|
||||||
chunk_count = 0 # Track streaming chunks to reduce excessive logging
|
chunk_count = 0 # Track streaming chunks to reduce excessive logging
|
||||||
@@ -140,7 +145,8 @@ class ChatMessageHandler(handler.MessageHandler):
|
|||||||
|
|
||||||
query.session.using_conversation.messages.extend(query.resp_messages)
|
query.session.using_conversation.messages.extend(query.resp_messages)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
self.ap.logger.error(f'Conversation({query.query_id}) Request Failed: {type(e).__name__} {str(e)}')
|
error_info = f'{type(e).__name__} {str(e)}'
|
||||||
|
self.ap.logger.error(f'Conversation({query.query_id}) Request Failed: {error_info}')
|
||||||
traceback.print_exc()
|
traceback.print_exc()
|
||||||
|
|
||||||
hide_exception_info = query.pipeline_config['output']['misc']['hide-exception']
|
hide_exception_info = query.pipeline_config['output']['misc']['hide-exception']
|
||||||
@@ -153,5 +159,47 @@ class ChatMessageHandler(handler.MessageHandler):
|
|||||||
debug_notice=traceback.format_exc(),
|
debug_notice=traceback.format_exc(),
|
||||||
)
|
)
|
||||||
finally:
|
finally:
|
||||||
# TODO statistics
|
# Telemetry reporting: collect minimal per-query execution info and send asynchronously
|
||||||
pass
|
try:
|
||||||
|
end_ts = time.time()
|
||||||
|
duration_ms = None
|
||||||
|
if 'start_ts' in locals():
|
||||||
|
duration_ms = int((end_ts - start_ts) * 1000)
|
||||||
|
|
||||||
|
adapter_name = query.adapter.__class__.__name__ if hasattr(query, 'adapter') else None
|
||||||
|
runner_name = (
|
||||||
|
query.pipeline_config.get('ai', {}).get('runner', {}).get('runner')
|
||||||
|
if query.pipeline_config
|
||||||
|
else None
|
||||||
|
)
|
||||||
|
|
||||||
|
# Model name if using localagent
|
||||||
|
model_name = None
|
||||||
|
try:
|
||||||
|
if runner_name == 'local-agent' and getattr(query, 'use_llm_model_uuid', None):
|
||||||
|
m = await self.ap.model_mgr.get_model_by_uuid(query.use_llm_model_uuid)
|
||||||
|
if m and getattr(m, 'model_entity', None):
|
||||||
|
model_name = getattr(m.model_entity, 'name', None)
|
||||||
|
except Exception:
|
||||||
|
model_name = None
|
||||||
|
|
||||||
|
pipeline_plugins = query.variables.get('_pipeline_bound_plugins', None)
|
||||||
|
|
||||||
|
payload = {
|
||||||
|
'query_id': query.query_id,
|
||||||
|
'adapter': adapter_name,
|
||||||
|
'runner': runner_name,
|
||||||
|
'duration_ms': duration_ms,
|
||||||
|
'model_name': model_name,
|
||||||
|
'version': constants.semantic_version,
|
||||||
|
'instance_id': constants.instance_id,
|
||||||
|
'pipeline_plugins': pipeline_plugins,
|
||||||
|
'error': locals().get('error_info', None),
|
||||||
|
'timestamp': datetime.utcnow().isoformat(),
|
||||||
|
}
|
||||||
|
|
||||||
|
# Send telemetry asynchronously and do not block pipeline via app's telemetry manager
|
||||||
|
await self.ap.telemetry.start_send_task(payload)
|
||||||
|
except Exception as ex:
|
||||||
|
# Ensure telemetry issues do not affect normal flow
|
||||||
|
self.ap.logger.warning(f'Failed to send telemetry: {ex}')
|
||||||
|
|||||||
@@ -9,6 +9,7 @@ from ...discover import engine
|
|||||||
from . import token
|
from . import token
|
||||||
from ...entity.persistence import model as persistence_model
|
from ...entity.persistence import model as persistence_model
|
||||||
from ...entity.errors import provider as provider_errors
|
from ...entity.errors import provider as provider_errors
|
||||||
|
from async_lru import alru_cache
|
||||||
|
|
||||||
|
|
||||||
class ModelManager:
|
class ModelManager:
|
||||||
@@ -349,6 +350,7 @@ class ModelManager:
|
|||||||
|
|
||||||
await self.load_embedding_model_with_provider(model_entity, provider_entity)
|
await self.load_embedding_model_with_provider(model_entity, provider_entity)
|
||||||
|
|
||||||
|
@alru_cache(ttl=60 * 5)
|
||||||
async def get_model_by_uuid(self, uuid: str) -> requester.RuntimeLLMModel:
|
async def get_model_by_uuid(self, uuid: str) -> requester.RuntimeLLMModel:
|
||||||
"""Get LLM model by uuid"""
|
"""Get LLM model by uuid"""
|
||||||
for model in self.llm_models:
|
for model in self.llm_models:
|
||||||
@@ -356,6 +358,7 @@ class ModelManager:
|
|||||||
return model
|
return model
|
||||||
raise ValueError(f'LLM model {uuid} not found')
|
raise ValueError(f'LLM model {uuid} not found')
|
||||||
|
|
||||||
|
@alru_cache(ttl=60 * 5)
|
||||||
async def get_embedding_model_by_uuid(self, uuid: str) -> requester.RuntimeEmbeddingModel:
|
async def get_embedding_model_by_uuid(self, uuid: str) -> requester.RuntimeEmbeddingModel:
|
||||||
"""Get embedding model by uuid"""
|
"""Get embedding model by uuid"""
|
||||||
for model in self.embedding_models:
|
for model in self.embedding_models:
|
||||||
|
|||||||
@@ -0,0 +1,120 @@
|
|||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import httpx
|
||||||
|
|
||||||
|
|
||||||
|
class TelemetryManager:
|
||||||
|
"""TelemetryManager handles sending telemetry for a given application instance.
|
||||||
|
|
||||||
|
Usage:
|
||||||
|
telemetry = TelemetryManager(ap)
|
||||||
|
await telemetry.send({ ... })
|
||||||
|
"""
|
||||||
|
|
||||||
|
send_tasks: list[asyncio.Task] = []
|
||||||
|
|
||||||
|
def __init__(self, ap):
|
||||||
|
self.ap = ap
|
||||||
|
|
||||||
|
self.telemetry_config = {}
|
||||||
|
|
||||||
|
async def initialize(self):
|
||||||
|
self.telemetry_config = self.ap.instance_config.data.get('space', {})
|
||||||
|
|
||||||
|
async def start_send_task(self, payload: dict):
|
||||||
|
task = asyncio.create_task(self.send(payload))
|
||||||
|
self.send_tasks.append(task)
|
||||||
|
|
||||||
|
async def send(self, payload: dict):
|
||||||
|
"""Send telemetry payload to configured telemetry server (non-blocking).
|
||||||
|
|
||||||
|
Expects ap.instance_config.data.telemetry to have:
|
||||||
|
- enabled: bool
|
||||||
|
- server: str (base URL, e.g. https://space.example.com)
|
||||||
|
- timeout_seconds: optional int, overall request timeout (default 10)
|
||||||
|
|
||||||
|
Posts to {server.rstrip('/')}/api/v1/telemetry as JSON. Failures are logged but do not raise.
|
||||||
|
"""
|
||||||
|
|
||||||
|
try:
|
||||||
|
cfg = self.telemetry_config
|
||||||
|
if not cfg:
|
||||||
|
return
|
||||||
|
if cfg.get('disable_telemetry', False):
|
||||||
|
return
|
||||||
|
server = cfg.get('url', '')
|
||||||
|
if not server:
|
||||||
|
return
|
||||||
|
|
||||||
|
# Normalize URL
|
||||||
|
url = server.rstrip('/') + '/api/v1/telemetry'
|
||||||
|
|
||||||
|
try:
|
||||||
|
# Sanitize payload so string fields are strings and not nulls
|
||||||
|
sanitized = dict(payload)
|
||||||
|
if 'query_id' in sanitized:
|
||||||
|
try:
|
||||||
|
sanitized['query_id'] = '' if sanitized['query_id'] is None else str(sanitized['query_id'])
|
||||||
|
except Exception:
|
||||||
|
sanitized['query_id'] = str(sanitized.get('query_id', ''))
|
||||||
|
|
||||||
|
for sfield in ('adapter', 'runner', 'model_name', 'version', 'error', 'timestamp'):
|
||||||
|
v = sanitized.get(sfield)
|
||||||
|
sanitized[sfield] = '' if v is None else str(v)
|
||||||
|
|
||||||
|
if 'duration_ms' in sanitized:
|
||||||
|
try:
|
||||||
|
sanitized['duration_ms'] = (
|
||||||
|
int(sanitized['duration_ms']) if sanitized['duration_ms'] is not None else 0
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
sanitized['duration_ms'] = 0
|
||||||
|
|
||||||
|
async with httpx.AsyncClient(timeout=httpx.Timeout(10)) as client:
|
||||||
|
try:
|
||||||
|
# Use asyncio.wait_for to ensure we always bound the total time
|
||||||
|
resp = await asyncio.wait_for(client.post(url, json=sanitized), timeout=10 + 1)
|
||||||
|
|
||||||
|
if resp.status_code >= 400:
|
||||||
|
self.ap.logger.warning(
|
||||||
|
f'Telemetry post to {url} returned status {resp.status_code} - {resp.text}'
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
# Detect application-level errors inside HTTP 200 responses
|
||||||
|
app_err = False
|
||||||
|
try:
|
||||||
|
j = resp.json()
|
||||||
|
if isinstance(j, dict) and j.get('code') is not None and int(j.get('code')) >= 400:
|
||||||
|
app_err = True
|
||||||
|
self.ap.logger.warning(
|
||||||
|
f'Telemetry post to {url} returned application error code {j.get("code")} - {j.get("msg")}'
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
if app_err:
|
||||||
|
self.ap.logger.warning(
|
||||||
|
f'Telemetry post to {url} returned app-level error - response: {resp.text[:200]}'
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
self.ap.logger.debug(
|
||||||
|
f'Telemetry posted to {url}, status {resp.status_code} - response: {resp.text[:200]}'
|
||||||
|
)
|
||||||
|
except asyncio.TimeoutError:
|
||||||
|
self.ap.logger.warning(f'Telemetry post to {url} timed out')
|
||||||
|
except Exception as e:
|
||||||
|
self.ap.logger.warning(f'Failed to post telemetry to {url}: {e}', exc_info=True)
|
||||||
|
except Exception as e:
|
||||||
|
try:
|
||||||
|
self.ap.logger.warning(
|
||||||
|
f'Failed to create HTTP client for telemetry or sanitize payload: {e}', exc_info=True
|
||||||
|
)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
except Exception as e:
|
||||||
|
# Never raise from telemetry; surface as warning for visibility
|
||||||
|
try:
|
||||||
|
self.ap.logger.warning(f'Unexpected telemetry error: {e}', exc_info=True)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
@@ -78,3 +78,4 @@ space:
|
|||||||
# OAuth authorization page URL (user will be redirected here)
|
# OAuth authorization page URL (user will be redirected here)
|
||||||
oauth_authorize_url: 'https://space.langbot.app/auth/authorize'
|
oauth_authorize_url: 'https://space.langbot.app/auth/authorize'
|
||||||
disable_models_service: false
|
disable_models_service: false
|
||||||
|
disable_telemetry: false
|
||||||
|
|||||||
Reference in New Issue
Block a user