feat(telemetry): payload v2 with feature usage counters and instance heartbeat

Per-query events now carry event_type='query' and a features JSON object:
- tool_calls by source (native/plugin/mcp/skill) via ToolManager
- tool_call_rounds, kb usage (count/engine plugins/retrieved entries) via local-agent
- sandbox execs/errors via BoxService
- activated_skills and bound mcp_servers snapshots

New instance_heartbeat event (startup + daily) reports anonymous instance
profile: deploy platform, database/vdb kind, box backend/availability,
adapter type names, and resource counts. Respects space.disable_telemetry.

All collection helpers are defensive and never break the pipeline.
Verified: ruff, 37 telemetry unit tests (13 new), 504 box/provider/pipeline tests.
This commit is contained in:
RockChinQ
2026-06-12 08:11:43 -04:00
parent bca710dbd4
commit dd96da895c
10 changed files with 488 additions and 0 deletions
+102
View File
@@ -0,0 +1,102 @@
"""Per-query telemetry feature counters.
Collects anonymous, content-free usage signals (tool call counts, knowledge
base usage, sandbox executions, ...) into ``query.variables`` during query
processing. The chat handler reads the accumulated dict when building the
telemetry payload and ships it as the ``features`` JSON object.
Every helper here is defensive: telemetry must NEVER break the pipeline, so
all mutations are wrapped and failures are silently ignored.
"""
from __future__ import annotations
import typing
if typing.TYPE_CHECKING:
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
FEATURES_KEY = '_telemetry_features'


def get_features(query: pipeline_query.Query) -> dict:
    """Fetch the per-query telemetry features dict, creating it on first use.

    The dict is stored in ``query.variables`` under ``FEATURES_KEY`` so that
    later calls for the same query see the same object.

    Returns:
        The stored features dict. If ``query.variables`` cannot be used for
        any reason, a fresh detached ``{}`` is returned instead, so writes
        made through it are simply discarded.
    """
    try:
        variables = query.variables
        return variables.setdefault(FEATURES_KEY, {})
    except Exception:
        # Telemetry must never break the pipeline: hand back a throwaway dict.
        return {}
def increment(query: pipeline_query.Query, group: str, key: str | None = None, amount: int = 1) -> None:
    """Add ``amount`` to a telemetry counter, creating the counter if missing.

    ``increment(q, 'sandbox', 'execs')`` -> features['sandbox']['execs'] += 1
    ``increment(q, 'tool_call_rounds')`` -> features['tool_call_rounds'] += 1

    Args:
        query: The query whose features dict is updated.
        group: Top-level feature name.
        key: Optional counter name inside ``group``; when ``None`` the
            counter lives directly at ``features[group]``.
        amount: Value added to the counter (defaults to 1).

    Any failure is swallowed; if ``features[group]`` already holds something
    that is not a dict, a keyed increment is silently skipped.
    """
    try:
        counters = get_features(query)

        if key is None:
            # Flat counter stored directly under the group name.
            counters[group] = int(counters.get(group, 0)) + amount
            return

        bucket = counters.setdefault(group, {})
        if not isinstance(bucket, dict):
            # Group is already used as a flat value; leave it untouched.
            return
        bucket[key] = int(bucket.get(key, 0)) + amount
    except Exception:
        pass
def set_value(query: pipeline_query.Query, group: str, value: typing.Any) -> None:
    """Store ``value`` under ``features[group]``, replacing any previous value.

    Failures are swallowed so that telemetry bookkeeping can never interrupt
    query processing.
    """
    try:
        features = get_features(query)
        features[group] = value
    except Exception:
        pass
def collect_features(query: pipeline_query.Query) -> dict:
    """Assemble the ``features`` object shipped in the telemetry payload.

    Starts from the counters accumulated in ``query.variables`` during
    processing, then adds two end-of-query snapshots read from the same
    variables: the sorted names of activated skills and the list of MCP
    servers bound to the pipeline.

    Returns:
        A plain dict. If the dict as a whole cannot be JSON-encoded, every
        top-level entry that fails to encode on its own is dropped and the
        remaining entries are returned.
    """
    import json

    collected: dict = {}

    # Counters accumulated while the query was being processed
    try:
        counters = query.variables.get(FEATURES_KEY)
        if isinstance(counters, dict):
            collected.update(counters)
    except Exception:
        pass

    # Activated skills (names only, registered by the activate tool)
    try:
        skills = query.variables.get('_activated_skills', {})
        if isinstance(skills, dict) and skills:
            collected['activated_skills'] = sorted(skills)
    except Exception:
        pass

    # MCP servers bound to the pipeline (names only; None means "all enabled")
    try:
        servers = query.variables.get('_pipeline_bound_mcp_servers', None)
        if servers is not None:
            collected['mcp_servers'] = list(servers)
    except Exception:
        pass

    # Fast path: everything encodes, return as-is
    try:
        json.dumps(collected)
    except Exception:
        # Slow path: keep only the top-level entries that encode on their own
        serializable: dict = {}
        for name, value in collected.items():
            try:
                json.dumps({name: value})
            except Exception:
                continue
            serializable[name] = value
        return serializable
    return collected
+131
View File
@@ -0,0 +1,131 @@
"""Instance heartbeat telemetry.
Sends a periodic (startup + daily) anonymous snapshot of the instance's
configuration profile so feature *adoption* can be measured separately from
feature *usage* (which is covered by per-query telemetry).
The snapshot contains only configuration categories and object counts —
never names of user resources (except adapter type names, which are LangBot
adapter identifiers, not account info), never message content, never
credentials.
"""
from __future__ import annotations
import asyncio
import typing
from datetime import datetime, timezone
import sqlalchemy
from ..utils import constants, platform as platform_utils
if typing.TYPE_CHECKING:
from ..core import app as core_app
HEARTBEAT_INTERVAL_SECONDS = 24 * 3600
async def _count(ap: core_app.Application, table) -> int:
    """Return the number of rows in ``table``, or -1 if the query fails.

    Runs a ``SELECT count(*)`` against the given persistence table through
    ``ap.persistence_mgr``. Any exception is mapped to -1 so callers can
    distinguish "unavailable" from an empty table.
    """
    try:
        statement = sqlalchemy.select(sqlalchemy.func.count()).select_from(table)
        outcome = await ap.persistence_mgr.execute_async(statement)
        total = outcome.scalar()
        return int(total) if total else 0
    except Exception:
        return -1
async def build_heartbeat_payload(ap: core_app.Application) -> dict:
    """Collect the anonymous instance profile snapshot.

    Builds a ``features`` dict describing the instance configuration
    (deploy platform, database / vector database kind, sandbox profile,
    adapter type names) together with resource counts, and wraps it in an
    ``instance_heartbeat`` event.

    Every configuration and runtime probe is best-effort: a probe that
    fails is either left out of ``features`` or falls back to a default
    (``-1`` for counts, the built-in default for database / vdb kind), so
    one unreadable section does not discard the rest of the snapshot.

    Args:
        ap: The application object the profile is read from.

    Returns:
        A dict with the keys ``event_type``, ``query_id``, ``version``,
        ``instance_id``, ``edition``, ``features`` and ``timestamp``
        (UTC, ISO 8601).
    """
    from ..entity.persistence import bot as persistence_bot
    from ..entity.persistence import mcp as persistence_mcp
    from ..entity.persistence import pipeline as persistence_pipeline
    from ..entity.persistence import rag as persistence_rag

    config = ap.instance_config.data if ap.instance_config else {}

    def _backend_choice(section: str, default: str) -> typing.Any:
        """Return ``config[section]['use']``, or ``default`` when unreadable."""
        try:
            return config.get(section, {}).get('use', default)
        except Exception:
            # A section that is present but null / not a mapping would raise
            # here; fall back instead of aborting the whole heartbeat.
            return default

    features: dict = {
        'deploy_platform': platform_utils.get_platform(),
        'database': _backend_choice('database', 'sqlite'),
        'vdb': _backend_choice('vdb', 'chroma'),
    }

    # Box / sandbox profile
    try:
        box_service = getattr(ap, 'box_service', None)
        if box_service is not None:
            box_info: dict = {
                'enabled': bool(box_service.enabled),
                'available': bool(box_service.available),
            }
            box_cfg = config.get('box', {})
            box_info['backend'] = box_cfg.get('backend', 'local')
            try:
                box_info['shares_fs'] = bool(box_service.shares_filesystem_with_box)
            except Exception:
                # Optional attribute: omit the flag rather than lose the profile.
                pass
            features['box'] = box_info
    except Exception:
        pass

    # Bots / adapters (adapter type names only)
    try:
        platform_mgr = getattr(ap, 'platform_mgr', None)
        if platform_mgr is not None and getattr(platform_mgr, 'bots', None) is not None:
            enabled_bots = [bot for bot in platform_mgr.bots if getattr(bot, 'enable', False)]
            features['bot_count'] = len(platform_mgr.bots)
            adapters = sorted({bot.adapter.__class__.__name__ for bot in enabled_bots if getattr(bot, 'adapter', None)})
            features['adapters'] = adapters
    except Exception:
        pass

    # Resource counts
    features['pipeline_count'] = await _count(ap, persistence_pipeline.LegacyPipeline)
    features['mcp_server_count'] = await _count(ap, persistence_mcp.MCPServer)
    features['knowledge_base_count'] = await _count(ap, persistence_rag.KnowledgeBase)
    if 'bot_count' not in features:
        # Platform manager was unavailable: fall back to the persisted bots.
        features['bot_count'] = await _count(ap, persistence_bot.Bot)

    # Plugin count (from plugin runtime)
    try:
        plugin_connector = getattr(ap, 'plugin_connector', None)
        if plugin_connector is not None:
            plugins = await plugin_connector.list_plugins()
            features['plugin_count'] = len(plugins)
    except Exception:
        features['plugin_count'] = -1

    # Skill count (from Box runtime via skill manager)
    try:
        skill_mgr = getattr(ap, 'skill_mgr', None)
        if skill_mgr is not None and getattr(skill_mgr, 'skills', None) is not None:
            features['skill_count'] = len(skill_mgr.skills)
    except Exception:
        pass

    return {
        'event_type': 'instance_heartbeat',
        'query_id': '',
        'version': constants.semantic_version,
        'instance_id': constants.instance_id,
        'edition': constants.edition,
        'features': features,
        'timestamp': datetime.now(timezone.utc).isoformat(),
    }
async def heartbeat_loop(ap: core_app.Application) -> None:
    """Emit instance heartbeats forever: one soon after startup, then periodically.

    The first heartbeat is sent after a 30 second delay; each following one
    is sent ``HEARTBEAT_INTERVAL_SECONDS`` after the previous attempt. A
    failed attempt is logged at debug level and does not stop the loop.
    """

    async def _send_once() -> None:
        """Build and submit a single heartbeat, swallowing any failure."""
        try:
            payload = await build_heartbeat_payload(ap)
            await ap.telemetry.start_send_task(payload)
        except Exception as e:
            try:
                ap.logger.debug(f'Telemetry heartbeat failed: {e}')
            except Exception:
                pass

    # Small initial delay so managers (platform, skills, plugins) finish loading first
    pause = 30
    while True:
        await asyncio.sleep(pause)
        await _send_once()
        pause = HEARTBEAT_INTERVAL_SECONDS
+11
View File
@@ -68,10 +68,21 @@ class TelemetryManager:
'edition',
'error',
'timestamp',
'event_type',
):
if sfield not in sanitized:
continue
v = sanitized.get(sfield)
sanitized[sfield] = '' if v is None else str(v)
# event_type defaults to 'query' for backward compatibility
if not sanitized.get('event_type'):
sanitized['event_type'] = 'query'
# features must be a JSON object
if 'features' in sanitized and not isinstance(sanitized['features'], dict):
sanitized['features'] = {}
if 'duration_ms' in sanitized:
try:
sanitized['duration_ms'] = (