From dd96da895c0e60b5efbf29e3ea4102b251d72792 Mon Sep 17 00:00:00 2001 From: RockChinQ Date: Fri, 12 Jun 2026 08:11:43 -0400 Subject: [PATCH] feat(telemetry): payload v2 with feature usage counters and instance heartbeat Per-query events now carry event_type='query' and a features JSON object: - tool_calls by source (native/plugin/mcp/skill) via ToolManager - tool_call_rounds, kb usage (count/engine plugins/retrieved entries) via local-agent - sandbox execs/errors via BoxService - activated_skills and bound mcp_servers snapshots New instance_heartbeat event (startup + daily) reports anonymous instance profile: deploy platform, database/vdb kind, box backend/availability, adapter type names, and resource counts. Respects space.disable_telemetry. All collection helpers are defensive and never break the pipeline. Verified: ruff, 37 telemetry unit tests (13 new), 504 box/provider/pipeline tests. --- src/langbot/pkg/box/service.py | 3 + src/langbot/pkg/core/app.py | 11 ++ .../pkg/pipeline/process/handlers/chat.py | 7 + .../pkg/provider/runners/localagent.py | 21 +++ src/langbot/pkg/provider/tools/toolmgr.py | 6 + src/langbot/pkg/telemetry/features.py | 102 ++++++++++++++ src/langbot/pkg/telemetry/heartbeat.py | 131 ++++++++++++++++++ src/langbot/pkg/telemetry/telemetry.py | 11 ++ tests/unit_tests/telemetry/test_features.py | 92 ++++++++++++ tests/unit_tests/telemetry/test_heartbeat.py | 104 ++++++++++++++ 10 files changed, 488 insertions(+) create mode 100644 src/langbot/pkg/telemetry/features.py create mode 100644 src/langbot/pkg/telemetry/heartbeat.py create mode 100644 tests/unit_tests/telemetry/test_features.py create mode 100644 tests/unit_tests/telemetry/test_heartbeat.py diff --git a/src/langbot/pkg/box/service.py b/src/langbot/pkg/box/service.py index 65844de0..0eaa0973 100644 --- a/src/langbot/pkg/box/service.py +++ b/src/langbot/pkg/box/service.py @@ -12,6 +12,7 @@ import pydantic from langbot_plugin.box.client import BoxRuntimeClient from .connector import BoxRuntimeConnector, _get_box_config +from ..telemetry import features as telemetry_features from langbot_plugin.box.errors import BoxError, BoxValidationError from langbot_plugin.box.models import ( BUILTIN_PROFILES, @@ -218,6 +219,7 @@ class BoxService: f'query_id={query.query_id} ' f'summary={json.dumps(self._summarize_result(result), ensure_ascii=False)}' ) + telemetry_features.increment(query, 'sandbox', 'execs') return self._serialize_result(result) def resolve_box_session_id(self, query: pipeline_query.Query) -> str: @@ -785,6 +787,7 @@ class BoxService: # ── Observability ───────────────────────────────────────────────── def _record_error(self, exc: Exception, query: pipeline_query.Query): + telemetry_features.increment(query, 'sandbox', 'errors') self._recent_errors.append( { 'timestamp': _dt.datetime.now(_UTC).isoformat(), diff --git a/src/langbot/pkg/core/app.py b/src/langbot/pkg/core/app.py index 6e91c2b0..b0adb559 100644 --- a/src/langbot/pkg/core/app.py +++ b/src/langbot/pkg/core/app.py @@ -200,6 +200,17 @@ class Application: scopes=[core_entities.LifecycleControlScope.APPLICATION], ) + # Telemetry instance heartbeat (startup + daily); respects + # space.disable_telemetry via TelemetryManager.send(). + if self.telemetry is not None: + from ..telemetry import heartbeat as telemetry_heartbeat + + self.task_mgr.create_task( + telemetry_heartbeat.heartbeat_loop(self), + name='telemetry-heartbeat', + scopes=[core_entities.LifecycleControlScope.APPLICATION], + ) + # Start monitoring data cleanup task if enabled monitoring_cfg = self.instance_config.data.get('monitoring', {}) auto_cleanup_cfg = monitoring_cfg.get('auto_cleanup', {}) diff --git a/src/langbot/pkg/pipeline/process/handlers/chat.py b/src/langbot/pkg/pipeline/process/handlers/chat.py index c81461fd..2e2e7f28 100644 --- a/src/langbot/pkg/pipeline/process/handlers/chat.py +++ b/src/langbot/pkg/pipeline/process/handlers/chat.py @@ -13,6 +13,7 @@ from ....provider import runner as runner_module import langbot_plugin.api.entities.events as events from ....utils import importutil, constants, runner as runner_utils +from ....telemetry import features as telemetry_features from ....provider import runners import langbot_plugin.api.entities.builtin.provider.session as provider_session import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query @@ -201,7 +202,12 @@ class ChatMessageHandler(handler.MessageHandler): runner_name, runner, query.pipeline_config ) + # Feature usage collected during query processing (tool calls, + # knowledge base usage, sandbox executions, activated skills, ...) + features = telemetry_features.collect_features(query) + payload = { + 'event_type': 'query', 'query_id': query.query_id, 'adapter': adapter_name, 'runner': runner_name, @@ -212,6 +218,7 @@ class ChatMessageHandler(handler.MessageHandler): 'instance_id': constants.instance_id, 'edition': constants.edition, 'pipeline_plugins': pipeline_plugins, + 'features': features, 'error': locals().get('error_info', None), 'timestamp': datetime.utcnow().isoformat(), } diff --git a/src/langbot/pkg/provider/runners/localagent.py b/src/langbot/pkg/provider/runners/localagent.py index 28d014d0..710bce96 100644 --- a/src/langbot/pkg/provider/runners/localagent.py +++ b/src/langbot/pkg/provider/runners/localagent.py @@ -4,6 +4,7 @@ import json import copy import typing from .. import runner +from ...telemetry import features as telemetry_features from ..modelmgr import requester as modelmgr_requester from ..tools.loaders.native import EXEC_TOOL_NAME import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query @@ -187,6 +188,8 @@ class LocalAgentRunner(runner.RequestRunner): # only support text for now all_results: list[rag_context.RetrievalResultEntry] = [] + kb_engine_plugins: set[str] = set() + # Retrieve from each knowledge base for kb_uuid in kb_uuids: kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid) @@ -195,6 +198,12 @@ class LocalAgentRunner(runner.RequestRunner): self.ap.logger.warning(f'Knowledge base {kb_uuid} not found, skipping') continue + try: + engine_plugin_id = kb.get_knowledge_engine_plugin_id() or 'builtin' + except Exception: + engine_plugin_id = 'builtin' + kb_engine_plugins.add(engine_plugin_id) + result = await kb.retrieve( user_message_text, settings={ @@ -207,6 +216,17 @@ class LocalAgentRunner(runner.RequestRunner): if result: all_results.extend(result) + # Telemetry: knowledge base usage (counts and engine categories only) + telemetry_features.set_value( + query, + 'kb', + { + 'kb_count': len(kb_uuids), + 'engine_plugins': sorted(kb_engine_plugins), + 'retrieved_entries': len(all_results), + }, + ) + # Rerank step: re-score results using a rerank model if configured local_agent_config = query.pipeline_config.get('ai', {}).get('local-agent', {}) rerank_model_uuid = local_agent_config.get('rerank-model', '') @@ -373,6 +393,7 @@ class LocalAgentRunner(runner.RequestRunner): tool_call_round = 0 while pending_tool_calls: tool_call_round += 1 + telemetry_features.set_value(query, 'tool_call_rounds', tool_call_round) if tool_call_round > MAX_TOOL_CALL_ROUNDS: self.ap.logger.warning( f'Tool-call loop reached the {MAX_TOOL_CALL_ROUNDS}-round cap ' diff --git a/src/langbot/pkg/provider/tools/toolmgr.py b/src/langbot/pkg/provider/tools/toolmgr.py index 5c510fcd..16177a0b 100644 --- a/src/langbot/pkg/provider/tools/toolmgr.py +++ b/src/langbot/pkg/provider/tools/toolmgr.py @@ -97,13 +97,19 @@ class ToolManager: return tools async def execute_func_call(self, name: str, parameters: dict, query: pipeline_query.Query) -> typing.Any: + from langbot.pkg.telemetry import features as telemetry_features + if await self.native_tool_loader.has_tool(name): + telemetry_features.increment(query, 'tool_calls', 'native') return await self.native_tool_loader.invoke_tool(name, parameters, query) if await self.plugin_tool_loader.has_tool(name): + telemetry_features.increment(query, 'tool_calls', 'plugin') return await self.plugin_tool_loader.invoke_tool(name, parameters, query) if await self.mcp_tool_loader.has_tool(name): + telemetry_features.increment(query, 'tool_calls', 'mcp') return await self.mcp_tool_loader.invoke_tool(name, parameters, query) if await self.skill_tool_loader.has_tool(name): + telemetry_features.increment(query, 'tool_calls', 'skill') return await self.skill_tool_loader.invoke_tool(name, parameters, query) raise ValueError(f'未找到工具: {name}') diff --git a/src/langbot/pkg/telemetry/features.py b/src/langbot/pkg/telemetry/features.py new file mode 100644 index 00000000..ab66f988 --- /dev/null +++ b/src/langbot/pkg/telemetry/features.py @@ -0,0 +1,102 @@ +"""Per-query telemetry feature counters. + +Collects anonymous, content-free usage signals (tool call counts, knowledge +base usage, sandbox executions, ...) into ``query.variables`` during query +processing. The chat handler reads the accumulated dict when building the +telemetry payload and ships it as the ``features`` JSON object. + +Every helper here is defensive: telemetry must NEVER break the pipeline, so +all mutations are wrapped and failures are silently ignored. +""" + +from __future__ import annotations + +import typing + +if typing.TYPE_CHECKING: + import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query + + +FEATURES_KEY = '_telemetry_features' + + +def get_features(query: pipeline_query.Query) -> dict: + """Return the mutable features dict for this query, creating it if needed.""" + try: + return query.variables.setdefault(FEATURES_KEY, {}) + except Exception: + return {} + + +def increment(query: pipeline_query.Query, group: str, key: str | None = None, amount: int = 1) -> None: + """Increment a counter. + + ``increment(q, 'sandbox', 'execs')`` -> features['sandbox']['execs'] += 1 + ``increment(q, 'tool_call_rounds')`` -> features['tool_call_rounds'] += 1 + """ + try: + features = get_features(query) + if key is None: + features[group] = int(features.get(group, 0)) + amount + else: + nested = features.setdefault(group, {}) + if isinstance(nested, dict): + nested[key] = int(nested.get(key, 0)) + amount + except Exception: + pass + + +def set_value(query: pipeline_query.Query, group: str, value: typing.Any) -> None: + """Set a feature value (overwrites).""" + try: + get_features(query)[group] = value + except Exception: + pass + + +def collect_features(query: pipeline_query.Query) -> dict: + """Build the final ``features`` object for the telemetry payload. + + Combines the counters accumulated during processing with end-of-query + snapshots (activated skills, bound MCP servers). Returns a plain dict + that must be JSON-serializable; non-serializable values are dropped. + """ + features: dict = {} + try: + accumulated = query.variables.get(FEATURES_KEY) + if isinstance(accumulated, dict): + features.update(accumulated) + except Exception: + pass + + # Activated skills (names only, registered by the activate tool) + try: + activated = query.variables.get('_activated_skills', {}) + if isinstance(activated, dict) and activated: + features['activated_skills'] = sorted(activated.keys()) + except Exception: + pass + + # MCP servers bound to the pipeline (names only; None means "all enabled") + try: + bound_mcp = query.variables.get('_pipeline_bound_mcp_servers', None) + if bound_mcp is not None: + features['mcp_servers'] = list(bound_mcp) + except Exception: + pass + + # Drop anything that is not JSON-serializable + import json + + try: + json.dumps(features) + return features + except Exception: + safe: dict = {} + for k, v in features.items(): + try: + json.dumps({k: v}) + safe[k] = v + except Exception: + continue + return safe diff --git a/src/langbot/pkg/telemetry/heartbeat.py b/src/langbot/pkg/telemetry/heartbeat.py new file mode 100644 index 00000000..dd2a58d3 --- /dev/null +++ b/src/langbot/pkg/telemetry/heartbeat.py @@ -0,0 +1,131 @@ +"""Instance heartbeat telemetry. + +Sends a periodic (startup + daily) anonymous snapshot of the instance's +configuration profile so feature *adoption* can be measured separately from +feature *usage* (which is covered by per-query telemetry). + +The snapshot contains only configuration categories and object counts — +never names of user resources (except adapter type names, which are LangBot +adapter identifiers, not account info), never message content, never +credentials. +""" + +from __future__ import annotations + +import asyncio +import typing +from datetime import datetime, timezone + +import sqlalchemy + +from ..utils import constants, platform as platform_utils + +if typing.TYPE_CHECKING: + from ..core import app as core_app + + +HEARTBEAT_INTERVAL_SECONDS = 24 * 3600 + + +async def _count(ap: core_app.Application, table) -> int: + """Count rows in a persistence table; -1 when unavailable.""" + try: + result = await ap.persistence_mgr.execute_async(sqlalchemy.select(sqlalchemy.func.count()).select_from(table)) + return int(result.scalar() or 0) + except Exception: + return -1 + + +async def build_heartbeat_payload(ap: core_app.Application) -> dict: + """Collect the anonymous instance profile snapshot.""" + from ..entity.persistence import bot as persistence_bot + from ..entity.persistence import mcp as persistence_mcp + from ..entity.persistence import pipeline as persistence_pipeline + from ..entity.persistence import rag as persistence_rag + + config = ap.instance_config.data if ap.instance_config else {} + + features: dict = { + 'deploy_platform': platform_utils.get_platform(), + 'database': config.get('database', {}).get('use', 'sqlite'), + 'vdb': config.get('vdb', {}).get('use', 'chroma'), + } + + # Box / sandbox profile + try: + box_service = getattr(ap, 'box_service', None) + if box_service is not None: + box_info: dict = { + 'enabled': bool(box_service.enabled), + 'available': bool(box_service.available), + } + box_cfg = config.get('box', {}) + box_info['backend'] = box_cfg.get('backend', 'local') + try: + box_info['shares_fs'] = bool(box_service.shares_filesystem_with_box) + except Exception: + pass + features['box'] = box_info + except Exception: + pass + + # Bots / adapters (adapter type names only) + try: + platform_mgr = getattr(ap, 'platform_mgr', None) + if platform_mgr is not None and getattr(platform_mgr, 'bots', None) is not None: + enabled_bots = [bot for bot in platform_mgr.bots if getattr(bot, 'enable', False)] + features['bot_count'] = len(platform_mgr.bots) + adapters = sorted({bot.adapter.__class__.__name__ for bot in enabled_bots if getattr(bot, 'adapter', None)}) + features['adapters'] = adapters + except Exception: + pass + + # Resource counts + features['pipeline_count'] = await _count(ap, persistence_pipeline.LegacyPipeline) + features['mcp_server_count'] = await _count(ap, persistence_mcp.MCPServer) + features['knowledge_base_count'] = await _count(ap, persistence_rag.KnowledgeBase) + if 'bot_count' not in features: + features['bot_count'] = await _count(ap, persistence_bot.Bot) + + # Plugin count (from plugin runtime) + try: + plugin_connector = getattr(ap, 'plugin_connector', None) + if plugin_connector is not None: + plugins = await plugin_connector.list_plugins() + features['plugin_count'] = len(plugins) + except Exception: + features['plugin_count'] = -1 + + # Skill count (from Box runtime via skill manager) + try: + skill_mgr = getattr(ap, 'skill_mgr', None) + if skill_mgr is not None and getattr(skill_mgr, 'skills', None) is not None: + features['skill_count'] = len(skill_mgr.skills) + except Exception: + pass + + return { + 'event_type': 'instance_heartbeat', + 'query_id': '', + 'version': constants.semantic_version, + 'instance_id': constants.instance_id, + 'edition': constants.edition, + 'features': features, + 'timestamp': datetime.now(timezone.utc).isoformat(), + } + + +async def heartbeat_loop(ap: core_app.Application) -> None: + """Send one heartbeat shortly after startup, then daily.""" + # Small delay so managers (platform, skills, plugins) finish loading first + await asyncio.sleep(30) + while True: + try: + payload = await build_heartbeat_payload(ap) + await ap.telemetry.start_send_task(payload) + except Exception as e: + try: + ap.logger.debug(f'Telemetry heartbeat failed: {e}') + except Exception: + pass + await asyncio.sleep(HEARTBEAT_INTERVAL_SECONDS) diff --git a/src/langbot/pkg/telemetry/telemetry.py b/src/langbot/pkg/telemetry/telemetry.py index 04152901..2948e268 100644 --- a/src/langbot/pkg/telemetry/telemetry.py +++ b/src/langbot/pkg/telemetry/telemetry.py @@ -68,10 +68,21 @@ class TelemetryManager: 'edition', 'error', 'timestamp', + 'event_type', ): + if sfield not in sanitized: + continue v = sanitized.get(sfield) sanitized[sfield] = '' if v is None else str(v) + # event_type defaults to 'query' for backward compatibility + if not sanitized.get('event_type'): + sanitized['event_type'] = 'query' + + # features must be a JSON object + if 'features' in sanitized and not isinstance(sanitized['features'], dict): + sanitized['features'] = {} + if 'duration_ms' in sanitized: try: sanitized['duration_ms'] = ( diff --git a/tests/unit_tests/telemetry/test_features.py b/tests/unit_tests/telemetry/test_features.py new file mode 100644 index 00000000..6e19d082 --- /dev/null +++ b/tests/unit_tests/telemetry/test_features.py @@ -0,0 +1,92 @@ +"""Unit tests for telemetry feature counters (pkg/telemetry/features.py).""" + +from __future__ import annotations + +from importlib import import_module + + +def get_features_module(): + return import_module('langbot.pkg.telemetry.features') + + +class FakeQuery: + def __init__(self): + self.variables = {} + + +class TestIncrement: + def test_increment_nested_counter(self): + features = get_features_module() + q = FakeQuery() + features.increment(q, 'tool_calls', 'native') + features.increment(q, 'tool_calls', 'native') + features.increment(q, 'tool_calls', 'mcp') + assert q.variables[features.FEATURES_KEY]['tool_calls'] == {'native': 2, 'mcp': 1} + + def test_increment_flat_counter(self): + features = get_features_module() + q = FakeQuery() + features.increment(q, 'something') + features.increment(q, 'something', amount=2) + assert q.variables[features.FEATURES_KEY]['something'] == 3 + + def test_increment_never_raises_on_broken_query(self): + features = get_features_module() + + class Broken: + @property + def variables(self): + raise RuntimeError('boom') + + # Must not raise + features.increment(Broken(), 'tool_calls', 'native') + + def test_set_value(self): + features = get_features_module() + q = FakeQuery() + features.set_value(q, 'tool_call_rounds', 5) + assert q.variables[features.FEATURES_KEY]['tool_call_rounds'] == 5 + + +class TestCollectFeatures: + def test_collect_empty(self): + features = get_features_module() + q = FakeQuery() + assert features.collect_features(q) == {} + + def test_collect_combines_counters_and_snapshots(self): + features = get_features_module() + q = FakeQuery() + features.increment(q, 'sandbox', 'execs') + features.set_value(q, 'kb', {'kb_count': 2, 'engine_plugins': ['builtin'], 'retrieved_entries': 7}) + q.variables['_activated_skills'] = {'pdf-tools': {}, 'a-skill': {}} + q.variables['_pipeline_bound_mcp_servers'] = ['srv1', 'srv2'] + + result = features.collect_features(q) + assert result['sandbox'] == {'execs': 1} + assert result['kb']['kb_count'] == 2 + assert result['activated_skills'] == ['a-skill', 'pdf-tools'] # sorted + assert result['mcp_servers'] == ['srv1', 'srv2'] + + def test_collect_omits_mcp_when_all_enabled(self): + """None means 'all enabled' and is not reported.""" + features = get_features_module() + q = FakeQuery() + q.variables['_pipeline_bound_mcp_servers'] = None + assert 'mcp_servers' not in features.collect_features(q) + + def test_collect_drops_non_json_serializable(self): + features = get_features_module() + q = FakeQuery() + features.set_value(q, 'good', 1) + features.set_value(q, 'bad', object()) + result = features.collect_features(q) + assert result == {'good': 1} + + def test_collect_is_json_serializable(self): + import json + + features = get_features_module() + q = FakeQuery() + features.increment(q, 'tool_calls', 'skill') + json.dumps(features.collect_features(q)) diff --git a/tests/unit_tests/telemetry/test_heartbeat.py b/tests/unit_tests/telemetry/test_heartbeat.py new file mode 100644 index 00000000..a5bd2e32 --- /dev/null +++ b/tests/unit_tests/telemetry/test_heartbeat.py @@ -0,0 +1,104 @@ +"""Unit tests for telemetry heartbeat payload (pkg/telemetry/heartbeat.py).""" + +from __future__ import annotations + +import json + +import pytest +from unittest.mock import AsyncMock, Mock +from importlib import import_module + + +def get_heartbeat_module(): + return import_module('langbot.pkg.telemetry.heartbeat') + + +def make_app(): + ap = Mock() + ap.instance_config = Mock() + ap.instance_config.data = { + 'database': {'use': 'postgresql'}, + 'vdb': {'use': 'chroma'}, + 'box': {'enabled': True, 'backend': 'nsjail'}, + } + + # persistence counts + result = Mock() + result.scalar.return_value = 3 + ap.persistence_mgr = Mock() + ap.persistence_mgr.execute_async = AsyncMock(return_value=result) + + # box service + ap.box_service = Mock() + ap.box_service.enabled = True + ap.box_service.available = False + ap.box_service.shares_filesystem_with_box = False + + # platform manager with one enabled bot + bot = Mock() + bot.enable = True + bot.adapter = Mock() + bot.adapter.__class__.__name__ = 'TelegramAdapter' + ap.platform_mgr = Mock() + ap.platform_mgr.bots = [bot] + + # plugin connector + ap.plugin_connector = Mock() + ap.plugin_connector.list_plugins = AsyncMock(return_value=[{}, {}]) + + # skills + ap.skill_mgr = Mock() + ap.skill_mgr.skills = {'a': {}, 'b': {}, 'c': {}} + + return ap + + +class TestBuildHeartbeatPayload: + @pytest.mark.asyncio + async def test_payload_shape(self): + heartbeat = get_heartbeat_module() + ap = make_app() + payload = await heartbeat.build_heartbeat_payload(ap) + + assert payload['event_type'] == 'instance_heartbeat' + assert payload['query_id'] == '' + assert 'timestamp' in payload + f = payload['features'] + assert f['database'] == 'postgresql' + assert f['vdb'] == 'chroma' + assert f['box'] == { + 'enabled': True, + 'available': False, + 'backend': 'nsjail', + 'shares_fs': False, + } + assert f['adapters'] == ['TelegramAdapter'] + assert f['bot_count'] == 1 + assert f['plugin_count'] == 2 + assert f['skill_count'] == 3 + assert f['pipeline_count'] == 3 + assert f['mcp_server_count'] == 3 + assert f['knowledge_base_count'] == 3 + + @pytest.mark.asyncio + async def test_payload_is_json_serializable(self): + heartbeat = get_heartbeat_module() + payload = await heartbeat.build_heartbeat_payload(make_app()) + json.dumps(payload) + + @pytest.mark.asyncio + async def test_count_failure_yields_minus_one(self): + heartbeat = get_heartbeat_module() + ap = make_app() + ap.persistence_mgr.execute_async = AsyncMock(side_effect=RuntimeError('db down')) + payload = await heartbeat.build_heartbeat_payload(ap) + assert payload['features']['pipeline_count'] == -1 + + @pytest.mark.asyncio + async def test_no_user_content_fields(self): + """The heartbeat must never carry message content / credentials keys.""" + heartbeat = get_heartbeat_module() + payload = await heartbeat.build_heartbeat_payload(make_app()) + flat = json.dumps(payload).lower() + for forbidden in ('api_key', 'password', 'token', 'message_content'): + assert forbidden not in flat