mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-18 11:44:18 +00:00
Compare commits
3 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| 3b3b09331a | |||
| 75e5af26d0 | |||
| 22c0a18bea |
@@ -313,30 +313,18 @@ class MonitoringRouterGroup(group.RouterGroup):
|
||||
offset=0,
|
||||
)
|
||||
|
||||
# Get traces
|
||||
traces, traces_total = await self.ap.monitoring_service.get_traces(
|
||||
bot_ids=bot_ids if bot_ids else None,
|
||||
pipeline_ids=pipeline_ids if pipeline_ids else None,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
limit=limit,
|
||||
offset=0,
|
||||
)
|
||||
|
||||
return self.success(
|
||||
data={
|
||||
'overview': overview,
|
||||
'messages': messages,
|
||||
'llmCalls': llm_calls,
|
||||
'embeddingCalls': embedding_calls,
|
||||
'traces': traces,
|
||||
'sessions': sessions,
|
||||
'errors': errors,
|
||||
'totalCount': {
|
||||
'messages': messages_total,
|
||||
'llmCalls': llm_calls_total,
|
||||
'embeddingCalls': embedding_calls_total,
|
||||
'traces': traces_total,
|
||||
'sessions': sessions_total,
|
||||
'errors': errors_total,
|
||||
},
|
||||
@@ -362,49 +350,6 @@ class MonitoringRouterGroup(group.RouterGroup):
|
||||
|
||||
return self.success(data=details)
|
||||
|
||||
@self.route('/traces', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
|
||||
async def get_traces() -> str:
|
||||
"""Get end-to-end trace records."""
|
||||
bot_ids = quart.request.args.getlist('botId')
|
||||
pipeline_ids = quart.request.args.getlist('pipelineId')
|
||||
session_ids = quart.request.args.getlist('sessionId')
|
||||
statuses = quart.request.args.getlist('status')
|
||||
start_time_str = quart.request.args.get('startTime')
|
||||
end_time_str = quart.request.args.get('endTime')
|
||||
limit = int(quart.request.args.get('limit', 100))
|
||||
offset = int(quart.request.args.get('offset', 0))
|
||||
|
||||
start_time = parse_iso_datetime(start_time_str)
|
||||
end_time = parse_iso_datetime(end_time_str)
|
||||
|
||||
traces, total = await self.ap.monitoring_service.get_traces(
|
||||
bot_ids=bot_ids if bot_ids else None,
|
||||
pipeline_ids=pipeline_ids if pipeline_ids else None,
|
||||
session_ids=session_ids if session_ids else None,
|
||||
statuses=statuses if statuses else None,
|
||||
start_time=start_time,
|
||||
end_time=end_time,
|
||||
limit=limit,
|
||||
offset=offset,
|
||||
)
|
||||
|
||||
return self.success(
|
||||
data={
|
||||
'traces': traces,
|
||||
'total': total,
|
||||
'limit': limit,
|
||||
'offset': offset,
|
||||
}
|
||||
)
|
||||
|
||||
@self.route('/traces/<trace_id>', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
|
||||
async def get_trace_details(trace_id: str) -> str:
|
||||
"""Get one trace with all spans."""
|
||||
details = await self.ap.monitoring_service.get_trace_details(trace_id)
|
||||
if not details.get('found'):
|
||||
return self.http_status(404, -1, f'Trace {trace_id} not found')
|
||||
return self.success(data=details)
|
||||
|
||||
@self.route('/export', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
|
||||
async def export_data() -> tuple[str, int]:
|
||||
"""Export monitoring data as CSV"""
|
||||
|
||||
@@ -350,24 +350,8 @@ class PluginsRouterGroup(group.RouterGroup):
|
||||
if not endpoint.startswith('/') or '..' in endpoint:
|
||||
return self.http_status(400, -1, 'invalid endpoint')
|
||||
|
||||
caller = {
|
||||
'plugin_author': author,
|
||||
'plugin_name': plugin_name,
|
||||
'page_id': page_id,
|
||||
'origin': _get_request_origin(),
|
||||
}
|
||||
headers = {
|
||||
key: value
|
||||
for key, value in {
|
||||
'user-agent': quart.request.headers.get('User-Agent'),
|
||||
'x-request-id': quart.request.headers.get('X-Request-ID'),
|
||||
'x-forwarded-for': quart.request.headers.get('X-Forwarded-For'),
|
||||
}.items()
|
||||
if value
|
||||
}
|
||||
|
||||
result = await self.ap.plugin_connector.handle_page_api(
|
||||
author, plugin_name, page_id, endpoint, method.upper(), body, caller, headers
|
||||
author, plugin_name, page_id, endpoint, method.upper(), body
|
||||
)
|
||||
if result.get('error'):
|
||||
return self.http_status(400, -1, result['error'])
|
||||
|
||||
@@ -3,55 +3,11 @@ from __future__ import annotations
|
||||
import uuid
|
||||
import datetime
|
||||
import sqlalchemy
|
||||
import json
|
||||
|
||||
from ....core import app
|
||||
from ....entity.persistence import monitoring as persistence_monitoring
|
||||
|
||||
|
||||
# TODO: Move shared trace/time helpers into a small monitoring utility module
|
||||
# when trace propagation expands beyond the current query/retrieval path.
|
||||
def _utc_now() -> datetime.datetime:
|
||||
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
|
||||
|
||||
|
||||
def _json_dumps(value: dict | list | None) -> str | None:
|
||||
if value is None:
|
||||
return None
|
||||
try:
|
||||
return json.dumps(value, ensure_ascii=False, default=str)
|
||||
except Exception:
|
||||
return json.dumps({'serialization_error': str(value)}, ensure_ascii=False)
|
||||
|
||||
|
||||
def _json_loads(value: str | None) -> dict | list | None:
|
||||
if not value:
|
||||
return None
|
||||
try:
|
||||
return json.loads(value)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
|
||||
def new_trace_id() -> str:
|
||||
return f'trace-{uuid.uuid4().hex[:16]}'
|
||||
|
||||
|
||||
def new_span_id() -> str:
|
||||
return f'span-{uuid.uuid4().hex[:16]}'
|
||||
|
||||
|
||||
def normalize_trace_status(status: str | None) -> str:
|
||||
"""Normalize operation status to the monitoring UI vocabulary."""
|
||||
if status in ('completed', 'ok'):
|
||||
return 'success'
|
||||
if status in ('failed', 'failure', 'exception'):
|
||||
return 'error'
|
||||
if status in ('running', 'success', 'error'):
|
||||
return status
|
||||
return 'success'
|
||||
|
||||
|
||||
class MonitoringService:
|
||||
"""Monitoring service"""
|
||||
|
||||
@@ -118,18 +74,6 @@ class MonitoringService:
|
||||
persistence_monitoring.MonitoringFeedback.timestamp,
|
||||
persistence_monitoring.MonitoringFeedback.id,
|
||||
),
|
||||
(
|
||||
'monitoring_traces',
|
||||
persistence_monitoring.MonitoringTrace,
|
||||
persistence_monitoring.MonitoringTrace.started_at,
|
||||
persistence_monitoring.MonitoringTrace.trace_id,
|
||||
),
|
||||
(
|
||||
'monitoring_spans',
|
||||
persistence_monitoring.MonitoringSpan,
|
||||
persistence_monitoring.MonitoringSpan.started_at,
|
||||
persistence_monitoring.MonitoringSpan.span_id,
|
||||
),
|
||||
]
|
||||
|
||||
deleted_counts: dict[str, int] = {}
|
||||
@@ -189,116 +133,6 @@ class MonitoringService:
|
||||
|
||||
# ========== Recording Methods ==========
|
||||
|
||||
async def start_trace(
|
||||
self,
|
||||
trace_id: str | None = None,
|
||||
name: str = 'LangBot query',
|
||||
bot_id: str | None = None,
|
||||
bot_name: str | None = None,
|
||||
pipeline_id: str | None = None,
|
||||
pipeline_name: str | None = None,
|
||||
session_id: str | None = None,
|
||||
message_id: str | None = None,
|
||||
query_id: str | int | None = None,
|
||||
attributes: dict | None = None,
|
||||
) -> str:
|
||||
"""Create or update a trace header row."""
|
||||
trace_id = trace_id or new_trace_id()
|
||||
trace_data = {
|
||||
'trace_id': trace_id,
|
||||
'started_at': _utc_now(),
|
||||
'ended_at': None,
|
||||
'duration': None,
|
||||
'status': 'running',
|
||||
'name': name,
|
||||
'bot_id': bot_id,
|
||||
'bot_name': bot_name,
|
||||
'pipeline_id': pipeline_id,
|
||||
'pipeline_name': pipeline_name,
|
||||
'session_id': session_id,
|
||||
'message_id': message_id,
|
||||
'query_id': str(query_id) if query_id is not None else None,
|
||||
'attributes': _json_dumps(attributes),
|
||||
}
|
||||
|
||||
await self.ap.persistence_mgr.execute_async(
|
||||
sqlalchemy.insert(persistence_monitoring.MonitoringTrace).values(trace_data)
|
||||
)
|
||||
return trace_id
|
||||
|
||||
async def finish_trace(
|
||||
self,
|
||||
trace_id: str,
|
||||
status: str = 'success',
|
||||
duration: int | None = None,
|
||||
message_id: str | None = None,
|
||||
attributes: dict | None = None,
|
||||
) -> None:
|
||||
"""Mark a trace complete."""
|
||||
update_values: dict = {
|
||||
'ended_at': _utc_now(),
|
||||
'status': normalize_trace_status(status),
|
||||
}
|
||||
if duration is not None:
|
||||
update_values['duration'] = duration
|
||||
if message_id is not None:
|
||||
update_values['message_id'] = message_id
|
||||
if attributes is not None:
|
||||
update_values['attributes'] = _json_dumps(attributes)
|
||||
|
||||
await self.ap.persistence_mgr.execute_async(
|
||||
sqlalchemy.update(persistence_monitoring.MonitoringTrace)
|
||||
.where(persistence_monitoring.MonitoringTrace.trace_id == trace_id)
|
||||
.values(update_values)
|
||||
)
|
||||
|
||||
async def record_span(
|
||||
self,
|
||||
trace_id: str,
|
||||
name: str,
|
||||
kind: str,
|
||||
status: str = 'success',
|
||||
span_id: str | None = None,
|
||||
parent_span_id: str | None = None,
|
||||
started_at: datetime.datetime | None = None,
|
||||
ended_at: datetime.datetime | None = None,
|
||||
duration: int | None = None,
|
||||
message_id: str | None = None,
|
||||
session_id: str | None = None,
|
||||
bot_id: str | None = None,
|
||||
pipeline_id: str | None = None,
|
||||
attributes: dict | None = None,
|
||||
error_message: str | None = None,
|
||||
) -> str:
|
||||
"""Record a single completed span."""
|
||||
started_at = started_at or _utc_now()
|
||||
if duration is None and ended_at is not None:
|
||||
duration = int((ended_at - started_at).total_seconds() * 1000)
|
||||
elif duration is not None:
|
||||
duration = int(round(float(duration)))
|
||||
span_data = {
|
||||
'span_id': span_id or new_span_id(),
|
||||
'trace_id': trace_id,
|
||||
'parent_span_id': parent_span_id,
|
||||
'name': name,
|
||||
'kind': kind,
|
||||
'status': normalize_trace_status(status),
|
||||
'started_at': started_at,
|
||||
'ended_at': ended_at or _utc_now(),
|
||||
'duration': duration,
|
||||
'message_id': message_id,
|
||||
'session_id': session_id,
|
||||
'bot_id': bot_id,
|
||||
'pipeline_id': pipeline_id,
|
||||
'attributes': _json_dumps(attributes),
|
||||
'error_message': error_message,
|
||||
}
|
||||
|
||||
await self.ap.persistence_mgr.execute_async(
|
||||
sqlalchemy.insert(persistence_monitoring.MonitoringSpan).values(span_data)
|
||||
)
|
||||
return span_data['span_id']
|
||||
|
||||
async def record_message(
|
||||
self,
|
||||
bot_id: str,
|
||||
@@ -1242,19 +1076,6 @@ class MonitoringService:
|
||||
for row in error_rows
|
||||
]
|
||||
|
||||
trace_query = (
|
||||
sqlalchemy.select(persistence_monitoring.MonitoringTrace)
|
||||
.where(persistence_monitoring.MonitoringTrace.message_id == message_id)
|
||||
.order_by(persistence_monitoring.MonitoringTrace.started_at.desc())
|
||||
.limit(1)
|
||||
)
|
||||
trace_result = await self.ap.persistence_mgr.execute_async(trace_query)
|
||||
trace_row = trace_result.first()
|
||||
trace = None
|
||||
if trace_row:
|
||||
trace_model = trace_row[0] if isinstance(trace_row, tuple) else trace_row
|
||||
trace = self._serialize_trace(trace_model)
|
||||
|
||||
return {
|
||||
'message_id': message_id,
|
||||
'found': True,
|
||||
@@ -1269,84 +1090,6 @@ class MonitoringService:
|
||||
'average_duration_ms': int(total_duration / len(llm_rows)) if len(llm_rows) > 0 else 0,
|
||||
},
|
||||
'errors': errors,
|
||||
'trace': trace,
|
||||
}
|
||||
|
||||
def _serialize_trace(self, trace: persistence_monitoring.MonitoringTrace) -> dict:
|
||||
data = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringTrace, trace)
|
||||
data['attributes'] = _json_loads(data.get('attributes')) or {}
|
||||
return data
|
||||
|
||||
def _serialize_span(self, span: persistence_monitoring.MonitoringSpan) -> dict:
|
||||
data = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringSpan, span)
|
||||
data['attributes'] = _json_loads(data.get('attributes')) or {}
|
||||
return data
|
||||
|
||||
async def get_traces(
|
||||
self,
|
||||
bot_ids: list[str] | None = None,
|
||||
pipeline_ids: list[str] | None = None,
|
||||
session_ids: list[str] | None = None,
|
||||
statuses: list[str] | None = None,
|
||||
start_time: datetime.datetime | None = None,
|
||||
end_time: datetime.datetime | None = None,
|
||||
limit: int = 100,
|
||||
offset: int = 0,
|
||||
) -> tuple[list[dict], int]:
|
||||
"""Get trace headers with filters."""
|
||||
conditions = []
|
||||
if bot_ids:
|
||||
conditions.append(persistence_monitoring.MonitoringTrace.bot_id.in_(bot_ids))
|
||||
if pipeline_ids:
|
||||
conditions.append(persistence_monitoring.MonitoringTrace.pipeline_id.in_(pipeline_ids))
|
||||
if session_ids:
|
||||
conditions.append(persistence_monitoring.MonitoringTrace.session_id.in_(session_ids))
|
||||
if statuses:
|
||||
conditions.append(persistence_monitoring.MonitoringTrace.status.in_(statuses))
|
||||
if start_time:
|
||||
conditions.append(persistence_monitoring.MonitoringTrace.started_at >= start_time)
|
||||
if end_time:
|
||||
conditions.append(persistence_monitoring.MonitoringTrace.started_at <= end_time)
|
||||
|
||||
count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringTrace.trace_id))
|
||||
query = sqlalchemy.select(persistence_monitoring.MonitoringTrace)
|
||||
if conditions:
|
||||
clause = sqlalchemy.and_(*conditions)
|
||||
count_query = count_query.where(clause)
|
||||
query = query.where(clause)
|
||||
|
||||
total_result = await self.ap.persistence_mgr.execute_async(count_query)
|
||||
total = total_result.scalar() or 0
|
||||
|
||||
query = query.order_by(persistence_monitoring.MonitoringTrace.started_at.desc()).limit(limit).offset(offset)
|
||||
result = await self.ap.persistence_mgr.execute_async(query)
|
||||
traces = [self._serialize_trace(row[0] if isinstance(row, tuple) else row) for row in result.all()]
|
||||
return traces, total
|
||||
|
||||
async def get_trace_details(self, trace_id: str) -> dict:
|
||||
"""Get a single trace and all spans in chronological order."""
|
||||
trace_query = sqlalchemy.select(persistence_monitoring.MonitoringTrace).where(
|
||||
persistence_monitoring.MonitoringTrace.trace_id == trace_id
|
||||
)
|
||||
trace_result = await self.ap.persistence_mgr.execute_async(trace_query)
|
||||
trace_row = trace_result.first()
|
||||
if not trace_row:
|
||||
return {'trace_id': trace_id, 'found': False}
|
||||
|
||||
trace = trace_row[0] if isinstance(trace_row, tuple) else trace_row
|
||||
span_query = (
|
||||
sqlalchemy.select(persistence_monitoring.MonitoringSpan)
|
||||
.where(persistence_monitoring.MonitoringSpan.trace_id == trace_id)
|
||||
.order_by(persistence_monitoring.MonitoringSpan.started_at.asc())
|
||||
)
|
||||
span_result = await self.ap.persistence_mgr.execute_async(span_query)
|
||||
spans = [self._serialize_span(row[0] if isinstance(row, tuple) else row) for row in span_result.all()]
|
||||
|
||||
return {
|
||||
'trace_id': trace_id,
|
||||
'found': True,
|
||||
'trace': self._serialize_trace(trace),
|
||||
'spans': spans,
|
||||
}
|
||||
|
||||
# ========== Export Methods ==========
|
||||
|
||||
@@ -335,6 +335,428 @@ class BoxService:
|
||||
|
||||
return await self.execute_spec_payload(spec_payload, query)
|
||||
|
||||
# ── Attachment passthrough (inbound / outbound) ──────────────────
|
||||
#
|
||||
# IM/webchat attachments (images, voices, files) reach the LLM as
|
||||
# multimodal content, but historically never landed on the sandbox
|
||||
# filesystem, so the agent's exec/read/write tools could not operate on
|
||||
# them. Conversely, files the agent produced inside the sandbox were
|
||||
# never surfaced back to the user. These two helpers close both gaps:
|
||||
#
|
||||
# inbound : message_chain attachments -> /workspace/inbox/<query_id>/
|
||||
# outbound : /workspace/outbox/<query_id>/ -> reply MessageChain
|
||||
#
|
||||
# Transfer prefers DIRECT HOST FILESYSTEM access to the bind-mounted
|
||||
# workspace (default_workspace on the host maps to /workspace inside the
|
||||
# container), which has no size limit. This covers the local docker /
|
||||
# nsjail / stdio backends. For backends where the workspace is NOT visible
|
||||
# on the LangBot host (E2B, an external remote runtime.endpoint), it falls
|
||||
# back to a base64-through-exec round-trip. The exec channel can only move
|
||||
# small files reliably — the docker backend passes the command as a single
|
||||
# argv (ARG_MAX) and exec stdout is truncated by output_limit_chars — so
|
||||
# the host path is strongly preferred and used whenever available.
|
||||
|
||||
INBOX_MOUNT_DIR = '/workspace/inbox'
|
||||
OUTBOX_MOUNT_DIR = '/workspace/outbox'
|
||||
INBOX_SUBDIR = 'inbox'
|
||||
OUTBOX_SUBDIR = 'outbox'
|
||||
# Hard cap on a single attachment. The HTTP upload endpoints already cap
|
||||
# uploads at 10MiB; keep parity.
|
||||
_ATTACHMENT_MAX_BYTES = 10 * _MIB
|
||||
# Conservative cap for the exec FALLBACK path only (ARG_MAX / stdout
|
||||
# truncation). The host-filesystem path has no such limit.
|
||||
_EXEC_FALLBACK_MAX_BYTES = 256 * 1024
|
||||
|
||||
def _host_query_dir(self, subdir: str, query_id) -> str | None:
|
||||
"""Host path for ``/workspace/<subdir>/<query_id>`` when LangBot can
|
||||
access the bind-mounted workspace directly, else ``None``.
|
||||
|
||||
``default_workspace`` is the host directory bind-mounted to
|
||||
``/workspace`` for the local docker/nsjail backends and shared
|
||||
outright in stdio mode, so a file written there by LangBot is visible
|
||||
to the sandbox (and vice-versa). It is ``None`` / not a local dir for
|
||||
E2B and remote runtimes, where we must fall back to the exec channel.
|
||||
"""
|
||||
root = self.default_workspace
|
||||
if not root or not os.path.isdir(root):
|
||||
return None
|
||||
return os.path.join(root, subdir, str(query_id))
|
||||
|
||||
@staticmethod
|
||||
def _sanitize_attachment_name(name: str, fallback: str) -> str:
|
||||
"""Reduce an arbitrary attachment name to a safe basename.
|
||||
|
||||
Strips directory separators and parent refs so a crafted file name
|
||||
can never escape the inbox/outbox directory.
|
||||
"""
|
||||
base = os.path.basename(str(name or '').replace('\\', '/').strip())
|
||||
base = base.lstrip('.') or ''
|
||||
# Drop anything that is not a conservative filename charset.
|
||||
cleaned = ''.join(c for c in base if c.isalnum() or c in ('.', '_', '-', ' ')).strip()
|
||||
cleaned = cleaned.replace(' ', '_')
|
||||
return cleaned or fallback
|
||||
|
||||
@staticmethod
|
||||
async def _component_to_bytes(component) -> tuple[bytes, str] | None:
|
||||
"""Best-effort extraction of (bytes, mime) from a platform component.
|
||||
|
||||
Handles base64, http(s) url and local path sources. Returns None when
|
||||
no payload can be resolved.
|
||||
"""
|
||||
import base64 as _b64
|
||||
|
||||
b64 = getattr(component, 'base64', None)
|
||||
if b64:
|
||||
data = b64
|
||||
mime = 'application/octet-stream'
|
||||
if isinstance(data, str) and data.startswith('data:'):
|
||||
split_index = data.find(';base64,')
|
||||
if split_index != -1:
|
||||
mime = data[5:split_index]
|
||||
data = data[split_index + 8 :]
|
||||
try:
|
||||
return _b64.b64decode(data), mime
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
url = getattr(component, 'url', None)
|
||||
if url:
|
||||
try:
|
||||
import httpx
|
||||
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
resp = await client.get(url)
|
||||
resp.raise_for_status()
|
||||
return resp.content, resp.headers.get('Content-Type', 'application/octet-stream')
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
path = getattr(component, 'path', None)
|
||||
if path:
|
||||
try:
|
||||
import aiofiles
|
||||
|
||||
async with aiofiles.open(path, 'rb') as f:
|
||||
return await f.read(), 'application/octet-stream'
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
return None
|
||||
|
||||
async def _write_files_into_sandbox(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
subdir: str,
|
||||
target_mount_dir: str,
|
||||
files: list[tuple[str, bytes]],
|
||||
) -> list[str]:
|
||||
"""Write *files* (name, bytes) into the per-query directory.
|
||||
|
||||
Prefers a direct host-filesystem write to the bind-mounted workspace
|
||||
(no size limit). Falls back to a base64-through-exec round-trip only
|
||||
when the workspace is not visible on the LangBot host (E2B / remote).
|
||||
Returns the list of in-sandbox paths actually written.
|
||||
"""
|
||||
if not files:
|
||||
return []
|
||||
|
||||
host_dir = self._host_query_dir(subdir, query.query_id)
|
||||
if host_dir is not None:
|
||||
return await asyncio.to_thread(self._write_files_host, host_dir, target_mount_dir, files)
|
||||
|
||||
return await self._write_files_via_exec(query, target_mount_dir, files)
|
||||
|
||||
def _write_files_host(
|
||||
self,
|
||||
host_dir: str,
|
||||
target_mount_dir: str,
|
||||
files: list[tuple[str, bytes]],
|
||||
) -> list[str]:
|
||||
"""Write attachments straight onto the bind-mounted host directory.
|
||||
|
||||
Recreates the per-query directory from scratch so a reused query_id
|
||||
(the webchat session uses small sequential ids) never inherits stale
|
||||
files from an earlier turn.
|
||||
"""
|
||||
import shutil
|
||||
|
||||
shutil.rmtree(host_dir, ignore_errors=True)
|
||||
os.makedirs(host_dir, exist_ok=True)
|
||||
written: list[str] = []
|
||||
for name, data in files:
|
||||
with open(os.path.join(host_dir, name), 'wb') as fh:
|
||||
fh.write(data)
|
||||
written.append(f'{target_mount_dir}/{name}')
|
||||
return written
|
||||
|
||||
async def _write_files_via_exec(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
target_dir: str,
|
||||
files: list[tuple[str, bytes]],
|
||||
) -> list[str]:
|
||||
"""Fallback: ship files into the sandbox over the exec channel.
|
||||
|
||||
Only used for backends without host-filesystem access (E2B / remote).
|
||||
Each file is base64-decoded inside the sandbox. Files larger than the
|
||||
conservative exec cap are skipped (ARG_MAX / stdout limits).
|
||||
"""
|
||||
import base64 as _b64
|
||||
import json as _json
|
||||
|
||||
manifest = []
|
||||
for name, data in files:
|
||||
if len(data) > self._EXEC_FALLBACK_MAX_BYTES:
|
||||
self.ap.logger.warning(
|
||||
f'Attachment "{name}" ({len(data)} bytes) exceeds the exec-channel '
|
||||
f'fallback limit ({self._EXEC_FALLBACK_MAX_BYTES} bytes); skipping. '
|
||||
f'Configure a host-shared workspace to transfer large files.'
|
||||
)
|
||||
continue
|
||||
manifest.append({'name': name, 'b64': _b64.b64encode(data).decode('ascii')})
|
||||
if not manifest:
|
||||
return []
|
||||
|
||||
manifest_b64 = _b64.b64encode(_json.dumps(manifest).encode('utf-8')).decode('ascii')
|
||||
script = (
|
||||
'import base64, json, os, shutil\n'
|
||||
f'target = {target_dir!r}\n'
|
||||
'shutil.rmtree(target, ignore_errors=True)\n'
|
||||
'os.makedirs(target, exist_ok=True)\n'
|
||||
f'manifest = json.loads(base64.b64decode({manifest_b64!r}))\n'
|
||||
'written = []\n'
|
||||
'for item in manifest:\n'
|
||||
" p = os.path.join(target, item['name'])\n"
|
||||
" with open(p, 'wb') as f:\n"
|
||||
" f.write(base64.b64decode(item['b64']))\n"
|
||||
' written.append(p)\n'
|
||||
'print(json.dumps(written))\n'
|
||||
)
|
||||
result = await self.execute_tool(
|
||||
{'command': f"python3 - <<'LBPY'\n{script}\nLBPY", 'timeout_sec': 120},
|
||||
query,
|
||||
)
|
||||
if not result.get('ok'):
|
||||
self.ap.logger.warning(
|
||||
f'Failed to write inbound attachments into sandbox via exec: '
|
||||
f'query_id={query.query_id} stderr={result.get("stderr", "")[:200]}'
|
||||
)
|
||||
return []
|
||||
try:
|
||||
return _json.loads(str(result.get('stdout') or '').strip().splitlines()[-1])
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
async def materialize_inbound_attachments(self, query: pipeline_query.Query) -> list[dict]:
|
||||
"""Persist message-chain attachments into the sandbox inbox.
|
||||
|
||||
Returns a list of ``{path, name, type, size}`` describing what was
|
||||
written, so the runner can tell the LLM the exact in-sandbox paths.
|
||||
Returns ``[]`` when sandbox is unavailable or there are no attachments.
|
||||
"""
|
||||
if not self._available:
|
||||
return []
|
||||
|
||||
import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
||||
|
||||
message_chain = getattr(query, 'message_chain', None)
|
||||
if not message_chain:
|
||||
return []
|
||||
|
||||
type_map = [
|
||||
(platform_message.Image, 'Image', 'image', 'png'),
|
||||
(platform_message.Voice, 'Voice', 'voice', 'wav'),
|
||||
(platform_message.File, 'File', 'file', 'bin'),
|
||||
]
|
||||
|
||||
pending: list[tuple[str, bytes]] = []
|
||||
descriptors: list[dict] = []
|
||||
index = 0
|
||||
for component in message_chain:
|
||||
matched = None
|
||||
for cls, kind, prefix, default_ext in type_map:
|
||||
if isinstance(component, cls):
|
||||
matched = (kind, prefix, default_ext)
|
||||
break
|
||||
if matched is None:
|
||||
continue
|
||||
kind, prefix, default_ext = matched
|
||||
|
||||
payload = await self._component_to_bytes(component)
|
||||
if payload is None:
|
||||
continue
|
||||
data, _mime = payload
|
||||
if not data or len(data) > self._ATTACHMENT_MAX_BYTES:
|
||||
continue
|
||||
|
||||
index += 1
|
||||
raw_name = getattr(component, 'name', None) or f'{prefix}_{index}.{default_ext}'
|
||||
safe_name = self._sanitize_attachment_name(raw_name, f'{prefix}_{index}.{default_ext}')
|
||||
pending.append((safe_name, data))
|
||||
descriptors.append(
|
||||
{
|
||||
'name': safe_name,
|
||||
'type': kind,
|
||||
'size': len(data),
|
||||
}
|
||||
)
|
||||
|
||||
if not pending:
|
||||
return []
|
||||
|
||||
target_dir = f'{self.INBOX_MOUNT_DIR}/{query.query_id}'
|
||||
written = await self._write_files_into_sandbox(query, self.INBOX_SUBDIR, target_dir, pending)
|
||||
written_basenames = {os.path.basename(p) for p in written}
|
||||
|
||||
result: list[dict] = []
|
||||
for desc in descriptors:
|
||||
if desc['name'] in written_basenames:
|
||||
desc['path'] = f'{target_dir}/{desc["name"]}'
|
||||
result.append(desc)
|
||||
if result:
|
||||
self.ap.logger.info(
|
||||
f'Materialized {len(result)} inbound attachment(s) into sandbox: '
|
||||
f'query_id={query.query_id} dir={target_dir}'
|
||||
)
|
||||
return result
|
||||
|
||||
async def collect_outbound_attachments(self, query: pipeline_query.Query) -> list[dict]:
|
||||
"""Collect files the agent produced in the sandbox outbox.
|
||||
|
||||
Reads ``/workspace/outbox/<query_id>/`` (recursively) — directly from
|
||||
the bind-mounted host directory when available (no size limit), else
|
||||
via the exec channel — returns a list of ``{type, name, base64}``
|
||||
ready to become platform message components, then clears the outbox so
|
||||
a later turn in the same session does not re-send stale files. Returns
|
||||
``[]`` when nothing was produced.
|
||||
"""
|
||||
if not self._available:
|
||||
return []
|
||||
|
||||
host_dir = self._host_query_dir(self.OUTBOX_SUBDIR, query.query_id)
|
||||
if host_dir is not None:
|
||||
entries = await asyncio.to_thread(self._read_outbox_host, host_dir)
|
||||
else:
|
||||
entries = await self._read_outbox_via_exec(query)
|
||||
|
||||
attachments = self._classify_outbound_entries(entries)
|
||||
|
||||
if attachments:
|
||||
await self._clear_outbox(query, host_dir)
|
||||
self.ap.logger.info(
|
||||
f'Collected {len(attachments)} outbound attachment(s) from sandbox: query_id={query.query_id}'
|
||||
)
|
||||
return attachments
|
||||
|
||||
def _read_outbox_host(self, host_dir: str) -> list[dict]:
|
||||
"""Read outbox files straight off the bind-mounted host directory."""
|
||||
import base64 as _b64
|
||||
|
||||
entries: list[dict] = []
|
||||
if not os.path.isdir(host_dir):
|
||||
return entries
|
||||
for root, _dirs, names in os.walk(host_dir):
|
||||
for name in sorted(names):
|
||||
path = os.path.join(root, name)
|
||||
try:
|
||||
if os.path.getsize(path) > self._ATTACHMENT_MAX_BYTES:
|
||||
continue
|
||||
with open(path, 'rb') as fh:
|
||||
data = fh.read()
|
||||
except OSError:
|
||||
continue
|
||||
rel = os.path.relpath(path, host_dir)
|
||||
entries.append({'name': rel, 'b64': _b64.b64encode(data).decode('ascii')})
|
||||
return entries
|
||||
|
||||
async def _read_outbox_via_exec(self, query: pipeline_query.Query) -> list[dict]:
|
||||
"""Fallback: read the outbox over the exec channel (E2B / remote).
|
||||
|
||||
Note: exec stdout is truncated by ``output_limit_chars``, so this path
|
||||
only reliably transfers small files. The host path is preferred.
|
||||
"""
|
||||
import json as _json
|
||||
|
||||
target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}'
|
||||
max_bytes = self._EXEC_FALLBACK_MAX_BYTES
|
||||
script = (
|
||||
'import base64, json, os\n'
|
||||
f'target = {target_dir!r}\n'
|
||||
f'max_bytes = {max_bytes}\n'
|
||||
'out = []\n'
|
||||
'if os.path.isdir(target):\n'
|
||||
' for root, _dirs, names in os.walk(target):\n'
|
||||
' for n in sorted(names):\n'
|
||||
' p = os.path.join(root, n)\n'
|
||||
' try:\n'
|
||||
' if os.path.getsize(p) > max_bytes:\n'
|
||||
' continue\n'
|
||||
" with open(p, 'rb') as f:\n"
|
||||
' data = f.read()\n'
|
||||
' except OSError:\n'
|
||||
' continue\n'
|
||||
' rel = os.path.relpath(p, target)\n'
|
||||
" out.append({'name': rel, 'b64': base64.b64encode(data).decode('ascii')})\n"
|
||||
'print(json.dumps(out))\n'
|
||||
)
|
||||
result = await self.execute_tool(
|
||||
{'command': f"python3 - <<'LBPY'\n{script}\nLBPY", 'timeout_sec': 120},
|
||||
query,
|
||||
)
|
||||
if not result.get('ok'):
|
||||
return []
|
||||
try:
|
||||
return _json.loads(str(result.get('stdout') or '').strip().splitlines()[-1])
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
async def _clear_outbox(self, query: pipeline_query.Query, host_dir: str | None) -> None:
|
||||
"""Empty the per-query outbox after collection (host or exec)."""
|
||||
if host_dir is not None:
|
||||
import shutil
|
||||
|
||||
def _clear():
|
||||
shutil.rmtree(host_dir, ignore_errors=True)
|
||||
os.makedirs(host_dir, exist_ok=True)
|
||||
|
||||
await asyncio.to_thread(_clear)
|
||||
return
|
||||
target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}'
|
||||
await self.execute_tool(
|
||||
{'command': f'rm -rf {target_dir} && mkdir -p {target_dir}', 'timeout_sec': 30},
|
||||
query,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _classify_outbound_entries(entries: list[dict]) -> list[dict]:
|
||||
"""Classify outbox files into Image/Voice/File component descriptors."""
|
||||
image_exts = {'png', 'jpg', 'jpeg', 'gif', 'webp', 'bmp'}
|
||||
voice_exts = {'wav', 'mp3', 'silk', 'amr', 'ogg', 'm4a', 'aac'}
|
||||
mime_by_ext = {
|
||||
'png': 'image/png',
|
||||
'jpg': 'image/jpeg',
|
||||
'jpeg': 'image/jpeg',
|
||||
'gif': 'image/gif',
|
||||
'webp': 'image/webp',
|
||||
'bmp': 'image/bmp',
|
||||
}
|
||||
attachments: list[dict] = []
|
||||
for entry in entries or []:
|
||||
name = str(entry.get('name', '') or '')
|
||||
b64 = entry.get('b64')
|
||||
if not name or not b64:
|
||||
continue
|
||||
ext = name.rsplit('.', 1)[-1].lower() if '.' in name else ''
|
||||
base_name = os.path.basename(name)
|
||||
if ext in image_exts:
|
||||
mime = mime_by_ext.get(ext, 'image/png')
|
||||
attachments.append({'type': 'Image', 'name': base_name, 'base64': f'data:{mime};base64,{b64}'})
|
||||
elif ext in voice_exts:
|
||||
attachments.append({'type': 'Voice', 'name': base_name, 'base64': f'data:audio/{ext};base64,{b64}'})
|
||||
else:
|
||||
attachments.append({'type': 'File', 'name': base_name, 'base64': b64})
|
||||
return attachments
|
||||
|
||||
async def shutdown(self):
|
||||
await self.client.shutdown()
|
||||
|
||||
|
||||
@@ -3,49 +3,6 @@ import sqlalchemy
|
||||
from .base import Base
|
||||
|
||||
|
||||
class MonitoringTrace(Base):
|
||||
"""End-to-end monitoring trace records"""
|
||||
|
||||
__tablename__ = 'monitoring_traces'
|
||||
|
||||
trace_id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True)
|
||||
started_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True)
|
||||
ended_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True, index=True)
|
||||
duration = sqlalchemy.Column(sqlalchemy.Integer, nullable=True) # milliseconds
|
||||
status = sqlalchemy.Column(sqlalchemy.String(50), nullable=False, index=True) # running, success, error
|
||||
name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
|
||||
bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
bot_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
|
||||
pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
pipeline_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
|
||||
session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
message_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
query_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
attributes = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
|
||||
|
||||
|
||||
class MonitoringSpan(Base):
|
||||
"""Trace span records for pipeline, RAG, model, plugin and tool operations"""
|
||||
|
||||
__tablename__ = 'monitoring_spans'
|
||||
|
||||
span_id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True)
|
||||
trace_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True)
|
||||
parent_span_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
|
||||
kind = sqlalchemy.Column(sqlalchemy.String(80), nullable=False, index=True)
|
||||
status = sqlalchemy.Column(sqlalchemy.String(50), nullable=False, index=True)
|
||||
started_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True)
|
||||
ended_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True)
|
||||
duration = sqlalchemy.Column(sqlalchemy.Integer, nullable=True) # milliseconds
|
||||
message_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
|
||||
attributes = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
|
||||
error_message = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
|
||||
|
||||
|
||||
class MonitoringMessage(Base):
|
||||
"""Monitoring message records"""
|
||||
|
||||
|
||||
@@ -1,88 +0,0 @@
|
||||
"""add monitoring traces and spans
|
||||
|
||||
Revision ID: 0006_monitoring_traces
|
||||
Revises: 0005_add_llm_context_length
|
||||
Create Date: 2026-06-16
|
||||
"""
|
||||
|
||||
import sqlalchemy as sa
|
||||
from alembic import op
|
||||
|
||||
revision = '0006_monitoring_traces'
|
||||
down_revision = '0005_add_llm_context_length'
|
||||
branch_labels = None
|
||||
depends_on = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
conn = op.get_bind()
|
||||
inspector = sa.inspect(conn)
|
||||
tables = set(inspector.get_table_names())
|
||||
|
||||
if 'monitoring_traces' not in tables:
|
||||
op.create_table(
|
||||
'monitoring_traces',
|
||||
sa.Column('trace_id', sa.String(length=255), nullable=False),
|
||||
sa.Column('started_at', sa.DateTime(), nullable=False),
|
||||
sa.Column('ended_at', sa.DateTime(), nullable=True),
|
||||
sa.Column('duration', sa.Integer(), nullable=True),
|
||||
sa.Column('status', sa.String(length=50), nullable=False),
|
||||
sa.Column('name', sa.String(length=255), nullable=False),
|
||||
sa.Column('bot_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('bot_name', sa.String(length=255), nullable=True),
|
||||
sa.Column('pipeline_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('pipeline_name', sa.String(length=255), nullable=True),
|
||||
sa.Column('session_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('message_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('query_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('attributes', sa.Text(), nullable=True),
|
||||
sa.PrimaryKeyConstraint('trace_id'),
|
||||
)
|
||||
op.create_index('ix_monitoring_traces_started_at', 'monitoring_traces', ['started_at'])
|
||||
op.create_index('ix_monitoring_traces_ended_at', 'monitoring_traces', ['ended_at'])
|
||||
op.create_index('ix_monitoring_traces_status', 'monitoring_traces', ['status'])
|
||||
op.create_index('ix_monitoring_traces_bot_id', 'monitoring_traces', ['bot_id'])
|
||||
op.create_index('ix_monitoring_traces_pipeline_id', 'monitoring_traces', ['pipeline_id'])
|
||||
op.create_index('ix_monitoring_traces_session_id', 'monitoring_traces', ['session_id'])
|
||||
op.create_index('ix_monitoring_traces_message_id', 'monitoring_traces', ['message_id'])
|
||||
op.create_index('ix_monitoring_traces_query_id', 'monitoring_traces', ['query_id'])
|
||||
|
||||
if 'monitoring_spans' not in tables:
|
||||
op.create_table(
|
||||
'monitoring_spans',
|
||||
sa.Column('span_id', sa.String(length=255), nullable=False),
|
||||
sa.Column('trace_id', sa.String(length=255), nullable=False),
|
||||
sa.Column('parent_span_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('name', sa.String(length=255), nullable=False),
|
||||
sa.Column('kind', sa.String(length=80), nullable=False),
|
||||
sa.Column('status', sa.String(length=50), nullable=False),
|
||||
sa.Column('started_at', sa.DateTime(), nullable=False),
|
||||
sa.Column('ended_at', sa.DateTime(), nullable=True),
|
||||
sa.Column('duration', sa.Integer(), nullable=True),
|
||||
sa.Column('message_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('session_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('bot_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('pipeline_id', sa.String(length=255), nullable=True),
|
||||
sa.Column('attributes', sa.Text(), nullable=True),
|
||||
sa.Column('error_message', sa.Text(), nullable=True),
|
||||
sa.PrimaryKeyConstraint('span_id'),
|
||||
)
|
||||
op.create_index('ix_monitoring_spans_trace_id', 'monitoring_spans', ['trace_id'])
|
||||
op.create_index('ix_monitoring_spans_parent_span_id', 'monitoring_spans', ['parent_span_id'])
|
||||
op.create_index('ix_monitoring_spans_kind', 'monitoring_spans', ['kind'])
|
||||
op.create_index('ix_monitoring_spans_status', 'monitoring_spans', ['status'])
|
||||
op.create_index('ix_monitoring_spans_started_at', 'monitoring_spans', ['started_at'])
|
||||
op.create_index('ix_monitoring_spans_message_id', 'monitoring_spans', ['message_id'])
|
||||
op.create_index('ix_monitoring_spans_session_id', 'monitoring_spans', ['session_id'])
|
||||
op.create_index('ix_monitoring_spans_bot_id', 'monitoring_spans', ['bot_id'])
|
||||
op.create_index('ix_monitoring_spans_pipeline_id', 'monitoring_spans', ['pipeline_id'])
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
conn = op.get_bind()
|
||||
inspector = sa.inspect(conn)
|
||||
tables = set(inspector.get_table_names())
|
||||
if 'monitoring_spans' in tables:
|
||||
op.drop_table('monitoring_spans')
|
||||
if 'monitoring_traces' in tables:
|
||||
op.drop_table('monitoring_traces')
|
||||
@@ -2,9 +2,6 @@ from __future__ import annotations
|
||||
|
||||
import typing
|
||||
import traceback
|
||||
import time
|
||||
import uuid
|
||||
import datetime
|
||||
|
||||
import sqlalchemy
|
||||
|
||||
@@ -82,19 +79,6 @@ class RuntimePipeline:
|
||||
enable_all_plugins: bool
|
||||
"""是否启用所有插件"""
|
||||
|
||||
@staticmethod
|
||||
def _new_span_id() -> str:
|
||||
return f'span-{uuid.uuid4().hex[:16]}'
|
||||
|
||||
@staticmethod
|
||||
def _utc_now() -> datetime.datetime:
|
||||
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
|
||||
|
||||
@staticmethod
|
||||
def _query_session_id(query: pipeline_query.Query) -> str:
|
||||
launcher_type = query.launcher_type.value if hasattr(query.launcher_type, 'value') else str(query.launcher_type)
|
||||
return f'{launcher_type}_{query.launcher_id}'
|
||||
|
||||
enable_all_mcp_servers: bool
|
||||
"""是否启用所有MCP服务器"""
|
||||
|
||||
@@ -250,102 +234,44 @@ class RuntimePipeline:
|
||||
stage_container = self.stage_containers[i]
|
||||
|
||||
query.current_stage_name = stage_container.inst_name # 标记到 Query 对象里
|
||||
span_started_at = self._utc_now()
|
||||
span_started = time.perf_counter()
|
||||
span_status = 'success'
|
||||
span_error = None
|
||||
span_result_type = None
|
||||
|
||||
try:
|
||||
result = stage_container.inst.process(query, stage_container.inst_name)
|
||||
result = stage_container.inst.process(query, stage_container.inst_name)
|
||||
|
||||
if isinstance(result, typing.Coroutine):
|
||||
result = await result
|
||||
if isinstance(result, typing.Coroutine):
|
||||
result = await result
|
||||
|
||||
if isinstance(result, pipeline_entities.StageProcessResult): # 直接返回结果
|
||||
span_result_type = str(
|
||||
result.result_type.value if hasattr(result.result_type, 'value') else result.result_type
|
||||
)
|
||||
if isinstance(result, pipeline_entities.StageProcessResult): # 直接返回结果
|
||||
self.ap.logger.debug(
|
||||
f'Stage {stage_container.inst_name} processed query {query.query_id} res {result.result_type}'
|
||||
)
|
||||
await self._check_output(query, result)
|
||||
|
||||
if result.result_type == pipeline_entities.ResultType.INTERRUPT:
|
||||
self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}')
|
||||
break
|
||||
elif result.result_type == pipeline_entities.ResultType.CONTINUE:
|
||||
query = result.new_query
|
||||
elif isinstance(result, typing.AsyncGenerator): # 生成器
|
||||
self.ap.logger.debug(f'Stage {stage_container.inst_name} processed query {query.query_id} gen')
|
||||
|
||||
async for sub_result in result:
|
||||
self.ap.logger.debug(
|
||||
f'Stage {stage_container.inst_name} processed query {query.query_id} res {result.result_type}'
|
||||
f'Stage {stage_container.inst_name} processed query {query.query_id} res {sub_result.result_type}'
|
||||
)
|
||||
await self._check_output(query, result)
|
||||
if result.error_notice:
|
||||
span_status = 'error'
|
||||
span_error = result.error_notice
|
||||
await self._check_output(query, sub_result)
|
||||
|
||||
if result.result_type == pipeline_entities.ResultType.INTERRUPT:
|
||||
if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT:
|
||||
self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}')
|
||||
break
|
||||
elif result.result_type == pipeline_entities.ResultType.CONTINUE:
|
||||
query = result.new_query
|
||||
elif isinstance(result, typing.AsyncGenerator): # 生成器
|
||||
span_result_type = 'generator'
|
||||
self.ap.logger.debug(f'Stage {stage_container.inst_name} processed query {query.query_id} gen')
|
||||
|
||||
async for sub_result in result:
|
||||
span_result_type = str(
|
||||
sub_result.result_type.value
|
||||
if hasattr(sub_result.result_type, 'value')
|
||||
else sub_result.result_type
|
||||
)
|
||||
self.ap.logger.debug(
|
||||
f'Stage {stage_container.inst_name} processed query {query.query_id} res {sub_result.result_type}'
|
||||
)
|
||||
await self._check_output(query, sub_result)
|
||||
if sub_result.error_notice:
|
||||
span_status = 'error'
|
||||
span_error = sub_result.error_notice
|
||||
|
||||
if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT:
|
||||
self.ap.logger.debug(
|
||||
f'Stage {stage_container.inst_name} interrupted query {query.query_id}'
|
||||
)
|
||||
break
|
||||
elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE:
|
||||
query = sub_result.new_query
|
||||
await self._execute_from_stage(i + 1, query)
|
||||
break
|
||||
except Exception as e:
|
||||
span_status = 'error'
|
||||
span_error = str(e)
|
||||
raise
|
||||
finally:
|
||||
trace_id = (query.variables or {}).get('_monitoring_trace_id')
|
||||
root_span_id = (query.variables or {}).get('_monitoring_root_span_id')
|
||||
if trace_id:
|
||||
try:
|
||||
await self.ap.monitoring_service.record_span(
|
||||
trace_id=trace_id,
|
||||
parent_span_id=root_span_id,
|
||||
name=stage_container.inst_name,
|
||||
kind='pipeline.stage',
|
||||
status=span_status,
|
||||
started_at=span_started_at,
|
||||
duration=int((time.perf_counter() - span_started) * 1000),
|
||||
message_id=(query.variables or {}).get('_monitoring_message_id'),
|
||||
session_id=self._query_session_id(query),
|
||||
bot_id=query.bot_uuid,
|
||||
pipeline_id=self.pipeline_entity.uuid,
|
||||
attributes={
|
||||
'stage_class': stage_container.inst.__class__.__name__,
|
||||
'result_type': span_result_type,
|
||||
'query_id': query.query_id,
|
||||
},
|
||||
error_message=span_error,
|
||||
)
|
||||
except Exception as monitor_err:
|
||||
self.ap.logger.error(f'Failed to record stage span: {monitor_err}')
|
||||
elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE:
|
||||
query = sub_result.new_query
|
||||
await self._execute_from_stage(i + 1, query)
|
||||
break
|
||||
|
||||
i += 1
|
||||
|
||||
async def process_query(self, query: pipeline_query.Query):
|
||||
"""处理请求"""
|
||||
trace_started_at = self._utc_now()
|
||||
trace_started = time.perf_counter()
|
||||
root_span_id = self._new_span_id()
|
||||
trace_id = None
|
||||
trace_status = 'success'
|
||||
# Get monitoring metadata
|
||||
bot_name = query.variables.get('_monitoring_bot_name', 'Unknown')
|
||||
pipeline_name = query.variables.get('_monitoring_pipeline_name', 'Unknown')
|
||||
@@ -377,28 +303,6 @@ class RuntimePipeline:
|
||||
except Exception as e:
|
||||
self.ap.logger.error(f'Failed to record query start: {e}')
|
||||
|
||||
try:
|
||||
trace_id = await self.ap.monitoring_service.start_trace(
|
||||
name='LangBot query',
|
||||
bot_id=query.bot_uuid or 'unknown',
|
||||
bot_name=bot_name,
|
||||
pipeline_id=self.pipeline_entity.uuid,
|
||||
pipeline_name=pipeline_name,
|
||||
session_id=self._query_session_id(query),
|
||||
message_id=message_id or None,
|
||||
query_id=query.query_id,
|
||||
attributes={
|
||||
'launcher_type': query.launcher_type.value
|
||||
if hasattr(query.launcher_type, 'value')
|
||||
else str(query.launcher_type),
|
||||
'runner_name': runner_name,
|
||||
},
|
||||
)
|
||||
query.variables['_monitoring_trace_id'] = trace_id
|
||||
query.variables['_monitoring_root_span_id'] = root_span_id
|
||||
except Exception as e:
|
||||
self.ap.logger.error(f'Failed to start query trace: {e}')
|
||||
|
||||
try:
|
||||
# Get bound plugins for this pipeline
|
||||
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
|
||||
@@ -432,10 +336,7 @@ class RuntimePipeline:
|
||||
await self._execute_from_stage(0, query)
|
||||
|
||||
# Record query success only if no error occurred during processing
|
||||
has_monitoring_error = query.variables.get('_monitoring_has_error', False)
|
||||
if has_monitoring_error:
|
||||
trace_status = 'error'
|
||||
else:
|
||||
if not query.variables.get('_monitoring_has_error', False):
|
||||
try:
|
||||
await monitoring_helper.MonitoringHelper.record_query_success(
|
||||
ap=self.ap,
|
||||
@@ -460,7 +361,6 @@ class RuntimePipeline:
|
||||
self.ap.logger.error(f'Failed to record query response: {e}')
|
||||
|
||||
except Exception as e:
|
||||
trace_status = 'error'
|
||||
inst_name = query.current_stage_name if query.current_stage_name else 'unknown'
|
||||
self.ap.logger.error(f'Error processing query {query.query_id} stage={inst_name} : {e}')
|
||||
self.ap.logger.error(f'Traceback: {traceback.format_exc()}')
|
||||
@@ -483,35 +383,6 @@ class RuntimePipeline:
|
||||
self.ap.logger.error(f'Failed to record query error: {me}')
|
||||
|
||||
finally:
|
||||
if trace_id:
|
||||
try:
|
||||
duration_ms = int((time.perf_counter() - trace_started) * 1000)
|
||||
await self.ap.monitoring_service.record_span(
|
||||
trace_id=trace_id,
|
||||
span_id=root_span_id,
|
||||
name='LangBot query',
|
||||
kind='pipeline.query',
|
||||
status=trace_status,
|
||||
started_at=trace_started_at,
|
||||
duration=duration_ms,
|
||||
message_id=message_id or None,
|
||||
session_id=self._query_session_id(query),
|
||||
bot_id=query.bot_uuid,
|
||||
pipeline_id=self.pipeline_entity.uuid,
|
||||
attributes={
|
||||
'query_id': query.query_id,
|
||||
'pipeline_name': pipeline_name,
|
||||
'runner_name': runner_name,
|
||||
},
|
||||
)
|
||||
await self.ap.monitoring_service.finish_trace(
|
||||
trace_id=trace_id,
|
||||
status=trace_status,
|
||||
duration=duration_ms,
|
||||
message_id=message_id or None,
|
||||
)
|
||||
except Exception as monitor_err:
|
||||
self.ap.logger.error(f'Failed to finish query trace: {monitor_err}')
|
||||
self.ap.logger.debug(f'Query {query.query_id} processed')
|
||||
del self.ap.query_pool.cached_queries[query.query_id]
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ from .. import stage
|
||||
|
||||
import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
import langbot_plugin.api.entities.events as events
|
||||
|
||||
|
||||
@@ -23,6 +24,50 @@ class ResponseWrapper(stage.PipelineStage):
|
||||
async def initialize(self, pipeline_config: dict):
|
||||
pass
|
||||
|
||||
def _is_final_assistant_message(self, result) -> bool:
|
||||
"""Whether *result* is the agent's final, tool-call-free answer.
|
||||
|
||||
Intermediate streaming chunks and tool-call rounds must NOT trigger
|
||||
outbound attachment collection — only the terminal assistant message.
|
||||
"""
|
||||
if getattr(result, 'role', None) != 'assistant':
|
||||
return False
|
||||
if result.tool_calls:
|
||||
return False
|
||||
if isinstance(result, provider_message.MessageChunk):
|
||||
return bool(result.is_final)
|
||||
return True
|
||||
|
||||
async def _append_outbound_attachments(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
message_chain: platform_message.MessageChain,
|
||||
) -> None:
|
||||
"""Collect sandbox outbox files and append them to *message_chain*.
|
||||
|
||||
Runs at most once per query (guarded by a query variable) and never
|
||||
raises into the pipeline — attachment delivery is best-effort.
|
||||
"""
|
||||
if query.variables.get('_sandbox_outbound_collected'):
|
||||
return
|
||||
box_service = getattr(self.ap, 'box_service', None)
|
||||
if box_service is None or not getattr(box_service, 'available', False):
|
||||
return
|
||||
query.variables['_sandbox_outbound_collected'] = True
|
||||
try:
|
||||
attachments = await box_service.collect_outbound_attachments(query)
|
||||
except Exception as e:
|
||||
self.ap.logger.warning(f'Outbound attachment collection failed: {e}')
|
||||
return
|
||||
for att in attachments:
|
||||
att_type = att.get('type')
|
||||
if att_type == 'Image':
|
||||
message_chain.append(platform_message.Image(base64=att['base64']))
|
||||
elif att_type == 'Voice':
|
||||
message_chain.append(platform_message.Voice(base64=att['base64']))
|
||||
else:
|
||||
message_chain.append(platform_message.File(name=att.get('name', 'file'), base64=att['base64']))
|
||||
|
||||
async def process(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
@@ -83,10 +128,16 @@ class ResponseWrapper(stage.PipelineStage):
|
||||
)
|
||||
else:
|
||||
if event_ctx.event.reply_message_chain is not None:
|
||||
query.resp_message_chain.append(event_ctx.event.reply_message_chain)
|
||||
|
||||
reply_chain = event_ctx.event.reply_message_chain
|
||||
else:
|
||||
query.resp_message_chain.append(result.get_content_platform_message_chain())
|
||||
reply_chain = result.get_content_platform_message_chain()
|
||||
|
||||
# Attach files the agent produced in the sandbox
|
||||
# outbox, but only on the terminal assistant message.
|
||||
if self._is_final_assistant_message(result):
|
||||
await self._append_outbound_attachments(query, reply_chain)
|
||||
|
||||
query.resp_message_chain.append(reply_chain)
|
||||
|
||||
yield entities.StageProcessResult(
|
||||
result_type=entities.ResultType.CONTINUE,
|
||||
|
||||
@@ -312,12 +312,18 @@ class WebSocketAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter)
|
||||
|
||||
async def _process_image_components(self, message_chain_obj: list):
|
||||
"""
|
||||
处理消息链中的图片和文件组件,将path转换为base64
|
||||
处理消息链中的图片、语音和文件组件,将 path 转换为 base64
|
||||
|
||||
Image / Voice / File components uploaded from the web client carry a
|
||||
storage key in ``path``. Resolve it to a base64 data URI so downstream
|
||||
stages (multimodal LLM input and the Box sandbox inbox) have a usable
|
||||
payload, then drop the now-consumed storage object.
|
||||
|
||||
Args:
|
||||
message_chain_obj: 消息链对象列表
|
||||
"""
|
||||
import base64
|
||||
import mimetypes
|
||||
|
||||
storage_mgr = self.ap.storage_mgr
|
||||
|
||||
@@ -325,31 +331,33 @@ class WebSocketAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter)
|
||||
comp_type = component.get('type', '')
|
||||
comp_path = component.get('path', '')
|
||||
|
||||
if not comp_path:
|
||||
if not comp_path or comp_type not in ('Image', 'Voice', 'File'):
|
||||
continue
|
||||
|
||||
if comp_type == 'Image':
|
||||
try:
|
||||
file_content = await storage_mgr.storage_provider.load(comp_path)
|
||||
base64_str = base64.b64encode(file_content).decode('utf-8')
|
||||
try:
|
||||
file_content = await storage_mgr.storage_provider.load(comp_path)
|
||||
base64_str = base64.b64encode(file_content).decode('utf-8')
|
||||
|
||||
file_key = comp_path
|
||||
if file_key.lower().endswith(('.jpg', '.jpeg')):
|
||||
lowered = comp_path.lower()
|
||||
if comp_type == 'Image':
|
||||
if lowered.endswith(('.jpg', '.jpeg')):
|
||||
mime_type = 'image/jpeg'
|
||||
elif file_key.lower().endswith('.png'):
|
||||
mime_type = 'image/png'
|
||||
elif file_key.lower().endswith('.gif'):
|
||||
elif lowered.endswith('.gif'):
|
||||
mime_type = 'image/gif'
|
||||
elif file_key.lower().endswith('.webp'):
|
||||
elif lowered.endswith('.webp'):
|
||||
mime_type = 'image/webp'
|
||||
else:
|
||||
mime_type = 'image/png'
|
||||
elif comp_type == 'Voice':
|
||||
mime_type = mimetypes.guess_type(comp_path)[0] or 'audio/wav'
|
||||
else: # File
|
||||
mime_type = mimetypes.guess_type(comp_path)[0] or 'application/octet-stream'
|
||||
|
||||
component['base64'] = f'data:{mime_type};base64,{base64_str}'
|
||||
await storage_mgr.storage_provider.delete(comp_path)
|
||||
component['path'] = ''
|
||||
except Exception as e:
|
||||
await self.logger.error(f'Failed to load image file {comp_path}: {e}')
|
||||
component['base64'] = f'data:{mime_type};base64,{base64_str}'
|
||||
await storage_mgr.storage_provider.delete(comp_path)
|
||||
component['path'] = ''
|
||||
except Exception as e:
|
||||
await self.logger.error(f'Failed to load {comp_type} file {comp_path}: {e}')
|
||||
|
||||
async def handle_websocket_message(
|
||||
self,
|
||||
|
||||
@@ -711,19 +711,8 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
|
||||
endpoint: str,
|
||||
method: str,
|
||||
body: Any = None,
|
||||
caller: dict[str, Any] | None = None,
|
||||
headers: dict[str, str] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
return await self.handler.handle_page_api(
|
||||
plugin_author,
|
||||
plugin_name,
|
||||
page_id,
|
||||
endpoint,
|
||||
method,
|
||||
body,
|
||||
caller,
|
||||
headers or {},
|
||||
)
|
||||
return await self.handler.handle_page_api(plugin_author, plugin_name, page_id, endpoint, method, body)
|
||||
|
||||
async def get_debug_info(self) -> dict[str, Any]:
|
||||
"""Get debug information including debug key and WS URL"""
|
||||
|
||||
@@ -755,21 +755,6 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
'session_name': session_name,
|
||||
'bot_uuid': query.bot_uuid or '',
|
||||
'sender_id': str(query.sender_id),
|
||||
'_trace_context': {
|
||||
'trace_id': query.variables.get('_monitoring_trace_id') if query.variables else None,
|
||||
'parent_span_id': query.variables.get('_monitoring_root_span_id')
|
||||
if query.variables
|
||||
else None,
|
||||
'message_id': query.variables.get('_monitoring_message_id') if query.variables else None,
|
||||
'query_id': query.query_id,
|
||||
'session_id': session_name,
|
||||
'bot_id': query.bot_uuid or '',
|
||||
'pipeline_id': query.pipeline_uuid or '',
|
||||
'knowledge_base_id': kb_id,
|
||||
'attributes': {
|
||||
'source': 'plugin-api',
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
results = [entry.model_dump(mode='json') for entry in entries]
|
||||
@@ -1026,8 +1011,6 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
endpoint: str,
|
||||
method: str,
|
||||
body: Any = None,
|
||||
caller: dict[str, Any] | None = None,
|
||||
headers: dict[str, str] | None = None,
|
||||
) -> dict[str, Any]:
|
||||
"""Forward a page API call to the plugin via runtime."""
|
||||
result = await self.call_action(
|
||||
@@ -1039,8 +1022,6 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
'endpoint': endpoint,
|
||||
'method': method,
|
||||
'body': body,
|
||||
'caller': caller,
|
||||
'headers': headers or {},
|
||||
},
|
||||
timeout=30,
|
||||
)
|
||||
|
||||
@@ -3,7 +3,6 @@ from __future__ import annotations
|
||||
import abc
|
||||
import typing
|
||||
import time
|
||||
import datetime
|
||||
|
||||
from ...core import app
|
||||
from ...entity.persistence import model as persistence_model
|
||||
@@ -17,15 +16,6 @@ LLM_USAGE_QUERY_VARIABLE = '_llm_usage'
|
||||
STREAM_USAGE_QUERY_VARIABLE = '_stream_usage'
|
||||
|
||||
|
||||
def _utc_now() -> datetime.datetime:
|
||||
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
|
||||
|
||||
|
||||
def _query_session_id(query: pipeline_query.Query) -> str:
|
||||
launcher_type = query.launcher_type.value if hasattr(query.launcher_type, 'value') else str(query.launcher_type)
|
||||
return f'{launcher_type}_{query.launcher_id}'
|
||||
|
||||
|
||||
def _store_llm_usage(query: pipeline_query.Query | None, usage_info: dict | None) -> None:
|
||||
"""Store the latest provider usage on the query for upstream action handlers."""
|
||||
if query is None or not usage_info:
|
||||
@@ -69,7 +59,6 @@ class RuntimeProvider:
|
||||
"""Bridge method for invoking LLM with monitoring"""
|
||||
# Start timing for monitoring
|
||||
start_time = time.time()
|
||||
span_started_at = _utc_now()
|
||||
input_tokens = 0
|
||||
output_tokens = 0
|
||||
status = 'success'
|
||||
@@ -136,30 +125,6 @@ class RuntimeProvider:
|
||||
error_message=error_message,
|
||||
message_id=message_id,
|
||||
)
|
||||
trace_id = query.variables.get('_monitoring_trace_id') if query.variables else None
|
||||
parent_span_id = query.variables.get('_monitoring_root_span_id') if query.variables else None
|
||||
if trace_id:
|
||||
await self.requester.ap.monitoring_service.record_span(
|
||||
trace_id=trace_id,
|
||||
parent_span_id=parent_span_id,
|
||||
name=f'LLM {model.model_entity.name}',
|
||||
kind='model.llm',
|
||||
status=status,
|
||||
started_at=span_started_at,
|
||||
duration=duration_ms,
|
||||
message_id=message_id,
|
||||
session_id=_query_session_id(query),
|
||||
bot_id=query.bot_uuid,
|
||||
pipeline_id=query.pipeline_uuid,
|
||||
attributes={
|
||||
'model_name': model.model_entity.name,
|
||||
'input_tokens': input_tokens,
|
||||
'output_tokens': output_tokens,
|
||||
'total_tokens': input_tokens + output_tokens,
|
||||
'stream': False,
|
||||
},
|
||||
error_message=error_message,
|
||||
)
|
||||
except Exception as monitor_err:
|
||||
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM call: {monitor_err}')
|
||||
|
||||
@@ -175,7 +140,6 @@ class RuntimeProvider:
|
||||
"""Bridge method for invoking LLM stream with monitoring"""
|
||||
# Start timing for monitoring
|
||||
start_time = time.time()
|
||||
span_started_at = _utc_now()
|
||||
status = 'success'
|
||||
error_message = None
|
||||
input_tokens = 0
|
||||
@@ -240,30 +204,6 @@ class RuntimeProvider:
|
||||
error_message=error_message,
|
||||
message_id=message_id,
|
||||
)
|
||||
trace_id = query.variables.get('_monitoring_trace_id') if query.variables else None
|
||||
parent_span_id = query.variables.get('_monitoring_root_span_id') if query.variables else None
|
||||
if trace_id:
|
||||
await self.requester.ap.monitoring_service.record_span(
|
||||
trace_id=trace_id,
|
||||
parent_span_id=parent_span_id,
|
||||
name=f'LLM stream {model.model_entity.name}',
|
||||
kind='model.llm',
|
||||
status=status,
|
||||
started_at=span_started_at,
|
||||
duration=duration_ms,
|
||||
message_id=message_id,
|
||||
session_id=_query_session_id(query),
|
||||
bot_id=query.bot_uuid,
|
||||
pipeline_id=query.pipeline_uuid,
|
||||
attributes={
|
||||
'model_name': model.model_entity.name,
|
||||
'input_tokens': input_tokens,
|
||||
'output_tokens': output_tokens,
|
||||
'total_tokens': input_tokens + output_tokens,
|
||||
'stream': True,
|
||||
},
|
||||
error_message=error_message,
|
||||
)
|
||||
except Exception as monitor_err:
|
||||
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM stream call: {monitor_err}')
|
||||
|
||||
|
||||
@@ -216,11 +216,22 @@ class LiteLLMRequester(requester.ProviderAPIRequester):
|
||||
content = msg_dict.get('content')
|
||||
|
||||
if isinstance(content, list):
|
||||
converted_parts = []
|
||||
for part in content:
|
||||
if isinstance(part, dict) and part.get('type') == 'image_base64':
|
||||
part['image_url'] = {'url': part['image_base64']}
|
||||
part['type'] = 'image_url'
|
||||
del part['image_base64']
|
||||
# OpenAI-compatible chat models reject non-image file parts
|
||||
# (audio/document base64 or url). These originate from Voice /
|
||||
# File attachments — including ones replayed from conversation
|
||||
# history — and the agent already accesses their bytes via the
|
||||
# sandbox. Drop them from the model payload to avoid
|
||||
# "Invalid user message ... invalid content type=file_base64".
|
||||
if isinstance(part, dict) and part.get('type') in ('file_base64', 'file_url'):
|
||||
continue
|
||||
converted_parts.append(part)
|
||||
msg_dict['content'] = converted_parts
|
||||
|
||||
req_messages.append(msg_dict)
|
||||
|
||||
|
||||
@@ -104,6 +104,68 @@ class _StreamAccumulator:
|
||||
class LocalAgentRunner(runner.RequestRunner):
|
||||
"""Local agent request runner"""
|
||||
|
||||
async def _inject_inbound_attachments(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
user_message: provider_message.Message,
|
||||
) -> None:
|
||||
"""Persist inbound attachments into the sandbox and tell the model.
|
||||
|
||||
No-op when the box service is unavailable or there are no attachments.
|
||||
On success, appends an extra text ContentElement to the user message
|
||||
listing the in-sandbox paths and the outbox convention, and stashes the
|
||||
descriptors in ``query.variables['_sandbox_inbound_attachments']``.
|
||||
"""
|
||||
box_service = getattr(self.ap, 'box_service', None)
|
||||
if box_service is None or not getattr(box_service, 'available', False):
|
||||
return
|
||||
try:
|
||||
attachments = await box_service.materialize_inbound_attachments(query)
|
||||
except Exception as e: # never break the chat turn over attachment IO
|
||||
self.ap.logger.warning(f'Inbound attachment materialization failed: {e}')
|
||||
return
|
||||
if not attachments:
|
||||
return
|
||||
|
||||
query.variables['_sandbox_inbound_attachments'] = attachments
|
||||
|
||||
lines = [
|
||||
'The user sent attachments. They have been saved into the sandbox and are '
|
||||
'available to the exec/read/write tools at these paths:'
|
||||
]
|
||||
for att in attachments:
|
||||
lines.append(f'- {att["type"]}: {att["path"]} ({att["size"]} bytes)')
|
||||
outbox_dir = f'{box_service.OUTBOX_MOUNT_DIR}/{query.query_id}'
|
||||
lines.append(
|
||||
'If you produce any file (image, audio, document, etc.) that should be sent '
|
||||
f'back to the user, write it into {outbox_dir}/ (create the directory if '
|
||||
'needed). Every file placed there will be delivered to the user automatically.'
|
||||
)
|
||||
note = '\n'.join(lines)
|
||||
|
||||
# Voice/File attachments are now available to the agent via the sandbox
|
||||
# (exec/read/write tools). Their raw bytes must NOT be forwarded to the
|
||||
# chat model as multimodal content: providers reject non-image file
|
||||
# parts ("Invalid user message ... ensure all user messages are valid
|
||||
# OpenAI chat completion messages"). Strip those content elements and
|
||||
# rely on the sandbox-path note instead. Images are kept so vision
|
||||
# models can still see them.
|
||||
_model_unsafe_types = {'file_base64', 'file_url'}
|
||||
if isinstance(user_message.content, list):
|
||||
user_message.content = [
|
||||
ce for ce in user_message.content if getattr(ce, 'type', None) not in _model_unsafe_types
|
||||
]
|
||||
|
||||
if isinstance(user_message.content, str):
|
||||
user_message.content = [
|
||||
provider_message.ContentElement.from_text(user_message.content),
|
||||
provider_message.ContentElement.from_text(note),
|
||||
]
|
||||
elif isinstance(user_message.content, list):
|
||||
user_message.content.append(provider_message.ContentElement.from_text(note))
|
||||
else:
|
||||
user_message.content = [provider_message.ContentElement.from_text(note)]
|
||||
|
||||
def _build_request_messages(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
@@ -232,6 +294,12 @@ class LocalAgentRunner(runner.RequestRunner):
|
||||
|
||||
user_message = copy.deepcopy(query.user_message)
|
||||
|
||||
# Materialize inbound attachments (images / voices / files) into the
|
||||
# sandbox so the agent's exec/read/write tools can operate on the real
|
||||
# bytes — not just the multimodal copy the model sees. The exact
|
||||
# in-sandbox paths are announced to the model as a system note.
|
||||
await self._inject_inbound_attachments(query, user_message)
|
||||
|
||||
user_message_text = ''
|
||||
|
||||
if isinstance(user_message.content, str):
|
||||
@@ -268,21 +336,6 @@ class LocalAgentRunner(runner.RequestRunner):
|
||||
'bot_uuid': query.bot_uuid or '',
|
||||
'sender_id': str(query.sender_id),
|
||||
'session_name': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
|
||||
'_trace_context': {
|
||||
'trace_id': query.variables.get('_monitoring_trace_id') if query.variables else None,
|
||||
'parent_span_id': query.variables.get('_monitoring_root_span_id')
|
||||
if query.variables
|
||||
else None,
|
||||
'message_id': query.variables.get('_monitoring_message_id') if query.variables else None,
|
||||
'query_id': query.query_id,
|
||||
'session_id': f'{query.launcher_type.value}_{query.launcher_id}',
|
||||
'bot_id': query.bot_uuid or '',
|
||||
'pipeline_id': query.pipeline_uuid or '',
|
||||
'knowledge_base_id': kb_uuid,
|
||||
'attributes': {
|
||||
'source': 'local-agent',
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
|
||||
|
||||
@@ -1,12 +1,10 @@
|
||||
from __future__ import annotations
|
||||
import mimetypes
|
||||
import os.path
|
||||
import time
|
||||
import traceback
|
||||
import uuid
|
||||
import zipfile
|
||||
import io
|
||||
import datetime
|
||||
from typing import Any
|
||||
from langbot.pkg.core import app
|
||||
import sqlalchemy
|
||||
@@ -27,10 +25,6 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface):
|
||||
super().__init__(ap)
|
||||
self.knowledge_base_entity = knowledge_base_entity
|
||||
|
||||
@staticmethod
|
||||
def _utc_now() -> datetime.datetime:
|
||||
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
|
||||
|
||||
async def initialize(self):
|
||||
pass
|
||||
|
||||
@@ -340,25 +334,6 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface):
|
||||
# are passed directly to vector_search by some plugins (e.g. LangRAG)
|
||||
# and would cause empty results when the metadata field doesn't exist.
|
||||
filters = settings.pop('filters', {})
|
||||
trace_context = settings.pop('_trace_context', None)
|
||||
host_span_started_at = self._utc_now()
|
||||
host_span_started = time.perf_counter()
|
||||
host_span_id = None
|
||||
if trace_context and trace_context.get('trace_id'):
|
||||
host_parent_span_id = trace_context.get('parent_span_id')
|
||||
host_span_id = trace_context.get('rag_span_id') or f'span-{uuid.uuid4().hex[:16]}'
|
||||
trace_context = {
|
||||
'trace_id': trace_context.get('trace_id'),
|
||||
'parent_span_id': host_span_id,
|
||||
'host_parent_span_id': host_parent_span_id,
|
||||
'message_id': trace_context.get('message_id'),
|
||||
'query_id': trace_context.get('query_id'),
|
||||
'session_id': trace_context.get('session_id'),
|
||||
'bot_id': trace_context.get('bot_id'),
|
||||
'pipeline_id': trace_context.get('pipeline_id'),
|
||||
'knowledge_base_id': kb.uuid,
|
||||
'attributes': trace_context.get('attributes') or {},
|
||||
}
|
||||
|
||||
retrieval_context = {
|
||||
'query': query,
|
||||
@@ -368,107 +343,13 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface):
|
||||
'creation_settings': kb.creation_settings or {},
|
||||
'filters': filters,
|
||||
}
|
||||
if trace_context:
|
||||
retrieval_context['trace_context'] = trace_context
|
||||
|
||||
try:
|
||||
result = await self.ap.plugin_connector.call_rag_retrieve(
|
||||
plugin_id,
|
||||
retrieval_context,
|
||||
)
|
||||
except Exception as e:
|
||||
if trace_context:
|
||||
await self._record_rag_trace_result(
|
||||
trace_context=trace_context,
|
||||
host_span_id=host_span_id,
|
||||
started_at=host_span_started_at,
|
||||
duration=int((time.perf_counter() - host_span_started) * 1000),
|
||||
plugin_id=plugin_id,
|
||||
result={
|
||||
'results': [],
|
||||
'metadata': {
|
||||
'status': 'error',
|
||||
'error_message': str(e),
|
||||
},
|
||||
},
|
||||
)
|
||||
raise
|
||||
if trace_context:
|
||||
await self._record_rag_trace_result(
|
||||
trace_context=trace_context,
|
||||
host_span_id=host_span_id,
|
||||
started_at=host_span_started_at,
|
||||
duration=int((time.perf_counter() - host_span_started) * 1000),
|
||||
plugin_id=plugin_id,
|
||||
result=result,
|
||||
)
|
||||
result = await self.ap.plugin_connector.call_rag_retrieve(
|
||||
plugin_id,
|
||||
retrieval_context,
|
||||
)
|
||||
return result
|
||||
|
||||
async def _record_rag_trace_result(
|
||||
self,
|
||||
trace_context: dict[str, Any],
|
||||
host_span_id: str | None,
|
||||
started_at: datetime.datetime,
|
||||
duration: int,
|
||||
plugin_id: str,
|
||||
result: dict[str, Any],
|
||||
) -> None:
|
||||
"""Persist host RAG span and plugin-provided child spans."""
|
||||
trace_id = trace_context.get('trace_id')
|
||||
if not trace_id:
|
||||
return
|
||||
|
||||
metadata = result.get('metadata') if isinstance(result, dict) else {}
|
||||
metadata = metadata if isinstance(metadata, dict) else {}
|
||||
plugin_spans = metadata.get('trace_spans') if isinstance(metadata.get('trace_spans'), list) else []
|
||||
parent_span_id = trace_context.get('parent_span_id')
|
||||
host_parent_span_id = trace_context.get('host_parent_span_id')
|
||||
|
||||
try:
|
||||
await self.ap.monitoring_service.record_span(
|
||||
trace_id=trace_id,
|
||||
span_id=host_span_id,
|
||||
parent_span_id=host_parent_span_id,
|
||||
name=f'Knowledge retrieval {self.knowledge_base_entity.name}',
|
||||
kind='rag.retrieval',
|
||||
status=metadata.get('status', 'success'),
|
||||
started_at=started_at,
|
||||
duration=duration,
|
||||
message_id=trace_context.get('message_id'),
|
||||
session_id=trace_context.get('session_id'),
|
||||
bot_id=trace_context.get('bot_id'),
|
||||
pipeline_id=trace_context.get('pipeline_id'),
|
||||
attributes={
|
||||
'knowledge_base_id': self.knowledge_base_entity.uuid,
|
||||
'knowledge_base_name': self.knowledge_base_entity.name,
|
||||
'plugin_id': plugin_id,
|
||||
'returned_count': len(result.get('results', []) if isinstance(result, dict) else []),
|
||||
'total_found': result.get('total_found') if isinstance(result, dict) else None,
|
||||
},
|
||||
error_message=metadata.get('error_message'),
|
||||
)
|
||||
for span in plugin_spans:
|
||||
if not isinstance(span, dict):
|
||||
continue
|
||||
await self.ap.monitoring_service.record_span(
|
||||
trace_id=trace_id,
|
||||
span_id=span.get('span_id'),
|
||||
parent_span_id=span.get('parent_span_id') or host_span_id or parent_span_id,
|
||||
name=span.get('name') or 'RAG plugin stage',
|
||||
kind=span.get('kind') or 'rag.stage',
|
||||
status=span.get('status') or 'success',
|
||||
started_at=started_at,
|
||||
duration=span.get('duration_ms'),
|
||||
message_id=trace_context.get('message_id'),
|
||||
session_id=trace_context.get('session_id'),
|
||||
bot_id=trace_context.get('bot_id'),
|
||||
pipeline_id=trace_context.get('pipeline_id'),
|
||||
attributes=span.get('attributes') if isinstance(span.get('attributes'), dict) else {},
|
||||
error_message=span.get('error_message'),
|
||||
)
|
||||
except Exception as e:
|
||||
self.ap.logger.error(f'Failed to record RAG trace spans: {e}')
|
||||
|
||||
async def _delete_document(self, document_id: str) -> bool:
|
||||
"""Call plugin to delete document."""
|
||||
kb = self.knowledge_base_entity
|
||||
|
||||
@@ -8,7 +8,6 @@ Run: uv run pytest tests/integration/api/test_monitoring.py -q
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
import pytest
|
||||
from unittest.mock import MagicMock, AsyncMock, Mock
|
||||
|
||||
@@ -83,15 +82,6 @@ def fake_monitoring_app():
|
||||
app.monitoring_service.get_messages = AsyncMock(return_value=([{'id': 'msg-1', 'content': 'test'}], 100))
|
||||
app.monitoring_service.get_llm_calls = AsyncMock(return_value=([{'id': 'llm-1'}], 50))
|
||||
app.monitoring_service.get_embedding_calls = AsyncMock(return_value=([{'id': 'emb-1'}], 10))
|
||||
app.monitoring_service.get_traces = AsyncMock(return_value=([{'trace_id': 'trace-1'}], 1))
|
||||
app.monitoring_service.get_trace_details = AsyncMock(
|
||||
side_effect=lambda trace_id: {
|
||||
'found': trace_id == 'trace-1',
|
||||
'trace_id': trace_id,
|
||||
'trace': {'trace_id': trace_id} if trace_id == 'trace-1' else None,
|
||||
'spans': [] if trace_id == 'trace-1' else None,
|
||||
}
|
||||
)
|
||||
app.monitoring_service.get_sessions = AsyncMock(return_value=([{'session_id': 'sess-1'}], 20))
|
||||
app.monitoring_service.get_errors = AsyncMock(return_value=([{'id': 'err-1'}], 2))
|
||||
app.monitoring_service.get_session_analysis = AsyncMock(
|
||||
@@ -232,7 +222,6 @@ class TestMonitoringAllDataEndpoint:
|
||||
assert response.status_code == 200
|
||||
data = await response.get_json()
|
||||
assert 'overview' in data['data']
|
||||
assert 'traces' in data['data']
|
||||
|
||||
|
||||
@pytest.mark.usefixtures('mock_circular_import_chain')
|
||||
@@ -257,60 +246,6 @@ class TestMonitoringDetailsEndpoints:
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_trace_details(self, quart_test_client):
|
||||
"""GET /api/v1/monitoring/traces/{id}."""
|
||||
response = await quart_test_client.get(
|
||||
'/api/v1/monitoring/traces/trace-1', headers={'Authorization': 'Bearer test_token'}
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
|
||||
|
||||
@pytest.mark.usefixtures('mock_circular_import_chain')
|
||||
class TestMonitoringTraceEndpoints:
|
||||
"""Tests for trace list and detail endpoints."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_traces_forwards_filters(self, quart_test_client, fake_monitoring_app):
|
||||
"""GET /api/v1/monitoring/traces forwards filters to service."""
|
||||
response = await quart_test_client.get(
|
||||
'/api/v1/monitoring/traces'
|
||||
'?botId=bot-1'
|
||||
'&pipelineId=pipeline-1'
|
||||
'&sessionId=session-1'
|
||||
'&status=success'
|
||||
'&startTime=2026-01-01T00:00:00Z'
|
||||
'&endTime=2026-01-02T00:00:00Z'
|
||||
'&limit=25'
|
||||
'&offset=5',
|
||||
headers={'Authorization': 'Bearer test_token'},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = await response.get_json()
|
||||
assert data['data']['traces'] == [{'trace_id': 'trace-1'}]
|
||||
assert data['data']['total'] == 1
|
||||
fake_monitoring_app.monitoring_service.get_traces.assert_awaited_with(
|
||||
bot_ids=['bot-1'],
|
||||
pipeline_ids=['pipeline-1'],
|
||||
session_ids=['session-1'],
|
||||
statuses=['success'],
|
||||
start_time=datetime.datetime(2026, 1, 1, 0, 0),
|
||||
end_time=datetime.datetime(2026, 1, 2, 0, 0),
|
||||
limit=25,
|
||||
offset=5,
|
||||
)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_trace_details_not_found(self, quart_test_client):
|
||||
"""GET /api/v1/monitoring/traces/{id} returns 404 when missing."""
|
||||
response = await quart_test_client.get(
|
||||
'/api/v1/monitoring/traces/trace-missing', headers={'Authorization': 'Bearer test_token'}
|
||||
)
|
||||
|
||||
assert response.status_code == 404
|
||||
|
||||
|
||||
@pytest.mark.usefixtures('mock_circular_import_chain')
|
||||
class TestMonitoringFeedbackEndpoints:
|
||||
|
||||
@@ -104,7 +104,7 @@ class TestSQLiteMigrationUpgrade:
|
||||
rev = await get_alembic_current(sqlite_engine)
|
||||
assert rev is not None, 'Expected a revision after upgrade'
|
||||
# Head should be the latest migration
|
||||
assert rev.startswith('0006'), f'Expected head to be 0006_*, got {rev}'
|
||||
assert rev.startswith('0005'), f'Expected head to be 0005_*, got {rev}'
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_upgrade_idempotent(self, sqlite_engine):
|
||||
|
||||
@@ -144,8 +144,8 @@ class TestPostgreSQLMigrationUpgrade:
|
||||
# Verify revision
|
||||
rev = await get_alembic_current(postgres_engine)
|
||||
assert rev is not None, 'Expected a revision after upgrade'
|
||||
# Head should be the latest migration.
|
||||
assert rev.startswith('0006'), f'Expected head to be 0006_*, got {rev}'
|
||||
# Head should be the latest migration (0005 for current state)
|
||||
assert rev.startswith('0005'), f'Expected head to be 0005_*, got {rev}'
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_postgres_upgrade_idempotent(self, postgres_engine, clean_tables, clean_alembic_version):
|
||||
|
||||
@@ -1,207 +0,0 @@
|
||||
"""Unit tests for MonitoringService trace observability."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, Mock
|
||||
|
||||
import pytest
|
||||
import sqlalchemy
|
||||
from sqlalchemy.ext.asyncio import create_async_engine
|
||||
|
||||
from langbot.pkg.api.http.service.monitoring import MonitoringService
|
||||
from langbot.pkg.entity.persistence.base import Base
|
||||
from langbot.pkg.entity.persistence import monitoring as persistence_monitoring
|
||||
|
||||
|
||||
pytestmark = pytest.mark.asyncio
|
||||
|
||||
|
||||
class _SQLitePersistence:
|
||||
def __init__(self, engine):
|
||||
self._engine = engine
|
||||
|
||||
def get_db_engine(self):
|
||||
return self._engine
|
||||
|
||||
async def execute_async(self, *args, **kwargs):
|
||||
async with self._engine.connect() as conn:
|
||||
result = await conn.execute(*args, **kwargs)
|
||||
await conn.commit()
|
||||
return result
|
||||
|
||||
def serialize_model(self, model, data, masked_columns=None):
|
||||
masked_columns = masked_columns or []
|
||||
return {
|
||||
column.name: getattr(data, column.name).isoformat()
|
||||
if isinstance(getattr(data, column.name), datetime.datetime)
|
||||
else getattr(data, column.name)
|
||||
for column in model.__table__.columns
|
||||
if column.name not in masked_columns
|
||||
}
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def monitoring_service(tmp_path):
|
||||
engine = create_async_engine(f'sqlite+aiosqlite:///{tmp_path / "monitoring.db"}')
|
||||
async with engine.begin() as conn:
|
||||
await conn.run_sync(Base.metadata.create_all)
|
||||
|
||||
ap = SimpleNamespace(
|
||||
persistence_mgr=_SQLitePersistence(engine),
|
||||
instance_config=SimpleNamespace(data={'database': {'use': 'sqlite'}}),
|
||||
logger=Mock(),
|
||||
)
|
||||
service = MonitoringService(ap)
|
||||
yield service
|
||||
await engine.dispose()
|
||||
|
||||
|
||||
async def test_trace_lifecycle_records_spans_and_returns_details(monitoring_service):
|
||||
started_at = datetime.datetime(2026, 1, 1, 12, 0, 0)
|
||||
ended_at = started_at + datetime.timedelta(milliseconds=125)
|
||||
|
||||
trace_id = await monitoring_service.start_trace(
|
||||
trace_id='trace-test',
|
||||
name='Pipeline query',
|
||||
bot_id='bot-1',
|
||||
bot_name='Bot',
|
||||
pipeline_id='pipeline-1',
|
||||
pipeline_name='Default',
|
||||
session_id='session-1',
|
||||
message_id='message-1',
|
||||
query_id=42,
|
||||
attributes={'source': 'unit-test'},
|
||||
)
|
||||
assert trace_id == 'trace-test'
|
||||
|
||||
root_span_id = await monitoring_service.record_span(
|
||||
trace_id=trace_id,
|
||||
span_id='span-root',
|
||||
name='Pipeline',
|
||||
kind='pipeline',
|
||||
status='completed',
|
||||
started_at=started_at,
|
||||
ended_at=ended_at,
|
||||
message_id='message-1',
|
||||
session_id='session-1',
|
||||
bot_id='bot-1',
|
||||
pipeline_id='pipeline-1',
|
||||
attributes={'stage_count': 2},
|
||||
)
|
||||
await monitoring_service.record_span(
|
||||
trace_id=trace_id,
|
||||
span_id='span-rag',
|
||||
parent_span_id=root_span_id,
|
||||
name='RAG retrieval',
|
||||
kind='rag.retrieval',
|
||||
status='failed',
|
||||
started_at=started_at + datetime.timedelta(seconds=1),
|
||||
duration=12.7,
|
||||
attributes={'top_k': 5},
|
||||
error_message='vector store timeout',
|
||||
)
|
||||
await monitoring_service.finish_trace(
|
||||
trace_id,
|
||||
status='completed',
|
||||
duration=250,
|
||||
message_id='message-final',
|
||||
attributes={'result_type': 'reply'},
|
||||
)
|
||||
|
||||
traces, total = await monitoring_service.get_traces(
|
||||
bot_ids=['bot-1'],
|
||||
pipeline_ids=['pipeline-1'],
|
||||
session_ids=['session-1'],
|
||||
statuses=['success'],
|
||||
limit=10,
|
||||
offset=0,
|
||||
)
|
||||
|
||||
assert total == 1
|
||||
assert traces[0]['trace_id'] == trace_id
|
||||
assert traces[0]['status'] == 'success'
|
||||
assert traces[0]['message_id'] == 'message-final'
|
||||
assert traces[0]['query_id'] == '42'
|
||||
assert traces[0]['attributes'] == {'result_type': 'reply'}
|
||||
|
||||
details = await monitoring_service.get_trace_details(trace_id)
|
||||
assert details['found'] is True
|
||||
assert details['trace']['trace_id'] == trace_id
|
||||
assert [span['span_id'] for span in details['spans']] == ['span-root', 'span-rag']
|
||||
assert details['spans'][0]['status'] == 'success'
|
||||
assert details['spans'][0]['duration'] == 125
|
||||
assert details['spans'][0]['attributes'] == {'stage_count': 2}
|
||||
assert details['spans'][1]['status'] == 'error'
|
||||
assert details['spans'][1]['duration'] == 13
|
||||
assert details['spans'][1]['parent_span_id'] == 'span-root'
|
||||
assert details['spans'][1]['error_message'] == 'vector store timeout'
|
||||
|
||||
|
||||
async def test_get_trace_details_returns_not_found_for_missing_trace(monitoring_service):
|
||||
details = await monitoring_service.get_trace_details('trace-missing')
|
||||
|
||||
assert details == {'trace_id': 'trace-missing', 'found': False}
|
||||
|
||||
|
||||
async def test_cleanup_expired_records_includes_traces_and_spans(monitoring_service):
|
||||
old_time = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - datetime.timedelta(days=30)
|
||||
recent_time = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
|
||||
|
||||
await monitoring_service.ap.persistence_mgr.execute_async(
|
||||
sqlalchemy.insert(persistence_monitoring.MonitoringTrace),
|
||||
[
|
||||
{
|
||||
'trace_id': 'trace-old',
|
||||
'started_at': old_time,
|
||||
'ended_at': old_time,
|
||||
'duration': 10,
|
||||
'status': 'success',
|
||||
'name': 'Old trace',
|
||||
},
|
||||
{
|
||||
'trace_id': 'trace-recent',
|
||||
'started_at': recent_time,
|
||||
'ended_at': recent_time,
|
||||
'duration': 10,
|
||||
'status': 'success',
|
||||
'name': 'Recent trace',
|
||||
},
|
||||
],
|
||||
)
|
||||
await monitoring_service.ap.persistence_mgr.execute_async(
|
||||
sqlalchemy.insert(persistence_monitoring.MonitoringSpan),
|
||||
[
|
||||
{
|
||||
'span_id': 'span-old',
|
||||
'trace_id': 'trace-old',
|
||||
'name': 'Old span',
|
||||
'kind': 'pipeline',
|
||||
'status': 'success',
|
||||
'started_at': old_time,
|
||||
'ended_at': old_time,
|
||||
},
|
||||
{
|
||||
'span_id': 'span-recent',
|
||||
'trace_id': 'trace-recent',
|
||||
'name': 'Recent span',
|
||||
'kind': 'pipeline',
|
||||
'status': 'success',
|
||||
'started_at': recent_time,
|
||||
'ended_at': recent_time,
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
monitoring_service._release_sqlite_space = AsyncMock()
|
||||
|
||||
deleted = await monitoring_service.cleanup_expired_records(retention_days=7, batch_size=1)
|
||||
|
||||
assert deleted['monitoring_traces'] == 1
|
||||
assert deleted['monitoring_spans'] == 1
|
||||
monitoring_service._release_sqlite_space.assert_awaited_once()
|
||||
|
||||
remaining = await monitoring_service.get_trace_details('trace-recent')
|
||||
assert remaining['found'] is True
|
||||
assert remaining['spans'][0]['span_id'] == 'span-recent'
|
||||
@@ -1,111 +0,0 @@
|
||||
"""Unit tests for monitoring trace HTTP routes."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import datetime
|
||||
from unittest.mock import AsyncMock, Mock
|
||||
|
||||
import pytest
|
||||
import quart
|
||||
|
||||
from tests.factories import FakeApp
|
||||
from tests.utils.import_isolation import MockLifecycleControlScope, isolated_sys_modules
|
||||
|
||||
|
||||
pytestmark = pytest.mark.asyncio
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
async def monitoring_client():
|
||||
mock_app = Mock()
|
||||
mock_app.Application = type('FakeMinimalApplication', (), {})
|
||||
mock_entities = Mock()
|
||||
mock_entities.LifecycleControlScope = MockLifecycleControlScope
|
||||
|
||||
clear = [
|
||||
'langbot.pkg.api.http.controller.group',
|
||||
'langbot.pkg.api.http.controller.groups',
|
||||
'langbot.pkg.api.http.controller.groups.monitoring',
|
||||
'langbot.pkg.api.http.controller.main',
|
||||
]
|
||||
|
||||
app = FakeApp()
|
||||
app.user_service = Mock()
|
||||
app.user_service.verify_jwt_token = AsyncMock(return_value='test@example.com')
|
||||
app.user_service.get_user_by_email = AsyncMock(return_value=Mock(email='test@example.com'))
|
||||
|
||||
app.monitoring_service = Mock()
|
||||
app.monitoring_service.get_traces = AsyncMock(return_value=([{'trace_id': 'trace-1'}], 1))
|
||||
app.monitoring_service.get_trace_details = AsyncMock(
|
||||
side_effect=lambda trace_id: {
|
||||
'found': trace_id == 'trace-1',
|
||||
'trace_id': trace_id,
|
||||
'trace': {'trace_id': trace_id} if trace_id == 'trace-1' else None,
|
||||
'spans': [] if trace_id == 'trace-1' else None,
|
||||
}
|
||||
)
|
||||
|
||||
with isolated_sys_modules(
|
||||
mocks={
|
||||
'langbot.pkg.core.app': mock_app,
|
||||
'langbot.pkg.core.entities': mock_entities,
|
||||
},
|
||||
clear=clear,
|
||||
):
|
||||
from langbot.pkg.api.http.controller.groups.monitoring import MonitoringRouterGroup
|
||||
|
||||
quart_app = quart.Quart(__name__)
|
||||
group = MonitoringRouterGroup(app, quart_app)
|
||||
await group.initialize()
|
||||
|
||||
yield app, quart_app.test_client()
|
||||
|
||||
|
||||
async def test_get_traces_route_forwards_filters(monitoring_client):
|
||||
app, client = monitoring_client
|
||||
|
||||
response = await client.get(
|
||||
'/api/v1/monitoring/traces'
|
||||
'?botId=bot-1'
|
||||
'&pipelineId=pipeline-1'
|
||||
'&sessionId=session-1'
|
||||
'&status=success'
|
||||
'&startTime=2026-01-01T00:00:00Z'
|
||||
'&endTime=2026-01-02T00:00:00Z'
|
||||
'&limit=25'
|
||||
'&offset=5',
|
||||
headers={'Authorization': 'Bearer test_token'},
|
||||
)
|
||||
|
||||
assert response.status_code == 200
|
||||
data = await response.get_json()
|
||||
assert data['data'] == {
|
||||
'traces': [{'trace_id': 'trace-1'}],
|
||||
'total': 1,
|
||||
'limit': 25,
|
||||
'offset': 5,
|
||||
}
|
||||
app.monitoring_service.get_traces.assert_awaited_once_with(
|
||||
bot_ids=['bot-1'],
|
||||
pipeline_ids=['pipeline-1'],
|
||||
session_ids=['session-1'],
|
||||
statuses=['success'],
|
||||
start_time=datetime.datetime(2026, 1, 1, 0, 0),
|
||||
end_time=datetime.datetime(2026, 1, 2, 0, 0),
|
||||
limit=25,
|
||||
offset=5,
|
||||
)
|
||||
|
||||
|
||||
async def test_get_trace_details_route_returns_404_for_missing_trace(monitoring_client):
|
||||
_app, client = monitoring_client
|
||||
|
||||
response = await client.get(
|
||||
'/api/v1/monitoring/traces/trace-missing',
|
||||
headers={'Authorization': 'Bearer test_token'},
|
||||
)
|
||||
|
||||
assert response.status_code == 404
|
||||
data = await response.get_json()
|
||||
assert data['code'] == -1
|
||||
assert data['msg'] == 'Trace trace-missing not found'
|
||||
@@ -1556,3 +1556,255 @@ class TestBuildSkillExtraMounts:
|
||||
service = BoxService(app, client=Mock(spec=BoxRuntimeClient))
|
||||
|
||||
assert service.build_skill_extra_mounts(make_query()) == []
|
||||
|
||||
|
||||
# ── Attachment passthrough (inbound / outbound) ─────────────────────────────
|
||||
|
||||
|
||||
class TestAttachmentHelpers:
|
||||
def test_sanitize_attachment_name_strips_traversal(self):
|
||||
assert BoxService._sanitize_attachment_name('../../etc/passwd', 'fb') == 'passwd'
|
||||
assert BoxService._sanitize_attachment_name('/a/b/c.png', 'fb') == 'c.png'
|
||||
assert BoxService._sanitize_attachment_name('a b c.txt', 'fb') == 'a_b_c.txt'
|
||||
assert BoxService._sanitize_attachment_name('', 'fallback.bin') == 'fallback.bin'
|
||||
assert BoxService._sanitize_attachment_name('...', 'fb.bin') == 'fb.bin'
|
||||
# weird unicode / shell chars dropped, but keeps a usable name
|
||||
out = BoxService._sanitize_attachment_name('rm -rf $(x).png', 'fb')
|
||||
assert '/' not in out and '$' not in out and out.endswith('.png')
|
||||
|
||||
def test_classify_outbound_entries_by_extension(self):
|
||||
entries = [
|
||||
{'name': 'chart.png', 'b64': 'AAA'},
|
||||
{'name': 'clip.mp3', 'b64': 'BBB'},
|
||||
{'name': 'report.pdf', 'b64': 'CCC'},
|
||||
{'name': 'sub/dir/photo.JPG', 'b64': 'DDD'},
|
||||
{'name': 'noext', 'b64': 'EEE'},
|
||||
{'name': 'skip', 'b64': ''}, # dropped (no payload)
|
||||
]
|
||||
out = BoxService._classify_outbound_entries(entries)
|
||||
by_name = {a['name']: a for a in out}
|
||||
assert by_name['chart.png']['type'] == 'Image'
|
||||
assert by_name['chart.png']['base64'].startswith('data:image/png;base64,')
|
||||
assert by_name['clip.mp3']['type'] == 'Voice'
|
||||
assert by_name['clip.mp3']['base64'].startswith('data:audio/mp3;base64,')
|
||||
assert by_name['report.pdf']['type'] == 'File'
|
||||
assert by_name['report.pdf']['base64'] == 'CCC' # raw b64, no data: prefix
|
||||
# nested path collapses to basename, case-insensitive ext
|
||||
assert by_name['photo.JPG']['type'] == 'Image'
|
||||
assert by_name['noext']['type'] == 'File'
|
||||
assert 'skip' not in by_name
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_component_to_bytes_from_data_uri(self):
|
||||
import base64
|
||||
|
||||
raw = b'hello-bytes'
|
||||
data_uri = 'data:text/plain;base64,' + base64.b64encode(raw).decode()
|
||||
component = SimpleNamespace(base64=data_uri, url=None, path=None)
|
||||
result = await BoxService._component_to_bytes(component)
|
||||
assert result is not None
|
||||
data, mime = result
|
||||
assert data == raw
|
||||
assert mime == 'text/plain'
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_component_to_bytes_returns_none_when_empty(self):
|
||||
component = SimpleNamespace(base64=None, url=None, path=None)
|
||||
assert await BoxService._component_to_bytes(component) is None
|
||||
|
||||
|
||||
class TestInboundOutboundRoundTrip:
|
||||
def _service(self) -> BoxService:
|
||||
service = BoxService(make_app(Mock()), client=Mock(spec=BoxRuntimeClient))
|
||||
service._available = True
|
||||
return service
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_materialize_inbound_writes_and_describes(self):
|
||||
import base64
|
||||
|
||||
import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
||||
|
||||
service = self._service()
|
||||
|
||||
img_bytes = b'\x89PNG\r\n\x1a\n fake png'
|
||||
img_b64 = 'data:image/png;base64,' + base64.b64encode(img_bytes).decode()
|
||||
|
||||
query = make_query()
|
||||
query.message_chain = platform_message.MessageChain(
|
||||
[
|
||||
platform_message.Plain(text='please resize this'),
|
||||
platform_message.Image(base64=img_b64),
|
||||
]
|
||||
)
|
||||
|
||||
# Mock the sandbox write path: echo back the written paths.
|
||||
async def fake_execute_tool(parameters, q):
|
||||
assert '/workspace/inbox/' in parameters['command']
|
||||
return {
|
||||
'ok': True,
|
||||
'stdout': '["/workspace/inbox/42/image_1.png"]',
|
||||
'stderr': '',
|
||||
}
|
||||
|
||||
service.execute_tool = AsyncMock(side_effect=fake_execute_tool)
|
||||
|
||||
descriptors = await service.materialize_inbound_attachments(query)
|
||||
assert len(descriptors) == 1
|
||||
d = descriptors[0]
|
||||
assert d['type'] == 'Image'
|
||||
assert d['path'] == '/workspace/inbox/42/image_1.png'
|
||||
assert d['size'] == len(img_bytes)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_materialize_inbound_noop_without_attachments(self):
|
||||
import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
||||
|
||||
service = self._service()
|
||||
query = make_query()
|
||||
query.message_chain = platform_message.MessageChain([platform_message.Plain(text='just text')])
|
||||
service.execute_tool = AsyncMock()
|
||||
assert await service.materialize_inbound_attachments(query) == []
|
||||
service.execute_tool.assert_not_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_collect_outbound_reads_and_clears(self):
|
||||
service = self._service()
|
||||
query = make_query()
|
||||
|
||||
calls = []
|
||||
|
||||
async def fake_execute_tool(parameters, q):
|
||||
calls.append(parameters['command'])
|
||||
if 'os.walk' in parameters['command']:
|
||||
return {
|
||||
'ok': True,
|
||||
'stdout': '[{"name": "out.png", "b64": "QUJD"}]',
|
||||
'stderr': '',
|
||||
}
|
||||
# the rm -rf cleanup call
|
||||
return {'ok': True, 'stdout': '', 'stderr': ''}
|
||||
|
||||
service.execute_tool = AsyncMock(side_effect=fake_execute_tool)
|
||||
|
||||
attachments = await service.collect_outbound_attachments(query)
|
||||
assert len(attachments) == 1
|
||||
assert attachments[0]['type'] == 'Image'
|
||||
assert attachments[0]['name'] == 'out.png'
|
||||
# cleanup (rm -rf) must have been issued after a successful collection
|
||||
assert any('rm -rf' in c for c in calls)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_collect_outbound_empty_no_cleanup(self):
|
||||
service = self._service()
|
||||
query = make_query()
|
||||
|
||||
calls = []
|
||||
|
||||
async def fake_execute_tool(parameters, q):
|
||||
calls.append(parameters['command'])
|
||||
return {'ok': True, 'stdout': '[]', 'stderr': ''}
|
||||
|
||||
service.execute_tool = AsyncMock(side_effect=fake_execute_tool)
|
||||
assert await service.collect_outbound_attachments(query) == []
|
||||
assert not any('rm -rf' in c for c in calls)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_passthrough_noop_when_unavailable(self):
|
||||
service = BoxService(make_app(Mock()), client=Mock(spec=BoxRuntimeClient))
|
||||
service._available = False
|
||||
query = make_query()
|
||||
assert await service.materialize_inbound_attachments(query) == []
|
||||
assert await service.collect_outbound_attachments(query) == []
|
||||
|
||||
|
||||
class TestAttachmentHostPath:
|
||||
"""Direct host-filesystem transfer path (bind-mounted workspace).
|
||||
|
||||
When ``default_workspace`` is a real local dir, inbound/outbound bypass the
|
||||
exec channel entirely (no ARG_MAX / stdout-truncation limits) and read/write
|
||||
the bind-mounted host dir directly.
|
||||
"""
|
||||
|
||||
def _service_with_workspace(self, tmp_path):
|
||||
ws = str(tmp_path / 'box' / 'default')
|
||||
os.makedirs(ws, exist_ok=True)
|
||||
app = make_app(Mock(), allowed_mount_roots=[str(tmp_path)], host_root=str(tmp_path / 'box'))
|
||||
service = BoxService(app, client=Mock(spec=BoxRuntimeClient))
|
||||
service._available = True
|
||||
# Force the default_workspace to our tmp dir so _host_query_dir resolves.
|
||||
service.default_workspace = ws
|
||||
return service, ws
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inbound_writes_to_host_no_exec(self, tmp_path):
|
||||
import base64
|
||||
|
||||
import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
||||
|
||||
service, ws = self._service_with_workspace(tmp_path)
|
||||
# Big payload that would blow ARG_MAX on the exec path:
|
||||
big = b'\x89PNG\r\n\x1a\n' + b'x' * (300 * 1024)
|
||||
b64 = 'data:image/png;base64,' + base64.b64encode(big).decode()
|
||||
query = make_query()
|
||||
query.message_chain = platform_message.MessageChain([platform_message.Image(base64=b64)])
|
||||
# execute_tool must NOT be called on the host path.
|
||||
service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used on host path'))
|
||||
|
||||
descriptors = await service.materialize_inbound_attachments(query)
|
||||
assert len(descriptors) == 1
|
||||
d = descriptors[0]
|
||||
assert d['type'] == 'Image'
|
||||
assert d['size'] == len(big)
|
||||
# File actually landed on the host workspace.
|
||||
host_file = os.path.join(ws, 'inbox', str(query.query_id), d['name'])
|
||||
assert os.path.isfile(host_file)
|
||||
assert open(host_file, 'rb').read() == big
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inbound_host_clears_stale_query_dir(self, tmp_path):
|
||||
import base64
|
||||
|
||||
import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
||||
|
||||
service, ws = self._service_with_workspace(tmp_path)
|
||||
# Seed a stale file under the same query_id (simulates webchat id reuse).
|
||||
stale_dir = os.path.join(ws, 'inbox', '42')
|
||||
os.makedirs(stale_dir, exist_ok=True)
|
||||
open(os.path.join(stale_dir, 'image_1.png'), 'wb').write(b'STALE-OLD-IMAGE')
|
||||
|
||||
new = b'\x89PNG\r\n\x1a\n NEW'
|
||||
b64 = 'data:image/png;base64,' + base64.b64encode(new).decode()
|
||||
query = make_query(query_id=42)
|
||||
query.message_chain = platform_message.MessageChain([platform_message.Image(base64=b64)])
|
||||
service.execute_tool = AsyncMock()
|
||||
descriptors = await service.materialize_inbound_attachments(query)
|
||||
# The new write recreated the dir; the stale file is gone, new bytes present.
|
||||
host_file = os.path.join(stale_dir, descriptors[0]['name'])
|
||||
assert open(host_file, 'rb').read() == new
|
||||
# No leftover content from the stale image.
|
||||
assert b'STALE-OLD-IMAGE' not in open(host_file, 'rb').read()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_outbound_reads_host_and_clears(self, tmp_path):
|
||||
service, ws = self._service_with_workspace(tmp_path)
|
||||
query = make_query()
|
||||
outbox = os.path.join(ws, 'outbox', str(query.query_id))
|
||||
os.makedirs(outbox, exist_ok=True)
|
||||
# A large file that would be truncated on the exec/stdout path:
|
||||
big_png = b'\x89PNG\r\n\x1a\n' + b'y' * (400 * 1024)
|
||||
open(os.path.join(outbox, 'result.png'), 'wb').write(big_png)
|
||||
open(os.path.join(outbox, 'notes.txt'), 'wb').write(b'hello')
|
||||
|
||||
service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used on host path'))
|
||||
attachments = await service.collect_outbound_attachments(query)
|
||||
by_name = {a['name']: a for a in attachments}
|
||||
assert by_name['result.png']['type'] == 'Image'
|
||||
assert by_name['notes.txt']['type'] == 'File'
|
||||
# Full image survived (no truncation).
|
||||
import base64
|
||||
|
||||
raw = base64.b64decode(by_name['result.png']['base64'].split(',', 1)[-1])
|
||||
assert raw == big_png
|
||||
# Outbox cleared after collection.
|
||||
assert os.listdir(outbox) == []
|
||||
|
||||
@@ -1,87 +0,0 @@
|
||||
"""Unit tests for the monitoring trace Alembic migration."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from importlib import import_module
|
||||
|
||||
|
||||
class _FakeInspector:
|
||||
def __init__(self, tables):
|
||||
self._tables = tables
|
||||
|
||||
def get_table_names(self):
|
||||
return list(self._tables)
|
||||
|
||||
|
||||
class _FakeOp:
|
||||
def __init__(self):
|
||||
self.created_tables = []
|
||||
self.created_indexes = []
|
||||
self.dropped_tables = []
|
||||
|
||||
def get_bind(self):
|
||||
return object()
|
||||
|
||||
def create_table(self, table_name, *columns):
|
||||
self.created_tables.append((table_name, columns))
|
||||
|
||||
def create_index(self, index_name, table_name, columns):
|
||||
self.created_indexes.append((index_name, table_name, columns))
|
||||
|
||||
def drop_table(self, table_name):
|
||||
self.dropped_tables.append(table_name)
|
||||
|
||||
|
||||
def _migration_module():
|
||||
return import_module('langbot.pkg.persistence.alembic.versions.0006_monitoring_traces')
|
||||
|
||||
|
||||
def test_upgrade_creates_monitoring_trace_tables_and_indexes(monkeypatch):
|
||||
migration = _migration_module()
|
||||
fake_op = _FakeOp()
|
||||
|
||||
monkeypatch.setattr(migration, 'op', fake_op)
|
||||
monkeypatch.setattr(migration.sa, 'inspect', lambda _conn: _FakeInspector(tables=set()))
|
||||
|
||||
migration.upgrade()
|
||||
|
||||
assert [table_name for table_name, _columns in fake_op.created_tables] == [
|
||||
'monitoring_traces',
|
||||
'monitoring_spans',
|
||||
]
|
||||
assert ('ix_monitoring_traces_started_at', 'monitoring_traces', ['started_at']) in fake_op.created_indexes
|
||||
assert ('ix_monitoring_spans_trace_id', 'monitoring_spans', ['trace_id']) in fake_op.created_indexes
|
||||
assert ('ix_monitoring_spans_pipeline_id', 'monitoring_spans', ['pipeline_id']) in fake_op.created_indexes
|
||||
|
||||
|
||||
def test_upgrade_skips_existing_monitoring_trace_tables(monkeypatch):
|
||||
migration = _migration_module()
|
||||
fake_op = _FakeOp()
|
||||
|
||||
monkeypatch.setattr(migration, 'op', fake_op)
|
||||
monkeypatch.setattr(
|
||||
migration.sa,
|
||||
'inspect',
|
||||
lambda _conn: _FakeInspector(tables={'monitoring_traces', 'monitoring_spans'}),
|
||||
)
|
||||
|
||||
migration.upgrade()
|
||||
|
||||
assert fake_op.created_tables == []
|
||||
assert fake_op.created_indexes == []
|
||||
|
||||
|
||||
def test_downgrade_drops_spans_before_traces(monkeypatch):
|
||||
migration = _migration_module()
|
||||
fake_op = _FakeOp()
|
||||
|
||||
monkeypatch.setattr(migration, 'op', fake_op)
|
||||
monkeypatch.setattr(
|
||||
migration.sa,
|
||||
'inspect',
|
||||
lambda _conn: _FakeInspector(tables={'monitoring_traces', 'monitoring_spans'}),
|
||||
)
|
||||
|
||||
migration.downgrade()
|
||||
|
||||
assert fake_op.dropped_tables == ['monitoring_spans', 'monitoring_traces']
|
||||
@@ -162,61 +162,3 @@ async def test_runtime_pipeline_execute(mock_app, sample_query):
|
||||
|
||||
# Verify stage was called
|
||||
mock_stage.process.assert_called_once()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_runtime_pipeline_marks_trace_error_when_stage_returns_error_notice(mock_app, sample_query):
|
||||
"""Trace status follows handled stage errors, not only raised exceptions."""
|
||||
pipelinemgr = get_pipelinemgr_module()
|
||||
stage = get_stage_module()
|
||||
persistence_pipeline = get_persistence_pipeline_module()
|
||||
entities = get_entities_module()
|
||||
|
||||
error_result = entities.StageProcessResult(
|
||||
result_type=entities.ResultType.INTERRUPT,
|
||||
new_query=sample_query,
|
||||
user_notice='',
|
||||
console_notice='',
|
||||
debug_notice='traceback',
|
||||
error_notice='model request failed',
|
||||
)
|
||||
|
||||
mock_stage = Mock(spec=stage.PipelineStage)
|
||||
mock_stage.process = AsyncMock(return_value=error_result)
|
||||
stage_container = pipelinemgr.StageInstContainer(inst_name='FailingStage', inst=mock_stage)
|
||||
|
||||
pipeline_entity = Mock(spec=persistence_pipeline.LegacyPipeline)
|
||||
pipeline_entity.uuid = 'test-pipeline-uuid'
|
||||
pipeline_entity.name = 'Test Pipeline'
|
||||
pipeline_entity.config = sample_query.pipeline_config
|
||||
pipeline_entity.extensions_preferences = {'plugins': []}
|
||||
|
||||
mock_app.bot_service = AsyncMock()
|
||||
mock_app.bot_service.get_bot = AsyncMock(return_value={'name': 'Test Bot'})
|
||||
mock_app.monitoring_service = AsyncMock()
|
||||
mock_app.monitoring_service.record_message = AsyncMock(return_value='message-1')
|
||||
mock_app.monitoring_service.update_session_activity = AsyncMock(return_value=True)
|
||||
mock_app.monitoring_service.start_trace = AsyncMock(return_value='trace-1')
|
||||
mock_app.monitoring_service.record_span = AsyncMock()
|
||||
mock_app.monitoring_service.finish_trace = AsyncMock()
|
||||
mock_app.monitoring_service.update_message_status = AsyncMock()
|
||||
mock_app.monitoring_service.record_error = AsyncMock()
|
||||
|
||||
event_ctx = Mock()
|
||||
event_ctx.is_prevented_default = Mock(return_value=False)
|
||||
mock_app.plugin_connector.emit_event = AsyncMock(return_value=event_ctx)
|
||||
mock_app.query_pool.cached_queries[sample_query.query_id] = sample_query
|
||||
|
||||
runtime_pipeline = pipelinemgr.RuntimePipeline(mock_app, pipeline_entity, [stage_container])
|
||||
|
||||
await runtime_pipeline.run(sample_query)
|
||||
|
||||
mock_app.monitoring_service.finish_trace.assert_awaited_once()
|
||||
assert mock_app.monitoring_service.finish_trace.await_args.kwargs['status'] == 'error'
|
||||
|
||||
span_calls = mock_app.monitoring_service.record_span.await_args_list
|
||||
stage_span_call = next(call for call in span_calls if call.kwargs['name'] == 'FailingStage')
|
||||
root_span_call = next(call for call in span_calls if call.kwargs['kind'] == 'pipeline.query')
|
||||
assert stage_span_call.kwargs['status'] == 'error'
|
||||
assert stage_span_call.kwargs['error_message'] == 'model request failed'
|
||||
assert root_span_call.kwargs['status'] == 'error'
|
||||
|
||||
@@ -0,0 +1,146 @@
|
||||
"""Unit tests for ResponseWrapper outbound-attachment helpers.
|
||||
|
||||
Covers the sandbox -> user attachment path added for the Box attachment
|
||||
round-trip:
|
||||
|
||||
* ``_is_final_assistant_message`` — only the terminal, tool-call-free assistant
|
||||
message (or a final MessageChunk) should trigger collection.
|
||||
* ``_append_outbound_attachments`` — collects sandbox outbox files exactly once
|
||||
per query and maps each descriptor to the right platform component, swallowing
|
||||
collection errors.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, Mock
|
||||
|
||||
import pytest
|
||||
|
||||
import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
|
||||
from langbot.pkg.pipeline.wrapper.wrapper import ResponseWrapper
|
||||
|
||||
|
||||
def _make_wrapper(box_service) -> ResponseWrapper:
|
||||
app = SimpleNamespace(logger=Mock())
|
||||
wrapper = ResponseWrapper.__new__(ResponseWrapper)
|
||||
wrapper.ap = app
|
||||
return wrapper
|
||||
|
||||
|
||||
def _make_query():
|
||||
return SimpleNamespace(variables={})
|
||||
|
||||
|
||||
def test_is_final_assistant_message_plain_assistant():
|
||||
wrapper = _make_wrapper(box_service=None)
|
||||
msg = provider_message.Message(role='assistant', content='done')
|
||||
assert wrapper._is_final_assistant_message(msg) is True
|
||||
|
||||
|
||||
def test_is_final_assistant_message_rejects_non_assistant():
|
||||
wrapper = _make_wrapper(box_service=None)
|
||||
msg = provider_message.Message(role='tool', content='{}')
|
||||
assert wrapper._is_final_assistant_message(msg) is False
|
||||
|
||||
|
||||
def test_is_final_assistant_message_rejects_tool_call_round():
|
||||
wrapper = _make_wrapper(box_service=None)
|
||||
msg = provider_message.Message(
|
||||
role='assistant',
|
||||
content='calling',
|
||||
tool_calls=[
|
||||
provider_message.ToolCall(
|
||||
id='c1',
|
||||
type='function',
|
||||
function=provider_message.FunctionCall(name='exec', arguments='{}'),
|
||||
)
|
||||
],
|
||||
)
|
||||
assert wrapper._is_final_assistant_message(msg) is False
|
||||
|
||||
|
||||
def test_is_final_assistant_message_non_final_chunk():
|
||||
wrapper = _make_wrapper(box_service=None)
|
||||
chunk = provider_message.MessageChunk(role='assistant', content='partial', is_final=False)
|
||||
assert wrapper._is_final_assistant_message(chunk) is False
|
||||
|
||||
final_chunk = provider_message.MessageChunk(role='assistant', content='partial', is_final=True)
|
||||
assert wrapper._is_final_assistant_message(final_chunk) is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_append_outbound_attachments_maps_each_type():
|
||||
box_service = SimpleNamespace(
|
||||
available=True,
|
||||
collect_outbound_attachments=AsyncMock(
|
||||
return_value=[
|
||||
{'type': 'Image', 'base64': 'data:image/png;base64,iVBORw0K'},
|
||||
{'type': 'Voice', 'base64': 'data:audio/wav;base64,UklGRg=='},
|
||||
{'type': 'File', 'name': 'report.xlsx', 'base64': 'data:app;base64,UEsDBA=='},
|
||||
]
|
||||
),
|
||||
)
|
||||
wrapper = _make_wrapper(box_service)
|
||||
wrapper.ap.box_service = box_service
|
||||
query = _make_query()
|
||||
chain = platform_message.MessageChain([])
|
||||
|
||||
await wrapper._append_outbound_attachments(query, chain)
|
||||
|
||||
kinds = [type(c).__name__ for c in chain]
|
||||
assert kinds == ['Image', 'Voice', 'File']
|
||||
assert query.variables['_sandbox_outbound_collected'] is True
|
||||
# File keeps its name
|
||||
file_comp = chain[2]
|
||||
assert getattr(file_comp, 'name', None) == 'report.xlsx'
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_append_outbound_attachments_runs_once_per_query():
|
||||
box_service = SimpleNamespace(
|
||||
available=True,
|
||||
collect_outbound_attachments=AsyncMock(return_value=[]),
|
||||
)
|
||||
wrapper = _make_wrapper(box_service)
|
||||
wrapper.ap.box_service = box_service
|
||||
query = _make_query()
|
||||
query.variables['_sandbox_outbound_collected'] = True
|
||||
chain = platform_message.MessageChain([])
|
||||
|
||||
await wrapper._append_outbound_attachments(query, chain)
|
||||
|
||||
box_service.collect_outbound_attachments.assert_not_awaited()
|
||||
assert len(chain) == 0
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_append_outbound_attachments_noop_without_box_service():
|
||||
wrapper = _make_wrapper(box_service=None)
|
||||
wrapper.ap.box_service = None
|
||||
query = _make_query()
|
||||
chain = platform_message.MessageChain([])
|
||||
|
||||
await wrapper._append_outbound_attachments(query, chain)
|
||||
assert len(chain) == 0
|
||||
# not marked collected, since service is unavailable
|
||||
assert '_sandbox_outbound_collected' not in query.variables
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_append_outbound_attachments_swallows_collection_error():
|
||||
box_service = SimpleNamespace(
|
||||
available=True,
|
||||
collect_outbound_attachments=AsyncMock(side_effect=RuntimeError('boom')),
|
||||
)
|
||||
wrapper = _make_wrapper(box_service)
|
||||
wrapper.ap.box_service = box_service
|
||||
query = _make_query()
|
||||
chain = platform_message.MessageChain([])
|
||||
|
||||
# must not raise
|
||||
await wrapper._append_outbound_attachments(query, chain)
|
||||
assert len(chain) == 0
|
||||
wrapper.ap.logger.warning.assert_called_once()
|
||||
@@ -0,0 +1,92 @@
|
||||
"""Unit tests for WebSocketAdapter._process_image_components.
|
||||
|
||||
The web debug client uploads Image / Voice / File components carrying a storage
|
||||
key in ``path``. This helper resolves each to a base64 data URI (so multimodal
|
||||
LLM input and the Box sandbox inbox have usable bytes), then deletes the
|
||||
consumed storage object and clears ``path``. Covers mimetype selection per
|
||||
type and graceful error handling.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
from unittest.mock import AsyncMock, Mock
|
||||
|
||||
import pytest
|
||||
|
||||
from langbot.pkg.platform.sources.websocket_adapter import WebSocketAdapter
|
||||
|
||||
|
||||
def _make_adapter(load_return=b'hello', load_side_effect=None):
|
||||
provider = Mock()
|
||||
provider.load = AsyncMock(return_value=load_return, side_effect=load_side_effect)
|
||||
provider.delete = AsyncMock()
|
||||
ap = Mock()
|
||||
ap.storage_mgr.storage_provider = provider
|
||||
logger = Mock()
|
||||
logger.error = AsyncMock()
|
||||
# WebSocketAdapter is a pydantic model; bypass full __init__/validation.
|
||||
adapter = WebSocketAdapter.model_construct(ap=ap, logger=logger)
|
||||
return adapter, provider
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_image_jpeg_mimetype_and_cleanup():
|
||||
adapter, provider = _make_adapter(load_return=b'\xff\xd8\xff')
|
||||
chain = [{'type': 'Image', 'path': 'storage://abc/photo.jpg'}]
|
||||
|
||||
await adapter._process_image_components(chain)
|
||||
|
||||
expected_b64 = base64.b64encode(b'\xff\xd8\xff').decode('utf-8')
|
||||
assert chain[0]['base64'] == f'data:image/jpeg;base64,{expected_b64}'
|
||||
assert chain[0]['path'] == '' # consumed
|
||||
provider.delete.assert_awaited_once_with('storage://abc/photo.jpg')
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_image_defaults_to_png():
|
||||
adapter, _ = _make_adapter()
|
||||
chain = [{'type': 'Image', 'path': 'storage://abc/blob'}]
|
||||
await adapter._process_image_components(chain)
|
||||
assert chain[0]['base64'].startswith('data:image/png;base64,')
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_voice_uses_guessed_or_wav_mimetype():
|
||||
adapter, _ = _make_adapter()
|
||||
chain = [{'type': 'Voice', 'path': 'storage://abc/clip.wav'}]
|
||||
await adapter._process_image_components(chain)
|
||||
assert chain[0]['base64'].startswith('data:audio/')
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_file_uses_octet_stream_fallback():
|
||||
adapter, _ = _make_adapter()
|
||||
chain = [{'type': 'File', 'path': 'storage://abc/unknownblob'}]
|
||||
await adapter._process_image_components(chain)
|
||||
assert chain[0]['base64'].startswith('data:application/octet-stream;base64,')
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_skips_components_without_path_or_unknown_type():
|
||||
adapter, provider = _make_adapter()
|
||||
chain = [
|
||||
{'type': 'Image', 'path': ''}, # no path
|
||||
{'type': 'Plain', 'path': 'storage://abc/x'}, # not a file component
|
||||
{'type': 'At', 'target': '123'}, # no path key at all
|
||||
]
|
||||
await adapter._process_image_components(chain)
|
||||
provider.load.assert_not_awaited()
|
||||
assert 'base64' not in chain[0]
|
||||
assert 'base64' not in chain[1]
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_load_failure_is_logged_not_raised():
|
||||
adapter, _ = _make_adapter(load_side_effect=RuntimeError('storage down'))
|
||||
chain = [{'type': 'File', 'path': 'storage://abc/doc.pdf'}]
|
||||
|
||||
# must not raise
|
||||
await adapter._process_image_components(chain)
|
||||
assert 'base64' not in chain[0]
|
||||
adapter.logger.error.assert_awaited_once()
|
||||
@@ -0,0 +1,93 @@
|
||||
"""Unit tests for LiteLLMRequester._convert_messages.
|
||||
|
||||
Focus: the content-part normalization that (a) converts image_base64 parts to
|
||||
the OpenAI image_url shape and (b) drops non-image file parts (file_base64 /
|
||||
file_url) which OpenAI-compatible chat models reject. The latter is essential
|
||||
for Voice/File attachments — including ones replayed from conversation history —
|
||||
since the agent consumes their bytes via the sandbox, not the model payload.
|
||||
"""
|
||||
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
|
||||
from langbot.pkg.provider.modelmgr.requesters.litellmchat import LiteLLMRequester
|
||||
|
||||
|
||||
def _make_requester() -> LiteLLMRequester:
|
||||
# _convert_messages does not touch instance config, so bypass __init__.
|
||||
return LiteLLMRequester.__new__(LiteLLMRequester)
|
||||
|
||||
|
||||
def test_convert_messages_drops_file_base64_part():
|
||||
req = _make_requester()
|
||||
msg = provider_message.Message(
|
||||
role='user',
|
||||
content=[
|
||||
provider_message.ContentElement.from_text('analyze this audio'),
|
||||
provider_message.ContentElement.from_file_base64('data:audio/wav;base64,AAAA', 'voice.wav'),
|
||||
],
|
||||
)
|
||||
out = req._convert_messages([msg])
|
||||
parts = out[0]['content']
|
||||
types = [p.get('type') for p in parts]
|
||||
assert 'file_base64' not in types
|
||||
assert types == ['text']
|
||||
assert parts[0]['text'] == 'analyze this audio'
|
||||
|
||||
|
||||
def test_convert_messages_drops_file_url_part():
|
||||
req = _make_requester()
|
||||
msg = provider_message.Message(
|
||||
role='user',
|
||||
content=[
|
||||
provider_message.ContentElement.from_text('here is a doc'),
|
||||
provider_message.ContentElement.from_file_url('http://example.com/report.xlsx', 'report.xlsx'),
|
||||
],
|
||||
)
|
||||
out = req._convert_messages([msg])
|
||||
types = [p.get('type') for p in out[0]['content']]
|
||||
assert types == ['text']
|
||||
|
||||
|
||||
def test_convert_messages_keeps_image_and_converts_to_image_url():
|
||||
req = _make_requester()
|
||||
msg = provider_message.Message(
|
||||
role='user',
|
||||
content=[
|
||||
provider_message.ContentElement.from_text('look'),
|
||||
provider_message.ContentElement.from_image_base64('data:image/png;base64,AAAA'),
|
||||
],
|
||||
)
|
||||
out = req._convert_messages([msg])
|
||||
parts = out[0]['content']
|
||||
types = [p.get('type') for p in parts]
|
||||
# image is preserved and reshaped to the OpenAI image_url form
|
||||
assert types == ['text', 'image_url']
|
||||
img_part = parts[1]
|
||||
assert img_part['image_url'] == {'url': 'data:image/png;base64,AAAA'}
|
||||
assert 'image_base64' not in img_part
|
||||
|
||||
|
||||
def test_convert_messages_mixed_history_strips_only_files():
|
||||
req = _make_requester()
|
||||
# Simulate replayed history: an old voice turn + a current text turn.
|
||||
history_voice = provider_message.Message(
|
||||
role='user',
|
||||
content=[
|
||||
provider_message.ContentElement.from_text('old audio turn'),
|
||||
provider_message.ContentElement.from_file_base64('data:audio/wav;base64,BBBB', 'voice.wav'),
|
||||
],
|
||||
)
|
||||
current = provider_message.Message(
|
||||
role='user',
|
||||
content=[provider_message.ContentElement.from_text('now do the csv')],
|
||||
)
|
||||
out = req._convert_messages([history_voice, current])
|
||||
assert [p.get('type') for p in out[0]['content']] == ['text']
|
||||
assert [p.get('type') for p in out[1]['content']] == ['text']
|
||||
|
||||
|
||||
def test_convert_messages_plain_string_content_untouched():
|
||||
req = _make_requester()
|
||||
msg = provider_message.Message(role='user', content='just text')
|
||||
out = req._convert_messages([msg])
|
||||
assert out[0]['content'] == 'just text'
|
||||
@@ -0,0 +1,146 @@
|
||||
"""Unit tests for LocalAgentRunner._inject_inbound_attachments.
|
||||
|
||||
Covers the user -> sandbox attachment path added for the Box attachment
|
||||
round-trip:
|
||||
|
||||
* materialized descriptors are stashed on the query and described to the model
|
||||
via an appended text note (in-sandbox paths + outbox convention);
|
||||
* non-image file parts (file_base64 / file_url) are stripped from the user
|
||||
message content because OpenAI-compatible chat models reject them, while
|
||||
image and text parts are kept for vision models;
|
||||
* the helper is a no-op when the box service is unavailable or yields nothing,
|
||||
and never raises into the chat turn on materialization failure.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from types import SimpleNamespace
|
||||
from unittest.mock import AsyncMock, Mock
|
||||
|
||||
import pytest
|
||||
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
|
||||
from langbot.pkg.provider.runners.localagent import LocalAgentRunner
|
||||
|
||||
|
||||
def _make_runner(box_service) -> LocalAgentRunner:
|
||||
runner = LocalAgentRunner.__new__(LocalAgentRunner)
|
||||
runner.ap = SimpleNamespace(logger=Mock(), box_service=box_service)
|
||||
return runner
|
||||
|
||||
|
||||
def _make_query():
|
||||
return SimpleNamespace(variables={}, query_id='q-123')
|
||||
|
||||
|
||||
def _box_service(attachments):
|
||||
svc = SimpleNamespace(
|
||||
available=True,
|
||||
OUTBOX_MOUNT_DIR='/outbox',
|
||||
materialize_inbound_attachments=AsyncMock(return_value=attachments),
|
||||
)
|
||||
return svc
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inject_strips_file_parts_and_appends_note():
|
||||
box = _box_service([{'type': 'Voice', 'path': '/inbox/q-123/voice.wav', 'size': 176000}])
|
||||
runner = _make_runner(box)
|
||||
query = _make_query()
|
||||
user_message = provider_message.Message(
|
||||
role='user',
|
||||
content=[
|
||||
provider_message.ContentElement.from_text('transcribe this'),
|
||||
provider_message.ContentElement.from_file_base64('data:audio/wav;base64,AAAA', 'voice.wav'),
|
||||
],
|
||||
)
|
||||
|
||||
await runner._inject_inbound_attachments(query, user_message)
|
||||
|
||||
types = [getattr(ce, 'type', None) for ce in user_message.content]
|
||||
# file_base64 dropped; text kept; sandbox-path note appended as text
|
||||
assert 'file_base64' not in types
|
||||
assert types.count('text') == 2
|
||||
note = user_message.content[-1].text
|
||||
assert '/inbox/q-123/voice.wav' in note
|
||||
assert '/outbox/q-123' in note
|
||||
# descriptors stashed for downstream stages
|
||||
assert query.variables['_sandbox_inbound_attachments'] == box.materialize_inbound_attachments.return_value
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inject_keeps_image_parts():
|
||||
box = _box_service([{'type': 'Image', 'path': '/inbox/q-123/pic.png', 'size': 1234}])
|
||||
runner = _make_runner(box)
|
||||
query = _make_query()
|
||||
user_message = provider_message.Message(
|
||||
role='user',
|
||||
content=[
|
||||
provider_message.ContentElement.from_text('what is this'),
|
||||
provider_message.ContentElement.from_image_base64('data:image/png;base64,iVBORw0K'),
|
||||
],
|
||||
)
|
||||
|
||||
await runner._inject_inbound_attachments(query, user_message)
|
||||
|
||||
types = [getattr(ce, 'type', None) for ce in user_message.content]
|
||||
assert 'image_base64' in types # vision part preserved
|
||||
assert types[-1] == 'text' # note appended last
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inject_promotes_string_content_to_list_with_note():
|
||||
box = _box_service([{'type': 'File', 'path': '/inbox/q-123/data.csv', 'size': 42}])
|
||||
runner = _make_runner(box)
|
||||
query = _make_query()
|
||||
user_message = provider_message.Message(role='user', content='clean this csv')
|
||||
|
||||
await runner._inject_inbound_attachments(query, user_message)
|
||||
|
||||
assert isinstance(user_message.content, list)
|
||||
assert [getattr(ce, 'type', None) for ce in user_message.content] == ['text', 'text']
|
||||
assert user_message.content[0].text == 'clean this csv'
|
||||
assert '/inbox/q-123/data.csv' in user_message.content[1].text
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inject_noop_without_box_service():
|
||||
runner = _make_runner(box_service=None)
|
||||
query = _make_query()
|
||||
user_message = provider_message.Message(role='user', content='hello')
|
||||
|
||||
await runner._inject_inbound_attachments(query, user_message)
|
||||
|
||||
assert user_message.content == 'hello'
|
||||
assert '_sandbox_inbound_attachments' not in query.variables
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inject_noop_when_no_attachments():
|
||||
box = _box_service([])
|
||||
runner = _make_runner(box)
|
||||
query = _make_query()
|
||||
user_message = provider_message.Message(role='user', content='hello')
|
||||
|
||||
await runner._inject_inbound_attachments(query, user_message)
|
||||
|
||||
assert user_message.content == 'hello'
|
||||
assert '_sandbox_inbound_attachments' not in query.variables
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_inject_swallows_materialization_error():
|
||||
box = SimpleNamespace(
|
||||
available=True,
|
||||
OUTBOX_MOUNT_DIR='/outbox',
|
||||
materialize_inbound_attachments=AsyncMock(side_effect=RuntimeError('disk full')),
|
||||
)
|
||||
runner = _make_runner(box)
|
||||
query = _make_query()
|
||||
user_message = provider_message.Message(role='user', content='hello')
|
||||
|
||||
# must not raise
|
||||
await runner._inject_inbound_attachments(query, user_message)
|
||||
assert user_message.content == 'hello'
|
||||
runner.ap.logger.warning.assert_called_once()
|
||||
@@ -407,32 +407,6 @@ class TestRuntimeKnowledgeBaseRetrieve:
|
||||
call_args = mock_app.plugin_connector.call_rag_retrieve.call_args
|
||||
assert call_args[0][1]['retrieval_settings']['top_k'] == 5
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_retrieve_records_host_rag_duration(self, monkeypatch):
|
||||
"""Test host RAG span duration is measured even if plugin omits it."""
|
||||
rag_module = get_rag_module()
|
||||
mock_app = create_mock_app()
|
||||
mock_app.monitoring_service = AsyncMock()
|
||||
mock_kb = create_mock_kb_entity()
|
||||
mock_app.plugin_connector.call_rag_retrieve = AsyncMock(
|
||||
return_value={'results': [], 'metadata': {'status': 'success'}}
|
||||
)
|
||||
monkeypatch.setattr(rag_module.time, 'perf_counter', Mock(side_effect=[10.0, 10.25]))
|
||||
|
||||
runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
|
||||
|
||||
await runtime_kb._retrieve(
|
||||
'query text',
|
||||
{
|
||||
'_trace_context': {
|
||||
'trace_id': 'trace-1',
|
||||
'parent_span_id': 'span-root',
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
assert mock_app.monitoring_service.record_span.await_args.kwargs['duration'] == 250
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_retrieve_converts_dict_to_entry(self):
|
||||
"""Test that dict results are converted to RetrievalResultEntry."""
|
||||
|
||||
@@ -5,7 +5,6 @@ import {
|
||||
ModelCall,
|
||||
LLMCall,
|
||||
EmbeddingCall,
|
||||
MonitoringTrace,
|
||||
} from '../types/monitoring';
|
||||
import { backendClient } from '@/app/infra/http';
|
||||
import { parseUTCTimestamp } from '../utils/dateUtils';
|
||||
@@ -264,48 +263,12 @@ export function useMonitoringData(filterState: FilterState) {
|
||||
messageId: error.message_id,
|
||||
}),
|
||||
),
|
||||
traces: (response.traces || []).map(
|
||||
(trace: {
|
||||
trace_id: string;
|
||||
started_at: string;
|
||||
ended_at?: string;
|
||||
duration?: number;
|
||||
status: string;
|
||||
name: string;
|
||||
bot_id?: string;
|
||||
bot_name?: string;
|
||||
pipeline_id?: string;
|
||||
pipeline_name?: string;
|
||||
session_id?: string;
|
||||
message_id?: string;
|
||||
query_id?: string;
|
||||
attributes?: Record<string, unknown>;
|
||||
}): MonitoringTrace => ({
|
||||
traceId: trace.trace_id,
|
||||
name: trace.name,
|
||||
startedAt: parseUTCTimestamp(trace.started_at),
|
||||
endedAt: trace.ended_at
|
||||
? parseUTCTimestamp(trace.ended_at)
|
||||
: undefined,
|
||||
duration: trace.duration,
|
||||
status: trace.status as 'running' | 'success' | 'error',
|
||||
botId: trace.bot_id,
|
||||
botName: trace.bot_name,
|
||||
pipelineId: trace.pipeline_id,
|
||||
pipelineName: trace.pipeline_name,
|
||||
sessionId: trace.session_id,
|
||||
messageId: trace.message_id,
|
||||
queryId: trace.query_id,
|
||||
attributes: trace.attributes || {},
|
||||
}),
|
||||
),
|
||||
totalCount: {
|
||||
messages: response.totalCount.messages,
|
||||
llmCalls: response.totalCount.llmCalls,
|
||||
embeddingCalls: response.totalCount.embeddingCalls || 0,
|
||||
sessions: response.totalCount.sessions,
|
||||
errors: response.totalCount.errors,
|
||||
traces: response.totalCount.traces || 0,
|
||||
},
|
||||
};
|
||||
|
||||
|
||||
@@ -10,7 +10,6 @@ import {
|
||||
MessageSquare,
|
||||
Sparkles,
|
||||
CheckCircle2,
|
||||
GitBranch,
|
||||
} from 'lucide-react';
|
||||
import OverviewCards from './components/overview-cards/OverviewCards';
|
||||
import MonitoringFilters from './components/filters/MonitoringFilters';
|
||||
@@ -23,15 +22,9 @@ import { MessageDetailsCard } from './components/MessageDetailsCard';
|
||||
import { MessageContentRenderer } from './components/MessageContentRenderer';
|
||||
import { FeedbackStatsCards } from './components/FeedbackCard';
|
||||
import { FeedbackList } from './components/FeedbackList';
|
||||
import {
|
||||
MessageDetails,
|
||||
TraceDetails,
|
||||
MonitoringSpan,
|
||||
} from './types/monitoring';
|
||||
import { MessageDetails } from './types/monitoring';
|
||||
import { httpClient } from '@/app/infra/http/HttpClient';
|
||||
import { backendClient } from '@/app/infra/http';
|
||||
import { LoadingSpinner, LoadingPage } from '@/components/ui/loading-spinner';
|
||||
import { parseUTCTimestamp } from './utils/dateUtils';
|
||||
|
||||
interface RawMessageData {
|
||||
id: string;
|
||||
@@ -79,97 +72,6 @@ interface RawErrorData {
|
||||
stack_trace: string | null;
|
||||
}
|
||||
|
||||
interface RawTraceData {
|
||||
trace_id: string;
|
||||
started_at: string;
|
||||
ended_at?: string;
|
||||
duration?: number;
|
||||
status: string;
|
||||
name: string;
|
||||
bot_id?: string;
|
||||
bot_name?: string;
|
||||
pipeline_id?: string;
|
||||
pipeline_name?: string;
|
||||
session_id?: string;
|
||||
message_id?: string;
|
||||
query_id?: string;
|
||||
attributes?: Record<string, unknown>;
|
||||
}
|
||||
|
||||
interface RawSpanData {
|
||||
span_id: string;
|
||||
trace_id: string;
|
||||
parent_span_id?: string;
|
||||
name: string;
|
||||
kind: string;
|
||||
status: string;
|
||||
started_at: string;
|
||||
ended_at?: string;
|
||||
duration?: number;
|
||||
message_id?: string;
|
||||
session_id?: string;
|
||||
bot_id?: string;
|
||||
pipeline_id?: string;
|
||||
attributes?: Record<string, unknown>;
|
||||
error_message?: string;
|
||||
}
|
||||
|
||||
function mapTrace(raw: RawTraceData) {
|
||||
return {
|
||||
traceId: raw.trace_id,
|
||||
name: raw.name,
|
||||
startedAt: parseUTCTimestamp(raw.started_at),
|
||||
endedAt: raw.ended_at ? parseUTCTimestamp(raw.ended_at) : undefined,
|
||||
duration: raw.duration,
|
||||
status: raw.status as 'running' | 'success' | 'error',
|
||||
botId: raw.bot_id,
|
||||
botName: raw.bot_name,
|
||||
pipelineId: raw.pipeline_id,
|
||||
pipelineName: raw.pipeline_name,
|
||||
sessionId: raw.session_id,
|
||||
messageId: raw.message_id,
|
||||
queryId: raw.query_id,
|
||||
attributes: raw.attributes || {},
|
||||
};
|
||||
}
|
||||
|
||||
function mapSpan(raw: RawSpanData): MonitoringSpan {
|
||||
return {
|
||||
spanId: raw.span_id,
|
||||
traceId: raw.trace_id,
|
||||
parentSpanId: raw.parent_span_id,
|
||||
name: raw.name,
|
||||
kind: raw.kind,
|
||||
status: raw.status as 'running' | 'success' | 'error',
|
||||
startedAt: parseUTCTimestamp(raw.started_at),
|
||||
endedAt: raw.ended_at ? parseUTCTimestamp(raw.ended_at) : undefined,
|
||||
duration: raw.duration,
|
||||
messageId: raw.message_id,
|
||||
sessionId: raw.session_id,
|
||||
botId: raw.bot_id,
|
||||
pipelineId: raw.pipeline_id,
|
||||
attributes: raw.attributes || {},
|
||||
errorMessage: raw.error_message,
|
||||
};
|
||||
}
|
||||
|
||||
function spanDepth(
|
||||
span: MonitoringSpan,
|
||||
spansById: Map<string, MonitoringSpan>,
|
||||
) {
|
||||
let depth = 0;
|
||||
let current = span.parentSpanId
|
||||
? spansById.get(span.parentSpanId)
|
||||
: undefined;
|
||||
while (current && depth < 8) {
|
||||
depth += 1;
|
||||
current = current.parentSpanId
|
||||
? spansById.get(current.parentSpanId)
|
||||
: undefined;
|
||||
}
|
||||
return depth;
|
||||
}
|
||||
|
||||
function MonitoringPageContent() {
|
||||
const { t } = useTranslation();
|
||||
const { filterState, setSelectedBots, setSelectedPipelines, setTimeRange } =
|
||||
@@ -256,13 +158,6 @@ function MonitoringPageContent() {
|
||||
|
||||
// State for expanded errors
|
||||
const [expandedErrorId, setExpandedErrorId] = useState<string | null>(null);
|
||||
const [expandedTraceId, setExpandedTraceId] = useState<string | null>(null);
|
||||
const [traceDetails, setTraceDetails] = useState<
|
||||
Record<string, TraceDetails>
|
||||
>({});
|
||||
const [loadingTraceDetails, setLoadingTraceDetails] = useState<
|
||||
Record<string, boolean>
|
||||
>({});
|
||||
|
||||
// State for controlled tabs
|
||||
const [activeTab, setActiveTab] = useState<string>('messages');
|
||||
@@ -370,34 +265,6 @@ function MonitoringPageContent() {
|
||||
}
|
||||
};
|
||||
|
||||
const toggleTraceExpand = async (traceId: string) => {
|
||||
if (expandedTraceId === traceId) {
|
||||
setExpandedTraceId(null);
|
||||
return;
|
||||
}
|
||||
|
||||
setExpandedTraceId(traceId);
|
||||
if (traceDetails[traceId]) return;
|
||||
|
||||
setLoadingTraceDetails((prev) => ({ ...prev, [traceId]: true }));
|
||||
try {
|
||||
const result = await backendClient.getMonitoringTraceDetails(traceId);
|
||||
setTraceDetails((prev) => ({
|
||||
...prev,
|
||||
[traceId]: {
|
||||
traceId: result.trace_id,
|
||||
found: result.found,
|
||||
trace: result.trace ? mapTrace(result.trace) : undefined,
|
||||
spans: (result.spans || []).map(mapSpan),
|
||||
},
|
||||
}));
|
||||
} catch (error) {
|
||||
console.error('Failed to fetch trace details:', error);
|
||||
} finally {
|
||||
setLoadingTraceDetails((prev) => ({ ...prev, [traceId]: false }));
|
||||
}
|
||||
};
|
||||
|
||||
return (
|
||||
<div className="w-full h-full overflow-y-auto overflow-x-hidden">
|
||||
{/* Filters and Refresh Button - Sticky */}
|
||||
@@ -456,9 +323,6 @@ function MonitoringPageContent() {
|
||||
<TabsTrigger value="tokens" className="px-6 py-2">
|
||||
{t('monitoring.tabs.tokens')}
|
||||
</TabsTrigger>
|
||||
<TabsTrigger value="traces" className="px-6 py-2">
|
||||
{t('monitoring.tabs.traces')}
|
||||
</TabsTrigger>
|
||||
<TabsTrigger value="feedback" className="px-6 py-2">
|
||||
{t('monitoring.tabs.feedback')}
|
||||
</TabsTrigger>
|
||||
@@ -826,166 +690,6 @@ function MonitoringPageContent() {
|
||||
/>
|
||||
</TabsContent>
|
||||
|
||||
<TabsContent value="traces" className="p-6 m-0">
|
||||
<div>
|
||||
{loading && (
|
||||
<div className="py-12 flex justify-center">
|
||||
<LoadingSpinner text={t('common.loading')} />
|
||||
</div>
|
||||
)}
|
||||
|
||||
{!loading && data && data.traces && data.traces.length > 0 && (
|
||||
<div className="space-y-4">
|
||||
{data.traces.map((trace) => {
|
||||
const details = traceDetails[trace.traceId];
|
||||
const spans = details?.spans || [];
|
||||
const spansById = new Map(
|
||||
spans.map((span) => [span.spanId, span]),
|
||||
);
|
||||
const maxDuration = Math.max(
|
||||
1,
|
||||
...spans.map((span) => span.duration || 0),
|
||||
);
|
||||
|
||||
return (
|
||||
<div
|
||||
key={trace.traceId}
|
||||
className="border rounded-xl overflow-hidden transition-all duration-200"
|
||||
>
|
||||
<div
|
||||
className="p-5 cursor-pointer hover:bg-accent transition-colors"
|
||||
onClick={() => toggleTraceExpand(trace.traceId)}
|
||||
>
|
||||
<div className="flex items-start justify-between gap-4">
|
||||
<div className="flex items-start flex-1 min-w-0">
|
||||
<div className="mr-3 mt-0.5">
|
||||
{expandedTraceId === trace.traceId ? (
|
||||
<ChevronDown className="w-5 h-5 text-muted-foreground" />
|
||||
) : (
|
||||
<ChevronRight className="w-5 h-5 text-muted-foreground" />
|
||||
)}
|
||||
</div>
|
||||
<div className="min-w-0 flex-1">
|
||||
<div className="flex flex-wrap items-center gap-2 mb-2">
|
||||
<span className="text-xs text-muted-foreground font-mono">
|
||||
{trace.traceId}
|
||||
</span>
|
||||
<span
|
||||
className={`text-xs px-2 py-1 rounded ${
|
||||
trace.status === 'error'
|
||||
? 'bg-red-100 text-red-800 dark:bg-red-900 dark:text-red-200'
|
||||
: trace.status === 'running'
|
||||
? 'bg-yellow-100 text-yellow-800 dark:bg-yellow-900 dark:text-yellow-200'
|
||||
: 'bg-green-100 text-green-800 dark:bg-green-900 dark:text-green-200'
|
||||
}`}
|
||||
>
|
||||
{trace.status}
|
||||
</span>
|
||||
</div>
|
||||
<div className="font-medium text-sm text-foreground mb-1">
|
||||
{trace.name}
|
||||
</div>
|
||||
<div className="text-xs text-muted-foreground truncate">
|
||||
{trace.botName || '-'} →{' '}
|
||||
{trace.pipelineName || '-'}
|
||||
{trace.sessionId
|
||||
? ` · ${trace.sessionId}`
|
||||
: ''}
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
<div className="flex flex-col items-end gap-1 text-xs text-muted-foreground whitespace-nowrap">
|
||||
<span>{trace.startedAt.toLocaleString()}</span>
|
||||
<span>{trace.duration ?? 0}ms</span>
|
||||
</div>
|
||||
</div>
|
||||
</div>
|
||||
|
||||
{expandedTraceId === trace.traceId && (
|
||||
<div className="border-t p-5 bg-muted">
|
||||
{loadingTraceDetails[trace.traceId] && (
|
||||
<div className="py-4 flex justify-center">
|
||||
<LoadingSpinner size="sm" text="" />
|
||||
</div>
|
||||
)}
|
||||
{!loadingTraceDetails[trace.traceId] && (
|
||||
<div className="space-y-3">
|
||||
{spans.length === 0 && (
|
||||
<div className="text-sm text-muted-foreground">
|
||||
{t('monitoring.traces.noSpans')}
|
||||
</div>
|
||||
)}
|
||||
{spans.map((span) => {
|
||||
const depth = spanDepth(span, spansById);
|
||||
const width = Math.max(
|
||||
6,
|
||||
Math.min(
|
||||
100,
|
||||
((span.duration || 0) / maxDuration) *
|
||||
100,
|
||||
),
|
||||
);
|
||||
return (
|
||||
<div
|
||||
key={span.spanId}
|
||||
className="grid grid-cols-[minmax(180px,1fr)_minmax(140px,2fr)_80px] gap-3 items-center text-xs"
|
||||
>
|
||||
<div
|
||||
className="min-w-0"
|
||||
style={{
|
||||
paddingLeft: `${depth * 16}px`,
|
||||
}}
|
||||
>
|
||||
<div className="font-medium text-foreground truncate">
|
||||
{span.name}
|
||||
</div>
|
||||
<div className="text-muted-foreground truncate">
|
||||
{span.kind}
|
||||
</div>
|
||||
</div>
|
||||
<div className="h-7 bg-background rounded border overflow-hidden">
|
||||
<div
|
||||
className={`h-full ${
|
||||
span.status === 'error'
|
||||
? 'bg-red-500/70'
|
||||
: 'bg-blue-500/70'
|
||||
}`}
|
||||
style={{ width: `${width}%` }}
|
||||
/>
|
||||
</div>
|
||||
<div className="text-right text-muted-foreground">
|
||||
{span.duration ?? 0}ms
|
||||
</div>
|
||||
{span.errorMessage && (
|
||||
<div className="col-span-3 text-red-600 dark:text-red-400 bg-background rounded p-2">
|
||||
{span.errorMessage}
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
})}
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
})}
|
||||
</div>
|
||||
)}
|
||||
|
||||
{!loading &&
|
||||
(!data || !data.traces || data.traces.length === 0) && (
|
||||
<div className="flex flex-col items-center justify-center text-muted-foreground py-16 gap-2">
|
||||
<GitBranch className="h-[3rem] w-[3rem]" />
|
||||
<div className="text-sm">
|
||||
{t('monitoring.traces.noTraces')}
|
||||
</div>
|
||||
</div>
|
||||
)}
|
||||
</div>
|
||||
</TabsContent>
|
||||
|
||||
<TabsContent value="feedback" className="p-6 m-0">
|
||||
<div>
|
||||
{loading && (
|
||||
|
||||
@@ -111,48 +111,6 @@ export interface ErrorLog {
|
||||
messageId?: string;
|
||||
}
|
||||
|
||||
export interface MonitoringTrace {
|
||||
traceId: string;
|
||||
name: string;
|
||||
startedAt: Date;
|
||||
endedAt?: Date;
|
||||
duration?: number;
|
||||
status: 'running' | 'success' | 'error';
|
||||
botId?: string;
|
||||
botName?: string;
|
||||
pipelineId?: string;
|
||||
pipelineName?: string;
|
||||
sessionId?: string;
|
||||
messageId?: string;
|
||||
queryId?: string;
|
||||
attributes: Record<string, unknown>;
|
||||
}
|
||||
|
||||
export interface MonitoringSpan {
|
||||
spanId: string;
|
||||
traceId: string;
|
||||
parentSpanId?: string;
|
||||
name: string;
|
||||
kind: string;
|
||||
status: 'success' | 'error' | 'running';
|
||||
startedAt: Date;
|
||||
endedAt?: Date;
|
||||
duration?: number;
|
||||
messageId?: string;
|
||||
sessionId?: string;
|
||||
botId?: string;
|
||||
pipelineId?: string;
|
||||
attributes: Record<string, unknown>;
|
||||
errorMessage?: string;
|
||||
}
|
||||
|
||||
export interface TraceDetails {
|
||||
traceId: string;
|
||||
found: boolean;
|
||||
trace?: MonitoringTrace;
|
||||
spans: MonitoringSpan[];
|
||||
}
|
||||
|
||||
export interface MessageDetails {
|
||||
messageId: string;
|
||||
found: boolean;
|
||||
@@ -167,7 +125,6 @@ export interface MessageDetails {
|
||||
averageDurationMs: number;
|
||||
};
|
||||
errors: ErrorLog[];
|
||||
trace?: MonitoringTrace;
|
||||
}
|
||||
|
||||
export interface OverviewMetrics {
|
||||
@@ -246,7 +203,6 @@ export interface MonitoringData {
|
||||
modelCalls: ModelCall[];
|
||||
sessions: SessionInfo[];
|
||||
errors: ErrorLog[];
|
||||
traces: MonitoringTrace[];
|
||||
feedback?: FeedbackRecord[];
|
||||
feedbackStats?: FeedbackStats;
|
||||
totalCount: {
|
||||
@@ -255,7 +211,6 @@ export interface MonitoringData {
|
||||
embeddingCalls: number;
|
||||
sessions: number;
|
||||
errors: number;
|
||||
traces: number;
|
||||
feedback?: number;
|
||||
};
|
||||
}
|
||||
|
||||
@@ -15,6 +15,7 @@ import {
|
||||
At,
|
||||
Quote,
|
||||
Voice,
|
||||
File as FileComponent,
|
||||
Source,
|
||||
} from '@/app/infra/entities/message';
|
||||
import { toast } from 'sonner';
|
||||
@@ -64,7 +65,12 @@ export default function DebugDialog({
|
||||
const [isHovering, setIsHovering] = useState(false);
|
||||
const [isConnected, setIsConnected] = useState(false);
|
||||
const [selectedImages, setSelectedImages] = useState<
|
||||
Array<{ file: File; preview: string; fileKey?: string }>
|
||||
Array<{
|
||||
file: File;
|
||||
preview: string;
|
||||
fileKey?: string;
|
||||
kind: 'image' | 'voice' | 'file';
|
||||
}>
|
||||
>([]);
|
||||
const [isUploading, setIsUploading] = useState(false);
|
||||
const [previewImageUrl, setPreviewImageUrl] = useState<string>('');
|
||||
@@ -292,23 +298,38 @@ export default function DebugDialog({
|
||||
const files = e.target.files;
|
||||
if (!files || files.length === 0) return;
|
||||
|
||||
const newImages: Array<{ file: File; preview: string }> = [];
|
||||
const newImages: Array<{
|
||||
file: File;
|
||||
preview: string;
|
||||
kind: 'image' | 'voice' | 'file';
|
||||
}> = [];
|
||||
|
||||
for (let i = 0; i < files.length; i++) {
|
||||
const file = files[i];
|
||||
if (file.type.startsWith('image/')) {
|
||||
const preview = URL.createObjectURL(file);
|
||||
newImages.push({ file, preview });
|
||||
newImages.push({
|
||||
file,
|
||||
preview: URL.createObjectURL(file),
|
||||
kind: 'image',
|
||||
});
|
||||
} else if (file.type.startsWith('audio/')) {
|
||||
newImages.push({ file, preview: '', kind: 'voice' });
|
||||
} else {
|
||||
newImages.push({ file, preview: '', kind: 'file' });
|
||||
}
|
||||
}
|
||||
|
||||
setSelectedImages((prev) => [...prev, ...newImages]);
|
||||
// reset the input so selecting the same file again re-triggers onChange
|
||||
e.target.value = '';
|
||||
};
|
||||
|
||||
const handleRemoveImage = (index: number) => {
|
||||
setSelectedImages((prev) => {
|
||||
const newImages = [...prev];
|
||||
URL.revokeObjectURL(newImages[index].preview);
|
||||
if (newImages[index].preview) {
|
||||
URL.revokeObjectURL(newImages[index].preview);
|
||||
}
|
||||
newImages.splice(index, 1);
|
||||
return newImages;
|
||||
});
|
||||
@@ -372,19 +393,33 @@ export default function DebugDialog({
|
||||
});
|
||||
}
|
||||
|
||||
// Upload images and add to message chain
|
||||
for (const image of selectedImages) {
|
||||
// Upload attachments and add to message chain
|
||||
for (const attachment of selectedImages) {
|
||||
try {
|
||||
const result = await httpClient.uploadWebSocketImage(
|
||||
selectedPipelineId,
|
||||
image.file,
|
||||
);
|
||||
messageChain.push({
|
||||
type: 'Image',
|
||||
path: result.file_key,
|
||||
});
|
||||
if (attachment.kind === 'image') {
|
||||
const result = await httpClient.uploadWebSocketImage(
|
||||
selectedPipelineId,
|
||||
attachment.file,
|
||||
);
|
||||
messageChain.push({
|
||||
type: 'Image',
|
||||
path: result.file_key,
|
||||
});
|
||||
} else {
|
||||
// Voice / File go through the generic document upload endpoint,
|
||||
// which returns a storage key the backend resolves into the
|
||||
// sandbox inbox just like images.
|
||||
const result = await httpClient.uploadDocumentFile(attachment.file);
|
||||
messageChain.push({
|
||||
type: attachment.kind === 'voice' ? 'Voice' : 'File',
|
||||
path: result.file_id,
|
||||
...(attachment.kind === 'file'
|
||||
? { name: attachment.file.name }
|
||||
: {}),
|
||||
});
|
||||
}
|
||||
} catch (error) {
|
||||
console.error('Image upload failed:', error);
|
||||
console.error('Attachment upload failed:', error);
|
||||
toast.error(t('pipelines.debugDialog.imageUploadFailed'));
|
||||
}
|
||||
}
|
||||
@@ -393,7 +428,9 @@ export default function DebugDialog({
|
||||
setInputValue('');
|
||||
setHasAt(false);
|
||||
setQuotedMessage(null);
|
||||
selectedImages.forEach((img) => URL.revokeObjectURL(img.preview));
|
||||
selectedImages.forEach((img) => {
|
||||
if (img.preview) URL.revokeObjectURL(img.preview);
|
||||
});
|
||||
setSelectedImages([]);
|
||||
|
||||
// Send message via WebSocket
|
||||
@@ -460,13 +497,29 @@ export default function DebugDialog({
|
||||
}
|
||||
|
||||
case 'File': {
|
||||
const file = component as MessageChainComponent & { name?: string };
|
||||
const file = component as FileComponent;
|
||||
const downloadHref = file.base64
|
||||
? file.base64.startsWith('data:')
|
||||
? file.base64
|
||||
: `data:application/octet-stream;base64,${file.base64}`
|
||||
: file.url || '';
|
||||
const fileName = file.name || 'Unknown';
|
||||
return (
|
||||
<div key={index} className="my-2 flex items-center gap-2 text-sm">
|
||||
<Paperclip className="size-4" />
|
||||
<span>
|
||||
[{t('pipelines.debugDialog.file')}] {file.name || 'Unknown'}
|
||||
</span>
|
||||
{downloadHref ? (
|
||||
<a
|
||||
href={downloadHref}
|
||||
download={fileName}
|
||||
className="text-primary underline hover:opacity-80"
|
||||
>
|
||||
[{t('pipelines.debugDialog.file')}] {fileName}
|
||||
</a>
|
||||
) : (
|
||||
<span>
|
||||
[{t('pipelines.debugDialog.file')}] {fileName}
|
||||
</span>
|
||||
)}
|
||||
</div>
|
||||
);
|
||||
}
|
||||
@@ -844,17 +897,30 @@ export default function DebugDialog({
|
||||
</div>
|
||||
)}
|
||||
|
||||
{/* Image preview area */}
|
||||
{/* Attachment preview area */}
|
||||
{selectedImages.length > 0 && (
|
||||
<div className="px-4 pb-2">
|
||||
<div className="flex gap-2 flex-wrap">
|
||||
{selectedImages.map((image, index) => (
|
||||
<div key={index} className="relative group">
|
||||
<img
|
||||
src={image.preview}
|
||||
alt={`preview-${index}`}
|
||||
className="w-20 h-20 object-cover rounded-lg border"
|
||||
/>
|
||||
{image.kind === 'image' ? (
|
||||
<img
|
||||
src={image.preview}
|
||||
alt={`preview-${index}`}
|
||||
className="w-20 h-20 object-cover rounded-lg border"
|
||||
/>
|
||||
) : (
|
||||
<div className="w-36 h-20 px-2 rounded-lg border bg-muted/40 flex items-center gap-2 overflow-hidden">
|
||||
{image.kind === 'voice' ? (
|
||||
<Music className="size-5 shrink-0 text-muted-foreground" />
|
||||
) : (
|
||||
<Paperclip className="size-5 shrink-0 text-muted-foreground" />
|
||||
)}
|
||||
<span className="text-xs text-muted-foreground truncate">
|
||||
{image.file.name}
|
||||
</span>
|
||||
</div>
|
||||
)}
|
||||
<button
|
||||
type="button"
|
||||
onClick={() => handleRemoveImage(index)}
|
||||
@@ -883,7 +949,7 @@ export default function DebugDialog({
|
||||
<input
|
||||
ref={fileInputRef}
|
||||
type="file"
|
||||
accept="image/*"
|
||||
accept="image/*,audio/*,*/*"
|
||||
multiple
|
||||
onChange={handleImageSelect}
|
||||
className="hidden"
|
||||
|
||||
@@ -64,6 +64,8 @@ export interface File extends MessageComponent {
|
||||
name?: string;
|
||||
size?: number;
|
||||
url?: string;
|
||||
path?: string;
|
||||
base64?: string;
|
||||
}
|
||||
|
||||
// Unknown component
|
||||
|
||||
@@ -1185,29 +1185,12 @@ export class BackendClient extends BaseHttpClient {
|
||||
stack_trace?: string;
|
||||
message_id?: string;
|
||||
}>;
|
||||
traces?: Array<{
|
||||
trace_id: string;
|
||||
started_at: string;
|
||||
ended_at?: string;
|
||||
duration?: number;
|
||||
status: string;
|
||||
name: string;
|
||||
bot_id?: string;
|
||||
bot_name?: string;
|
||||
pipeline_id?: string;
|
||||
pipeline_name?: string;
|
||||
session_id?: string;
|
||||
message_id?: string;
|
||||
query_id?: string;
|
||||
attributes?: Record<string, unknown>;
|
||||
}>;
|
||||
totalCount: {
|
||||
messages: number;
|
||||
llmCalls: number;
|
||||
embeddingCalls: number;
|
||||
sessions: number;
|
||||
errors: number;
|
||||
traces?: number;
|
||||
};
|
||||
}> {
|
||||
const queryParams = new URLSearchParams();
|
||||
@@ -1230,90 +1213,6 @@ export class BackendClient extends BaseHttpClient {
|
||||
return this.get(`/api/v1/monitoring/data?${queryParams.toString()}`);
|
||||
}
|
||||
|
||||
public getMonitoringTraces(params: {
|
||||
botId?: string[];
|
||||
pipelineId?: string[];
|
||||
startTime?: string;
|
||||
endTime?: string;
|
||||
limit?: number;
|
||||
}): Promise<{
|
||||
traces: Array<{
|
||||
trace_id: string;
|
||||
started_at: string;
|
||||
ended_at?: string;
|
||||
duration?: number;
|
||||
status: string;
|
||||
name: string;
|
||||
bot_id?: string;
|
||||
bot_name?: string;
|
||||
pipeline_id?: string;
|
||||
pipeline_name?: string;
|
||||
session_id?: string;
|
||||
message_id?: string;
|
||||
query_id?: string;
|
||||
attributes?: Record<string, unknown>;
|
||||
}>;
|
||||
total: number;
|
||||
}> {
|
||||
const queryParams = new URLSearchParams();
|
||||
if (params.botId) {
|
||||
params.botId.forEach((id) => queryParams.append('botId', id));
|
||||
}
|
||||
if (params.pipelineId) {
|
||||
params.pipelineId.forEach((id) => queryParams.append('pipelineId', id));
|
||||
}
|
||||
if (params.startTime) {
|
||||
queryParams.append('startTime', params.startTime);
|
||||
}
|
||||
if (params.endTime) {
|
||||
queryParams.append('endTime', params.endTime);
|
||||
}
|
||||
if (params.limit) {
|
||||
queryParams.append('limit', params.limit.toString());
|
||||
}
|
||||
return this.get(`/api/v1/monitoring/traces?${queryParams.toString()}`);
|
||||
}
|
||||
|
||||
public getMonitoringTraceDetails(traceId: string): Promise<{
|
||||
trace_id: string;
|
||||
found: boolean;
|
||||
trace: {
|
||||
trace_id: string;
|
||||
started_at: string;
|
||||
ended_at?: string;
|
||||
duration?: number;
|
||||
status: string;
|
||||
name: string;
|
||||
bot_id?: string;
|
||||
bot_name?: string;
|
||||
pipeline_id?: string;
|
||||
pipeline_name?: string;
|
||||
session_id?: string;
|
||||
message_id?: string;
|
||||
query_id?: string;
|
||||
attributes?: Record<string, unknown>;
|
||||
};
|
||||
spans: Array<{
|
||||
span_id: string;
|
||||
trace_id: string;
|
||||
parent_span_id?: string;
|
||||
name: string;
|
||||
kind: string;
|
||||
status: string;
|
||||
started_at: string;
|
||||
ended_at?: string;
|
||||
duration?: number;
|
||||
message_id?: string;
|
||||
session_id?: string;
|
||||
bot_id?: string;
|
||||
pipeline_id?: string;
|
||||
attributes?: Record<string, unknown>;
|
||||
error_message?: string;
|
||||
}>;
|
||||
}> {
|
||||
return this.get(`/api/v1/monitoring/traces/${traceId}`);
|
||||
}
|
||||
|
||||
public getMonitoringOverview(params: {
|
||||
botId?: string[];
|
||||
pipelineId?: string[];
|
||||
|
||||
@@ -1217,7 +1217,6 @@ const enUS = {
|
||||
embeddingCalls: 'Embedding Calls',
|
||||
modelCalls: 'Model Calls',
|
||||
tokens: 'Token Monitoring',
|
||||
traces: 'Traces',
|
||||
feedback: 'User Feedback',
|
||||
sessions: 'Session Analysis',
|
||||
errors: 'Error Logs',
|
||||
@@ -1322,11 +1321,6 @@ const enUS = {
|
||||
noErrors: 'No errors found',
|
||||
stackTrace: 'Stack Trace',
|
||||
},
|
||||
traces: {
|
||||
title: 'Traces',
|
||||
noTraces: 'No traces found',
|
||||
noSpans: 'No spans recorded for this trace',
|
||||
},
|
||||
feedback: {
|
||||
title: 'User Feedback',
|
||||
totalFeedback: 'Total Feedback',
|
||||
|
||||
@@ -1158,7 +1158,6 @@ const zhHans = {
|
||||
embeddingCalls: 'Embedding调用',
|
||||
modelCalls: '模型调用',
|
||||
tokens: 'Token 监控',
|
||||
traces: '链路追踪',
|
||||
feedback: '用户反馈',
|
||||
sessions: '会话分析',
|
||||
errors: '错误日志',
|
||||
@@ -1263,11 +1262,6 @@ const zhHans = {
|
||||
noErrors: '未找到错误',
|
||||
stackTrace: '堆栈追踪',
|
||||
},
|
||||
traces: {
|
||||
title: '链路追踪',
|
||||
noTraces: '未找到链路记录',
|
||||
noSpans: '此链路暂无 Span 记录',
|
||||
},
|
||||
feedback: {
|
||||
title: '用户反馈',
|
||||
totalFeedback: '总反馈数',
|
||||
|
||||
Reference in New Issue
Block a user