feat(box): bidirectional attachment transfer for sandbox

Materialize inbound attachments into the sandbox workspace so agents can
process user-sent files, and collect agent-produced files from the outbox
to attach them back to the reply.

- box(service): add materialize_inbound_attachments / collect_outbound
  attachments. Prefer direct host-filesystem read/write on the bind-mounted
  workspace (no size limit), falling back to chunked exec only for
  non-shared backends (e2b/remote). Clear per-query inbox/outbox dirs at
  turn start to avoid query_id-reuse collisions.
- provider(localagent): inject inbound attachment descriptors into the
  sandbox and append a system note telling the agent the inbox/outbox paths.
- pipeline(wrapper): collect outbox files on the final stream chunk and
  append them as attachment components to the response chain.
- web(debug-dialog): render File components with a download link when
  base64/url is present; add base64/path fields to the File entity.
- tests: cover inbound/outbound, large-file transfer without truncation,
  and stale-dir clearing (86 passing).
This commit is contained in:
RockChinQ
2026-06-17 19:01:03 -04:00
parent b3c6de2072
commit 22c0a18bea
6 changed files with 806 additions and 7 deletions
+422
View File
@@ -335,6 +335,428 @@ class BoxService:
return await self.execute_spec_payload(spec_payload, query)
# ── Attachment passthrough (inbound / outbound) ──────────────────
#
# IM/webchat attachments (images, voices, files) reach the LLM as
# multimodal content, but historically never landed on the sandbox
# filesystem, so the agent's exec/read/write tools could not operate on
# them. Conversely, files the agent produced inside the sandbox were
# never surfaced back to the user. These two helpers close both gaps:
#
# inbound : message_chain attachments -> /workspace/inbox/<query_id>/
# outbound : /workspace/outbox/<query_id>/ -> reply MessageChain
#
# Transfer prefers DIRECT HOST FILESYSTEM access to the bind-mounted
# workspace (default_workspace on the host maps to /workspace inside the
# container), which has no size limit. This covers the local docker /
# nsjail / stdio backends. For backends where the workspace is NOT visible
# on the LangBot host (E2B, an external remote runtime.endpoint), it falls
# back to a base64-through-exec round-trip. The exec channel can only move
# small files reliably — the docker backend passes the command as a single
# argv (ARG_MAX) and exec stdout is truncated by output_limit_chars — so
# the host path is strongly preferred and used whenever available.
INBOX_MOUNT_DIR = '/workspace/inbox'
OUTBOX_MOUNT_DIR = '/workspace/outbox'
INBOX_SUBDIR = 'inbox'
OUTBOX_SUBDIR = 'outbox'
# Hard cap on a single attachment. The HTTP upload endpoints already cap
# uploads at 10MiB; keep parity.
_ATTACHMENT_MAX_BYTES = 10 * _MIB
# Conservative cap for the exec FALLBACK path only (ARG_MAX / stdout
# truncation). The host-filesystem path has no such limit.
_EXEC_FALLBACK_MAX_BYTES = 256 * 1024
def _host_query_dir(self, subdir: str, query_id) -> str | None:
"""Host path for ``/workspace/<subdir>/<query_id>`` when LangBot can
access the bind-mounted workspace directly, else ``None``.
``default_workspace`` is the host directory bind-mounted to
``/workspace`` for the local docker/nsjail backends and shared
outright in stdio mode, so a file written there by LangBot is visible
to the sandbox (and vice-versa). It is ``None`` / not a local dir for
E2B and remote runtimes, where we must fall back to the exec channel.
"""
root = self.default_workspace
if not root or not os.path.isdir(root):
return None
return os.path.join(root, subdir, str(query_id))
@staticmethod
def _sanitize_attachment_name(name: str, fallback: str) -> str:
"""Reduce an arbitrary attachment name to a safe basename.
Strips directory separators and parent refs so a crafted file name
can never escape the inbox/outbox directory.
"""
base = os.path.basename(str(name or '').replace('\\', '/').strip())
base = base.lstrip('.') or ''
# Drop anything that is not a conservative filename charset.
cleaned = ''.join(c for c in base if c.isalnum() or c in ('.', '_', '-', ' ')).strip()
cleaned = cleaned.replace(' ', '_')
return cleaned or fallback
@staticmethod
async def _component_to_bytes(component) -> tuple[bytes, str] | None:
"""Best-effort extraction of (bytes, mime) from a platform component.
Handles base64, http(s) url and local path sources. Returns None when
no payload can be resolved.
"""
import base64 as _b64
b64 = getattr(component, 'base64', None)
if b64:
data = b64
mime = 'application/octet-stream'
if isinstance(data, str) and data.startswith('data:'):
split_index = data.find(';base64,')
if split_index != -1:
mime = data[5:split_index]
data = data[split_index + 8 :]
try:
return _b64.b64decode(data), mime
except Exception:
return None
url = getattr(component, 'url', None)
if url:
try:
import httpx
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(url)
resp.raise_for_status()
return resp.content, resp.headers.get('Content-Type', 'application/octet-stream')
except Exception:
return None
path = getattr(component, 'path', None)
if path:
try:
import aiofiles
async with aiofiles.open(path, 'rb') as f:
return await f.read(), 'application/octet-stream'
except Exception:
return None
return None
async def _write_files_into_sandbox(
self,
query: pipeline_query.Query,
subdir: str,
target_mount_dir: str,
files: list[tuple[str, bytes]],
) -> list[str]:
"""Write *files* (name, bytes) into the per-query directory.
Prefers a direct host-filesystem write to the bind-mounted workspace
(no size limit). Falls back to a base64-through-exec round-trip only
when the workspace is not visible on the LangBot host (E2B / remote).
Returns the list of in-sandbox paths actually written.
"""
if not files:
return []
host_dir = self._host_query_dir(subdir, query.query_id)
if host_dir is not None:
return await asyncio.to_thread(self._write_files_host, host_dir, target_mount_dir, files)
return await self._write_files_via_exec(query, target_mount_dir, files)
def _write_files_host(
self,
host_dir: str,
target_mount_dir: str,
files: list[tuple[str, bytes]],
) -> list[str]:
"""Write attachments straight onto the bind-mounted host directory.
Recreates the per-query directory from scratch so a reused query_id
(the webchat session uses small sequential ids) never inherits stale
files from an earlier turn.
"""
import shutil
shutil.rmtree(host_dir, ignore_errors=True)
os.makedirs(host_dir, exist_ok=True)
written: list[str] = []
for name, data in files:
with open(os.path.join(host_dir, name), 'wb') as fh:
fh.write(data)
written.append(f'{target_mount_dir}/{name}')
return written
async def _write_files_via_exec(
self,
query: pipeline_query.Query,
target_dir: str,
files: list[tuple[str, bytes]],
) -> list[str]:
"""Fallback: ship files into the sandbox over the exec channel.
Only used for backends without host-filesystem access (E2B / remote).
Each file is base64-decoded inside the sandbox. Files larger than the
conservative exec cap are skipped (ARG_MAX / stdout limits).
"""
import base64 as _b64
import json as _json
manifest = []
for name, data in files:
if len(data) > self._EXEC_FALLBACK_MAX_BYTES:
self.ap.logger.warning(
f'Attachment "{name}" ({len(data)} bytes) exceeds the exec-channel '
f'fallback limit ({self._EXEC_FALLBACK_MAX_BYTES} bytes); skipping. '
f'Configure a host-shared workspace to transfer large files.'
)
continue
manifest.append({'name': name, 'b64': _b64.b64encode(data).decode('ascii')})
if not manifest:
return []
manifest_b64 = _b64.b64encode(_json.dumps(manifest).encode('utf-8')).decode('ascii')
script = (
'import base64, json, os, shutil\n'
f'target = {target_dir!r}\n'
'shutil.rmtree(target, ignore_errors=True)\n'
'os.makedirs(target, exist_ok=True)\n'
f'manifest = json.loads(base64.b64decode({manifest_b64!r}))\n'
'written = []\n'
'for item in manifest:\n'
" p = os.path.join(target, item['name'])\n"
" with open(p, 'wb') as f:\n"
" f.write(base64.b64decode(item['b64']))\n"
' written.append(p)\n'
'print(json.dumps(written))\n'
)
result = await self.execute_tool(
{'command': f"python3 - <<'LBPY'\n{script}\nLBPY", 'timeout_sec': 120},
query,
)
if not result.get('ok'):
self.ap.logger.warning(
f'Failed to write inbound attachments into sandbox via exec: '
f'query_id={query.query_id} stderr={result.get("stderr", "")[:200]}'
)
return []
try:
return _json.loads(str(result.get('stdout') or '').strip().splitlines()[-1])
except Exception:
return []
async def materialize_inbound_attachments(self, query: pipeline_query.Query) -> list[dict]:
"""Persist message-chain attachments into the sandbox inbox.
Returns a list of ``{path, name, type, size}`` describing what was
written, so the runner can tell the LLM the exact in-sandbox paths.
Returns ``[]`` when sandbox is unavailable or there are no attachments.
"""
if not self._available:
return []
import langbot_plugin.api.entities.builtin.platform.message as platform_message
message_chain = getattr(query, 'message_chain', None)
if not message_chain:
return []
type_map = [
(platform_message.Image, 'Image', 'image', 'png'),
(platform_message.Voice, 'Voice', 'voice', 'wav'),
(platform_message.File, 'File', 'file', 'bin'),
]
pending: list[tuple[str, bytes]] = []
descriptors: list[dict] = []
index = 0
for component in message_chain:
matched = None
for cls, kind, prefix, default_ext in type_map:
if isinstance(component, cls):
matched = (kind, prefix, default_ext)
break
if matched is None:
continue
kind, prefix, default_ext = matched
payload = await self._component_to_bytes(component)
if payload is None:
continue
data, _mime = payload
if not data or len(data) > self._ATTACHMENT_MAX_BYTES:
continue
index += 1
raw_name = getattr(component, 'name', None) or f'{prefix}_{index}.{default_ext}'
safe_name = self._sanitize_attachment_name(raw_name, f'{prefix}_{index}.{default_ext}')
pending.append((safe_name, data))
descriptors.append(
{
'name': safe_name,
'type': kind,
'size': len(data),
}
)
if not pending:
return []
target_dir = f'{self.INBOX_MOUNT_DIR}/{query.query_id}'
written = await self._write_files_into_sandbox(query, self.INBOX_SUBDIR, target_dir, pending)
written_basenames = {os.path.basename(p) for p in written}
result: list[dict] = []
for desc in descriptors:
if desc['name'] in written_basenames:
desc['path'] = f'{target_dir}/{desc["name"]}'
result.append(desc)
if result:
self.ap.logger.info(
f'Materialized {len(result)} inbound attachment(s) into sandbox: '
f'query_id={query.query_id} dir={target_dir}'
)
return result
async def collect_outbound_attachments(self, query: pipeline_query.Query) -> list[dict]:
"""Collect files the agent produced in the sandbox outbox.
Reads ``/workspace/outbox/<query_id>/`` (recursively) — directly from
the bind-mounted host directory when available (no size limit), else
via the exec channel — returns a list of ``{type, name, base64}``
ready to become platform message components, then clears the outbox so
a later turn in the same session does not re-send stale files. Returns
``[]`` when nothing was produced.
"""
if not self._available:
return []
host_dir = self._host_query_dir(self.OUTBOX_SUBDIR, query.query_id)
if host_dir is not None:
entries = await asyncio.to_thread(self._read_outbox_host, host_dir)
else:
entries = await self._read_outbox_via_exec(query)
attachments = self._classify_outbound_entries(entries)
if attachments:
await self._clear_outbox(query, host_dir)
self.ap.logger.info(
f'Collected {len(attachments)} outbound attachment(s) from sandbox: query_id={query.query_id}'
)
return attachments
def _read_outbox_host(self, host_dir: str) -> list[dict]:
"""Read outbox files straight off the bind-mounted host directory."""
import base64 as _b64
entries: list[dict] = []
if not os.path.isdir(host_dir):
return entries
for root, _dirs, names in os.walk(host_dir):
for name in sorted(names):
path = os.path.join(root, name)
try:
if os.path.getsize(path) > self._ATTACHMENT_MAX_BYTES:
continue
with open(path, 'rb') as fh:
data = fh.read()
except OSError:
continue
rel = os.path.relpath(path, host_dir)
entries.append({'name': rel, 'b64': _b64.b64encode(data).decode('ascii')})
return entries
async def _read_outbox_via_exec(self, query: pipeline_query.Query) -> list[dict]:
"""Fallback: read the outbox over the exec channel (E2B / remote).
Note: exec stdout is truncated by ``output_limit_chars``, so this path
only reliably transfers small files. The host path is preferred.
"""
import json as _json
target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}'
max_bytes = self._EXEC_FALLBACK_MAX_BYTES
script = (
'import base64, json, os\n'
f'target = {target_dir!r}\n'
f'max_bytes = {max_bytes}\n'
'out = []\n'
'if os.path.isdir(target):\n'
' for root, _dirs, names in os.walk(target):\n'
' for n in sorted(names):\n'
' p = os.path.join(root, n)\n'
' try:\n'
' if os.path.getsize(p) > max_bytes:\n'
' continue\n'
" with open(p, 'rb') as f:\n"
' data = f.read()\n'
' except OSError:\n'
' continue\n'
' rel = os.path.relpath(p, target)\n'
" out.append({'name': rel, 'b64': base64.b64encode(data).decode('ascii')})\n"
'print(json.dumps(out))\n'
)
result = await self.execute_tool(
{'command': f"python3 - <<'LBPY'\n{script}\nLBPY", 'timeout_sec': 120},
query,
)
if not result.get('ok'):
return []
try:
return _json.loads(str(result.get('stdout') or '').strip().splitlines()[-1])
except Exception:
return []
async def _clear_outbox(self, query: pipeline_query.Query, host_dir: str | None) -> None:
"""Empty the per-query outbox after collection (host or exec)."""
if host_dir is not None:
import shutil
def _clear():
shutil.rmtree(host_dir, ignore_errors=True)
os.makedirs(host_dir, exist_ok=True)
await asyncio.to_thread(_clear)
return
target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}'
await self.execute_tool(
{'command': f'rm -rf {target_dir} && mkdir -p {target_dir}', 'timeout_sec': 30},
query,
)
@staticmethod
def _classify_outbound_entries(entries: list[dict]) -> list[dict]:
"""Classify outbox files into Image/Voice/File component descriptors."""
image_exts = {'png', 'jpg', 'jpeg', 'gif', 'webp', 'bmp'}
voice_exts = {'wav', 'mp3', 'silk', 'amr', 'ogg', 'm4a', 'aac'}
mime_by_ext = {
'png': 'image/png',
'jpg': 'image/jpeg',
'jpeg': 'image/jpeg',
'gif': 'image/gif',
'webp': 'image/webp',
'bmp': 'image/bmp',
}
attachments: list[dict] = []
for entry in entries or []:
name = str(entry.get('name', '') or '')
b64 = entry.get('b64')
if not name or not b64:
continue
ext = name.rsplit('.', 1)[-1].lower() if '.' in name else ''
base_name = os.path.basename(name)
if ext in image_exts:
mime = mime_by_ext.get(ext, 'image/png')
attachments.append({'type': 'Image', 'name': base_name, 'base64': f'data:{mime};base64,{b64}'})
elif ext in voice_exts:
attachments.append({'type': 'Voice', 'name': base_name, 'base64': f'data:audio/{ext};base64,{b64}'})
else:
attachments.append({'type': 'File', 'name': base_name, 'base64': b64})
return attachments
async def shutdown(self):
await self.client.shutdown()
+54 -3
View File
@@ -7,6 +7,7 @@ from .. import stage
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.events as events
@@ -23,6 +24,50 @@ class ResponseWrapper(stage.PipelineStage):
async def initialize(self, pipeline_config: dict):
pass
def _is_final_assistant_message(self, result) -> bool:
"""Whether *result* is the agent's final, tool-call-free answer.
Intermediate streaming chunks and tool-call rounds must NOT trigger
outbound attachment collection — only the terminal assistant message.
"""
if getattr(result, 'role', None) != 'assistant':
return False
if result.tool_calls:
return False
if isinstance(result, provider_message.MessageChunk):
return bool(result.is_final)
return True
async def _append_outbound_attachments(
self,
query: pipeline_query.Query,
message_chain: platform_message.MessageChain,
) -> None:
"""Collect sandbox outbox files and append them to *message_chain*.
Runs at most once per query (guarded by a query variable) and never
raises into the pipeline — attachment delivery is best-effort.
"""
if query.variables.get('_sandbox_outbound_collected'):
return
box_service = getattr(self.ap, 'box_service', None)
if box_service is None or not getattr(box_service, 'available', False):
return
query.variables['_sandbox_outbound_collected'] = True
try:
attachments = await box_service.collect_outbound_attachments(query)
except Exception as e:
self.ap.logger.warning(f'Outbound attachment collection failed: {e}')
return
for att in attachments:
att_type = att.get('type')
if att_type == 'Image':
message_chain.append(platform_message.Image(base64=att['base64']))
elif att_type == 'Voice':
message_chain.append(platform_message.Voice(base64=att['base64']))
else:
message_chain.append(platform_message.File(name=att.get('name', 'file'), base64=att['base64']))
async def process(
self,
query: pipeline_query.Query,
@@ -83,10 +128,16 @@ class ResponseWrapper(stage.PipelineStage):
)
else:
if event_ctx.event.reply_message_chain is not None:
query.resp_message_chain.append(event_ctx.event.reply_message_chain)
reply_chain = event_ctx.event.reply_message_chain
else:
query.resp_message_chain.append(result.get_content_platform_message_chain())
reply_chain = result.get_content_platform_message_chain()
# Attach files the agent produced in the sandbox
# outbox, but only on the terminal assistant message.
if self._is_final_assistant_message(result):
await self._append_outbound_attachments(query, reply_chain)
query.resp_message_chain.append(reply_chain)
yield entities.StageProcessResult(
result_type=entities.ResultType.CONTINUE,
@@ -104,6 +104,55 @@ class _StreamAccumulator:
class LocalAgentRunner(runner.RequestRunner):
"""Local agent request runner"""
async def _inject_inbound_attachments(
self,
query: pipeline_query.Query,
user_message: provider_message.Message,
) -> None:
"""Persist inbound attachments into the sandbox and tell the model.
No-op when the box service is unavailable or there are no attachments.
On success, appends an extra text ContentElement to the user message
listing the in-sandbox paths and the outbox convention, and stashes the
descriptors in ``query.variables['_sandbox_inbound_attachments']``.
"""
box_service = getattr(self.ap, 'box_service', None)
if box_service is None or not getattr(box_service, 'available', False):
return
try:
attachments = await box_service.materialize_inbound_attachments(query)
except Exception as e: # never break the chat turn over attachment IO
self.ap.logger.warning(f'Inbound attachment materialization failed: {e}')
return
if not attachments:
return
query.variables['_sandbox_inbound_attachments'] = attachments
lines = [
'The user sent attachments. They have been saved into the sandbox and are '
'available to the exec/read/write tools at these paths:'
]
for att in attachments:
lines.append(f'- {att["type"]}: {att["path"]} ({att["size"]} bytes)')
outbox_dir = f'{box_service.OUTBOX_MOUNT_DIR}/{query.query_id}'
lines.append(
'If you produce any file (image, audio, document, etc.) that should be sent '
f'back to the user, write it into {outbox_dir}/ (create the directory if '
'needed). Every file placed there will be delivered to the user automatically.'
)
note = '\n'.join(lines)
if isinstance(user_message.content, str):
user_message.content = [
provider_message.ContentElement.from_text(user_message.content),
provider_message.ContentElement.from_text(note),
]
elif isinstance(user_message.content, list):
user_message.content.append(provider_message.ContentElement.from_text(note))
else:
user_message.content = [provider_message.ContentElement.from_text(note)]
def _build_request_messages(
self,
query: pipeline_query.Query,
@@ -232,6 +281,12 @@ class LocalAgentRunner(runner.RequestRunner):
user_message = copy.deepcopy(query.user_message)
# Materialize inbound attachments (images / voices / files) into the
# sandbox so the agent's exec/read/write tools can operate on the real
# bytes — not just the multimodal copy the model sees. The exact
# in-sandbox paths are announced to the model as a system note.
await self._inject_inbound_attachments(query, user_message)
user_message_text = ''
if isinstance(user_message.content, str):
+252
View File
@@ -1556,3 +1556,255 @@ class TestBuildSkillExtraMounts:
service = BoxService(app, client=Mock(spec=BoxRuntimeClient))
assert service.build_skill_extra_mounts(make_query()) == []
# ── Attachment passthrough (inbound / outbound) ─────────────────────────────
class TestAttachmentHelpers:
def test_sanitize_attachment_name_strips_traversal(self):
assert BoxService._sanitize_attachment_name('../../etc/passwd', 'fb') == 'passwd'
assert BoxService._sanitize_attachment_name('/a/b/c.png', 'fb') == 'c.png'
assert BoxService._sanitize_attachment_name('a b c.txt', 'fb') == 'a_b_c.txt'
assert BoxService._sanitize_attachment_name('', 'fallback.bin') == 'fallback.bin'
assert BoxService._sanitize_attachment_name('...', 'fb.bin') == 'fb.bin'
# weird unicode / shell chars dropped, but keeps a usable name
out = BoxService._sanitize_attachment_name('rm -rf $(x).png', 'fb')
assert '/' not in out and '$' not in out and out.endswith('.png')
def test_classify_outbound_entries_by_extension(self):
entries = [
{'name': 'chart.png', 'b64': 'AAA'},
{'name': 'clip.mp3', 'b64': 'BBB'},
{'name': 'report.pdf', 'b64': 'CCC'},
{'name': 'sub/dir/photo.JPG', 'b64': 'DDD'},
{'name': 'noext', 'b64': 'EEE'},
{'name': 'skip', 'b64': ''}, # dropped (no payload)
]
out = BoxService._classify_outbound_entries(entries)
by_name = {a['name']: a for a in out}
assert by_name['chart.png']['type'] == 'Image'
assert by_name['chart.png']['base64'].startswith('data:image/png;base64,')
assert by_name['clip.mp3']['type'] == 'Voice'
assert by_name['clip.mp3']['base64'].startswith('data:audio/mp3;base64,')
assert by_name['report.pdf']['type'] == 'File'
assert by_name['report.pdf']['base64'] == 'CCC' # raw b64, no data: prefix
# nested path collapses to basename, case-insensitive ext
assert by_name['photo.JPG']['type'] == 'Image'
assert by_name['noext']['type'] == 'File'
assert 'skip' not in by_name
@pytest.mark.asyncio
async def test_component_to_bytes_from_data_uri(self):
import base64
raw = b'hello-bytes'
data_uri = 'data:text/plain;base64,' + base64.b64encode(raw).decode()
component = SimpleNamespace(base64=data_uri, url=None, path=None)
result = await BoxService._component_to_bytes(component)
assert result is not None
data, mime = result
assert data == raw
assert mime == 'text/plain'
@pytest.mark.asyncio
async def test_component_to_bytes_returns_none_when_empty(self):
component = SimpleNamespace(base64=None, url=None, path=None)
assert await BoxService._component_to_bytes(component) is None
class TestInboundOutboundRoundTrip:
def _service(self) -> BoxService:
service = BoxService(make_app(Mock()), client=Mock(spec=BoxRuntimeClient))
service._available = True
return service
@pytest.mark.asyncio
async def test_materialize_inbound_writes_and_describes(self):
import base64
import langbot_plugin.api.entities.builtin.platform.message as platform_message
service = self._service()
img_bytes = b'\x89PNG\r\n\x1a\n fake png'
img_b64 = 'data:image/png;base64,' + base64.b64encode(img_bytes).decode()
query = make_query()
query.message_chain = platform_message.MessageChain(
[
platform_message.Plain(text='please resize this'),
platform_message.Image(base64=img_b64),
]
)
# Mock the sandbox write path: echo back the written paths.
async def fake_execute_tool(parameters, q):
assert '/workspace/inbox/' in parameters['command']
return {
'ok': True,
'stdout': '["/workspace/inbox/42/image_1.png"]',
'stderr': '',
}
service.execute_tool = AsyncMock(side_effect=fake_execute_tool)
descriptors = await service.materialize_inbound_attachments(query)
assert len(descriptors) == 1
d = descriptors[0]
assert d['type'] == 'Image'
assert d['path'] == '/workspace/inbox/42/image_1.png'
assert d['size'] == len(img_bytes)
@pytest.mark.asyncio
async def test_materialize_inbound_noop_without_attachments(self):
import langbot_plugin.api.entities.builtin.platform.message as platform_message
service = self._service()
query = make_query()
query.message_chain = platform_message.MessageChain([platform_message.Plain(text='just text')])
service.execute_tool = AsyncMock()
assert await service.materialize_inbound_attachments(query) == []
service.execute_tool.assert_not_called()
@pytest.mark.asyncio
async def test_collect_outbound_reads_and_clears(self):
service = self._service()
query = make_query()
calls = []
async def fake_execute_tool(parameters, q):
calls.append(parameters['command'])
if 'os.walk' in parameters['command']:
return {
'ok': True,
'stdout': '[{"name": "out.png", "b64": "QUJD"}]',
'stderr': '',
}
# the rm -rf cleanup call
return {'ok': True, 'stdout': '', 'stderr': ''}
service.execute_tool = AsyncMock(side_effect=fake_execute_tool)
attachments = await service.collect_outbound_attachments(query)
assert len(attachments) == 1
assert attachments[0]['type'] == 'Image'
assert attachments[0]['name'] == 'out.png'
# cleanup (rm -rf) must have been issued after a successful collection
assert any('rm -rf' in c for c in calls)
@pytest.mark.asyncio
async def test_collect_outbound_empty_no_cleanup(self):
service = self._service()
query = make_query()
calls = []
async def fake_execute_tool(parameters, q):
calls.append(parameters['command'])
return {'ok': True, 'stdout': '[]', 'stderr': ''}
service.execute_tool = AsyncMock(side_effect=fake_execute_tool)
assert await service.collect_outbound_attachments(query) == []
assert not any('rm -rf' in c for c in calls)
@pytest.mark.asyncio
async def test_passthrough_noop_when_unavailable(self):
service = BoxService(make_app(Mock()), client=Mock(spec=BoxRuntimeClient))
service._available = False
query = make_query()
assert await service.materialize_inbound_attachments(query) == []
assert await service.collect_outbound_attachments(query) == []
class TestAttachmentHostPath:
"""Direct host-filesystem transfer path (bind-mounted workspace).
When ``default_workspace`` is a real local dir, inbound/outbound bypass the
exec channel entirely (no ARG_MAX / stdout-truncation limits) and read/write
the bind-mounted host dir directly.
"""
def _service_with_workspace(self, tmp_path):
ws = str(tmp_path / 'box' / 'default')
os.makedirs(ws, exist_ok=True)
app = make_app(Mock(), allowed_mount_roots=[str(tmp_path)], host_root=str(tmp_path / 'box'))
service = BoxService(app, client=Mock(spec=BoxRuntimeClient))
service._available = True
# Force the default_workspace to our tmp dir so _host_query_dir resolves.
service.default_workspace = ws
return service, ws
@pytest.mark.asyncio
async def test_inbound_writes_to_host_no_exec(self, tmp_path):
import base64
import langbot_plugin.api.entities.builtin.platform.message as platform_message
service, ws = self._service_with_workspace(tmp_path)
# Big payload that would blow ARG_MAX on the exec path:
big = b'\x89PNG\r\n\x1a\n' + b'x' * (300 * 1024)
b64 = 'data:image/png;base64,' + base64.b64encode(big).decode()
query = make_query()
query.message_chain = platform_message.MessageChain([platform_message.Image(base64=b64)])
# execute_tool must NOT be called on the host path.
service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used on host path'))
descriptors = await service.materialize_inbound_attachments(query)
assert len(descriptors) == 1
d = descriptors[0]
assert d['type'] == 'Image'
assert d['size'] == len(big)
# File actually landed on the host workspace.
host_file = os.path.join(ws, 'inbox', str(query.query_id), d['name'])
assert os.path.isfile(host_file)
assert open(host_file, 'rb').read() == big
@pytest.mark.asyncio
async def test_inbound_host_clears_stale_query_dir(self, tmp_path):
import base64
import langbot_plugin.api.entities.builtin.platform.message as platform_message
service, ws = self._service_with_workspace(tmp_path)
# Seed a stale file under the same query_id (simulates webchat id reuse).
stale_dir = os.path.join(ws, 'inbox', '42')
os.makedirs(stale_dir, exist_ok=True)
open(os.path.join(stale_dir, 'image_1.png'), 'wb').write(b'STALE-OLD-IMAGE')
new = b'\x89PNG\r\n\x1a\n NEW'
b64 = 'data:image/png;base64,' + base64.b64encode(new).decode()
query = make_query(query_id=42)
query.message_chain = platform_message.MessageChain([platform_message.Image(base64=b64)])
service.execute_tool = AsyncMock()
descriptors = await service.materialize_inbound_attachments(query)
# The new write recreated the dir; the stale file is gone, new bytes present.
host_file = os.path.join(stale_dir, descriptors[0]['name'])
assert open(host_file, 'rb').read() == new
# No leftover content from the stale image.
assert b'STALE-OLD-IMAGE' not in open(host_file, 'rb').read()
@pytest.mark.asyncio
async def test_outbound_reads_host_and_clears(self, tmp_path):
service, ws = self._service_with_workspace(tmp_path)
query = make_query()
outbox = os.path.join(ws, 'outbox', str(query.query_id))
os.makedirs(outbox, exist_ok=True)
# A large file that would be truncated on the exec/stdout path:
big_png = b'\x89PNG\r\n\x1a\n' + b'y' * (400 * 1024)
open(os.path.join(outbox, 'result.png'), 'wb').write(big_png)
open(os.path.join(outbox, 'notes.txt'), 'wb').write(b'hello')
service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used on host path'))
attachments = await service.collect_outbound_attachments(query)
by_name = {a['name']: a for a in attachments}
assert by_name['result.png']['type'] == 'Image'
assert by_name['notes.txt']['type'] == 'File'
# Full image survived (no truncation).
import base64
raw = base64.b64decode(by_name['result.png']['base64'].split(',', 1)[-1])
assert raw == big_png
# Outbox cleared after collection.
assert os.listdir(outbox) == []
@@ -15,6 +15,7 @@ import {
At,
Quote,
Voice,
File as FileComponent,
Source,
} from '@/app/infra/entities/message';
import { toast } from 'sonner';
@@ -460,13 +461,29 @@ export default function DebugDialog({
}
case 'File': {
const file = component as MessageChainComponent & { name?: string };
const file = component as FileComponent;
const downloadHref = file.base64
? file.base64.startsWith('data:')
? file.base64
: `data:application/octet-stream;base64,${file.base64}`
: file.url || '';
const fileName = file.name || 'Unknown';
return (
<div key={index} className="my-2 flex items-center gap-2 text-sm">
<Paperclip className="size-4" />
<span>
[{t('pipelines.debugDialog.file')}] {file.name || 'Unknown'}
</span>
{downloadHref ? (
<a
href={downloadHref}
download={fileName}
className="text-primary underline hover:opacity-80"
>
[{t('pipelines.debugDialog.file')}] {fileName}
</a>
) : (
<span>
[{t('pipelines.debugDialog.file')}] {fileName}
</span>
)}
</div>
);
}
@@ -64,6 +64,8 @@ export interface File extends MessageComponent {
name?: string;
size?: number;
url?: string;
path?: string;
base64?: string;
}
// Unknown component