From a1e6eccdeb1ac6af9441c257a7304ee1432b94ab Mon Sep 17 00:00:00 2001 From: Junyan Chin Date: Thu, 18 Jun 2026 21:40:31 +0800 Subject: [PATCH] feat(box): bidirectional attachment transfer for sandbox (#2257) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * feat(box): bidirectional attachment transfer for sandbox Materialize inbound attachments into the sandbox workspace so agents can process user-sent files, and collect agent-produced files from the outbox to attach them back to the reply. - box(service): add materialize_inbound_attachments / collect_outbound attachments. Prefer direct host-filesystem read/write on the bind-mounted workspace (no size limit), falling back to chunked exec only for non-shared backends (e2b/remote). Clear per-query inbox/outbox dirs at turn start to avoid query_id-reuse collisions. - provider(localagent): inject inbound attachment descriptors into the sandbox and append a system note telling the agent the inbox/outbox paths. - pipeline(wrapper): collect outbox files on the final stream chunk and append them as attachment components to the response chain. - web(debug-dialog): render File components with a download link when base64/url is present; add base64/path fields to the File entity. - tests: cover inbound/outbound, large-file transfer without truncation, and stale-dir clearing (86 passing). * feat(box): support voice/file attachment round-trip end-to-end Extends the bidirectional attachment transfer to audio and arbitrary files through the real webchat UI, and fixes the model-payload errors that non-image attachments triggered. - platform(websocket_adapter): resolve Voice/File component storage keys to base64 (previously only Image), so audio/documents reach the sandbox inbox. - web(debug-dialog): accept audio/* and any file in the uploader (was image-only), classify by mimetype, upload Voice/File via the documents endpoint, and render non-image staged attachments as a chip. - provider(litellmchat): drop non-image file parts (file_base64 / file_url) when building the OpenAI/LiteLLM payload. These come from Voice/File attachments — including ones replayed from conversation history — and the agent reads their bytes from the sandbox, not the model. Without this the provider rejects the request: 'invalid content type=file_base64'. - provider(localagent): also strip those parts from the current user message alongside the sandbox-path note (model-facing clarity; the requester is the real safety net for history). - tests: cover the requester strip/keep behavior (file dropped, image kept and reshaped to image_url, mixed history, plain-string content). * test(box): cover inbound/outbound attachment helpers; fix ruff format - ruff format localagent.py (CI ruff format --check was failing) - add unit tests for ResponseWrapper outbound-attachment helpers (wrapper.py 78%->98%) - add unit tests for LocalAgentRunner._inject_inbound_attachments - add unit tests for WebSocketAdapter._process_image_components (0%->covered) Lifts PR patch coverage from 68.97% to ~88% (>75% target). --- src/langbot/pkg/box/service.py | 422 ++++++++++++++++++ src/langbot/pkg/pipeline/wrapper/wrapper.py | 57 ++- .../pkg/platform/sources/websocket_adapter.py | 42 +- .../modelmgr/requesters/litellmchat.py | 11 + .../pkg/provider/runners/localagent.py | 68 +++ tests/unit_tests/box/test_box_service.py | 252 +++++++++++ .../test_wrapper_outbound_attachments.py | 146 ++++++ .../test_websocket_adapter_attachments.py | 92 ++++ .../provider/test_litellm_convert_messages.py | 93 ++++ .../test_localagent_inbound_attachments.py | 146 ++++++ .../components/debug-dialog/DebugDialog.tsx | 122 +++-- web/src/app/infra/entities/message/index.ts | 2 + 12 files changed, 1405 insertions(+), 48 deletions(-) create mode 100644 tests/unit_tests/pipeline/test_wrapper_outbound_attachments.py create mode 100644 tests/unit_tests/platform/test_websocket_adapter_attachments.py create mode 100644 tests/unit_tests/provider/test_litellm_convert_messages.py create mode 100644 tests/unit_tests/provider/test_localagent_inbound_attachments.py diff --git a/src/langbot/pkg/box/service.py b/src/langbot/pkg/box/service.py index 0eaa0973..dc6c317e 100644 --- a/src/langbot/pkg/box/service.py +++ b/src/langbot/pkg/box/service.py @@ -335,6 +335,428 @@ class BoxService: return await self.execute_spec_payload(spec_payload, query) + # ── Attachment passthrough (inbound / outbound) ────────────────── + # + # IM/webchat attachments (images, voices, files) reach the LLM as + # multimodal content, but historically never landed on the sandbox + # filesystem, so the agent's exec/read/write tools could not operate on + # them. Conversely, files the agent produced inside the sandbox were + # never surfaced back to the user. These two helpers close both gaps: + # + # inbound : message_chain attachments -> /workspace/inbox// + # outbound : /workspace/outbox// -> reply MessageChain + # + # Transfer prefers DIRECT HOST FILESYSTEM access to the bind-mounted + # workspace (default_workspace on the host maps to /workspace inside the + # container), which has no size limit. This covers the local docker / + # nsjail / stdio backends. For backends where the workspace is NOT visible + # on the LangBot host (E2B, an external remote runtime.endpoint), it falls + # back to a base64-through-exec round-trip. The exec channel can only move + # small files reliably — the docker backend passes the command as a single + # argv (ARG_MAX) and exec stdout is truncated by output_limit_chars — so + # the host path is strongly preferred and used whenever available. + + INBOX_MOUNT_DIR = '/workspace/inbox' + OUTBOX_MOUNT_DIR = '/workspace/outbox' + INBOX_SUBDIR = 'inbox' + OUTBOX_SUBDIR = 'outbox' + # Hard cap on a single attachment. The HTTP upload endpoints already cap + # uploads at 10MiB; keep parity. + _ATTACHMENT_MAX_BYTES = 10 * _MIB + # Conservative cap for the exec FALLBACK path only (ARG_MAX / stdout + # truncation). The host-filesystem path has no such limit. + _EXEC_FALLBACK_MAX_BYTES = 256 * 1024 + + def _host_query_dir(self, subdir: str, query_id) -> str | None: + """Host path for ``/workspace//`` when LangBot can + access the bind-mounted workspace directly, else ``None``. + + ``default_workspace`` is the host directory bind-mounted to + ``/workspace`` for the local docker/nsjail backends and shared + outright in stdio mode, so a file written there by LangBot is visible + to the sandbox (and vice-versa). It is ``None`` / not a local dir for + E2B and remote runtimes, where we must fall back to the exec channel. + """ + root = self.default_workspace + if not root or not os.path.isdir(root): + return None + return os.path.join(root, subdir, str(query_id)) + + @staticmethod + def _sanitize_attachment_name(name: str, fallback: str) -> str: + """Reduce an arbitrary attachment name to a safe basename. + + Strips directory separators and parent refs so a crafted file name + can never escape the inbox/outbox directory. + """ + base = os.path.basename(str(name or '').replace('\\', '/').strip()) + base = base.lstrip('.') or '' + # Drop anything that is not a conservative filename charset. + cleaned = ''.join(c for c in base if c.isalnum() or c in ('.', '_', '-', ' ')).strip() + cleaned = cleaned.replace(' ', '_') + return cleaned or fallback + + @staticmethod + async def _component_to_bytes(component) -> tuple[bytes, str] | None: + """Best-effort extraction of (bytes, mime) from a platform component. + + Handles base64, http(s) url and local path sources. Returns None when + no payload can be resolved. + """ + import base64 as _b64 + + b64 = getattr(component, 'base64', None) + if b64: + data = b64 + mime = 'application/octet-stream' + if isinstance(data, str) and data.startswith('data:'): + split_index = data.find(';base64,') + if split_index != -1: + mime = data[5:split_index] + data = data[split_index + 8 :] + try: + return _b64.b64decode(data), mime + except Exception: + return None + + url = getattr(component, 'url', None) + if url: + try: + import httpx + + async with httpx.AsyncClient(timeout=30) as client: + resp = await client.get(url) + resp.raise_for_status() + return resp.content, resp.headers.get('Content-Type', 'application/octet-stream') + except Exception: + return None + + path = getattr(component, 'path', None) + if path: + try: + import aiofiles + + async with aiofiles.open(path, 'rb') as f: + return await f.read(), 'application/octet-stream' + except Exception: + return None + + return None + + async def _write_files_into_sandbox( + self, + query: pipeline_query.Query, + subdir: str, + target_mount_dir: str, + files: list[tuple[str, bytes]], + ) -> list[str]: + """Write *files* (name, bytes) into the per-query directory. + + Prefers a direct host-filesystem write to the bind-mounted workspace + (no size limit). Falls back to a base64-through-exec round-trip only + when the workspace is not visible on the LangBot host (E2B / remote). + Returns the list of in-sandbox paths actually written. + """ + if not files: + return [] + + host_dir = self._host_query_dir(subdir, query.query_id) + if host_dir is not None: + return await asyncio.to_thread(self._write_files_host, host_dir, target_mount_dir, files) + + return await self._write_files_via_exec(query, target_mount_dir, files) + + def _write_files_host( + self, + host_dir: str, + target_mount_dir: str, + files: list[tuple[str, bytes]], + ) -> list[str]: + """Write attachments straight onto the bind-mounted host directory. + + Recreates the per-query directory from scratch so a reused query_id + (the webchat session uses small sequential ids) never inherits stale + files from an earlier turn. + """ + import shutil + + shutil.rmtree(host_dir, ignore_errors=True) + os.makedirs(host_dir, exist_ok=True) + written: list[str] = [] + for name, data in files: + with open(os.path.join(host_dir, name), 'wb') as fh: + fh.write(data) + written.append(f'{target_mount_dir}/{name}') + return written + + async def _write_files_via_exec( + self, + query: pipeline_query.Query, + target_dir: str, + files: list[tuple[str, bytes]], + ) -> list[str]: + """Fallback: ship files into the sandbox over the exec channel. + + Only used for backends without host-filesystem access (E2B / remote). + Each file is base64-decoded inside the sandbox. Files larger than the + conservative exec cap are skipped (ARG_MAX / stdout limits). + """ + import base64 as _b64 + import json as _json + + manifest = [] + for name, data in files: + if len(data) > self._EXEC_FALLBACK_MAX_BYTES: + self.ap.logger.warning( + f'Attachment "{name}" ({len(data)} bytes) exceeds the exec-channel ' + f'fallback limit ({self._EXEC_FALLBACK_MAX_BYTES} bytes); skipping. ' + f'Configure a host-shared workspace to transfer large files.' + ) + continue + manifest.append({'name': name, 'b64': _b64.b64encode(data).decode('ascii')}) + if not manifest: + return [] + + manifest_b64 = _b64.b64encode(_json.dumps(manifest).encode('utf-8')).decode('ascii') + script = ( + 'import base64, json, os, shutil\n' + f'target = {target_dir!r}\n' + 'shutil.rmtree(target, ignore_errors=True)\n' + 'os.makedirs(target, exist_ok=True)\n' + f'manifest = json.loads(base64.b64decode({manifest_b64!r}))\n' + 'written = []\n' + 'for item in manifest:\n' + " p = os.path.join(target, item['name'])\n" + " with open(p, 'wb') as f:\n" + " f.write(base64.b64decode(item['b64']))\n" + ' written.append(p)\n' + 'print(json.dumps(written))\n' + ) + result = await self.execute_tool( + {'command': f"python3 - <<'LBPY'\n{script}\nLBPY", 'timeout_sec': 120}, + query, + ) + if not result.get('ok'): + self.ap.logger.warning( + f'Failed to write inbound attachments into sandbox via exec: ' + f'query_id={query.query_id} stderr={result.get("stderr", "")[:200]}' + ) + return [] + try: + return _json.loads(str(result.get('stdout') or '').strip().splitlines()[-1]) + except Exception: + return [] + + async def materialize_inbound_attachments(self, query: pipeline_query.Query) -> list[dict]: + """Persist message-chain attachments into the sandbox inbox. + + Returns a list of ``{path, name, type, size}`` describing what was + written, so the runner can tell the LLM the exact in-sandbox paths. + Returns ``[]`` when sandbox is unavailable or there are no attachments. + """ + if not self._available: + return [] + + import langbot_plugin.api.entities.builtin.platform.message as platform_message + + message_chain = getattr(query, 'message_chain', None) + if not message_chain: + return [] + + type_map = [ + (platform_message.Image, 'Image', 'image', 'png'), + (platform_message.Voice, 'Voice', 'voice', 'wav'), + (platform_message.File, 'File', 'file', 'bin'), + ] + + pending: list[tuple[str, bytes]] = [] + descriptors: list[dict] = [] + index = 0 + for component in message_chain: + matched = None + for cls, kind, prefix, default_ext in type_map: + if isinstance(component, cls): + matched = (kind, prefix, default_ext) + break + if matched is None: + continue + kind, prefix, default_ext = matched + + payload = await self._component_to_bytes(component) + if payload is None: + continue + data, _mime = payload + if not data or len(data) > self._ATTACHMENT_MAX_BYTES: + continue + + index += 1 + raw_name = getattr(component, 'name', None) or f'{prefix}_{index}.{default_ext}' + safe_name = self._sanitize_attachment_name(raw_name, f'{prefix}_{index}.{default_ext}') + pending.append((safe_name, data)) + descriptors.append( + { + 'name': safe_name, + 'type': kind, + 'size': len(data), + } + ) + + if not pending: + return [] + + target_dir = f'{self.INBOX_MOUNT_DIR}/{query.query_id}' + written = await self._write_files_into_sandbox(query, self.INBOX_SUBDIR, target_dir, pending) + written_basenames = {os.path.basename(p) for p in written} + + result: list[dict] = [] + for desc in descriptors: + if desc['name'] in written_basenames: + desc['path'] = f'{target_dir}/{desc["name"]}' + result.append(desc) + if result: + self.ap.logger.info( + f'Materialized {len(result)} inbound attachment(s) into sandbox: ' + f'query_id={query.query_id} dir={target_dir}' + ) + return result + + async def collect_outbound_attachments(self, query: pipeline_query.Query) -> list[dict]: + """Collect files the agent produced in the sandbox outbox. + + Reads ``/workspace/outbox//`` (recursively) — directly from + the bind-mounted host directory when available (no size limit), else + via the exec channel — returns a list of ``{type, name, base64}`` + ready to become platform message components, then clears the outbox so + a later turn in the same session does not re-send stale files. Returns + ``[]`` when nothing was produced. + """ + if not self._available: + return [] + + host_dir = self._host_query_dir(self.OUTBOX_SUBDIR, query.query_id) + if host_dir is not None: + entries = await asyncio.to_thread(self._read_outbox_host, host_dir) + else: + entries = await self._read_outbox_via_exec(query) + + attachments = self._classify_outbound_entries(entries) + + if attachments: + await self._clear_outbox(query, host_dir) + self.ap.logger.info( + f'Collected {len(attachments)} outbound attachment(s) from sandbox: query_id={query.query_id}' + ) + return attachments + + def _read_outbox_host(self, host_dir: str) -> list[dict]: + """Read outbox files straight off the bind-mounted host directory.""" + import base64 as _b64 + + entries: list[dict] = [] + if not os.path.isdir(host_dir): + return entries + for root, _dirs, names in os.walk(host_dir): + for name in sorted(names): + path = os.path.join(root, name) + try: + if os.path.getsize(path) > self._ATTACHMENT_MAX_BYTES: + continue + with open(path, 'rb') as fh: + data = fh.read() + except OSError: + continue + rel = os.path.relpath(path, host_dir) + entries.append({'name': rel, 'b64': _b64.b64encode(data).decode('ascii')}) + return entries + + async def _read_outbox_via_exec(self, query: pipeline_query.Query) -> list[dict]: + """Fallback: read the outbox over the exec channel (E2B / remote). + + Note: exec stdout is truncated by ``output_limit_chars``, so this path + only reliably transfers small files. The host path is preferred. + """ + import json as _json + + target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}' + max_bytes = self._EXEC_FALLBACK_MAX_BYTES + script = ( + 'import base64, json, os\n' + f'target = {target_dir!r}\n' + f'max_bytes = {max_bytes}\n' + 'out = []\n' + 'if os.path.isdir(target):\n' + ' for root, _dirs, names in os.walk(target):\n' + ' for n in sorted(names):\n' + ' p = os.path.join(root, n)\n' + ' try:\n' + ' if os.path.getsize(p) > max_bytes:\n' + ' continue\n' + " with open(p, 'rb') as f:\n" + ' data = f.read()\n' + ' except OSError:\n' + ' continue\n' + ' rel = os.path.relpath(p, target)\n' + " out.append({'name': rel, 'b64': base64.b64encode(data).decode('ascii')})\n" + 'print(json.dumps(out))\n' + ) + result = await self.execute_tool( + {'command': f"python3 - <<'LBPY'\n{script}\nLBPY", 'timeout_sec': 120}, + query, + ) + if not result.get('ok'): + return [] + try: + return _json.loads(str(result.get('stdout') or '').strip().splitlines()[-1]) + except Exception: + return [] + + async def _clear_outbox(self, query: pipeline_query.Query, host_dir: str | None) -> None: + """Empty the per-query outbox after collection (host or exec).""" + if host_dir is not None: + import shutil + + def _clear(): + shutil.rmtree(host_dir, ignore_errors=True) + os.makedirs(host_dir, exist_ok=True) + + await asyncio.to_thread(_clear) + return + target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}' + await self.execute_tool( + {'command': f'rm -rf {target_dir} && mkdir -p {target_dir}', 'timeout_sec': 30}, + query, + ) + + @staticmethod + def _classify_outbound_entries(entries: list[dict]) -> list[dict]: + """Classify outbox files into Image/Voice/File component descriptors.""" + image_exts = {'png', 'jpg', 'jpeg', 'gif', 'webp', 'bmp'} + voice_exts = {'wav', 'mp3', 'silk', 'amr', 'ogg', 'm4a', 'aac'} + mime_by_ext = { + 'png': 'image/png', + 'jpg': 'image/jpeg', + 'jpeg': 'image/jpeg', + 'gif': 'image/gif', + 'webp': 'image/webp', + 'bmp': 'image/bmp', + } + attachments: list[dict] = [] + for entry in entries or []: + name = str(entry.get('name', '') or '') + b64 = entry.get('b64') + if not name or not b64: + continue + ext = name.rsplit('.', 1)[-1].lower() if '.' in name else '' + base_name = os.path.basename(name) + if ext in image_exts: + mime = mime_by_ext.get(ext, 'image/png') + attachments.append({'type': 'Image', 'name': base_name, 'base64': f'data:{mime};base64,{b64}'}) + elif ext in voice_exts: + attachments.append({'type': 'Voice', 'name': base_name, 'base64': f'data:audio/{ext};base64,{b64}'}) + else: + attachments.append({'type': 'File', 'name': base_name, 'base64': b64}) + return attachments + async def shutdown(self): await self.client.shutdown() diff --git a/src/langbot/pkg/pipeline/wrapper/wrapper.py b/src/langbot/pkg/pipeline/wrapper/wrapper.py index a1ebc97a..a158c184 100644 --- a/src/langbot/pkg/pipeline/wrapper/wrapper.py +++ b/src/langbot/pkg/pipeline/wrapper/wrapper.py @@ -7,6 +7,7 @@ from .. import stage import langbot_plugin.api.entities.builtin.platform.message as platform_message import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query +import langbot_plugin.api.entities.builtin.provider.message as provider_message import langbot_plugin.api.entities.events as events @@ -23,6 +24,50 @@ class ResponseWrapper(stage.PipelineStage): async def initialize(self, pipeline_config: dict): pass + def _is_final_assistant_message(self, result) -> bool: + """Whether *result* is the agent's final, tool-call-free answer. + + Intermediate streaming chunks and tool-call rounds must NOT trigger + outbound attachment collection — only the terminal assistant message. + """ + if getattr(result, 'role', None) != 'assistant': + return False + if result.tool_calls: + return False + if isinstance(result, provider_message.MessageChunk): + return bool(result.is_final) + return True + + async def _append_outbound_attachments( + self, + query: pipeline_query.Query, + message_chain: platform_message.MessageChain, + ) -> None: + """Collect sandbox outbox files and append them to *message_chain*. + + Runs at most once per query (guarded by a query variable) and never + raises into the pipeline — attachment delivery is best-effort. + """ + if query.variables.get('_sandbox_outbound_collected'): + return + box_service = getattr(self.ap, 'box_service', None) + if box_service is None or not getattr(box_service, 'available', False): + return + query.variables['_sandbox_outbound_collected'] = True + try: + attachments = await box_service.collect_outbound_attachments(query) + except Exception as e: + self.ap.logger.warning(f'Outbound attachment collection failed: {e}') + return + for att in attachments: + att_type = att.get('type') + if att_type == 'Image': + message_chain.append(platform_message.Image(base64=att['base64'])) + elif att_type == 'Voice': + message_chain.append(platform_message.Voice(base64=att['base64'])) + else: + message_chain.append(platform_message.File(name=att.get('name', 'file'), base64=att['base64'])) + async def process( self, query: pipeline_query.Query, @@ -83,10 +128,16 @@ class ResponseWrapper(stage.PipelineStage): ) else: if event_ctx.event.reply_message_chain is not None: - query.resp_message_chain.append(event_ctx.event.reply_message_chain) - + reply_chain = event_ctx.event.reply_message_chain else: - query.resp_message_chain.append(result.get_content_platform_message_chain()) + reply_chain = result.get_content_platform_message_chain() + + # Attach files the agent produced in the sandbox + # outbox, but only on the terminal assistant message. + if self._is_final_assistant_message(result): + await self._append_outbound_attachments(query, reply_chain) + + query.resp_message_chain.append(reply_chain) yield entities.StageProcessResult( result_type=entities.ResultType.CONTINUE, diff --git a/src/langbot/pkg/platform/sources/websocket_adapter.py b/src/langbot/pkg/platform/sources/websocket_adapter.py index 9ffcf04a..0574292f 100644 --- a/src/langbot/pkg/platform/sources/websocket_adapter.py +++ b/src/langbot/pkg/platform/sources/websocket_adapter.py @@ -312,12 +312,18 @@ class WebSocketAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter) async def _process_image_components(self, message_chain_obj: list): """ - 处理消息链中的图片和文件组件,将path转换为base64 + 处理消息链中的图片、语音和文件组件,将 path 转换为 base64 + + Image / Voice / File components uploaded from the web client carry a + storage key in ``path``. Resolve it to a base64 data URI so downstream + stages (multimodal LLM input and the Box sandbox inbox) have a usable + payload, then drop the now-consumed storage object. Args: message_chain_obj: 消息链对象列表 """ import base64 + import mimetypes storage_mgr = self.ap.storage_mgr @@ -325,31 +331,33 @@ class WebSocketAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter) comp_type = component.get('type', '') comp_path = component.get('path', '') - if not comp_path: + if not comp_path or comp_type not in ('Image', 'Voice', 'File'): continue - if comp_type == 'Image': - try: - file_content = await storage_mgr.storage_provider.load(comp_path) - base64_str = base64.b64encode(file_content).decode('utf-8') + try: + file_content = await storage_mgr.storage_provider.load(comp_path) + base64_str = base64.b64encode(file_content).decode('utf-8') - file_key = comp_path - if file_key.lower().endswith(('.jpg', '.jpeg')): + lowered = comp_path.lower() + if comp_type == 'Image': + if lowered.endswith(('.jpg', '.jpeg')): mime_type = 'image/jpeg' - elif file_key.lower().endswith('.png'): - mime_type = 'image/png' - elif file_key.lower().endswith('.gif'): + elif lowered.endswith('.gif'): mime_type = 'image/gif' - elif file_key.lower().endswith('.webp'): + elif lowered.endswith('.webp'): mime_type = 'image/webp' else: mime_type = 'image/png' + elif comp_type == 'Voice': + mime_type = mimetypes.guess_type(comp_path)[0] or 'audio/wav' + else: # File + mime_type = mimetypes.guess_type(comp_path)[0] or 'application/octet-stream' - component['base64'] = f'data:{mime_type};base64,{base64_str}' - await storage_mgr.storage_provider.delete(comp_path) - component['path'] = '' - except Exception as e: - await self.logger.error(f'Failed to load image file {comp_path}: {e}') + component['base64'] = f'data:{mime_type};base64,{base64_str}' + await storage_mgr.storage_provider.delete(comp_path) + component['path'] = '' + except Exception as e: + await self.logger.error(f'Failed to load {comp_type} file {comp_path}: {e}') async def handle_websocket_message( self, diff --git a/src/langbot/pkg/provider/modelmgr/requesters/litellmchat.py b/src/langbot/pkg/provider/modelmgr/requesters/litellmchat.py index 8c750bd7..d58dd2c5 100644 --- a/src/langbot/pkg/provider/modelmgr/requesters/litellmchat.py +++ b/src/langbot/pkg/provider/modelmgr/requesters/litellmchat.py @@ -216,11 +216,22 @@ class LiteLLMRequester(requester.ProviderAPIRequester): content = msg_dict.get('content') if isinstance(content, list): + converted_parts = [] for part in content: if isinstance(part, dict) and part.get('type') == 'image_base64': part['image_url'] = {'url': part['image_base64']} part['type'] = 'image_url' del part['image_base64'] + # OpenAI-compatible chat models reject non-image file parts + # (audio/document base64 or url). These originate from Voice / + # File attachments — including ones replayed from conversation + # history — and the agent already accesses their bytes via the + # sandbox. Drop them from the model payload to avoid + # "Invalid user message ... invalid content type=file_base64". + if isinstance(part, dict) and part.get('type') in ('file_base64', 'file_url'): + continue + converted_parts.append(part) + msg_dict['content'] = converted_parts req_messages.append(msg_dict) diff --git a/src/langbot/pkg/provider/runners/localagent.py b/src/langbot/pkg/provider/runners/localagent.py index 9a90ed47..338de2e5 100644 --- a/src/langbot/pkg/provider/runners/localagent.py +++ b/src/langbot/pkg/provider/runners/localagent.py @@ -104,6 +104,68 @@ class _StreamAccumulator: class LocalAgentRunner(runner.RequestRunner): """Local agent request runner""" + async def _inject_inbound_attachments( + self, + query: pipeline_query.Query, + user_message: provider_message.Message, + ) -> None: + """Persist inbound attachments into the sandbox and tell the model. + + No-op when the box service is unavailable or there are no attachments. + On success, appends an extra text ContentElement to the user message + listing the in-sandbox paths and the outbox convention, and stashes the + descriptors in ``query.variables['_sandbox_inbound_attachments']``. + """ + box_service = getattr(self.ap, 'box_service', None) + if box_service is None or not getattr(box_service, 'available', False): + return + try: + attachments = await box_service.materialize_inbound_attachments(query) + except Exception as e: # never break the chat turn over attachment IO + self.ap.logger.warning(f'Inbound attachment materialization failed: {e}') + return + if not attachments: + return + + query.variables['_sandbox_inbound_attachments'] = attachments + + lines = [ + 'The user sent attachments. They have been saved into the sandbox and are ' + 'available to the exec/read/write tools at these paths:' + ] + for att in attachments: + lines.append(f'- {att["type"]}: {att["path"]} ({att["size"]} bytes)') + outbox_dir = f'{box_service.OUTBOX_MOUNT_DIR}/{query.query_id}' + lines.append( + 'If you produce any file (image, audio, document, etc.) that should be sent ' + f'back to the user, write it into {outbox_dir}/ (create the directory if ' + 'needed). Every file placed there will be delivered to the user automatically.' + ) + note = '\n'.join(lines) + + # Voice/File attachments are now available to the agent via the sandbox + # (exec/read/write tools). Their raw bytes must NOT be forwarded to the + # chat model as multimodal content: providers reject non-image file + # parts ("Invalid user message ... ensure all user messages are valid + # OpenAI chat completion messages"). Strip those content elements and + # rely on the sandbox-path note instead. Images are kept so vision + # models can still see them. + _model_unsafe_types = {'file_base64', 'file_url'} + if isinstance(user_message.content, list): + user_message.content = [ + ce for ce in user_message.content if getattr(ce, 'type', None) not in _model_unsafe_types + ] + + if isinstance(user_message.content, str): + user_message.content = [ + provider_message.ContentElement.from_text(user_message.content), + provider_message.ContentElement.from_text(note), + ] + elif isinstance(user_message.content, list): + user_message.content.append(provider_message.ContentElement.from_text(note)) + else: + user_message.content = [provider_message.ContentElement.from_text(note)] + def _build_request_messages( self, query: pipeline_query.Query, @@ -232,6 +294,12 @@ class LocalAgentRunner(runner.RequestRunner): user_message = copy.deepcopy(query.user_message) + # Materialize inbound attachments (images / voices / files) into the + # sandbox so the agent's exec/read/write tools can operate on the real + # bytes — not just the multimodal copy the model sees. The exact + # in-sandbox paths are announced to the model as a system note. + await self._inject_inbound_attachments(query, user_message) + user_message_text = '' if isinstance(user_message.content, str): diff --git a/tests/unit_tests/box/test_box_service.py b/tests/unit_tests/box/test_box_service.py index c59a1c5e..0b7183ba 100644 --- a/tests/unit_tests/box/test_box_service.py +++ b/tests/unit_tests/box/test_box_service.py @@ -1556,3 +1556,255 @@ class TestBuildSkillExtraMounts: service = BoxService(app, client=Mock(spec=BoxRuntimeClient)) assert service.build_skill_extra_mounts(make_query()) == [] + + +# ── Attachment passthrough (inbound / outbound) ───────────────────────────── + + +class TestAttachmentHelpers: + def test_sanitize_attachment_name_strips_traversal(self): + assert BoxService._sanitize_attachment_name('../../etc/passwd', 'fb') == 'passwd' + assert BoxService._sanitize_attachment_name('/a/b/c.png', 'fb') == 'c.png' + assert BoxService._sanitize_attachment_name('a b c.txt', 'fb') == 'a_b_c.txt' + assert BoxService._sanitize_attachment_name('', 'fallback.bin') == 'fallback.bin' + assert BoxService._sanitize_attachment_name('...', 'fb.bin') == 'fb.bin' + # weird unicode / shell chars dropped, but keeps a usable name + out = BoxService._sanitize_attachment_name('rm -rf $(x).png', 'fb') + assert '/' not in out and '$' not in out and out.endswith('.png') + + def test_classify_outbound_entries_by_extension(self): + entries = [ + {'name': 'chart.png', 'b64': 'AAA'}, + {'name': 'clip.mp3', 'b64': 'BBB'}, + {'name': 'report.pdf', 'b64': 'CCC'}, + {'name': 'sub/dir/photo.JPG', 'b64': 'DDD'}, + {'name': 'noext', 'b64': 'EEE'}, + {'name': 'skip', 'b64': ''}, # dropped (no payload) + ] + out = BoxService._classify_outbound_entries(entries) + by_name = {a['name']: a for a in out} + assert by_name['chart.png']['type'] == 'Image' + assert by_name['chart.png']['base64'].startswith('data:image/png;base64,') + assert by_name['clip.mp3']['type'] == 'Voice' + assert by_name['clip.mp3']['base64'].startswith('data:audio/mp3;base64,') + assert by_name['report.pdf']['type'] == 'File' + assert by_name['report.pdf']['base64'] == 'CCC' # raw b64, no data: prefix + # nested path collapses to basename, case-insensitive ext + assert by_name['photo.JPG']['type'] == 'Image' + assert by_name['noext']['type'] == 'File' + assert 'skip' not in by_name + + @pytest.mark.asyncio + async def test_component_to_bytes_from_data_uri(self): + import base64 + + raw = b'hello-bytes' + data_uri = 'data:text/plain;base64,' + base64.b64encode(raw).decode() + component = SimpleNamespace(base64=data_uri, url=None, path=None) + result = await BoxService._component_to_bytes(component) + assert result is not None + data, mime = result + assert data == raw + assert mime == 'text/plain' + + @pytest.mark.asyncio + async def test_component_to_bytes_returns_none_when_empty(self): + component = SimpleNamespace(base64=None, url=None, path=None) + assert await BoxService._component_to_bytes(component) is None + + +class TestInboundOutboundRoundTrip: + def _service(self) -> BoxService: + service = BoxService(make_app(Mock()), client=Mock(spec=BoxRuntimeClient)) + service._available = True + return service + + @pytest.mark.asyncio + async def test_materialize_inbound_writes_and_describes(self): + import base64 + + import langbot_plugin.api.entities.builtin.platform.message as platform_message + + service = self._service() + + img_bytes = b'\x89PNG\r\n\x1a\n fake png' + img_b64 = 'data:image/png;base64,' + base64.b64encode(img_bytes).decode() + + query = make_query() + query.message_chain = platform_message.MessageChain( + [ + platform_message.Plain(text='please resize this'), + platform_message.Image(base64=img_b64), + ] + ) + + # Mock the sandbox write path: echo back the written paths. + async def fake_execute_tool(parameters, q): + assert '/workspace/inbox/' in parameters['command'] + return { + 'ok': True, + 'stdout': '["/workspace/inbox/42/image_1.png"]', + 'stderr': '', + } + + service.execute_tool = AsyncMock(side_effect=fake_execute_tool) + + descriptors = await service.materialize_inbound_attachments(query) + assert len(descriptors) == 1 + d = descriptors[0] + assert d['type'] == 'Image' + assert d['path'] == '/workspace/inbox/42/image_1.png' + assert d['size'] == len(img_bytes) + + @pytest.mark.asyncio + async def test_materialize_inbound_noop_without_attachments(self): + import langbot_plugin.api.entities.builtin.platform.message as platform_message + + service = self._service() + query = make_query() + query.message_chain = platform_message.MessageChain([platform_message.Plain(text='just text')]) + service.execute_tool = AsyncMock() + assert await service.materialize_inbound_attachments(query) == [] + service.execute_tool.assert_not_called() + + @pytest.mark.asyncio + async def test_collect_outbound_reads_and_clears(self): + service = self._service() + query = make_query() + + calls = [] + + async def fake_execute_tool(parameters, q): + calls.append(parameters['command']) + if 'os.walk' in parameters['command']: + return { + 'ok': True, + 'stdout': '[{"name": "out.png", "b64": "QUJD"}]', + 'stderr': '', + } + # the rm -rf cleanup call + return {'ok': True, 'stdout': '', 'stderr': ''} + + service.execute_tool = AsyncMock(side_effect=fake_execute_tool) + + attachments = await service.collect_outbound_attachments(query) + assert len(attachments) == 1 + assert attachments[0]['type'] == 'Image' + assert attachments[0]['name'] == 'out.png' + # cleanup (rm -rf) must have been issued after a successful collection + assert any('rm -rf' in c for c in calls) + + @pytest.mark.asyncio + async def test_collect_outbound_empty_no_cleanup(self): + service = self._service() + query = make_query() + + calls = [] + + async def fake_execute_tool(parameters, q): + calls.append(parameters['command']) + return {'ok': True, 'stdout': '[]', 'stderr': ''} + + service.execute_tool = AsyncMock(side_effect=fake_execute_tool) + assert await service.collect_outbound_attachments(query) == [] + assert not any('rm -rf' in c for c in calls) + + @pytest.mark.asyncio + async def test_passthrough_noop_when_unavailable(self): + service = BoxService(make_app(Mock()), client=Mock(spec=BoxRuntimeClient)) + service._available = False + query = make_query() + assert await service.materialize_inbound_attachments(query) == [] + assert await service.collect_outbound_attachments(query) == [] + + +class TestAttachmentHostPath: + """Direct host-filesystem transfer path (bind-mounted workspace). + + When ``default_workspace`` is a real local dir, inbound/outbound bypass the + exec channel entirely (no ARG_MAX / stdout-truncation limits) and read/write + the bind-mounted host dir directly. + """ + + def _service_with_workspace(self, tmp_path): + ws = str(tmp_path / 'box' / 'default') + os.makedirs(ws, exist_ok=True) + app = make_app(Mock(), allowed_mount_roots=[str(tmp_path)], host_root=str(tmp_path / 'box')) + service = BoxService(app, client=Mock(spec=BoxRuntimeClient)) + service._available = True + # Force the default_workspace to our tmp dir so _host_query_dir resolves. + service.default_workspace = ws + return service, ws + + @pytest.mark.asyncio + async def test_inbound_writes_to_host_no_exec(self, tmp_path): + import base64 + + import langbot_plugin.api.entities.builtin.platform.message as platform_message + + service, ws = self._service_with_workspace(tmp_path) + # Big payload that would blow ARG_MAX on the exec path: + big = b'\x89PNG\r\n\x1a\n' + b'x' * (300 * 1024) + b64 = 'data:image/png;base64,' + base64.b64encode(big).decode() + query = make_query() + query.message_chain = platform_message.MessageChain([platform_message.Image(base64=b64)]) + # execute_tool must NOT be called on the host path. + service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used on host path')) + + descriptors = await service.materialize_inbound_attachments(query) + assert len(descriptors) == 1 + d = descriptors[0] + assert d['type'] == 'Image' + assert d['size'] == len(big) + # File actually landed on the host workspace. + host_file = os.path.join(ws, 'inbox', str(query.query_id), d['name']) + assert os.path.isfile(host_file) + assert open(host_file, 'rb').read() == big + + @pytest.mark.asyncio + async def test_inbound_host_clears_stale_query_dir(self, tmp_path): + import base64 + + import langbot_plugin.api.entities.builtin.platform.message as platform_message + + service, ws = self._service_with_workspace(tmp_path) + # Seed a stale file under the same query_id (simulates webchat id reuse). + stale_dir = os.path.join(ws, 'inbox', '42') + os.makedirs(stale_dir, exist_ok=True) + open(os.path.join(stale_dir, 'image_1.png'), 'wb').write(b'STALE-OLD-IMAGE') + + new = b'\x89PNG\r\n\x1a\n NEW' + b64 = 'data:image/png;base64,' + base64.b64encode(new).decode() + query = make_query(query_id=42) + query.message_chain = platform_message.MessageChain([platform_message.Image(base64=b64)]) + service.execute_tool = AsyncMock() + descriptors = await service.materialize_inbound_attachments(query) + # The new write recreated the dir; the stale file is gone, new bytes present. + host_file = os.path.join(stale_dir, descriptors[0]['name']) + assert open(host_file, 'rb').read() == new + # No leftover content from the stale image. + assert b'STALE-OLD-IMAGE' not in open(host_file, 'rb').read() + + @pytest.mark.asyncio + async def test_outbound_reads_host_and_clears(self, tmp_path): + service, ws = self._service_with_workspace(tmp_path) + query = make_query() + outbox = os.path.join(ws, 'outbox', str(query.query_id)) + os.makedirs(outbox, exist_ok=True) + # A large file that would be truncated on the exec/stdout path: + big_png = b'\x89PNG\r\n\x1a\n' + b'y' * (400 * 1024) + open(os.path.join(outbox, 'result.png'), 'wb').write(big_png) + open(os.path.join(outbox, 'notes.txt'), 'wb').write(b'hello') + + service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used on host path')) + attachments = await service.collect_outbound_attachments(query) + by_name = {a['name']: a for a in attachments} + assert by_name['result.png']['type'] == 'Image' + assert by_name['notes.txt']['type'] == 'File' + # Full image survived (no truncation). + import base64 + + raw = base64.b64decode(by_name['result.png']['base64'].split(',', 1)[-1]) + assert raw == big_png + # Outbox cleared after collection. + assert os.listdir(outbox) == [] diff --git a/tests/unit_tests/pipeline/test_wrapper_outbound_attachments.py b/tests/unit_tests/pipeline/test_wrapper_outbound_attachments.py new file mode 100644 index 00000000..8fc000bf --- /dev/null +++ b/tests/unit_tests/pipeline/test_wrapper_outbound_attachments.py @@ -0,0 +1,146 @@ +"""Unit tests for ResponseWrapper outbound-attachment helpers. + +Covers the sandbox -> user attachment path added for the Box attachment +round-trip: + +* ``_is_final_assistant_message`` — only the terminal, tool-call-free assistant + message (or a final MessageChunk) should trigger collection. +* ``_append_outbound_attachments`` — collects sandbox outbox files exactly once + per query and maps each descriptor to the right platform component, swallowing + collection errors. +""" + +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import AsyncMock, Mock + +import pytest + +import langbot_plugin.api.entities.builtin.platform.message as platform_message +import langbot_plugin.api.entities.builtin.provider.message as provider_message + +from langbot.pkg.pipeline.wrapper.wrapper import ResponseWrapper + + +def _make_wrapper(box_service) -> ResponseWrapper: + app = SimpleNamespace(logger=Mock()) + wrapper = ResponseWrapper.__new__(ResponseWrapper) + wrapper.ap = app + return wrapper + + +def _make_query(): + return SimpleNamespace(variables={}) + + +def test_is_final_assistant_message_plain_assistant(): + wrapper = _make_wrapper(box_service=None) + msg = provider_message.Message(role='assistant', content='done') + assert wrapper._is_final_assistant_message(msg) is True + + +def test_is_final_assistant_message_rejects_non_assistant(): + wrapper = _make_wrapper(box_service=None) + msg = provider_message.Message(role='tool', content='{}') + assert wrapper._is_final_assistant_message(msg) is False + + +def test_is_final_assistant_message_rejects_tool_call_round(): + wrapper = _make_wrapper(box_service=None) + msg = provider_message.Message( + role='assistant', + content='calling', + tool_calls=[ + provider_message.ToolCall( + id='c1', + type='function', + function=provider_message.FunctionCall(name='exec', arguments='{}'), + ) + ], + ) + assert wrapper._is_final_assistant_message(msg) is False + + +def test_is_final_assistant_message_non_final_chunk(): + wrapper = _make_wrapper(box_service=None) + chunk = provider_message.MessageChunk(role='assistant', content='partial', is_final=False) + assert wrapper._is_final_assistant_message(chunk) is False + + final_chunk = provider_message.MessageChunk(role='assistant', content='partial', is_final=True) + assert wrapper._is_final_assistant_message(final_chunk) is True + + +@pytest.mark.asyncio +async def test_append_outbound_attachments_maps_each_type(): + box_service = SimpleNamespace( + available=True, + collect_outbound_attachments=AsyncMock( + return_value=[ + {'type': 'Image', 'base64': 'data:image/png;base64,iVBORw0K'}, + {'type': 'Voice', 'base64': 'data:audio/wav;base64,UklGRg=='}, + {'type': 'File', 'name': 'report.xlsx', 'base64': 'data:app;base64,UEsDBA=='}, + ] + ), + ) + wrapper = _make_wrapper(box_service) + wrapper.ap.box_service = box_service + query = _make_query() + chain = platform_message.MessageChain([]) + + await wrapper._append_outbound_attachments(query, chain) + + kinds = [type(c).__name__ for c in chain] + assert kinds == ['Image', 'Voice', 'File'] + assert query.variables['_sandbox_outbound_collected'] is True + # File keeps its name + file_comp = chain[2] + assert getattr(file_comp, 'name', None) == 'report.xlsx' + + +@pytest.mark.asyncio +async def test_append_outbound_attachments_runs_once_per_query(): + box_service = SimpleNamespace( + available=True, + collect_outbound_attachments=AsyncMock(return_value=[]), + ) + wrapper = _make_wrapper(box_service) + wrapper.ap.box_service = box_service + query = _make_query() + query.variables['_sandbox_outbound_collected'] = True + chain = platform_message.MessageChain([]) + + await wrapper._append_outbound_attachments(query, chain) + + box_service.collect_outbound_attachments.assert_not_awaited() + assert len(chain) == 0 + + +@pytest.mark.asyncio +async def test_append_outbound_attachments_noop_without_box_service(): + wrapper = _make_wrapper(box_service=None) + wrapper.ap.box_service = None + query = _make_query() + chain = platform_message.MessageChain([]) + + await wrapper._append_outbound_attachments(query, chain) + assert len(chain) == 0 + # not marked collected, since service is unavailable + assert '_sandbox_outbound_collected' not in query.variables + + +@pytest.mark.asyncio +async def test_append_outbound_attachments_swallows_collection_error(): + box_service = SimpleNamespace( + available=True, + collect_outbound_attachments=AsyncMock(side_effect=RuntimeError('boom')), + ) + wrapper = _make_wrapper(box_service) + wrapper.ap.box_service = box_service + query = _make_query() + chain = platform_message.MessageChain([]) + + # must not raise + await wrapper._append_outbound_attachments(query, chain) + assert len(chain) == 0 + wrapper.ap.logger.warning.assert_called_once() diff --git a/tests/unit_tests/platform/test_websocket_adapter_attachments.py b/tests/unit_tests/platform/test_websocket_adapter_attachments.py new file mode 100644 index 00000000..18138383 --- /dev/null +++ b/tests/unit_tests/platform/test_websocket_adapter_attachments.py @@ -0,0 +1,92 @@ +"""Unit tests for WebSocketAdapter._process_image_components. + +The web debug client uploads Image / Voice / File components carrying a storage +key in ``path``. This helper resolves each to a base64 data URI (so multimodal +LLM input and the Box sandbox inbox have usable bytes), then deletes the +consumed storage object and clears ``path``. Covers mimetype selection per +type and graceful error handling. +""" + +from __future__ import annotations + +import base64 +from unittest.mock import AsyncMock, Mock + +import pytest + +from langbot.pkg.platform.sources.websocket_adapter import WebSocketAdapter + + +def _make_adapter(load_return=b'hello', load_side_effect=None): + provider = Mock() + provider.load = AsyncMock(return_value=load_return, side_effect=load_side_effect) + provider.delete = AsyncMock() + ap = Mock() + ap.storage_mgr.storage_provider = provider + logger = Mock() + logger.error = AsyncMock() + # WebSocketAdapter is a pydantic model; bypass full __init__/validation. + adapter = WebSocketAdapter.model_construct(ap=ap, logger=logger) + return adapter, provider + + +@pytest.mark.asyncio +async def test_image_jpeg_mimetype_and_cleanup(): + adapter, provider = _make_adapter(load_return=b'\xff\xd8\xff') + chain = [{'type': 'Image', 'path': 'storage://abc/photo.jpg'}] + + await adapter._process_image_components(chain) + + expected_b64 = base64.b64encode(b'\xff\xd8\xff').decode('utf-8') + assert chain[0]['base64'] == f'data:image/jpeg;base64,{expected_b64}' + assert chain[0]['path'] == '' # consumed + provider.delete.assert_awaited_once_with('storage://abc/photo.jpg') + + +@pytest.mark.asyncio +async def test_image_defaults_to_png(): + adapter, _ = _make_adapter() + chain = [{'type': 'Image', 'path': 'storage://abc/blob'}] + await adapter._process_image_components(chain) + assert chain[0]['base64'].startswith('data:image/png;base64,') + + +@pytest.mark.asyncio +async def test_voice_uses_guessed_or_wav_mimetype(): + adapter, _ = _make_adapter() + chain = [{'type': 'Voice', 'path': 'storage://abc/clip.wav'}] + await adapter._process_image_components(chain) + assert chain[0]['base64'].startswith('data:audio/') + + +@pytest.mark.asyncio +async def test_file_uses_octet_stream_fallback(): + adapter, _ = _make_adapter() + chain = [{'type': 'File', 'path': 'storage://abc/unknownblob'}] + await adapter._process_image_components(chain) + assert chain[0]['base64'].startswith('data:application/octet-stream;base64,') + + +@pytest.mark.asyncio +async def test_skips_components_without_path_or_unknown_type(): + adapter, provider = _make_adapter() + chain = [ + {'type': 'Image', 'path': ''}, # no path + {'type': 'Plain', 'path': 'storage://abc/x'}, # not a file component + {'type': 'At', 'target': '123'}, # no path key at all + ] + await adapter._process_image_components(chain) + provider.load.assert_not_awaited() + assert 'base64' not in chain[0] + assert 'base64' not in chain[1] + + +@pytest.mark.asyncio +async def test_load_failure_is_logged_not_raised(): + adapter, _ = _make_adapter(load_side_effect=RuntimeError('storage down')) + chain = [{'type': 'File', 'path': 'storage://abc/doc.pdf'}] + + # must not raise + await adapter._process_image_components(chain) + assert 'base64' not in chain[0] + adapter.logger.error.assert_awaited_once() diff --git a/tests/unit_tests/provider/test_litellm_convert_messages.py b/tests/unit_tests/provider/test_litellm_convert_messages.py new file mode 100644 index 00000000..87ad2e02 --- /dev/null +++ b/tests/unit_tests/provider/test_litellm_convert_messages.py @@ -0,0 +1,93 @@ +"""Unit tests for LiteLLMRequester._convert_messages. + +Focus: the content-part normalization that (a) converts image_base64 parts to +the OpenAI image_url shape and (b) drops non-image file parts (file_base64 / +file_url) which OpenAI-compatible chat models reject. The latter is essential +for Voice/File attachments — including ones replayed from conversation history — +since the agent consumes their bytes via the sandbox, not the model payload. +""" + +import langbot_plugin.api.entities.builtin.provider.message as provider_message + +from langbot.pkg.provider.modelmgr.requesters.litellmchat import LiteLLMRequester + + +def _make_requester() -> LiteLLMRequester: + # _convert_messages does not touch instance config, so bypass __init__. + return LiteLLMRequester.__new__(LiteLLMRequester) + + +def test_convert_messages_drops_file_base64_part(): + req = _make_requester() + msg = provider_message.Message( + role='user', + content=[ + provider_message.ContentElement.from_text('analyze this audio'), + provider_message.ContentElement.from_file_base64('data:audio/wav;base64,AAAA', 'voice.wav'), + ], + ) + out = req._convert_messages([msg]) + parts = out[0]['content'] + types = [p.get('type') for p in parts] + assert 'file_base64' not in types + assert types == ['text'] + assert parts[0]['text'] == 'analyze this audio' + + +def test_convert_messages_drops_file_url_part(): + req = _make_requester() + msg = provider_message.Message( + role='user', + content=[ + provider_message.ContentElement.from_text('here is a doc'), + provider_message.ContentElement.from_file_url('http://example.com/report.xlsx', 'report.xlsx'), + ], + ) + out = req._convert_messages([msg]) + types = [p.get('type') for p in out[0]['content']] + assert types == ['text'] + + +def test_convert_messages_keeps_image_and_converts_to_image_url(): + req = _make_requester() + msg = provider_message.Message( + role='user', + content=[ + provider_message.ContentElement.from_text('look'), + provider_message.ContentElement.from_image_base64('data:image/png;base64,AAAA'), + ], + ) + out = req._convert_messages([msg]) + parts = out[0]['content'] + types = [p.get('type') for p in parts] + # image is preserved and reshaped to the OpenAI image_url form + assert types == ['text', 'image_url'] + img_part = parts[1] + assert img_part['image_url'] == {'url': 'data:image/png;base64,AAAA'} + assert 'image_base64' not in img_part + + +def test_convert_messages_mixed_history_strips_only_files(): + req = _make_requester() + # Simulate replayed history: an old voice turn + a current text turn. + history_voice = provider_message.Message( + role='user', + content=[ + provider_message.ContentElement.from_text('old audio turn'), + provider_message.ContentElement.from_file_base64('data:audio/wav;base64,BBBB', 'voice.wav'), + ], + ) + current = provider_message.Message( + role='user', + content=[provider_message.ContentElement.from_text('now do the csv')], + ) + out = req._convert_messages([history_voice, current]) + assert [p.get('type') for p in out[0]['content']] == ['text'] + assert [p.get('type') for p in out[1]['content']] == ['text'] + + +def test_convert_messages_plain_string_content_untouched(): + req = _make_requester() + msg = provider_message.Message(role='user', content='just text') + out = req._convert_messages([msg]) + assert out[0]['content'] == 'just text' diff --git a/tests/unit_tests/provider/test_localagent_inbound_attachments.py b/tests/unit_tests/provider/test_localagent_inbound_attachments.py new file mode 100644 index 00000000..bc7352f1 --- /dev/null +++ b/tests/unit_tests/provider/test_localagent_inbound_attachments.py @@ -0,0 +1,146 @@ +"""Unit tests for LocalAgentRunner._inject_inbound_attachments. + +Covers the user -> sandbox attachment path added for the Box attachment +round-trip: + +* materialized descriptors are stashed on the query and described to the model + via an appended text note (in-sandbox paths + outbox convention); +* non-image file parts (file_base64 / file_url) are stripped from the user + message content because OpenAI-compatible chat models reject them, while + image and text parts are kept for vision models; +* the helper is a no-op when the box service is unavailable or yields nothing, + and never raises into the chat turn on materialization failure. +""" + +from __future__ import annotations + +from types import SimpleNamespace +from unittest.mock import AsyncMock, Mock + +import pytest + +import langbot_plugin.api.entities.builtin.provider.message as provider_message + +from langbot.pkg.provider.runners.localagent import LocalAgentRunner + + +def _make_runner(box_service) -> LocalAgentRunner: + runner = LocalAgentRunner.__new__(LocalAgentRunner) + runner.ap = SimpleNamespace(logger=Mock(), box_service=box_service) + return runner + + +def _make_query(): + return SimpleNamespace(variables={}, query_id='q-123') + + +def _box_service(attachments): + svc = SimpleNamespace( + available=True, + OUTBOX_MOUNT_DIR='/outbox', + materialize_inbound_attachments=AsyncMock(return_value=attachments), + ) + return svc + + +@pytest.mark.asyncio +async def test_inject_strips_file_parts_and_appends_note(): + box = _box_service([{'type': 'Voice', 'path': '/inbox/q-123/voice.wav', 'size': 176000}]) + runner = _make_runner(box) + query = _make_query() + user_message = provider_message.Message( + role='user', + content=[ + provider_message.ContentElement.from_text('transcribe this'), + provider_message.ContentElement.from_file_base64('data:audio/wav;base64,AAAA', 'voice.wav'), + ], + ) + + await runner._inject_inbound_attachments(query, user_message) + + types = [getattr(ce, 'type', None) for ce in user_message.content] + # file_base64 dropped; text kept; sandbox-path note appended as text + assert 'file_base64' not in types + assert types.count('text') == 2 + note = user_message.content[-1].text + assert '/inbox/q-123/voice.wav' in note + assert '/outbox/q-123' in note + # descriptors stashed for downstream stages + assert query.variables['_sandbox_inbound_attachments'] == box.materialize_inbound_attachments.return_value + + +@pytest.mark.asyncio +async def test_inject_keeps_image_parts(): + box = _box_service([{'type': 'Image', 'path': '/inbox/q-123/pic.png', 'size': 1234}]) + runner = _make_runner(box) + query = _make_query() + user_message = provider_message.Message( + role='user', + content=[ + provider_message.ContentElement.from_text('what is this'), + provider_message.ContentElement.from_image_base64('data:image/png;base64,iVBORw0K'), + ], + ) + + await runner._inject_inbound_attachments(query, user_message) + + types = [getattr(ce, 'type', None) for ce in user_message.content] + assert 'image_base64' in types # vision part preserved + assert types[-1] == 'text' # note appended last + + +@pytest.mark.asyncio +async def test_inject_promotes_string_content_to_list_with_note(): + box = _box_service([{'type': 'File', 'path': '/inbox/q-123/data.csv', 'size': 42}]) + runner = _make_runner(box) + query = _make_query() + user_message = provider_message.Message(role='user', content='clean this csv') + + await runner._inject_inbound_attachments(query, user_message) + + assert isinstance(user_message.content, list) + assert [getattr(ce, 'type', None) for ce in user_message.content] == ['text', 'text'] + assert user_message.content[0].text == 'clean this csv' + assert '/inbox/q-123/data.csv' in user_message.content[1].text + + +@pytest.mark.asyncio +async def test_inject_noop_without_box_service(): + runner = _make_runner(box_service=None) + query = _make_query() + user_message = provider_message.Message(role='user', content='hello') + + await runner._inject_inbound_attachments(query, user_message) + + assert user_message.content == 'hello' + assert '_sandbox_inbound_attachments' not in query.variables + + +@pytest.mark.asyncio +async def test_inject_noop_when_no_attachments(): + box = _box_service([]) + runner = _make_runner(box) + query = _make_query() + user_message = provider_message.Message(role='user', content='hello') + + await runner._inject_inbound_attachments(query, user_message) + + assert user_message.content == 'hello' + assert '_sandbox_inbound_attachments' not in query.variables + + +@pytest.mark.asyncio +async def test_inject_swallows_materialization_error(): + box = SimpleNamespace( + available=True, + OUTBOX_MOUNT_DIR='/outbox', + materialize_inbound_attachments=AsyncMock(side_effect=RuntimeError('disk full')), + ) + runner = _make_runner(box) + query = _make_query() + user_message = provider_message.Message(role='user', content='hello') + + # must not raise + await runner._inject_inbound_attachments(query, user_message) + assert user_message.content == 'hello' + runner.ap.logger.warning.assert_called_once() diff --git a/web/src/app/home/pipelines/components/debug-dialog/DebugDialog.tsx b/web/src/app/home/pipelines/components/debug-dialog/DebugDialog.tsx index 318dcc7b..b45e87dd 100644 --- a/web/src/app/home/pipelines/components/debug-dialog/DebugDialog.tsx +++ b/web/src/app/home/pipelines/components/debug-dialog/DebugDialog.tsx @@ -15,6 +15,7 @@ import { At, Quote, Voice, + File as FileComponent, Source, } from '@/app/infra/entities/message'; import { toast } from 'sonner'; @@ -64,7 +65,12 @@ export default function DebugDialog({ const [isHovering, setIsHovering] = useState(false); const [isConnected, setIsConnected] = useState(false); const [selectedImages, setSelectedImages] = useState< - Array<{ file: File; preview: string; fileKey?: string }> + Array<{ + file: File; + preview: string; + fileKey?: string; + kind: 'image' | 'voice' | 'file'; + }> >([]); const [isUploading, setIsUploading] = useState(false); const [previewImageUrl, setPreviewImageUrl] = useState(''); @@ -292,23 +298,38 @@ export default function DebugDialog({ const files = e.target.files; if (!files || files.length === 0) return; - const newImages: Array<{ file: File; preview: string }> = []; + const newImages: Array<{ + file: File; + preview: string; + kind: 'image' | 'voice' | 'file'; + }> = []; for (let i = 0; i < files.length; i++) { const file = files[i]; if (file.type.startsWith('image/')) { - const preview = URL.createObjectURL(file); - newImages.push({ file, preview }); + newImages.push({ + file, + preview: URL.createObjectURL(file), + kind: 'image', + }); + } else if (file.type.startsWith('audio/')) { + newImages.push({ file, preview: '', kind: 'voice' }); + } else { + newImages.push({ file, preview: '', kind: 'file' }); } } setSelectedImages((prev) => [...prev, ...newImages]); + // reset the input so selecting the same file again re-triggers onChange + e.target.value = ''; }; const handleRemoveImage = (index: number) => { setSelectedImages((prev) => { const newImages = [...prev]; - URL.revokeObjectURL(newImages[index].preview); + if (newImages[index].preview) { + URL.revokeObjectURL(newImages[index].preview); + } newImages.splice(index, 1); return newImages; }); @@ -372,19 +393,33 @@ export default function DebugDialog({ }); } - // Upload images and add to message chain - for (const image of selectedImages) { + // Upload attachments and add to message chain + for (const attachment of selectedImages) { try { - const result = await httpClient.uploadWebSocketImage( - selectedPipelineId, - image.file, - ); - messageChain.push({ - type: 'Image', - path: result.file_key, - }); + if (attachment.kind === 'image') { + const result = await httpClient.uploadWebSocketImage( + selectedPipelineId, + attachment.file, + ); + messageChain.push({ + type: 'Image', + path: result.file_key, + }); + } else { + // Voice / File go through the generic document upload endpoint, + // which returns a storage key the backend resolves into the + // sandbox inbox just like images. + const result = await httpClient.uploadDocumentFile(attachment.file); + messageChain.push({ + type: attachment.kind === 'voice' ? 'Voice' : 'File', + path: result.file_id, + ...(attachment.kind === 'file' + ? { name: attachment.file.name } + : {}), + }); + } } catch (error) { - console.error('Image upload failed:', error); + console.error('Attachment upload failed:', error); toast.error(t('pipelines.debugDialog.imageUploadFailed')); } } @@ -393,7 +428,9 @@ export default function DebugDialog({ setInputValue(''); setHasAt(false); setQuotedMessage(null); - selectedImages.forEach((img) => URL.revokeObjectURL(img.preview)); + selectedImages.forEach((img) => { + if (img.preview) URL.revokeObjectURL(img.preview); + }); setSelectedImages([]); // Send message via WebSocket @@ -460,13 +497,29 @@ export default function DebugDialog({ } case 'File': { - const file = component as MessageChainComponent & { name?: string }; + const file = component as FileComponent; + const downloadHref = file.base64 + ? file.base64.startsWith('data:') + ? file.base64 + : `data:application/octet-stream;base64,${file.base64}` + : file.url || ''; + const fileName = file.name || 'Unknown'; return (
- - [{t('pipelines.debugDialog.file')}] {file.name || 'Unknown'} - + {downloadHref ? ( + + [{t('pipelines.debugDialog.file')}] {fileName} + + ) : ( + + [{t('pipelines.debugDialog.file')}] {fileName} + + )}
); } @@ -844,17 +897,30 @@ export default function DebugDialog({ )} - {/* Image preview area */} + {/* Attachment preview area */} {selectedImages.length > 0 && (
{selectedImages.map((image, index) => (
- {`preview-${index}`} + {image.kind === 'image' ? ( + {`preview-${index}`} + ) : ( +
+ {image.kind === 'voice' ? ( + + ) : ( + + )} + + {image.file.name} + +
+ )}