Compare commits

..

4 Commits

Author SHA1 Message Date
RockChinQ 67e3f837fe fix(box): purge leftover inbox/outbox on startup; clear root-owned outbox via exec
The agent attachment outbox is written by the sandbox container as root over
the bind-mount, so the LangBot host process (non-root) cannot rmtree those
files — the host-side delete failed silently and stale files were re-collected
on a later turn that reused the same query_id (the query_id counter resets to 0
on every restart).

- BoxService.initialize now purges leftover inbox/outbox after the runtime is
  available: host rmtree first, then an in-sandbox 'rm -rf' via exec for any
  root-owned survivors.
- _clear_outbox now falls back to exec when the host delete leaves root-owned
  files behind, instead of silently failing.
- collect_outbound_attachments clears the outbox unconditionally (even on an
  empty collection) so a reused query_id never inherits stale files.
- Tests: startup purge (host-owned + root-owned exec fallback + no-workspace
  noop) and empty-collection-still-clears.
2026-06-18 09:53:37 -04:00
RockChinQ 3b3b09331a test(box): cover inbound/outbound attachment helpers; fix ruff format
- ruff format localagent.py (CI ruff format --check was failing)
- add unit tests for ResponseWrapper outbound-attachment helpers (wrapper.py 78%->98%)
- add unit tests for LocalAgentRunner._inject_inbound_attachments
- add unit tests for WebSocketAdapter._process_image_components (0%->covered)

Lifts PR patch coverage from 68.97% to ~88% (>75% target).
2026-06-17 22:08:42 -04:00
RockChinQ 75e5af26d0 feat(box): support voice/file attachment round-trip end-to-end
Extends the bidirectional attachment transfer to audio and arbitrary files
through the real webchat UI, and fixes the model-payload errors that
non-image attachments triggered.

- platform(websocket_adapter): resolve Voice/File component storage keys to
  base64 (previously only Image), so audio/documents reach the sandbox inbox.
- web(debug-dialog): accept audio/* and any file in the uploader (was
  image-only), classify by mimetype, upload Voice/File via the documents
  endpoint, and render non-image staged attachments as a chip.
- provider(litellmchat): drop non-image file parts (file_base64 / file_url)
  when building the OpenAI/LiteLLM payload. These come from Voice/File
  attachments — including ones replayed from conversation history — and the
  agent reads their bytes from the sandbox, not the model. Without this the
  provider rejects the request: 'invalid content type=file_base64'.
- provider(localagent): also strip those parts from the current user message
  alongside the sandbox-path note (model-facing clarity; the requester is the
  real safety net for history).
- tests: cover the requester strip/keep behavior (file dropped, image kept and
  reshaped to image_url, mixed history, plain-string content).
2026-06-17 21:57:09 -04:00
RockChinQ 22c0a18bea feat(box): bidirectional attachment transfer for sandbox
Materialize inbound attachments into the sandbox workspace so agents can
process user-sent files, and collect agent-produced files from the outbox
to attach them back to the reply.

- box(service): add materialize_inbound_attachments / collect_outbound
  attachments. Prefer direct host-filesystem read/write on the bind-mounted
  workspace (no size limit), falling back to chunked exec only for
  non-shared backends (e2b/remote). Clear per-query inbox/outbox dirs at
  turn start to avoid query_id-reuse collisions.
- provider(localagent): inject inbound attachment descriptors into the
  sandbox and append a system note telling the agent the inbox/outbox paths.
- pipeline(wrapper): collect outbox files on the final stream chunk and
  append them as attachment components to the response chain.
- web(debug-dialog): render File components with a download link when
  base64/url is present; add base64/path fields to the File entity.
- tests: cover inbound/outbound, large-file transfer without truncation,
  and stale-dir clearing (86 passing).
2026-06-17 19:01:03 -04:00
36 changed files with 1613 additions and 1941 deletions
@@ -313,30 +313,18 @@ class MonitoringRouterGroup(group.RouterGroup):
offset=0, offset=0,
) )
# Get traces
traces, traces_total = await self.ap.monitoring_service.get_traces(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
start_time=start_time,
end_time=end_time,
limit=limit,
offset=0,
)
return self.success( return self.success(
data={ data={
'overview': overview, 'overview': overview,
'messages': messages, 'messages': messages,
'llmCalls': llm_calls, 'llmCalls': llm_calls,
'embeddingCalls': embedding_calls, 'embeddingCalls': embedding_calls,
'traces': traces,
'sessions': sessions, 'sessions': sessions,
'errors': errors, 'errors': errors,
'totalCount': { 'totalCount': {
'messages': messages_total, 'messages': messages_total,
'llmCalls': llm_calls_total, 'llmCalls': llm_calls_total,
'embeddingCalls': embedding_calls_total, 'embeddingCalls': embedding_calls_total,
'traces': traces_total,
'sessions': sessions_total, 'sessions': sessions_total,
'errors': errors_total, 'errors': errors_total,
}, },
@@ -362,49 +350,6 @@ class MonitoringRouterGroup(group.RouterGroup):
return self.success(data=details) return self.success(data=details)
@self.route('/traces', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_traces() -> str:
"""Get end-to-end trace records."""
bot_ids = quart.request.args.getlist('botId')
pipeline_ids = quart.request.args.getlist('pipelineId')
session_ids = quart.request.args.getlist('sessionId')
statuses = quart.request.args.getlist('status')
start_time_str = quart.request.args.get('startTime')
end_time_str = quart.request.args.get('endTime')
limit = int(quart.request.args.get('limit', 100))
offset = int(quart.request.args.get('offset', 0))
start_time = parse_iso_datetime(start_time_str)
end_time = parse_iso_datetime(end_time_str)
traces, total = await self.ap.monitoring_service.get_traces(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
session_ids=session_ids if session_ids else None,
statuses=statuses if statuses else None,
start_time=start_time,
end_time=end_time,
limit=limit,
offset=offset,
)
return self.success(
data={
'traces': traces,
'total': total,
'limit': limit,
'offset': offset,
}
)
@self.route('/traces/<trace_id>', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def get_trace_details(trace_id: str) -> str:
"""Get one trace with all spans."""
details = await self.ap.monitoring_service.get_trace_details(trace_id)
if not details.get('found'):
return self.http_status(404, -1, f'Trace {trace_id} not found')
return self.success(data=details)
@self.route('/export', methods=['GET'], auth_type=group.AuthType.USER_TOKEN) @self.route('/export', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def export_data() -> tuple[str, int]: async def export_data() -> tuple[str, int]:
"""Export monitoring data as CSV""" """Export monitoring data as CSV"""
@@ -350,24 +350,8 @@ class PluginsRouterGroup(group.RouterGroup):
if not endpoint.startswith('/') or '..' in endpoint: if not endpoint.startswith('/') or '..' in endpoint:
return self.http_status(400, -1, 'invalid endpoint') return self.http_status(400, -1, 'invalid endpoint')
caller = {
'plugin_author': author,
'plugin_name': plugin_name,
'page_id': page_id,
'origin': _get_request_origin(),
}
headers = {
key: value
for key, value in {
'user-agent': quart.request.headers.get('User-Agent'),
'x-request-id': quart.request.headers.get('X-Request-ID'),
'x-forwarded-for': quart.request.headers.get('X-Forwarded-For'),
}.items()
if value
}
result = await self.ap.plugin_connector.handle_page_api( result = await self.ap.plugin_connector.handle_page_api(
author, plugin_name, page_id, endpoint, method.upper(), body, caller, headers author, plugin_name, page_id, endpoint, method.upper(), body
) )
if result.get('error'): if result.get('error'):
return self.http_status(400, -1, result['error']) return self.http_status(400, -1, result['error'])
@@ -3,55 +3,11 @@ from __future__ import annotations
import uuid import uuid
import datetime import datetime
import sqlalchemy import sqlalchemy
import json
from ....core import app from ....core import app
from ....entity.persistence import monitoring as persistence_monitoring from ....entity.persistence import monitoring as persistence_monitoring
# TODO: Move shared trace/time helpers into a small monitoring utility module
# when trace propagation expands beyond the current query/retrieval path.
def _utc_now() -> datetime.datetime:
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
def _json_dumps(value: dict | list | None) -> str | None:
if value is None:
return None
try:
return json.dumps(value, ensure_ascii=False, default=str)
except Exception:
return json.dumps({'serialization_error': str(value)}, ensure_ascii=False)
def _json_loads(value: str | None) -> dict | list | None:
if not value:
return None
try:
return json.loads(value)
except Exception:
return None
def new_trace_id() -> str:
return f'trace-{uuid.uuid4().hex[:16]}'
def new_span_id() -> str:
return f'span-{uuid.uuid4().hex[:16]}'
def normalize_trace_status(status: str | None) -> str:
"""Normalize operation status to the monitoring UI vocabulary."""
if status in ('completed', 'ok'):
return 'success'
if status in ('failed', 'failure', 'exception'):
return 'error'
if status in ('running', 'success', 'error'):
return status
return 'success'
class MonitoringService: class MonitoringService:
"""Monitoring service""" """Monitoring service"""
@@ -118,18 +74,6 @@ class MonitoringService:
persistence_monitoring.MonitoringFeedback.timestamp, persistence_monitoring.MonitoringFeedback.timestamp,
persistence_monitoring.MonitoringFeedback.id, persistence_monitoring.MonitoringFeedback.id,
), ),
(
'monitoring_traces',
persistence_monitoring.MonitoringTrace,
persistence_monitoring.MonitoringTrace.started_at,
persistence_monitoring.MonitoringTrace.trace_id,
),
(
'monitoring_spans',
persistence_monitoring.MonitoringSpan,
persistence_monitoring.MonitoringSpan.started_at,
persistence_monitoring.MonitoringSpan.span_id,
),
] ]
deleted_counts: dict[str, int] = {} deleted_counts: dict[str, int] = {}
@@ -189,116 +133,6 @@ class MonitoringService:
# ========== Recording Methods ========== # ========== Recording Methods ==========
async def start_trace(
self,
trace_id: str | None = None,
name: str = 'LangBot query',
bot_id: str | None = None,
bot_name: str | None = None,
pipeline_id: str | None = None,
pipeline_name: str | None = None,
session_id: str | None = None,
message_id: str | None = None,
query_id: str | int | None = None,
attributes: dict | None = None,
) -> str:
"""Create or update a trace header row."""
trace_id = trace_id or new_trace_id()
trace_data = {
'trace_id': trace_id,
'started_at': _utc_now(),
'ended_at': None,
'duration': None,
'status': 'running',
'name': name,
'bot_id': bot_id,
'bot_name': bot_name,
'pipeline_id': pipeline_id,
'pipeline_name': pipeline_name,
'session_id': session_id,
'message_id': message_id,
'query_id': str(query_id) if query_id is not None else None,
'attributes': _json_dumps(attributes),
}
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_monitoring.MonitoringTrace).values(trace_data)
)
return trace_id
async def finish_trace(
self,
trace_id: str,
status: str = 'success',
duration: int | None = None,
message_id: str | None = None,
attributes: dict | None = None,
) -> None:
"""Mark a trace complete."""
update_values: dict = {
'ended_at': _utc_now(),
'status': normalize_trace_status(status),
}
if duration is not None:
update_values['duration'] = duration
if message_id is not None:
update_values['message_id'] = message_id
if attributes is not None:
update_values['attributes'] = _json_dumps(attributes)
await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(persistence_monitoring.MonitoringTrace)
.where(persistence_monitoring.MonitoringTrace.trace_id == trace_id)
.values(update_values)
)
async def record_span(
self,
trace_id: str,
name: str,
kind: str,
status: str = 'success',
span_id: str | None = None,
parent_span_id: str | None = None,
started_at: datetime.datetime | None = None,
ended_at: datetime.datetime | None = None,
duration: int | None = None,
message_id: str | None = None,
session_id: str | None = None,
bot_id: str | None = None,
pipeline_id: str | None = None,
attributes: dict | None = None,
error_message: str | None = None,
) -> str:
"""Record a single completed span."""
started_at = started_at or _utc_now()
if duration is None and ended_at is not None:
duration = int((ended_at - started_at).total_seconds() * 1000)
elif duration is not None:
duration = int(round(float(duration)))
span_data = {
'span_id': span_id or new_span_id(),
'trace_id': trace_id,
'parent_span_id': parent_span_id,
'name': name,
'kind': kind,
'status': normalize_trace_status(status),
'started_at': started_at,
'ended_at': ended_at or _utc_now(),
'duration': duration,
'message_id': message_id,
'session_id': session_id,
'bot_id': bot_id,
'pipeline_id': pipeline_id,
'attributes': _json_dumps(attributes),
'error_message': error_message,
}
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_monitoring.MonitoringSpan).values(span_data)
)
return span_data['span_id']
async def record_message( async def record_message(
self, self,
bot_id: str, bot_id: str,
@@ -1242,19 +1076,6 @@ class MonitoringService:
for row in error_rows for row in error_rows
] ]
trace_query = (
sqlalchemy.select(persistence_monitoring.MonitoringTrace)
.where(persistence_monitoring.MonitoringTrace.message_id == message_id)
.order_by(persistence_monitoring.MonitoringTrace.started_at.desc())
.limit(1)
)
trace_result = await self.ap.persistence_mgr.execute_async(trace_query)
trace_row = trace_result.first()
trace = None
if trace_row:
trace_model = trace_row[0] if isinstance(trace_row, tuple) else trace_row
trace = self._serialize_trace(trace_model)
return { return {
'message_id': message_id, 'message_id': message_id,
'found': True, 'found': True,
@@ -1269,84 +1090,6 @@ class MonitoringService:
'average_duration_ms': int(total_duration / len(llm_rows)) if len(llm_rows) > 0 else 0, 'average_duration_ms': int(total_duration / len(llm_rows)) if len(llm_rows) > 0 else 0,
}, },
'errors': errors, 'errors': errors,
'trace': trace,
}
def _serialize_trace(self, trace: persistence_monitoring.MonitoringTrace) -> dict:
data = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringTrace, trace)
data['attributes'] = _json_loads(data.get('attributes')) or {}
return data
def _serialize_span(self, span: persistence_monitoring.MonitoringSpan) -> dict:
data = self.ap.persistence_mgr.serialize_model(persistence_monitoring.MonitoringSpan, span)
data['attributes'] = _json_loads(data.get('attributes')) or {}
return data
async def get_traces(
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
session_ids: list[str] | None = None,
statuses: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
limit: int = 100,
offset: int = 0,
) -> tuple[list[dict], int]:
"""Get trace headers with filters."""
conditions = []
if bot_ids:
conditions.append(persistence_monitoring.MonitoringTrace.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringTrace.pipeline_id.in_(pipeline_ids))
if session_ids:
conditions.append(persistence_monitoring.MonitoringTrace.session_id.in_(session_ids))
if statuses:
conditions.append(persistence_monitoring.MonitoringTrace.status.in_(statuses))
if start_time:
conditions.append(persistence_monitoring.MonitoringTrace.started_at >= start_time)
if end_time:
conditions.append(persistence_monitoring.MonitoringTrace.started_at <= end_time)
count_query = sqlalchemy.select(sqlalchemy.func.count(persistence_monitoring.MonitoringTrace.trace_id))
query = sqlalchemy.select(persistence_monitoring.MonitoringTrace)
if conditions:
clause = sqlalchemy.and_(*conditions)
count_query = count_query.where(clause)
query = query.where(clause)
total_result = await self.ap.persistence_mgr.execute_async(count_query)
total = total_result.scalar() or 0
query = query.order_by(persistence_monitoring.MonitoringTrace.started_at.desc()).limit(limit).offset(offset)
result = await self.ap.persistence_mgr.execute_async(query)
traces = [self._serialize_trace(row[0] if isinstance(row, tuple) else row) for row in result.all()]
return traces, total
async def get_trace_details(self, trace_id: str) -> dict:
"""Get a single trace and all spans in chronological order."""
trace_query = sqlalchemy.select(persistence_monitoring.MonitoringTrace).where(
persistence_monitoring.MonitoringTrace.trace_id == trace_id
)
trace_result = await self.ap.persistence_mgr.execute_async(trace_query)
trace_row = trace_result.first()
if not trace_row:
return {'trace_id': trace_id, 'found': False}
trace = trace_row[0] if isinstance(trace_row, tuple) else trace_row
span_query = (
sqlalchemy.select(persistence_monitoring.MonitoringSpan)
.where(persistence_monitoring.MonitoringSpan.trace_id == trace_id)
.order_by(persistence_monitoring.MonitoringSpan.started_at.asc())
)
span_result = await self.ap.persistence_mgr.execute_async(span_query)
spans = [self._serialize_span(row[0] if isinstance(row, tuple) else row) for row in span_result.all()]
return {
'trace_id': trace_id,
'found': True,
'trace': self._serialize_trace(trace),
'spans': spans,
} }
# ========== Export Methods ========== # ========== Export Methods ==========
+502
View File
@@ -105,6 +105,7 @@ class BoxService:
f'LangBot Box runtime initialized: profile={self.profile.name} ' f'LangBot Box runtime initialized: profile={self.profile.name} '
f'default_workspace={self.default_workspace or "(none)"}' f'default_workspace={self.default_workspace or "(none)"}'
) )
await self._purge_attachment_dirs()
except Exception as exc: except Exception as exc:
self.ap.logger.warning(f'LangBot Box runtime unavailable, sandbox features disabled: {exc}') self.ap.logger.warning(f'LangBot Box runtime unavailable, sandbox features disabled: {exc}')
self._available = False self._available = False
@@ -335,6 +336,507 @@ class BoxService:
return await self.execute_spec_payload(spec_payload, query) return await self.execute_spec_payload(spec_payload, query)
# ── Attachment passthrough (inbound / outbound) ──────────────────
#
# IM/webchat attachments (images, voices, files) reach the LLM as
# multimodal content, but historically never landed on the sandbox
# filesystem, so the agent's exec/read/write tools could not operate on
# them. Conversely, files the agent produced inside the sandbox were
# never surfaced back to the user. These two helpers close both gaps:
#
# inbound : message_chain attachments -> /workspace/inbox/<query_id>/
# outbound : /workspace/outbox/<query_id>/ -> reply MessageChain
#
# Transfer prefers DIRECT HOST FILESYSTEM access to the bind-mounted
# workspace (default_workspace on the host maps to /workspace inside the
# container), which has no size limit. This covers the local docker /
# nsjail / stdio backends. For backends where the workspace is NOT visible
# on the LangBot host (E2B, an external remote runtime.endpoint), it falls
# back to a base64-through-exec round-trip. The exec channel can only move
# small files reliably — the docker backend passes the command as a single
# argv (ARG_MAX) and exec stdout is truncated by output_limit_chars — so
# the host path is strongly preferred and used whenever available.
INBOX_MOUNT_DIR = '/workspace/inbox'
OUTBOX_MOUNT_DIR = '/workspace/outbox'
INBOX_SUBDIR = 'inbox'
OUTBOX_SUBDIR = 'outbox'
# Hard cap on a single attachment. The HTTP upload endpoints already cap
# uploads at 10MiB; keep parity.
_ATTACHMENT_MAX_BYTES = 10 * _MIB
# Conservative cap for the exec FALLBACK path only (ARG_MAX / stdout
# truncation). The host-filesystem path has no such limit.
_EXEC_FALLBACK_MAX_BYTES = 256 * 1024
def _host_query_dir(self, subdir: str, query_id) -> str | None:
"""Host path for ``/workspace/<subdir>/<query_id>`` when LangBot can
access the bind-mounted workspace directly, else ``None``.
``default_workspace`` is the host directory bind-mounted to
``/workspace`` for the local docker/nsjail backends and shared
outright in stdio mode, so a file written there by LangBot is visible
to the sandbox (and vice-versa). It is ``None`` / not a local dir for
E2B and remote runtimes, where we must fall back to the exec channel.
"""
root = self.default_workspace
if not root or not os.path.isdir(root):
return None
return os.path.join(root, subdir, str(query_id))
async def _purge_attachment_dirs(self) -> None:
"""Remove leftover inbox/outbox directories on startup.
``query_id`` is a process-local counter (see pipeline query pool) that
resets to 0 on every restart, so per-query attachment directories from
a previous process would otherwise be silently reused — leaking a prior
run's inbound files and re-sending stale outbound files.
Outbox files are written by the sandbox **container**, which runs as
root over the bind-mount, so the LangBot host process (a non-root user)
cannot ``rmtree`` them. We therefore try a host-side delete first (fast,
works for host-owned inbox files) and, for anything that survives,
delete from *inside* the sandbox via exec where the container's root can
remove its own files. Best-effort: never block startup.
"""
root = self.default_workspace
if not root or not os.path.isdir(root):
return
import shutil
host_survivors: list[str] = []
def _host_purge() -> list[str]:
survivors: list[str] = []
for subdir in (self.INBOX_SUBDIR, self.OUTBOX_SUBDIR):
path = os.path.join(root, subdir)
if not os.path.isdir(path):
continue
shutil.rmtree(path, ignore_errors=True)
if os.path.exists(path):
survivors.append(subdir)
return survivors
try:
host_survivors = await asyncio.to_thread(_host_purge)
except Exception as exc: # pragma: no cover - defensive
self.ap.logger.warning(f'Host-side purge of sandbox attachment dirs failed: {exc}')
host_survivors = [self.INBOX_SUBDIR, self.OUTBOX_SUBDIR]
if not host_survivors:
self.ap.logger.info('Purged leftover sandbox attachment dirs from a previous process.')
return
# Root-owned leftovers (container output): delete from inside the box.
targets = ' '.join(f'/workspace/{sub}' for sub in host_survivors)
try:
spec = self.build_spec({'cmd': f'rm -rf {targets}', 'session_id': '__startup_purge__', 'timeout_sec': 30})
await self.client.execute(spec)
self.ap.logger.info(
f'Purged root-owned leftover sandbox attachment dirs via sandbox exec: {host_survivors}'
)
except Exception as exc:
self.ap.logger.warning(
f'Failed to purge root-owned sandbox attachment dirs {host_survivors} via exec: {exc}'
)
@staticmethod
def _sanitize_attachment_name(name: str, fallback: str) -> str:
"""Reduce an arbitrary attachment name to a safe basename.
Strips directory separators and parent refs so a crafted file name
can never escape the inbox/outbox directory.
"""
base = os.path.basename(str(name or '').replace('\\', '/').strip())
base = base.lstrip('.') or ''
# Drop anything that is not a conservative filename charset.
cleaned = ''.join(c for c in base if c.isalnum() or c in ('.', '_', '-', ' ')).strip()
cleaned = cleaned.replace(' ', '_')
return cleaned or fallback
@staticmethod
async def _component_to_bytes(component) -> tuple[bytes, str] | None:
"""Best-effort extraction of (bytes, mime) from a platform component.
Handles base64, http(s) url and local path sources. Returns None when
no payload can be resolved.
"""
import base64 as _b64
b64 = getattr(component, 'base64', None)
if b64:
data = b64
mime = 'application/octet-stream'
if isinstance(data, str) and data.startswith('data:'):
split_index = data.find(';base64,')
if split_index != -1:
mime = data[5:split_index]
data = data[split_index + 8 :]
try:
return _b64.b64decode(data), mime
except Exception:
return None
url = getattr(component, 'url', None)
if url:
try:
import httpx
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(url)
resp.raise_for_status()
return resp.content, resp.headers.get('Content-Type', 'application/octet-stream')
except Exception:
return None
path = getattr(component, 'path', None)
if path:
try:
import aiofiles
async with aiofiles.open(path, 'rb') as f:
return await f.read(), 'application/octet-stream'
except Exception:
return None
return None
async def _write_files_into_sandbox(
self,
query: pipeline_query.Query,
subdir: str,
target_mount_dir: str,
files: list[tuple[str, bytes]],
) -> list[str]:
"""Write *files* (name, bytes) into the per-query directory.
Prefers a direct host-filesystem write to the bind-mounted workspace
(no size limit). Falls back to a base64-through-exec round-trip only
when the workspace is not visible on the LangBot host (E2B / remote).
Returns the list of in-sandbox paths actually written.
"""
if not files:
return []
host_dir = self._host_query_dir(subdir, query.query_id)
if host_dir is not None:
return await asyncio.to_thread(self._write_files_host, host_dir, target_mount_dir, files)
return await self._write_files_via_exec(query, target_mount_dir, files)
def _write_files_host(
self,
host_dir: str,
target_mount_dir: str,
files: list[tuple[str, bytes]],
) -> list[str]:
"""Write attachments straight onto the bind-mounted host directory.
Recreates the per-query directory from scratch so a reused query_id
(the webchat session uses small sequential ids) never inherits stale
files from an earlier turn.
"""
import shutil
shutil.rmtree(host_dir, ignore_errors=True)
os.makedirs(host_dir, exist_ok=True)
written: list[str] = []
for name, data in files:
with open(os.path.join(host_dir, name), 'wb') as fh:
fh.write(data)
written.append(f'{target_mount_dir}/{name}')
return written
async def _write_files_via_exec(
self,
query: pipeline_query.Query,
target_dir: str,
files: list[tuple[str, bytes]],
) -> list[str]:
"""Fallback: ship files into the sandbox over the exec channel.
Only used for backends without host-filesystem access (E2B / remote).
Each file is base64-decoded inside the sandbox. Files larger than the
conservative exec cap are skipped (ARG_MAX / stdout limits).
"""
import base64 as _b64
import json as _json
manifest = []
for name, data in files:
if len(data) > self._EXEC_FALLBACK_MAX_BYTES:
self.ap.logger.warning(
f'Attachment "{name}" ({len(data)} bytes) exceeds the exec-channel '
f'fallback limit ({self._EXEC_FALLBACK_MAX_BYTES} bytes); skipping. '
f'Configure a host-shared workspace to transfer large files.'
)
continue
manifest.append({'name': name, 'b64': _b64.b64encode(data).decode('ascii')})
if not manifest:
return []
manifest_b64 = _b64.b64encode(_json.dumps(manifest).encode('utf-8')).decode('ascii')
script = (
'import base64, json, os, shutil\n'
f'target = {target_dir!r}\n'
'shutil.rmtree(target, ignore_errors=True)\n'
'os.makedirs(target, exist_ok=True)\n'
f'manifest = json.loads(base64.b64decode({manifest_b64!r}))\n'
'written = []\n'
'for item in manifest:\n'
" p = os.path.join(target, item['name'])\n"
" with open(p, 'wb') as f:\n"
" f.write(base64.b64decode(item['b64']))\n"
' written.append(p)\n'
'print(json.dumps(written))\n'
)
result = await self.execute_tool(
{'command': f"python3 - <<'LBPY'\n{script}\nLBPY", 'timeout_sec': 120},
query,
)
if not result.get('ok'):
self.ap.logger.warning(
f'Failed to write inbound attachments into sandbox via exec: '
f'query_id={query.query_id} stderr={result.get("stderr", "")[:200]}'
)
return []
try:
return _json.loads(str(result.get('stdout') or '').strip().splitlines()[-1])
except Exception:
return []
async def materialize_inbound_attachments(self, query: pipeline_query.Query) -> list[dict]:
"""Persist message-chain attachments into the sandbox inbox.
Returns a list of ``{path, name, type, size}`` describing what was
written, so the runner can tell the LLM the exact in-sandbox paths.
Returns ``[]`` when sandbox is unavailable or there are no attachments.
"""
if not self._available:
return []
import langbot_plugin.api.entities.builtin.platform.message as platform_message
message_chain = getattr(query, 'message_chain', None)
if not message_chain:
return []
type_map = [
(platform_message.Image, 'Image', 'image', 'png'),
(platform_message.Voice, 'Voice', 'voice', 'wav'),
(platform_message.File, 'File', 'file', 'bin'),
]
pending: list[tuple[str, bytes]] = []
descriptors: list[dict] = []
index = 0
for component in message_chain:
matched = None
for cls, kind, prefix, default_ext in type_map:
if isinstance(component, cls):
matched = (kind, prefix, default_ext)
break
if matched is None:
continue
kind, prefix, default_ext = matched
payload = await self._component_to_bytes(component)
if payload is None:
continue
data, _mime = payload
if not data or len(data) > self._ATTACHMENT_MAX_BYTES:
continue
index += 1
raw_name = getattr(component, 'name', None) or f'{prefix}_{index}.{default_ext}'
safe_name = self._sanitize_attachment_name(raw_name, f'{prefix}_{index}.{default_ext}')
pending.append((safe_name, data))
descriptors.append(
{
'name': safe_name,
'type': kind,
'size': len(data),
}
)
if not pending:
return []
target_dir = f'{self.INBOX_MOUNT_DIR}/{query.query_id}'
written = await self._write_files_into_sandbox(query, self.INBOX_SUBDIR, target_dir, pending)
written_basenames = {os.path.basename(p) for p in written}
result: list[dict] = []
for desc in descriptors:
if desc['name'] in written_basenames:
desc['path'] = f'{target_dir}/{desc["name"]}'
result.append(desc)
if result:
self.ap.logger.info(
f'Materialized {len(result)} inbound attachment(s) into sandbox: '
f'query_id={query.query_id} dir={target_dir}'
)
return result
async def collect_outbound_attachments(self, query: pipeline_query.Query) -> list[dict]:
"""Collect files the agent produced in the sandbox outbox.
Reads ``/workspace/outbox/<query_id>/`` (recursively) — directly from
the bind-mounted host directory when available (no size limit), else
via the exec channel — returns a list of ``{type, name, base64}``
ready to become platform message components, then clears the outbox so
a later turn in the same session does not re-send stale files. Returns
``[]`` when nothing was produced.
"""
if not self._available:
return []
host_dir = self._host_query_dir(self.OUTBOX_SUBDIR, query.query_id)
if host_dir is not None:
entries = await asyncio.to_thread(self._read_outbox_host, host_dir)
else:
entries = await self._read_outbox_via_exec(query)
attachments = self._classify_outbound_entries(entries)
# Always clear the per-query outbox after reading — even when nothing
# was collected — so a later turn that reuses the same query_id (the
# counter resets across restarts) never inherits stale files.
await self._clear_outbox(query, host_dir)
if attachments:
self.ap.logger.info(
f'Collected {len(attachments)} outbound attachment(s) from sandbox: query_id={query.query_id}'
)
return attachments
def _read_outbox_host(self, host_dir: str) -> list[dict]:
"""Read outbox files straight off the bind-mounted host directory."""
import base64 as _b64
entries: list[dict] = []
if not os.path.isdir(host_dir):
return entries
for root, _dirs, names in os.walk(host_dir):
for name in sorted(names):
path = os.path.join(root, name)
try:
if os.path.getsize(path) > self._ATTACHMENT_MAX_BYTES:
continue
with open(path, 'rb') as fh:
data = fh.read()
except OSError:
continue
rel = os.path.relpath(path, host_dir)
entries.append({'name': rel, 'b64': _b64.b64encode(data).decode('ascii')})
return entries
async def _read_outbox_via_exec(self, query: pipeline_query.Query) -> list[dict]:
"""Fallback: read the outbox over the exec channel (E2B / remote).
Note: exec stdout is truncated by ``output_limit_chars``, so this path
only reliably transfers small files. The host path is preferred.
"""
import json as _json
target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}'
max_bytes = self._EXEC_FALLBACK_MAX_BYTES
script = (
'import base64, json, os\n'
f'target = {target_dir!r}\n'
f'max_bytes = {max_bytes}\n'
'out = []\n'
'if os.path.isdir(target):\n'
' for root, _dirs, names in os.walk(target):\n'
' for n in sorted(names):\n'
' p = os.path.join(root, n)\n'
' try:\n'
' if os.path.getsize(p) > max_bytes:\n'
' continue\n'
" with open(p, 'rb') as f:\n"
' data = f.read()\n'
' except OSError:\n'
' continue\n'
' rel = os.path.relpath(p, target)\n'
" out.append({'name': rel, 'b64': base64.b64encode(data).decode('ascii')})\n"
'print(json.dumps(out))\n'
)
result = await self.execute_tool(
{'command': f"python3 - <<'LBPY'\n{script}\nLBPY", 'timeout_sec': 120},
query,
)
if not result.get('ok'):
return []
try:
return _json.loads(str(result.get('stdout') or '').strip().splitlines()[-1])
except Exception:
return []
async def _clear_outbox(self, query: pipeline_query.Query, host_dir: str | None) -> None:
"""Empty the per-query outbox after collection.
Tries a host-side ``rmtree`` first (fast, no container round-trip).
Outbox files are created by the sandbox container as root over the
bind-mount, so when LangBot runs as a non-root user the host delete
fails silently and the files survive — they would then be re-collected
on the next turn that reuses the same query_id. So if anything survives
the host delete, clear it from *inside* the sandbox via exec, where the
container's root can remove its own files. Best-effort: never raise
into the pipeline.
"""
target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}'
if host_dir is not None:
import shutil
def _clear() -> bool:
shutil.rmtree(host_dir, ignore_errors=True)
survived = os.path.exists(host_dir) and bool(os.listdir(host_dir))
os.makedirs(host_dir, exist_ok=True)
return survived
survived = await asyncio.to_thread(_clear)
if not survived:
return
# Root-owned container files survived the host delete — fall through.
try:
await self.execute_tool(
{'command': f'rm -rf {target_dir} && mkdir -p {target_dir}', 'timeout_sec': 30},
query,
)
except Exception as exc:
self.ap.logger.warning(f'Failed to clear sandbox outbox {target_dir}: {exc}')
@staticmethod
def _classify_outbound_entries(entries: list[dict]) -> list[dict]:
"""Classify outbox files into Image/Voice/File component descriptors."""
image_exts = {'png', 'jpg', 'jpeg', 'gif', 'webp', 'bmp'}
voice_exts = {'wav', 'mp3', 'silk', 'amr', 'ogg', 'm4a', 'aac'}
mime_by_ext = {
'png': 'image/png',
'jpg': 'image/jpeg',
'jpeg': 'image/jpeg',
'gif': 'image/gif',
'webp': 'image/webp',
'bmp': 'image/bmp',
}
attachments: list[dict] = []
for entry in entries or []:
name = str(entry.get('name', '') or '')
b64 = entry.get('b64')
if not name or not b64:
continue
ext = name.rsplit('.', 1)[-1].lower() if '.' in name else ''
base_name = os.path.basename(name)
if ext in image_exts:
mime = mime_by_ext.get(ext, 'image/png')
attachments.append({'type': 'Image', 'name': base_name, 'base64': f'data:{mime};base64,{b64}'})
elif ext in voice_exts:
attachments.append({'type': 'Voice', 'name': base_name, 'base64': f'data:audio/{ext};base64,{b64}'})
else:
attachments.append({'type': 'File', 'name': base_name, 'base64': b64})
return attachments
async def shutdown(self): async def shutdown(self):
await self.client.shutdown() await self.client.shutdown()
@@ -3,49 +3,6 @@ import sqlalchemy
from .base import Base from .base import Base
class MonitoringTrace(Base):
"""End-to-end monitoring trace records"""
__tablename__ = 'monitoring_traces'
trace_id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True)
started_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True)
ended_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True, index=True)
duration = sqlalchemy.Column(sqlalchemy.Integer, nullable=True) # milliseconds
status = sqlalchemy.Column(sqlalchemy.String(50), nullable=False, index=True) # running, success, error
name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
bot_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
pipeline_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
message_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
query_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
attributes = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
class MonitoringSpan(Base):
"""Trace span records for pipeline, RAG, model, plugin and tool operations"""
__tablename__ = 'monitoring_spans'
span_id = sqlalchemy.Column(sqlalchemy.String(255), primary_key=True)
trace_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=False, index=True)
parent_span_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
name = sqlalchemy.Column(sqlalchemy.String(255), nullable=False)
kind = sqlalchemy.Column(sqlalchemy.String(80), nullable=False, index=True)
status = sqlalchemy.Column(sqlalchemy.String(50), nullable=False, index=True)
started_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=False, index=True)
ended_at = sqlalchemy.Column(sqlalchemy.DateTime, nullable=True)
duration = sqlalchemy.Column(sqlalchemy.Integer, nullable=True) # milliseconds
message_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
session_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
bot_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
pipeline_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True, index=True)
attributes = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
error_message = sqlalchemy.Column(sqlalchemy.Text, nullable=True)
class MonitoringMessage(Base): class MonitoringMessage(Base):
"""Monitoring message records""" """Monitoring message records"""
@@ -1,88 +0,0 @@
"""add monitoring traces and spans
Revision ID: 0006_monitoring_traces
Revises: 0005_add_llm_context_length
Create Date: 2026-06-16
"""
import sqlalchemy as sa
from alembic import op
revision = '0006_monitoring_traces'
down_revision = '0005_add_llm_context_length'
branch_labels = None
depends_on = None
def upgrade() -> None:
conn = op.get_bind()
inspector = sa.inspect(conn)
tables = set(inspector.get_table_names())
if 'monitoring_traces' not in tables:
op.create_table(
'monitoring_traces',
sa.Column('trace_id', sa.String(length=255), nullable=False),
sa.Column('started_at', sa.DateTime(), nullable=False),
sa.Column('ended_at', sa.DateTime(), nullable=True),
sa.Column('duration', sa.Integer(), nullable=True),
sa.Column('status', sa.String(length=50), nullable=False),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('bot_id', sa.String(length=255), nullable=True),
sa.Column('bot_name', sa.String(length=255), nullable=True),
sa.Column('pipeline_id', sa.String(length=255), nullable=True),
sa.Column('pipeline_name', sa.String(length=255), nullable=True),
sa.Column('session_id', sa.String(length=255), nullable=True),
sa.Column('message_id', sa.String(length=255), nullable=True),
sa.Column('query_id', sa.String(length=255), nullable=True),
sa.Column('attributes', sa.Text(), nullable=True),
sa.PrimaryKeyConstraint('trace_id'),
)
op.create_index('ix_monitoring_traces_started_at', 'monitoring_traces', ['started_at'])
op.create_index('ix_monitoring_traces_ended_at', 'monitoring_traces', ['ended_at'])
op.create_index('ix_monitoring_traces_status', 'monitoring_traces', ['status'])
op.create_index('ix_monitoring_traces_bot_id', 'monitoring_traces', ['bot_id'])
op.create_index('ix_monitoring_traces_pipeline_id', 'monitoring_traces', ['pipeline_id'])
op.create_index('ix_monitoring_traces_session_id', 'monitoring_traces', ['session_id'])
op.create_index('ix_monitoring_traces_message_id', 'monitoring_traces', ['message_id'])
op.create_index('ix_monitoring_traces_query_id', 'monitoring_traces', ['query_id'])
if 'monitoring_spans' not in tables:
op.create_table(
'monitoring_spans',
sa.Column('span_id', sa.String(length=255), nullable=False),
sa.Column('trace_id', sa.String(length=255), nullable=False),
sa.Column('parent_span_id', sa.String(length=255), nullable=True),
sa.Column('name', sa.String(length=255), nullable=False),
sa.Column('kind', sa.String(length=80), nullable=False),
sa.Column('status', sa.String(length=50), nullable=False),
sa.Column('started_at', sa.DateTime(), nullable=False),
sa.Column('ended_at', sa.DateTime(), nullable=True),
sa.Column('duration', sa.Integer(), nullable=True),
sa.Column('message_id', sa.String(length=255), nullable=True),
sa.Column('session_id', sa.String(length=255), nullable=True),
sa.Column('bot_id', sa.String(length=255), nullable=True),
sa.Column('pipeline_id', sa.String(length=255), nullable=True),
sa.Column('attributes', sa.Text(), nullable=True),
sa.Column('error_message', sa.Text(), nullable=True),
sa.PrimaryKeyConstraint('span_id'),
)
op.create_index('ix_monitoring_spans_trace_id', 'monitoring_spans', ['trace_id'])
op.create_index('ix_monitoring_spans_parent_span_id', 'monitoring_spans', ['parent_span_id'])
op.create_index('ix_monitoring_spans_kind', 'monitoring_spans', ['kind'])
op.create_index('ix_monitoring_spans_status', 'monitoring_spans', ['status'])
op.create_index('ix_monitoring_spans_started_at', 'monitoring_spans', ['started_at'])
op.create_index('ix_monitoring_spans_message_id', 'monitoring_spans', ['message_id'])
op.create_index('ix_monitoring_spans_session_id', 'monitoring_spans', ['session_id'])
op.create_index('ix_monitoring_spans_bot_id', 'monitoring_spans', ['bot_id'])
op.create_index('ix_monitoring_spans_pipeline_id', 'monitoring_spans', ['pipeline_id'])
def downgrade() -> None:
conn = op.get_bind()
inspector = sa.inspect(conn)
tables = set(inspector.get_table_names())
if 'monitoring_spans' in tables:
op.drop_table('monitoring_spans')
if 'monitoring_traces' in tables:
op.drop_table('monitoring_traces')
+26 -155
View File
@@ -2,9 +2,6 @@ from __future__ import annotations
import typing import typing
import traceback import traceback
import time
import uuid
import datetime
import sqlalchemy import sqlalchemy
@@ -82,19 +79,6 @@ class RuntimePipeline:
enable_all_plugins: bool enable_all_plugins: bool
"""是否启用所有插件""" """是否启用所有插件"""
@staticmethod
def _new_span_id() -> str:
return f'span-{uuid.uuid4().hex[:16]}'
@staticmethod
def _utc_now() -> datetime.datetime:
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
@staticmethod
def _query_session_id(query: pipeline_query.Query) -> str:
launcher_type = query.launcher_type.value if hasattr(query.launcher_type, 'value') else str(query.launcher_type)
return f'{launcher_type}_{query.launcher_id}'
enable_all_mcp_servers: bool enable_all_mcp_servers: bool
"""是否启用所有MCP服务器""" """是否启用所有MCP服务器"""
@@ -250,102 +234,44 @@ class RuntimePipeline:
stage_container = self.stage_containers[i] stage_container = self.stage_containers[i]
query.current_stage_name = stage_container.inst_name # 标记到 Query 对象里 query.current_stage_name = stage_container.inst_name # 标记到 Query 对象里
span_started_at = self._utc_now()
span_started = time.perf_counter()
span_status = 'success'
span_error = None
span_result_type = None
try: result = stage_container.inst.process(query, stage_container.inst_name)
result = stage_container.inst.process(query, stage_container.inst_name)
if isinstance(result, typing.Coroutine): if isinstance(result, typing.Coroutine):
result = await result result = await result
if isinstance(result, pipeline_entities.StageProcessResult): # 直接返回结果 if isinstance(result, pipeline_entities.StageProcessResult): # 直接返回结果
span_result_type = str( self.ap.logger.debug(
result.result_type.value if hasattr(result.result_type, 'value') else result.result_type f'Stage {stage_container.inst_name} processed query {query.query_id} res {result.result_type}'
) )
await self._check_output(query, result)
if result.result_type == pipeline_entities.ResultType.INTERRUPT:
self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}')
break
elif result.result_type == pipeline_entities.ResultType.CONTINUE:
query = result.new_query
elif isinstance(result, typing.AsyncGenerator): # 生成器
self.ap.logger.debug(f'Stage {stage_container.inst_name} processed query {query.query_id} gen')
async for sub_result in result:
self.ap.logger.debug( self.ap.logger.debug(
f'Stage {stage_container.inst_name} processed query {query.query_id} res {result.result_type}' f'Stage {stage_container.inst_name} processed query {query.query_id} res {sub_result.result_type}'
) )
await self._check_output(query, result) await self._check_output(query, sub_result)
if result.error_notice:
span_status = 'error'
span_error = result.error_notice
if result.result_type == pipeline_entities.ResultType.INTERRUPT: if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT:
self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}') self.ap.logger.debug(f'Stage {stage_container.inst_name} interrupted query {query.query_id}')
break break
elif result.result_type == pipeline_entities.ResultType.CONTINUE: elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE:
query = result.new_query query = sub_result.new_query
elif isinstance(result, typing.AsyncGenerator): # 生成器 await self._execute_from_stage(i + 1, query)
span_result_type = 'generator' break
self.ap.logger.debug(f'Stage {stage_container.inst_name} processed query {query.query_id} gen')
async for sub_result in result:
span_result_type = str(
sub_result.result_type.value
if hasattr(sub_result.result_type, 'value')
else sub_result.result_type
)
self.ap.logger.debug(
f'Stage {stage_container.inst_name} processed query {query.query_id} res {sub_result.result_type}'
)
await self._check_output(query, sub_result)
if sub_result.error_notice:
span_status = 'error'
span_error = sub_result.error_notice
if sub_result.result_type == pipeline_entities.ResultType.INTERRUPT:
self.ap.logger.debug(
f'Stage {stage_container.inst_name} interrupted query {query.query_id}'
)
break
elif sub_result.result_type == pipeline_entities.ResultType.CONTINUE:
query = sub_result.new_query
await self._execute_from_stage(i + 1, query)
break
except Exception as e:
span_status = 'error'
span_error = str(e)
raise
finally:
trace_id = (query.variables or {}).get('_monitoring_trace_id')
root_span_id = (query.variables or {}).get('_monitoring_root_span_id')
if trace_id:
try:
await self.ap.monitoring_service.record_span(
trace_id=trace_id,
parent_span_id=root_span_id,
name=stage_container.inst_name,
kind='pipeline.stage',
status=span_status,
started_at=span_started_at,
duration=int((time.perf_counter() - span_started) * 1000),
message_id=(query.variables or {}).get('_monitoring_message_id'),
session_id=self._query_session_id(query),
bot_id=query.bot_uuid,
pipeline_id=self.pipeline_entity.uuid,
attributes={
'stage_class': stage_container.inst.__class__.__name__,
'result_type': span_result_type,
'query_id': query.query_id,
},
error_message=span_error,
)
except Exception as monitor_err:
self.ap.logger.error(f'Failed to record stage span: {monitor_err}')
i += 1 i += 1
async def process_query(self, query: pipeline_query.Query): async def process_query(self, query: pipeline_query.Query):
"""处理请求""" """处理请求"""
trace_started_at = self._utc_now()
trace_started = time.perf_counter()
root_span_id = self._new_span_id()
trace_id = None
trace_status = 'success'
# Get monitoring metadata # Get monitoring metadata
bot_name = query.variables.get('_monitoring_bot_name', 'Unknown') bot_name = query.variables.get('_monitoring_bot_name', 'Unknown')
pipeline_name = query.variables.get('_monitoring_pipeline_name', 'Unknown') pipeline_name = query.variables.get('_monitoring_pipeline_name', 'Unknown')
@@ -377,28 +303,6 @@ class RuntimePipeline:
except Exception as e: except Exception as e:
self.ap.logger.error(f'Failed to record query start: {e}') self.ap.logger.error(f'Failed to record query start: {e}')
try:
trace_id = await self.ap.monitoring_service.start_trace(
name='LangBot query',
bot_id=query.bot_uuid or 'unknown',
bot_name=bot_name,
pipeline_id=self.pipeline_entity.uuid,
pipeline_name=pipeline_name,
session_id=self._query_session_id(query),
message_id=message_id or None,
query_id=query.query_id,
attributes={
'launcher_type': query.launcher_type.value
if hasattr(query.launcher_type, 'value')
else str(query.launcher_type),
'runner_name': runner_name,
},
)
query.variables['_monitoring_trace_id'] = trace_id
query.variables['_monitoring_root_span_id'] = root_span_id
except Exception as e:
self.ap.logger.error(f'Failed to start query trace: {e}')
try: try:
# Get bound plugins for this pipeline # Get bound plugins for this pipeline
bound_plugins = query.variables.get('_pipeline_bound_plugins', None) bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
@@ -432,10 +336,7 @@ class RuntimePipeline:
await self._execute_from_stage(0, query) await self._execute_from_stage(0, query)
# Record query success only if no error occurred during processing # Record query success only if no error occurred during processing
has_monitoring_error = query.variables.get('_monitoring_has_error', False) if not query.variables.get('_monitoring_has_error', False):
if has_monitoring_error:
trace_status = 'error'
else:
try: try:
await monitoring_helper.MonitoringHelper.record_query_success( await monitoring_helper.MonitoringHelper.record_query_success(
ap=self.ap, ap=self.ap,
@@ -460,7 +361,6 @@ class RuntimePipeline:
self.ap.logger.error(f'Failed to record query response: {e}') self.ap.logger.error(f'Failed to record query response: {e}')
except Exception as e: except Exception as e:
trace_status = 'error'
inst_name = query.current_stage_name if query.current_stage_name else 'unknown' inst_name = query.current_stage_name if query.current_stage_name else 'unknown'
self.ap.logger.error(f'Error processing query {query.query_id} stage={inst_name} : {e}') self.ap.logger.error(f'Error processing query {query.query_id} stage={inst_name} : {e}')
self.ap.logger.error(f'Traceback: {traceback.format_exc()}') self.ap.logger.error(f'Traceback: {traceback.format_exc()}')
@@ -483,35 +383,6 @@ class RuntimePipeline:
self.ap.logger.error(f'Failed to record query error: {me}') self.ap.logger.error(f'Failed to record query error: {me}')
finally: finally:
if trace_id:
try:
duration_ms = int((time.perf_counter() - trace_started) * 1000)
await self.ap.monitoring_service.record_span(
trace_id=trace_id,
span_id=root_span_id,
name='LangBot query',
kind='pipeline.query',
status=trace_status,
started_at=trace_started_at,
duration=duration_ms,
message_id=message_id or None,
session_id=self._query_session_id(query),
bot_id=query.bot_uuid,
pipeline_id=self.pipeline_entity.uuid,
attributes={
'query_id': query.query_id,
'pipeline_name': pipeline_name,
'runner_name': runner_name,
},
)
await self.ap.monitoring_service.finish_trace(
trace_id=trace_id,
status=trace_status,
duration=duration_ms,
message_id=message_id or None,
)
except Exception as monitor_err:
self.ap.logger.error(f'Failed to finish query trace: {monitor_err}')
self.ap.logger.debug(f'Query {query.query_id} processed') self.ap.logger.debug(f'Query {query.query_id} processed')
del self.ap.query_pool.cached_queries[query.query_id] del self.ap.query_pool.cached_queries[query.query_id]
+54 -3
View File
@@ -7,6 +7,7 @@ from .. import stage
import langbot_plugin.api.entities.builtin.platform.message as platform_message import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.events as events import langbot_plugin.api.entities.events as events
@@ -23,6 +24,50 @@ class ResponseWrapper(stage.PipelineStage):
async def initialize(self, pipeline_config: dict): async def initialize(self, pipeline_config: dict):
pass pass
def _is_final_assistant_message(self, result) -> bool:
"""Whether *result* is the agent's final, tool-call-free answer.
Intermediate streaming chunks and tool-call rounds must NOT trigger
outbound attachment collection only the terminal assistant message.
"""
if getattr(result, 'role', None) != 'assistant':
return False
if result.tool_calls:
return False
if isinstance(result, provider_message.MessageChunk):
return bool(result.is_final)
return True
async def _append_outbound_attachments(
self,
query: pipeline_query.Query,
message_chain: platform_message.MessageChain,
) -> None:
"""Collect sandbox outbox files and append them to *message_chain*.
Runs at most once per query (guarded by a query variable) and never
raises into the pipeline attachment delivery is best-effort.
"""
if query.variables.get('_sandbox_outbound_collected'):
return
box_service = getattr(self.ap, 'box_service', None)
if box_service is None or not getattr(box_service, 'available', False):
return
query.variables['_sandbox_outbound_collected'] = True
try:
attachments = await box_service.collect_outbound_attachments(query)
except Exception as e:
self.ap.logger.warning(f'Outbound attachment collection failed: {e}')
return
for att in attachments:
att_type = att.get('type')
if att_type == 'Image':
message_chain.append(platform_message.Image(base64=att['base64']))
elif att_type == 'Voice':
message_chain.append(platform_message.Voice(base64=att['base64']))
else:
message_chain.append(platform_message.File(name=att.get('name', 'file'), base64=att['base64']))
async def process( async def process(
self, self,
query: pipeline_query.Query, query: pipeline_query.Query,
@@ -83,10 +128,16 @@ class ResponseWrapper(stage.PipelineStage):
) )
else: else:
if event_ctx.event.reply_message_chain is not None: if event_ctx.event.reply_message_chain is not None:
query.resp_message_chain.append(event_ctx.event.reply_message_chain) reply_chain = event_ctx.event.reply_message_chain
else: else:
query.resp_message_chain.append(result.get_content_platform_message_chain()) reply_chain = result.get_content_platform_message_chain()
# Attach files the agent produced in the sandbox
# outbox, but only on the terminal assistant message.
if self._is_final_assistant_message(result):
await self._append_outbound_attachments(query, reply_chain)
query.resp_message_chain.append(reply_chain)
yield entities.StageProcessResult( yield entities.StageProcessResult(
result_type=entities.ResultType.CONTINUE, result_type=entities.ResultType.CONTINUE,
@@ -312,12 +312,18 @@ class WebSocketAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter)
async def _process_image_components(self, message_chain_obj: list): async def _process_image_components(self, message_chain_obj: list):
""" """
处理消息链中的图片和文件组件path转换为base64 处理消息链中的图片语音和文件组件 path 转换为 base64
Image / Voice / File components uploaded from the web client carry a
storage key in ``path``. Resolve it to a base64 data URI so downstream
stages (multimodal LLM input and the Box sandbox inbox) have a usable
payload, then drop the now-consumed storage object.
Args: Args:
message_chain_obj: 消息链对象列表 message_chain_obj: 消息链对象列表
""" """
import base64 import base64
import mimetypes
storage_mgr = self.ap.storage_mgr storage_mgr = self.ap.storage_mgr
@@ -325,31 +331,33 @@ class WebSocketAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter)
comp_type = component.get('type', '') comp_type = component.get('type', '')
comp_path = component.get('path', '') comp_path = component.get('path', '')
if not comp_path: if not comp_path or comp_type not in ('Image', 'Voice', 'File'):
continue continue
if comp_type == 'Image': try:
try: file_content = await storage_mgr.storage_provider.load(comp_path)
file_content = await storage_mgr.storage_provider.load(comp_path) base64_str = base64.b64encode(file_content).decode('utf-8')
base64_str = base64.b64encode(file_content).decode('utf-8')
file_key = comp_path lowered = comp_path.lower()
if file_key.lower().endswith(('.jpg', '.jpeg')): if comp_type == 'Image':
if lowered.endswith(('.jpg', '.jpeg')):
mime_type = 'image/jpeg' mime_type = 'image/jpeg'
elif file_key.lower().endswith('.png'): elif lowered.endswith('.gif'):
mime_type = 'image/png'
elif file_key.lower().endswith('.gif'):
mime_type = 'image/gif' mime_type = 'image/gif'
elif file_key.lower().endswith('.webp'): elif lowered.endswith('.webp'):
mime_type = 'image/webp' mime_type = 'image/webp'
else: else:
mime_type = 'image/png' mime_type = 'image/png'
elif comp_type == 'Voice':
mime_type = mimetypes.guess_type(comp_path)[0] or 'audio/wav'
else: # File
mime_type = mimetypes.guess_type(comp_path)[0] or 'application/octet-stream'
component['base64'] = f'data:{mime_type};base64,{base64_str}' component['base64'] = f'data:{mime_type};base64,{base64_str}'
await storage_mgr.storage_provider.delete(comp_path) await storage_mgr.storage_provider.delete(comp_path)
component['path'] = '' component['path'] = ''
except Exception as e: except Exception as e:
await self.logger.error(f'Failed to load image file {comp_path}: {e}') await self.logger.error(f'Failed to load {comp_type} file {comp_path}: {e}')
async def handle_websocket_message( async def handle_websocket_message(
self, self,
+1 -12
View File
@@ -711,19 +711,8 @@ class PluginRuntimeConnector(ManagedRuntimeConnector):
endpoint: str, endpoint: str,
method: str, method: str,
body: Any = None, body: Any = None,
caller: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
return await self.handler.handle_page_api( return await self.handler.handle_page_api(plugin_author, plugin_name, page_id, endpoint, method, body)
plugin_author,
plugin_name,
page_id,
endpoint,
method,
body,
caller,
headers or {},
)
async def get_debug_info(self) -> dict[str, Any]: async def get_debug_info(self) -> dict[str, Any]:
"""Get debug information including debug key and WS URL""" """Get debug information including debug key and WS URL"""
-19
View File
@@ -755,21 +755,6 @@ class RuntimeConnectionHandler(handler.Handler):
'session_name': session_name, 'session_name': session_name,
'bot_uuid': query.bot_uuid or '', 'bot_uuid': query.bot_uuid or '',
'sender_id': str(query.sender_id), 'sender_id': str(query.sender_id),
'_trace_context': {
'trace_id': query.variables.get('_monitoring_trace_id') if query.variables else None,
'parent_span_id': query.variables.get('_monitoring_root_span_id')
if query.variables
else None,
'message_id': query.variables.get('_monitoring_message_id') if query.variables else None,
'query_id': query.query_id,
'session_id': session_name,
'bot_id': query.bot_uuid or '',
'pipeline_id': query.pipeline_uuid or '',
'knowledge_base_id': kb_id,
'attributes': {
'source': 'plugin-api',
},
},
}, },
) )
results = [entry.model_dump(mode='json') for entry in entries] results = [entry.model_dump(mode='json') for entry in entries]
@@ -1026,8 +1011,6 @@ class RuntimeConnectionHandler(handler.Handler):
endpoint: str, endpoint: str,
method: str, method: str,
body: Any = None, body: Any = None,
caller: dict[str, Any] | None = None,
headers: dict[str, str] | None = None,
) -> dict[str, Any]: ) -> dict[str, Any]:
"""Forward a page API call to the plugin via runtime.""" """Forward a page API call to the plugin via runtime."""
result = await self.call_action( result = await self.call_action(
@@ -1039,8 +1022,6 @@ class RuntimeConnectionHandler(handler.Handler):
'endpoint': endpoint, 'endpoint': endpoint,
'method': method, 'method': method,
'body': body, 'body': body,
'caller': caller,
'headers': headers or {},
}, },
timeout=30, timeout=30,
) )
@@ -3,7 +3,6 @@ from __future__ import annotations
import abc import abc
import typing import typing
import time import time
import datetime
from ...core import app from ...core import app
from ...entity.persistence import model as persistence_model from ...entity.persistence import model as persistence_model
@@ -17,15 +16,6 @@ LLM_USAGE_QUERY_VARIABLE = '_llm_usage'
STREAM_USAGE_QUERY_VARIABLE = '_stream_usage' STREAM_USAGE_QUERY_VARIABLE = '_stream_usage'
def _utc_now() -> datetime.datetime:
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
def _query_session_id(query: pipeline_query.Query) -> str:
launcher_type = query.launcher_type.value if hasattr(query.launcher_type, 'value') else str(query.launcher_type)
return f'{launcher_type}_{query.launcher_id}'
def _store_llm_usage(query: pipeline_query.Query | None, usage_info: dict | None) -> None: def _store_llm_usage(query: pipeline_query.Query | None, usage_info: dict | None) -> None:
"""Store the latest provider usage on the query for upstream action handlers.""" """Store the latest provider usage on the query for upstream action handlers."""
if query is None or not usage_info: if query is None or not usage_info:
@@ -69,7 +59,6 @@ class RuntimeProvider:
"""Bridge method for invoking LLM with monitoring""" """Bridge method for invoking LLM with monitoring"""
# Start timing for monitoring # Start timing for monitoring
start_time = time.time() start_time = time.time()
span_started_at = _utc_now()
input_tokens = 0 input_tokens = 0
output_tokens = 0 output_tokens = 0
status = 'success' status = 'success'
@@ -136,30 +125,6 @@ class RuntimeProvider:
error_message=error_message, error_message=error_message,
message_id=message_id, message_id=message_id,
) )
trace_id = query.variables.get('_monitoring_trace_id') if query.variables else None
parent_span_id = query.variables.get('_monitoring_root_span_id') if query.variables else None
if trace_id:
await self.requester.ap.monitoring_service.record_span(
trace_id=trace_id,
parent_span_id=parent_span_id,
name=f'LLM {model.model_entity.name}',
kind='model.llm',
status=status,
started_at=span_started_at,
duration=duration_ms,
message_id=message_id,
session_id=_query_session_id(query),
bot_id=query.bot_uuid,
pipeline_id=query.pipeline_uuid,
attributes={
'model_name': model.model_entity.name,
'input_tokens': input_tokens,
'output_tokens': output_tokens,
'total_tokens': input_tokens + output_tokens,
'stream': False,
},
error_message=error_message,
)
except Exception as monitor_err: except Exception as monitor_err:
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM call: {monitor_err}') self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM call: {monitor_err}')
@@ -175,7 +140,6 @@ class RuntimeProvider:
"""Bridge method for invoking LLM stream with monitoring""" """Bridge method for invoking LLM stream with monitoring"""
# Start timing for monitoring # Start timing for monitoring
start_time = time.time() start_time = time.time()
span_started_at = _utc_now()
status = 'success' status = 'success'
error_message = None error_message = None
input_tokens = 0 input_tokens = 0
@@ -240,30 +204,6 @@ class RuntimeProvider:
error_message=error_message, error_message=error_message,
message_id=message_id, message_id=message_id,
) )
trace_id = query.variables.get('_monitoring_trace_id') if query.variables else None
parent_span_id = query.variables.get('_monitoring_root_span_id') if query.variables else None
if trace_id:
await self.requester.ap.monitoring_service.record_span(
trace_id=trace_id,
parent_span_id=parent_span_id,
name=f'LLM stream {model.model_entity.name}',
kind='model.llm',
status=status,
started_at=span_started_at,
duration=duration_ms,
message_id=message_id,
session_id=_query_session_id(query),
bot_id=query.bot_uuid,
pipeline_id=query.pipeline_uuid,
attributes={
'model_name': model.model_entity.name,
'input_tokens': input_tokens,
'output_tokens': output_tokens,
'total_tokens': input_tokens + output_tokens,
'stream': True,
},
error_message=error_message,
)
except Exception as monitor_err: except Exception as monitor_err:
self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM stream call: {monitor_err}') self.requester.ap.logger.error(f'[Monitoring] Failed to record LLM stream call: {monitor_err}')
@@ -216,11 +216,22 @@ class LiteLLMRequester(requester.ProviderAPIRequester):
content = msg_dict.get('content') content = msg_dict.get('content')
if isinstance(content, list): if isinstance(content, list):
converted_parts = []
for part in content: for part in content:
if isinstance(part, dict) and part.get('type') == 'image_base64': if isinstance(part, dict) and part.get('type') == 'image_base64':
part['image_url'] = {'url': part['image_base64']} part['image_url'] = {'url': part['image_base64']}
part['type'] = 'image_url' part['type'] = 'image_url'
del part['image_base64'] del part['image_base64']
# OpenAI-compatible chat models reject non-image file parts
# (audio/document base64 or url). These originate from Voice /
# File attachments — including ones replayed from conversation
# history — and the agent already accesses their bytes via the
# sandbox. Drop them from the model payload to avoid
# "Invalid user message ... invalid content type=file_base64".
if isinstance(part, dict) and part.get('type') in ('file_base64', 'file_url'):
continue
converted_parts.append(part)
msg_dict['content'] = converted_parts
req_messages.append(msg_dict) req_messages.append(msg_dict)
+68 -15
View File
@@ -104,6 +104,68 @@ class _StreamAccumulator:
class LocalAgentRunner(runner.RequestRunner): class LocalAgentRunner(runner.RequestRunner):
"""Local agent request runner""" """Local agent request runner"""
async def _inject_inbound_attachments(
self,
query: pipeline_query.Query,
user_message: provider_message.Message,
) -> None:
"""Persist inbound attachments into the sandbox and tell the model.
No-op when the box service is unavailable or there are no attachments.
On success, appends an extra text ContentElement to the user message
listing the in-sandbox paths and the outbox convention, and stashes the
descriptors in ``query.variables['_sandbox_inbound_attachments']``.
"""
box_service = getattr(self.ap, 'box_service', None)
if box_service is None or not getattr(box_service, 'available', False):
return
try:
attachments = await box_service.materialize_inbound_attachments(query)
except Exception as e: # never break the chat turn over attachment IO
self.ap.logger.warning(f'Inbound attachment materialization failed: {e}')
return
if not attachments:
return
query.variables['_sandbox_inbound_attachments'] = attachments
lines = [
'The user sent attachments. They have been saved into the sandbox and are '
'available to the exec/read/write tools at these paths:'
]
for att in attachments:
lines.append(f'- {att["type"]}: {att["path"]} ({att["size"]} bytes)')
outbox_dir = f'{box_service.OUTBOX_MOUNT_DIR}/{query.query_id}'
lines.append(
'If you produce any file (image, audio, document, etc.) that should be sent '
f'back to the user, write it into {outbox_dir}/ (create the directory if '
'needed). Every file placed there will be delivered to the user automatically.'
)
note = '\n'.join(lines)
# Voice/File attachments are now available to the agent via the sandbox
# (exec/read/write tools). Their raw bytes must NOT be forwarded to the
# chat model as multimodal content: providers reject non-image file
# parts ("Invalid user message ... ensure all user messages are valid
# OpenAI chat completion messages"). Strip those content elements and
# rely on the sandbox-path note instead. Images are kept so vision
# models can still see them.
_model_unsafe_types = {'file_base64', 'file_url'}
if isinstance(user_message.content, list):
user_message.content = [
ce for ce in user_message.content if getattr(ce, 'type', None) not in _model_unsafe_types
]
if isinstance(user_message.content, str):
user_message.content = [
provider_message.ContentElement.from_text(user_message.content),
provider_message.ContentElement.from_text(note),
]
elif isinstance(user_message.content, list):
user_message.content.append(provider_message.ContentElement.from_text(note))
else:
user_message.content = [provider_message.ContentElement.from_text(note)]
def _build_request_messages( def _build_request_messages(
self, self,
query: pipeline_query.Query, query: pipeline_query.Query,
@@ -232,6 +294,12 @@ class LocalAgentRunner(runner.RequestRunner):
user_message = copy.deepcopy(query.user_message) user_message = copy.deepcopy(query.user_message)
# Materialize inbound attachments (images / voices / files) into the
# sandbox so the agent's exec/read/write tools can operate on the real
# bytes — not just the multimodal copy the model sees. The exact
# in-sandbox paths are announced to the model as a system note.
await self._inject_inbound_attachments(query, user_message)
user_message_text = '' user_message_text = ''
if isinstance(user_message.content, str): if isinstance(user_message.content, str):
@@ -268,21 +336,6 @@ class LocalAgentRunner(runner.RequestRunner):
'bot_uuid': query.bot_uuid or '', 'bot_uuid': query.bot_uuid or '',
'sender_id': str(query.sender_id), 'sender_id': str(query.sender_id),
'session_name': f'{query.session.launcher_type.value}_{query.session.launcher_id}', 'session_name': f'{query.session.launcher_type.value}_{query.session.launcher_id}',
'_trace_context': {
'trace_id': query.variables.get('_monitoring_trace_id') if query.variables else None,
'parent_span_id': query.variables.get('_monitoring_root_span_id')
if query.variables
else None,
'message_id': query.variables.get('_monitoring_message_id') if query.variables else None,
'query_id': query.query_id,
'session_id': f'{query.launcher_type.value}_{query.launcher_id}',
'bot_id': query.bot_uuid or '',
'pipeline_id': query.pipeline_uuid or '',
'knowledge_base_id': kb_uuid,
'attributes': {
'source': 'local-agent',
},
},
}, },
) )
+4 -123
View File
@@ -1,12 +1,10 @@
from __future__ import annotations from __future__ import annotations
import mimetypes import mimetypes
import os.path import os.path
import time
import traceback import traceback
import uuid import uuid
import zipfile import zipfile
import io import io
import datetime
from typing import Any from typing import Any
from langbot.pkg.core import app from langbot.pkg.core import app
import sqlalchemy import sqlalchemy
@@ -27,10 +25,6 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface):
super().__init__(ap) super().__init__(ap)
self.knowledge_base_entity = knowledge_base_entity self.knowledge_base_entity = knowledge_base_entity
@staticmethod
def _utc_now() -> datetime.datetime:
return datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
async def initialize(self): async def initialize(self):
pass pass
@@ -340,25 +334,6 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface):
# are passed directly to vector_search by some plugins (e.g. LangRAG) # are passed directly to vector_search by some plugins (e.g. LangRAG)
# and would cause empty results when the metadata field doesn't exist. # and would cause empty results when the metadata field doesn't exist.
filters = settings.pop('filters', {}) filters = settings.pop('filters', {})
trace_context = settings.pop('_trace_context', None)
host_span_started_at = self._utc_now()
host_span_started = time.perf_counter()
host_span_id = None
if trace_context and trace_context.get('trace_id'):
host_parent_span_id = trace_context.get('parent_span_id')
host_span_id = trace_context.get('rag_span_id') or f'span-{uuid.uuid4().hex[:16]}'
trace_context = {
'trace_id': trace_context.get('trace_id'),
'parent_span_id': host_span_id,
'host_parent_span_id': host_parent_span_id,
'message_id': trace_context.get('message_id'),
'query_id': trace_context.get('query_id'),
'session_id': trace_context.get('session_id'),
'bot_id': trace_context.get('bot_id'),
'pipeline_id': trace_context.get('pipeline_id'),
'knowledge_base_id': kb.uuid,
'attributes': trace_context.get('attributes') or {},
}
retrieval_context = { retrieval_context = {
'query': query, 'query': query,
@@ -368,107 +343,13 @@ class RuntimeKnowledgeBase(KnowledgeBaseInterface):
'creation_settings': kb.creation_settings or {}, 'creation_settings': kb.creation_settings or {},
'filters': filters, 'filters': filters,
} }
if trace_context:
retrieval_context['trace_context'] = trace_context
try: result = await self.ap.plugin_connector.call_rag_retrieve(
result = await self.ap.plugin_connector.call_rag_retrieve( plugin_id,
plugin_id, retrieval_context,
retrieval_context, )
)
except Exception as e:
if trace_context:
await self._record_rag_trace_result(
trace_context=trace_context,
host_span_id=host_span_id,
started_at=host_span_started_at,
duration=int((time.perf_counter() - host_span_started) * 1000),
plugin_id=plugin_id,
result={
'results': [],
'metadata': {
'status': 'error',
'error_message': str(e),
},
},
)
raise
if trace_context:
await self._record_rag_trace_result(
trace_context=trace_context,
host_span_id=host_span_id,
started_at=host_span_started_at,
duration=int((time.perf_counter() - host_span_started) * 1000),
plugin_id=plugin_id,
result=result,
)
return result return result
async def _record_rag_trace_result(
self,
trace_context: dict[str, Any],
host_span_id: str | None,
started_at: datetime.datetime,
duration: int,
plugin_id: str,
result: dict[str, Any],
) -> None:
"""Persist host RAG span and plugin-provided child spans."""
trace_id = trace_context.get('trace_id')
if not trace_id:
return
metadata = result.get('metadata') if isinstance(result, dict) else {}
metadata = metadata if isinstance(metadata, dict) else {}
plugin_spans = metadata.get('trace_spans') if isinstance(metadata.get('trace_spans'), list) else []
parent_span_id = trace_context.get('parent_span_id')
host_parent_span_id = trace_context.get('host_parent_span_id')
try:
await self.ap.monitoring_service.record_span(
trace_id=trace_id,
span_id=host_span_id,
parent_span_id=host_parent_span_id,
name=f'Knowledge retrieval {self.knowledge_base_entity.name}',
kind='rag.retrieval',
status=metadata.get('status', 'success'),
started_at=started_at,
duration=duration,
message_id=trace_context.get('message_id'),
session_id=trace_context.get('session_id'),
bot_id=trace_context.get('bot_id'),
pipeline_id=trace_context.get('pipeline_id'),
attributes={
'knowledge_base_id': self.knowledge_base_entity.uuid,
'knowledge_base_name': self.knowledge_base_entity.name,
'plugin_id': plugin_id,
'returned_count': len(result.get('results', []) if isinstance(result, dict) else []),
'total_found': result.get('total_found') if isinstance(result, dict) else None,
},
error_message=metadata.get('error_message'),
)
for span in plugin_spans:
if not isinstance(span, dict):
continue
await self.ap.monitoring_service.record_span(
trace_id=trace_id,
span_id=span.get('span_id'),
parent_span_id=span.get('parent_span_id') or host_span_id or parent_span_id,
name=span.get('name') or 'RAG plugin stage',
kind=span.get('kind') or 'rag.stage',
status=span.get('status') or 'success',
started_at=started_at,
duration=span.get('duration_ms'),
message_id=trace_context.get('message_id'),
session_id=trace_context.get('session_id'),
bot_id=trace_context.get('bot_id'),
pipeline_id=trace_context.get('pipeline_id'),
attributes=span.get('attributes') if isinstance(span.get('attributes'), dict) else {},
error_message=span.get('error_message'),
)
except Exception as e:
self.ap.logger.error(f'Failed to record RAG trace spans: {e}')
async def _delete_document(self, document_id: str) -> bool: async def _delete_document(self, document_id: str) -> bool:
"""Call plugin to delete document.""" """Call plugin to delete document."""
kb = self.knowledge_base_entity kb = self.knowledge_base_entity
-65
View File
@@ -8,7 +8,6 @@ Run: uv run pytest tests/integration/api/test_monitoring.py -q
from __future__ import annotations from __future__ import annotations
import datetime
import pytest import pytest
from unittest.mock import MagicMock, AsyncMock, Mock from unittest.mock import MagicMock, AsyncMock, Mock
@@ -83,15 +82,6 @@ def fake_monitoring_app():
app.monitoring_service.get_messages = AsyncMock(return_value=([{'id': 'msg-1', 'content': 'test'}], 100)) app.monitoring_service.get_messages = AsyncMock(return_value=([{'id': 'msg-1', 'content': 'test'}], 100))
app.monitoring_service.get_llm_calls = AsyncMock(return_value=([{'id': 'llm-1'}], 50)) app.monitoring_service.get_llm_calls = AsyncMock(return_value=([{'id': 'llm-1'}], 50))
app.monitoring_service.get_embedding_calls = AsyncMock(return_value=([{'id': 'emb-1'}], 10)) app.monitoring_service.get_embedding_calls = AsyncMock(return_value=([{'id': 'emb-1'}], 10))
app.monitoring_service.get_traces = AsyncMock(return_value=([{'trace_id': 'trace-1'}], 1))
app.monitoring_service.get_trace_details = AsyncMock(
side_effect=lambda trace_id: {
'found': trace_id == 'trace-1',
'trace_id': trace_id,
'trace': {'trace_id': trace_id} if trace_id == 'trace-1' else None,
'spans': [] if trace_id == 'trace-1' else None,
}
)
app.monitoring_service.get_sessions = AsyncMock(return_value=([{'session_id': 'sess-1'}], 20)) app.monitoring_service.get_sessions = AsyncMock(return_value=([{'session_id': 'sess-1'}], 20))
app.monitoring_service.get_errors = AsyncMock(return_value=([{'id': 'err-1'}], 2)) app.monitoring_service.get_errors = AsyncMock(return_value=([{'id': 'err-1'}], 2))
app.monitoring_service.get_session_analysis = AsyncMock( app.monitoring_service.get_session_analysis = AsyncMock(
@@ -232,7 +222,6 @@ class TestMonitoringAllDataEndpoint:
assert response.status_code == 200 assert response.status_code == 200
data = await response.get_json() data = await response.get_json()
assert 'overview' in data['data'] assert 'overview' in data['data']
assert 'traces' in data['data']
@pytest.mark.usefixtures('mock_circular_import_chain') @pytest.mark.usefixtures('mock_circular_import_chain')
@@ -257,60 +246,6 @@ class TestMonitoringDetailsEndpoints:
assert response.status_code == 200 assert response.status_code == 200
@pytest.mark.asyncio
async def test_get_trace_details(self, quart_test_client):
"""GET /api/v1/monitoring/traces/{id}."""
response = await quart_test_client.get(
'/api/v1/monitoring/traces/trace-1', headers={'Authorization': 'Bearer test_token'}
)
assert response.status_code == 200
@pytest.mark.usefixtures('mock_circular_import_chain')
class TestMonitoringTraceEndpoints:
"""Tests for trace list and detail endpoints."""
@pytest.mark.asyncio
async def test_get_traces_forwards_filters(self, quart_test_client, fake_monitoring_app):
"""GET /api/v1/monitoring/traces forwards filters to service."""
response = await quart_test_client.get(
'/api/v1/monitoring/traces'
'?botId=bot-1'
'&pipelineId=pipeline-1'
'&sessionId=session-1'
'&status=success'
'&startTime=2026-01-01T00:00:00Z'
'&endTime=2026-01-02T00:00:00Z'
'&limit=25'
'&offset=5',
headers={'Authorization': 'Bearer test_token'},
)
assert response.status_code == 200
data = await response.get_json()
assert data['data']['traces'] == [{'trace_id': 'trace-1'}]
assert data['data']['total'] == 1
fake_monitoring_app.monitoring_service.get_traces.assert_awaited_with(
bot_ids=['bot-1'],
pipeline_ids=['pipeline-1'],
session_ids=['session-1'],
statuses=['success'],
start_time=datetime.datetime(2026, 1, 1, 0, 0),
end_time=datetime.datetime(2026, 1, 2, 0, 0),
limit=25,
offset=5,
)
@pytest.mark.asyncio
async def test_get_trace_details_not_found(self, quart_test_client):
"""GET /api/v1/monitoring/traces/{id} returns 404 when missing."""
response = await quart_test_client.get(
'/api/v1/monitoring/traces/trace-missing', headers={'Authorization': 'Bearer test_token'}
)
assert response.status_code == 404
@pytest.mark.usefixtures('mock_circular_import_chain') @pytest.mark.usefixtures('mock_circular_import_chain')
class TestMonitoringFeedbackEndpoints: class TestMonitoringFeedbackEndpoints:
@@ -104,7 +104,7 @@ class TestSQLiteMigrationUpgrade:
rev = await get_alembic_current(sqlite_engine) rev = await get_alembic_current(sqlite_engine)
assert rev is not None, 'Expected a revision after upgrade' assert rev is not None, 'Expected a revision after upgrade'
# Head should be the latest migration # Head should be the latest migration
assert rev.startswith('0006'), f'Expected head to be 0006_*, got {rev}' assert rev.startswith('0005'), f'Expected head to be 0005_*, got {rev}'
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_upgrade_idempotent(self, sqlite_engine): async def test_upgrade_idempotent(self, sqlite_engine):
@@ -144,8 +144,8 @@ class TestPostgreSQLMigrationUpgrade:
# Verify revision # Verify revision
rev = await get_alembic_current(postgres_engine) rev = await get_alembic_current(postgres_engine)
assert rev is not None, 'Expected a revision after upgrade' assert rev is not None, 'Expected a revision after upgrade'
# Head should be the latest migration. # Head should be the latest migration (0005 for current state)
assert rev.startswith('0006'), f'Expected head to be 0006_*, got {rev}' assert rev.startswith('0005'), f'Expected head to be 0005_*, got {rev}'
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_postgres_upgrade_idempotent(self, postgres_engine, clean_tables, clean_alembic_version): async def test_postgres_upgrade_idempotent(self, postgres_engine, clean_tables, clean_alembic_version):
@@ -1,207 +0,0 @@
"""Unit tests for MonitoringService trace observability."""
from __future__ import annotations
import datetime
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock
import pytest
import sqlalchemy
from sqlalchemy.ext.asyncio import create_async_engine
from langbot.pkg.api.http.service.monitoring import MonitoringService
from langbot.pkg.entity.persistence.base import Base
from langbot.pkg.entity.persistence import monitoring as persistence_monitoring
pytestmark = pytest.mark.asyncio
class _SQLitePersistence:
def __init__(self, engine):
self._engine = engine
def get_db_engine(self):
return self._engine
async def execute_async(self, *args, **kwargs):
async with self._engine.connect() as conn:
result = await conn.execute(*args, **kwargs)
await conn.commit()
return result
def serialize_model(self, model, data, masked_columns=None):
masked_columns = masked_columns or []
return {
column.name: getattr(data, column.name).isoformat()
if isinstance(getattr(data, column.name), datetime.datetime)
else getattr(data, column.name)
for column in model.__table__.columns
if column.name not in masked_columns
}
@pytest.fixture
async def monitoring_service(tmp_path):
engine = create_async_engine(f'sqlite+aiosqlite:///{tmp_path / "monitoring.db"}')
async with engine.begin() as conn:
await conn.run_sync(Base.metadata.create_all)
ap = SimpleNamespace(
persistence_mgr=_SQLitePersistence(engine),
instance_config=SimpleNamespace(data={'database': {'use': 'sqlite'}}),
logger=Mock(),
)
service = MonitoringService(ap)
yield service
await engine.dispose()
async def test_trace_lifecycle_records_spans_and_returns_details(monitoring_service):
started_at = datetime.datetime(2026, 1, 1, 12, 0, 0)
ended_at = started_at + datetime.timedelta(milliseconds=125)
trace_id = await monitoring_service.start_trace(
trace_id='trace-test',
name='Pipeline query',
bot_id='bot-1',
bot_name='Bot',
pipeline_id='pipeline-1',
pipeline_name='Default',
session_id='session-1',
message_id='message-1',
query_id=42,
attributes={'source': 'unit-test'},
)
assert trace_id == 'trace-test'
root_span_id = await monitoring_service.record_span(
trace_id=trace_id,
span_id='span-root',
name='Pipeline',
kind='pipeline',
status='completed',
started_at=started_at,
ended_at=ended_at,
message_id='message-1',
session_id='session-1',
bot_id='bot-1',
pipeline_id='pipeline-1',
attributes={'stage_count': 2},
)
await monitoring_service.record_span(
trace_id=trace_id,
span_id='span-rag',
parent_span_id=root_span_id,
name='RAG retrieval',
kind='rag.retrieval',
status='failed',
started_at=started_at + datetime.timedelta(seconds=1),
duration=12.7,
attributes={'top_k': 5},
error_message='vector store timeout',
)
await monitoring_service.finish_trace(
trace_id,
status='completed',
duration=250,
message_id='message-final',
attributes={'result_type': 'reply'},
)
traces, total = await monitoring_service.get_traces(
bot_ids=['bot-1'],
pipeline_ids=['pipeline-1'],
session_ids=['session-1'],
statuses=['success'],
limit=10,
offset=0,
)
assert total == 1
assert traces[0]['trace_id'] == trace_id
assert traces[0]['status'] == 'success'
assert traces[0]['message_id'] == 'message-final'
assert traces[0]['query_id'] == '42'
assert traces[0]['attributes'] == {'result_type': 'reply'}
details = await monitoring_service.get_trace_details(trace_id)
assert details['found'] is True
assert details['trace']['trace_id'] == trace_id
assert [span['span_id'] for span in details['spans']] == ['span-root', 'span-rag']
assert details['spans'][0]['status'] == 'success'
assert details['spans'][0]['duration'] == 125
assert details['spans'][0]['attributes'] == {'stage_count': 2}
assert details['spans'][1]['status'] == 'error'
assert details['spans'][1]['duration'] == 13
assert details['spans'][1]['parent_span_id'] == 'span-root'
assert details['spans'][1]['error_message'] == 'vector store timeout'
async def test_get_trace_details_returns_not_found_for_missing_trace(monitoring_service):
details = await monitoring_service.get_trace_details('trace-missing')
assert details == {'trace_id': 'trace-missing', 'found': False}
async def test_cleanup_expired_records_includes_traces_and_spans(monitoring_service):
old_time = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None) - datetime.timedelta(days=30)
recent_time = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
await monitoring_service.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_monitoring.MonitoringTrace),
[
{
'trace_id': 'trace-old',
'started_at': old_time,
'ended_at': old_time,
'duration': 10,
'status': 'success',
'name': 'Old trace',
},
{
'trace_id': 'trace-recent',
'started_at': recent_time,
'ended_at': recent_time,
'duration': 10,
'status': 'success',
'name': 'Recent trace',
},
],
)
await monitoring_service.ap.persistence_mgr.execute_async(
sqlalchemy.insert(persistence_monitoring.MonitoringSpan),
[
{
'span_id': 'span-old',
'trace_id': 'trace-old',
'name': 'Old span',
'kind': 'pipeline',
'status': 'success',
'started_at': old_time,
'ended_at': old_time,
},
{
'span_id': 'span-recent',
'trace_id': 'trace-recent',
'name': 'Recent span',
'kind': 'pipeline',
'status': 'success',
'started_at': recent_time,
'ended_at': recent_time,
},
],
)
monitoring_service._release_sqlite_space = AsyncMock()
deleted = await monitoring_service.cleanup_expired_records(retention_days=7, batch_size=1)
assert deleted['monitoring_traces'] == 1
assert deleted['monitoring_spans'] == 1
monitoring_service._release_sqlite_space.assert_awaited_once()
remaining = await monitoring_service.get_trace_details('trace-recent')
assert remaining['found'] is True
assert remaining['spans'][0]['span_id'] == 'span-recent'
@@ -1,111 +0,0 @@
"""Unit tests for monitoring trace HTTP routes."""
from __future__ import annotations
import datetime
from unittest.mock import AsyncMock, Mock
import pytest
import quart
from tests.factories import FakeApp
from tests.utils.import_isolation import MockLifecycleControlScope, isolated_sys_modules
pytestmark = pytest.mark.asyncio
@pytest.fixture
async def monitoring_client():
mock_app = Mock()
mock_app.Application = type('FakeMinimalApplication', (), {})
mock_entities = Mock()
mock_entities.LifecycleControlScope = MockLifecycleControlScope
clear = [
'langbot.pkg.api.http.controller.group',
'langbot.pkg.api.http.controller.groups',
'langbot.pkg.api.http.controller.groups.monitoring',
'langbot.pkg.api.http.controller.main',
]
app = FakeApp()
app.user_service = Mock()
app.user_service.verify_jwt_token = AsyncMock(return_value='test@example.com')
app.user_service.get_user_by_email = AsyncMock(return_value=Mock(email='test@example.com'))
app.monitoring_service = Mock()
app.monitoring_service.get_traces = AsyncMock(return_value=([{'trace_id': 'trace-1'}], 1))
app.monitoring_service.get_trace_details = AsyncMock(
side_effect=lambda trace_id: {
'found': trace_id == 'trace-1',
'trace_id': trace_id,
'trace': {'trace_id': trace_id} if trace_id == 'trace-1' else None,
'spans': [] if trace_id == 'trace-1' else None,
}
)
with isolated_sys_modules(
mocks={
'langbot.pkg.core.app': mock_app,
'langbot.pkg.core.entities': mock_entities,
},
clear=clear,
):
from langbot.pkg.api.http.controller.groups.monitoring import MonitoringRouterGroup
quart_app = quart.Quart(__name__)
group = MonitoringRouterGroup(app, quart_app)
await group.initialize()
yield app, quart_app.test_client()
async def test_get_traces_route_forwards_filters(monitoring_client):
app, client = monitoring_client
response = await client.get(
'/api/v1/monitoring/traces'
'?botId=bot-1'
'&pipelineId=pipeline-1'
'&sessionId=session-1'
'&status=success'
'&startTime=2026-01-01T00:00:00Z'
'&endTime=2026-01-02T00:00:00Z'
'&limit=25'
'&offset=5',
headers={'Authorization': 'Bearer test_token'},
)
assert response.status_code == 200
data = await response.get_json()
assert data['data'] == {
'traces': [{'trace_id': 'trace-1'}],
'total': 1,
'limit': 25,
'offset': 5,
}
app.monitoring_service.get_traces.assert_awaited_once_with(
bot_ids=['bot-1'],
pipeline_ids=['pipeline-1'],
session_ids=['session-1'],
statuses=['success'],
start_time=datetime.datetime(2026, 1, 1, 0, 0),
end_time=datetime.datetime(2026, 1, 2, 0, 0),
limit=25,
offset=5,
)
async def test_get_trace_details_route_returns_404_for_missing_trace(monitoring_client):
_app, client = monitoring_client
response = await client.get(
'/api/v1/monitoring/traces/trace-missing',
headers={'Authorization': 'Bearer test_token'},
)
assert response.status_code == 404
data = await response.get_json()
assert data['code'] == -1
assert data['msg'] == 'Trace trace-missing not found'
+344
View File
@@ -1556,3 +1556,347 @@ class TestBuildSkillExtraMounts:
service = BoxService(app, client=Mock(spec=BoxRuntimeClient)) service = BoxService(app, client=Mock(spec=BoxRuntimeClient))
assert service.build_skill_extra_mounts(make_query()) == [] assert service.build_skill_extra_mounts(make_query()) == []
# ── Attachment passthrough (inbound / outbound) ─────────────────────────────
class TestAttachmentHelpers:
def test_sanitize_attachment_name_strips_traversal(self):
assert BoxService._sanitize_attachment_name('../../etc/passwd', 'fb') == 'passwd'
assert BoxService._sanitize_attachment_name('/a/b/c.png', 'fb') == 'c.png'
assert BoxService._sanitize_attachment_name('a b c.txt', 'fb') == 'a_b_c.txt'
assert BoxService._sanitize_attachment_name('', 'fallback.bin') == 'fallback.bin'
assert BoxService._sanitize_attachment_name('...', 'fb.bin') == 'fb.bin'
# weird unicode / shell chars dropped, but keeps a usable name
out = BoxService._sanitize_attachment_name('rm -rf $(x).png', 'fb')
assert '/' not in out and '$' not in out and out.endswith('.png')
def test_classify_outbound_entries_by_extension(self):
entries = [
{'name': 'chart.png', 'b64': 'AAA'},
{'name': 'clip.mp3', 'b64': 'BBB'},
{'name': 'report.pdf', 'b64': 'CCC'},
{'name': 'sub/dir/photo.JPG', 'b64': 'DDD'},
{'name': 'noext', 'b64': 'EEE'},
{'name': 'skip', 'b64': ''}, # dropped (no payload)
]
out = BoxService._classify_outbound_entries(entries)
by_name = {a['name']: a for a in out}
assert by_name['chart.png']['type'] == 'Image'
assert by_name['chart.png']['base64'].startswith('data:image/png;base64,')
assert by_name['clip.mp3']['type'] == 'Voice'
assert by_name['clip.mp3']['base64'].startswith('data:audio/mp3;base64,')
assert by_name['report.pdf']['type'] == 'File'
assert by_name['report.pdf']['base64'] == 'CCC' # raw b64, no data: prefix
# nested path collapses to basename, case-insensitive ext
assert by_name['photo.JPG']['type'] == 'Image'
assert by_name['noext']['type'] == 'File'
assert 'skip' not in by_name
@pytest.mark.asyncio
async def test_component_to_bytes_from_data_uri(self):
import base64
raw = b'hello-bytes'
data_uri = 'data:text/plain;base64,' + base64.b64encode(raw).decode()
component = SimpleNamespace(base64=data_uri, url=None, path=None)
result = await BoxService._component_to_bytes(component)
assert result is not None
data, mime = result
assert data == raw
assert mime == 'text/plain'
@pytest.mark.asyncio
async def test_component_to_bytes_returns_none_when_empty(self):
component = SimpleNamespace(base64=None, url=None, path=None)
assert await BoxService._component_to_bytes(component) is None
class TestInboundOutboundRoundTrip:
def _service(self) -> BoxService:
service = BoxService(make_app(Mock()), client=Mock(spec=BoxRuntimeClient))
service._available = True
return service
@pytest.mark.asyncio
async def test_materialize_inbound_writes_and_describes(self):
import base64
import langbot_plugin.api.entities.builtin.platform.message as platform_message
service = self._service()
img_bytes = b'\x89PNG\r\n\x1a\n fake png'
img_b64 = 'data:image/png;base64,' + base64.b64encode(img_bytes).decode()
query = make_query()
query.message_chain = platform_message.MessageChain(
[
platform_message.Plain(text='please resize this'),
platform_message.Image(base64=img_b64),
]
)
# Mock the sandbox write path: echo back the written paths.
async def fake_execute_tool(parameters, q):
assert '/workspace/inbox/' in parameters['command']
return {
'ok': True,
'stdout': '["/workspace/inbox/42/image_1.png"]',
'stderr': '',
}
service.execute_tool = AsyncMock(side_effect=fake_execute_tool)
descriptors = await service.materialize_inbound_attachments(query)
assert len(descriptors) == 1
d = descriptors[0]
assert d['type'] == 'Image'
assert d['path'] == '/workspace/inbox/42/image_1.png'
assert d['size'] == len(img_bytes)
@pytest.mark.asyncio
async def test_materialize_inbound_noop_without_attachments(self):
import langbot_plugin.api.entities.builtin.platform.message as platform_message
service = self._service()
query = make_query()
query.message_chain = platform_message.MessageChain([platform_message.Plain(text='just text')])
service.execute_tool = AsyncMock()
assert await service.materialize_inbound_attachments(query) == []
service.execute_tool.assert_not_called()
@pytest.mark.asyncio
async def test_collect_outbound_reads_and_clears(self):
service = self._service()
query = make_query()
calls = []
async def fake_execute_tool(parameters, q):
calls.append(parameters['command'])
if 'os.walk' in parameters['command']:
return {
'ok': True,
'stdout': '[{"name": "out.png", "b64": "QUJD"}]',
'stderr': '',
}
# the rm -rf cleanup call
return {'ok': True, 'stdout': '', 'stderr': ''}
service.execute_tool = AsyncMock(side_effect=fake_execute_tool)
attachments = await service.collect_outbound_attachments(query)
assert len(attachments) == 1
assert attachments[0]['type'] == 'Image'
assert attachments[0]['name'] == 'out.png'
# cleanup (rm -rf) must have been issued after a successful collection
assert any('rm -rf' in c for c in calls)
@pytest.mark.asyncio
async def test_collect_outbound_empty_still_clears(self):
# An empty collection MUST still clear the per-query outbox, so a later
# turn reusing the same query_id (the counter resets across restarts)
# cannot inherit stale files left from a prior run.
service = self._service()
query = make_query()
calls = []
async def fake_execute_tool(parameters, q):
calls.append(parameters['command'])
if 'os.walk' in parameters['command']:
return {'ok': True, 'stdout': '[]', 'stderr': ''}
return {'ok': True, 'stdout': '', 'stderr': ''}
service.execute_tool = AsyncMock(side_effect=fake_execute_tool)
assert await service.collect_outbound_attachments(query) == []
# cleanup (rm -rf) is issued unconditionally now
assert any('rm -rf' in c for c in calls)
@pytest.mark.asyncio
async def test_passthrough_noop_when_unavailable(self):
service = BoxService(make_app(Mock()), client=Mock(spec=BoxRuntimeClient))
service._available = False
query = make_query()
assert await service.materialize_inbound_attachments(query) == []
assert await service.collect_outbound_attachments(query) == []
class TestAttachmentHostPath:
"""Direct host-filesystem transfer path (bind-mounted workspace).
When ``default_workspace`` is a real local dir, inbound/outbound bypass the
exec channel entirely (no ARG_MAX / stdout-truncation limits) and read/write
the bind-mounted host dir directly.
"""
def _service_with_workspace(self, tmp_path):
ws = str(tmp_path / 'box' / 'default')
os.makedirs(ws, exist_ok=True)
app = make_app(Mock(), allowed_mount_roots=[str(tmp_path)], host_root=str(tmp_path / 'box'))
service = BoxService(app, client=Mock(spec=BoxRuntimeClient))
service._available = True
# Force the default_workspace to our tmp dir so _host_query_dir resolves.
service.default_workspace = ws
return service, ws
@pytest.mark.asyncio
async def test_inbound_writes_to_host_no_exec(self, tmp_path):
import base64
import langbot_plugin.api.entities.builtin.platform.message as platform_message
service, ws = self._service_with_workspace(tmp_path)
# Big payload that would blow ARG_MAX on the exec path:
big = b'\x89PNG\r\n\x1a\n' + b'x' * (300 * 1024)
b64 = 'data:image/png;base64,' + base64.b64encode(big).decode()
query = make_query()
query.message_chain = platform_message.MessageChain([platform_message.Image(base64=b64)])
# execute_tool must NOT be called on the host path.
service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used on host path'))
descriptors = await service.materialize_inbound_attachments(query)
assert len(descriptors) == 1
d = descriptors[0]
assert d['type'] == 'Image'
assert d['size'] == len(big)
# File actually landed on the host workspace.
host_file = os.path.join(ws, 'inbox', str(query.query_id), d['name'])
assert os.path.isfile(host_file)
assert open(host_file, 'rb').read() == big
@pytest.mark.asyncio
async def test_inbound_host_clears_stale_query_dir(self, tmp_path):
import base64
import langbot_plugin.api.entities.builtin.platform.message as platform_message
service, ws = self._service_with_workspace(tmp_path)
# Seed a stale file under the same query_id (simulates webchat id reuse).
stale_dir = os.path.join(ws, 'inbox', '42')
os.makedirs(stale_dir, exist_ok=True)
open(os.path.join(stale_dir, 'image_1.png'), 'wb').write(b'STALE-OLD-IMAGE')
new = b'\x89PNG\r\n\x1a\n NEW'
b64 = 'data:image/png;base64,' + base64.b64encode(new).decode()
query = make_query(query_id=42)
query.message_chain = platform_message.MessageChain([platform_message.Image(base64=b64)])
service.execute_tool = AsyncMock()
descriptors = await service.materialize_inbound_attachments(query)
# The new write recreated the dir; the stale file is gone, new bytes present.
host_file = os.path.join(stale_dir, descriptors[0]['name'])
assert open(host_file, 'rb').read() == new
# No leftover content from the stale image.
assert b'STALE-OLD-IMAGE' not in open(host_file, 'rb').read()
@pytest.mark.asyncio
async def test_outbound_reads_host_and_clears(self, tmp_path):
service, ws = self._service_with_workspace(tmp_path)
query = make_query()
outbox = os.path.join(ws, 'outbox', str(query.query_id))
os.makedirs(outbox, exist_ok=True)
# A large file that would be truncated on the exec/stdout path:
big_png = b'\x89PNG\r\n\x1a\n' + b'y' * (400 * 1024)
open(os.path.join(outbox, 'result.png'), 'wb').write(big_png)
open(os.path.join(outbox, 'notes.txt'), 'wb').write(b'hello')
service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used on host path'))
attachments = await service.collect_outbound_attachments(query)
by_name = {a['name']: a for a in attachments}
assert by_name['result.png']['type'] == 'Image'
assert by_name['notes.txt']['type'] == 'File'
# Full image survived (no truncation).
import base64
raw = base64.b64decode(by_name['result.png']['base64'].split(',', 1)[-1])
assert raw == big_png
# Outbox cleared after collection.
assert os.listdir(outbox) == []
@pytest.mark.asyncio
async def test_outbound_empty_clears_stale_host_dir(self, tmp_path):
# Reusing a query_id (counter resets on restart) must not re-send files
# a previous run left in the outbox: an empty collection still clears it.
service, ws = self._service_with_workspace(tmp_path)
query = make_query()
outbox = os.path.join(ws, 'outbox', str(query.query_id))
os.makedirs(outbox, exist_ok=True)
# Stale file from a prior turn; the agent produced nothing this turn —
# but _read_outbox_host would still pick it up, so collection must drop
# it and then wipe the dir. Simulate "nothing produced this turn" by
# treating any present file as stale and asserting it is not re-sent
# across a second, genuinely-empty collection.
open(os.path.join(outbox, 'stale.png'), 'wb').write(b'\x89PNG\r\n\x1a\n old')
service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used on host path'))
# First collection drains + clears the dir.
first = await service.collect_outbound_attachments(query)
assert {a['name'] for a in first} == {'stale.png'}
assert os.listdir(outbox) == []
# Second collection (no new files) returns nothing and leaves a clean dir.
second = await service.collect_outbound_attachments(query)
assert second == []
assert os.listdir(outbox) == []
@pytest.mark.asyncio
async def test_purge_attachment_dirs_wipes_host_owned_leftovers_on_init(self, tmp_path):
# Leftover inbox/outbox dirs from a previous process (same reset
# query_id counter) must be removed at startup. Host-owned files are
# cleared without any sandbox exec.
service, ws = self._service_with_workspace(tmp_path)
for sub in ('inbox', 'outbox'):
d = os.path.join(ws, sub, '0')
os.makedirs(d, exist_ok=True)
open(os.path.join(d, 'leftover.bin'), 'wb').write(b'from a previous process')
service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used for host-owned files'))
await service._purge_attachment_dirs()
assert not os.path.exists(os.path.join(ws, 'inbox'))
assert not os.path.exists(os.path.join(ws, 'outbox'))
# The workspace root itself survives.
assert os.path.isdir(ws)
@pytest.mark.asyncio
async def test_purge_attachment_dirs_falls_back_to_exec_for_root_owned(self, tmp_path, monkeypatch):
# When the host delete cannot remove a dir (root-owned container output),
# purge must fall back to deleting from inside the sandbox via exec.
service, ws = self._service_with_workspace(tmp_path)
outbox = os.path.join(ws, 'outbox')
os.makedirs(os.path.join(outbox, '0'), exist_ok=True)
# Simulate a host delete that cannot remove the root-owned outbox.
import shutil as _shutil
real_rmtree = _shutil.rmtree
def fake_rmtree(path, *a, **k):
if os.path.abspath(path) == os.path.abspath(outbox):
return # "permission denied" — silently leaves the dir
return real_rmtree(path, *a, **k)
monkeypatch.setattr(_shutil, 'rmtree', fake_rmtree)
executed = {}
spec_obj = object()
service.build_spec = Mock(return_value=spec_obj)
service.client.execute = AsyncMock(side_effect=lambda s: executed.setdefault('spec', s))
await service._purge_attachment_dirs()
# build_spec was asked to rm the surviving outbox via exec.
cmd = service.build_spec.call_args.args[0]['cmd']
assert 'rm -rf' in cmd and '/workspace/outbox' in cmd
assert '/workspace/inbox' not in cmd # inbox was host-deletable
service.client.execute.assert_awaited_once_with(spec_obj)
@pytest.mark.asyncio
async def test_purge_attachment_dirs_noop_without_workspace(self):
# No bind-mounted workspace (E2B / remote): purge is a safe no-op.
service = BoxService(make_app(Mock()), client=Mock(spec=BoxRuntimeClient))
service.default_workspace = None
# Must not raise.
await service._purge_attachment_dirs()
@@ -1,87 +0,0 @@
"""Unit tests for the monitoring trace Alembic migration."""
from __future__ import annotations
from importlib import import_module
class _FakeInspector:
def __init__(self, tables):
self._tables = tables
def get_table_names(self):
return list(self._tables)
class _FakeOp:
def __init__(self):
self.created_tables = []
self.created_indexes = []
self.dropped_tables = []
def get_bind(self):
return object()
def create_table(self, table_name, *columns):
self.created_tables.append((table_name, columns))
def create_index(self, index_name, table_name, columns):
self.created_indexes.append((index_name, table_name, columns))
def drop_table(self, table_name):
self.dropped_tables.append(table_name)
def _migration_module():
return import_module('langbot.pkg.persistence.alembic.versions.0006_monitoring_traces')
def test_upgrade_creates_monitoring_trace_tables_and_indexes(monkeypatch):
migration = _migration_module()
fake_op = _FakeOp()
monkeypatch.setattr(migration, 'op', fake_op)
monkeypatch.setattr(migration.sa, 'inspect', lambda _conn: _FakeInspector(tables=set()))
migration.upgrade()
assert [table_name for table_name, _columns in fake_op.created_tables] == [
'monitoring_traces',
'monitoring_spans',
]
assert ('ix_monitoring_traces_started_at', 'monitoring_traces', ['started_at']) in fake_op.created_indexes
assert ('ix_monitoring_spans_trace_id', 'monitoring_spans', ['trace_id']) in fake_op.created_indexes
assert ('ix_monitoring_spans_pipeline_id', 'monitoring_spans', ['pipeline_id']) in fake_op.created_indexes
def test_upgrade_skips_existing_monitoring_trace_tables(monkeypatch):
migration = _migration_module()
fake_op = _FakeOp()
monkeypatch.setattr(migration, 'op', fake_op)
monkeypatch.setattr(
migration.sa,
'inspect',
lambda _conn: _FakeInspector(tables={'monitoring_traces', 'monitoring_spans'}),
)
migration.upgrade()
assert fake_op.created_tables == []
assert fake_op.created_indexes == []
def test_downgrade_drops_spans_before_traces(monkeypatch):
migration = _migration_module()
fake_op = _FakeOp()
monkeypatch.setattr(migration, 'op', fake_op)
monkeypatch.setattr(
migration.sa,
'inspect',
lambda _conn: _FakeInspector(tables={'monitoring_traces', 'monitoring_spans'}),
)
migration.downgrade()
assert fake_op.dropped_tables == ['monitoring_spans', 'monitoring_traces']
@@ -162,61 +162,3 @@ async def test_runtime_pipeline_execute(mock_app, sample_query):
# Verify stage was called # Verify stage was called
mock_stage.process.assert_called_once() mock_stage.process.assert_called_once()
@pytest.mark.asyncio
async def test_runtime_pipeline_marks_trace_error_when_stage_returns_error_notice(mock_app, sample_query):
"""Trace status follows handled stage errors, not only raised exceptions."""
pipelinemgr = get_pipelinemgr_module()
stage = get_stage_module()
persistence_pipeline = get_persistence_pipeline_module()
entities = get_entities_module()
error_result = entities.StageProcessResult(
result_type=entities.ResultType.INTERRUPT,
new_query=sample_query,
user_notice='',
console_notice='',
debug_notice='traceback',
error_notice='model request failed',
)
mock_stage = Mock(spec=stage.PipelineStage)
mock_stage.process = AsyncMock(return_value=error_result)
stage_container = pipelinemgr.StageInstContainer(inst_name='FailingStage', inst=mock_stage)
pipeline_entity = Mock(spec=persistence_pipeline.LegacyPipeline)
pipeline_entity.uuid = 'test-pipeline-uuid'
pipeline_entity.name = 'Test Pipeline'
pipeline_entity.config = sample_query.pipeline_config
pipeline_entity.extensions_preferences = {'plugins': []}
mock_app.bot_service = AsyncMock()
mock_app.bot_service.get_bot = AsyncMock(return_value={'name': 'Test Bot'})
mock_app.monitoring_service = AsyncMock()
mock_app.monitoring_service.record_message = AsyncMock(return_value='message-1')
mock_app.monitoring_service.update_session_activity = AsyncMock(return_value=True)
mock_app.monitoring_service.start_trace = AsyncMock(return_value='trace-1')
mock_app.monitoring_service.record_span = AsyncMock()
mock_app.monitoring_service.finish_trace = AsyncMock()
mock_app.monitoring_service.update_message_status = AsyncMock()
mock_app.monitoring_service.record_error = AsyncMock()
event_ctx = Mock()
event_ctx.is_prevented_default = Mock(return_value=False)
mock_app.plugin_connector.emit_event = AsyncMock(return_value=event_ctx)
mock_app.query_pool.cached_queries[sample_query.query_id] = sample_query
runtime_pipeline = pipelinemgr.RuntimePipeline(mock_app, pipeline_entity, [stage_container])
await runtime_pipeline.run(sample_query)
mock_app.monitoring_service.finish_trace.assert_awaited_once()
assert mock_app.monitoring_service.finish_trace.await_args.kwargs['status'] == 'error'
span_calls = mock_app.monitoring_service.record_span.await_args_list
stage_span_call = next(call for call in span_calls if call.kwargs['name'] == 'FailingStage')
root_span_call = next(call for call in span_calls if call.kwargs['kind'] == 'pipeline.query')
assert stage_span_call.kwargs['status'] == 'error'
assert stage_span_call.kwargs['error_message'] == 'model request failed'
assert root_span_call.kwargs['status'] == 'error'
@@ -0,0 +1,146 @@
"""Unit tests for ResponseWrapper outbound-attachment helpers.
Covers the sandbox -> user attachment path added for the Box attachment
round-trip:
* ``_is_final_assistant_message`` only the terminal, tool-call-free assistant
message (or a final MessageChunk) should trigger collection.
* ``_append_outbound_attachments`` collects sandbox outbox files exactly once
per query and maps each descriptor to the right platform component, swallowing
collection errors.
"""
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock
import pytest
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.provider.message as provider_message
from langbot.pkg.pipeline.wrapper.wrapper import ResponseWrapper
def _make_wrapper(box_service) -> ResponseWrapper:
app = SimpleNamespace(logger=Mock())
wrapper = ResponseWrapper.__new__(ResponseWrapper)
wrapper.ap = app
return wrapper
def _make_query():
return SimpleNamespace(variables={})
def test_is_final_assistant_message_plain_assistant():
wrapper = _make_wrapper(box_service=None)
msg = provider_message.Message(role='assistant', content='done')
assert wrapper._is_final_assistant_message(msg) is True
def test_is_final_assistant_message_rejects_non_assistant():
wrapper = _make_wrapper(box_service=None)
msg = provider_message.Message(role='tool', content='{}')
assert wrapper._is_final_assistant_message(msg) is False
def test_is_final_assistant_message_rejects_tool_call_round():
wrapper = _make_wrapper(box_service=None)
msg = provider_message.Message(
role='assistant',
content='calling',
tool_calls=[
provider_message.ToolCall(
id='c1',
type='function',
function=provider_message.FunctionCall(name='exec', arguments='{}'),
)
],
)
assert wrapper._is_final_assistant_message(msg) is False
def test_is_final_assistant_message_non_final_chunk():
wrapper = _make_wrapper(box_service=None)
chunk = provider_message.MessageChunk(role='assistant', content='partial', is_final=False)
assert wrapper._is_final_assistant_message(chunk) is False
final_chunk = provider_message.MessageChunk(role='assistant', content='partial', is_final=True)
assert wrapper._is_final_assistant_message(final_chunk) is True
@pytest.mark.asyncio
async def test_append_outbound_attachments_maps_each_type():
box_service = SimpleNamespace(
available=True,
collect_outbound_attachments=AsyncMock(
return_value=[
{'type': 'Image', 'base64': 'data:image/png;base64,iVBORw0K'},
{'type': 'Voice', 'base64': 'data:audio/wav;base64,UklGRg=='},
{'type': 'File', 'name': 'report.xlsx', 'base64': 'data:app;base64,UEsDBA=='},
]
),
)
wrapper = _make_wrapper(box_service)
wrapper.ap.box_service = box_service
query = _make_query()
chain = platform_message.MessageChain([])
await wrapper._append_outbound_attachments(query, chain)
kinds = [type(c).__name__ for c in chain]
assert kinds == ['Image', 'Voice', 'File']
assert query.variables['_sandbox_outbound_collected'] is True
# File keeps its name
file_comp = chain[2]
assert getattr(file_comp, 'name', None) == 'report.xlsx'
@pytest.mark.asyncio
async def test_append_outbound_attachments_runs_once_per_query():
box_service = SimpleNamespace(
available=True,
collect_outbound_attachments=AsyncMock(return_value=[]),
)
wrapper = _make_wrapper(box_service)
wrapper.ap.box_service = box_service
query = _make_query()
query.variables['_sandbox_outbound_collected'] = True
chain = platform_message.MessageChain([])
await wrapper._append_outbound_attachments(query, chain)
box_service.collect_outbound_attachments.assert_not_awaited()
assert len(chain) == 0
@pytest.mark.asyncio
async def test_append_outbound_attachments_noop_without_box_service():
wrapper = _make_wrapper(box_service=None)
wrapper.ap.box_service = None
query = _make_query()
chain = platform_message.MessageChain([])
await wrapper._append_outbound_attachments(query, chain)
assert len(chain) == 0
# not marked collected, since service is unavailable
assert '_sandbox_outbound_collected' not in query.variables
@pytest.mark.asyncio
async def test_append_outbound_attachments_swallows_collection_error():
box_service = SimpleNamespace(
available=True,
collect_outbound_attachments=AsyncMock(side_effect=RuntimeError('boom')),
)
wrapper = _make_wrapper(box_service)
wrapper.ap.box_service = box_service
query = _make_query()
chain = platform_message.MessageChain([])
# must not raise
await wrapper._append_outbound_attachments(query, chain)
assert len(chain) == 0
wrapper.ap.logger.warning.assert_called_once()
@@ -0,0 +1,92 @@
"""Unit tests for WebSocketAdapter._process_image_components.
The web debug client uploads Image / Voice / File components carrying a storage
key in ``path``. This helper resolves each to a base64 data URI (so multimodal
LLM input and the Box sandbox inbox have usable bytes), then deletes the
consumed storage object and clears ``path``. Covers mimetype selection per
type and graceful error handling.
"""
from __future__ import annotations
import base64
from unittest.mock import AsyncMock, Mock
import pytest
from langbot.pkg.platform.sources.websocket_adapter import WebSocketAdapter
def _make_adapter(load_return=b'hello', load_side_effect=None):
provider = Mock()
provider.load = AsyncMock(return_value=load_return, side_effect=load_side_effect)
provider.delete = AsyncMock()
ap = Mock()
ap.storage_mgr.storage_provider = provider
logger = Mock()
logger.error = AsyncMock()
# WebSocketAdapter is a pydantic model; bypass full __init__/validation.
adapter = WebSocketAdapter.model_construct(ap=ap, logger=logger)
return adapter, provider
@pytest.mark.asyncio
async def test_image_jpeg_mimetype_and_cleanup():
adapter, provider = _make_adapter(load_return=b'\xff\xd8\xff')
chain = [{'type': 'Image', 'path': 'storage://abc/photo.jpg'}]
await adapter._process_image_components(chain)
expected_b64 = base64.b64encode(b'\xff\xd8\xff').decode('utf-8')
assert chain[0]['base64'] == f'data:image/jpeg;base64,{expected_b64}'
assert chain[0]['path'] == '' # consumed
provider.delete.assert_awaited_once_with('storage://abc/photo.jpg')
@pytest.mark.asyncio
async def test_image_defaults_to_png():
adapter, _ = _make_adapter()
chain = [{'type': 'Image', 'path': 'storage://abc/blob'}]
await adapter._process_image_components(chain)
assert chain[0]['base64'].startswith('data:image/png;base64,')
@pytest.mark.asyncio
async def test_voice_uses_guessed_or_wav_mimetype():
adapter, _ = _make_adapter()
chain = [{'type': 'Voice', 'path': 'storage://abc/clip.wav'}]
await adapter._process_image_components(chain)
assert chain[0]['base64'].startswith('data:audio/')
@pytest.mark.asyncio
async def test_file_uses_octet_stream_fallback():
adapter, _ = _make_adapter()
chain = [{'type': 'File', 'path': 'storage://abc/unknownblob'}]
await adapter._process_image_components(chain)
assert chain[0]['base64'].startswith('data:application/octet-stream;base64,')
@pytest.mark.asyncio
async def test_skips_components_without_path_or_unknown_type():
adapter, provider = _make_adapter()
chain = [
{'type': 'Image', 'path': ''}, # no path
{'type': 'Plain', 'path': 'storage://abc/x'}, # not a file component
{'type': 'At', 'target': '123'}, # no path key at all
]
await adapter._process_image_components(chain)
provider.load.assert_not_awaited()
assert 'base64' not in chain[0]
assert 'base64' not in chain[1]
@pytest.mark.asyncio
async def test_load_failure_is_logged_not_raised():
adapter, _ = _make_adapter(load_side_effect=RuntimeError('storage down'))
chain = [{'type': 'File', 'path': 'storage://abc/doc.pdf'}]
# must not raise
await adapter._process_image_components(chain)
assert 'base64' not in chain[0]
adapter.logger.error.assert_awaited_once()
@@ -0,0 +1,93 @@
"""Unit tests for LiteLLMRequester._convert_messages.
Focus: the content-part normalization that (a) converts image_base64 parts to
the OpenAI image_url shape and (b) drops non-image file parts (file_base64 /
file_url) which OpenAI-compatible chat models reject. The latter is essential
for Voice/File attachments including ones replayed from conversation history
since the agent consumes their bytes via the sandbox, not the model payload.
"""
import langbot_plugin.api.entities.builtin.provider.message as provider_message
from langbot.pkg.provider.modelmgr.requesters.litellmchat import LiteLLMRequester
def _make_requester() -> LiteLLMRequester:
# _convert_messages does not touch instance config, so bypass __init__.
return LiteLLMRequester.__new__(LiteLLMRequester)
def test_convert_messages_drops_file_base64_part():
req = _make_requester()
msg = provider_message.Message(
role='user',
content=[
provider_message.ContentElement.from_text('analyze this audio'),
provider_message.ContentElement.from_file_base64('data:audio/wav;base64,AAAA', 'voice.wav'),
],
)
out = req._convert_messages([msg])
parts = out[0]['content']
types = [p.get('type') for p in parts]
assert 'file_base64' not in types
assert types == ['text']
assert parts[0]['text'] == 'analyze this audio'
def test_convert_messages_drops_file_url_part():
req = _make_requester()
msg = provider_message.Message(
role='user',
content=[
provider_message.ContentElement.from_text('here is a doc'),
provider_message.ContentElement.from_file_url('http://example.com/report.xlsx', 'report.xlsx'),
],
)
out = req._convert_messages([msg])
types = [p.get('type') for p in out[0]['content']]
assert types == ['text']
def test_convert_messages_keeps_image_and_converts_to_image_url():
req = _make_requester()
msg = provider_message.Message(
role='user',
content=[
provider_message.ContentElement.from_text('look'),
provider_message.ContentElement.from_image_base64('data:image/png;base64,AAAA'),
],
)
out = req._convert_messages([msg])
parts = out[0]['content']
types = [p.get('type') for p in parts]
# image is preserved and reshaped to the OpenAI image_url form
assert types == ['text', 'image_url']
img_part = parts[1]
assert img_part['image_url'] == {'url': 'data:image/png;base64,AAAA'}
assert 'image_base64' not in img_part
def test_convert_messages_mixed_history_strips_only_files():
req = _make_requester()
# Simulate replayed history: an old voice turn + a current text turn.
history_voice = provider_message.Message(
role='user',
content=[
provider_message.ContentElement.from_text('old audio turn'),
provider_message.ContentElement.from_file_base64('data:audio/wav;base64,BBBB', 'voice.wav'),
],
)
current = provider_message.Message(
role='user',
content=[provider_message.ContentElement.from_text('now do the csv')],
)
out = req._convert_messages([history_voice, current])
assert [p.get('type') for p in out[0]['content']] == ['text']
assert [p.get('type') for p in out[1]['content']] == ['text']
def test_convert_messages_plain_string_content_untouched():
req = _make_requester()
msg = provider_message.Message(role='user', content='just text')
out = req._convert_messages([msg])
assert out[0]['content'] == 'just text'
@@ -0,0 +1,146 @@
"""Unit tests for LocalAgentRunner._inject_inbound_attachments.
Covers the user -> sandbox attachment path added for the Box attachment
round-trip:
* materialized descriptors are stashed on the query and described to the model
via an appended text note (in-sandbox paths + outbox convention);
* non-image file parts (file_base64 / file_url) are stripped from the user
message content because OpenAI-compatible chat models reject them, while
image and text parts are kept for vision models;
* the helper is a no-op when the box service is unavailable or yields nothing,
and never raises into the chat turn on materialization failure.
"""
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock
import pytest
import langbot_plugin.api.entities.builtin.provider.message as provider_message
from langbot.pkg.provider.runners.localagent import LocalAgentRunner
def _make_runner(box_service) -> LocalAgentRunner:
runner = LocalAgentRunner.__new__(LocalAgentRunner)
runner.ap = SimpleNamespace(logger=Mock(), box_service=box_service)
return runner
def _make_query():
return SimpleNamespace(variables={}, query_id='q-123')
def _box_service(attachments):
svc = SimpleNamespace(
available=True,
OUTBOX_MOUNT_DIR='/outbox',
materialize_inbound_attachments=AsyncMock(return_value=attachments),
)
return svc
@pytest.mark.asyncio
async def test_inject_strips_file_parts_and_appends_note():
box = _box_service([{'type': 'Voice', 'path': '/inbox/q-123/voice.wav', 'size': 176000}])
runner = _make_runner(box)
query = _make_query()
user_message = provider_message.Message(
role='user',
content=[
provider_message.ContentElement.from_text('transcribe this'),
provider_message.ContentElement.from_file_base64('data:audio/wav;base64,AAAA', 'voice.wav'),
],
)
await runner._inject_inbound_attachments(query, user_message)
types = [getattr(ce, 'type', None) for ce in user_message.content]
# file_base64 dropped; text kept; sandbox-path note appended as text
assert 'file_base64' not in types
assert types.count('text') == 2
note = user_message.content[-1].text
assert '/inbox/q-123/voice.wav' in note
assert '/outbox/q-123' in note
# descriptors stashed for downstream stages
assert query.variables['_sandbox_inbound_attachments'] == box.materialize_inbound_attachments.return_value
@pytest.mark.asyncio
async def test_inject_keeps_image_parts():
box = _box_service([{'type': 'Image', 'path': '/inbox/q-123/pic.png', 'size': 1234}])
runner = _make_runner(box)
query = _make_query()
user_message = provider_message.Message(
role='user',
content=[
provider_message.ContentElement.from_text('what is this'),
provider_message.ContentElement.from_image_base64('data:image/png;base64,iVBORw0K'),
],
)
await runner._inject_inbound_attachments(query, user_message)
types = [getattr(ce, 'type', None) for ce in user_message.content]
assert 'image_base64' in types # vision part preserved
assert types[-1] == 'text' # note appended last
@pytest.mark.asyncio
async def test_inject_promotes_string_content_to_list_with_note():
box = _box_service([{'type': 'File', 'path': '/inbox/q-123/data.csv', 'size': 42}])
runner = _make_runner(box)
query = _make_query()
user_message = provider_message.Message(role='user', content='clean this csv')
await runner._inject_inbound_attachments(query, user_message)
assert isinstance(user_message.content, list)
assert [getattr(ce, 'type', None) for ce in user_message.content] == ['text', 'text']
assert user_message.content[0].text == 'clean this csv'
assert '/inbox/q-123/data.csv' in user_message.content[1].text
@pytest.mark.asyncio
async def test_inject_noop_without_box_service():
runner = _make_runner(box_service=None)
query = _make_query()
user_message = provider_message.Message(role='user', content='hello')
await runner._inject_inbound_attachments(query, user_message)
assert user_message.content == 'hello'
assert '_sandbox_inbound_attachments' not in query.variables
@pytest.mark.asyncio
async def test_inject_noop_when_no_attachments():
box = _box_service([])
runner = _make_runner(box)
query = _make_query()
user_message = provider_message.Message(role='user', content='hello')
await runner._inject_inbound_attachments(query, user_message)
assert user_message.content == 'hello'
assert '_sandbox_inbound_attachments' not in query.variables
@pytest.mark.asyncio
async def test_inject_swallows_materialization_error():
box = SimpleNamespace(
available=True,
OUTBOX_MOUNT_DIR='/outbox',
materialize_inbound_attachments=AsyncMock(side_effect=RuntimeError('disk full')),
)
runner = _make_runner(box)
query = _make_query()
user_message = provider_message.Message(role='user', content='hello')
# must not raise
await runner._inject_inbound_attachments(query, user_message)
assert user_message.content == 'hello'
runner.ap.logger.warning.assert_called_once()
-26
View File
@@ -407,32 +407,6 @@ class TestRuntimeKnowledgeBaseRetrieve:
call_args = mock_app.plugin_connector.call_rag_retrieve.call_args call_args = mock_app.plugin_connector.call_rag_retrieve.call_args
assert call_args[0][1]['retrieval_settings']['top_k'] == 5 assert call_args[0][1]['retrieval_settings']['top_k'] == 5
@pytest.mark.asyncio
async def test_retrieve_records_host_rag_duration(self, monkeypatch):
"""Test host RAG span duration is measured even if plugin omits it."""
rag_module = get_rag_module()
mock_app = create_mock_app()
mock_app.monitoring_service = AsyncMock()
mock_kb = create_mock_kb_entity()
mock_app.plugin_connector.call_rag_retrieve = AsyncMock(
return_value={'results': [], 'metadata': {'status': 'success'}}
)
monkeypatch.setattr(rag_module.time, 'perf_counter', Mock(side_effect=[10.0, 10.25]))
runtime_kb = rag_module.RuntimeKnowledgeBase(mock_app, mock_kb)
await runtime_kb._retrieve(
'query text',
{
'_trace_context': {
'trace_id': 'trace-1',
'parent_span_id': 'span-root',
}
},
)
assert mock_app.monitoring_service.record_span.await_args.kwargs['duration'] == 250
@pytest.mark.asyncio @pytest.mark.asyncio
async def test_retrieve_converts_dict_to_entry(self): async def test_retrieve_converts_dict_to_entry(self):
"""Test that dict results are converted to RetrievalResultEntry.""" """Test that dict results are converted to RetrievalResultEntry."""
@@ -5,7 +5,6 @@ import {
ModelCall, ModelCall,
LLMCall, LLMCall,
EmbeddingCall, EmbeddingCall,
MonitoringTrace,
} from '../types/monitoring'; } from '../types/monitoring';
import { backendClient } from '@/app/infra/http'; import { backendClient } from '@/app/infra/http';
import { parseUTCTimestamp } from '../utils/dateUtils'; import { parseUTCTimestamp } from '../utils/dateUtils';
@@ -264,48 +263,12 @@ export function useMonitoringData(filterState: FilterState) {
messageId: error.message_id, messageId: error.message_id,
}), }),
), ),
traces: (response.traces || []).map(
(trace: {
trace_id: string;
started_at: string;
ended_at?: string;
duration?: number;
status: string;
name: string;
bot_id?: string;
bot_name?: string;
pipeline_id?: string;
pipeline_name?: string;
session_id?: string;
message_id?: string;
query_id?: string;
attributes?: Record<string, unknown>;
}): MonitoringTrace => ({
traceId: trace.trace_id,
name: trace.name,
startedAt: parseUTCTimestamp(trace.started_at),
endedAt: trace.ended_at
? parseUTCTimestamp(trace.ended_at)
: undefined,
duration: trace.duration,
status: trace.status as 'running' | 'success' | 'error',
botId: trace.bot_id,
botName: trace.bot_name,
pipelineId: trace.pipeline_id,
pipelineName: trace.pipeline_name,
sessionId: trace.session_id,
messageId: trace.message_id,
queryId: trace.query_id,
attributes: trace.attributes || {},
}),
),
totalCount: { totalCount: {
messages: response.totalCount.messages, messages: response.totalCount.messages,
llmCalls: response.totalCount.llmCalls, llmCalls: response.totalCount.llmCalls,
embeddingCalls: response.totalCount.embeddingCalls || 0, embeddingCalls: response.totalCount.embeddingCalls || 0,
sessions: response.totalCount.sessions, sessions: response.totalCount.sessions,
errors: response.totalCount.errors, errors: response.totalCount.errors,
traces: response.totalCount.traces || 0,
}, },
}; };
+1 -297
View File
@@ -10,7 +10,6 @@ import {
MessageSquare, MessageSquare,
Sparkles, Sparkles,
CheckCircle2, CheckCircle2,
GitBranch,
} from 'lucide-react'; } from 'lucide-react';
import OverviewCards from './components/overview-cards/OverviewCards'; import OverviewCards from './components/overview-cards/OverviewCards';
import MonitoringFilters from './components/filters/MonitoringFilters'; import MonitoringFilters from './components/filters/MonitoringFilters';
@@ -23,15 +22,9 @@ import { MessageDetailsCard } from './components/MessageDetailsCard';
import { MessageContentRenderer } from './components/MessageContentRenderer'; import { MessageContentRenderer } from './components/MessageContentRenderer';
import { FeedbackStatsCards } from './components/FeedbackCard'; import { FeedbackStatsCards } from './components/FeedbackCard';
import { FeedbackList } from './components/FeedbackList'; import { FeedbackList } from './components/FeedbackList';
import { import { MessageDetails } from './types/monitoring';
MessageDetails,
TraceDetails,
MonitoringSpan,
} from './types/monitoring';
import { httpClient } from '@/app/infra/http/HttpClient'; import { httpClient } from '@/app/infra/http/HttpClient';
import { backendClient } from '@/app/infra/http';
import { LoadingSpinner, LoadingPage } from '@/components/ui/loading-spinner'; import { LoadingSpinner, LoadingPage } from '@/components/ui/loading-spinner';
import { parseUTCTimestamp } from './utils/dateUtils';
interface RawMessageData { interface RawMessageData {
id: string; id: string;
@@ -79,97 +72,6 @@ interface RawErrorData {
stack_trace: string | null; stack_trace: string | null;
} }
interface RawTraceData {
trace_id: string;
started_at: string;
ended_at?: string;
duration?: number;
status: string;
name: string;
bot_id?: string;
bot_name?: string;
pipeline_id?: string;
pipeline_name?: string;
session_id?: string;
message_id?: string;
query_id?: string;
attributes?: Record<string, unknown>;
}
interface RawSpanData {
span_id: string;
trace_id: string;
parent_span_id?: string;
name: string;
kind: string;
status: string;
started_at: string;
ended_at?: string;
duration?: number;
message_id?: string;
session_id?: string;
bot_id?: string;
pipeline_id?: string;
attributes?: Record<string, unknown>;
error_message?: string;
}
function mapTrace(raw: RawTraceData) {
return {
traceId: raw.trace_id,
name: raw.name,
startedAt: parseUTCTimestamp(raw.started_at),
endedAt: raw.ended_at ? parseUTCTimestamp(raw.ended_at) : undefined,
duration: raw.duration,
status: raw.status as 'running' | 'success' | 'error',
botId: raw.bot_id,
botName: raw.bot_name,
pipelineId: raw.pipeline_id,
pipelineName: raw.pipeline_name,
sessionId: raw.session_id,
messageId: raw.message_id,
queryId: raw.query_id,
attributes: raw.attributes || {},
};
}
function mapSpan(raw: RawSpanData): MonitoringSpan {
return {
spanId: raw.span_id,
traceId: raw.trace_id,
parentSpanId: raw.parent_span_id,
name: raw.name,
kind: raw.kind,
status: raw.status as 'running' | 'success' | 'error',
startedAt: parseUTCTimestamp(raw.started_at),
endedAt: raw.ended_at ? parseUTCTimestamp(raw.ended_at) : undefined,
duration: raw.duration,
messageId: raw.message_id,
sessionId: raw.session_id,
botId: raw.bot_id,
pipelineId: raw.pipeline_id,
attributes: raw.attributes || {},
errorMessage: raw.error_message,
};
}
function spanDepth(
span: MonitoringSpan,
spansById: Map<string, MonitoringSpan>,
) {
let depth = 0;
let current = span.parentSpanId
? spansById.get(span.parentSpanId)
: undefined;
while (current && depth < 8) {
depth += 1;
current = current.parentSpanId
? spansById.get(current.parentSpanId)
: undefined;
}
return depth;
}
function MonitoringPageContent() { function MonitoringPageContent() {
const { t } = useTranslation(); const { t } = useTranslation();
const { filterState, setSelectedBots, setSelectedPipelines, setTimeRange } = const { filterState, setSelectedBots, setSelectedPipelines, setTimeRange } =
@@ -256,13 +158,6 @@ function MonitoringPageContent() {
// State for expanded errors // State for expanded errors
const [expandedErrorId, setExpandedErrorId] = useState<string | null>(null); const [expandedErrorId, setExpandedErrorId] = useState<string | null>(null);
const [expandedTraceId, setExpandedTraceId] = useState<string | null>(null);
const [traceDetails, setTraceDetails] = useState<
Record<string, TraceDetails>
>({});
const [loadingTraceDetails, setLoadingTraceDetails] = useState<
Record<string, boolean>
>({});
// State for controlled tabs // State for controlled tabs
const [activeTab, setActiveTab] = useState<string>('messages'); const [activeTab, setActiveTab] = useState<string>('messages');
@@ -370,34 +265,6 @@ function MonitoringPageContent() {
} }
}; };
const toggleTraceExpand = async (traceId: string) => {
if (expandedTraceId === traceId) {
setExpandedTraceId(null);
return;
}
setExpandedTraceId(traceId);
if (traceDetails[traceId]) return;
setLoadingTraceDetails((prev) => ({ ...prev, [traceId]: true }));
try {
const result = await backendClient.getMonitoringTraceDetails(traceId);
setTraceDetails((prev) => ({
...prev,
[traceId]: {
traceId: result.trace_id,
found: result.found,
trace: result.trace ? mapTrace(result.trace) : undefined,
spans: (result.spans || []).map(mapSpan),
},
}));
} catch (error) {
console.error('Failed to fetch trace details:', error);
} finally {
setLoadingTraceDetails((prev) => ({ ...prev, [traceId]: false }));
}
};
return ( return (
<div className="w-full h-full overflow-y-auto overflow-x-hidden"> <div className="w-full h-full overflow-y-auto overflow-x-hidden">
{/* Filters and Refresh Button - Sticky */} {/* Filters and Refresh Button - Sticky */}
@@ -456,9 +323,6 @@ function MonitoringPageContent() {
<TabsTrigger value="tokens" className="px-6 py-2"> <TabsTrigger value="tokens" className="px-6 py-2">
{t('monitoring.tabs.tokens')} {t('monitoring.tabs.tokens')}
</TabsTrigger> </TabsTrigger>
<TabsTrigger value="traces" className="px-6 py-2">
{t('monitoring.tabs.traces')}
</TabsTrigger>
<TabsTrigger value="feedback" className="px-6 py-2"> <TabsTrigger value="feedback" className="px-6 py-2">
{t('monitoring.tabs.feedback')} {t('monitoring.tabs.feedback')}
</TabsTrigger> </TabsTrigger>
@@ -826,166 +690,6 @@ function MonitoringPageContent() {
/> />
</TabsContent> </TabsContent>
<TabsContent value="traces" className="p-6 m-0">
<div>
{loading && (
<div className="py-12 flex justify-center">
<LoadingSpinner text={t('common.loading')} />
</div>
)}
{!loading && data && data.traces && data.traces.length > 0 && (
<div className="space-y-4">
{data.traces.map((trace) => {
const details = traceDetails[trace.traceId];
const spans = details?.spans || [];
const spansById = new Map(
spans.map((span) => [span.spanId, span]),
);
const maxDuration = Math.max(
1,
...spans.map((span) => span.duration || 0),
);
return (
<div
key={trace.traceId}
className="border rounded-xl overflow-hidden transition-all duration-200"
>
<div
className="p-5 cursor-pointer hover:bg-accent transition-colors"
onClick={() => toggleTraceExpand(trace.traceId)}
>
<div className="flex items-start justify-between gap-4">
<div className="flex items-start flex-1 min-w-0">
<div className="mr-3 mt-0.5">
{expandedTraceId === trace.traceId ? (
<ChevronDown className="w-5 h-5 text-muted-foreground" />
) : (
<ChevronRight className="w-5 h-5 text-muted-foreground" />
)}
</div>
<div className="min-w-0 flex-1">
<div className="flex flex-wrap items-center gap-2 mb-2">
<span className="text-xs text-muted-foreground font-mono">
{trace.traceId}
</span>
<span
className={`text-xs px-2 py-1 rounded ${
trace.status === 'error'
? 'bg-red-100 text-red-800 dark:bg-red-900 dark:text-red-200'
: trace.status === 'running'
? 'bg-yellow-100 text-yellow-800 dark:bg-yellow-900 dark:text-yellow-200'
: 'bg-green-100 text-green-800 dark:bg-green-900 dark:text-green-200'
}`}
>
{trace.status}
</span>
</div>
<div className="font-medium text-sm text-foreground mb-1">
{trace.name}
</div>
<div className="text-xs text-muted-foreground truncate">
{trace.botName || '-'} {' '}
{trace.pipelineName || '-'}
{trace.sessionId
? ` · ${trace.sessionId}`
: ''}
</div>
</div>
</div>
<div className="flex flex-col items-end gap-1 text-xs text-muted-foreground whitespace-nowrap">
<span>{trace.startedAt.toLocaleString()}</span>
<span>{trace.duration ?? 0}ms</span>
</div>
</div>
</div>
{expandedTraceId === trace.traceId && (
<div className="border-t p-5 bg-muted">
{loadingTraceDetails[trace.traceId] && (
<div className="py-4 flex justify-center">
<LoadingSpinner size="sm" text="" />
</div>
)}
{!loadingTraceDetails[trace.traceId] && (
<div className="space-y-3">
{spans.length === 0 && (
<div className="text-sm text-muted-foreground">
{t('monitoring.traces.noSpans')}
</div>
)}
{spans.map((span) => {
const depth = spanDepth(span, spansById);
const width = Math.max(
6,
Math.min(
100,
((span.duration || 0) / maxDuration) *
100,
),
);
return (
<div
key={span.spanId}
className="grid grid-cols-[minmax(180px,1fr)_minmax(140px,2fr)_80px] gap-3 items-center text-xs"
>
<div
className="min-w-0"
style={{
paddingLeft: `${depth * 16}px`,
}}
>
<div className="font-medium text-foreground truncate">
{span.name}
</div>
<div className="text-muted-foreground truncate">
{span.kind}
</div>
</div>
<div className="h-7 bg-background rounded border overflow-hidden">
<div
className={`h-full ${
span.status === 'error'
? 'bg-red-500/70'
: 'bg-blue-500/70'
}`}
style={{ width: `${width}%` }}
/>
</div>
<div className="text-right text-muted-foreground">
{span.duration ?? 0}ms
</div>
{span.errorMessage && (
<div className="col-span-3 text-red-600 dark:text-red-400 bg-background rounded p-2">
{span.errorMessage}
</div>
)}
</div>
);
})}
</div>
)}
</div>
)}
</div>
);
})}
</div>
)}
{!loading &&
(!data || !data.traces || data.traces.length === 0) && (
<div className="flex flex-col items-center justify-center text-muted-foreground py-16 gap-2">
<GitBranch className="h-[3rem] w-[3rem]" />
<div className="text-sm">
{t('monitoring.traces.noTraces')}
</div>
</div>
)}
</div>
</TabsContent>
<TabsContent value="feedback" className="p-6 m-0"> <TabsContent value="feedback" className="p-6 m-0">
<div> <div>
{loading && ( {loading && (
@@ -111,48 +111,6 @@ export interface ErrorLog {
messageId?: string; messageId?: string;
} }
export interface MonitoringTrace {
traceId: string;
name: string;
startedAt: Date;
endedAt?: Date;
duration?: number;
status: 'running' | 'success' | 'error';
botId?: string;
botName?: string;
pipelineId?: string;
pipelineName?: string;
sessionId?: string;
messageId?: string;
queryId?: string;
attributes: Record<string, unknown>;
}
export interface MonitoringSpan {
spanId: string;
traceId: string;
parentSpanId?: string;
name: string;
kind: string;
status: 'success' | 'error' | 'running';
startedAt: Date;
endedAt?: Date;
duration?: number;
messageId?: string;
sessionId?: string;
botId?: string;
pipelineId?: string;
attributes: Record<string, unknown>;
errorMessage?: string;
}
export interface TraceDetails {
traceId: string;
found: boolean;
trace?: MonitoringTrace;
spans: MonitoringSpan[];
}
export interface MessageDetails { export interface MessageDetails {
messageId: string; messageId: string;
found: boolean; found: boolean;
@@ -167,7 +125,6 @@ export interface MessageDetails {
averageDurationMs: number; averageDurationMs: number;
}; };
errors: ErrorLog[]; errors: ErrorLog[];
trace?: MonitoringTrace;
} }
export interface OverviewMetrics { export interface OverviewMetrics {
@@ -246,7 +203,6 @@ export interface MonitoringData {
modelCalls: ModelCall[]; modelCalls: ModelCall[];
sessions: SessionInfo[]; sessions: SessionInfo[];
errors: ErrorLog[]; errors: ErrorLog[];
traces: MonitoringTrace[];
feedback?: FeedbackRecord[]; feedback?: FeedbackRecord[];
feedbackStats?: FeedbackStats; feedbackStats?: FeedbackStats;
totalCount: { totalCount: {
@@ -255,7 +211,6 @@ export interface MonitoringData {
embeddingCalls: number; embeddingCalls: number;
sessions: number; sessions: number;
errors: number; errors: number;
traces: number;
feedback?: number; feedback?: number;
}; };
} }
@@ -15,6 +15,7 @@ import {
At, At,
Quote, Quote,
Voice, Voice,
File as FileComponent,
Source, Source,
} from '@/app/infra/entities/message'; } from '@/app/infra/entities/message';
import { toast } from 'sonner'; import { toast } from 'sonner';
@@ -64,7 +65,12 @@ export default function DebugDialog({
const [isHovering, setIsHovering] = useState(false); const [isHovering, setIsHovering] = useState(false);
const [isConnected, setIsConnected] = useState(false); const [isConnected, setIsConnected] = useState(false);
const [selectedImages, setSelectedImages] = useState< const [selectedImages, setSelectedImages] = useState<
Array<{ file: File; preview: string; fileKey?: string }> Array<{
file: File;
preview: string;
fileKey?: string;
kind: 'image' | 'voice' | 'file';
}>
>([]); >([]);
const [isUploading, setIsUploading] = useState(false); const [isUploading, setIsUploading] = useState(false);
const [previewImageUrl, setPreviewImageUrl] = useState<string>(''); const [previewImageUrl, setPreviewImageUrl] = useState<string>('');
@@ -292,23 +298,38 @@ export default function DebugDialog({
const files = e.target.files; const files = e.target.files;
if (!files || files.length === 0) return; if (!files || files.length === 0) return;
const newImages: Array<{ file: File; preview: string }> = []; const newImages: Array<{
file: File;
preview: string;
kind: 'image' | 'voice' | 'file';
}> = [];
for (let i = 0; i < files.length; i++) { for (let i = 0; i < files.length; i++) {
const file = files[i]; const file = files[i];
if (file.type.startsWith('image/')) { if (file.type.startsWith('image/')) {
const preview = URL.createObjectURL(file); newImages.push({
newImages.push({ file, preview }); file,
preview: URL.createObjectURL(file),
kind: 'image',
});
} else if (file.type.startsWith('audio/')) {
newImages.push({ file, preview: '', kind: 'voice' });
} else {
newImages.push({ file, preview: '', kind: 'file' });
} }
} }
setSelectedImages((prev) => [...prev, ...newImages]); setSelectedImages((prev) => [...prev, ...newImages]);
// reset the input so selecting the same file again re-triggers onChange
e.target.value = '';
}; };
const handleRemoveImage = (index: number) => { const handleRemoveImage = (index: number) => {
setSelectedImages((prev) => { setSelectedImages((prev) => {
const newImages = [...prev]; const newImages = [...prev];
URL.revokeObjectURL(newImages[index].preview); if (newImages[index].preview) {
URL.revokeObjectURL(newImages[index].preview);
}
newImages.splice(index, 1); newImages.splice(index, 1);
return newImages; return newImages;
}); });
@@ -372,19 +393,33 @@ export default function DebugDialog({
}); });
} }
// Upload images and add to message chain // Upload attachments and add to message chain
for (const image of selectedImages) { for (const attachment of selectedImages) {
try { try {
const result = await httpClient.uploadWebSocketImage( if (attachment.kind === 'image') {
selectedPipelineId, const result = await httpClient.uploadWebSocketImage(
image.file, selectedPipelineId,
); attachment.file,
messageChain.push({ );
type: 'Image', messageChain.push({
path: result.file_key, type: 'Image',
}); path: result.file_key,
});
} else {
// Voice / File go through the generic document upload endpoint,
// which returns a storage key the backend resolves into the
// sandbox inbox just like images.
const result = await httpClient.uploadDocumentFile(attachment.file);
messageChain.push({
type: attachment.kind === 'voice' ? 'Voice' : 'File',
path: result.file_id,
...(attachment.kind === 'file'
? { name: attachment.file.name }
: {}),
});
}
} catch (error) { } catch (error) {
console.error('Image upload failed:', error); console.error('Attachment upload failed:', error);
toast.error(t('pipelines.debugDialog.imageUploadFailed')); toast.error(t('pipelines.debugDialog.imageUploadFailed'));
} }
} }
@@ -393,7 +428,9 @@ export default function DebugDialog({
setInputValue(''); setInputValue('');
setHasAt(false); setHasAt(false);
setQuotedMessage(null); setQuotedMessage(null);
selectedImages.forEach((img) => URL.revokeObjectURL(img.preview)); selectedImages.forEach((img) => {
if (img.preview) URL.revokeObjectURL(img.preview);
});
setSelectedImages([]); setSelectedImages([]);
// Send message via WebSocket // Send message via WebSocket
@@ -460,13 +497,29 @@ export default function DebugDialog({
} }
case 'File': { case 'File': {
const file = component as MessageChainComponent & { name?: string }; const file = component as FileComponent;
const downloadHref = file.base64
? file.base64.startsWith('data:')
? file.base64
: `data:application/octet-stream;base64,${file.base64}`
: file.url || '';
const fileName = file.name || 'Unknown';
return ( return (
<div key={index} className="my-2 flex items-center gap-2 text-sm"> <div key={index} className="my-2 flex items-center gap-2 text-sm">
<Paperclip className="size-4" /> <Paperclip className="size-4" />
<span> {downloadHref ? (
[{t('pipelines.debugDialog.file')}] {file.name || 'Unknown'} <a
</span> href={downloadHref}
download={fileName}
className="text-primary underline hover:opacity-80"
>
[{t('pipelines.debugDialog.file')}] {fileName}
</a>
) : (
<span>
[{t('pipelines.debugDialog.file')}] {fileName}
</span>
)}
</div> </div>
); );
} }
@@ -844,17 +897,30 @@ export default function DebugDialog({
</div> </div>
)} )}
{/* Image preview area */} {/* Attachment preview area */}
{selectedImages.length > 0 && ( {selectedImages.length > 0 && (
<div className="px-4 pb-2"> <div className="px-4 pb-2">
<div className="flex gap-2 flex-wrap"> <div className="flex gap-2 flex-wrap">
{selectedImages.map((image, index) => ( {selectedImages.map((image, index) => (
<div key={index} className="relative group"> <div key={index} className="relative group">
<img {image.kind === 'image' ? (
src={image.preview} <img
alt={`preview-${index}`} src={image.preview}
className="w-20 h-20 object-cover rounded-lg border" alt={`preview-${index}`}
/> className="w-20 h-20 object-cover rounded-lg border"
/>
) : (
<div className="w-36 h-20 px-2 rounded-lg border bg-muted/40 flex items-center gap-2 overflow-hidden">
{image.kind === 'voice' ? (
<Music className="size-5 shrink-0 text-muted-foreground" />
) : (
<Paperclip className="size-5 shrink-0 text-muted-foreground" />
)}
<span className="text-xs text-muted-foreground truncate">
{image.file.name}
</span>
</div>
)}
<button <button
type="button" type="button"
onClick={() => handleRemoveImage(index)} onClick={() => handleRemoveImage(index)}
@@ -883,7 +949,7 @@ export default function DebugDialog({
<input <input
ref={fileInputRef} ref={fileInputRef}
type="file" type="file"
accept="image/*" accept="image/*,audio/*,*/*"
multiple multiple
onChange={handleImageSelect} onChange={handleImageSelect}
className="hidden" className="hidden"
@@ -64,6 +64,8 @@ export interface File extends MessageComponent {
name?: string; name?: string;
size?: number; size?: number;
url?: string; url?: string;
path?: string;
base64?: string;
} }
// Unknown component // Unknown component
-101
View File
@@ -1185,29 +1185,12 @@ export class BackendClient extends BaseHttpClient {
stack_trace?: string; stack_trace?: string;
message_id?: string; message_id?: string;
}>; }>;
traces?: Array<{
trace_id: string;
started_at: string;
ended_at?: string;
duration?: number;
status: string;
name: string;
bot_id?: string;
bot_name?: string;
pipeline_id?: string;
pipeline_name?: string;
session_id?: string;
message_id?: string;
query_id?: string;
attributes?: Record<string, unknown>;
}>;
totalCount: { totalCount: {
messages: number; messages: number;
llmCalls: number; llmCalls: number;
embeddingCalls: number; embeddingCalls: number;
sessions: number; sessions: number;
errors: number; errors: number;
traces?: number;
}; };
}> { }> {
const queryParams = new URLSearchParams(); const queryParams = new URLSearchParams();
@@ -1230,90 +1213,6 @@ export class BackendClient extends BaseHttpClient {
return this.get(`/api/v1/monitoring/data?${queryParams.toString()}`); return this.get(`/api/v1/monitoring/data?${queryParams.toString()}`);
} }
public getMonitoringTraces(params: {
botId?: string[];
pipelineId?: string[];
startTime?: string;
endTime?: string;
limit?: number;
}): Promise<{
traces: Array<{
trace_id: string;
started_at: string;
ended_at?: string;
duration?: number;
status: string;
name: string;
bot_id?: string;
bot_name?: string;
pipeline_id?: string;
pipeline_name?: string;
session_id?: string;
message_id?: string;
query_id?: string;
attributes?: Record<string, unknown>;
}>;
total: number;
}> {
const queryParams = new URLSearchParams();
if (params.botId) {
params.botId.forEach((id) => queryParams.append('botId', id));
}
if (params.pipelineId) {
params.pipelineId.forEach((id) => queryParams.append('pipelineId', id));
}
if (params.startTime) {
queryParams.append('startTime', params.startTime);
}
if (params.endTime) {
queryParams.append('endTime', params.endTime);
}
if (params.limit) {
queryParams.append('limit', params.limit.toString());
}
return this.get(`/api/v1/monitoring/traces?${queryParams.toString()}`);
}
public getMonitoringTraceDetails(traceId: string): Promise<{
trace_id: string;
found: boolean;
trace: {
trace_id: string;
started_at: string;
ended_at?: string;
duration?: number;
status: string;
name: string;
bot_id?: string;
bot_name?: string;
pipeline_id?: string;
pipeline_name?: string;
session_id?: string;
message_id?: string;
query_id?: string;
attributes?: Record<string, unknown>;
};
spans: Array<{
span_id: string;
trace_id: string;
parent_span_id?: string;
name: string;
kind: string;
status: string;
started_at: string;
ended_at?: string;
duration?: number;
message_id?: string;
session_id?: string;
bot_id?: string;
pipeline_id?: string;
attributes?: Record<string, unknown>;
error_message?: string;
}>;
}> {
return this.get(`/api/v1/monitoring/traces/${traceId}`);
}
public getMonitoringOverview(params: { public getMonitoringOverview(params: {
botId?: string[]; botId?: string[];
pipelineId?: string[]; pipelineId?: string[];
-6
View File
@@ -1217,7 +1217,6 @@ const enUS = {
embeddingCalls: 'Embedding Calls', embeddingCalls: 'Embedding Calls',
modelCalls: 'Model Calls', modelCalls: 'Model Calls',
tokens: 'Token Monitoring', tokens: 'Token Monitoring',
traces: 'Traces',
feedback: 'User Feedback', feedback: 'User Feedback',
sessions: 'Session Analysis', sessions: 'Session Analysis',
errors: 'Error Logs', errors: 'Error Logs',
@@ -1322,11 +1321,6 @@ const enUS = {
noErrors: 'No errors found', noErrors: 'No errors found',
stackTrace: 'Stack Trace', stackTrace: 'Stack Trace',
}, },
traces: {
title: 'Traces',
noTraces: 'No traces found',
noSpans: 'No spans recorded for this trace',
},
feedback: { feedback: {
title: 'User Feedback', title: 'User Feedback',
totalFeedback: 'Total Feedback', totalFeedback: 'Total Feedback',
-6
View File
@@ -1158,7 +1158,6 @@ const zhHans = {
embeddingCalls: 'Embedding调用', embeddingCalls: 'Embedding调用',
modelCalls: '模型调用', modelCalls: '模型调用',
tokens: 'Token 监控', tokens: 'Token 监控',
traces: '链路追踪',
feedback: '用户反馈', feedback: '用户反馈',
sessions: '会话分析', sessions: '会话分析',
errors: '错误日志', errors: '错误日志',
@@ -1263,11 +1262,6 @@ const zhHans = {
noErrors: '未找到错误', noErrors: '未找到错误',
stackTrace: '堆栈追踪', stackTrace: '堆栈追踪',
}, },
traces: {
title: '链路追踪',
noTraces: '未找到链路记录',
noSpans: '此链路暂无 Span 记录',
},
feedback: { feedback: {
title: '用户反馈', title: '用户反馈',
totalFeedback: '总反馈数', totalFeedback: '总反馈数',