Compare commits

...

5 Commits

Author SHA1 Message Date
huanghuoguoguo
7d94a3e8dd Add plugin rerank invocation action 2026-06-13 16:47:56 +08:00
RockChinQ
7965d333ac fix(mcp): read stdio args from form state in testMcp to avoid stale closure
The MCP detail page invokes testMcp() through an imperative handle
(formRef.current.testMcp()). The handle closure is only refreshed when
[mcpTesting] changes, so testMcp read a stale snapshot of the stdioArgs/
extraArgs React state — on the detail page that snapshot is the empty
initial [], so stdio 'args' were dropped entirely. The sandbox then
launched 'uvx' with no package, which exits 2 and surfaces only an opaque
'Connection closed' with no detail.

Read command/args/env via form.getValues() (kept in sync on every edit and
on load) instead of the captured state, matching how 'command' was already
read. Fixes stdio MCP test failing with empty args on the detail page.
2026-06-13 01:56:03 -04:00
RockChinQ
f7300f1473 chore(deps): bump langbot-plugin 0.4.2 -> 0.4.3
Picks up the nsjail Box backend fix: correct cgroup v2 detection (probe
cgroup.subtree_control instead of mkdir, fixing the private-cgroupns EBUSY
false-positive) and removal of the RLIMIT_AS memory cap that instantly
killed uv/node-based stdio MCP servers (exit 255). Containerized nsjail
deployments now require the host cgroup namespace (--cgroupns=host).
2026-06-13 01:00:00 -04:00
RockChinQ
2b6dcfe9c7 feat(survey): add bot_response_success_100 milestone trigger event
Counts successful non-WebSocket bot responses (persisted in the metadata
table as survey_bot_response_count, survives restarts) and fires the
bot_response_success_100 survey event once the instance reaches 100
responses. Counting stops after the milestone has been triggered.

Existing first_bot_response_success behavior unchanged. 6 new unit tests.
2026-06-12 09:40:07 -04:00
RockChinQ
dd96da895c feat(telemetry): payload v2 with feature usage counters and instance heartbeat
Per-query events now carry event_type='query' and a features JSON object:
- tool_calls by source (native/plugin/mcp/skill) via ToolManager
- tool_call_rounds, kb usage (count/engine plugins/retrieved entries) via local-agent
- sandbox execs/errors via BoxService
- activated_skills and bound mcp_servers snapshots

New instance_heartbeat event (startup + daily) reports anonymous instance
profile: deploy platform, database/vdb kind, box backend/availability,
adapter type names, and resource counts. Respects space.disable_telemetry.

All collection helpers are defensive and never break the pipeline.
Verified: ruff, 37 telemetry unit tests (13 new), 504 box/provider/pipeline tests.
2026-06-12 08:11:43 -04:00
17 changed files with 780 additions and 25 deletions

View File

@@ -70,7 +70,7 @@ dependencies = [
"chromadb>=1.0.0,<2.0.0",
"qdrant-client (>=1.15.1,<2.0.0)",
"pyseekdb==1.1.0.post3",
"langbot-plugin==0.4.2",
"langbot-plugin==0.4.3",
"asyncpg>=0.30.0",
"line-bot-sdk>=3.19.0",
"matrix-nio>=0.25.2",

View File

@@ -12,6 +12,7 @@ import pydantic
from langbot_plugin.box.client import BoxRuntimeClient
from .connector import BoxRuntimeConnector, _get_box_config
from ..telemetry import features as telemetry_features
from langbot_plugin.box.errors import BoxError, BoxValidationError
from langbot_plugin.box.models import (
BUILTIN_PROFILES,
@@ -218,6 +219,7 @@ class BoxService:
f'query_id={query.query_id} '
f'summary={json.dumps(self._summarize_result(result), ensure_ascii=False)}'
)
telemetry_features.increment(query, 'sandbox', 'execs')
return self._serialize_result(result)
def resolve_box_session_id(self, query: pipeline_query.Query) -> str:
@@ -785,6 +787,7 @@ class BoxService:
# ── Observability ─────────────────────────────────────────────────
def _record_error(self, exc: Exception, query: pipeline_query.Query):
telemetry_features.increment(query, 'sandbox', 'errors')
self._recent_errors.append(
{
'timestamp': _dt.datetime.now(_UTC).isoformat(),

View File

@@ -200,6 +200,17 @@ class Application:
scopes=[core_entities.LifecycleControlScope.APPLICATION],
)
# Telemetry instance heartbeat (startup + daily); respects
# space.disable_telemetry via TelemetryManager.send().
if self.telemetry is not None:
from ..telemetry import heartbeat as telemetry_heartbeat
self.task_mgr.create_task(
telemetry_heartbeat.heartbeat_loop(self),
name='telemetry-heartbeat',
scopes=[core_entities.LifecycleControlScope.APPLICATION],
)
# Start monitoring data cleanup task if enabled
monitoring_cfg = self.instance_config.data.get('monitoring', {})
auto_cleanup_cfg = monitoring_cfg.get('auto_cleanup', {})

View File

@@ -13,6 +13,7 @@ from ....provider import runner as runner_module
import langbot_plugin.api.entities.events as events
from ....utils import importutil, constants, runner as runner_utils
from ....telemetry import features as telemetry_features
from ....provider import runners
import langbot_plugin.api.entities.builtin.provider.session as provider_session
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
@@ -201,7 +202,12 @@ class ChatMessageHandler(handler.MessageHandler):
runner_name, runner, query.pipeline_config
)
# Feature usage collected during query processing (tool calls,
# knowledge base usage, sandbox executions, activated skills, ...)
features = telemetry_features.collect_features(query)
payload = {
'event_type': 'query',
'query_id': query.query_id,
'adapter': adapter_name,
'runner': runner_name,
@@ -212,6 +218,7 @@ class ChatMessageHandler(handler.MessageHandler):
'instance_id': constants.instance_id,
'edition': constants.edition,
'pipeline_plugins': pipeline_plugins,
'features': features,
'error': locals().get('error_info', None),
'timestamp': datetime.utcnow().isoformat(),
}
@@ -219,10 +226,12 @@ class ChatMessageHandler(handler.MessageHandler):
# Send telemetry asynchronously and do not block pipeline via app's telemetry manager
await self.ap.telemetry.start_send_task(payload)
# Trigger survey event on first successful non-WebSocket response
# Trigger survey events on successful non-WebSocket responses
if not locals().get('error_info') and adapter_name and 'WebSocket' not in adapter_name:
if self.ap.survey:
await self.ap.survey.trigger_event('first_bot_response_success')
# Counts toward the bot_response_success_100 milestone event
await self.ap.survey.record_bot_response_success()
except Exception as ex:
# Ensure telemetry issues do not affect normal flow
self.ap.logger.warning(f'Failed to send telemetry: {ex}')

View File

@@ -514,6 +514,35 @@ class RuntimeConnectionHandler(handler.Handler):
except Exception as e:
return _make_rag_error_response(e, 'EmbeddingError', embedding_model_uuid=embedding_model_uuid)
@self.action(PluginToRuntimeAction.INVOKE_RERANK)
async def invoke_rerank(data: dict[str, Any]) -> handler.ActionResponse:
rerank_model_uuid = data['rerank_model_uuid']
query = data['query']
documents = data['documents']
top_k = data.get('top_k')
extra_args = data.get('extra_args', {})
try:
rerank_model = await self.ap.model_mgr.get_rerank_model_by_uuid(rerank_model_uuid)
except ValueError:
return handler.ActionResponse.error(
message=f'Rerank model with rerank_model_uuid {rerank_model_uuid} not found',
)
try:
scores = await rerank_model.provider.invoke_rerank(
model=rerank_model,
query=query,
documents=documents[:64],
extra_args=extra_args,
)
scored = sorted(scores, key=lambda x: x.get('relevance_score', 0), reverse=True)
if top_k is not None:
scored = scored[: int(top_k)]
return handler.ActionResponse.success(data={'results': scored})
except Exception as e:
return _make_rag_error_response(e, 'RerankError', rerank_model_uuid=rerank_model_uuid)
@self.action(PluginToRuntimeAction.VECTOR_UPSERT)
async def vector_upsert(data: dict[str, Any]) -> handler.ActionResponse:
collection_id = data['collection_id']

View File

@@ -4,6 +4,7 @@ import json
import copy
import typing
from .. import runner
from ...telemetry import features as telemetry_features
from ..modelmgr import requester as modelmgr_requester
from ..tools.loaders.native import EXEC_TOOL_NAME
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
@@ -187,6 +188,8 @@ class LocalAgentRunner(runner.RequestRunner):
# only support text for now
all_results: list[rag_context.RetrievalResultEntry] = []
kb_engine_plugins: set[str] = set()
# Retrieve from each knowledge base
for kb_uuid in kb_uuids:
kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid)
@@ -195,6 +198,12 @@ class LocalAgentRunner(runner.RequestRunner):
self.ap.logger.warning(f'Knowledge base {kb_uuid} not found, skipping')
continue
try:
engine_plugin_id = kb.get_knowledge_engine_plugin_id() or 'builtin'
except Exception:
engine_plugin_id = 'builtin'
kb_engine_plugins.add(engine_plugin_id)
result = await kb.retrieve(
user_message_text,
settings={
@@ -207,6 +216,17 @@ class LocalAgentRunner(runner.RequestRunner):
if result:
all_results.extend(result)
# Telemetry: knowledge base usage (counts and engine categories only)
telemetry_features.set_value(
query,
'kb',
{
'kb_count': len(kb_uuids),
'engine_plugins': sorted(kb_engine_plugins),
'retrieved_entries': len(all_results),
},
)
# Rerank step: re-score results using a rerank model if configured
local_agent_config = query.pipeline_config.get('ai', {}).get('local-agent', {})
rerank_model_uuid = local_agent_config.get('rerank-model', '')
@@ -373,6 +393,7 @@ class LocalAgentRunner(runner.RequestRunner):
tool_call_round = 0
while pending_tool_calls:
tool_call_round += 1
telemetry_features.set_value(query, 'tool_call_rounds', tool_call_round)
if tool_call_round > MAX_TOOL_CALL_ROUNDS:
self.ap.logger.warning(
f'Tool-call loop reached the {MAX_TOOL_CALL_ROUNDS}-round cap '

View File

@@ -97,13 +97,19 @@ class ToolManager:
return tools
async def execute_func_call(self, name: str, parameters: dict, query: pipeline_query.Query) -> typing.Any:
from langbot.pkg.telemetry import features as telemetry_features
if await self.native_tool_loader.has_tool(name):
telemetry_features.increment(query, 'tool_calls', 'native')
return await self.native_tool_loader.invoke_tool(name, parameters, query)
if await self.plugin_tool_loader.has_tool(name):
telemetry_features.increment(query, 'tool_calls', 'plugin')
return await self.plugin_tool_loader.invoke_tool(name, parameters, query)
if await self.mcp_tool_loader.has_tool(name):
telemetry_features.increment(query, 'tool_calls', 'mcp')
return await self.mcp_tool_loader.invoke_tool(name, parameters, query)
if await self.skill_tool_loader.has_tool(name):
telemetry_features.increment(query, 'tool_calls', 'skill')
return await self.skill_tool_loader.invoke_tool(name, parameters, query)
raise ValueError(f'未找到工具: {name}')

View File

@@ -13,6 +13,11 @@ from ..entity.persistence.metadata import Metadata
from ..utils import constants
SURVEY_TRIGGERED_KEY = 'survey_triggered_events'
BOT_RESPONSE_COUNT_KEY = 'survey_bot_response_count'
# Milestone event fired when an instance accumulates this many successful bot responses
BOT_RESPONSE_MILESTONE = 100
BOT_RESPONSE_MILESTONE_EVENT = f'bot_response_success_{BOT_RESPONSE_MILESTONE}'
class SurveyManager:
@@ -23,11 +28,13 @@ class SurveyManager:
self._triggered_events: set[str] = set()
self._pending_survey: typing.Optional[dict] = None
self._space_url: str = ''
self._bot_response_count: int = 0
async def initialize(self):
space_config = self.ap.instance_config.data.get('space', {})
self._space_url = space_config.get('url', '').rstrip('/')
await self._load_triggered_events()
await self._load_bot_response_count()
async def _load_triggered_events(self):
"""Load previously triggered events from metadata table."""
@@ -65,6 +72,54 @@ class SurveyManager:
return False
return bool(self._space_url)
async def _load_bot_response_count(self):
"""Load the persisted successful bot response count from metadata table."""
try:
result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.select(Metadata).where(Metadata.key == BOT_RESPONSE_COUNT_KEY)
)
row = result.first()
if row:
self._bot_response_count = int(row[0].value)
except Exception:
self._bot_response_count = 0
async def _save_bot_response_count(self):
"""Persist the successful bot response count to metadata table."""
try:
value = str(self._bot_response_count)
result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.select(Metadata).where(Metadata.key == BOT_RESPONSE_COUNT_KEY)
)
if result.first():
await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(Metadata).where(Metadata.key == BOT_RESPONSE_COUNT_KEY).values(value=value)
)
else:
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(Metadata).values(key=BOT_RESPONSE_COUNT_KEY, value=value)
)
except Exception as e:
self.ap.logger.debug(f'Failed to save survey bot response count: {e}')
async def record_bot_response_success(self):
"""Count a successful bot response; fires the milestone event at the threshold.
Called by the chat handler after each successful (non-WebSocket) response.
The count is persisted so it survives restarts. Once the milestone event
has been triggered, counting stops (no further writes needed).
"""
if BOT_RESPONSE_MILESTONE_EVENT in self._triggered_events:
return
if not self._is_space_configured():
return
self._bot_response_count += 1
await self._save_bot_response_count()
if self._bot_response_count >= BOT_RESPONSE_MILESTONE:
await self.trigger_event(BOT_RESPONSE_MILESTONE_EVENT)
async def trigger_event(self, event: str):
"""Called when an event occurs. Checks Space for a pending survey."""
if event in self._triggered_events:

View File

@@ -0,0 +1,102 @@
"""Per-query telemetry feature counters.
Collects anonymous, content-free usage signals (tool call counts, knowledge
base usage, sandbox executions, ...) into ``query.variables`` during query
processing. The chat handler reads the accumulated dict when building the
telemetry payload and ships it as the ``features`` JSON object.
Every helper here is defensive: telemetry must NEVER break the pipeline, so
all mutations are wrapped and failures are silently ignored.
"""
from __future__ import annotations
import typing
if typing.TYPE_CHECKING:
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
FEATURES_KEY = '_telemetry_features'
def get_features(query: pipeline_query.Query) -> dict:
"""Return the mutable features dict for this query, creating it if needed."""
try:
return query.variables.setdefault(FEATURES_KEY, {})
except Exception:
return {}
def increment(query: pipeline_query.Query, group: str, key: str | None = None, amount: int = 1) -> None:
"""Increment a counter.
``increment(q, 'sandbox', 'execs')`` -> features['sandbox']['execs'] += 1
``increment(q, 'tool_call_rounds')`` -> features['tool_call_rounds'] += 1
"""
try:
features = get_features(query)
if key is None:
features[group] = int(features.get(group, 0)) + amount
else:
nested = features.setdefault(group, {})
if isinstance(nested, dict):
nested[key] = int(nested.get(key, 0)) + amount
except Exception:
pass
def set_value(query: pipeline_query.Query, group: str, value: typing.Any) -> None:
"""Set a feature value (overwrites)."""
try:
get_features(query)[group] = value
except Exception:
pass
def collect_features(query: pipeline_query.Query) -> dict:
"""Build the final ``features`` object for the telemetry payload.
Combines the counters accumulated during processing with end-of-query
snapshots (activated skills, bound MCP servers). Returns a plain dict
that must be JSON-serializable; non-serializable values are dropped.
"""
features: dict = {}
try:
accumulated = query.variables.get(FEATURES_KEY)
if isinstance(accumulated, dict):
features.update(accumulated)
except Exception:
pass
# Activated skills (names only, registered by the activate tool)
try:
activated = query.variables.get('_activated_skills', {})
if isinstance(activated, dict) and activated:
features['activated_skills'] = sorted(activated.keys())
except Exception:
pass
# MCP servers bound to the pipeline (names only; None means "all enabled")
try:
bound_mcp = query.variables.get('_pipeline_bound_mcp_servers', None)
if bound_mcp is not None:
features['mcp_servers'] = list(bound_mcp)
except Exception:
pass
# Drop anything that is not JSON-serializable
import json
try:
json.dumps(features)
return features
except Exception:
safe: dict = {}
for k, v in features.items():
try:
json.dumps({k: v})
safe[k] = v
except Exception:
continue
return safe

View File

@@ -0,0 +1,131 @@
"""Instance heartbeat telemetry.
Sends a periodic (startup + daily) anonymous snapshot of the instance's
configuration profile so feature *adoption* can be measured separately from
feature *usage* (which is covered by per-query telemetry).
The snapshot contains only configuration categories and object counts —
never names of user resources (except adapter type names, which are LangBot
adapter identifiers, not account info), never message content, never
credentials.
"""
from __future__ import annotations
import asyncio
import typing
from datetime import datetime, timezone
import sqlalchemy
from ..utils import constants, platform as platform_utils
if typing.TYPE_CHECKING:
from ..core import app as core_app
HEARTBEAT_INTERVAL_SECONDS = 24 * 3600
async def _count(ap: core_app.Application, table) -> int:
"""Count rows in a persistence table; -1 when unavailable."""
try:
result = await ap.persistence_mgr.execute_async(sqlalchemy.select(sqlalchemy.func.count()).select_from(table))
return int(result.scalar() or 0)
except Exception:
return -1
async def build_heartbeat_payload(ap: core_app.Application) -> dict:
"""Collect the anonymous instance profile snapshot."""
from ..entity.persistence import bot as persistence_bot
from ..entity.persistence import mcp as persistence_mcp
from ..entity.persistence import pipeline as persistence_pipeline
from ..entity.persistence import rag as persistence_rag
config = ap.instance_config.data if ap.instance_config else {}
features: dict = {
'deploy_platform': platform_utils.get_platform(),
'database': config.get('database', {}).get('use', 'sqlite'),
'vdb': config.get('vdb', {}).get('use', 'chroma'),
}
# Box / sandbox profile
try:
box_service = getattr(ap, 'box_service', None)
if box_service is not None:
box_info: dict = {
'enabled': bool(box_service.enabled),
'available': bool(box_service.available),
}
box_cfg = config.get('box', {})
box_info['backend'] = box_cfg.get('backend', 'local')
try:
box_info['shares_fs'] = bool(box_service.shares_filesystem_with_box)
except Exception:
pass
features['box'] = box_info
except Exception:
pass
# Bots / adapters (adapter type names only)
try:
platform_mgr = getattr(ap, 'platform_mgr', None)
if platform_mgr is not None and getattr(platform_mgr, 'bots', None) is not None:
enabled_bots = [bot for bot in platform_mgr.bots if getattr(bot, 'enable', False)]
features['bot_count'] = len(platform_mgr.bots)
adapters = sorted({bot.adapter.__class__.__name__ for bot in enabled_bots if getattr(bot, 'adapter', None)})
features['adapters'] = adapters
except Exception:
pass
# Resource counts
features['pipeline_count'] = await _count(ap, persistence_pipeline.LegacyPipeline)
features['mcp_server_count'] = await _count(ap, persistence_mcp.MCPServer)
features['knowledge_base_count'] = await _count(ap, persistence_rag.KnowledgeBase)
if 'bot_count' not in features:
features['bot_count'] = await _count(ap, persistence_bot.Bot)
# Plugin count (from plugin runtime)
try:
plugin_connector = getattr(ap, 'plugin_connector', None)
if plugin_connector is not None:
plugins = await plugin_connector.list_plugins()
features['plugin_count'] = len(plugins)
except Exception:
features['plugin_count'] = -1
# Skill count (from Box runtime via skill manager)
try:
skill_mgr = getattr(ap, 'skill_mgr', None)
if skill_mgr is not None and getattr(skill_mgr, 'skills', None) is not None:
features['skill_count'] = len(skill_mgr.skills)
except Exception:
pass
return {
'event_type': 'instance_heartbeat',
'query_id': '',
'version': constants.semantic_version,
'instance_id': constants.instance_id,
'edition': constants.edition,
'features': features,
'timestamp': datetime.now(timezone.utc).isoformat(),
}
async def heartbeat_loop(ap: core_app.Application) -> None:
"""Send one heartbeat shortly after startup, then daily."""
# Small delay so managers (platform, skills, plugins) finish loading first
await asyncio.sleep(30)
while True:
try:
payload = await build_heartbeat_payload(ap)
await ap.telemetry.start_send_task(payload)
except Exception as e:
try:
ap.logger.debug(f'Telemetry heartbeat failed: {e}')
except Exception:
pass
await asyncio.sleep(HEARTBEAT_INTERVAL_SECONDS)

View File

@@ -68,10 +68,21 @@ class TelemetryManager:
'edition',
'error',
'timestamp',
'event_type',
):
if sfield not in sanitized:
continue
v = sanitized.get(sfield)
sanitized[sfield] = '' if v is None else str(v)
# event_type defaults to 'query' for backward compatibility
if not sanitized.get('event_type'):
sanitized['event_type'] = 'query'
# features must be a JSON object
if 'features' in sanitized and not isinstance(sanitized['features'], dict):
sanitized['features'] = {}
if 'duration_ms' in sanitized:
try:
sanitized['duration_ms'] = (

View File

@@ -27,6 +27,66 @@ def compiled_params(statement):
return statement.compile().params
class TestRagRerankAction:
"""Tests for RAG rerank action handler."""
@pytest.fixture
def app(self):
mock_app = Mock()
mock_app.model_mgr = Mock()
mock_app.logger = Mock()
return mock_app
@pytest.mark.asyncio
async def test_invokes_rerank_model_and_sorts_scores(self, app):
"""Rerank action uses the selected model and returns top scores."""
provider = Mock()
provider.invoke_rerank = AsyncMock(
return_value=[
{'index': 0, 'relevance_score': 0.2},
{'index': 1, 'relevance_score': 0.9},
]
)
rerank_model = SimpleNamespace(provider=provider)
app.model_mgr.get_rerank_model_by_uuid = AsyncMock(return_value=rerank_model)
runtime_handler = make_handler(app)
response = await runtime_handler.actions[PluginToRuntimeAction.INVOKE_RERANK.value]({
'rerank_model_uuid': 'rerank-1',
'query': 'hello',
'documents': ['a', 'b'],
'top_k': 1,
'extra_args': {'return_documents': False},
})
assert response.code == 0
assert response.data['results'] == [{'index': 1, 'relevance_score': 0.9}]
app.model_mgr.get_rerank_model_by_uuid.assert_awaited_once_with('rerank-1')
provider.invoke_rerank.assert_awaited_once_with(
model=rerank_model,
query='hello',
documents=['a', 'b'],
extra_args={'return_documents': False},
)
@pytest.mark.asyncio
async def test_returns_error_when_rerank_model_missing(self, app):
"""Missing rerank model returns an action error."""
app.model_mgr.get_rerank_model_by_uuid = AsyncMock(
side_effect=ValueError('not found')
)
runtime_handler = make_handler(app)
response = await runtime_handler.actions[PluginToRuntimeAction.INVOKE_RERANK.value]({
'rerank_model_uuid': 'missing',
'query': 'hello',
'documents': ['a'],
})
assert response.code != 0
assert 'Rerank model with rerank_model_uuid missing not found' in response.message
class TestInitializePluginSettings:
"""Tests for initialize_plugin_settings action handler."""

View File

@@ -7,6 +7,7 @@ Tests cover:
- Survey response submission
- Survey dismissal
"""
from __future__ import annotations
import pytest
@@ -127,9 +128,7 @@ class TestLoadTriggeredEvents:
"""Test that empty set is used when no events stored."""
survey_module = get_survey_module()
mock_app = create_mock_app()
mock_app.persistence_mgr.execute_async = AsyncMock(
return_value=Mock(first=Mock(return_value=None))
)
mock_app.persistence_mgr.execute_async = AsyncMock(return_value=Mock(first=Mock(return_value=None)))
manager = survey_module.SurveyManager(mock_app)
await manager._load_triggered_events()
@@ -219,9 +218,7 @@ class TestTriggerEvent:
"""Test that new event is added and saved."""
survey_module = get_survey_module()
mock_app = create_mock_app()
mock_app.persistence_mgr.execute_async = AsyncMock(
return_value=Mock(first=Mock(return_value=None))
)
mock_app.persistence_mgr.execute_async = AsyncMock(return_value=Mock(first=Mock(return_value=None)))
manager = survey_module.SurveyManager(mock_app)
manager._space_url = 'https://space.example.com'
@@ -231,6 +228,104 @@ class TestTriggerEvent:
assert 'new_event' in manager._triggered_events
class TestRecordBotResponseSuccess:
"""Tests for the bot_response_success_100 milestone counter."""
def _make_manager(self, survey_module, mock_app):
manager = survey_module.SurveyManager(mock_app)
manager._space_url = 'https://space.example.com'
# No existing metadata rows: select returns no row
mock_app.persistence_mgr.execute_async = AsyncMock(return_value=Mock(first=Mock(return_value=None)))
return manager
@pytest.mark.asyncio
async def test_increments_and_persists_count(self):
survey_module = get_survey_module()
mock_app = create_mock_app()
manager = self._make_manager(survey_module, mock_app)
await manager.record_bot_response_success()
assert manager._bot_response_count == 1
# select + insert for the count key
assert mock_app.persistence_mgr.execute_async.call_count >= 2
@pytest.mark.asyncio
async def test_fires_milestone_event_at_threshold(self):
survey_module = get_survey_module()
mock_app = create_mock_app()
manager = self._make_manager(survey_module, mock_app)
manager._bot_response_count = survey_module.BOT_RESPONSE_MILESTONE - 1
await manager.record_bot_response_success()
assert manager._bot_response_count == survey_module.BOT_RESPONSE_MILESTONE
assert survey_module.BOT_RESPONSE_MILESTONE_EVENT in manager._triggered_events
@pytest.mark.asyncio
async def test_does_not_fire_below_threshold(self):
survey_module = get_survey_module()
mock_app = create_mock_app()
manager = self._make_manager(survey_module, mock_app)
manager._bot_response_count = 5
await manager.record_bot_response_success()
assert survey_module.BOT_RESPONSE_MILESTONE_EVENT not in manager._triggered_events
@pytest.mark.asyncio
async def test_stops_counting_after_milestone_triggered(self):
survey_module = get_survey_module()
mock_app = create_mock_app()
manager = self._make_manager(survey_module, mock_app)
manager._triggered_events.add(survey_module.BOT_RESPONSE_MILESTONE_EVENT)
manager._bot_response_count = survey_module.BOT_RESPONSE_MILESTONE
await manager.record_bot_response_success()
# No persistence write, count unchanged
mock_app.persistence_mgr.execute_async.assert_not_called()
assert manager._bot_response_count == survey_module.BOT_RESPONSE_MILESTONE
@pytest.mark.asyncio
async def test_skips_when_space_not_configured(self):
survey_module = get_survey_module()
mock_app = create_mock_app()
manager = self._make_manager(survey_module, mock_app)
manager._space_url = ''
await manager.record_bot_response_success()
assert manager._bot_response_count == 0
mock_app.persistence_mgr.execute_async.assert_not_called()
@pytest.mark.asyncio
async def test_count_loaded_on_initialize(self):
survey_module = get_survey_module()
mock_app = create_mock_app()
count_row = Mock()
count_row.value = '42'
def execute_side_effect(stmt):
result = Mock()
# Both _load_triggered_events and _load_bot_response_count select
# from Metadata; return the count row only for the count key.
stmt_str = str(stmt.compile(compile_kwargs={'literal_binds': True}))
if survey_module.BOT_RESPONSE_COUNT_KEY in stmt_str:
result.first.return_value = (count_row,)
else:
result.first.return_value = None
return result
mock_app.persistence_mgr.execute_async = AsyncMock(side_effect=execute_side_effect)
manager = survey_module.SurveyManager(mock_app)
await manager.initialize()
assert manager._bot_response_count == 42
class TestPendingSurvey:
"""Tests for get_pending_survey and clear_pending_survey."""
@@ -296,14 +391,19 @@ class TestSubmitResponse:
# Mock successful HTTP response
import httpx
mock_response = Mock()
mock_response.status_code = 200
with pytest.MonkeyPatch().context() as m:
m.setattr(httpx, 'AsyncClient', lambda **kwargs: MagicMock(
__aenter__=AsyncMock(return_value=Mock(post=AsyncMock(return_value=mock_response))),
__aexit__=AsyncMock(return_value=None)
))
m.setattr(
httpx,
'AsyncClient',
lambda **kwargs: MagicMock(
__aenter__=AsyncMock(return_value=Mock(post=AsyncMock(return_value=mock_response))),
__aexit__=AsyncMock(return_value=None),
),
)
result = await manager.submit_response('survey123', {'q1': 'answer1'})
assert result is True
@@ -338,15 +438,20 @@ class TestDismissSurvey:
# Mock successful HTTP response
import httpx
mock_response = Mock()
mock_response.status_code = 200
with pytest.MonkeyPatch().context() as m:
m.setattr(httpx, 'AsyncClient', lambda **kwargs: MagicMock(
__aenter__=AsyncMock(return_value=Mock(post=AsyncMock(return_value=mock_response))),
__aexit__=AsyncMock(return_value=None)
))
m.setattr(
httpx,
'AsyncClient',
lambda **kwargs: MagicMock(
__aenter__=AsyncMock(return_value=Mock(post=AsyncMock(return_value=mock_response))),
__aexit__=AsyncMock(return_value=None),
),
)
result = await manager.dismiss_survey('survey123')
assert result is True
assert manager._pending_survey is None
assert manager._pending_survey is None

View File

@@ -0,0 +1,92 @@
"""Unit tests for telemetry feature counters (pkg/telemetry/features.py)."""
from __future__ import annotations
from importlib import import_module
def get_features_module():
return import_module('langbot.pkg.telemetry.features')
class FakeQuery:
def __init__(self):
self.variables = {}
class TestIncrement:
def test_increment_nested_counter(self):
features = get_features_module()
q = FakeQuery()
features.increment(q, 'tool_calls', 'native')
features.increment(q, 'tool_calls', 'native')
features.increment(q, 'tool_calls', 'mcp')
assert q.variables[features.FEATURES_KEY]['tool_calls'] == {'native': 2, 'mcp': 1}
def test_increment_flat_counter(self):
features = get_features_module()
q = FakeQuery()
features.increment(q, 'something')
features.increment(q, 'something', amount=2)
assert q.variables[features.FEATURES_KEY]['something'] == 3
def test_increment_never_raises_on_broken_query(self):
features = get_features_module()
class Broken:
@property
def variables(self):
raise RuntimeError('boom')
# Must not raise
features.increment(Broken(), 'tool_calls', 'native')
def test_set_value(self):
features = get_features_module()
q = FakeQuery()
features.set_value(q, 'tool_call_rounds', 5)
assert q.variables[features.FEATURES_KEY]['tool_call_rounds'] == 5
class TestCollectFeatures:
def test_collect_empty(self):
features = get_features_module()
q = FakeQuery()
assert features.collect_features(q) == {}
def test_collect_combines_counters_and_snapshots(self):
features = get_features_module()
q = FakeQuery()
features.increment(q, 'sandbox', 'execs')
features.set_value(q, 'kb', {'kb_count': 2, 'engine_plugins': ['builtin'], 'retrieved_entries': 7})
q.variables['_activated_skills'] = {'pdf-tools': {}, 'a-skill': {}}
q.variables['_pipeline_bound_mcp_servers'] = ['srv1', 'srv2']
result = features.collect_features(q)
assert result['sandbox'] == {'execs': 1}
assert result['kb']['kb_count'] == 2
assert result['activated_skills'] == ['a-skill', 'pdf-tools'] # sorted
assert result['mcp_servers'] == ['srv1', 'srv2']
def test_collect_omits_mcp_when_all_enabled(self):
"""None means 'all enabled' and is not reported."""
features = get_features_module()
q = FakeQuery()
q.variables['_pipeline_bound_mcp_servers'] = None
assert 'mcp_servers' not in features.collect_features(q)
def test_collect_drops_non_json_serializable(self):
features = get_features_module()
q = FakeQuery()
features.set_value(q, 'good', 1)
features.set_value(q, 'bad', object())
result = features.collect_features(q)
assert result == {'good': 1}
def test_collect_is_json_serializable(self):
import json
features = get_features_module()
q = FakeQuery()
features.increment(q, 'tool_calls', 'skill')
json.dumps(features.collect_features(q))

View File

@@ -0,0 +1,104 @@
"""Unit tests for telemetry heartbeat payload (pkg/telemetry/heartbeat.py)."""
from __future__ import annotations
import json
import pytest
from unittest.mock import AsyncMock, Mock
from importlib import import_module
def get_heartbeat_module():
return import_module('langbot.pkg.telemetry.heartbeat')
def make_app():
ap = Mock()
ap.instance_config = Mock()
ap.instance_config.data = {
'database': {'use': 'postgresql'},
'vdb': {'use': 'chroma'},
'box': {'enabled': True, 'backend': 'nsjail'},
}
# persistence counts
result = Mock()
result.scalar.return_value = 3
ap.persistence_mgr = Mock()
ap.persistence_mgr.execute_async = AsyncMock(return_value=result)
# box service
ap.box_service = Mock()
ap.box_service.enabled = True
ap.box_service.available = False
ap.box_service.shares_filesystem_with_box = False
# platform manager with one enabled bot
bot = Mock()
bot.enable = True
bot.adapter = Mock()
bot.adapter.__class__.__name__ = 'TelegramAdapter'
ap.platform_mgr = Mock()
ap.platform_mgr.bots = [bot]
# plugin connector
ap.plugin_connector = Mock()
ap.plugin_connector.list_plugins = AsyncMock(return_value=[{}, {}])
# skills
ap.skill_mgr = Mock()
ap.skill_mgr.skills = {'a': {}, 'b': {}, 'c': {}}
return ap
class TestBuildHeartbeatPayload:
@pytest.mark.asyncio
async def test_payload_shape(self):
heartbeat = get_heartbeat_module()
ap = make_app()
payload = await heartbeat.build_heartbeat_payload(ap)
assert payload['event_type'] == 'instance_heartbeat'
assert payload['query_id'] == ''
assert 'timestamp' in payload
f = payload['features']
assert f['database'] == 'postgresql'
assert f['vdb'] == 'chroma'
assert f['box'] == {
'enabled': True,
'available': False,
'backend': 'nsjail',
'shares_fs': False,
}
assert f['adapters'] == ['TelegramAdapter']
assert f['bot_count'] == 1
assert f['plugin_count'] == 2
assert f['skill_count'] == 3
assert f['pipeline_count'] == 3
assert f['mcp_server_count'] == 3
assert f['knowledge_base_count'] == 3
@pytest.mark.asyncio
async def test_payload_is_json_serializable(self):
heartbeat = get_heartbeat_module()
payload = await heartbeat.build_heartbeat_payload(make_app())
json.dumps(payload)
@pytest.mark.asyncio
async def test_count_failure_yields_minus_one(self):
heartbeat = get_heartbeat_module()
ap = make_app()
ap.persistence_mgr.execute_async = AsyncMock(side_effect=RuntimeError('db down'))
payload = await heartbeat.build_heartbeat_payload(ap)
assert payload['features']['pipeline_count'] == -1
@pytest.mark.asyncio
async def test_no_user_content_fields(self):
"""The heartbeat must never carry message content / credentials keys."""
heartbeat = get_heartbeat_module()
payload = await heartbeat.build_heartbeat_payload(make_app())
flat = json.dumps(payload).lower()
for forbidden in ('api_key', 'password', 'token', 'message_content'):
assert forbidden not in flat

8
uv.lock generated
View File

@@ -2029,7 +2029,7 @@ requires-dist = [
{ name = "ebooklib", specifier = ">=0.18" },
{ name = "gewechat-client", specifier = ">=0.1.5" },
{ name = "html2text", specifier = ">=2024.2.26" },
{ name = "langbot-plugin", specifier = "==0.4.2" },
{ name = "langbot-plugin", specifier = "==0.4.3" },
{ name = "langchain", specifier = ">=0.2.0" },
{ name = "langchain-core", specifier = ">=1.3.3" },
{ name = "langchain-text-splitters", specifier = ">=1.1.2" },
@@ -2092,7 +2092,7 @@ dev = [
[[package]]
name = "langbot-plugin"
version = "0.4.2"
version = "0.4.3"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aiofiles" },
@@ -2113,9 +2113,9 @@ dependencies = [
{ name = "watchdog" },
{ name = "websockets" },
]
sdist = { url = "https://files.pythonhosted.org/packages/0d/ea/8a26bc399ae9aff0d99fb03b239f08ba79f211bed053dda479f01e08cef8/langbot_plugin-0.4.2.tar.gz", hash = "sha256:c6e247481f68e60aaafc30deabcd9a48b65269bcc99e1962a9df1e5d61a7de3d", size = 305407, upload-time = "2026-06-09T13:50:27.007Z" }
sdist = { url = "https://files.pythonhosted.org/packages/8e/f1/32ec67e8b8eb91159d2b9703f466cc2a763c8cea380dd56561efe793a55b/langbot_plugin-0.4.3.tar.gz", hash = "sha256:747fb78bc666cfac3842cb35130fa8323759dd8768fdaa1975099157a3749c6e", size = 309655, upload-time = "2026-06-13T04:58:10.279Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/de/1a/21c078ca309fbc6842548153545da434408035ea41eb8421fda5f9716dfe/langbot_plugin-0.4.2-py3-none-any.whl", hash = "sha256:3f2510f19c5cbdb025aeb52b057a17309cc694c48d8220b7dad7a66981a26b37", size = 210399, upload-time = "2026-06-09T13:50:28.347Z" },
{ url = "https://files.pythonhosted.org/packages/28/05/84bd7537efd45fc02044ca9509973160a7d6d10520ff73e31424141a3a6c/langbot_plugin-0.4.3-py3-none-any.whl", hash = "sha256:46aca36e2193c18f9cf332460760dd7b9340ee2e96a57f2e4ae621c4d4c4b61c", size = 211384, upload-time = "2026-06-13T04:58:11.668Z" },
]
[[package]]

View File

@@ -450,6 +450,9 @@ const MCPForm = forwardRef<MCPFormHandle, MCPFormProps>(function MCPForm(
testMcp: () => testMcp(),
isTesting: mcpTesting,
}),
// testMcp now reads everything via form.getValues(), so it does not need
// the latest stdioArgs/extraArgs closure — but keep mcpTesting so the
// exposed isTesting flag stays accurate.
[mcpTesting],
);
@@ -680,6 +683,17 @@ const MCPForm = forwardRef<MCPFormHandle, MCPFormProps>(function MCPForm(
try {
const mode = form.getValues('mode');
// Read every field via form.getValues() rather than the captured
// `stdioArgs` / `extraArgs` state. testMcp() is invoked through an
// imperative handle (formRef.current.testMcp()) whose closure is only
// refreshed when [mcpTesting] changes, so reading the React state here
// would use a stale snapshot — on the detail page that snapshot is the
// empty initial [], which dropped stdio args entirely and launched
// `uvx` with no package (exit 2 / "Connection closed", no detail).
// The form values are kept in sync on every edit and on load, so they
// are always current.
const formExtraArgs = form.getValues('extra_args') ?? [];
const formStdioArgs = form.getValues('args') ?? [];
let extraArgsData:
| MCPServerExtraArgsSSE
| MCPServerExtraArgsHttp
@@ -690,7 +704,7 @@ const MCPForm = forwardRef<MCPFormHandle, MCPFormProps>(function MCPForm(
url: form.getValues('url')!,
timeout: form.getValues('timeout'),
headers: Object.fromEntries(
extraArgs.map((arg) => [arg.key, arg.value]),
formExtraArgs.map((arg) => [arg.key, arg.value]),
),
ssereadtimeout: form.getValues('ssereadtimeout'),
};
@@ -699,14 +713,16 @@ const MCPForm = forwardRef<MCPFormHandle, MCPFormProps>(function MCPForm(
url: form.getValues('url')!,
timeout: form.getValues('timeout'),
headers: Object.fromEntries(
extraArgs.map((arg) => [arg.key, arg.value]),
formExtraArgs.map((arg) => [arg.key, arg.value]),
),
};
} else {
extraArgsData = {
command: form.getValues('command')!,
args: stdioArgs.map((arg) => arg.value),
env: Object.fromEntries(extraArgs.map((arg) => [arg.key, arg.value])),
args: formStdioArgs.map((arg) => arg.value),
env: Object.fromEntries(
formExtraArgs.map((arg) => [arg.key, arg.value]),
),
};
}