mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-16 02:36:03 +00:00
Compare commits
5 Commits
refactor/e
...
feat/rag-r
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7d94a3e8dd | ||
|
|
7965d333ac | ||
|
|
f7300f1473 | ||
|
|
2b6dcfe9c7 | ||
|
|
dd96da895c |
@@ -70,7 +70,7 @@ dependencies = [
|
|||||||
"chromadb>=1.0.0,<2.0.0",
|
"chromadb>=1.0.0,<2.0.0",
|
||||||
"qdrant-client (>=1.15.1,<2.0.0)",
|
"qdrant-client (>=1.15.1,<2.0.0)",
|
||||||
"pyseekdb==1.1.0.post3",
|
"pyseekdb==1.1.0.post3",
|
||||||
"langbot-plugin==0.4.2",
|
"langbot-plugin==0.4.3",
|
||||||
"asyncpg>=0.30.0",
|
"asyncpg>=0.30.0",
|
||||||
"line-bot-sdk>=3.19.0",
|
"line-bot-sdk>=3.19.0",
|
||||||
"matrix-nio>=0.25.2",
|
"matrix-nio>=0.25.2",
|
||||||
|
|||||||
@@ -12,6 +12,7 @@ import pydantic
|
|||||||
|
|
||||||
from langbot_plugin.box.client import BoxRuntimeClient
|
from langbot_plugin.box.client import BoxRuntimeClient
|
||||||
from .connector import BoxRuntimeConnector, _get_box_config
|
from .connector import BoxRuntimeConnector, _get_box_config
|
||||||
|
from ..telemetry import features as telemetry_features
|
||||||
from langbot_plugin.box.errors import BoxError, BoxValidationError
|
from langbot_plugin.box.errors import BoxError, BoxValidationError
|
||||||
from langbot_plugin.box.models import (
|
from langbot_plugin.box.models import (
|
||||||
BUILTIN_PROFILES,
|
BUILTIN_PROFILES,
|
||||||
@@ -218,6 +219,7 @@ class BoxService:
|
|||||||
f'query_id={query.query_id} '
|
f'query_id={query.query_id} '
|
||||||
f'summary={json.dumps(self._summarize_result(result), ensure_ascii=False)}'
|
f'summary={json.dumps(self._summarize_result(result), ensure_ascii=False)}'
|
||||||
)
|
)
|
||||||
|
telemetry_features.increment(query, 'sandbox', 'execs')
|
||||||
return self._serialize_result(result)
|
return self._serialize_result(result)
|
||||||
|
|
||||||
def resolve_box_session_id(self, query: pipeline_query.Query) -> str:
|
def resolve_box_session_id(self, query: pipeline_query.Query) -> str:
|
||||||
@@ -785,6 +787,7 @@ class BoxService:
|
|||||||
# ── Observability ─────────────────────────────────────────────────
|
# ── Observability ─────────────────────────────────────────────────
|
||||||
|
|
||||||
def _record_error(self, exc: Exception, query: pipeline_query.Query):
|
def _record_error(self, exc: Exception, query: pipeline_query.Query):
|
||||||
|
telemetry_features.increment(query, 'sandbox', 'errors')
|
||||||
self._recent_errors.append(
|
self._recent_errors.append(
|
||||||
{
|
{
|
||||||
'timestamp': _dt.datetime.now(_UTC).isoformat(),
|
'timestamp': _dt.datetime.now(_UTC).isoformat(),
|
||||||
|
|||||||
@@ -200,6 +200,17 @@ class Application:
|
|||||||
scopes=[core_entities.LifecycleControlScope.APPLICATION],
|
scopes=[core_entities.LifecycleControlScope.APPLICATION],
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Telemetry instance heartbeat (startup + daily); respects
|
||||||
|
# space.disable_telemetry via TelemetryManager.send().
|
||||||
|
if self.telemetry is not None:
|
||||||
|
from ..telemetry import heartbeat as telemetry_heartbeat
|
||||||
|
|
||||||
|
self.task_mgr.create_task(
|
||||||
|
telemetry_heartbeat.heartbeat_loop(self),
|
||||||
|
name='telemetry-heartbeat',
|
||||||
|
scopes=[core_entities.LifecycleControlScope.APPLICATION],
|
||||||
|
)
|
||||||
|
|
||||||
# Start monitoring data cleanup task if enabled
|
# Start monitoring data cleanup task if enabled
|
||||||
monitoring_cfg = self.instance_config.data.get('monitoring', {})
|
monitoring_cfg = self.instance_config.data.get('monitoring', {})
|
||||||
auto_cleanup_cfg = monitoring_cfg.get('auto_cleanup', {})
|
auto_cleanup_cfg = monitoring_cfg.get('auto_cleanup', {})
|
||||||
|
|||||||
@@ -13,6 +13,7 @@ from ....provider import runner as runner_module
|
|||||||
|
|
||||||
import langbot_plugin.api.entities.events as events
|
import langbot_plugin.api.entities.events as events
|
||||||
from ....utils import importutil, constants, runner as runner_utils
|
from ....utils import importutil, constants, runner as runner_utils
|
||||||
|
from ....telemetry import features as telemetry_features
|
||||||
from ....provider import runners
|
from ....provider import runners
|
||||||
import langbot_plugin.api.entities.builtin.provider.session as provider_session
|
import langbot_plugin.api.entities.builtin.provider.session as provider_session
|
||||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||||
@@ -201,7 +202,12 @@ class ChatMessageHandler(handler.MessageHandler):
|
|||||||
runner_name, runner, query.pipeline_config
|
runner_name, runner, query.pipeline_config
|
||||||
)
|
)
|
||||||
|
|
||||||
|
# Feature usage collected during query processing (tool calls,
|
||||||
|
# knowledge base usage, sandbox executions, activated skills, ...)
|
||||||
|
features = telemetry_features.collect_features(query)
|
||||||
|
|
||||||
payload = {
|
payload = {
|
||||||
|
'event_type': 'query',
|
||||||
'query_id': query.query_id,
|
'query_id': query.query_id,
|
||||||
'adapter': adapter_name,
|
'adapter': adapter_name,
|
||||||
'runner': runner_name,
|
'runner': runner_name,
|
||||||
@@ -212,6 +218,7 @@ class ChatMessageHandler(handler.MessageHandler):
|
|||||||
'instance_id': constants.instance_id,
|
'instance_id': constants.instance_id,
|
||||||
'edition': constants.edition,
|
'edition': constants.edition,
|
||||||
'pipeline_plugins': pipeline_plugins,
|
'pipeline_plugins': pipeline_plugins,
|
||||||
|
'features': features,
|
||||||
'error': locals().get('error_info', None),
|
'error': locals().get('error_info', None),
|
||||||
'timestamp': datetime.utcnow().isoformat(),
|
'timestamp': datetime.utcnow().isoformat(),
|
||||||
}
|
}
|
||||||
@@ -219,10 +226,12 @@ class ChatMessageHandler(handler.MessageHandler):
|
|||||||
# Send telemetry asynchronously and do not block pipeline via app's telemetry manager
|
# Send telemetry asynchronously and do not block pipeline via app's telemetry manager
|
||||||
await self.ap.telemetry.start_send_task(payload)
|
await self.ap.telemetry.start_send_task(payload)
|
||||||
|
|
||||||
# Trigger survey event on first successful non-WebSocket response
|
# Trigger survey events on successful non-WebSocket responses
|
||||||
if not locals().get('error_info') and adapter_name and 'WebSocket' not in adapter_name:
|
if not locals().get('error_info') and adapter_name and 'WebSocket' not in adapter_name:
|
||||||
if self.ap.survey:
|
if self.ap.survey:
|
||||||
await self.ap.survey.trigger_event('first_bot_response_success')
|
await self.ap.survey.trigger_event('first_bot_response_success')
|
||||||
|
# Counts toward the bot_response_success_100 milestone event
|
||||||
|
await self.ap.survey.record_bot_response_success()
|
||||||
except Exception as ex:
|
except Exception as ex:
|
||||||
# Ensure telemetry issues do not affect normal flow
|
# Ensure telemetry issues do not affect normal flow
|
||||||
self.ap.logger.warning(f'Failed to send telemetry: {ex}')
|
self.ap.logger.warning(f'Failed to send telemetry: {ex}')
|
||||||
|
|||||||
@@ -514,6 +514,35 @@ class RuntimeConnectionHandler(handler.Handler):
|
|||||||
except Exception as e:
|
except Exception as e:
|
||||||
return _make_rag_error_response(e, 'EmbeddingError', embedding_model_uuid=embedding_model_uuid)
|
return _make_rag_error_response(e, 'EmbeddingError', embedding_model_uuid=embedding_model_uuid)
|
||||||
|
|
||||||
|
@self.action(PluginToRuntimeAction.INVOKE_RERANK)
|
||||||
|
async def invoke_rerank(data: dict[str, Any]) -> handler.ActionResponse:
|
||||||
|
rerank_model_uuid = data['rerank_model_uuid']
|
||||||
|
query = data['query']
|
||||||
|
documents = data['documents']
|
||||||
|
top_k = data.get('top_k')
|
||||||
|
extra_args = data.get('extra_args', {})
|
||||||
|
|
||||||
|
try:
|
||||||
|
rerank_model = await self.ap.model_mgr.get_rerank_model_by_uuid(rerank_model_uuid)
|
||||||
|
except ValueError:
|
||||||
|
return handler.ActionResponse.error(
|
||||||
|
message=f'Rerank model with rerank_model_uuid {rerank_model_uuid} not found',
|
||||||
|
)
|
||||||
|
|
||||||
|
try:
|
||||||
|
scores = await rerank_model.provider.invoke_rerank(
|
||||||
|
model=rerank_model,
|
||||||
|
query=query,
|
||||||
|
documents=documents[:64],
|
||||||
|
extra_args=extra_args,
|
||||||
|
)
|
||||||
|
scored = sorted(scores, key=lambda x: x.get('relevance_score', 0), reverse=True)
|
||||||
|
if top_k is not None:
|
||||||
|
scored = scored[: int(top_k)]
|
||||||
|
return handler.ActionResponse.success(data={'results': scored})
|
||||||
|
except Exception as e:
|
||||||
|
return _make_rag_error_response(e, 'RerankError', rerank_model_uuid=rerank_model_uuid)
|
||||||
|
|
||||||
@self.action(PluginToRuntimeAction.VECTOR_UPSERT)
|
@self.action(PluginToRuntimeAction.VECTOR_UPSERT)
|
||||||
async def vector_upsert(data: dict[str, Any]) -> handler.ActionResponse:
|
async def vector_upsert(data: dict[str, Any]) -> handler.ActionResponse:
|
||||||
collection_id = data['collection_id']
|
collection_id = data['collection_id']
|
||||||
|
|||||||
@@ -4,6 +4,7 @@ import json
|
|||||||
import copy
|
import copy
|
||||||
import typing
|
import typing
|
||||||
from .. import runner
|
from .. import runner
|
||||||
|
from ...telemetry import features as telemetry_features
|
||||||
from ..modelmgr import requester as modelmgr_requester
|
from ..modelmgr import requester as modelmgr_requester
|
||||||
from ..tools.loaders.native import EXEC_TOOL_NAME
|
from ..tools.loaders.native import EXEC_TOOL_NAME
|
||||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||||
@@ -187,6 +188,8 @@ class LocalAgentRunner(runner.RequestRunner):
|
|||||||
# only support text for now
|
# only support text for now
|
||||||
all_results: list[rag_context.RetrievalResultEntry] = []
|
all_results: list[rag_context.RetrievalResultEntry] = []
|
||||||
|
|
||||||
|
kb_engine_plugins: set[str] = set()
|
||||||
|
|
||||||
# Retrieve from each knowledge base
|
# Retrieve from each knowledge base
|
||||||
for kb_uuid in kb_uuids:
|
for kb_uuid in kb_uuids:
|
||||||
kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid)
|
kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid)
|
||||||
@@ -195,6 +198,12 @@ class LocalAgentRunner(runner.RequestRunner):
|
|||||||
self.ap.logger.warning(f'Knowledge base {kb_uuid} not found, skipping')
|
self.ap.logger.warning(f'Knowledge base {kb_uuid} not found, skipping')
|
||||||
continue
|
continue
|
||||||
|
|
||||||
|
try:
|
||||||
|
engine_plugin_id = kb.get_knowledge_engine_plugin_id() or 'builtin'
|
||||||
|
except Exception:
|
||||||
|
engine_plugin_id = 'builtin'
|
||||||
|
kb_engine_plugins.add(engine_plugin_id)
|
||||||
|
|
||||||
result = await kb.retrieve(
|
result = await kb.retrieve(
|
||||||
user_message_text,
|
user_message_text,
|
||||||
settings={
|
settings={
|
||||||
@@ -207,6 +216,17 @@ class LocalAgentRunner(runner.RequestRunner):
|
|||||||
if result:
|
if result:
|
||||||
all_results.extend(result)
|
all_results.extend(result)
|
||||||
|
|
||||||
|
# Telemetry: knowledge base usage (counts and engine categories only)
|
||||||
|
telemetry_features.set_value(
|
||||||
|
query,
|
||||||
|
'kb',
|
||||||
|
{
|
||||||
|
'kb_count': len(kb_uuids),
|
||||||
|
'engine_plugins': sorted(kb_engine_plugins),
|
||||||
|
'retrieved_entries': len(all_results),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
|
||||||
# Rerank step: re-score results using a rerank model if configured
|
# Rerank step: re-score results using a rerank model if configured
|
||||||
local_agent_config = query.pipeline_config.get('ai', {}).get('local-agent', {})
|
local_agent_config = query.pipeline_config.get('ai', {}).get('local-agent', {})
|
||||||
rerank_model_uuid = local_agent_config.get('rerank-model', '')
|
rerank_model_uuid = local_agent_config.get('rerank-model', '')
|
||||||
@@ -373,6 +393,7 @@ class LocalAgentRunner(runner.RequestRunner):
|
|||||||
tool_call_round = 0
|
tool_call_round = 0
|
||||||
while pending_tool_calls:
|
while pending_tool_calls:
|
||||||
tool_call_round += 1
|
tool_call_round += 1
|
||||||
|
telemetry_features.set_value(query, 'tool_call_rounds', tool_call_round)
|
||||||
if tool_call_round > MAX_TOOL_CALL_ROUNDS:
|
if tool_call_round > MAX_TOOL_CALL_ROUNDS:
|
||||||
self.ap.logger.warning(
|
self.ap.logger.warning(
|
||||||
f'Tool-call loop reached the {MAX_TOOL_CALL_ROUNDS}-round cap '
|
f'Tool-call loop reached the {MAX_TOOL_CALL_ROUNDS}-round cap '
|
||||||
|
|||||||
@@ -97,13 +97,19 @@ class ToolManager:
|
|||||||
return tools
|
return tools
|
||||||
|
|
||||||
async def execute_func_call(self, name: str, parameters: dict, query: pipeline_query.Query) -> typing.Any:
|
async def execute_func_call(self, name: str, parameters: dict, query: pipeline_query.Query) -> typing.Any:
|
||||||
|
from langbot.pkg.telemetry import features as telemetry_features
|
||||||
|
|
||||||
if await self.native_tool_loader.has_tool(name):
|
if await self.native_tool_loader.has_tool(name):
|
||||||
|
telemetry_features.increment(query, 'tool_calls', 'native')
|
||||||
return await self.native_tool_loader.invoke_tool(name, parameters, query)
|
return await self.native_tool_loader.invoke_tool(name, parameters, query)
|
||||||
if await self.plugin_tool_loader.has_tool(name):
|
if await self.plugin_tool_loader.has_tool(name):
|
||||||
|
telemetry_features.increment(query, 'tool_calls', 'plugin')
|
||||||
return await self.plugin_tool_loader.invoke_tool(name, parameters, query)
|
return await self.plugin_tool_loader.invoke_tool(name, parameters, query)
|
||||||
if await self.mcp_tool_loader.has_tool(name):
|
if await self.mcp_tool_loader.has_tool(name):
|
||||||
|
telemetry_features.increment(query, 'tool_calls', 'mcp')
|
||||||
return await self.mcp_tool_loader.invoke_tool(name, parameters, query)
|
return await self.mcp_tool_loader.invoke_tool(name, parameters, query)
|
||||||
if await self.skill_tool_loader.has_tool(name):
|
if await self.skill_tool_loader.has_tool(name):
|
||||||
|
telemetry_features.increment(query, 'tool_calls', 'skill')
|
||||||
return await self.skill_tool_loader.invoke_tool(name, parameters, query)
|
return await self.skill_tool_loader.invoke_tool(name, parameters, query)
|
||||||
raise ValueError(f'未找到工具: {name}')
|
raise ValueError(f'未找到工具: {name}')
|
||||||
|
|
||||||
|
|||||||
@@ -13,6 +13,11 @@ from ..entity.persistence.metadata import Metadata
|
|||||||
from ..utils import constants
|
from ..utils import constants
|
||||||
|
|
||||||
SURVEY_TRIGGERED_KEY = 'survey_triggered_events'
|
SURVEY_TRIGGERED_KEY = 'survey_triggered_events'
|
||||||
|
BOT_RESPONSE_COUNT_KEY = 'survey_bot_response_count'
|
||||||
|
|
||||||
|
# Milestone event fired when an instance accumulates this many successful bot responses
|
||||||
|
BOT_RESPONSE_MILESTONE = 100
|
||||||
|
BOT_RESPONSE_MILESTONE_EVENT = f'bot_response_success_{BOT_RESPONSE_MILESTONE}'
|
||||||
|
|
||||||
|
|
||||||
class SurveyManager:
|
class SurveyManager:
|
||||||
@@ -23,11 +28,13 @@ class SurveyManager:
|
|||||||
self._triggered_events: set[str] = set()
|
self._triggered_events: set[str] = set()
|
||||||
self._pending_survey: typing.Optional[dict] = None
|
self._pending_survey: typing.Optional[dict] = None
|
||||||
self._space_url: str = ''
|
self._space_url: str = ''
|
||||||
|
self._bot_response_count: int = 0
|
||||||
|
|
||||||
async def initialize(self):
|
async def initialize(self):
|
||||||
space_config = self.ap.instance_config.data.get('space', {})
|
space_config = self.ap.instance_config.data.get('space', {})
|
||||||
self._space_url = space_config.get('url', '').rstrip('/')
|
self._space_url = space_config.get('url', '').rstrip('/')
|
||||||
await self._load_triggered_events()
|
await self._load_triggered_events()
|
||||||
|
await self._load_bot_response_count()
|
||||||
|
|
||||||
async def _load_triggered_events(self):
|
async def _load_triggered_events(self):
|
||||||
"""Load previously triggered events from metadata table."""
|
"""Load previously triggered events from metadata table."""
|
||||||
@@ -65,6 +72,54 @@ class SurveyManager:
|
|||||||
return False
|
return False
|
||||||
return bool(self._space_url)
|
return bool(self._space_url)
|
||||||
|
|
||||||
|
async def _load_bot_response_count(self):
|
||||||
|
"""Load the persisted successful bot response count from metadata table."""
|
||||||
|
try:
|
||||||
|
result = await self.ap.persistence_mgr.execute_async(
|
||||||
|
sqlalchemy.select(Metadata).where(Metadata.key == BOT_RESPONSE_COUNT_KEY)
|
||||||
|
)
|
||||||
|
row = result.first()
|
||||||
|
if row:
|
||||||
|
self._bot_response_count = int(row[0].value)
|
||||||
|
except Exception:
|
||||||
|
self._bot_response_count = 0
|
||||||
|
|
||||||
|
async def _save_bot_response_count(self):
|
||||||
|
"""Persist the successful bot response count to metadata table."""
|
||||||
|
try:
|
||||||
|
value = str(self._bot_response_count)
|
||||||
|
result = await self.ap.persistence_mgr.execute_async(
|
||||||
|
sqlalchemy.select(Metadata).where(Metadata.key == BOT_RESPONSE_COUNT_KEY)
|
||||||
|
)
|
||||||
|
if result.first():
|
||||||
|
await self.ap.persistence_mgr.execute_async(
|
||||||
|
sqlalchemy.update(Metadata).where(Metadata.key == BOT_RESPONSE_COUNT_KEY).values(value=value)
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
await self.ap.persistence_mgr.execute_async(
|
||||||
|
sqlalchemy.insert(Metadata).values(key=BOT_RESPONSE_COUNT_KEY, value=value)
|
||||||
|
)
|
||||||
|
except Exception as e:
|
||||||
|
self.ap.logger.debug(f'Failed to save survey bot response count: {e}')
|
||||||
|
|
||||||
|
async def record_bot_response_success(self):
|
||||||
|
"""Count a successful bot response; fires the milestone event at the threshold.
|
||||||
|
|
||||||
|
Called by the chat handler after each successful (non-WebSocket) response.
|
||||||
|
The count is persisted so it survives restarts. Once the milestone event
|
||||||
|
has been triggered, counting stops (no further writes needed).
|
||||||
|
"""
|
||||||
|
if BOT_RESPONSE_MILESTONE_EVENT in self._triggered_events:
|
||||||
|
return
|
||||||
|
if not self._is_space_configured():
|
||||||
|
return
|
||||||
|
|
||||||
|
self._bot_response_count += 1
|
||||||
|
await self._save_bot_response_count()
|
||||||
|
|
||||||
|
if self._bot_response_count >= BOT_RESPONSE_MILESTONE:
|
||||||
|
await self.trigger_event(BOT_RESPONSE_MILESTONE_EVENT)
|
||||||
|
|
||||||
async def trigger_event(self, event: str):
|
async def trigger_event(self, event: str):
|
||||||
"""Called when an event occurs. Checks Space for a pending survey."""
|
"""Called when an event occurs. Checks Space for a pending survey."""
|
||||||
if event in self._triggered_events:
|
if event in self._triggered_events:
|
||||||
|
|||||||
102
src/langbot/pkg/telemetry/features.py
Normal file
102
src/langbot/pkg/telemetry/features.py
Normal file
@@ -0,0 +1,102 @@
|
|||||||
|
"""Per-query telemetry feature counters.
|
||||||
|
|
||||||
|
Collects anonymous, content-free usage signals (tool call counts, knowledge
|
||||||
|
base usage, sandbox executions, ...) into ``query.variables`` during query
|
||||||
|
processing. The chat handler reads the accumulated dict when building the
|
||||||
|
telemetry payload and ships it as the ``features`` JSON object.
|
||||||
|
|
||||||
|
Every helper here is defensive: telemetry must NEVER break the pipeline, so
|
||||||
|
all mutations are wrapped and failures are silently ignored.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import typing
|
||||||
|
|
||||||
|
if typing.TYPE_CHECKING:
|
||||||
|
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||||
|
|
||||||
|
|
||||||
|
FEATURES_KEY = '_telemetry_features'
|
||||||
|
|
||||||
|
|
||||||
|
def get_features(query: pipeline_query.Query) -> dict:
|
||||||
|
"""Return the mutable features dict for this query, creating it if needed."""
|
||||||
|
try:
|
||||||
|
return query.variables.setdefault(FEATURES_KEY, {})
|
||||||
|
except Exception:
|
||||||
|
return {}
|
||||||
|
|
||||||
|
|
||||||
|
def increment(query: pipeline_query.Query, group: str, key: str | None = None, amount: int = 1) -> None:
|
||||||
|
"""Increment a counter.
|
||||||
|
|
||||||
|
``increment(q, 'sandbox', 'execs')`` -> features['sandbox']['execs'] += 1
|
||||||
|
``increment(q, 'tool_call_rounds')`` -> features['tool_call_rounds'] += 1
|
||||||
|
"""
|
||||||
|
try:
|
||||||
|
features = get_features(query)
|
||||||
|
if key is None:
|
||||||
|
features[group] = int(features.get(group, 0)) + amount
|
||||||
|
else:
|
||||||
|
nested = features.setdefault(group, {})
|
||||||
|
if isinstance(nested, dict):
|
||||||
|
nested[key] = int(nested.get(key, 0)) + amount
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def set_value(query: pipeline_query.Query, group: str, value: typing.Any) -> None:
|
||||||
|
"""Set a feature value (overwrites)."""
|
||||||
|
try:
|
||||||
|
get_features(query)[group] = value
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def collect_features(query: pipeline_query.Query) -> dict:
|
||||||
|
"""Build the final ``features`` object for the telemetry payload.
|
||||||
|
|
||||||
|
Combines the counters accumulated during processing with end-of-query
|
||||||
|
snapshots (activated skills, bound MCP servers). Returns a plain dict
|
||||||
|
that must be JSON-serializable; non-serializable values are dropped.
|
||||||
|
"""
|
||||||
|
features: dict = {}
|
||||||
|
try:
|
||||||
|
accumulated = query.variables.get(FEATURES_KEY)
|
||||||
|
if isinstance(accumulated, dict):
|
||||||
|
features.update(accumulated)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Activated skills (names only, registered by the activate tool)
|
||||||
|
try:
|
||||||
|
activated = query.variables.get('_activated_skills', {})
|
||||||
|
if isinstance(activated, dict) and activated:
|
||||||
|
features['activated_skills'] = sorted(activated.keys())
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# MCP servers bound to the pipeline (names only; None means "all enabled")
|
||||||
|
try:
|
||||||
|
bound_mcp = query.variables.get('_pipeline_bound_mcp_servers', None)
|
||||||
|
if bound_mcp is not None:
|
||||||
|
features['mcp_servers'] = list(bound_mcp)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Drop anything that is not JSON-serializable
|
||||||
|
import json
|
||||||
|
|
||||||
|
try:
|
||||||
|
json.dumps(features)
|
||||||
|
return features
|
||||||
|
except Exception:
|
||||||
|
safe: dict = {}
|
||||||
|
for k, v in features.items():
|
||||||
|
try:
|
||||||
|
json.dumps({k: v})
|
||||||
|
safe[k] = v
|
||||||
|
except Exception:
|
||||||
|
continue
|
||||||
|
return safe
|
||||||
131
src/langbot/pkg/telemetry/heartbeat.py
Normal file
131
src/langbot/pkg/telemetry/heartbeat.py
Normal file
@@ -0,0 +1,131 @@
|
|||||||
|
"""Instance heartbeat telemetry.
|
||||||
|
|
||||||
|
Sends a periodic (startup + daily) anonymous snapshot of the instance's
|
||||||
|
configuration profile so feature *adoption* can be measured separately from
|
||||||
|
feature *usage* (which is covered by per-query telemetry).
|
||||||
|
|
||||||
|
The snapshot contains only configuration categories and object counts —
|
||||||
|
never names of user resources (except adapter type names, which are LangBot
|
||||||
|
adapter identifiers, not account info), never message content, never
|
||||||
|
credentials.
|
||||||
|
"""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import asyncio
|
||||||
|
import typing
|
||||||
|
from datetime import datetime, timezone
|
||||||
|
|
||||||
|
import sqlalchemy
|
||||||
|
|
||||||
|
from ..utils import constants, platform as platform_utils
|
||||||
|
|
||||||
|
if typing.TYPE_CHECKING:
|
||||||
|
from ..core import app as core_app
|
||||||
|
|
||||||
|
|
||||||
|
HEARTBEAT_INTERVAL_SECONDS = 24 * 3600
|
||||||
|
|
||||||
|
|
||||||
|
async def _count(ap: core_app.Application, table) -> int:
|
||||||
|
"""Count rows in a persistence table; -1 when unavailable."""
|
||||||
|
try:
|
||||||
|
result = await ap.persistence_mgr.execute_async(sqlalchemy.select(sqlalchemy.func.count()).select_from(table))
|
||||||
|
return int(result.scalar() or 0)
|
||||||
|
except Exception:
|
||||||
|
return -1
|
||||||
|
|
||||||
|
|
||||||
|
async def build_heartbeat_payload(ap: core_app.Application) -> dict:
|
||||||
|
"""Collect the anonymous instance profile snapshot."""
|
||||||
|
from ..entity.persistence import bot as persistence_bot
|
||||||
|
from ..entity.persistence import mcp as persistence_mcp
|
||||||
|
from ..entity.persistence import pipeline as persistence_pipeline
|
||||||
|
from ..entity.persistence import rag as persistence_rag
|
||||||
|
|
||||||
|
config = ap.instance_config.data if ap.instance_config else {}
|
||||||
|
|
||||||
|
features: dict = {
|
||||||
|
'deploy_platform': platform_utils.get_platform(),
|
||||||
|
'database': config.get('database', {}).get('use', 'sqlite'),
|
||||||
|
'vdb': config.get('vdb', {}).get('use', 'chroma'),
|
||||||
|
}
|
||||||
|
|
||||||
|
# Box / sandbox profile
|
||||||
|
try:
|
||||||
|
box_service = getattr(ap, 'box_service', None)
|
||||||
|
if box_service is not None:
|
||||||
|
box_info: dict = {
|
||||||
|
'enabled': bool(box_service.enabled),
|
||||||
|
'available': bool(box_service.available),
|
||||||
|
}
|
||||||
|
box_cfg = config.get('box', {})
|
||||||
|
box_info['backend'] = box_cfg.get('backend', 'local')
|
||||||
|
try:
|
||||||
|
box_info['shares_fs'] = bool(box_service.shares_filesystem_with_box)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
features['box'] = box_info
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Bots / adapters (adapter type names only)
|
||||||
|
try:
|
||||||
|
platform_mgr = getattr(ap, 'platform_mgr', None)
|
||||||
|
if platform_mgr is not None and getattr(platform_mgr, 'bots', None) is not None:
|
||||||
|
enabled_bots = [bot for bot in platform_mgr.bots if getattr(bot, 'enable', False)]
|
||||||
|
features['bot_count'] = len(platform_mgr.bots)
|
||||||
|
adapters = sorted({bot.adapter.__class__.__name__ for bot in enabled_bots if getattr(bot, 'adapter', None)})
|
||||||
|
features['adapters'] = adapters
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
# Resource counts
|
||||||
|
features['pipeline_count'] = await _count(ap, persistence_pipeline.LegacyPipeline)
|
||||||
|
features['mcp_server_count'] = await _count(ap, persistence_mcp.MCPServer)
|
||||||
|
features['knowledge_base_count'] = await _count(ap, persistence_rag.KnowledgeBase)
|
||||||
|
if 'bot_count' not in features:
|
||||||
|
features['bot_count'] = await _count(ap, persistence_bot.Bot)
|
||||||
|
|
||||||
|
# Plugin count (from plugin runtime)
|
||||||
|
try:
|
||||||
|
plugin_connector = getattr(ap, 'plugin_connector', None)
|
||||||
|
if plugin_connector is not None:
|
||||||
|
plugins = await plugin_connector.list_plugins()
|
||||||
|
features['plugin_count'] = len(plugins)
|
||||||
|
except Exception:
|
||||||
|
features['plugin_count'] = -1
|
||||||
|
|
||||||
|
# Skill count (from Box runtime via skill manager)
|
||||||
|
try:
|
||||||
|
skill_mgr = getattr(ap, 'skill_mgr', None)
|
||||||
|
if skill_mgr is not None and getattr(skill_mgr, 'skills', None) is not None:
|
||||||
|
features['skill_count'] = len(skill_mgr.skills)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
|
||||||
|
return {
|
||||||
|
'event_type': 'instance_heartbeat',
|
||||||
|
'query_id': '',
|
||||||
|
'version': constants.semantic_version,
|
||||||
|
'instance_id': constants.instance_id,
|
||||||
|
'edition': constants.edition,
|
||||||
|
'features': features,
|
||||||
|
'timestamp': datetime.now(timezone.utc).isoformat(),
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
async def heartbeat_loop(ap: core_app.Application) -> None:
|
||||||
|
"""Send one heartbeat shortly after startup, then daily."""
|
||||||
|
# Small delay so managers (platform, skills, plugins) finish loading first
|
||||||
|
await asyncio.sleep(30)
|
||||||
|
while True:
|
||||||
|
try:
|
||||||
|
payload = await build_heartbeat_payload(ap)
|
||||||
|
await ap.telemetry.start_send_task(payload)
|
||||||
|
except Exception as e:
|
||||||
|
try:
|
||||||
|
ap.logger.debug(f'Telemetry heartbeat failed: {e}')
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
await asyncio.sleep(HEARTBEAT_INTERVAL_SECONDS)
|
||||||
@@ -68,10 +68,21 @@ class TelemetryManager:
|
|||||||
'edition',
|
'edition',
|
||||||
'error',
|
'error',
|
||||||
'timestamp',
|
'timestamp',
|
||||||
|
'event_type',
|
||||||
):
|
):
|
||||||
|
if sfield not in sanitized:
|
||||||
|
continue
|
||||||
v = sanitized.get(sfield)
|
v = sanitized.get(sfield)
|
||||||
sanitized[sfield] = '' if v is None else str(v)
|
sanitized[sfield] = '' if v is None else str(v)
|
||||||
|
|
||||||
|
# event_type defaults to 'query' for backward compatibility
|
||||||
|
if not sanitized.get('event_type'):
|
||||||
|
sanitized['event_type'] = 'query'
|
||||||
|
|
||||||
|
# features must be a JSON object
|
||||||
|
if 'features' in sanitized and not isinstance(sanitized['features'], dict):
|
||||||
|
sanitized['features'] = {}
|
||||||
|
|
||||||
if 'duration_ms' in sanitized:
|
if 'duration_ms' in sanitized:
|
||||||
try:
|
try:
|
||||||
sanitized['duration_ms'] = (
|
sanitized['duration_ms'] = (
|
||||||
|
|||||||
@@ -27,6 +27,66 @@ def compiled_params(statement):
|
|||||||
return statement.compile().params
|
return statement.compile().params
|
||||||
|
|
||||||
|
|
||||||
|
class TestRagRerankAction:
|
||||||
|
"""Tests for RAG rerank action handler."""
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def app(self):
|
||||||
|
mock_app = Mock()
|
||||||
|
mock_app.model_mgr = Mock()
|
||||||
|
mock_app.logger = Mock()
|
||||||
|
return mock_app
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_invokes_rerank_model_and_sorts_scores(self, app):
|
||||||
|
"""Rerank action uses the selected model and returns top scores."""
|
||||||
|
provider = Mock()
|
||||||
|
provider.invoke_rerank = AsyncMock(
|
||||||
|
return_value=[
|
||||||
|
{'index': 0, 'relevance_score': 0.2},
|
||||||
|
{'index': 1, 'relevance_score': 0.9},
|
||||||
|
]
|
||||||
|
)
|
||||||
|
rerank_model = SimpleNamespace(provider=provider)
|
||||||
|
app.model_mgr.get_rerank_model_by_uuid = AsyncMock(return_value=rerank_model)
|
||||||
|
runtime_handler = make_handler(app)
|
||||||
|
|
||||||
|
response = await runtime_handler.actions[PluginToRuntimeAction.INVOKE_RERANK.value]({
|
||||||
|
'rerank_model_uuid': 'rerank-1',
|
||||||
|
'query': 'hello',
|
||||||
|
'documents': ['a', 'b'],
|
||||||
|
'top_k': 1,
|
||||||
|
'extra_args': {'return_documents': False},
|
||||||
|
})
|
||||||
|
|
||||||
|
assert response.code == 0
|
||||||
|
assert response.data['results'] == [{'index': 1, 'relevance_score': 0.9}]
|
||||||
|
app.model_mgr.get_rerank_model_by_uuid.assert_awaited_once_with('rerank-1')
|
||||||
|
provider.invoke_rerank.assert_awaited_once_with(
|
||||||
|
model=rerank_model,
|
||||||
|
query='hello',
|
||||||
|
documents=['a', 'b'],
|
||||||
|
extra_args={'return_documents': False},
|
||||||
|
)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_returns_error_when_rerank_model_missing(self, app):
|
||||||
|
"""Missing rerank model returns an action error."""
|
||||||
|
app.model_mgr.get_rerank_model_by_uuid = AsyncMock(
|
||||||
|
side_effect=ValueError('not found')
|
||||||
|
)
|
||||||
|
runtime_handler = make_handler(app)
|
||||||
|
|
||||||
|
response = await runtime_handler.actions[PluginToRuntimeAction.INVOKE_RERANK.value]({
|
||||||
|
'rerank_model_uuid': 'missing',
|
||||||
|
'query': 'hello',
|
||||||
|
'documents': ['a'],
|
||||||
|
})
|
||||||
|
|
||||||
|
assert response.code != 0
|
||||||
|
assert 'Rerank model with rerank_model_uuid missing not found' in response.message
|
||||||
|
|
||||||
|
|
||||||
class TestInitializePluginSettings:
|
class TestInitializePluginSettings:
|
||||||
"""Tests for initialize_plugin_settings action handler."""
|
"""Tests for initialize_plugin_settings action handler."""
|
||||||
|
|
||||||
|
|||||||
@@ -7,6 +7,7 @@ Tests cover:
|
|||||||
- Survey response submission
|
- Survey response submission
|
||||||
- Survey dismissal
|
- Survey dismissal
|
||||||
"""
|
"""
|
||||||
|
|
||||||
from __future__ import annotations
|
from __future__ import annotations
|
||||||
|
|
||||||
import pytest
|
import pytest
|
||||||
@@ -127,9 +128,7 @@ class TestLoadTriggeredEvents:
|
|||||||
"""Test that empty set is used when no events stored."""
|
"""Test that empty set is used when no events stored."""
|
||||||
survey_module = get_survey_module()
|
survey_module = get_survey_module()
|
||||||
mock_app = create_mock_app()
|
mock_app = create_mock_app()
|
||||||
mock_app.persistence_mgr.execute_async = AsyncMock(
|
mock_app.persistence_mgr.execute_async = AsyncMock(return_value=Mock(first=Mock(return_value=None)))
|
||||||
return_value=Mock(first=Mock(return_value=None))
|
|
||||||
)
|
|
||||||
|
|
||||||
manager = survey_module.SurveyManager(mock_app)
|
manager = survey_module.SurveyManager(mock_app)
|
||||||
await manager._load_triggered_events()
|
await manager._load_triggered_events()
|
||||||
@@ -219,9 +218,7 @@ class TestTriggerEvent:
|
|||||||
"""Test that new event is added and saved."""
|
"""Test that new event is added and saved."""
|
||||||
survey_module = get_survey_module()
|
survey_module = get_survey_module()
|
||||||
mock_app = create_mock_app()
|
mock_app = create_mock_app()
|
||||||
mock_app.persistence_mgr.execute_async = AsyncMock(
|
mock_app.persistence_mgr.execute_async = AsyncMock(return_value=Mock(first=Mock(return_value=None)))
|
||||||
return_value=Mock(first=Mock(return_value=None))
|
|
||||||
)
|
|
||||||
|
|
||||||
manager = survey_module.SurveyManager(mock_app)
|
manager = survey_module.SurveyManager(mock_app)
|
||||||
manager._space_url = 'https://space.example.com'
|
manager._space_url = 'https://space.example.com'
|
||||||
@@ -231,6 +228,104 @@ class TestTriggerEvent:
|
|||||||
assert 'new_event' in manager._triggered_events
|
assert 'new_event' in manager._triggered_events
|
||||||
|
|
||||||
|
|
||||||
|
class TestRecordBotResponseSuccess:
|
||||||
|
"""Tests for the bot_response_success_100 milestone counter."""
|
||||||
|
|
||||||
|
def _make_manager(self, survey_module, mock_app):
|
||||||
|
manager = survey_module.SurveyManager(mock_app)
|
||||||
|
manager._space_url = 'https://space.example.com'
|
||||||
|
# No existing metadata rows: select returns no row
|
||||||
|
mock_app.persistence_mgr.execute_async = AsyncMock(return_value=Mock(first=Mock(return_value=None)))
|
||||||
|
return manager
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_increments_and_persists_count(self):
|
||||||
|
survey_module = get_survey_module()
|
||||||
|
mock_app = create_mock_app()
|
||||||
|
manager = self._make_manager(survey_module, mock_app)
|
||||||
|
|
||||||
|
await manager.record_bot_response_success()
|
||||||
|
|
||||||
|
assert manager._bot_response_count == 1
|
||||||
|
# select + insert for the count key
|
||||||
|
assert mock_app.persistence_mgr.execute_async.call_count >= 2
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_fires_milestone_event_at_threshold(self):
|
||||||
|
survey_module = get_survey_module()
|
||||||
|
mock_app = create_mock_app()
|
||||||
|
manager = self._make_manager(survey_module, mock_app)
|
||||||
|
manager._bot_response_count = survey_module.BOT_RESPONSE_MILESTONE - 1
|
||||||
|
|
||||||
|
await manager.record_bot_response_success()
|
||||||
|
|
||||||
|
assert manager._bot_response_count == survey_module.BOT_RESPONSE_MILESTONE
|
||||||
|
assert survey_module.BOT_RESPONSE_MILESTONE_EVENT in manager._triggered_events
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_does_not_fire_below_threshold(self):
|
||||||
|
survey_module = get_survey_module()
|
||||||
|
mock_app = create_mock_app()
|
||||||
|
manager = self._make_manager(survey_module, mock_app)
|
||||||
|
manager._bot_response_count = 5
|
||||||
|
|
||||||
|
await manager.record_bot_response_success()
|
||||||
|
|
||||||
|
assert survey_module.BOT_RESPONSE_MILESTONE_EVENT not in manager._triggered_events
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_stops_counting_after_milestone_triggered(self):
|
||||||
|
survey_module = get_survey_module()
|
||||||
|
mock_app = create_mock_app()
|
||||||
|
manager = self._make_manager(survey_module, mock_app)
|
||||||
|
manager._triggered_events.add(survey_module.BOT_RESPONSE_MILESTONE_EVENT)
|
||||||
|
manager._bot_response_count = survey_module.BOT_RESPONSE_MILESTONE
|
||||||
|
|
||||||
|
await manager.record_bot_response_success()
|
||||||
|
|
||||||
|
# No persistence write, count unchanged
|
||||||
|
mock_app.persistence_mgr.execute_async.assert_not_called()
|
||||||
|
assert manager._bot_response_count == survey_module.BOT_RESPONSE_MILESTONE
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_skips_when_space_not_configured(self):
|
||||||
|
survey_module = get_survey_module()
|
||||||
|
mock_app = create_mock_app()
|
||||||
|
manager = self._make_manager(survey_module, mock_app)
|
||||||
|
manager._space_url = ''
|
||||||
|
|
||||||
|
await manager.record_bot_response_success()
|
||||||
|
|
||||||
|
assert manager._bot_response_count == 0
|
||||||
|
mock_app.persistence_mgr.execute_async.assert_not_called()
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_count_loaded_on_initialize(self):
|
||||||
|
survey_module = get_survey_module()
|
||||||
|
mock_app = create_mock_app()
|
||||||
|
|
||||||
|
count_row = Mock()
|
||||||
|
count_row.value = '42'
|
||||||
|
|
||||||
|
def execute_side_effect(stmt):
|
||||||
|
result = Mock()
|
||||||
|
# Both _load_triggered_events and _load_bot_response_count select
|
||||||
|
# from Metadata; return the count row only for the count key.
|
||||||
|
stmt_str = str(stmt.compile(compile_kwargs={'literal_binds': True}))
|
||||||
|
if survey_module.BOT_RESPONSE_COUNT_KEY in stmt_str:
|
||||||
|
result.first.return_value = (count_row,)
|
||||||
|
else:
|
||||||
|
result.first.return_value = None
|
||||||
|
return result
|
||||||
|
|
||||||
|
mock_app.persistence_mgr.execute_async = AsyncMock(side_effect=execute_side_effect)
|
||||||
|
|
||||||
|
manager = survey_module.SurveyManager(mock_app)
|
||||||
|
await manager.initialize()
|
||||||
|
|
||||||
|
assert manager._bot_response_count == 42
|
||||||
|
|
||||||
|
|
||||||
class TestPendingSurvey:
|
class TestPendingSurvey:
|
||||||
"""Tests for get_pending_survey and clear_pending_survey."""
|
"""Tests for get_pending_survey and clear_pending_survey."""
|
||||||
|
|
||||||
@@ -296,14 +391,19 @@ class TestSubmitResponse:
|
|||||||
|
|
||||||
# Mock successful HTTP response
|
# Mock successful HTTP response
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
mock_response = Mock()
|
mock_response = Mock()
|
||||||
mock_response.status_code = 200
|
mock_response.status_code = 200
|
||||||
|
|
||||||
with pytest.MonkeyPatch().context() as m:
|
with pytest.MonkeyPatch().context() as m:
|
||||||
m.setattr(httpx, 'AsyncClient', lambda **kwargs: MagicMock(
|
m.setattr(
|
||||||
__aenter__=AsyncMock(return_value=Mock(post=AsyncMock(return_value=mock_response))),
|
httpx,
|
||||||
__aexit__=AsyncMock(return_value=None)
|
'AsyncClient',
|
||||||
))
|
lambda **kwargs: MagicMock(
|
||||||
|
__aenter__=AsyncMock(return_value=Mock(post=AsyncMock(return_value=mock_response))),
|
||||||
|
__aexit__=AsyncMock(return_value=None),
|
||||||
|
),
|
||||||
|
)
|
||||||
result = await manager.submit_response('survey123', {'q1': 'answer1'})
|
result = await manager.submit_response('survey123', {'q1': 'answer1'})
|
||||||
|
|
||||||
assert result is True
|
assert result is True
|
||||||
@@ -338,15 +438,20 @@ class TestDismissSurvey:
|
|||||||
|
|
||||||
# Mock successful HTTP response
|
# Mock successful HTTP response
|
||||||
import httpx
|
import httpx
|
||||||
|
|
||||||
mock_response = Mock()
|
mock_response = Mock()
|
||||||
mock_response.status_code = 200
|
mock_response.status_code = 200
|
||||||
|
|
||||||
with pytest.MonkeyPatch().context() as m:
|
with pytest.MonkeyPatch().context() as m:
|
||||||
m.setattr(httpx, 'AsyncClient', lambda **kwargs: MagicMock(
|
m.setattr(
|
||||||
__aenter__=AsyncMock(return_value=Mock(post=AsyncMock(return_value=mock_response))),
|
httpx,
|
||||||
__aexit__=AsyncMock(return_value=None)
|
'AsyncClient',
|
||||||
))
|
lambda **kwargs: MagicMock(
|
||||||
|
__aenter__=AsyncMock(return_value=Mock(post=AsyncMock(return_value=mock_response))),
|
||||||
|
__aexit__=AsyncMock(return_value=None),
|
||||||
|
),
|
||||||
|
)
|
||||||
result = await manager.dismiss_survey('survey123')
|
result = await manager.dismiss_survey('survey123')
|
||||||
|
|
||||||
assert result is True
|
assert result is True
|
||||||
assert manager._pending_survey is None
|
assert manager._pending_survey is None
|
||||||
|
|||||||
92
tests/unit_tests/telemetry/test_features.py
Normal file
92
tests/unit_tests/telemetry/test_features.py
Normal file
@@ -0,0 +1,92 @@
|
|||||||
|
"""Unit tests for telemetry feature counters (pkg/telemetry/features.py)."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
from importlib import import_module
|
||||||
|
|
||||||
|
|
||||||
|
def get_features_module():
|
||||||
|
return import_module('langbot.pkg.telemetry.features')
|
||||||
|
|
||||||
|
|
||||||
|
class FakeQuery:
|
||||||
|
def __init__(self):
|
||||||
|
self.variables = {}
|
||||||
|
|
||||||
|
|
||||||
|
class TestIncrement:
|
||||||
|
def test_increment_nested_counter(self):
|
||||||
|
features = get_features_module()
|
||||||
|
q = FakeQuery()
|
||||||
|
features.increment(q, 'tool_calls', 'native')
|
||||||
|
features.increment(q, 'tool_calls', 'native')
|
||||||
|
features.increment(q, 'tool_calls', 'mcp')
|
||||||
|
assert q.variables[features.FEATURES_KEY]['tool_calls'] == {'native': 2, 'mcp': 1}
|
||||||
|
|
||||||
|
def test_increment_flat_counter(self):
|
||||||
|
features = get_features_module()
|
||||||
|
q = FakeQuery()
|
||||||
|
features.increment(q, 'something')
|
||||||
|
features.increment(q, 'something', amount=2)
|
||||||
|
assert q.variables[features.FEATURES_KEY]['something'] == 3
|
||||||
|
|
||||||
|
def test_increment_never_raises_on_broken_query(self):
|
||||||
|
features = get_features_module()
|
||||||
|
|
||||||
|
class Broken:
|
||||||
|
@property
|
||||||
|
def variables(self):
|
||||||
|
raise RuntimeError('boom')
|
||||||
|
|
||||||
|
# Must not raise
|
||||||
|
features.increment(Broken(), 'tool_calls', 'native')
|
||||||
|
|
||||||
|
def test_set_value(self):
|
||||||
|
features = get_features_module()
|
||||||
|
q = FakeQuery()
|
||||||
|
features.set_value(q, 'tool_call_rounds', 5)
|
||||||
|
assert q.variables[features.FEATURES_KEY]['tool_call_rounds'] == 5
|
||||||
|
|
||||||
|
|
||||||
|
class TestCollectFeatures:
|
||||||
|
def test_collect_empty(self):
|
||||||
|
features = get_features_module()
|
||||||
|
q = FakeQuery()
|
||||||
|
assert features.collect_features(q) == {}
|
||||||
|
|
||||||
|
def test_collect_combines_counters_and_snapshots(self):
|
||||||
|
features = get_features_module()
|
||||||
|
q = FakeQuery()
|
||||||
|
features.increment(q, 'sandbox', 'execs')
|
||||||
|
features.set_value(q, 'kb', {'kb_count': 2, 'engine_plugins': ['builtin'], 'retrieved_entries': 7})
|
||||||
|
q.variables['_activated_skills'] = {'pdf-tools': {}, 'a-skill': {}}
|
||||||
|
q.variables['_pipeline_bound_mcp_servers'] = ['srv1', 'srv2']
|
||||||
|
|
||||||
|
result = features.collect_features(q)
|
||||||
|
assert result['sandbox'] == {'execs': 1}
|
||||||
|
assert result['kb']['kb_count'] == 2
|
||||||
|
assert result['activated_skills'] == ['a-skill', 'pdf-tools'] # sorted
|
||||||
|
assert result['mcp_servers'] == ['srv1', 'srv2']
|
||||||
|
|
||||||
|
def test_collect_omits_mcp_when_all_enabled(self):
|
||||||
|
"""None means 'all enabled' and is not reported."""
|
||||||
|
features = get_features_module()
|
||||||
|
q = FakeQuery()
|
||||||
|
q.variables['_pipeline_bound_mcp_servers'] = None
|
||||||
|
assert 'mcp_servers' not in features.collect_features(q)
|
||||||
|
|
||||||
|
def test_collect_drops_non_json_serializable(self):
|
||||||
|
features = get_features_module()
|
||||||
|
q = FakeQuery()
|
||||||
|
features.set_value(q, 'good', 1)
|
||||||
|
features.set_value(q, 'bad', object())
|
||||||
|
result = features.collect_features(q)
|
||||||
|
assert result == {'good': 1}
|
||||||
|
|
||||||
|
def test_collect_is_json_serializable(self):
|
||||||
|
import json
|
||||||
|
|
||||||
|
features = get_features_module()
|
||||||
|
q = FakeQuery()
|
||||||
|
features.increment(q, 'tool_calls', 'skill')
|
||||||
|
json.dumps(features.collect_features(q))
|
||||||
104
tests/unit_tests/telemetry/test_heartbeat.py
Normal file
104
tests/unit_tests/telemetry/test_heartbeat.py
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
"""Unit tests for telemetry heartbeat payload (pkg/telemetry/heartbeat.py)."""
|
||||||
|
|
||||||
|
from __future__ import annotations
|
||||||
|
|
||||||
|
import json
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
from unittest.mock import AsyncMock, Mock
|
||||||
|
from importlib import import_module
|
||||||
|
|
||||||
|
|
||||||
|
def get_heartbeat_module():
|
||||||
|
return import_module('langbot.pkg.telemetry.heartbeat')
|
||||||
|
|
||||||
|
|
||||||
|
def make_app():
|
||||||
|
ap = Mock()
|
||||||
|
ap.instance_config = Mock()
|
||||||
|
ap.instance_config.data = {
|
||||||
|
'database': {'use': 'postgresql'},
|
||||||
|
'vdb': {'use': 'chroma'},
|
||||||
|
'box': {'enabled': True, 'backend': 'nsjail'},
|
||||||
|
}
|
||||||
|
|
||||||
|
# persistence counts
|
||||||
|
result = Mock()
|
||||||
|
result.scalar.return_value = 3
|
||||||
|
ap.persistence_mgr = Mock()
|
||||||
|
ap.persistence_mgr.execute_async = AsyncMock(return_value=result)
|
||||||
|
|
||||||
|
# box service
|
||||||
|
ap.box_service = Mock()
|
||||||
|
ap.box_service.enabled = True
|
||||||
|
ap.box_service.available = False
|
||||||
|
ap.box_service.shares_filesystem_with_box = False
|
||||||
|
|
||||||
|
# platform manager with one enabled bot
|
||||||
|
bot = Mock()
|
||||||
|
bot.enable = True
|
||||||
|
bot.adapter = Mock()
|
||||||
|
bot.adapter.__class__.__name__ = 'TelegramAdapter'
|
||||||
|
ap.platform_mgr = Mock()
|
||||||
|
ap.platform_mgr.bots = [bot]
|
||||||
|
|
||||||
|
# plugin connector
|
||||||
|
ap.plugin_connector = Mock()
|
||||||
|
ap.plugin_connector.list_plugins = AsyncMock(return_value=[{}, {}])
|
||||||
|
|
||||||
|
# skills
|
||||||
|
ap.skill_mgr = Mock()
|
||||||
|
ap.skill_mgr.skills = {'a': {}, 'b': {}, 'c': {}}
|
||||||
|
|
||||||
|
return ap
|
||||||
|
|
||||||
|
|
||||||
|
class TestBuildHeartbeatPayload:
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_payload_shape(self):
|
||||||
|
heartbeat = get_heartbeat_module()
|
||||||
|
ap = make_app()
|
||||||
|
payload = await heartbeat.build_heartbeat_payload(ap)
|
||||||
|
|
||||||
|
assert payload['event_type'] == 'instance_heartbeat'
|
||||||
|
assert payload['query_id'] == ''
|
||||||
|
assert 'timestamp' in payload
|
||||||
|
f = payload['features']
|
||||||
|
assert f['database'] == 'postgresql'
|
||||||
|
assert f['vdb'] == 'chroma'
|
||||||
|
assert f['box'] == {
|
||||||
|
'enabled': True,
|
||||||
|
'available': False,
|
||||||
|
'backend': 'nsjail',
|
||||||
|
'shares_fs': False,
|
||||||
|
}
|
||||||
|
assert f['adapters'] == ['TelegramAdapter']
|
||||||
|
assert f['bot_count'] == 1
|
||||||
|
assert f['plugin_count'] == 2
|
||||||
|
assert f['skill_count'] == 3
|
||||||
|
assert f['pipeline_count'] == 3
|
||||||
|
assert f['mcp_server_count'] == 3
|
||||||
|
assert f['knowledge_base_count'] == 3
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_payload_is_json_serializable(self):
|
||||||
|
heartbeat = get_heartbeat_module()
|
||||||
|
payload = await heartbeat.build_heartbeat_payload(make_app())
|
||||||
|
json.dumps(payload)
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_count_failure_yields_minus_one(self):
|
||||||
|
heartbeat = get_heartbeat_module()
|
||||||
|
ap = make_app()
|
||||||
|
ap.persistence_mgr.execute_async = AsyncMock(side_effect=RuntimeError('db down'))
|
||||||
|
payload = await heartbeat.build_heartbeat_payload(ap)
|
||||||
|
assert payload['features']['pipeline_count'] == -1
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_no_user_content_fields(self):
|
||||||
|
"""The heartbeat must never carry message content / credentials keys."""
|
||||||
|
heartbeat = get_heartbeat_module()
|
||||||
|
payload = await heartbeat.build_heartbeat_payload(make_app())
|
||||||
|
flat = json.dumps(payload).lower()
|
||||||
|
for forbidden in ('api_key', 'password', 'token', 'message_content'):
|
||||||
|
assert forbidden not in flat
|
||||||
8
uv.lock
generated
8
uv.lock
generated
@@ -2029,7 +2029,7 @@ requires-dist = [
|
|||||||
{ name = "ebooklib", specifier = ">=0.18" },
|
{ name = "ebooklib", specifier = ">=0.18" },
|
||||||
{ name = "gewechat-client", specifier = ">=0.1.5" },
|
{ name = "gewechat-client", specifier = ">=0.1.5" },
|
||||||
{ name = "html2text", specifier = ">=2024.2.26" },
|
{ name = "html2text", specifier = ">=2024.2.26" },
|
||||||
{ name = "langbot-plugin", specifier = "==0.4.2" },
|
{ name = "langbot-plugin", specifier = "==0.4.3" },
|
||||||
{ name = "langchain", specifier = ">=0.2.0" },
|
{ name = "langchain", specifier = ">=0.2.0" },
|
||||||
{ name = "langchain-core", specifier = ">=1.3.3" },
|
{ name = "langchain-core", specifier = ">=1.3.3" },
|
||||||
{ name = "langchain-text-splitters", specifier = ">=1.1.2" },
|
{ name = "langchain-text-splitters", specifier = ">=1.1.2" },
|
||||||
@@ -2092,7 +2092,7 @@ dev = [
|
|||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
name = "langbot-plugin"
|
name = "langbot-plugin"
|
||||||
version = "0.4.2"
|
version = "0.4.3"
|
||||||
source = { registry = "https://pypi.org/simple" }
|
source = { registry = "https://pypi.org/simple" }
|
||||||
dependencies = [
|
dependencies = [
|
||||||
{ name = "aiofiles" },
|
{ name = "aiofiles" },
|
||||||
@@ -2113,9 +2113,9 @@ dependencies = [
|
|||||||
{ name = "watchdog" },
|
{ name = "watchdog" },
|
||||||
{ name = "websockets" },
|
{ name = "websockets" },
|
||||||
]
|
]
|
||||||
sdist = { url = "https://files.pythonhosted.org/packages/0d/ea/8a26bc399ae9aff0d99fb03b239f08ba79f211bed053dda479f01e08cef8/langbot_plugin-0.4.2.tar.gz", hash = "sha256:c6e247481f68e60aaafc30deabcd9a48b65269bcc99e1962a9df1e5d61a7de3d", size = 305407, upload-time = "2026-06-09T13:50:27.007Z" }
|
sdist = { url = "https://files.pythonhosted.org/packages/8e/f1/32ec67e8b8eb91159d2b9703f466cc2a763c8cea380dd56561efe793a55b/langbot_plugin-0.4.3.tar.gz", hash = "sha256:747fb78bc666cfac3842cb35130fa8323759dd8768fdaa1975099157a3749c6e", size = 309655, upload-time = "2026-06-13T04:58:10.279Z" }
|
||||||
wheels = [
|
wheels = [
|
||||||
{ url = "https://files.pythonhosted.org/packages/de/1a/21c078ca309fbc6842548153545da434408035ea41eb8421fda5f9716dfe/langbot_plugin-0.4.2-py3-none-any.whl", hash = "sha256:3f2510f19c5cbdb025aeb52b057a17309cc694c48d8220b7dad7a66981a26b37", size = 210399, upload-time = "2026-06-09T13:50:28.347Z" },
|
{ url = "https://files.pythonhosted.org/packages/28/05/84bd7537efd45fc02044ca9509973160a7d6d10520ff73e31424141a3a6c/langbot_plugin-0.4.3-py3-none-any.whl", hash = "sha256:46aca36e2193c18f9cf332460760dd7b9340ee2e96a57f2e4ae621c4d4c4b61c", size = 211384, upload-time = "2026-06-13T04:58:11.668Z" },
|
||||||
]
|
]
|
||||||
|
|
||||||
[[package]]
|
[[package]]
|
||||||
|
|||||||
@@ -450,6 +450,9 @@ const MCPForm = forwardRef<MCPFormHandle, MCPFormProps>(function MCPForm(
|
|||||||
testMcp: () => testMcp(),
|
testMcp: () => testMcp(),
|
||||||
isTesting: mcpTesting,
|
isTesting: mcpTesting,
|
||||||
}),
|
}),
|
||||||
|
// testMcp now reads everything via form.getValues(), so it does not need
|
||||||
|
// the latest stdioArgs/extraArgs closure — but keep mcpTesting so the
|
||||||
|
// exposed isTesting flag stays accurate.
|
||||||
[mcpTesting],
|
[mcpTesting],
|
||||||
);
|
);
|
||||||
|
|
||||||
@@ -680,6 +683,17 @@ const MCPForm = forwardRef<MCPFormHandle, MCPFormProps>(function MCPForm(
|
|||||||
|
|
||||||
try {
|
try {
|
||||||
const mode = form.getValues('mode');
|
const mode = form.getValues('mode');
|
||||||
|
// Read every field via form.getValues() rather than the captured
|
||||||
|
// `stdioArgs` / `extraArgs` state. testMcp() is invoked through an
|
||||||
|
// imperative handle (formRef.current.testMcp()) whose closure is only
|
||||||
|
// refreshed when [mcpTesting] changes, so reading the React state here
|
||||||
|
// would use a stale snapshot — on the detail page that snapshot is the
|
||||||
|
// empty initial [], which dropped stdio args entirely and launched
|
||||||
|
// `uvx` with no package (exit 2 / "Connection closed", no detail).
|
||||||
|
// The form values are kept in sync on every edit and on load, so they
|
||||||
|
// are always current.
|
||||||
|
const formExtraArgs = form.getValues('extra_args') ?? [];
|
||||||
|
const formStdioArgs = form.getValues('args') ?? [];
|
||||||
let extraArgsData:
|
let extraArgsData:
|
||||||
| MCPServerExtraArgsSSE
|
| MCPServerExtraArgsSSE
|
||||||
| MCPServerExtraArgsHttp
|
| MCPServerExtraArgsHttp
|
||||||
@@ -690,7 +704,7 @@ const MCPForm = forwardRef<MCPFormHandle, MCPFormProps>(function MCPForm(
|
|||||||
url: form.getValues('url')!,
|
url: form.getValues('url')!,
|
||||||
timeout: form.getValues('timeout'),
|
timeout: form.getValues('timeout'),
|
||||||
headers: Object.fromEntries(
|
headers: Object.fromEntries(
|
||||||
extraArgs.map((arg) => [arg.key, arg.value]),
|
formExtraArgs.map((arg) => [arg.key, arg.value]),
|
||||||
),
|
),
|
||||||
ssereadtimeout: form.getValues('ssereadtimeout'),
|
ssereadtimeout: form.getValues('ssereadtimeout'),
|
||||||
};
|
};
|
||||||
@@ -699,14 +713,16 @@ const MCPForm = forwardRef<MCPFormHandle, MCPFormProps>(function MCPForm(
|
|||||||
url: form.getValues('url')!,
|
url: form.getValues('url')!,
|
||||||
timeout: form.getValues('timeout'),
|
timeout: form.getValues('timeout'),
|
||||||
headers: Object.fromEntries(
|
headers: Object.fromEntries(
|
||||||
extraArgs.map((arg) => [arg.key, arg.value]),
|
formExtraArgs.map((arg) => [arg.key, arg.value]),
|
||||||
),
|
),
|
||||||
};
|
};
|
||||||
} else {
|
} else {
|
||||||
extraArgsData = {
|
extraArgsData = {
|
||||||
command: form.getValues('command')!,
|
command: form.getValues('command')!,
|
||||||
args: stdioArgs.map((arg) => arg.value),
|
args: formStdioArgs.map((arg) => arg.value),
|
||||||
env: Object.fromEntries(extraArgs.map((arg) => [arg.key, arg.value])),
|
env: Object.fromEntries(
|
||||||
|
formExtraArgs.map((arg) => [arg.key, arg.value]),
|
||||||
|
),
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user