mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-24 14:34:20 +00:00
feat(box): bidirectional attachment transfer for sandbox (#2257)
* feat(box): bidirectional attachment transfer for sandbox Materialize inbound attachments into the sandbox workspace so agents can process user-sent files, and collect agent-produced files from the outbox to attach them back to the reply. - box(service): add materialize_inbound_attachments / collect_outbound attachments. Prefer direct host-filesystem read/write on the bind-mounted workspace (no size limit), falling back to chunked exec only for non-shared backends (e2b/remote). Clear per-query inbox/outbox dirs at turn start to avoid query_id-reuse collisions. - provider(localagent): inject inbound attachment descriptors into the sandbox and append a system note telling the agent the inbox/outbox paths. - pipeline(wrapper): collect outbox files on the final stream chunk and append them as attachment components to the response chain. - web(debug-dialog): render File components with a download link when base64/url is present; add base64/path fields to the File entity. - tests: cover inbound/outbound, large-file transfer without truncation, and stale-dir clearing (86 passing). * feat(box): support voice/file attachment round-trip end-to-end Extends the bidirectional attachment transfer to audio and arbitrary files through the real webchat UI, and fixes the model-payload errors that non-image attachments triggered. - platform(websocket_adapter): resolve Voice/File component storage keys to base64 (previously only Image), so audio/documents reach the sandbox inbox. - web(debug-dialog): accept audio/* and any file in the uploader (was image-only), classify by mimetype, upload Voice/File via the documents endpoint, and render non-image staged attachments as a chip. - provider(litellmchat): drop non-image file parts (file_base64 / file_url) when building the OpenAI/LiteLLM payload. These come from Voice/File attachments — including ones replayed from conversation history — and the agent reads their bytes from the sandbox, not the model. Without this the provider rejects the request: 'invalid content type=file_base64'. - provider(localagent): also strip those parts from the current user message alongside the sandbox-path note (model-facing clarity; the requester is the real safety net for history). - tests: cover the requester strip/keep behavior (file dropped, image kept and reshaped to image_url, mixed history, plain-string content). * test(box): cover inbound/outbound attachment helpers; fix ruff format - ruff format localagent.py (CI ruff format --check was failing) - add unit tests for ResponseWrapper outbound-attachment helpers (wrapper.py 78%->98%) - add unit tests for LocalAgentRunner._inject_inbound_attachments - add unit tests for WebSocketAdapter._process_image_components (0%->covered) Lifts PR patch coverage from 68.97% to ~88% (>75% target).
This commit is contained in:
@@ -335,6 +335,428 @@ class BoxService:
|
||||
|
||||
return await self.execute_spec_payload(spec_payload, query)
|
||||
|
||||
# ── Attachment passthrough (inbound / outbound) ──────────────────
|
||||
#
|
||||
# IM/webchat attachments (images, voices, files) reach the LLM as
|
||||
# multimodal content, but historically never landed on the sandbox
|
||||
# filesystem, so the agent's exec/read/write tools could not operate on
|
||||
# them. Conversely, files the agent produced inside the sandbox were
|
||||
# never surfaced back to the user. These two helpers close both gaps:
|
||||
#
|
||||
# inbound : message_chain attachments -> /workspace/inbox/<query_id>/
|
||||
# outbound : /workspace/outbox/<query_id>/ -> reply MessageChain
|
||||
#
|
||||
# Transfer prefers DIRECT HOST FILESYSTEM access to the bind-mounted
|
||||
# workspace (default_workspace on the host maps to /workspace inside the
|
||||
# container), which has no size limit. This covers the local docker /
|
||||
# nsjail / stdio backends. For backends where the workspace is NOT visible
|
||||
# on the LangBot host (E2B, an external remote runtime.endpoint), it falls
|
||||
# back to a base64-through-exec round-trip. The exec channel can only move
|
||||
# small files reliably — the docker backend passes the command as a single
|
||||
# argv (ARG_MAX) and exec stdout is truncated by output_limit_chars — so
|
||||
# the host path is strongly preferred and used whenever available.
|
||||
|
||||
INBOX_MOUNT_DIR = '/workspace/inbox'
|
||||
OUTBOX_MOUNT_DIR = '/workspace/outbox'
|
||||
INBOX_SUBDIR = 'inbox'
|
||||
OUTBOX_SUBDIR = 'outbox'
|
||||
# Hard cap on a single attachment. The HTTP upload endpoints already cap
|
||||
# uploads at 10MiB; keep parity.
|
||||
_ATTACHMENT_MAX_BYTES = 10 * _MIB
|
||||
# Conservative cap for the exec FALLBACK path only (ARG_MAX / stdout
|
||||
# truncation). The host-filesystem path has no such limit.
|
||||
_EXEC_FALLBACK_MAX_BYTES = 256 * 1024
|
||||
|
||||
def _host_query_dir(self, subdir: str, query_id) -> str | None:
|
||||
"""Host path for ``/workspace/<subdir>/<query_id>`` when LangBot can
|
||||
access the bind-mounted workspace directly, else ``None``.
|
||||
|
||||
``default_workspace`` is the host directory bind-mounted to
|
||||
``/workspace`` for the local docker/nsjail backends and shared
|
||||
outright in stdio mode, so a file written there by LangBot is visible
|
||||
to the sandbox (and vice-versa). It is ``None`` / not a local dir for
|
||||
E2B and remote runtimes, where we must fall back to the exec channel.
|
||||
"""
|
||||
root = self.default_workspace
|
||||
if not root or not os.path.isdir(root):
|
||||
return None
|
||||
return os.path.join(root, subdir, str(query_id))
|
||||
|
||||
@staticmethod
|
||||
def _sanitize_attachment_name(name: str, fallback: str) -> str:
|
||||
"""Reduce an arbitrary attachment name to a safe basename.
|
||||
|
||||
Strips directory separators and parent refs so a crafted file name
|
||||
can never escape the inbox/outbox directory.
|
||||
"""
|
||||
base = os.path.basename(str(name or '').replace('\\', '/').strip())
|
||||
base = base.lstrip('.') or ''
|
||||
# Drop anything that is not a conservative filename charset.
|
||||
cleaned = ''.join(c for c in base if c.isalnum() or c in ('.', '_', '-', ' ')).strip()
|
||||
cleaned = cleaned.replace(' ', '_')
|
||||
return cleaned or fallback
|
||||
|
||||
@staticmethod
|
||||
async def _component_to_bytes(component) -> tuple[bytes, str] | None:
|
||||
"""Best-effort extraction of (bytes, mime) from a platform component.
|
||||
|
||||
Handles base64, http(s) url and local path sources. Returns None when
|
||||
no payload can be resolved.
|
||||
"""
|
||||
import base64 as _b64
|
||||
|
||||
b64 = getattr(component, 'base64', None)
|
||||
if b64:
|
||||
data = b64
|
||||
mime = 'application/octet-stream'
|
||||
if isinstance(data, str) and data.startswith('data:'):
|
||||
split_index = data.find(';base64,')
|
||||
if split_index != -1:
|
||||
mime = data[5:split_index]
|
||||
data = data[split_index + 8 :]
|
||||
try:
|
||||
return _b64.b64decode(data), mime
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
url = getattr(component, 'url', None)
|
||||
if url:
|
||||
try:
|
||||
import httpx
|
||||
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
resp = await client.get(url)
|
||||
resp.raise_for_status()
|
||||
return resp.content, resp.headers.get('Content-Type', 'application/octet-stream')
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
path = getattr(component, 'path', None)
|
||||
if path:
|
||||
try:
|
||||
import aiofiles
|
||||
|
||||
async with aiofiles.open(path, 'rb') as f:
|
||||
return await f.read(), 'application/octet-stream'
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
return None
|
||||
|
||||
async def _write_files_into_sandbox(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
subdir: str,
|
||||
target_mount_dir: str,
|
||||
files: list[tuple[str, bytes]],
|
||||
) -> list[str]:
|
||||
"""Write *files* (name, bytes) into the per-query directory.
|
||||
|
||||
Prefers a direct host-filesystem write to the bind-mounted workspace
|
||||
(no size limit). Falls back to a base64-through-exec round-trip only
|
||||
when the workspace is not visible on the LangBot host (E2B / remote).
|
||||
Returns the list of in-sandbox paths actually written.
|
||||
"""
|
||||
if not files:
|
||||
return []
|
||||
|
||||
host_dir = self._host_query_dir(subdir, query.query_id)
|
||||
if host_dir is not None:
|
||||
return await asyncio.to_thread(self._write_files_host, host_dir, target_mount_dir, files)
|
||||
|
||||
return await self._write_files_via_exec(query, target_mount_dir, files)
|
||||
|
||||
def _write_files_host(
|
||||
self,
|
||||
host_dir: str,
|
||||
target_mount_dir: str,
|
||||
files: list[tuple[str, bytes]],
|
||||
) -> list[str]:
|
||||
"""Write attachments straight onto the bind-mounted host directory.
|
||||
|
||||
Recreates the per-query directory from scratch so a reused query_id
|
||||
(the webchat session uses small sequential ids) never inherits stale
|
||||
files from an earlier turn.
|
||||
"""
|
||||
import shutil
|
||||
|
||||
shutil.rmtree(host_dir, ignore_errors=True)
|
||||
os.makedirs(host_dir, exist_ok=True)
|
||||
written: list[str] = []
|
||||
for name, data in files:
|
||||
with open(os.path.join(host_dir, name), 'wb') as fh:
|
||||
fh.write(data)
|
||||
written.append(f'{target_mount_dir}/{name}')
|
||||
return written
|
||||
|
||||
async def _write_files_via_exec(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
target_dir: str,
|
||||
files: list[tuple[str, bytes]],
|
||||
) -> list[str]:
|
||||
"""Fallback: ship files into the sandbox over the exec channel.
|
||||
|
||||
Only used for backends without host-filesystem access (E2B / remote).
|
||||
Each file is base64-decoded inside the sandbox. Files larger than the
|
||||
conservative exec cap are skipped (ARG_MAX / stdout limits).
|
||||
"""
|
||||
import base64 as _b64
|
||||
import json as _json
|
||||
|
||||
manifest = []
|
||||
for name, data in files:
|
||||
if len(data) > self._EXEC_FALLBACK_MAX_BYTES:
|
||||
self.ap.logger.warning(
|
||||
f'Attachment "{name}" ({len(data)} bytes) exceeds the exec-channel '
|
||||
f'fallback limit ({self._EXEC_FALLBACK_MAX_BYTES} bytes); skipping. '
|
||||
f'Configure a host-shared workspace to transfer large files.'
|
||||
)
|
||||
continue
|
||||
manifest.append({'name': name, 'b64': _b64.b64encode(data).decode('ascii')})
|
||||
if not manifest:
|
||||
return []
|
||||
|
||||
manifest_b64 = _b64.b64encode(_json.dumps(manifest).encode('utf-8')).decode('ascii')
|
||||
script = (
|
||||
'import base64, json, os, shutil\n'
|
||||
f'target = {target_dir!r}\n'
|
||||
'shutil.rmtree(target, ignore_errors=True)\n'
|
||||
'os.makedirs(target, exist_ok=True)\n'
|
||||
f'manifest = json.loads(base64.b64decode({manifest_b64!r}))\n'
|
||||
'written = []\n'
|
||||
'for item in manifest:\n'
|
||||
" p = os.path.join(target, item['name'])\n"
|
||||
" with open(p, 'wb') as f:\n"
|
||||
" f.write(base64.b64decode(item['b64']))\n"
|
||||
' written.append(p)\n'
|
||||
'print(json.dumps(written))\n'
|
||||
)
|
||||
result = await self.execute_tool(
|
||||
{'command': f"python3 - <<'LBPY'\n{script}\nLBPY", 'timeout_sec': 120},
|
||||
query,
|
||||
)
|
||||
if not result.get('ok'):
|
||||
self.ap.logger.warning(
|
||||
f'Failed to write inbound attachments into sandbox via exec: '
|
||||
f'query_id={query.query_id} stderr={result.get("stderr", "")[:200]}'
|
||||
)
|
||||
return []
|
||||
try:
|
||||
return _json.loads(str(result.get('stdout') or '').strip().splitlines()[-1])
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
async def materialize_inbound_attachments(self, query: pipeline_query.Query) -> list[dict]:
|
||||
"""Persist message-chain attachments into the sandbox inbox.
|
||||
|
||||
Returns a list of ``{path, name, type, size}`` describing what was
|
||||
written, so the runner can tell the LLM the exact in-sandbox paths.
|
||||
Returns ``[]`` when sandbox is unavailable or there are no attachments.
|
||||
"""
|
||||
if not self._available:
|
||||
return []
|
||||
|
||||
import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
||||
|
||||
message_chain = getattr(query, 'message_chain', None)
|
||||
if not message_chain:
|
||||
return []
|
||||
|
||||
type_map = [
|
||||
(platform_message.Image, 'Image', 'image', 'png'),
|
||||
(platform_message.Voice, 'Voice', 'voice', 'wav'),
|
||||
(platform_message.File, 'File', 'file', 'bin'),
|
||||
]
|
||||
|
||||
pending: list[tuple[str, bytes]] = []
|
||||
descriptors: list[dict] = []
|
||||
index = 0
|
||||
for component in message_chain:
|
||||
matched = None
|
||||
for cls, kind, prefix, default_ext in type_map:
|
||||
if isinstance(component, cls):
|
||||
matched = (kind, prefix, default_ext)
|
||||
break
|
||||
if matched is None:
|
||||
continue
|
||||
kind, prefix, default_ext = matched
|
||||
|
||||
payload = await self._component_to_bytes(component)
|
||||
if payload is None:
|
||||
continue
|
||||
data, _mime = payload
|
||||
if not data or len(data) > self._ATTACHMENT_MAX_BYTES:
|
||||
continue
|
||||
|
||||
index += 1
|
||||
raw_name = getattr(component, 'name', None) or f'{prefix}_{index}.{default_ext}'
|
||||
safe_name = self._sanitize_attachment_name(raw_name, f'{prefix}_{index}.{default_ext}')
|
||||
pending.append((safe_name, data))
|
||||
descriptors.append(
|
||||
{
|
||||
'name': safe_name,
|
||||
'type': kind,
|
||||
'size': len(data),
|
||||
}
|
||||
)
|
||||
|
||||
if not pending:
|
||||
return []
|
||||
|
||||
target_dir = f'{self.INBOX_MOUNT_DIR}/{query.query_id}'
|
||||
written = await self._write_files_into_sandbox(query, self.INBOX_SUBDIR, target_dir, pending)
|
||||
written_basenames = {os.path.basename(p) for p in written}
|
||||
|
||||
result: list[dict] = []
|
||||
for desc in descriptors:
|
||||
if desc['name'] in written_basenames:
|
||||
desc['path'] = f'{target_dir}/{desc["name"]}'
|
||||
result.append(desc)
|
||||
if result:
|
||||
self.ap.logger.info(
|
||||
f'Materialized {len(result)} inbound attachment(s) into sandbox: '
|
||||
f'query_id={query.query_id} dir={target_dir}'
|
||||
)
|
||||
return result
|
||||
|
||||
async def collect_outbound_attachments(self, query: pipeline_query.Query) -> list[dict]:
|
||||
"""Collect files the agent produced in the sandbox outbox.
|
||||
|
||||
Reads ``/workspace/outbox/<query_id>/`` (recursively) — directly from
|
||||
the bind-mounted host directory when available (no size limit), else
|
||||
via the exec channel — returns a list of ``{type, name, base64}``
|
||||
ready to become platform message components, then clears the outbox so
|
||||
a later turn in the same session does not re-send stale files. Returns
|
||||
``[]`` when nothing was produced.
|
||||
"""
|
||||
if not self._available:
|
||||
return []
|
||||
|
||||
host_dir = self._host_query_dir(self.OUTBOX_SUBDIR, query.query_id)
|
||||
if host_dir is not None:
|
||||
entries = await asyncio.to_thread(self._read_outbox_host, host_dir)
|
||||
else:
|
||||
entries = await self._read_outbox_via_exec(query)
|
||||
|
||||
attachments = self._classify_outbound_entries(entries)
|
||||
|
||||
if attachments:
|
||||
await self._clear_outbox(query, host_dir)
|
||||
self.ap.logger.info(
|
||||
f'Collected {len(attachments)} outbound attachment(s) from sandbox: query_id={query.query_id}'
|
||||
)
|
||||
return attachments
|
||||
|
||||
def _read_outbox_host(self, host_dir: str) -> list[dict]:
|
||||
"""Read outbox files straight off the bind-mounted host directory."""
|
||||
import base64 as _b64
|
||||
|
||||
entries: list[dict] = []
|
||||
if not os.path.isdir(host_dir):
|
||||
return entries
|
||||
for root, _dirs, names in os.walk(host_dir):
|
||||
for name in sorted(names):
|
||||
path = os.path.join(root, name)
|
||||
try:
|
||||
if os.path.getsize(path) > self._ATTACHMENT_MAX_BYTES:
|
||||
continue
|
||||
with open(path, 'rb') as fh:
|
||||
data = fh.read()
|
||||
except OSError:
|
||||
continue
|
||||
rel = os.path.relpath(path, host_dir)
|
||||
entries.append({'name': rel, 'b64': _b64.b64encode(data).decode('ascii')})
|
||||
return entries
|
||||
|
||||
async def _read_outbox_via_exec(self, query: pipeline_query.Query) -> list[dict]:
|
||||
"""Fallback: read the outbox over the exec channel (E2B / remote).
|
||||
|
||||
Note: exec stdout is truncated by ``output_limit_chars``, so this path
|
||||
only reliably transfers small files. The host path is preferred.
|
||||
"""
|
||||
import json as _json
|
||||
|
||||
target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}'
|
||||
max_bytes = self._EXEC_FALLBACK_MAX_BYTES
|
||||
script = (
|
||||
'import base64, json, os\n'
|
||||
f'target = {target_dir!r}\n'
|
||||
f'max_bytes = {max_bytes}\n'
|
||||
'out = []\n'
|
||||
'if os.path.isdir(target):\n'
|
||||
' for root, _dirs, names in os.walk(target):\n'
|
||||
' for n in sorted(names):\n'
|
||||
' p = os.path.join(root, n)\n'
|
||||
' try:\n'
|
||||
' if os.path.getsize(p) > max_bytes:\n'
|
||||
' continue\n'
|
||||
" with open(p, 'rb') as f:\n"
|
||||
' data = f.read()\n'
|
||||
' except OSError:\n'
|
||||
' continue\n'
|
||||
' rel = os.path.relpath(p, target)\n'
|
||||
" out.append({'name': rel, 'b64': base64.b64encode(data).decode('ascii')})\n"
|
||||
'print(json.dumps(out))\n'
|
||||
)
|
||||
result = await self.execute_tool(
|
||||
{'command': f"python3 - <<'LBPY'\n{script}\nLBPY", 'timeout_sec': 120},
|
||||
query,
|
||||
)
|
||||
if not result.get('ok'):
|
||||
return []
|
||||
try:
|
||||
return _json.loads(str(result.get('stdout') or '').strip().splitlines()[-1])
|
||||
except Exception:
|
||||
return []
|
||||
|
||||
async def _clear_outbox(self, query: pipeline_query.Query, host_dir: str | None) -> None:
|
||||
"""Empty the per-query outbox after collection (host or exec)."""
|
||||
if host_dir is not None:
|
||||
import shutil
|
||||
|
||||
def _clear():
|
||||
shutil.rmtree(host_dir, ignore_errors=True)
|
||||
os.makedirs(host_dir, exist_ok=True)
|
||||
|
||||
await asyncio.to_thread(_clear)
|
||||
return
|
||||
target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}'
|
||||
await self.execute_tool(
|
||||
{'command': f'rm -rf {target_dir} && mkdir -p {target_dir}', 'timeout_sec': 30},
|
||||
query,
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _classify_outbound_entries(entries: list[dict]) -> list[dict]:
|
||||
"""Classify outbox files into Image/Voice/File component descriptors."""
|
||||
image_exts = {'png', 'jpg', 'jpeg', 'gif', 'webp', 'bmp'}
|
||||
voice_exts = {'wav', 'mp3', 'silk', 'amr', 'ogg', 'm4a', 'aac'}
|
||||
mime_by_ext = {
|
||||
'png': 'image/png',
|
||||
'jpg': 'image/jpeg',
|
||||
'jpeg': 'image/jpeg',
|
||||
'gif': 'image/gif',
|
||||
'webp': 'image/webp',
|
||||
'bmp': 'image/bmp',
|
||||
}
|
||||
attachments: list[dict] = []
|
||||
for entry in entries or []:
|
||||
name = str(entry.get('name', '') or '')
|
||||
b64 = entry.get('b64')
|
||||
if not name or not b64:
|
||||
continue
|
||||
ext = name.rsplit('.', 1)[-1].lower() if '.' in name else ''
|
||||
base_name = os.path.basename(name)
|
||||
if ext in image_exts:
|
||||
mime = mime_by_ext.get(ext, 'image/png')
|
||||
attachments.append({'type': 'Image', 'name': base_name, 'base64': f'data:{mime};base64,{b64}'})
|
||||
elif ext in voice_exts:
|
||||
attachments.append({'type': 'Voice', 'name': base_name, 'base64': f'data:audio/{ext};base64,{b64}'})
|
||||
else:
|
||||
attachments.append({'type': 'File', 'name': base_name, 'base64': b64})
|
||||
return attachments
|
||||
|
||||
async def shutdown(self):
|
||||
await self.client.shutdown()
|
||||
|
||||
|
||||
@@ -7,6 +7,7 @@ from .. import stage
|
||||
|
||||
import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
||||
import langbot_plugin.api.entities.events as events
|
||||
|
||||
|
||||
@@ -23,6 +24,50 @@ class ResponseWrapper(stage.PipelineStage):
|
||||
async def initialize(self, pipeline_config: dict):
|
||||
pass
|
||||
|
||||
def _is_final_assistant_message(self, result) -> bool:
|
||||
"""Whether *result* is the agent's final, tool-call-free answer.
|
||||
|
||||
Intermediate streaming chunks and tool-call rounds must NOT trigger
|
||||
outbound attachment collection — only the terminal assistant message.
|
||||
"""
|
||||
if getattr(result, 'role', None) != 'assistant':
|
||||
return False
|
||||
if result.tool_calls:
|
||||
return False
|
||||
if isinstance(result, provider_message.MessageChunk):
|
||||
return bool(result.is_final)
|
||||
return True
|
||||
|
||||
async def _append_outbound_attachments(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
message_chain: platform_message.MessageChain,
|
||||
) -> None:
|
||||
"""Collect sandbox outbox files and append them to *message_chain*.
|
||||
|
||||
Runs at most once per query (guarded by a query variable) and never
|
||||
raises into the pipeline — attachment delivery is best-effort.
|
||||
"""
|
||||
if query.variables.get('_sandbox_outbound_collected'):
|
||||
return
|
||||
box_service = getattr(self.ap, 'box_service', None)
|
||||
if box_service is None or not getattr(box_service, 'available', False):
|
||||
return
|
||||
query.variables['_sandbox_outbound_collected'] = True
|
||||
try:
|
||||
attachments = await box_service.collect_outbound_attachments(query)
|
||||
except Exception as e:
|
||||
self.ap.logger.warning(f'Outbound attachment collection failed: {e}')
|
||||
return
|
||||
for att in attachments:
|
||||
att_type = att.get('type')
|
||||
if att_type == 'Image':
|
||||
message_chain.append(platform_message.Image(base64=att['base64']))
|
||||
elif att_type == 'Voice':
|
||||
message_chain.append(platform_message.Voice(base64=att['base64']))
|
||||
else:
|
||||
message_chain.append(platform_message.File(name=att.get('name', 'file'), base64=att['base64']))
|
||||
|
||||
async def process(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
@@ -83,10 +128,16 @@ class ResponseWrapper(stage.PipelineStage):
|
||||
)
|
||||
else:
|
||||
if event_ctx.event.reply_message_chain is not None:
|
||||
query.resp_message_chain.append(event_ctx.event.reply_message_chain)
|
||||
|
||||
reply_chain = event_ctx.event.reply_message_chain
|
||||
else:
|
||||
query.resp_message_chain.append(result.get_content_platform_message_chain())
|
||||
reply_chain = result.get_content_platform_message_chain()
|
||||
|
||||
# Attach files the agent produced in the sandbox
|
||||
# outbox, but only on the terminal assistant message.
|
||||
if self._is_final_assistant_message(result):
|
||||
await self._append_outbound_attachments(query, reply_chain)
|
||||
|
||||
query.resp_message_chain.append(reply_chain)
|
||||
|
||||
yield entities.StageProcessResult(
|
||||
result_type=entities.ResultType.CONTINUE,
|
||||
|
||||
@@ -312,12 +312,18 @@ class WebSocketAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter)
|
||||
|
||||
async def _process_image_components(self, message_chain_obj: list):
|
||||
"""
|
||||
处理消息链中的图片和文件组件,将path转换为base64
|
||||
处理消息链中的图片、语音和文件组件,将 path 转换为 base64
|
||||
|
||||
Image / Voice / File components uploaded from the web client carry a
|
||||
storage key in ``path``. Resolve it to a base64 data URI so downstream
|
||||
stages (multimodal LLM input and the Box sandbox inbox) have a usable
|
||||
payload, then drop the now-consumed storage object.
|
||||
|
||||
Args:
|
||||
message_chain_obj: 消息链对象列表
|
||||
"""
|
||||
import base64
|
||||
import mimetypes
|
||||
|
||||
storage_mgr = self.ap.storage_mgr
|
||||
|
||||
@@ -325,31 +331,33 @@ class WebSocketAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter)
|
||||
comp_type = component.get('type', '')
|
||||
comp_path = component.get('path', '')
|
||||
|
||||
if not comp_path:
|
||||
if not comp_path or comp_type not in ('Image', 'Voice', 'File'):
|
||||
continue
|
||||
|
||||
if comp_type == 'Image':
|
||||
try:
|
||||
file_content = await storage_mgr.storage_provider.load(comp_path)
|
||||
base64_str = base64.b64encode(file_content).decode('utf-8')
|
||||
try:
|
||||
file_content = await storage_mgr.storage_provider.load(comp_path)
|
||||
base64_str = base64.b64encode(file_content).decode('utf-8')
|
||||
|
||||
file_key = comp_path
|
||||
if file_key.lower().endswith(('.jpg', '.jpeg')):
|
||||
lowered = comp_path.lower()
|
||||
if comp_type == 'Image':
|
||||
if lowered.endswith(('.jpg', '.jpeg')):
|
||||
mime_type = 'image/jpeg'
|
||||
elif file_key.lower().endswith('.png'):
|
||||
mime_type = 'image/png'
|
||||
elif file_key.lower().endswith('.gif'):
|
||||
elif lowered.endswith('.gif'):
|
||||
mime_type = 'image/gif'
|
||||
elif file_key.lower().endswith('.webp'):
|
||||
elif lowered.endswith('.webp'):
|
||||
mime_type = 'image/webp'
|
||||
else:
|
||||
mime_type = 'image/png'
|
||||
elif comp_type == 'Voice':
|
||||
mime_type = mimetypes.guess_type(comp_path)[0] or 'audio/wav'
|
||||
else: # File
|
||||
mime_type = mimetypes.guess_type(comp_path)[0] or 'application/octet-stream'
|
||||
|
||||
component['base64'] = f'data:{mime_type};base64,{base64_str}'
|
||||
await storage_mgr.storage_provider.delete(comp_path)
|
||||
component['path'] = ''
|
||||
except Exception as e:
|
||||
await self.logger.error(f'Failed to load image file {comp_path}: {e}')
|
||||
component['base64'] = f'data:{mime_type};base64,{base64_str}'
|
||||
await storage_mgr.storage_provider.delete(comp_path)
|
||||
component['path'] = ''
|
||||
except Exception as e:
|
||||
await self.logger.error(f'Failed to load {comp_type} file {comp_path}: {e}')
|
||||
|
||||
async def handle_websocket_message(
|
||||
self,
|
||||
|
||||
@@ -216,11 +216,22 @@ class LiteLLMRequester(requester.ProviderAPIRequester):
|
||||
content = msg_dict.get('content')
|
||||
|
||||
if isinstance(content, list):
|
||||
converted_parts = []
|
||||
for part in content:
|
||||
if isinstance(part, dict) and part.get('type') == 'image_base64':
|
||||
part['image_url'] = {'url': part['image_base64']}
|
||||
part['type'] = 'image_url'
|
||||
del part['image_base64']
|
||||
# OpenAI-compatible chat models reject non-image file parts
|
||||
# (audio/document base64 or url). These originate from Voice /
|
||||
# File attachments — including ones replayed from conversation
|
||||
# history — and the agent already accesses their bytes via the
|
||||
# sandbox. Drop them from the model payload to avoid
|
||||
# "Invalid user message ... invalid content type=file_base64".
|
||||
if isinstance(part, dict) and part.get('type') in ('file_base64', 'file_url'):
|
||||
continue
|
||||
converted_parts.append(part)
|
||||
msg_dict['content'] = converted_parts
|
||||
|
||||
req_messages.append(msg_dict)
|
||||
|
||||
|
||||
@@ -104,6 +104,68 @@ class _StreamAccumulator:
|
||||
class LocalAgentRunner(runner.RequestRunner):
|
||||
"""Local agent request runner"""
|
||||
|
||||
async def _inject_inbound_attachments(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
user_message: provider_message.Message,
|
||||
) -> None:
|
||||
"""Persist inbound attachments into the sandbox and tell the model.
|
||||
|
||||
No-op when the box service is unavailable or there are no attachments.
|
||||
On success, appends an extra text ContentElement to the user message
|
||||
listing the in-sandbox paths and the outbox convention, and stashes the
|
||||
descriptors in ``query.variables['_sandbox_inbound_attachments']``.
|
||||
"""
|
||||
box_service = getattr(self.ap, 'box_service', None)
|
||||
if box_service is None or not getattr(box_service, 'available', False):
|
||||
return
|
||||
try:
|
||||
attachments = await box_service.materialize_inbound_attachments(query)
|
||||
except Exception as e: # never break the chat turn over attachment IO
|
||||
self.ap.logger.warning(f'Inbound attachment materialization failed: {e}')
|
||||
return
|
||||
if not attachments:
|
||||
return
|
||||
|
||||
query.variables['_sandbox_inbound_attachments'] = attachments
|
||||
|
||||
lines = [
|
||||
'The user sent attachments. They have been saved into the sandbox and are '
|
||||
'available to the exec/read/write tools at these paths:'
|
||||
]
|
||||
for att in attachments:
|
||||
lines.append(f'- {att["type"]}: {att["path"]} ({att["size"]} bytes)')
|
||||
outbox_dir = f'{box_service.OUTBOX_MOUNT_DIR}/{query.query_id}'
|
||||
lines.append(
|
||||
'If you produce any file (image, audio, document, etc.) that should be sent '
|
||||
f'back to the user, write it into {outbox_dir}/ (create the directory if '
|
||||
'needed). Every file placed there will be delivered to the user automatically.'
|
||||
)
|
||||
note = '\n'.join(lines)
|
||||
|
||||
# Voice/File attachments are now available to the agent via the sandbox
|
||||
# (exec/read/write tools). Their raw bytes must NOT be forwarded to the
|
||||
# chat model as multimodal content: providers reject non-image file
|
||||
# parts ("Invalid user message ... ensure all user messages are valid
|
||||
# OpenAI chat completion messages"). Strip those content elements and
|
||||
# rely on the sandbox-path note instead. Images are kept so vision
|
||||
# models can still see them.
|
||||
_model_unsafe_types = {'file_base64', 'file_url'}
|
||||
if isinstance(user_message.content, list):
|
||||
user_message.content = [
|
||||
ce for ce in user_message.content if getattr(ce, 'type', None) not in _model_unsafe_types
|
||||
]
|
||||
|
||||
if isinstance(user_message.content, str):
|
||||
user_message.content = [
|
||||
provider_message.ContentElement.from_text(user_message.content),
|
||||
provider_message.ContentElement.from_text(note),
|
||||
]
|
||||
elif isinstance(user_message.content, list):
|
||||
user_message.content.append(provider_message.ContentElement.from_text(note))
|
||||
else:
|
||||
user_message.content = [provider_message.ContentElement.from_text(note)]
|
||||
|
||||
def _build_request_messages(
|
||||
self,
|
||||
query: pipeline_query.Query,
|
||||
@@ -232,6 +294,12 @@ class LocalAgentRunner(runner.RequestRunner):
|
||||
|
||||
user_message = copy.deepcopy(query.user_message)
|
||||
|
||||
# Materialize inbound attachments (images / voices / files) into the
|
||||
# sandbox so the agent's exec/read/write tools can operate on the real
|
||||
# bytes — not just the multimodal copy the model sees. The exact
|
||||
# in-sandbox paths are announced to the model as a system note.
|
||||
await self._inject_inbound_attachments(query, user_message)
|
||||
|
||||
user_message_text = ''
|
||||
|
||||
if isinstance(user_message.content, str):
|
||||
|
||||
Reference in New Issue
Block a user