Compare commits

...

12 Commits

Author SHA1 Message Date
huanghuoguoguo e5207d9131 fix(litellmchat): preserve provider_specific_fields for Gemini thought_signature
Update _normalize_stream_tool_calls to preserve provider_specific_fields
(including thought_signature) from streaming tool call chunks. Also preserve
provider_specific_fields from delta in invoke_llm_stream.

This ensures Gemini's thought_signature is round-tripped correctly:
1. LiteLLM extracts thought_signature from Gemini response
2. It's preserved in Message/ToolCall entities (via SDK changes)
3. _convert_messages includes it in the next request

Also add unit tests for provider_specific_fields round-tripping.

Fixes: langbot-app/LangBot#1899
2026-06-19 23:16:05 +08:00
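The three-step round trip in the commit above can be sketched as follows. This is a minimal illustration under stated assumptions, not LangBot's or LiteLLM's code: the `ToolCall` dataclass and both helper functions are hypothetical, and the dictionary shapes only approximate an OpenAI-style tool call.

```python
from dataclasses import dataclass, field
from typing import Any


@dataclass
class ToolCall:
    """Hypothetical stand-in for the SDK's ToolCall entity."""

    id: str
    name: str
    arguments: str
    # Opaque provider extras (e.g. Gemini's thought_signature); never interpreted here.
    provider_specific_fields: dict[str, Any] = field(default_factory=dict)


def from_response(raw: dict[str, Any]) -> ToolCall:
    """Steps 1-2: keep the provider extras when parsing a tool call from a response."""
    function = raw.get("function") or {}
    return ToolCall(
        id=raw["id"],
        name=function.get("name", ""),
        arguments=function.get("arguments", ""),
        provider_specific_fields=dict(raw.get("provider_specific_fields") or {}),
    )


def to_request(call: ToolCall) -> dict[str, Any]:
    """Step 3: echo the extras back unchanged in the next request."""
    payload: dict[str, Any] = {
        "id": call.id,
        "type": "function",
        "function": {"name": call.name, "arguments": call.arguments},
    }
    if call.provider_specific_fields:
        payload["provider_specific_fields"] = dict(call.provider_specific_fields)
    return payload
```

The extras are copied as an opaque dict rather than unpacked field by field, so a provider can add new keys without the intermediate entity needing to know about them.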
huanghuoguoguo 4538fca901 chore(deps): bump langbot-plugin to 0.4.5 (#2266)
Bumps the pinned langbot-plugin SDK from 0.4.4 to 0.4.5, which adds
`provider_specific_fields` to the Message/ToolCall entities. This is the
SDK dependency required by the Gemini thought_signature fix (#1899, #2265).

The lock update is scoped to langbot-plugin only. pylibseekdb is deliberately
held at 1.1.0: a free re-resolve drifts it to 1.3.0 (pyseekdb==1.1.0.post3
has no upper bound on it), which is out of scope here and should be handled
in a separate dependency-upgrade PR.
2026-06-19 23:13:56 +08:00
Junyan Chin b02c9517f6 feat(modelmgr): split Moonshot/Kimi into Global and China presets (#2264)
Adding a Kimi/Moonshot provider failed model scanning out of the box for
CN-region API keys: the single preset defaulted its base URL to the
global endpoint `https://api.moonshot.ai/v1`, but CN-issued keys are only
valid against `https://api.moonshot.cn/v1`, so scanning returned
`401 Invalid Authentication`. Flipping the default would just move the
breakage to international keys, since the base_url field is plain
free-text and either region is equally common.

Instead, offer two clearly labelled presets, mirroring how the Lark
adapter exposes feishu.cn vs larksuite.com:

- `moonshot-chat-completions`   -> "Moonshot / Kimi (Global · api.moonshot.ai)"
- `moonshot-cn-chat-completions` -> "Moonshot / Kimi (China · api.moonshot.cn)"

The existing component name is kept unchanged so provider rows already in
the DB keep resolving; only its display label is clarified. Both presets
keep base_url as a free-text field, so users behind a proxy / one-api
gateway can still enter a custom endpoint. Both carry the same `kimi`
search aliases so either shows up when searching.

Fixes #2232
2026-06-19 18:39:58 +08:00
RockChinQ 511b5a7bf4 style(web): shrink market tag filter row (height + font)
Make the quick-filter tag pills more compact: h-8 -> h-7, default text
-> text-xs with px-2.5, gap-2 -> gap-1.5, and the selected-X icon
h-3.5 -> h-3. Keeps the single-row horizontal-scroll layout.
2026-06-19 06:20:17 -04:00
RockChinQ 65fbf4db59 style(web): keep market tag filter on a single horizontal-scroll row
With many category tags the quick-filter row used `sm:flex-wrap` on
desktop, so once tags overflowed the available width they wrapped onto a
second, center-aligned line — leaving an orphan tag floating under the
row (looked broken and only gets worse as more tags are added).

Make the row a single, never-wrapping line that scrolls horizontally at
every breakpoint, left-aligned, with the scrollbar hidden and a subtle
right-edge fade to signal there's more to scroll. Adds a reusable
`.scrollbar-hide` utility to global.css.
2026-06-19 06:15:31 -04:00
Junyan Chin 3d5b70cc5d fix(modelmgr): keep id-less streamed tool calls (Ollama) (#2262)
Ollama's OpenAI-compatible streaming endpoint emits a tool-call delta
carrying an `index` and a `function` payload but never an OpenAI-style
`id`. `_normalize_stream_tool_calls` dropped any tool call without an
`id`, so a tool-only turn yielded neither content nor a tool call: the
stream "completed" with 0 chars, the tool never ran, and the chat
appeared stuck. Models on standard OpenAI APIs (e.g. SiliconFlow) were
unaffected because they always send a `call_...` id.

Synthesize a stable per-index id (`call_<index>`) when the provider
omits one but a function name is present. Providers that do send ids
keep theirs, and parallel id-less calls keep distinct ids.

Adds regression tests for the single and multi id-less tool-call cases.

Fixes #2261
2026-06-19 18:07:25 +08:00
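The id synthesis described in the commit above can be sketched like this. It is a simplified illustration, not the actual `_normalize_stream_tool_calls`: the function name, the delta shape (plain dicts with `index`, optional `id`, and a `function` payload), and the merged output shape are all assumptions.

```python
from typing import Any


def normalize_stream_tool_calls(deltas: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Merge streamed tool-call deltas by index; synthesize an id when none was sent."""
    merged: dict[int, dict[str, Any]] = {}
    for delta in deltas:
        index = delta.get("index", 0)
        call = merged.setdefault(index, {"id": None, "name": "", "arguments": ""})
        if delta.get("id"):
            call["id"] = delta["id"]  # provider-supplied ids are kept as-is
        function = delta.get("function") or {}
        if function.get("name"):
            call["name"] = function["name"]
        call["arguments"] += function.get("arguments") or ""

    result: list[dict[str, Any]] = []
    for index in sorted(merged):
        call = merged[index]
        if not call["name"]:
            continue  # nothing callable was streamed for this index
        if not call["id"]:
            call["id"] = f"call_{index}"  # stable per-index id for id-less providers
        result.append(call)
    return result
```

Deriving the synthetic id from the index keeps parallel id-less calls distinct, while a provider-supplied id is kept unchanged.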
RockChinQ 83623f6afe fix(box): always advertise outbox path in exec guidance
Outbound attachment collection (pipeline wrapper) runs on every turn
regardless of inbound files, but the agent was only told the per-query
outbox path inside the inbound-attachment note in LocalAgentRunner. So on
pure-generation turns (e.g. "generate a QR code"/chart/mermaid where the
user sent no file), the agent never learned the outbox path or the
query_id, wrote the generated file nowhere deliverable, and it was
silently dropped.

Move the outbox instruction into BoxService.get_system_guidance(query_id),
which is injected as a system message on every turn the exec tool is
available. The inbound note keeps its own (now redundant but harmless)
outbox line. Add unit tests asserting the outbox path is present with a
query_id and absent without one.
2026-06-19 04:09:45 -04:00
huanghuoguoguo a020ca680f Harden agent runner tool runtimes (#2247)
* fix(tools): harden agent runner tool runtimes

* fix(tools): bootstrap Python workspaces with available interpreter

* fix(tools): clear stale Python workspace env locks

* fix(tools): decouple runtime from agent runner

* test(tools): cover runtime hardening edge cases

* fix(tools): support binary workspace file chunks
2026-06-18 14:06:04 +00:00
huanghuoguoguo 3a2edf9753 fix(survey): prevent option controls from submitting forms (#2249)
2026-06-18 22:01:10 +08:00
huanghuoguoguo 5fe63ce822 Bound Space model sync startup wait (#2248)
* fix(modelmgr): bound Space model sync startup wait

* style(provider): format model manager
2026-06-18 22:00:33 +08:00
Junyan Chin 6b15a732e4 fix(box): purge leftover inbox/outbox on startup; clear root-owned outbox via exec (#2259)
The agent attachment outbox is written by the sandbox container as root over
the bind-mount, so the LangBot host process (non-root) cannot rmtree those
files — the host-side delete failed silently and stale files were re-collected
on a later turn that reused the same query_id (the query_id counter resets to 0
on every restart).

- BoxService.initialize now purges leftover inbox/outbox after the runtime is
  available: host rmtree first, then an in-sandbox 'rm -rf' via exec for any
  root-owned survivors.
- _clear_outbox now falls back to exec when the host delete leaves root-owned
  files behind, instead of silently failing.
- collect_outbound_attachments clears the outbox unconditionally (even on an
  empty collection) so a reused query_id never inherits stale files.
- Tests: startup purge (host-owned + root-owned exec fallback + no-workspace
  noop) and empty-collection-still-clears.
2026-06-18 21:59:48 +08:00
Junyan Chin a1e6eccdeb feat(box): bidirectional attachment transfer for sandbox (#2257)
* feat(box): bidirectional attachment transfer for sandbox

Materialize inbound attachments into the sandbox workspace so agents can
process user-sent files, and collect agent-produced files from the outbox
to attach them back to the reply.

- box(service): add materialize_inbound_attachments /
  collect_outbound_attachments. Prefer direct host-filesystem read/write on
  the bind-mounted
  workspace (no size limit), falling back to chunked exec only for
  non-shared backends (e2b/remote). Clear per-query inbox/outbox dirs at
  turn start to avoid query_id-reuse collisions.
- provider(localagent): inject inbound attachment descriptors into the
  sandbox and append a system note telling the agent the inbox/outbox paths.
- pipeline(wrapper): collect outbox files on the final stream chunk and
  append them as attachment components to the response chain.
- web(debug-dialog): render File components with a download link when
  base64/url is present; add base64/path fields to the File entity.
- tests: cover inbound/outbound, large-file transfer without truncation,
  and stale-dir clearing (86 passing).

* feat(box): support voice/file attachment round-trip end-to-end

Extends the bidirectional attachment transfer to audio and arbitrary files
through the real webchat UI, and fixes the model-payload errors that
non-image attachments triggered.

- platform(websocket_adapter): resolve Voice/File component storage keys to
  base64 (previously only Image), so audio/documents reach the sandbox inbox.
- web(debug-dialog): accept audio/* and any file in the uploader (was
  image-only), classify by mimetype, upload Voice/File via the documents
  endpoint, and render non-image staged attachments as a chip.
- provider(litellmchat): drop non-image file parts (file_base64 / file_url)
  when building the OpenAI/LiteLLM payload. These come from Voice/File
  attachments — including ones replayed from conversation history — and the
  agent reads their bytes from the sandbox, not the model. Without this the
  provider rejects the request: 'invalid content type=file_base64'.
- provider(localagent): also strip those parts from the current user message
  alongside the sandbox-path note (model-facing clarity; the requester is the
  real safety net for history).
- tests: cover the requester strip/keep behavior (file dropped, image kept and
  reshaped to image_url, mixed history, plain-string content).

* test(box): cover inbound/outbound attachment helpers; fix ruff format

- ruff format localagent.py (CI ruff format --check was failing)
- add unit tests for ResponseWrapper outbound-attachment helpers (wrapper.py 78%->98%)
- add unit tests for LocalAgentRunner._inject_inbound_attachments
- add unit tests for WebSocketAdapter._process_image_components (0%->covered)

Lifts PR patch coverage from 68.97% to ~88% (>75% target).
2026-06-18 21:40:31 +08:00
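Attachment names come from users, so they are reduced to a safe basename before being written under the inbox directory. The standalone function below restates the logic of `_sanitize_attachment_name` from the BoxService diff so it can be exercised outside the class; the free-function form and the sample inputs are illustrative assumptions.

```python
import os


def sanitize_attachment_name(name: str, fallback: str) -> str:
    """Reduce an arbitrary attachment name to a safe basename."""
    # Normalize Windows separators, then keep only the last path component.
    base = os.path.basename(str(name or "").replace("\\", "/").strip())
    # Drop leading dots so neither ".." nor hidden-file names survive.
    base = base.lstrip(".")
    # Keep a conservative charset: alphanumerics, dot, underscore, hyphen, space.
    cleaned = "".join(c for c in base if c.isalnum() or c in (".", "_", "-", " ")).strip()
    cleaned = cleaned.replace(" ", "_")
    return cleaned or fallback


if __name__ == "__main__":
    for raw in ("../../etc/passwd", "..\\..\\secret.txt", "my report (final).pdf", ".."):
        print(repr(raw), "->", sanitize_attachment_name(raw, "file_1.bin"))
```

Because only the basename is kept and leading dots are stripped, a crafted name cannot point outside the per-query directory; names that sanitize to nothing fall back to a generated one.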
33 changed files with 3110 additions and 276 deletions
+1 -1
@@ -70,7 +70,7 @@ dependencies = [
"chromadb>=1.0.0,<2.0.0",
"qdrant-client (>=1.15.1,<2.0.0)",
"pyseekdb==1.1.0.post3",
"langbot-plugin==0.4.4",
"langbot-plugin==0.4.5",
"asyncpg>=0.30.0",
"line-bot-sdk>=3.19.0",
"matrix-nio>=0.25.2",
+518 -1
@@ -105,6 +105,7 @@ class BoxService:
f'LangBot Box runtime initialized: profile={self.profile.name} '
f'default_workspace={self.default_workspace or "(none)"}'
)
await self._purge_attachment_dirs()
except Exception as exc:
self.ap.logger.warning(f'LangBot Box runtime unavailable, sandbox features disabled: {exc}')
self._available = False
@@ -335,6 +336,507 @@ class BoxService:
return await self.execute_spec_payload(spec_payload, query)
# ── Attachment passthrough (inbound / outbound) ──────────────────
#
# IM/webchat attachments (images, voices, files) reach the LLM as
# multimodal content, but historically never landed on the sandbox
# filesystem, so the agent's exec/read/write tools could not operate on
# them. Conversely, files the agent produced inside the sandbox were
# never surfaced back to the user. These two helpers close both gaps:
#
# inbound : message_chain attachments -> /workspace/inbox/<query_id>/
# outbound : /workspace/outbox/<query_id>/ -> reply MessageChain
#
# Transfer prefers DIRECT HOST FILESYSTEM access to the bind-mounted
# workspace (default_workspace on the host maps to /workspace inside the
# container), which has no size limit. This covers the local docker /
# nsjail / stdio backends. For backends where the workspace is NOT visible
# on the LangBot host (E2B, an external remote runtime.endpoint), it falls
# back to a base64-through-exec round-trip. The exec channel can only move
# small files reliably — the docker backend passes the command as a single
# argv (ARG_MAX) and exec stdout is truncated by output_limit_chars — so
# the host path is strongly preferred and used whenever available.
INBOX_MOUNT_DIR = '/workspace/inbox'
OUTBOX_MOUNT_DIR = '/workspace/outbox'
INBOX_SUBDIR = 'inbox'
OUTBOX_SUBDIR = 'outbox'
# Hard cap on a single attachment. The HTTP upload endpoints already cap
# uploads at 10MiB; keep parity.
_ATTACHMENT_MAX_BYTES = 10 * _MIB
# Conservative cap for the exec FALLBACK path only (ARG_MAX / stdout
# truncation). The host-filesystem path has no such limit.
_EXEC_FALLBACK_MAX_BYTES = 256 * 1024
def _host_query_dir(self, subdir: str, query_id) -> str | None:
"""Host path for ``/workspace/<subdir>/<query_id>`` when LangBot can
access the bind-mounted workspace directly, else ``None``.
``default_workspace`` is the host directory bind-mounted to
``/workspace`` for the local docker/nsjail backends and shared
outright in stdio mode, so a file written there by LangBot is visible
to the sandbox (and vice-versa). It is ``None`` / not a local dir for
E2B and remote runtimes, where we must fall back to the exec channel.
"""
root = self.default_workspace
if not root or not os.path.isdir(root):
return None
return os.path.join(root, subdir, str(query_id))
async def _purge_attachment_dirs(self) -> None:
"""Remove leftover inbox/outbox directories on startup.
``query_id`` is a process-local counter (see pipeline query pool) that
resets to 0 on every restart, so per-query attachment directories from
a previous process would otherwise be silently reused — leaking a prior
run's inbound files and re-sending stale outbound files.
Outbox files are written by the sandbox **container**, which runs as
root over the bind-mount, so the LangBot host process (a non-root user)
cannot ``rmtree`` them. We therefore try a host-side delete first (fast,
works for host-owned inbox files) and, for anything that survives,
delete from *inside* the sandbox via exec where the container's root can
remove its own files. Best-effort: never block startup.
"""
root = self.default_workspace
if not root or not os.path.isdir(root):
return
import shutil
host_survivors: list[str] = []
def _host_purge() -> list[str]:
survivors: list[str] = []
for subdir in (self.INBOX_SUBDIR, self.OUTBOX_SUBDIR):
path = os.path.join(root, subdir)
if not os.path.isdir(path):
continue
shutil.rmtree(path, ignore_errors=True)
if os.path.exists(path):
survivors.append(subdir)
return survivors
try:
host_survivors = await asyncio.to_thread(_host_purge)
except Exception as exc: # pragma: no cover - defensive
self.ap.logger.warning(f'Host-side purge of sandbox attachment dirs failed: {exc}')
host_survivors = [self.INBOX_SUBDIR, self.OUTBOX_SUBDIR]
if not host_survivors:
self.ap.logger.info('Purged leftover sandbox attachment dirs from a previous process.')
return
# Root-owned leftovers (container output): delete from inside the box.
targets = ' '.join(f'/workspace/{sub}' for sub in host_survivors)
try:
spec = self.build_spec({'cmd': f'rm -rf {targets}', 'session_id': '__startup_purge__', 'timeout_sec': 30})
await self.client.execute(spec)
self.ap.logger.info(
f'Purged root-owned leftover sandbox attachment dirs via sandbox exec: {host_survivors}'
)
except Exception as exc:
self.ap.logger.warning(
f'Failed to purge root-owned sandbox attachment dirs {host_survivors} via exec: {exc}'
)
@staticmethod
def _sanitize_attachment_name(name: str, fallback: str) -> str:
"""Reduce an arbitrary attachment name to a safe basename.
Strips directory separators and parent refs so a crafted file name
can never escape the inbox/outbox directory.
"""
base = os.path.basename(str(name or '').replace('\\', '/').strip())
base = base.lstrip('.') or ''
# Drop anything that is not a conservative filename charset.
cleaned = ''.join(c for c in base if c.isalnum() or c in ('.', '_', '-', ' ')).strip()
cleaned = cleaned.replace(' ', '_')
return cleaned or fallback
@staticmethod
async def _component_to_bytes(component) -> tuple[bytes, str] | None:
"""Best-effort extraction of (bytes, mime) from a platform component.
Handles base64, http(s) url and local path sources. Returns None when
no payload can be resolved.
"""
import base64 as _b64
b64 = getattr(component, 'base64', None)
if b64:
data = b64
mime = 'application/octet-stream'
if isinstance(data, str) and data.startswith('data:'):
split_index = data.find(';base64,')
if split_index != -1:
mime = data[5:split_index]
data = data[split_index + 8 :]
try:
return _b64.b64decode(data), mime
except Exception:
return None
url = getattr(component, 'url', None)
if url:
try:
import httpx
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.get(url)
resp.raise_for_status()
return resp.content, resp.headers.get('Content-Type', 'application/octet-stream')
except Exception:
return None
path = getattr(component, 'path', None)
if path:
try:
import aiofiles
async with aiofiles.open(path, 'rb') as f:
return await f.read(), 'application/octet-stream'
except Exception:
return None
return None
async def _write_files_into_sandbox(
self,
query: pipeline_query.Query,
subdir: str,
target_mount_dir: str,
files: list[tuple[str, bytes]],
) -> list[str]:
"""Write *files* (name, bytes) into the per-query directory.
Prefers a direct host-filesystem write to the bind-mounted workspace
(no size limit). Falls back to a base64-through-exec round-trip only
when the workspace is not visible on the LangBot host (E2B / remote).
Returns the list of in-sandbox paths actually written.
"""
if not files:
return []
host_dir = self._host_query_dir(subdir, query.query_id)
if host_dir is not None:
return await asyncio.to_thread(self._write_files_host, host_dir, target_mount_dir, files)
return await self._write_files_via_exec(query, target_mount_dir, files)
def _write_files_host(
self,
host_dir: str,
target_mount_dir: str,
files: list[tuple[str, bytes]],
) -> list[str]:
"""Write attachments straight onto the bind-mounted host directory.
Recreates the per-query directory from scratch so a reused query_id
(the webchat session uses small sequential ids) never inherits stale
files from an earlier turn.
"""
import shutil
shutil.rmtree(host_dir, ignore_errors=True)
os.makedirs(host_dir, exist_ok=True)
written: list[str] = []
for name, data in files:
with open(os.path.join(host_dir, name), 'wb') as fh:
fh.write(data)
written.append(f'{target_mount_dir}/{name}')
return written
async def _write_files_via_exec(
self,
query: pipeline_query.Query,
target_dir: str,
files: list[tuple[str, bytes]],
) -> list[str]:
"""Fallback: ship files into the sandbox over the exec channel.
Only used for backends without host-filesystem access (E2B / remote).
Each file is base64-decoded inside the sandbox. Files larger than the
conservative exec cap are skipped (ARG_MAX / stdout limits).
"""
import base64 as _b64
import json as _json
manifest = []
for name, data in files:
if len(data) > self._EXEC_FALLBACK_MAX_BYTES:
self.ap.logger.warning(
f'Attachment "{name}" ({len(data)} bytes) exceeds the exec-channel '
f'fallback limit ({self._EXEC_FALLBACK_MAX_BYTES} bytes); skipping. '
f'Configure a host-shared workspace to transfer large files.'
)
continue
manifest.append({'name': name, 'b64': _b64.b64encode(data).decode('ascii')})
if not manifest:
return []
manifest_b64 = _b64.b64encode(_json.dumps(manifest).encode('utf-8')).decode('ascii')
script = (
'import base64, json, os, shutil\n'
f'target = {target_dir!r}\n'
'shutil.rmtree(target, ignore_errors=True)\n'
'os.makedirs(target, exist_ok=True)\n'
f'manifest = json.loads(base64.b64decode({manifest_b64!r}))\n'
'written = []\n'
'for item in manifest:\n'
" p = os.path.join(target, item['name'])\n"
" with open(p, 'wb') as f:\n"
" f.write(base64.b64decode(item['b64']))\n"
' written.append(p)\n'
'print(json.dumps(written))\n'
)
result = await self.execute_tool(
{'command': f"python3 - <<'LBPY'\n{script}\nLBPY", 'timeout_sec': 120},
query,
)
if not result.get('ok'):
self.ap.logger.warning(
f'Failed to write inbound attachments into sandbox via exec: '
f'query_id={query.query_id} stderr={result.get("stderr", "")[:200]}'
)
return []
try:
return _json.loads(str(result.get('stdout') or '').strip().splitlines()[-1])
except Exception:
return []
async def materialize_inbound_attachments(self, query: pipeline_query.Query) -> list[dict]:
"""Persist message-chain attachments into the sandbox inbox.
Returns a list of ``{path, name, type, size}`` describing what was
written, so the runner can tell the LLM the exact in-sandbox paths.
Returns ``[]`` when sandbox is unavailable or there are no attachments.
"""
if not self._available:
return []
import langbot_plugin.api.entities.builtin.platform.message as platform_message
message_chain = getattr(query, 'message_chain', None)
if not message_chain:
return []
type_map = [
(platform_message.Image, 'Image', 'image', 'png'),
(platform_message.Voice, 'Voice', 'voice', 'wav'),
(platform_message.File, 'File', 'file', 'bin'),
]
pending: list[tuple[str, bytes]] = []
descriptors: list[dict] = []
index = 0
for component in message_chain:
matched = None
for cls, kind, prefix, default_ext in type_map:
if isinstance(component, cls):
matched = (kind, prefix, default_ext)
break
if matched is None:
continue
kind, prefix, default_ext = matched
payload = await self._component_to_bytes(component)
if payload is None:
continue
data, _mime = payload
if not data or len(data) > self._ATTACHMENT_MAX_BYTES:
continue
index += 1
raw_name = getattr(component, 'name', None) or f'{prefix}_{index}.{default_ext}'
safe_name = self._sanitize_attachment_name(raw_name, f'{prefix}_{index}.{default_ext}')
pending.append((safe_name, data))
descriptors.append(
{
'name': safe_name,
'type': kind,
'size': len(data),
}
)
if not pending:
return []
target_dir = f'{self.INBOX_MOUNT_DIR}/{query.query_id}'
written = await self._write_files_into_sandbox(query, self.INBOX_SUBDIR, target_dir, pending)
written_basenames = {os.path.basename(p) for p in written}
result: list[dict] = []
for desc in descriptors:
if desc['name'] in written_basenames:
desc['path'] = f'{target_dir}/{desc["name"]}'
result.append(desc)
if result:
self.ap.logger.info(
f'Materialized {len(result)} inbound attachment(s) into sandbox: '
f'query_id={query.query_id} dir={target_dir}'
)
return result
async def collect_outbound_attachments(self, query: pipeline_query.Query) -> list[dict]:
"""Collect files the agent produced in the sandbox outbox.
Reads ``/workspace/outbox/<query_id>/`` (recursively) — directly from
the bind-mounted host directory when available (no size limit), else
via the exec channel — returns a list of ``{type, name, base64}``
ready to become platform message components, then clears the outbox so
a later turn in the same session does not re-send stale files. Returns
``[]`` when nothing was produced.
"""
if not self._available:
return []
host_dir = self._host_query_dir(self.OUTBOX_SUBDIR, query.query_id)
if host_dir is not None:
entries = await asyncio.to_thread(self._read_outbox_host, host_dir)
else:
entries = await self._read_outbox_via_exec(query)
attachments = self._classify_outbound_entries(entries)
# Always clear the per-query outbox after reading — even when nothing
# was collected — so a later turn that reuses the same query_id (the
# counter resets across restarts) never inherits stale files.
await self._clear_outbox(query, host_dir)
if attachments:
self.ap.logger.info(
f'Collected {len(attachments)} outbound attachment(s) from sandbox: query_id={query.query_id}'
)
return attachments
def _read_outbox_host(self, host_dir: str) -> list[dict]:
"""Read outbox files straight off the bind-mounted host directory."""
import base64 as _b64
entries: list[dict] = []
if not os.path.isdir(host_dir):
return entries
for root, _dirs, names in os.walk(host_dir):
for name in sorted(names):
path = os.path.join(root, name)
try:
if os.path.getsize(path) > self._ATTACHMENT_MAX_BYTES:
continue
with open(path, 'rb') as fh:
data = fh.read()
except OSError:
continue
rel = os.path.relpath(path, host_dir)
entries.append({'name': rel, 'b64': _b64.b64encode(data).decode('ascii')})
return entries
async def _read_outbox_via_exec(self, query: pipeline_query.Query) -> list[dict]:
"""Fallback: read the outbox over the exec channel (E2B / remote).
Note: exec stdout is truncated by ``output_limit_chars``, so this path
only reliably transfers small files. The host path is preferred.
"""
import json as _json
target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}'
max_bytes = self._EXEC_FALLBACK_MAX_BYTES
script = (
'import base64, json, os\n'
f'target = {target_dir!r}\n'
f'max_bytes = {max_bytes}\n'
'out = []\n'
'if os.path.isdir(target):\n'
' for root, _dirs, names in os.walk(target):\n'
' for n in sorted(names):\n'
' p = os.path.join(root, n)\n'
' try:\n'
' if os.path.getsize(p) > max_bytes:\n'
' continue\n'
" with open(p, 'rb') as f:\n"
' data = f.read()\n'
' except OSError:\n'
' continue\n'
' rel = os.path.relpath(p, target)\n'
" out.append({'name': rel, 'b64': base64.b64encode(data).decode('ascii')})\n"
'print(json.dumps(out))\n'
)
result = await self.execute_tool(
{'command': f"python3 - <<'LBPY'\n{script}\nLBPY", 'timeout_sec': 120},
query,
)
if not result.get('ok'):
return []
try:
return _json.loads(str(result.get('stdout') or '').strip().splitlines()[-1])
except Exception:
return []
async def _clear_outbox(self, query: pipeline_query.Query, host_dir: str | None) -> None:
"""Empty the per-query outbox after collection.
Tries a host-side ``rmtree`` first (fast, no container round-trip).
Outbox files are created by the sandbox container as root over the
bind-mount, so when LangBot runs as a non-root user the host delete
fails silently and the files survive — they would then be re-collected
on the next turn that reuses the same query_id. So if anything survives
the host delete, clear it from *inside* the sandbox via exec, where the
container's root can remove its own files. Best-effort: never raise
into the pipeline.
"""
target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}'
if host_dir is not None:
import shutil
def _clear() -> bool:
shutil.rmtree(host_dir, ignore_errors=True)
survived = os.path.exists(host_dir) and bool(os.listdir(host_dir))
os.makedirs(host_dir, exist_ok=True)
return survived
survived = await asyncio.to_thread(_clear)
if not survived:
return
# Root-owned container files survived the host delete — fall through.
try:
await self.execute_tool(
{'command': f'rm -rf {target_dir} && mkdir -p {target_dir}', 'timeout_sec': 30},
query,
)
except Exception as exc:
self.ap.logger.warning(f'Failed to clear sandbox outbox {target_dir}: {exc}')
@staticmethod
def _classify_outbound_entries(entries: list[dict]) -> list[dict]:
"""Classify outbox files into Image/Voice/File component descriptors."""
image_exts = {'png', 'jpg', 'jpeg', 'gif', 'webp', 'bmp'}
voice_exts = {'wav', 'mp3', 'silk', 'amr', 'ogg', 'm4a', 'aac'}
mime_by_ext = {
'png': 'image/png',
'jpg': 'image/jpeg',
'jpeg': 'image/jpeg',
'gif': 'image/gif',
'webp': 'image/webp',
'bmp': 'image/bmp',
}
attachments: list[dict] = []
for entry in entries or []:
name = str(entry.get('name', '') or '')
b64 = entry.get('b64')
if not name or not b64:
continue
ext = name.rsplit('.', 1)[-1].lower() if '.' in name else ''
base_name = os.path.basename(name)
if ext in image_exts:
mime = mime_by_ext.get(ext, 'image/png')
attachments.append({'type': 'Image', 'name': base_name, 'base64': f'data:{mime};base64,{b64}'})
elif ext in voice_exts:
attachments.append({'type': 'Voice', 'name': base_name, 'base64': f'data:audio/{ext};base64,{b64}'})
else:
attachments.append({'type': 'File', 'name': base_name, 'base64': b64})
return attachments
async def shutdown(self):
await self.client.shutdown()
@@ -800,11 +1302,19 @@ class BoxService:
def get_recent_errors(self) -> list[dict]:
return list(self._recent_errors)
-def get_system_guidance(self) -> str:
+def get_system_guidance(self, query_id=None) -> str:
"""Return LLM system-prompt guidance for the exec tool.
All execution-specific prompt text is kept here so that callers
(e.g. LocalAgentRunner) stay free of box domain knowledge.
``query_id`` is the current turn's pipeline query id. When provided,
the guidance ALWAYS advertises the per-query outbox path so the agent
knows how to deliver generated files back to the user — even on turns
where the user sent no inbound attachment (e.g. "generate a QR code"),
which is exactly when the inbound-attachment note never fires. Outbound
collection in the wrapper runs on every turn regardless of inbound
files, so without this the file would be produced and silently dropped.
"""
guidance = (
'When the exec tool is available, use it for exact calculations, statistics, structured data parsing, '
@@ -819,6 +1329,13 @@ class BoxService:
'modify local files in the working directory, use exec with /workspace paths directly; do not ask the '
'user for directory parameters unless they explicitly need a different directory.'
)
if query_id is not None:
outbox_dir = f'{self.OUTBOX_MOUNT_DIR}/{query_id}'
guidance += (
f' If you produce any file (image, audio, document, etc.) that should be sent back to the user, '
f'write it into {outbox_dir}/ (create the directory if needed). Every file placed there will be '
'delivered to the user automatically; do not paste file contents or base64 into your reply.'
)
return guidance
async def get_status(self) -> dict:
+20 -3
@@ -146,13 +146,19 @@ def wrap_python_command_with_env(command: str, *, mount_path: str = '/workspace'
_LB_PIP_CACHE_DIR="{mount_path}/.cache/pip"
mkdir -p "$_LB_META_DIR" "$_LB_TMP_DIR" "$_LB_PIP_CACHE_DIR"
_LB_SYSTEM_PYTHON="$(command -v python3 || command -v python || true)"
if [ -z "$_LB_SYSTEM_PYTHON" ]; then
echo "python3 or python is required to prepare the workspace Python environment" >&2
exit 127
fi
export TMPDIR="$_LB_TMP_DIR"
export TEMP="$_LB_TMP_DIR"
export TMP="$_LB_TMP_DIR"
export PIP_CACHE_DIR="$_LB_PIP_CACHE_DIR"
_lb_python_meta() {{
-python - <<'PY'
+"$_LB_SYSTEM_PYTHON" - <<'PY'
import hashlib
import json
import os
@@ -201,15 +207,26 @@ def wrap_python_command_with_env(command: str, *, mount_path: str = '/workspace'
_LB_LOCK_WAIT=0
while ! mkdir "$_LB_LOCK_DIR" 2>/dev/null; do
if [ "$_LB_LOCK_WAIT" -ge 120 ]; then
_LB_LOCK_OWNER="$(cat "$_LB_LOCK_DIR/pid" 2>/dev/null || true)"
if [ -n "$_LB_LOCK_OWNER" ] && kill -0 "$_LB_LOCK_OWNER" 2>/dev/null; then
echo "Timed out waiting for active Python environment lock: $_LB_LOCK_DIR" >&2
exit 1
fi
echo "Timed out waiting for Python environment lock, clearing stale lock: $_LB_LOCK_DIR" >&2
rm -rf "$_LB_LOCK_DIR" 2>/dev/null || true
if mkdir "$_LB_LOCK_DIR" 2>/dev/null; then
break
fi
echo "Timed out waiting for Python environment lock: $_LB_LOCK_DIR" >&2
exit 1
fi
sleep 1
_LB_LOCK_WAIT=$((_LB_LOCK_WAIT + 1))
done
printf '%s\\n' "$$" > "$_LB_LOCK_DIR/pid" 2>/dev/null || true
_lb_cleanup_lock() {{
-rmdir "$_LB_LOCK_DIR" >/dev/null 2>&1 || true
+rm -rf "$_LB_LOCK_DIR" >/dev/null 2>&1 || true
}}
trap _lb_cleanup_lock EXIT INT TERM
@@ -225,7 +242,7 @@ def wrap_python_command_with_env(command: str, *, mount_path: str = '/workspace'
if [ "$_LB_NEEDS_BOOTSTRAP" -eq 1 ]; then
rm -rf "$_LB_VENV_DIR"
-python -m venv "$_LB_VENV_DIR"
+"$_LB_SYSTEM_PYTHON" -m venv "$_LB_VENV_DIR"
. "$_LB_VENV_DIR/bin/activate"
python -m pip install --upgrade pip setuptools wheel
if [ -f "{mount_path}/requirements.txt" ]; then
+54 -3
@@ -7,6 +7,7 @@ from .. import stage
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.events as events
@@ -23,6 +24,50 @@ class ResponseWrapper(stage.PipelineStage):
async def initialize(self, pipeline_config: dict):
pass
def _is_final_assistant_message(self, result) -> bool:
"""Whether *result* is the agent's final, tool-call-free answer.
Intermediate streaming chunks and tool-call rounds must NOT trigger
outbound attachment collection — only the terminal assistant message.
"""
if getattr(result, 'role', None) != 'assistant':
return False
if result.tool_calls:
return False
if isinstance(result, provider_message.MessageChunk):
return bool(result.is_final)
return True
async def _append_outbound_attachments(
self,
query: pipeline_query.Query,
message_chain: platform_message.MessageChain,
) -> None:
"""Collect sandbox outbox files and append them to *message_chain*.
Runs at most once per query (guarded by a query variable) and never
raises into the pipeline — attachment delivery is best-effort.
"""
if query.variables.get('_sandbox_outbound_collected'):
return
box_service = getattr(self.ap, 'box_service', None)
if box_service is None or not getattr(box_service, 'available', False):
return
query.variables['_sandbox_outbound_collected'] = True
try:
attachments = await box_service.collect_outbound_attachments(query)
except Exception as e:
self.ap.logger.warning(f'Outbound attachment collection failed: {e}')
return
for att in attachments:
att_type = att.get('type')
if att_type == 'Image':
message_chain.append(platform_message.Image(base64=att['base64']))
elif att_type == 'Voice':
message_chain.append(platform_message.Voice(base64=att['base64']))
else:
message_chain.append(platform_message.File(name=att.get('name', 'file'), base64=att['base64']))
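The two helpers above combine a "terminal message" predicate with a run-once guard stored on the query. The sketch below reproduces that logic in isolation. `Msg` and `Chunk` are hypothetical stand-ins for `provider_message.Message` and `provider_message.MessageChunk`, and `collect_once` is an illustrative name, not part of the codebase.

```python
from dataclasses import dataclass, field


@dataclass
class Msg:  # hypothetical stand-in for provider_message.Message
    role: str
    tool_calls: list = field(default_factory=list)


@dataclass
class Chunk(Msg):  # hypothetical stand-in for provider_message.MessageChunk
    is_final: bool = False


def is_final_assistant_message(result) -> bool:
    """True only for a tool-call-free assistant message that ends the turn."""
    if getattr(result, 'role', None) != 'assistant':
        return False
    if result.tool_calls:
        return False
    if isinstance(result, Chunk):
        return bool(result.is_final)
    return True


def collect_once(variables: dict, collect) -> list:
    """Run `collect` at most once per query and swallow its errors."""
    if variables.get('_sandbox_outbound_collected'):
        return []
    variables['_sandbox_outbound_collected'] = True  # set before collecting
    try:
        return collect()
    except Exception:
        return []  # best-effort: never raise into the pipeline
```

Setting the flag before the collection call means a failed attempt is not retried on a later chunk, which matches the ordering in `_append_outbound_attachments`.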
async def process(
self,
query: pipeline_query.Query,
@@ -83,10 +128,16 @@ class ResponseWrapper(stage.PipelineStage):
)
else:
if event_ctx.event.reply_message_chain is not None:
query.resp_message_chain.append(event_ctx.event.reply_message_chain)
reply_chain = event_ctx.event.reply_message_chain
else:
query.resp_message_chain.append(result.get_content_platform_message_chain())
reply_chain = result.get_content_platform_message_chain()
# Attach files the agent produced in the sandbox
# outbox, but only on the terminal assistant message.
if self._is_final_assistant_message(result):
await self._append_outbound_attachments(query, reply_chain)
query.resp_message_chain.append(reply_chain)
yield entities.StageProcessResult(
result_type=entities.ResultType.CONTINUE,
@@ -312,12 +312,18 @@ class WebSocketAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter)
async def _process_image_components(self, message_chain_obj: list):
"""
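Process the Image and File components in the message chain, converting path to base64
Process the Image, Voice, and File components in the message chain, converting path to base64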
Image / Voice / File components uploaded from the web client carry a
storage key in ``path``. Resolve it to a base64 data URI so downstream
stages (multimodal LLM input and the Box sandbox inbox) have a usable
payload, then drop the now-consumed storage object.
Args:
message_chain_obj: list of message chain component objects
"""
import base64
import mimetypes
storage_mgr = self.ap.storage_mgr
@@ -325,31 +331,33 @@ class WebSocketAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter)
comp_type = component.get('type', '')
comp_path = component.get('path', '')
if not comp_path:
if not comp_path or comp_type not in ('Image', 'Voice', 'File'):
continue
if comp_type == 'Image':
try:
file_content = await storage_mgr.storage_provider.load(comp_path)
base64_str = base64.b64encode(file_content).decode('utf-8')
try:
file_content = await storage_mgr.storage_provider.load(comp_path)
base64_str = base64.b64encode(file_content).decode('utf-8')
file_key = comp_path
if file_key.lower().endswith(('.jpg', '.jpeg')):
lowered = comp_path.lower()
if comp_type == 'Image':
if lowered.endswith(('.jpg', '.jpeg')):
mime_type = 'image/jpeg'
elif file_key.lower().endswith('.png'):
mime_type = 'image/png'
elif file_key.lower().endswith('.gif'):
elif lowered.endswith('.gif'):
mime_type = 'image/gif'
elif file_key.lower().endswith('.webp'):
elif lowered.endswith('.webp'):
mime_type = 'image/webp'
else:
mime_type = 'image/png'
elif comp_type == 'Voice':
mime_type = mimetypes.guess_type(comp_path)[0] or 'audio/wav'
else: # File
mime_type = mimetypes.guess_type(comp_path)[0] or 'application/octet-stream'
component['base64'] = f'data:{mime_type};base64,{base64_str}'
await storage_mgr.storage_provider.delete(comp_path)
component['path'] = ''
except Exception as e:
await self.logger.error(f'Failed to load image file {comp_path}: {e}')
component['base64'] = f'data:{mime_type};base64,{base64_str}'
await storage_mgr.storage_provider.delete(comp_path)
component['path'] = ''
except Exception as e:
await self.logger.error(f'Failed to load {comp_type} file {comp_path}: {e}')
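The MIME selection in the rewritten loop can be read as one pure function: images use a fixed extension table with a PNG fallback, while Voice and File defer to `mimetypes.guess_type` with type-specific fallbacks. A minimal sketch under that reading follows. The name `to_data_uri` and the lookup table are illustrative, not taken from the adapter.

```python
import base64
import mimetypes

# Extension table for images; anything else falls back to image/png.
_IMAGE_MIME_BY_EXT = {
    '.jpg': 'image/jpeg',
    '.jpeg': 'image/jpeg',
    '.gif': 'image/gif',
    '.webp': 'image/webp',
}


def to_data_uri(comp_type: str, path: str, payload: bytes) -> str:
    """Build a base64 data URI for an Image / Voice / File component."""
    lowered = path.lower()
    if comp_type == 'Image':
        mime_type = next(
            (mime for ext, mime in _IMAGE_MIME_BY_EXT.items() if lowered.endswith(ext)),
            'image/png',
        )
    elif comp_type == 'Voice':
        mime_type = mimetypes.guess_type(path)[0] or 'audio/wav'
    else:  # File
        mime_type = mimetypes.guess_type(path)[0] or 'application/octet-stream'
    encoded = base64.b64encode(payload).decode('utf-8')
    return f'data:{mime_type};base64,{encoded}'
```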
async def handle_websocket_message(
self,
+11 -1
@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
import sqlalchemy
import traceback
@@ -84,8 +85,17 @@ class ModelManager:
self.ap.logger.info('LangBot Space Models service is disabled, skipping sync.')
return
sync_timeout = space_config.get('models_sync_timeout')
try:
await self.sync_new_models_from_space()
if sync_timeout:
await asyncio.wait_for(
self.sync_new_models_from_space(),
timeout=float(sync_timeout),
)
else:
await self.sync_new_models_from_space()
except asyncio.TimeoutError:
self.ap.logger.warning(f'LangBot Space model sync timed out after {sync_timeout}s, skipping startup sync.')
except Exception as e:
self.ap.logger.warning('Failed to sync new models from LangBot Space, model list may not be updated.')
self.ap.logger.warning(f' - Error: {e}')
@@ -216,11 +216,22 @@ class LiteLLMRequester(requester.ProviderAPIRequester):
content = msg_dict.get('content')
if isinstance(content, list):
converted_parts = []
for part in content:
if isinstance(part, dict) and part.get('type') == 'image_base64':
part['image_url'] = {'url': part['image_base64']}
part['type'] = 'image_url'
del part['image_base64']
# OpenAI-compatible chat models reject non-image file parts
# (audio/document base64 or url). These originate from Voice /
# File attachments — including ones replayed from conversation
# history — and the agent already accesses their bytes via the
# sandbox. Drop them from the model payload to avoid
# "Invalid user message ... invalid content type=file_base64".
if isinstance(part, dict) and part.get('type') in ('file_base64', 'file_url'):
continue
converted_parts.append(part)
msg_dict['content'] = converted_parts
req_messages.append(msg_dict)
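As a standalone illustration of the filtering above, the sketch below converts `image_base64` parts to `image_url` parts and drops `file_base64` / `file_url` parts. The function name `convert_content_parts` is hypothetical. Unlike the requester code, this version copies each converted part instead of mutating it in place.

```python
def convert_content_parts(content: list) -> list:
    """Return content parts in a form OpenAI-compatible chat models accept."""
    converted = []
    for part in content:
        if isinstance(part, dict) and part.get('type') == 'image_base64':
            # Re-express the image as an OpenAI-style image_url part.
            part = {k: v for k, v in part.items() if k != 'image_base64'} | {
                'type': 'image_url',
                'image_url': {'url': part['image_base64']},
            }
        if isinstance(part, dict) and part.get('type') in ('file_base64', 'file_url'):
            continue  # non-image file parts are rejected by the provider
        converted.append(part)
    return converted
```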
@@ -352,9 +363,13 @@ class LiteLLMRequester(requester.ProviderAPIRequester):
def _normalize_stream_tool_calls(
self,
raw_tool_calls: typing.Any,
tool_call_state: dict[int, dict[str, str]],
tool_call_state: dict[int, dict[str, typing.Any]],
) -> list[dict] | None:
"""Fill OpenAI-style streaming tool-call deltas so MessageChunk can validate them."""
"""Fill OpenAI-style streaming tool-call deltas so MessageChunk can validate them.
Also preserves provider_specific_fields (e.g., Gemini thought_signature) for
round-tripping to the next request.
"""
if not raw_tool_calls:
return None
@@ -365,35 +380,72 @@ class LiteLLMRequester(requester.ProviderAPIRequester):
if not isinstance(index, int):
index = fallback_index
state = tool_call_state.setdefault(index, {'id': '', 'type': 'function', 'name': ''})
state = tool_call_state.setdefault(
index,
{
'id': '',
'type': 'function',
'name': '',
'provider_specific_fields': None,
},
)
if tool_call.get('id'):
state['id'] = tool_call['id']
if tool_call.get('type'):
state['type'] = tool_call['type']
# Preserve provider_specific_fields from the raw tool call
if 'provider_specific_fields' in tool_call:
state['provider_specific_fields'] = tool_call['provider_specific_fields']
function = self._as_dict(tool_call.get('function'))
if function.get('name'):
state['name'] = function['name']
# Also check function-level provider_specific_fields
if 'provider_specific_fields' in function:
# Merge function-level into tool-level, function-level takes precedence
func_psf = function['provider_specific_fields']
if state['provider_specific_fields']:
merged = {**state['provider_specific_fields'], **func_psf}
state['provider_specific_fields'] = merged
else:
state['provider_specific_fields'] = func_psf
arguments = function.get('arguments')
if arguments is None:
arguments = ''
elif not isinstance(arguments, str):
arguments = str(arguments)
# Some OpenAI-compatible providers (notably Ollama's
# /v1/chat/completions) stream a tool-call delta with an `index` and
# a `function` payload but never emit an OpenAI-style `id`. Without
# an id the call used to be dropped here, so the whole tool call
# silently vanished: a tool-only turn then yielded no content and no
# tool call, the stream "completed" with 0 chars, and the chat
# appeared stuck. Synthesize a stable per-index id so named-but-idless
# tool calls survive. Providers that do send ids keep theirs.
if not state['id'] and state['name']:
state['id'] = f'call_{index}'
if not state['id'] or not state['name']:
continue
normalized.append(
{
'id': state['id'],
'type': state['type'] or 'function',
'function': {
'name': state['name'],
'arguments': arguments,
},
}
)
tool_call_dict: dict[str, typing.Any] = {
'id': state['id'],
'type': state['type'] or 'function',
'function': {
'name': state['name'],
'arguments': arguments,
},
}
# Include provider_specific_fields if present
if state['provider_specific_fields']:
tool_call_dict['provider_specific_fields'] = state['provider_specific_fields']
normalized.append(tool_call_dict)
return normalized or None
@@ -517,7 +569,7 @@ class LiteLLMRequester(requester.ProviderAPIRequester):
chunk_idx = 0
role = 'assistant'
tool_call_state: dict[int, dict[str, str]] = {}
tool_call_state: dict[int, dict[str, typing.Any]] = {}
try:
response = await acompletion(**args)
@@ -567,13 +619,17 @@ class LiteLLMRequester(requester.ProviderAPIRequester):
chunk_idx += 1
continue
chunk_data = {
chunk_data: dict[str, typing.Any] = {
'role': role,
'content': delta_content if delta_content else None,
'tool_calls': tool_calls,
'is_final': bool(finish_reason),
}
# Preserve provider_specific_fields from delta (e.g., Gemini thought_signatures)
if delta.get('provider_specific_fields'):
chunk_data['provider_specific_fields'] = delta['provider_specific_fields']
chunk_data = {k: v for k, v in chunk_data.items() if v is not None}
yield provider_message.MessageChunk(**chunk_data)
chunk_idx += 1
@@ -3,8 +3,8 @@ kind: LLMAPIRequester
metadata:
name: moonshot-chat-completions
label:
en_US: Moonshot
zh_Hans: 月之暗面
en_US: Moonshot / Kimi (Global · api.moonshot.ai)
zh_Hans: 月之暗面 / Kimi(国际站 · api.moonshot.ai)
icon: moonshot.png
spec:
litellm_provider: openai
@@ -0,0 +1,33 @@
apiVersion: v1
kind: LLMAPIRequester
metadata:
name: moonshot-cn-chat-completions
label:
en_US: Moonshot / Kimi (China · api.moonshot.cn)
zh_Hans: 月之暗面 / Kimi(国内站 · api.moonshot.cn)
icon: moonshot.png
spec:
litellm_provider: openai
config:
- name: base_url
label:
en_US: Base URL
zh_Hans: 基础 URL
type: string
required: true
default: https://api.moonshot.cn/v1
- name: timeout
label:
en_US: Timeout
zh_Hans: 超时时间
type: integer
required: true
default: 120
alias: "moonshot Moonshot 月之暗面 月暗 kimi Kimi 月之 暗面 moonshot-v1 k2 cn 国内 国内站"
support_type:
- llm
provider_category: manufacturer
execution:
python:
path: ./moonshotchatcmpl.py
attr: MoonshotChatCompletions
+69 -1
@@ -104,6 +104,68 @@ class _StreamAccumulator:
class LocalAgentRunner(runner.RequestRunner):
"""Local agent request runner"""
async def _inject_inbound_attachments(
self,
query: pipeline_query.Query,
user_message: provider_message.Message,
) -> None:
"""Persist inbound attachments into the sandbox and tell the model.
No-op when the box service is unavailable or there are no attachments.
On success, appends an extra text ContentElement to the user message
listing the in-sandbox paths and the outbox convention, and stashes the
descriptors in ``query.variables['_sandbox_inbound_attachments']``.
"""
box_service = getattr(self.ap, 'box_service', None)
if box_service is None or not getattr(box_service, 'available', False):
return
try:
attachments = await box_service.materialize_inbound_attachments(query)
except Exception as e: # never break the chat turn over attachment IO
self.ap.logger.warning(f'Inbound attachment materialization failed: {e}')
return
if not attachments:
return
query.variables['_sandbox_inbound_attachments'] = attachments
lines = [
'The user sent attachments. They have been saved into the sandbox and are '
'available to the exec/read/write tools at these paths:'
]
for att in attachments:
lines.append(f'- {att["type"]}: {att["path"]} ({att["size"]} bytes)')
outbox_dir = f'{box_service.OUTBOX_MOUNT_DIR}/{query.query_id}'
lines.append(
'If you produce any file (image, audio, document, etc.) that should be sent '
f'back to the user, write it into {outbox_dir}/ (create the directory if '
'needed). Every file placed there will be delivered to the user automatically.'
)
note = '\n'.join(lines)
# Voice/File attachments are now available to the agent via the sandbox
# (exec/read/write tools). Their raw bytes must NOT be forwarded to the
# chat model as multimodal content: providers reject non-image file
# parts ("Invalid user message ... ensure all user messages are valid
# OpenAI chat completion messages"). Strip those content elements and
# rely on the sandbox-path note instead. Images are kept so vision
# models can still see them.
_model_unsafe_types = {'file_base64', 'file_url'}
if isinstance(user_message.content, list):
user_message.content = [
ce for ce in user_message.content if getattr(ce, 'type', None) not in _model_unsafe_types
]
if isinstance(user_message.content, str):
user_message.content = [
provider_message.ContentElement.from_text(user_message.content),
provider_message.ContentElement.from_text(note),
]
elif isinstance(user_message.content, list):
user_message.content.append(provider_message.ContentElement.from_text(note))
else:
user_message.content = [provider_message.ContentElement.from_text(note)]
def _build_request_messages(
self,
query: pipeline_query.Query,
@@ -115,7 +177,7 @@ class LocalAgentRunner(runner.RequestRunner):
req_messages.append(
provider_message.Message(
role='system',
content=self.ap.box_service.get_system_guidance(),
content=self.ap.box_service.get_system_guidance(query.query_id),
)
)
@@ -232,6 +294,12 @@ class LocalAgentRunner(runner.RequestRunner):
user_message = copy.deepcopy(query.user_message)
# Materialize inbound attachments (images / voices / files) into the
# sandbox so the agent's exec/read/write tools can operate on the real
# bytes, not just the multimodal copy the model sees. The exact
# in-sandbox paths are announced to the model in a text note appended
# to the user message.
await self._inject_inbound_attachments(query, user_message)
user_message_text = ''
if isinstance(user_message.content, str):
@@ -0,0 +1,18 @@
from __future__ import annotations
from typing import Any
async def is_box_backend_available(ap: Any) -> bool:
"""Return whether the configured Box backend is ready for tool execution."""
box_service = getattr(ap, 'box_service', None)
if box_service is None:
return False
if not getattr(box_service, 'available', False):
return False
try:
status = await box_service.get_status()
backend_info = status.get('backend', {})
return bool(backend_info.get('available', False))
except Exception:
return False
@@ -5,6 +5,8 @@ import asyncio
import os
import shutil
import shlex
import threading
from contextlib import suppress
from typing import TYPE_CHECKING, Any
import pydantic
@@ -18,12 +20,26 @@ from ....box.workspace import (
rewrite_mounted_path,
rewrite_venv_command,
unwrap_venv_path,
wrap_python_command_with_env,
)
if TYPE_CHECKING:
from .mcp import RuntimeMCPSession
_WORKSPACE_COPY_LOCKS: dict[str, threading.Lock] = {}
_WORKSPACE_COPY_LOCKS_GUARD = threading.Lock()
def _workspace_copy_lock(path: str) -> threading.Lock:
with _WORKSPACE_COPY_LOCKS_GUARD:
lock = _WORKSPACE_COPY_LOCKS.get(path)
if lock is None:
lock = threading.Lock()
_WORKSPACE_COPY_LOCKS[path] = lock
return lock
class MCPSessionErrorPhase(enum.Enum):
"""Which phase of the MCP lifecycle failed."""
@@ -49,7 +65,7 @@ class MCPServerBoxConfig(pydantic.BaseModel):
host_path: str | None = None
host_path_mode: str = 'ro' # MCP server mounts are read-only by default; read-write only when explicitly requested
env: dict[str, str] = pydantic.Field(default_factory=dict)
startup_timeout_sec: int = 120 # Longer default to allow dependency bootstrap
startup_timeout_sec: int = 300 # First Docker bootstrap may need to build a venv and install MCP deps.
cpus: float | None = None
memory_mb: int | None = None
pids_limit: int | None = None
@@ -128,6 +144,7 @@ class BoxStdioSessionRuntime:
workspace = self._build_workspace(host_path=None)
host_path = self.resolve_host_path()
process_cwd = '/workspace'
install_cmd: str | None = None
try:
await workspace.create_session()
@@ -168,6 +185,8 @@ class BoxStdioSessionRuntime:
env=self.server_config.get('env', {}),
cwd=process_cwd,
)
if install_cmd:
payload = self._wrap_process_payload_with_python_env(payload, process_cwd)
payload['process_id'] = self.process_id
await workspace.box_service.start_managed_process(workspace.session_id, payload)
except Exception:
@@ -253,14 +272,42 @@ class BoxStdioSessionRuntime:
@staticmethod
def _copy_workspace_tree(source_path: str, process_host_root: str, process_host_workspace: str) -> None:
shutil.rmtree(process_host_root, ignore_errors=True)
os.makedirs(process_host_root, exist_ok=True)
shutil.copytree(
source_path,
process_host_workspace,
symlinks=True,
ignore=shutil.ignore_patterns('.git', '__pycache__', '.pytest_cache', '.mypy_cache', '.ruff_cache'),
)
# Docker-backed bootstrap writes root-owned runtime directories such as
# .venv/.tmp into the staged workspace. The host process may not be able
# to delete them, so refresh source files in place and preserve runtime
# directories instead of rmtree'ing the whole staging root.
with _workspace_copy_lock(process_host_root):
preserved_names = {'.venv', 'venv', 'env', '.cache', '.tmp', '.langbot'}
os.makedirs(process_host_workspace, exist_ok=True)
for name in os.listdir(process_host_workspace):
if name in preserved_names:
continue
path = os.path.join(process_host_workspace, name)
if os.path.isdir(path) and not os.path.islink(path):
shutil.rmtree(path, ignore_errors=True)
else:
# The entry may disappear between listdir and unlink if cleanup races us.
with suppress(FileNotFoundError):
os.unlink(path)
shutil.copytree(
source_path,
process_host_workspace,
symlinks=True,
dirs_exist_ok=True,
ignore=shutil.ignore_patterns(
'.git',
'__pycache__',
'.pytest_cache',
'.mypy_cache',
'.ruff_cache',
'.venv',
'venv',
'env',
'.cache',
'.tmp',
'.langbot',
),
)
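The in-place refresh strategy can be tried outside the runtime. This sketch clears every top-level entry of the staged directory except a preserved set, then copies the source over it with `dirs_exist_ok=True`. `refresh_workspace` is a hypothetical name, and the per-path lock from the original is omitted for brevity.

```python
import os
import shutil
from contextlib import suppress

# Runtime directories that may be root-owned after a Docker bootstrap.
PRESERVED_NAMES = {'.venv', 'venv', 'env', '.cache', '.tmp', '.langbot'}


def refresh_workspace(source_path: str, staged_path: str) -> None:
    """Replace staged source files while keeping preserved runtime directories."""
    os.makedirs(staged_path, exist_ok=True)
    for name in os.listdir(staged_path):
        if name in PRESERVED_NAMES:
            continue
        path = os.path.join(staged_path, name)
        if os.path.isdir(path) and not os.path.islink(path):
            shutil.rmtree(path, ignore_errors=True)
        else:
            with suppress(FileNotFoundError):
                os.unlink(path)
    shutil.copytree(
        source_path,
        staged_path,
        symlinks=True,
        dirs_exist_ok=True,  # requires Python 3.8+
        ignore=shutil.ignore_patterns('.git', '__pycache__', *sorted(PRESERVED_NAMES)),
    )
```

Listing the preserved names in the ignore patterns as well keeps a `.venv` in the source tree from overwriting the one the sandbox built.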
async def _cleanup_staged_workspace(self) -> None:
if not self.resolve_host_path():
@@ -343,23 +390,25 @@ class BoxStdioSessionRuntime:
@staticmethod
def detect_install_command(host_path: str, workspace_path: str = '/workspace') -> str | None:
workspace_kind = classify_python_workspace(host_path)
quoted_workspace_path = shlex.quote(workspace_path)
if workspace_kind == 'package':
return (
'mkdir -p /opt/_lb_src'
f' && tar -C {quoted_workspace_path}'
' --exclude=.venv --exclude=.git --exclude=__pycache__'
' --exclude=node_modules --exclude=.tox --exclude=.nox'
' --exclude="*.egg-info" --exclude=.uv-cache'
' -cf - .'
' | tar -C /opt/_lb_src -xf -'
' && pip install --no-cache-dir /opt/_lb_src'
' && rm -rf /opt/_lb_src'
)
if workspace_kind == 'requirements':
return f'pip install --no-cache-dir -r {quoted_workspace_path}/requirements.txt'
if workspace_kind in {'package', 'requirements'}:
return wrap_python_command_with_env('python -c "pass"', mount_path=workspace_path).rstrip()
return None
@staticmethod
def _wrap_process_payload_with_python_env(payload: dict[str, Any], workspace_path: str) -> dict[str, Any]:
"""Start a prepared Python workspace without writing bootstrap output to MCP stdio."""
workspace_root = workspace_path.rstrip('/') or '/workspace'
venv_dir = f'{workspace_root}/.venv'
venv_bin = f'{venv_dir}/bin'
command = ' '.join([shlex.quote(payload['command']), *[shlex.quote(arg) for arg in payload.get('args', [])]])
wrapped = dict(payload)
wrapped['command'] = 'sh'
wrapped['args'] = [
'-lc',
(f'export VIRTUAL_ENV={shlex.quote(venv_dir)}; export PATH={shlex.quote(venv_bin)}:$PATH; exec {command}'),
]
return wrapped
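For reference, this is what the wrapping produces for a sample payload. The sketch restates the helper as a free function under the hypothetical name `wrap_with_venv`. The payload keys `command`, `args`, and `cwd` follow the ones used in this diff.

```python
import shlex
from typing import Any


def wrap_with_venv(payload: dict[str, Any], workspace_path: str) -> dict[str, Any]:
    """Re-express a process payload as `sh -lc` with the workspace venv on PATH."""
    workspace_root = workspace_path.rstrip('/') or '/workspace'
    venv_dir = f'{workspace_root}/.venv'
    venv_bin = f'{venv_dir}/bin'
    # Quote every token so arguments with spaces survive the shell round trip.
    command = ' '.join(shlex.quote(token) for token in [payload['command'], *payload.get('args', [])])
    wrapped = dict(payload)
    wrapped['command'] = 'sh'
    wrapped['args'] = [
        '-lc',
        f'export VIRTUAL_ENV={shlex.quote(venv_dir)}; '
        f'export PATH={shlex.quote(venv_bin)}:$PATH; exec {command}',
    ]
    return wrapped
```

Using `exec` replaces the shell with the server process, so the managed process keeps a single PID and nothing from the wrapper is written to the MCP stdio stream.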
def build_box_session_payload(self, session_id: str, host_path: str | None = None) -> dict[str, Any]:
workspace = self._build_workspace()
workspace.session_id = session_id
+532 -71
@@ -1,5 +1,6 @@
from __future__ import annotations
import base64
import json
import os
@@ -8,6 +9,7 @@ from langbot_plugin.api.entities.events import pipeline_query
from .. import loader
from ..errors import ToolNotFoundError
from .availability import is_box_backend_available
from . import skill as skill_loader
EXEC_TOOL_NAME = 'exec'
@@ -22,6 +24,15 @@ _ALL_TOOL_NAMES = {EXEC_TOOL_NAME, READ_TOOL_NAME, WRITE_TOOL_NAME, EDIT_TOOL_NA
# Skip these dirs during grep walk to avoid noise
_SKIP_DIRS = {'.git', 'node_modules', '__pycache__', '.venv', 'venv', '.tox', 'dist', 'build'}
_DEFAULT_READ_MAX_LINES = 2000
_MAX_READ_MAX_LINES = 10000
_DEFAULT_TOOL_RESULT_MAX_BYTES = 50 * 1024
_BOX_FILE_SCRIPT_MAX_BYTES = 2048
_GLOB_MAX_MATCHES = 100
_GREP_MAX_MATCHES = 200
_GREP_MAX_FILES = 5000
_GREP_MAX_LINE_CHARS = 500
class NativeToolLoader(loader.ToolLoader):
def __init__(self, ap):
@@ -43,18 +54,7 @@ class NativeToolLoader(loader.ToolLoader):
async def _check_backend_available(self) -> bool:
"""Check if the box backend is truly available (not just the runtime)."""
box_service = getattr(self.ap, 'box_service', None)
if box_service is None:
return False
if not getattr(box_service, 'available', False):
return False
# Check if backend is truly available via get_status
try:
status = await box_service.get_status()
backend_info = status.get('backend', {})
return backend_info.get('available', False)
except Exception:
return False
return await is_box_backend_available(self.ap)
async def get_tools(self, bound_plugins: list[str] | None = None) -> list[resource_tool.LLMTool]:
if not self._is_sandbox_available():
@@ -139,6 +139,7 @@ class NativeToolLoader(loader.ToolLoader):
# via execute_tool. Skills are mounted at /workspace/.skills/{name}/
# via extra_mounts built by BoxService.
result = await self.ap.box_service.execute_tool(parameters, query)
result = self._normalize_exec_result(result)
if selected_skill is not None:
self._refresh_skill_from_disk(selected_skill)
@@ -227,34 +228,121 @@ class NativeToolLoader(loader.ToolLoader):
except Exception:
return {'ok': False, 'error': stdout or 'Box file operation returned no result'}
async def _read_workspace_via_box(self, path: str, query: pipeline_query.Query) -> dict:
async def _read_workspace_via_box(self, path: str, parameters: dict, query: pipeline_query.Query) -> dict:
offset = self._positive_int(parameters.get('offset'), default=1)
byte_offset = self._non_negative_int(parameters.get('byte_offset'), default=0)
max_lines = self._positive_int(
parameters.get('limit'),
default=_DEFAULT_READ_MAX_LINES,
max_value=_MAX_READ_MAX_LINES,
)
# Box file fallback returns through exec stdout, which is already capped
# by BoxService. Keep this payload small enough to remain valid JSON.
max_bytes = min(
self._positive_int(parameters.get('max_bytes'), default=_DEFAULT_TOOL_RESULT_MAX_BYTES),
_BOX_FILE_SCRIPT_MAX_BYTES,
)
encoding = self._read_encoding(parameters)
script = f"""
import json, os
import base64, json, os
path = {json.dumps(path)}
offset = {offset}
byte_offset = {byte_offset}
max_lines = {max_lines}
max_bytes = {max_bytes}
encoding = {json.dumps(encoding)}
if not path.startswith('/workspace'):
print(json.dumps({{'ok': False, 'error': 'Path must be under /workspace.'}}))
elif not os.path.exists(path):
print(json.dumps({{'ok': False, 'error': f'File not found: {{path}}'}}))
elif os.path.isdir(path):
print(json.dumps({{'ok': True, 'content': '\\n'.join(sorted(os.listdir(path))), 'is_directory': True}}))
entries = sorted(os.listdir(path))
content = '\\n'.join(entries)
print(json.dumps({{'ok': True, 'content': content, 'is_directory': True, 'total': len(entries), 'truncated': False}}))
elif encoding == 'base64':
size_bytes = os.path.getsize(path)
with open(path, 'rb') as f:
f.seek(byte_offset)
data = f.read(max_bytes + 1)
chunk = data[:max_bytes]
has_more = len(data) > max_bytes
print(json.dumps({{
'ok': True,
'content': base64.b64encode(chunk).decode('ascii'),
'encoding': 'base64',
'byte_offset': byte_offset,
'length': len(chunk),
'size_bytes': size_bytes,
'has_more': has_more,
'next_byte_offset': byte_offset + len(chunk) if has_more else None,
'max_bytes': max_bytes,
}}))
else:
lines = []
output_bytes = 0
end_line = offset - 1
truncated = False
next_offset = None
with open(path, 'r', encoding='utf-8', errors='replace') as f:
print(json.dumps({{'ok': True, 'content': f.read()}}))
for line_number, line in enumerate(f, 1):
if line_number < offset:
continue
if len(lines) >= max_lines:
truncated = True
next_offset = line_number
break
line_bytes = len(line.encode('utf-8'))
if output_bytes + line_bytes > max_bytes:
truncated = True
next_offset = line_number
break
lines.append(line.rstrip('\\n'))
output_bytes += line_bytes
end_line = line_number
print(json.dumps({{
'ok': True,
'content': '\\n'.join(lines),
'truncated': truncated,
'start_line': offset,
'end_line': end_line,
'next_offset': next_offset,
'max_lines': max_lines,
'max_bytes': max_bytes,
}}))
""".strip()
return await self._run_workspace_file_script(script, query)
async def _write_workspace_via_box(self, path: str, content: str, query: pipeline_query.Query) -> dict:
async def _write_workspace_via_box(
self,
path: str,
content: str,
parameters: dict,
query: pipeline_query.Query,
) -> dict:
encoding, mode = self._write_options(parameters)
script = f"""
import json, os
import base64, json, os
path = {json.dumps(path)}
content = {json.dumps(content)}
encoding = {json.dumps(encoding)}
mode = {json.dumps(mode)}
if not path.startswith('/workspace'):
print(json.dumps({{'ok': False, 'error': 'Path must be under /workspace.'}}))
else:
os.makedirs(os.path.dirname(path) or '/workspace', exist_ok=True)
with open(path, 'w', encoding='utf-8') as f:
f.write(content)
print(json.dumps({{'ok': True, 'path': path}}))
if encoding == 'base64':
try:
data = base64.b64decode(content, validate=True)
except Exception as exc:
print(json.dumps({{'ok': False, 'error': f'invalid base64 content: {{exc}}'}}))
else:
with open(path, 'ab' if mode == 'append' else 'wb') as f:
f.write(data)
print(json.dumps({{'ok': True, 'path': path}}))
else:
with open(path, 'a' if mode == 'append' else 'w', encoding='utf-8') as f:
f.write(content)
print(json.dumps({{'ok': True, 'path': path}}))
""".strip()
return await self._run_workspace_file_script(script, query)
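The text branch of the read script pages through a file by line number while enforcing both a line cap and a byte cap. The sketch below extracts that loop so the paging contract (`next_offset`, `truncated`) can be checked directly. `read_window` is a hypothetical name, and it accepts any iterable of lines in place of an open file.

```python
from typing import Iterable


def read_window(
    lines_iter: Iterable[str],
    offset: int = 1,
    max_lines: int = 2000,
    max_bytes: int = 50 * 1024,
) -> dict:
    """Return up to max_lines / max_bytes of text starting at 1-indexed `offset`."""
    lines: list[str] = []
    output_bytes = 0
    end_line = offset - 1
    next_offset = None
    for line_number, line in enumerate(lines_iter, 1):
        if line_number < offset:
            continue
        line_bytes = len(line.encode('utf-8'))
        if len(lines) >= max_lines or output_bytes + line_bytes > max_bytes:
            next_offset = line_number  # where the caller should resume
            break
        lines.append(line.rstrip('\n'))
        output_bytes += line_bytes
        end_line = line_number
    return {
        'content': '\n'.join(lines),
        'truncated': next_offset is not None,
        'start_line': offset,
        'end_line': end_line,
        'next_offset': next_offset,
    }
```

A caller resumes by passing `next_offset` back as `offset`. One edge case in this sketch: a single line longer than `max_bytes` yields empty content with `next_offset` equal to `offset`, so a caller looping on `next_offset` would need its own guard.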
@@ -307,12 +395,27 @@ else:
if not any(part in skip_dirs for part in item.parts)
]
hits.sort(key=lambda item: item.stat().st_mtime if item.exists() else 0, reverse=True)
shown = hits[:100]
shown = hits[:{_GLOB_MAX_MATCHES}]
matches = []
output_bytes = 0
truncated_by_bytes = False
for item in shown:
rel = os.path.relpath(str(item), path)
matches.append(os.path.join(path, rel).replace(os.sep, '/'))
print(json.dumps({{'ok': True, 'matches': matches, 'total': len(hits), 'truncated': len(hits) > 100}}))
sandbox_path = os.path.join(path, rel).replace(os.sep, '/')
entry_bytes = len(sandbox_path.encode('utf-8')) + (1 if matches else 0)
if output_bytes + entry_bytes > {_DEFAULT_TOOL_RESULT_MAX_BYTES}:
truncated_by_bytes = True
break
matches.append(sandbox_path)
output_bytes += entry_bytes
print(json.dumps({{
'ok': True,
'matches': matches,
'preview': '\\n'.join(matches),
'total': len(hits),
'truncated': len(hits) > len(matches) or truncated_by_bytes,
'truncated_by': 'bytes' if truncated_by_bytes else ('matches' if len(hits) > len(matches) else None),
}}))
""".strip()
return await self._run_workspace_file_script(script, query)
@@ -350,29 +453,54 @@ else:
continue
if item.is_file():
files.append(item)
if len(files) >= 5000:
if len(files) >= {_GREP_MAX_FILES}:
break
matches = []
output_bytes = 0
truncated_by = None
for fp in files:
try:
text = fp.read_text(errors='ignore')
handle = fp.open('r', encoding='utf-8', errors='ignore')
except OSError:
continue
for lineno, line in enumerate(text.splitlines(), 1):
if regex.search(line):
if base.is_file():
file_path = path
else:
rel = os.path.relpath(str(fp), path)
file_path = os.path.join(path, rel).replace(os.sep, '/')
matches.append({{'file': file_path, 'line': lineno, 'content': line.rstrip()}})
if len(matches) >= 200:
break
if len(matches) >= 200:
with handle:
for lineno, line in enumerate(handle, 1):
if regex.search(line):
if base.is_file():
file_path = path
else:
rel = os.path.relpath(str(fp), path)
file_path = os.path.join(path, rel).replace(os.sep, '/')
content = line.rstrip()
line_truncated = False
if len(content) > {_GREP_MAX_LINE_CHARS}:
content = content[:{_GREP_MAX_LINE_CHARS}] + '... [truncated]'
line_truncated = True
entry = {{'file': file_path, 'line': lineno, 'content': content}}
entry_bytes = len(json.dumps(entry, ensure_ascii=False).encode('utf-8')) + 1
if output_bytes + entry_bytes > {_DEFAULT_TOOL_RESULT_MAX_BYTES}:
truncated_by = 'bytes'
break
if line_truncated and truncated_by is None:
truncated_by = 'line'
matches.append(entry)
output_bytes += entry_bytes
if len(matches) >= {_GREP_MAX_MATCHES}:
truncated_by = truncated_by or 'matches'
break
if truncated_by == 'bytes' or len(matches) >= {_GREP_MAX_MATCHES}:
break
if truncated_by == 'bytes' or len(matches) >= {_GREP_MAX_MATCHES}:
break
print(json.dumps({{'ok': True, 'matches': matches, 'total': len(matches), 'truncated': len(matches) >= 200}}))
print(json.dumps({{
'ok': True,
'matches': matches,
'total': len(matches),
'truncated': truncated_by is not None,
'truncated_by': truncated_by,
}}))
""".strip()
return await self._run_workspace_file_script(script, query)
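The grep script reports why a result was cut short through `truncated_by`, with three possible causes: a long line was clipped, the match cap was hit, or the byte budget ran out. A single-file sketch of that accounting follows, assuming lines are already in memory. `grep_lines` is a hypothetical name, and the defaults mirror the constants in this diff.

```python
import json
import re


def grep_lines(
    lines: list[str],
    pattern: str,
    max_matches: int = 200,
    max_line_chars: int = 500,
    max_bytes: int = 50 * 1024,
) -> dict:
    """Collect regex matches under match, line-length, and byte budgets."""
    regex = re.compile(pattern)
    matches: list[dict] = []
    output_bytes = 0
    truncated_by = None
    for lineno, line in enumerate(lines, 1):
        if not regex.search(line):
            continue
        content = line.rstrip()
        line_truncated = len(content) > max_line_chars
        if line_truncated:
            content = content[:max_line_chars] + '... [truncated]'
        entry = {'line': lineno, 'content': content}
        entry_bytes = len(json.dumps(entry, ensure_ascii=False).encode('utf-8')) + 1
        if output_bytes + entry_bytes > max_bytes:
            truncated_by = 'bytes'  # this entry would exceed the budget
            break
        if line_truncated and truncated_by is None:
            truncated_by = 'line'
        matches.append(entry)
        output_bytes += entry_bytes
        if len(matches) >= max_matches:
            truncated_by = truncated_by or 'matches'
            break
    return {
        'matches': matches,
        'total': len(matches),
        'truncated': truncated_by is not None,
        'truncated_by': truncated_by,
    }
```

As in the script, reaching `max_matches` exactly is reported as truncation even when no further matches exist, since the scan stops without looking ahead.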
@@ -387,14 +515,20 @@ else:
)
if skill_request is not None and hasattr(self.ap.box_service, 'read_skill_file'):
selected_skill, relative = skill_request
host_path = self._resolve_skill_host_path(selected_skill, relative)
if host_path and os.path.exists(host_path):
if os.path.isdir(host_path):
return self._build_directory_result(os.listdir(host_path))
return self._read_text_file_preview(host_path, parameters)
try:
result = await self.ap.box_service.read_skill_file(selected_skill['name'], relative)
return {'ok': True, 'content': result.get('content', '')}
return self._build_read_result_from_text(str(result.get('content', '')), parameters)
except Exception:
try:
result = await self.ap.box_service.list_skill_files(selected_skill['name'], relative)
entries = [entry['name'] for entry in result.get('entries', [])]
return {'ok': True, 'content': '\n'.join(sorted(entries)), 'is_directory': True}
return self._build_directory_result(entries)
except Exception as exc:
return {'ok': False, 'error': str(exc)}
@@ -405,20 +539,19 @@ else:
include_activated=True,
)
if self._should_use_box_workspace_files(selected_skill):
return await self._read_workspace_via_box(path, query)
return await self._read_workspace_via_box(path, parameters, query)
if not os.path.exists(host_path):
return {'ok': False, 'error': f'File not found: {path}'}
if os.path.isdir(host_path):
entries = os.listdir(host_path)
return {'ok': True, 'content': '\n'.join(sorted(entries)), 'is_directory': True}
with open(host_path, 'r', errors='replace') as f:
content = f.read()
return {'ok': True, 'content': content}
return self._build_directory_result(entries)
return self._read_text_file_preview(host_path, parameters)
async def _invoke_write(self, parameters: dict, query: pipeline_query.Query) -> dict:
path = parameters['path']
content = parameters['content']
self.ap.logger.info(f'write tool invoked: query_id={query.query_id} path={path} length={len(content)}')
encoding, _mode = self._write_options(parameters)
skill_request = self._resolve_skill_relative_path(
query,
path,
@@ -426,6 +559,8 @@ else:
include_activated=True,
)
if skill_request is not None and hasattr(self.ap.box_service, 'write_skill_file'):
if encoding != 'text':
return {'ok': False, 'error': 'base64 writes to skill packages are not supported.'}
selected_skill, relative = skill_request
await self.ap.box_service.write_skill_file(selected_skill['name'], relative, content)
await self.ap.skill_mgr.reload_skills()
@@ -438,10 +573,12 @@ else:
include_activated=True,
)
if self._should_use_box_workspace_files(selected_skill):
return await self._write_workspace_via_box(path, content, query)
return await self._write_workspace_via_box(path, content, parameters, query)
os.makedirs(os.path.dirname(host_path), exist_ok=True)
with open(host_path, 'w', encoding='utf-8') as f:
f.write(content)
try:
self._write_host_file(host_path, content, parameters)
except ValueError as exc:
return {'ok': False, 'error': str(exc)}
self._refresh_skill_from_disk(selected_skill)
return {'ok': True, 'path': path}
@@ -584,6 +721,40 @@ else:
'type': 'string',
'description': 'Absolute path to the file (must be under /workspace).',
},
'offset': {
'type': 'integer',
'description': '1-indexed line number to start reading from. Defaults to 1.',
'default': 1,
'minimum': 1,
},
'limit': {
'type': 'integer',
'description': f'Maximum number of lines to return. Defaults to {_DEFAULT_READ_MAX_LINES}.',
'default': _DEFAULT_READ_MAX_LINES,
'minimum': 1,
'maximum': _MAX_READ_MAX_LINES,
},
'max_bytes': {
'type': 'integer',
'description': (
f'Maximum bytes of file content to return. Defaults to {_DEFAULT_TOOL_RESULT_MAX_BYTES}.'
),
'default': _DEFAULT_TOOL_RESULT_MAX_BYTES,
'minimum': 1,
'maximum': _DEFAULT_TOOL_RESULT_MAX_BYTES,
},
'encoding': {
'type': 'string',
'description': 'Return text by default, or base64 for binary byte-range reads.',
'enum': ['text', 'base64'],
'default': 'text',
},
'byte_offset': {
'type': 'integer',
'description': '0-indexed byte offset used when encoding is base64. Defaults to 0.',
'default': 0,
'minimum': 0,
},
},
'required': ['path'],
'additionalProperties': False,
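Reviewer note: the `offset` / `limit` parameters and the `next_offset` field in the read result suggest a paging loop on the caller side. Below is a minimal sketch of that loop. `read_page` and `read_all` are hypothetical stand-ins written for this note, not functions from the PR, and the stand-in only models line limits (not `max_bytes`).

```python
def read_page(text: str, offset: int = 1, limit: int = 3) -> dict:
    """Hypothetical stand-in for the read tool: up to `limit` lines from 1-indexed `offset`."""
    lines = text.splitlines()
    start = offset - 1
    page = lines[start:start + limit]
    has_more = start + limit < len(lines)
    return {
        'ok': True,
        'content': '\n'.join(page),
        'truncated': has_more,
        # Mirrors the result shape in the diff: next_offset is None once nothing is left.
        'next_offset': offset + len(page) if has_more else None,
    }


def read_all(text: str, limit: int = 3) -> str:
    """Follow next_offset until the stand-in reports no further lines."""
    chunks = []
    offset = 1
    while offset is not None:
        result = read_page(text, offset=offset, limit=limit)
        chunks.append(result['content'])
        offset = result['next_offset']
    return '\n'.join(chunks)
```

A caller that only needs the head of a file would stop after the first page instead of looping.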
@@ -609,7 +780,19 @@ else:
},
'content': {
'type': 'string',
'description': 'Content to write to the file.',
'description': 'Text content, or base64 content when encoding is base64.',
},
'encoding': {
'type': 'string',
'description': 'Write content as text by default, or decode it from base64 for binary files.',
'enum': ['text', 'base64'],
'default': 'text',
},
'mode': {
'type': 'string',
'description': 'Overwrite the file by default, or append to it.',
'enum': ['overwrite', 'append'],
'default': 'overwrite',
},
},
'required': ['path', 'content'],
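Reviewer note: combining `encoding: base64` with `mode: append` appears to allow a binary file to be written in several calls. The sketch below illustrates that idea under stated assumptions: `write_host_file` is a simplified stand-in modelled on `_write_host_file` from this diff (its keyword signature is hypothetical), and `upload_in_chunks` / `roundtrip` are illustration-only helpers.

```python
import base64
import os
import tempfile


def write_host_file(host_path: str, content: str, encoding: str = 'text', mode: str = 'overwrite') -> None:
    """Simplified stand-in for the host write path (hypothetical signature)."""
    if encoding == 'base64':
        # validate=True rejects characters outside the base64 alphabet.
        data = base64.b64decode(content, validate=True)
        with open(host_path, 'ab' if mode == 'append' else 'wb') as f:
            f.write(data)
        return
    with open(host_path, 'a' if mode == 'append' else 'w', encoding='utf-8') as f:
        f.write(content)


def upload_in_chunks(host_path: str, payload: bytes, chunk_size: int = 4) -> None:
    """Send the first chunk with mode='overwrite' and the rest with mode='append'."""
    for start in range(0, len(payload), chunk_size):
        chunk = payload[start:start + chunk_size]
        write_host_file(
            host_path,
            base64.b64encode(chunk).decode('ascii'),
            encoding='base64',
            mode='overwrite' if start == 0 else 'append',
        )


def roundtrip(payload: bytes) -> bytes:
    """Write a non-empty payload in chunks to a temp file and read it back."""
    with tempfile.TemporaryDirectory() as tmp:
        path = os.path.join(tmp, 'blob.bin')
        upload_in_chunks(path, payload)
        with open(path, 'rb') as f:
            return f.read()
```

Each chunk is encoded on its own, so chunk boundaries do not need to align with 3-byte base64 groups. Note that the diff rejects base64 writes to skill packages, so this pattern would only apply to workspace paths.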
@@ -740,22 +923,30 @@ else:
hits.sort(key=lambda p: p.stat().st_mtime if p.exists() else 0, reverse=True)
total = len(hits)
shown = hits[:100]
shown = hits[:_GLOB_MAX_MATCHES]
# Convert back to sandbox paths
sandbox_paths = []
output_bytes = 0
truncated_by_bytes = False
for h in shown:
rel = os.path.relpath(str(h), host_path)
sandbox_path = os.path.join(path, rel)
entry_bytes = len(sandbox_path.encode('utf-8')) + (1 if sandbox_paths else 0)
if output_bytes + entry_bytes > _DEFAULT_TOOL_RESULT_MAX_BYTES:
truncated_by_bytes = True
break
sandbox_paths.append(sandbox_path)
output_bytes += entry_bytes
result_lines = sandbox_paths
result = '\n'.join(result_lines)
if total > 100:
result += f'\n... ({total} matches, showing first 100)'
return {'ok': True, 'matches': result_lines, 'total': total, 'truncated': total > 100}
return {
'ok': True,
'matches': sandbox_paths,
'preview': '\n'.join(sandbox_paths),
'total': total,
'truncated': total > len(sandbox_paths) or truncated_by_bytes,
'truncated_by': 'bytes' if truncated_by_bytes else ('matches' if total > len(sandbox_paths) else None),
}
async def _invoke_grep(self, parameters: dict, query: pipeline_query.Query) -> dict:
pattern = parameters['pattern']
@@ -791,32 +982,46 @@ else:
files = self._grep_walk(base, include)
matches = []
output_bytes = 0
truncated_by = None
for fp in files:
try:
text = fp.read_text(errors='ignore')
handle = fp.open('r', encoding='utf-8', errors='ignore')
except OSError:
continue
for lineno, line in enumerate(text.splitlines(), 1):
if regex.search(line):
rel = os.path.relpath(str(fp), host_path)
sandbox_path = os.path.join(path, rel)
matches.append(
{
with handle:
for lineno, line in enumerate(handle, 1):
if regex.search(line):
rel = os.path.relpath(str(fp), host_path)
sandbox_path = os.path.join(path, rel)
content, line_truncated = self._truncate_grep_line(line.rstrip())
entry = {
'file': sandbox_path,
'line': lineno,
'content': line.rstrip(),
'content': content,
}
)
if len(matches) >= 200:
break
if len(matches) >= 200:
entry_bytes = len(json.dumps(entry, ensure_ascii=False).encode('utf-8')) + 1
if output_bytes + entry_bytes > _DEFAULT_TOOL_RESULT_MAX_BYTES:
truncated_by = 'bytes'
break
if line_truncated and truncated_by is None:
truncated_by = 'line'
matches.append(entry)
output_bytes += entry_bytes
if len(matches) >= _GREP_MAX_MATCHES:
truncated_by = truncated_by or 'matches'
break
if truncated_by == 'bytes' or len(matches) >= _GREP_MAX_MATCHES:
break
if truncated_by == 'bytes' or len(matches) >= _GREP_MAX_MATCHES:
break
return {
'ok': True,
'matches': matches,
'total': len(matches),
'truncated': len(matches) >= 200,
'truncated': truncated_by is not None,
'truncated_by': truncated_by,
}
@staticmethod
@@ -828,10 +1033,266 @@ else:
continue
if item.is_file():
results.append(item)
if len(results) >= 5000:
if len(results) >= _GREP_MAX_FILES:
break
return results
@staticmethod
def _resolve_skill_host_path(selected_skill: dict, relative: str) -> str | None:
package_root = str(selected_skill.get('package_root', '') or '').strip()
if not package_root:
return None
host_root = os.path.realpath(package_root)
host_path = os.path.realpath(os.path.join(host_root, relative))
if not (host_path == host_root or host_path.startswith(host_root + os.sep)):
raise ValueError('Path escapes the skill package boundary.')
return host_path
def _normalize_exec_result(self, result: dict) -> dict:
normalized = dict(result)
stdout = str(normalized.get('stdout') or '')
stderr = str(normalized.get('stderr') or '')
stdout, stdout_capped = self._truncate_text_to_bytes_with_flag(stdout, _DEFAULT_TOOL_RESULT_MAX_BYTES)
stderr, stderr_capped = self._truncate_text_to_bytes_with_flag(stderr, _DEFAULT_TOOL_RESULT_MAX_BYTES)
normalized['stdout'] = stdout
normalized['stderr'] = stderr
normalized['stdout_truncated'] = bool(normalized.get('stdout_truncated') or stdout_capped)
normalized['stderr_truncated'] = bool(normalized.get('stderr_truncated') or stderr_capped)
if stdout and stderr:
preview_raw = f'stdout:\n{stdout}\n\nstderr:\n{stderr}'
else:
preview_raw = stdout or stderr
preview, preview_capped = self._truncate_text_to_bytes_with_flag(preview_raw, _DEFAULT_TOOL_RESULT_MAX_BYTES)
normalized['preview'] = preview
normalized['truncated'] = bool(
normalized['stdout_truncated'] or normalized['stderr_truncated'] or preview_capped
)
if preview_capped and not normalized.get('truncated_by'):
normalized['truncated_by'] = 'bytes'
return normalized
def _build_directory_result(self, entries: list[str]) -> dict:
sorted_entries = sorted(str(entry) for entry in entries)
content = '\n'.join(sorted_entries)
preview = self._truncate_text_to_bytes(content, _DEFAULT_TOOL_RESULT_MAX_BYTES)
truncated = preview != content
return {
'ok': True,
'content': preview,
'is_directory': True,
'total': len(sorted_entries),
'truncated': truncated,
'truncated_by': 'bytes' if truncated else None,
}
def _read_text_file_preview(self, host_path: str, parameters: dict) -> dict:
if self._read_encoding(parameters) == 'base64':
return self._read_binary_file_chunk(host_path, parameters)
offset = self._positive_int(parameters.get('offset'), default=1)
max_lines = self._positive_int(
parameters.get('limit'),
default=_DEFAULT_READ_MAX_LINES,
max_value=_MAX_READ_MAX_LINES,
)
max_bytes = self._positive_int(
parameters.get('max_bytes'),
default=_DEFAULT_TOOL_RESULT_MAX_BYTES,
max_value=_DEFAULT_TOOL_RESULT_MAX_BYTES,
)
lines: list[str] = []
output_bytes = 0
end_line = offset - 1
truncated = False
truncated_by: str | None = None
next_offset: int | None = None
with open(host_path, 'r', encoding='utf-8', errors='replace') as f:
for line_number, line in enumerate(f, 1):
if line_number < offset:
continue
if len(lines) >= max_lines:
truncated = True
truncated_by = 'lines'
next_offset = line_number
break
line_bytes = len(line.encode('utf-8'))
if output_bytes + line_bytes > max_bytes:
truncated = True
truncated_by = 'bytes'
next_offset = line_number
break
lines.append(line.rstrip('\n'))
output_bytes += line_bytes
end_line = line_number
if not lines and truncated_by == 'bytes':
content = (
f'[Line {next_offset or offset} exceeds the {self._format_size(max_bytes)} read limit. '
'Use exec with a byte-range command for this line, or read a different offset.]'
)
else:
content = '\n'.join(lines)
return {
'ok': True,
'content': content,
'truncated': truncated,
'truncated_by': truncated_by,
'start_line': offset,
'end_line': end_line,
'next_offset': next_offset,
'max_lines': max_lines,
'max_bytes': max_bytes,
}
def _read_binary_file_chunk(self, host_path: str, parameters: dict) -> dict:
byte_offset = self._non_negative_int(parameters.get('byte_offset'), default=0)
max_bytes = self._positive_int(
parameters.get('max_bytes'),
default=_DEFAULT_TOOL_RESULT_MAX_BYTES,
max_value=_DEFAULT_TOOL_RESULT_MAX_BYTES,
)
size_bytes = os.path.getsize(host_path)
with open(host_path, 'rb') as f:
f.seek(byte_offset)
data = f.read(max_bytes + 1)
chunk = data[:max_bytes]
has_more = len(data) > max_bytes
return {
'ok': True,
'content': base64.b64encode(chunk).decode('ascii'),
'encoding': 'base64',
'byte_offset': byte_offset,
'length': len(chunk),
'size_bytes': size_bytes,
'has_more': has_more,
'next_byte_offset': byte_offset + len(chunk) if has_more else None,
'max_bytes': max_bytes,
}
def _write_host_file(self, host_path: str, content: str, parameters: dict) -> None:
encoding, mode = self._write_options(parameters)
if encoding == 'base64':
try:
data = base64.b64decode(content, validate=True)
except Exception as exc:
raise ValueError(f'invalid base64 content: {exc}') from exc
with open(host_path, 'ab' if mode == 'append' else 'wb') as f:
f.write(data)
return
with open(host_path, 'a' if mode == 'append' else 'w', encoding='utf-8') as f:
f.write(content)
@staticmethod
def _read_encoding(parameters: dict) -> str:
return 'base64' if parameters.get('encoding') == 'base64' else 'text'
@staticmethod
def _write_options(parameters: dict) -> tuple[str, str]:
encoding = 'base64' if parameters.get('encoding') == 'base64' else 'text'
mode = 'append' if parameters.get('mode') == 'append' else 'overwrite'
return encoding, mode
def _build_read_result_from_text(self, content: str, parameters: dict) -> dict:
offset = self._positive_int(parameters.get('offset'), default=1)
max_lines = self._positive_int(
parameters.get('limit'),
default=_DEFAULT_READ_MAX_LINES,
max_value=_MAX_READ_MAX_LINES,
)
max_bytes = self._positive_int(
parameters.get('max_bytes'),
default=_DEFAULT_TOOL_RESULT_MAX_BYTES,
max_value=_DEFAULT_TOOL_RESULT_MAX_BYTES,
)
all_lines = content.splitlines()
start_index = offset - 1
if start_index >= len(all_lines) and all_lines:
return {'ok': False, 'error': f'Offset {offset} is beyond end of file ({len(all_lines)} lines total)'}
output_lines: list[str] = []
output_bytes = 0
truncated = False
truncated_by: str | None = None
next_offset: int | None = None
for index, line in enumerate(all_lines[start_index:], start_index + 1):
if len(output_lines) >= max_lines:
truncated = True
truncated_by = 'lines'
next_offset = index
break
line_bytes = len(line.encode('utf-8')) + (1 if output_lines else 0)
if output_bytes + line_bytes > max_bytes:
truncated = True
truncated_by = 'bytes'
next_offset = index
break
output_lines.append(line)
output_bytes += line_bytes
end_line = offset + len(output_lines) - 1
return {
'ok': True,
'content': '\n'.join(output_lines),
'truncated': truncated,
'truncated_by': truncated_by,
'start_line': offset,
'end_line': end_line,
'next_offset': next_offset,
'max_lines': max_lines,
'max_bytes': max_bytes,
}
@staticmethod
def _positive_int(value, *, default: int, max_value: int | None = None) -> int:
try:
parsed = int(value)
except (TypeError, ValueError):
parsed = default
if parsed <= 0:
parsed = default
if max_value is not None:
parsed = min(parsed, max_value)
return parsed
@staticmethod
def _non_negative_int(value, *, default: int) -> int:
try:
parsed = int(value)
except (TypeError, ValueError):
parsed = default
return parsed if parsed >= 0 else default
@staticmethod
def _truncate_grep_line(line: str) -> tuple[str, bool]:
if len(line) <= _GREP_MAX_LINE_CHARS:
return line, False
return f'{line[:_GREP_MAX_LINE_CHARS]}... [truncated]', True
@staticmethod
def _truncate_text_to_bytes(text: str, max_bytes: int) -> str:
return NativeToolLoader._truncate_text_to_bytes_with_flag(text, max_bytes)[0]
@staticmethod
def _truncate_text_to_bytes_with_flag(text: str, max_bytes: int) -> tuple[str, bool]:
data = text.encode('utf-8')
if len(data) <= max_bytes:
return text, False
truncated = data[:max_bytes]
while truncated and (truncated[-1] & 0xC0) == 0x80:
truncated = truncated[:-1]
return truncated.decode('utf-8', errors='ignore'), True
@staticmethod
def _format_size(bytes_count: int) -> str:
if bytes_count < 1024:
return f'{bytes_count}B'
return f'{bytes_count / 1024:.1f}KB'
def _summarize_parameters(self, parameters: dict) -> dict:
summary = dict(parameters)
cmd = str(summary.get('command', '')).strip()
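Reviewer note: `_truncate_text_to_bytes_with_flag` above strips trailing continuation bytes before decoding with `errors='ignore'`. As far as I can tell, that also drops a complete final character when the cut lands exactly on a character boundary. The sketch below is a hypothetical alternative (`truncate_utf8` is not part of the PR) that relies on the decoder alone to discard an incomplete trailing sequence.

```python
def truncate_utf8(text: str, max_bytes: int) -> tuple[str, bool]:
    """Cap `text` at `max_bytes` of UTF-8 without splitting a character.

    Returns the (possibly shortened) text and a flag saying whether it was cut.
    """
    data = text.encode('utf-8')
    if len(data) <= max_bytes:
        return text, False
    # The slice may end mid-character; errors='ignore' drops that incomplete
    # tail, while a complete final character is kept.
    return data[:max_bytes].decode('utf-8', errors='ignore'), True
```

With a 3-byte budget, `'héllo'` keeps `'hé'` here, whereas the loop in the diff would return `'h'`. Whether that extra character matters is a judgment call; the version in the PR is merely more conservative.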
@@ -72,6 +72,45 @@ def register_activated_skill(query: pipeline_query.Query, skill_data: dict) -> N
activated[skill_name] = skill_data
def normalize_skill_names(value: typing.Any) -> list[str]:
"""Return a de-duplicated list of non-empty skill names."""
if not isinstance(value, list):
return []
names: list[str] = []
for item in value:
skill_name = str(item or '').strip()
if skill_name and skill_name not in names:
names.append(skill_name)
return names
def get_activated_skill_names(query: pipeline_query.Query) -> list[str]:
"""Return activated skill names for callers that own persistence policy."""
return normalize_skill_names(list(get_activated_skills(query).keys()))
def restore_activated_skills(
ap: app.Application,
query: pipeline_query.Query,
skill_names: typing.Any,
) -> list[str]:
"""Restore caller-provided activated skill names into Query variables.
Persistence and state scope ownership belong to higher-level flows. This
helper only rebuilds current Query state from pipeline-visible skills, so
removed or unbound skills stay unavailable to native exec/write/edit.
"""
restored: list[str] = []
for skill_name in normalize_skill_names(skill_names):
skill_data = get_visible_skill(ap, query, skill_name)
if skill_data is None:
continue
register_activated_skill(query, skill_data)
restored.append(skill_name)
return restored
def parse_skill_mount_path(sandbox_path: str) -> tuple[str | None, str]:
normalized_path = str(sandbox_path or '/workspace').strip() or '/workspace'
if normalized_path == SKILL_MOUNT_PREFIX:
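Reviewer note: a quick illustration of what `normalize_skill_names` and the restore flow accept and reject. `normalize_skill_names` is copied from the hunk above; `restore_from_visible` is a hypothetical simplification of `restore_activated_skills` that takes a plain dict of visible skills instead of the application and query objects.

```python
import typing


def normalize_skill_names(value: typing.Any) -> list[str]:
    """Return a de-duplicated list of non-empty skill names (copied from the diff)."""
    if not isinstance(value, list):
        return []
    names: list[str] = []
    for item in value:
        skill_name = str(item or '').strip()
        if skill_name and skill_name not in names:
            names.append(skill_name)
    return names


def restore_from_visible(visible: dict[str, dict], skill_names: typing.Any) -> list[str]:
    """Hypothetical simplification: keep only names still present in `visible`."""
    restored: list[str] = []
    for skill_name in normalize_skill_names(skill_names):
        if visible.get(skill_name) is None:
            continue  # removed or unbound skills stay unavailable
        restored.append(skill_name)
    return restored
```

One edge worth noting: because of `str(item or '')`, falsy items such as `0` are normalized away rather than becoming the name `'0'`.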
@@ -6,6 +6,7 @@ import typing
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
from .. import loader
from .availability import is_box_backend_available
# Align with Claude Code's Skill tool design:
# - activate: Activate a skill via Tool Call, returns SKILL.md content
@@ -45,18 +46,7 @@ class SkillToolLoader(loader.ToolLoader):
async def _check_sandbox_available(self) -> bool:
"""Check if the box backend is truly available (not just the runtime)."""
box_service = getattr(self.ap, 'box_service', None)
if box_service is None:
return False
if not getattr(box_service, 'available', False):
return False
# Check if backend is truly available via get_status
try:
status = await box_service.get_status()
backend_info = status.get('backend', {})
return backend_info.get('available', False)
except Exception:
return False
return await is_box_backend_available(self.ap)
async def get_tools(self, bound_plugins: list[str] | None = None) -> list[resource_tool.LLMTool]:
if not self._is_available():
@@ -92,16 +82,15 @@ class SkillToolLoader(loader.ToolLoader):
if not skill_name:
raise ValueError('skill_name is required')
skill_mgr = self.ap.skill_mgr
skill_data = skill_mgr.get_skill_by_name(skill_name)
from . import skill as skill_loader
skill_data = skill_loader.get_visible_skill(self.ap, query, skill_name)
if skill_data is None:
visible_skills = getattr(skill_mgr, 'skills', {})
visible_skills = skill_loader.get_visible_skills(self.ap, query)
available_names = ', '.join(sorted(visible_skills.keys())) or 'none'
raise ValueError(f'Skill "{skill_name}" not found. Available skills: {available_names}')
# Register activated skill for sandbox mount path resolution
from . import skill as skill_loader
skill_loader.register_activated_skill(query, skill_data)
# Return SKILL.md content as Tool Result (injects into context)
@@ -127,6 +116,7 @@ class SkillToolLoader(loader.ToolLoader):
'activated': True,
'skill_name': skill_name,
'mount_path': mount_path,
'activated_skill_names': skill_loader.get_activated_skill_names(query),
'content': result_content,
}
@@ -201,13 +191,13 @@ class SkillToolLoader(loader.ToolLoader):
return resource_tool.LLMTool(
name=ACTIVATE_SKILL_TOOL_NAME,
human_desc='Activate a skill',
description=self._build_activate_tool_description(),
description='Activate a pipeline-visible skill by name and return its instructions as a tool result.',
parameters={
'type': 'object',
'properties': {
'skill_name': {
'type': 'string',
'description': 'The skill name to activate (no arguments). E.g., "pdf" or "data-analysis"',
'description': 'The skill name to activate.',
},
},
'required': ['skill_name'],
@@ -255,50 +245,3 @@ class SkillToolLoader(loader.ToolLoader):
},
func=lambda parameters: parameters,
)
def _build_activate_tool_description(self) -> str:
"""Build tool description with embedded available_skills list."""
skill_mgr = getattr(self.ap, 'skill_mgr', None)
if skill_mgr is None:
return 'Activate a skill. No skills are currently available.'
skills = getattr(skill_mgr, 'skills', {})
if not skills:
return 'Activate a skill. No skills are currently available.'
# Build <available_skills> section
available_skills_lines = ['<available_skills>']
for skill_name, skill_data in sorted(skills.items()):
description = skill_data.get('description', '')
available_skills_lines.append('<skill>')
available_skills_lines.append(f'<name>{skill_name}</name>')
available_skills_lines.append(f'<description>{description}</description>')
available_skills_lines.append('</skill>')
available_skills_lines.append('</available_skills>')
available_skills_block = '\n'.join(available_skills_lines)
return f"""Activate a skill within the main conversation.
<skills_instructions>
When users ask you to perform tasks, check if any of the available skills
below can help complete the task more effectively. Skills provide specialized
capabilities and domain knowledge.
How to use skills:
- Invoke skills using this tool with the skill name only (no arguments)
- When you invoke a skill, you will see <command-message>
The skill is activated
</command-message>
- The skill's instructions will be provided in the tool result
- Examples:
- skill_name: "pdf" - invoke the pdf skill
- skill_name: "data-analysis" - invoke the data-analysis skill
Important:
- Only use skills listed in <available_skills> below
- Do not invoke a skill that is already running
- To create a new skill: prepare it in /workspace, then use register_skill tool
</skills_instructions>
{available_skills_block}"""
@@ -546,6 +546,41 @@ async def test_box_service_rejects_host_mount_outside_allowed_roots(tmp_path):
)
class TestGetSystemGuidance:
"""``get_system_guidance`` must ALWAYS advertise the per-query outbox path
when given a ``query_id``, even with no inbound attachment, so that files the
agent generates (QR codes, charts, rendered docs) are actually delivered.
The wrapper collects the outbox on every turn regardless of inbound files;
before this, the agent was only told the outbox path inside the
inbound-attachment note, so pure-generation turns produced files that were
silently dropped.
"""
def _service(self, logger=None):
logger = logger or Mock()
runtime = BoxRuntime(logger=logger, backends=[FakeBackend(logger)], session_ttl_sec=300)
return BoxService(make_app(logger), client=_InProcessBoxRuntimeClient(logger, runtime))
def test_guidance_includes_outbox_when_query_id_given(self):
service = self._service()
guidance = service.get_system_guidance(42)
assert f'{service.OUTBOX_MOUNT_DIR}/42' in guidance
assert 'delivered to the user automatically' in guidance
def test_guidance_omits_outbox_without_query_id(self):
service = self._service()
guidance = service.get_system_guidance()
assert service.OUTBOX_MOUNT_DIR not in guidance
# core exec guidance is still present
assert 'exec tool' in guidance
def test_guidance_outbox_independent_of_inbound_attachments(self):
# A bare query_id (the pure-generation case) still gets the outbox note.
service = self._service()
assert f'{service.OUTBOX_MOUNT_DIR}/0' in service.get_system_guidance(0)
@pytest.mark.asyncio
async def test_box_runtime_rejects_host_mount_conflict_in_same_session(tmp_path):
logger = Mock()
@@ -1556,3 +1591,347 @@ class TestBuildSkillExtraMounts:
service = BoxService(app, client=Mock(spec=BoxRuntimeClient))
assert service.build_skill_extra_mounts(make_query()) == []
# ── Attachment passthrough (inbound / outbound) ─────────────────────────────
class TestAttachmentHelpers:
def test_sanitize_attachment_name_strips_traversal(self):
assert BoxService._sanitize_attachment_name('../../etc/passwd', 'fb') == 'passwd'
assert BoxService._sanitize_attachment_name('/a/b/c.png', 'fb') == 'c.png'
assert BoxService._sanitize_attachment_name('a b c.txt', 'fb') == 'a_b_c.txt'
assert BoxService._sanitize_attachment_name('', 'fallback.bin') == 'fallback.bin'
assert BoxService._sanitize_attachment_name('...', 'fb.bin') == 'fb.bin'
# weird unicode / shell chars dropped, but keeps a usable name
out = BoxService._sanitize_attachment_name('rm -rf $(x).png', 'fb')
assert '/' not in out and '$' not in out and out.endswith('.png')
def test_classify_outbound_entries_by_extension(self):
entries = [
{'name': 'chart.png', 'b64': 'AAA'},
{'name': 'clip.mp3', 'b64': 'BBB'},
{'name': 'report.pdf', 'b64': 'CCC'},
{'name': 'sub/dir/photo.JPG', 'b64': 'DDD'},
{'name': 'noext', 'b64': 'EEE'},
{'name': 'skip', 'b64': ''}, # dropped (no payload)
]
out = BoxService._classify_outbound_entries(entries)
by_name = {a['name']: a for a in out}
assert by_name['chart.png']['type'] == 'Image'
assert by_name['chart.png']['base64'].startswith('data:image/png;base64,')
assert by_name['clip.mp3']['type'] == 'Voice'
assert by_name['clip.mp3']['base64'].startswith('data:audio/mp3;base64,')
assert by_name['report.pdf']['type'] == 'File'
assert by_name['report.pdf']['base64'] == 'CCC' # raw b64, no data: prefix
# nested path collapses to basename, case-insensitive ext
assert by_name['photo.JPG']['type'] == 'Image'
assert by_name['noext']['type'] == 'File'
assert 'skip' not in by_name
@pytest.mark.asyncio
async def test_component_to_bytes_from_data_uri(self):
import base64
raw = b'hello-bytes'
data_uri = 'data:text/plain;base64,' + base64.b64encode(raw).decode()
component = SimpleNamespace(base64=data_uri, url=None, path=None)
result = await BoxService._component_to_bytes(component)
assert result is not None
data, mime = result
assert data == raw
assert mime == 'text/plain'
@pytest.mark.asyncio
async def test_component_to_bytes_returns_none_when_empty(self):
component = SimpleNamespace(base64=None, url=None, path=None)
assert await BoxService._component_to_bytes(component) is None
class TestInboundOutboundRoundTrip:
def _service(self) -> BoxService:
service = BoxService(make_app(Mock()), client=Mock(spec=BoxRuntimeClient))
service._available = True
return service
@pytest.mark.asyncio
async def test_materialize_inbound_writes_and_describes(self):
import base64
import langbot_plugin.api.entities.builtin.platform.message as platform_message
service = self._service()
img_bytes = b'\x89PNG\r\n\x1a\n fake png'
img_b64 = 'data:image/png;base64,' + base64.b64encode(img_bytes).decode()
query = make_query()
query.message_chain = platform_message.MessageChain(
[
platform_message.Plain(text='please resize this'),
platform_message.Image(base64=img_b64),
]
)
# Mock the sandbox write path: echo back the written paths.
async def fake_execute_tool(parameters, q):
assert '/workspace/inbox/' in parameters['command']
return {
'ok': True,
'stdout': '["/workspace/inbox/42/image_1.png"]',
'stderr': '',
}
service.execute_tool = AsyncMock(side_effect=fake_execute_tool)
descriptors = await service.materialize_inbound_attachments(query)
assert len(descriptors) == 1
d = descriptors[0]
assert d['type'] == 'Image'
assert d['path'] == '/workspace/inbox/42/image_1.png'
assert d['size'] == len(img_bytes)
@pytest.mark.asyncio
async def test_materialize_inbound_noop_without_attachments(self):
import langbot_plugin.api.entities.builtin.platform.message as platform_message
service = self._service()
query = make_query()
query.message_chain = platform_message.MessageChain([platform_message.Plain(text='just text')])
service.execute_tool = AsyncMock()
assert await service.materialize_inbound_attachments(query) == []
service.execute_tool.assert_not_called()
@pytest.mark.asyncio
async def test_collect_outbound_reads_and_clears(self):
service = self._service()
query = make_query()
calls = []
async def fake_execute_tool(parameters, q):
calls.append(parameters['command'])
if 'os.walk' in parameters['command']:
return {
'ok': True,
'stdout': '[{"name": "out.png", "b64": "QUJD"}]',
'stderr': '',
}
# the rm -rf cleanup call
return {'ok': True, 'stdout': '', 'stderr': ''}
service.execute_tool = AsyncMock(side_effect=fake_execute_tool)
attachments = await service.collect_outbound_attachments(query)
assert len(attachments) == 1
assert attachments[0]['type'] == 'Image'
assert attachments[0]['name'] == 'out.png'
# cleanup (rm -rf) must have been issued after a successful collection
assert any('rm -rf' in c for c in calls)
@pytest.mark.asyncio
async def test_collect_outbound_empty_still_clears(self):
# An empty collection MUST still clear the per-query outbox, so a later
# turn reusing the same query_id (the counter resets across restarts)
# cannot inherit stale files left from a prior run.
service = self._service()
query = make_query()
calls = []
async def fake_execute_tool(parameters, q):
calls.append(parameters['command'])
if 'os.walk' in parameters['command']:
return {'ok': True, 'stdout': '[]', 'stderr': ''}
return {'ok': True, 'stdout': '', 'stderr': ''}
service.execute_tool = AsyncMock(side_effect=fake_execute_tool)
assert await service.collect_outbound_attachments(query) == []
# cleanup (rm -rf) is issued unconditionally now
assert any('rm -rf' in c for c in calls)
@pytest.mark.asyncio
async def test_passthrough_noop_when_unavailable(self):
service = BoxService(make_app(Mock()), client=Mock(spec=BoxRuntimeClient))
service._available = False
query = make_query()
assert await service.materialize_inbound_attachments(query) == []
assert await service.collect_outbound_attachments(query) == []
class TestAttachmentHostPath:
"""Direct host-filesystem transfer path (bind-mounted workspace).
When ``default_workspace`` is a real local dir, inbound/outbound bypass the
exec channel entirely (no ARG_MAX / stdout-truncation limits) and read/write
the bind-mounted host dir directly.
"""
def _service_with_workspace(self, tmp_path):
ws = str(tmp_path / 'box' / 'default')
os.makedirs(ws, exist_ok=True)
app = make_app(Mock(), allowed_mount_roots=[str(tmp_path)], host_root=str(tmp_path / 'box'))
service = BoxService(app, client=Mock(spec=BoxRuntimeClient))
service._available = True
# Force the default_workspace to our tmp dir so _host_query_dir resolves.
service.default_workspace = ws
return service, ws
@pytest.mark.asyncio
async def test_inbound_writes_to_host_no_exec(self, tmp_path):
import base64
import langbot_plugin.api.entities.builtin.platform.message as platform_message
service, ws = self._service_with_workspace(tmp_path)
# Big payload that would blow ARG_MAX on the exec path:
big = b'\x89PNG\r\n\x1a\n' + b'x' * (300 * 1024)
b64 = 'data:image/png;base64,' + base64.b64encode(big).decode()
query = make_query()
query.message_chain = platform_message.MessageChain([platform_message.Image(base64=b64)])
# execute_tool must NOT be called on the host path.
service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used on host path'))
descriptors = await service.materialize_inbound_attachments(query)
assert len(descriptors) == 1
d = descriptors[0]
assert d['type'] == 'Image'
assert d['size'] == len(big)
# File actually landed on the host workspace.
host_file = os.path.join(ws, 'inbox', str(query.query_id), d['name'])
assert os.path.isfile(host_file)
assert open(host_file, 'rb').read() == big
@pytest.mark.asyncio
async def test_inbound_host_clears_stale_query_dir(self, tmp_path):
import base64
import langbot_plugin.api.entities.builtin.platform.message as platform_message
service, ws = self._service_with_workspace(tmp_path)
# Seed a stale file under the same query_id (simulates webchat id reuse).
stale_dir = os.path.join(ws, 'inbox', '42')
os.makedirs(stale_dir, exist_ok=True)
open(os.path.join(stale_dir, 'image_1.png'), 'wb').write(b'STALE-OLD-IMAGE')
new = b'\x89PNG\r\n\x1a\n NEW'
b64 = 'data:image/png;base64,' + base64.b64encode(new).decode()
query = make_query(query_id=42)
query.message_chain = platform_message.MessageChain([platform_message.Image(base64=b64)])
service.execute_tool = AsyncMock()
descriptors = await service.materialize_inbound_attachments(query)
# The new write recreated the dir; the stale file is gone, new bytes present.
host_file = os.path.join(stale_dir, descriptors[0]['name'])
assert open(host_file, 'rb').read() == new
# No leftover content from the stale image.
assert b'STALE-OLD-IMAGE' not in open(host_file, 'rb').read()
@pytest.mark.asyncio
async def test_outbound_reads_host_and_clears(self, tmp_path):
service, ws = self._service_with_workspace(tmp_path)
query = make_query()
outbox = os.path.join(ws, 'outbox', str(query.query_id))
os.makedirs(outbox, exist_ok=True)
# A large file that would be truncated on the exec/stdout path:
big_png = b'\x89PNG\r\n\x1a\n' + b'y' * (400 * 1024)
open(os.path.join(outbox, 'result.png'), 'wb').write(big_png)
open(os.path.join(outbox, 'notes.txt'), 'wb').write(b'hello')
service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used on host path'))
attachments = await service.collect_outbound_attachments(query)
by_name = {a['name']: a for a in attachments}
assert by_name['result.png']['type'] == 'Image'
assert by_name['notes.txt']['type'] == 'File'
# Full image survived (no truncation).
import base64
raw = base64.b64decode(by_name['result.png']['base64'].split(',', 1)[-1])
assert raw == big_png
# Outbox cleared after collection.
assert os.listdir(outbox) == []
@pytest.mark.asyncio
async def test_outbound_empty_clears_stale_host_dir(self, tmp_path):
# Reusing a query_id (counter resets on restart) must not re-send files
# a previous run left in the outbox: an empty collection still clears it.
service, ws = self._service_with_workspace(tmp_path)
query = make_query()
outbox = os.path.join(ws, 'outbox', str(query.query_id))
os.makedirs(outbox, exist_ok=True)
# Stale file from a prior turn; the agent produced nothing this turn —
# but _read_outbox_host would still pick it up, so collection must drop
# it and then wipe the dir. Simulate "nothing produced this turn" by
# treating any present file as stale and asserting it is not re-sent
# across a second, genuinely-empty collection.
open(os.path.join(outbox, 'stale.png'), 'wb').write(b'\x89PNG\r\n\x1a\n old')
service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used on host path'))
# First collection drains + clears the dir.
first = await service.collect_outbound_attachments(query)
assert {a['name'] for a in first} == {'stale.png'}
assert os.listdir(outbox) == []
# Second collection (no new files) returns nothing and leaves a clean dir.
second = await service.collect_outbound_attachments(query)
assert second == []
assert os.listdir(outbox) == []
@pytest.mark.asyncio
async def test_purge_attachment_dirs_wipes_host_owned_leftovers_on_init(self, tmp_path):
# Leftover inbox/outbox dirs from a previous process (same reset
# query_id counter) must be removed at startup. Host-owned files are
# cleared without any sandbox exec.
service, ws = self._service_with_workspace(tmp_path)
for sub in ('inbox', 'outbox'):
d = os.path.join(ws, sub, '0')
os.makedirs(d, exist_ok=True)
open(os.path.join(d, 'leftover.bin'), 'wb').write(b'from a previous process')
service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used for host-owned files'))
await service._purge_attachment_dirs()
assert not os.path.exists(os.path.join(ws, 'inbox'))
assert not os.path.exists(os.path.join(ws, 'outbox'))
# The workspace root itself survives.
assert os.path.isdir(ws)
@pytest.mark.asyncio
async def test_purge_attachment_dirs_falls_back_to_exec_for_root_owned(self, tmp_path, monkeypatch):
# When the host delete cannot remove a dir (root-owned container output),
# purge must fall back to deleting from inside the sandbox via exec.
service, ws = self._service_with_workspace(tmp_path)
outbox = os.path.join(ws, 'outbox')
os.makedirs(os.path.join(outbox, '0'), exist_ok=True)
# Simulate a host delete that cannot remove the root-owned outbox.
import shutil as _shutil
real_rmtree = _shutil.rmtree
def fake_rmtree(path, *a, **k):
if os.path.abspath(path) == os.path.abspath(outbox):
return # "permission denied" — silently leaves the dir
return real_rmtree(path, *a, **k)
monkeypatch.setattr(_shutil, 'rmtree', fake_rmtree)
executed = {}
spec_obj = object()
service.build_spec = Mock(return_value=spec_obj)
service.client.execute = AsyncMock(side_effect=lambda s: executed.setdefault('spec', s))
await service._purge_attachment_dirs()
# build_spec was asked to rm the surviving outbox via exec.
cmd = service.build_spec.call_args.args[0]['cmd']
assert 'rm -rf' in cmd and '/workspace/outbox' in cmd
assert '/workspace/inbox' not in cmd # inbox was host-deletable
service.client.execute.assert_awaited_once_with(spec_obj)
@pytest.mark.asyncio
async def test_purge_attachment_dirs_noop_without_workspace(self):
# No bind-mounted workspace (E2B / remote): purge is a safe no-op.
service = BoxService(make_app(Mock()), client=Mock(spec=BoxRuntimeClient))
service.default_workspace = None
# Must not raise.
await service._purge_attachment_dirs()
+3 -1
@@ -54,7 +54,9 @@ def test_classify_python_workspace_detects_package_and_requirements():
def test_wrap_python_command_with_env_contains_bootstrap_and_command():
command = wrap_python_command_with_env('python script.py')
-assert 'python -m venv "$_LB_VENV_DIR"' in command
+assert '_LB_SYSTEM_PYTHON="$(command -v python3 || command -v python || true)"' in command
+assert '"$_LB_SYSTEM_PYTHON" -m venv "$_LB_VENV_DIR"' in command
+assert 'kill -0 "$_LB_LOCK_OWNER"' in command
assert 'export VIRTUAL_ENV="$_LB_VENV_DIR"' in command
assert command.rstrip().endswith('python script.py')
@@ -0,0 +1,146 @@
"""Unit tests for ResponseWrapper outbound-attachment helpers.
Covers the sandbox -> user attachment path added for the Box attachment
round-trip:
* ``_is_final_assistant_message``: only the terminal, tool-call-free assistant
message (or a final MessageChunk) should trigger collection.
* ``_append_outbound_attachments`` collects sandbox outbox files exactly once
per query and maps each descriptor to the right platform component, swallowing
collection errors.
"""
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock
import pytest
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.provider.message as provider_message
from langbot.pkg.pipeline.wrapper.wrapper import ResponseWrapper
def _make_wrapper(box_service) -> ResponseWrapper:
app = SimpleNamespace(logger=Mock(), box_service=box_service)
wrapper = ResponseWrapper.__new__(ResponseWrapper)
wrapper.ap = app
return wrapper
def _make_query():
return SimpleNamespace(variables={})
def test_is_final_assistant_message_plain_assistant():
wrapper = _make_wrapper(box_service=None)
msg = provider_message.Message(role='assistant', content='done')
assert wrapper._is_final_assistant_message(msg) is True
def test_is_final_assistant_message_rejects_non_assistant():
wrapper = _make_wrapper(box_service=None)
msg = provider_message.Message(role='tool', content='{}')
assert wrapper._is_final_assistant_message(msg) is False
def test_is_final_assistant_message_rejects_tool_call_round():
wrapper = _make_wrapper(box_service=None)
msg = provider_message.Message(
role='assistant',
content='calling',
tool_calls=[
provider_message.ToolCall(
id='c1',
type='function',
function=provider_message.FunctionCall(name='exec', arguments='{}'),
)
],
)
assert wrapper._is_final_assistant_message(msg) is False
def test_is_final_assistant_message_non_final_chunk():
wrapper = _make_wrapper(box_service=None)
chunk = provider_message.MessageChunk(role='assistant', content='partial', is_final=False)
assert wrapper._is_final_assistant_message(chunk) is False
final_chunk = provider_message.MessageChunk(role='assistant', content='partial', is_final=True)
assert wrapper._is_final_assistant_message(final_chunk) is True
@pytest.mark.asyncio
async def test_append_outbound_attachments_maps_each_type():
box_service = SimpleNamespace(
available=True,
collect_outbound_attachments=AsyncMock(
return_value=[
{'type': 'Image', 'base64': 'data:image/png;base64,iVBORw0K'},
{'type': 'Voice', 'base64': 'data:audio/wav;base64,UklGRg=='},
{'type': 'File', 'name': 'report.xlsx', 'base64': 'data:app;base64,UEsDBA=='},
]
),
)
wrapper = _make_wrapper(box_service)
wrapper.ap.box_service = box_service
query = _make_query()
chain = platform_message.MessageChain([])
await wrapper._append_outbound_attachments(query, chain)
kinds = [type(c).__name__ for c in chain]
assert kinds == ['Image', 'Voice', 'File']
assert query.variables['_sandbox_outbound_collected'] is True
# File keeps its name
file_comp = chain[2]
assert getattr(file_comp, 'name', None) == 'report.xlsx'
@pytest.mark.asyncio
async def test_append_outbound_attachments_runs_once_per_query():
box_service = SimpleNamespace(
available=True,
collect_outbound_attachments=AsyncMock(return_value=[]),
)
wrapper = _make_wrapper(box_service)
wrapper.ap.box_service = box_service
query = _make_query()
query.variables['_sandbox_outbound_collected'] = True
chain = platform_message.MessageChain([])
await wrapper._append_outbound_attachments(query, chain)
box_service.collect_outbound_attachments.assert_not_awaited()
assert len(chain) == 0
@pytest.mark.asyncio
async def test_append_outbound_attachments_noop_without_box_service():
wrapper = _make_wrapper(box_service=None)
wrapper.ap.box_service = None
query = _make_query()
chain = platform_message.MessageChain([])
await wrapper._append_outbound_attachments(query, chain)
assert len(chain) == 0
# not marked collected, since service is unavailable
assert '_sandbox_outbound_collected' not in query.variables
@pytest.mark.asyncio
async def test_append_outbound_attachments_swallows_collection_error():
box_service = SimpleNamespace(
available=True,
collect_outbound_attachments=AsyncMock(side_effect=RuntimeError('boom')),
)
wrapper = _make_wrapper(box_service)
wrapper.ap.box_service = box_service
query = _make_query()
chain = platform_message.MessageChain([])
# must not raise
await wrapper._append_outbound_attachments(query, chain)
assert len(chain) == 0
wrapper.ap.logger.warning.assert_called_once()
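The once-per-query guard and error swallowing exercised above could look roughly like the sketch below. It is an assumption-laden illustration, not `ResponseWrapper._append_outbound_attachments` itself: `append_outbound_once` and its `collect` callback are hypothetical, and whether the real code sets the flag before or after collecting is not shown in this diff.

```python
COLLECTED_KEY = '_sandbox_outbound_collected'


async def append_outbound_once(variables, chain, collect, logger):
    """Append collected attachments to chain at most once per query."""
    if variables.get(COLLECTED_KEY):
        return
    # Assumption: the flag is set before collecting, so a failed attempt is
    # not retried on a later message of the same query.
    variables[COLLECTED_KEY] = True
    try:
        attachments = await collect()
    except Exception as exc:  # a failed collection must not break the reply
        logger.warning('outbound attachment collection failed: %s', exc)
        return
    chain.extend(attachments)
```

Keeping the flag in the per-query variables means a streamed reply with several final-looking chunks still triggers a single collection.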
@@ -0,0 +1,92 @@
"""Unit tests for WebSocketAdapter._process_image_components.
The web debug client uploads Image / Voice / File components carrying a storage
key in ``path``. This helper resolves each to a base64 data URI (so multimodal
LLM input and the Box sandbox inbox have usable bytes), then deletes the
consumed storage object and clears ``path``. Covers mimetype selection per
type and graceful error handling.
"""
from __future__ import annotations
import base64
from unittest.mock import AsyncMock, Mock
import pytest
from langbot.pkg.platform.sources.websocket_adapter import WebSocketAdapter
def _make_adapter(load_return=b'hello', load_side_effect=None):
provider = Mock()
provider.load = AsyncMock(return_value=load_return, side_effect=load_side_effect)
provider.delete = AsyncMock()
ap = Mock()
ap.storage_mgr.storage_provider = provider
logger = Mock()
logger.error = AsyncMock()
# WebSocketAdapter is a pydantic model; bypass full __init__/validation.
adapter = WebSocketAdapter.model_construct(ap=ap, logger=logger)
return adapter, provider
@pytest.mark.asyncio
async def test_image_jpeg_mimetype_and_cleanup():
adapter, provider = _make_adapter(load_return=b'\xff\xd8\xff')
chain = [{'type': 'Image', 'path': 'storage://abc/photo.jpg'}]
await adapter._process_image_components(chain)
expected_b64 = base64.b64encode(b'\xff\xd8\xff').decode('utf-8')
assert chain[0]['base64'] == f'data:image/jpeg;base64,{expected_b64}'
assert chain[0]['path'] == '' # consumed
provider.delete.assert_awaited_once_with('storage://abc/photo.jpg')
@pytest.mark.asyncio
async def test_image_defaults_to_png():
adapter, _ = _make_adapter()
chain = [{'type': 'Image', 'path': 'storage://abc/blob'}]
await adapter._process_image_components(chain)
assert chain[0]['base64'].startswith('data:image/png;base64,')
@pytest.mark.asyncio
async def test_voice_uses_guessed_or_wav_mimetype():
adapter, _ = _make_adapter()
chain = [{'type': 'Voice', 'path': 'storage://abc/clip.wav'}]
await adapter._process_image_components(chain)
assert chain[0]['base64'].startswith('data:audio/')
@pytest.mark.asyncio
async def test_file_uses_octet_stream_fallback():
adapter, _ = _make_adapter()
chain = [{'type': 'File', 'path': 'storage://abc/unknownblob'}]
await adapter._process_image_components(chain)
assert chain[0]['base64'].startswith('data:application/octet-stream;base64,')
@pytest.mark.asyncio
async def test_skips_components_without_path_or_unknown_type():
adapter, provider = _make_adapter()
chain = [
{'type': 'Image', 'path': ''}, # no path
{'type': 'Plain', 'path': 'storage://abc/x'}, # not a file component
{'type': 'At', 'target': '123'}, # no path key at all
]
await adapter._process_image_components(chain)
provider.load.assert_not_awaited()
assert 'base64' not in chain[0]
assert 'base64' not in chain[1]
@pytest.mark.asyncio
async def test_load_failure_is_logged_not_raised():
adapter, _ = _make_adapter(load_side_effect=RuntimeError('storage down'))
chain = [{'type': 'File', 'path': 'storage://abc/doc.pdf'}]
# must not raise
await adapter._process_image_components(chain)
assert 'base64' not in chain[0]
adapter.logger.error.assert_awaited_once()
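The per-type mimetype selection these tests describe can be approximated with the standard library. The sketch below is an assumption, not the adapter's code: `to_data_uri` and the `_DEFAULTS` table are hypothetical, and the fallbacks are inferred only from the assertions above (PNG for images, an audio type for voice, octet-stream for files).

```python
import base64
import mimetypes

# Assumed per-component (required type family, fallback MIME type).
_DEFAULTS = {
    'Image': ('image/', 'image/png'),
    'Voice': ('audio/', 'audio/wav'),
    'File': ('', 'application/octet-stream'),
}


def to_data_uri(component_type: str, storage_key: str, data: bytes) -> str:
    """Build a base64 data URI, guessing the MIME type from the key's file name."""
    family, fallback = _DEFAULTS[component_type]
    filename = storage_key.rsplit('/', 1)[-1]
    guessed, _ = mimetypes.guess_type(filename)
    # Accept the guess only if it belongs to the family this component expects.
    mime = guessed if guessed and guessed.startswith(family) else fallback
    encoded = base64.b64encode(data).decode('ascii')
    return f'data:{mime};base64,{encoded}'
```

For example, `to_data_uri('File', 'storage://abc/unknownblob', b'hello')` falls back to `application/octet-stream` because the key has no extension to guess from.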
@@ -0,0 +1,93 @@
"""Unit tests for LiteLLMRequester._convert_messages.
Focus: the content-part normalization that (a) converts image_base64 parts to
the OpenAI image_url shape and (b) drops non-image file parts (file_base64 /
file_url), which OpenAI-compatible chat models reject. The latter is essential
for Voice/File attachments (including ones replayed from conversation history),
since the agent consumes their bytes via the sandbox, not the model payload.
"""
import langbot_plugin.api.entities.builtin.provider.message as provider_message
from langbot.pkg.provider.modelmgr.requesters.litellmchat import LiteLLMRequester
def _make_requester() -> LiteLLMRequester:
# _convert_messages does not touch instance config, so bypass __init__.
return LiteLLMRequester.__new__(LiteLLMRequester)
def test_convert_messages_drops_file_base64_part():
req = _make_requester()
msg = provider_message.Message(
role='user',
content=[
provider_message.ContentElement.from_text('analyze this audio'),
provider_message.ContentElement.from_file_base64('data:audio/wav;base64,AAAA', 'voice.wav'),
],
)
out = req._convert_messages([msg])
parts = out[0]['content']
types = [p.get('type') for p in parts]
assert 'file_base64' not in types
assert types == ['text']
assert parts[0]['text'] == 'analyze this audio'
def test_convert_messages_drops_file_url_part():
req = _make_requester()
msg = provider_message.Message(
role='user',
content=[
provider_message.ContentElement.from_text('here is a doc'),
provider_message.ContentElement.from_file_url('http://example.com/report.xlsx', 'report.xlsx'),
],
)
out = req._convert_messages([msg])
types = [p.get('type') for p in out[0]['content']]
assert types == ['text']
def test_convert_messages_keeps_image_and_converts_to_image_url():
req = _make_requester()
msg = provider_message.Message(
role='user',
content=[
provider_message.ContentElement.from_text('look'),
provider_message.ContentElement.from_image_base64('data:image/png;base64,AAAA'),
],
)
out = req._convert_messages([msg])
parts = out[0]['content']
types = [p.get('type') for p in parts]
# image is preserved and reshaped to the OpenAI image_url form
assert types == ['text', 'image_url']
img_part = parts[1]
assert img_part['image_url'] == {'url': 'data:image/png;base64,AAAA'}
assert 'image_base64' not in img_part
def test_convert_messages_mixed_history_strips_only_files():
req = _make_requester()
# Simulate replayed history: an old voice turn + a current text turn.
history_voice = provider_message.Message(
role='user',
content=[
provider_message.ContentElement.from_text('old audio turn'),
provider_message.ContentElement.from_file_base64('data:audio/wav;base64,BBBB', 'voice.wav'),
],
)
current = provider_message.Message(
role='user',
content=[provider_message.ContentElement.from_text('now do the csv')],
)
out = req._convert_messages([history_voice, current])
assert [p.get('type') for p in out[0]['content']] == ['text']
assert [p.get('type') for p in out[1]['content']] == ['text']
def test_convert_messages_plain_string_content_untouched():
req = _make_requester()
msg = provider_message.Message(role='user', content='just text')
out = req._convert_messages([msg])
assert out[0]['content'] == 'just text'
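The content-part normalization these tests cover reduces to a small filter-and-reshape pass. The sketch below works on plain dicts and is only an illustration: `normalize_content_parts` is a hypothetical name, and the assumption that an image part stores its data URI under an `image_base64` key is inferred from the assertions, not confirmed by the requester's source.

```python
def normalize_content_parts(content):
    """Drop file parts and reshape image_base64 parts to the image_url form."""
    if isinstance(content, str):
        return content  # plain string content passes through untouched

    normalized = []
    for part in content:
        kind = part.get('type')
        if kind in ('file_base64', 'file_url'):
            # Chat models reject these; the sandbox consumes the bytes instead.
            continue
        if kind == 'image_base64':
            normalized.append({'type': 'image_url', 'image_url': {'url': part['image_base64']}})
            continue
        normalized.append(dict(part))
    return normalized
```

Because the pass runs over every message, file parts replayed from history are stripped the same way as those in the current turn.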
@@ -352,6 +352,117 @@ class TestInvokeLLMStreamUsage:
assert tool_chunks[1].tool_calls[0].function.arguments == '{"text":'
assert tool_chunks[2].tool_calls[0].function.arguments == '"plugin-tool-ok"}'
@pytest.mark.asyncio
async def test_stream_tool_call_without_id_is_not_dropped(self):
"""Regression for #2261.
Ollama's OpenAI-compatible streaming endpoint emits a tool-call delta
carrying an ``index`` and a ``function`` payload but never an
OpenAI-style ``id``. The requester used to drop any id-less tool call,
so a tool-only turn yielded nothing, the stream "completed" with 0
chars, and the chat got stuck. A stable per-index id must be
synthesized so the tool call survives.
"""
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
mock_ap = Mock()
mock_ap.tool_mgr = Mock()
mock_ap.tool_mgr.generate_tools_for_openai = AsyncMock(
return_value=[{'type': 'function', 'function': {'name': 'zotero_search_items'}}]
)
requester = litellmchat.LiteLLMRequester(ap=mock_ap, config={'custom_llm_provider': 'openai'})
model = MockRuntimeModel('gpt-oss:20b', 'ollama')
# Ollama delivers the whole tool call in a single delta, with no id.
chunks = [
self._make_chunk(
tool_calls=[
{
'index': 0,
'function': {'name': 'zotero_search_items', 'arguments': '{"query":"hello"}'},
}
]
),
self._make_chunk(finish_reason='tool_calls'),
]
async def _aiter(*args, **kwargs):
for c in chunks:
yield c
query = Mock(spec=pipeline_query.Query)
query.variables = {}
messages = [provider_message.Message(role='user', content='hello?')]
funcs = [Mock()]
with patch.object(litellmchat, 'acompletion', new=AsyncMock(side_effect=lambda **kw: _aiter())):
collected = [
chunk
async for chunk in requester.invoke_llm_stream(
query=query,
model=model,
messages=messages,
funcs=funcs,
)
]
tool_chunks = [chunk for chunk in collected if chunk.tool_calls]
assert len(tool_chunks) == 1, 'id-less Ollama tool call must not be dropped'
tc = tool_chunks[0].tool_calls[0]
assert tc.id == 'call_0'
assert tc.function.name == 'zotero_search_items'
assert tc.function.arguments == '{"query":"hello"}'
@pytest.mark.asyncio
async def test_stream_multiple_tool_calls_without_id_get_distinct_ids(self):
"""Two parallel id-less tool calls must keep distinct synthesized ids."""
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
mock_ap = Mock()
mock_ap.tool_mgr = Mock()
mock_ap.tool_mgr.generate_tools_for_openai = AsyncMock(
return_value=[{'type': 'function', 'function': {'name': 'zotero_search_items'}}]
)
requester = litellmchat.LiteLLMRequester(ap=mock_ap, config={'custom_llm_provider': 'openai'})
model = MockRuntimeModel('gpt-oss:20b', 'ollama')
chunks = [
self._make_chunk(
tool_calls=[
{'index': 0, 'function': {'name': 'zotero_search_items', 'arguments': '{"q":"a"}'}},
{'index': 1, 'function': {'name': 'zotero_get_notes', 'arguments': '{"q":"b"}'}},
]
),
self._make_chunk(finish_reason='tool_calls'),
]
async def _aiter(*args, **kwargs):
for c in chunks:
yield c
query = Mock(spec=pipeline_query.Query)
query.variables = {}
messages = [provider_message.Message(role='user', content='hello?')]
funcs = [Mock()]
with patch.object(litellmchat, 'acompletion', new=AsyncMock(side_effect=lambda **kw: _aiter())):
collected = [
chunk
async for chunk in requester.invoke_llm_stream(
query=query,
model=model,
messages=messages,
funcs=funcs,
)
]
tool_chunks = [chunk for chunk in collected if chunk.tool_calls]
assert len(tool_chunks) == 1
ids = {tc.id for tc in tool_chunks[0].tool_calls}
assert ids == {'call_0', 'call_1'}
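Synthesizing a stable per-index id for id-less tool-call deltas, as the two regression tests above require, might be sketched as follows. This is not the requester's `_normalize_stream_tool_calls`; `ensure_tool_call_ids` and the shape of `state` are hypothetical, and only the `call_<index>` id format is taken from the assertions.

```python
def ensure_tool_call_ids(raw_tool_calls, state):
    """Give every streamed tool-call delta an id, reusing one id per index."""
    normalized = []
    for position, raw in enumerate(raw_tool_calls):
        index = raw.get('index', position)
        entry = state.setdefault(index, {'id': None})
        if raw.get('id'):
            entry['id'] = raw['id']  # a provider-supplied id always wins
        elif entry['id'] is None:
            entry['id'] = f'call_{index}'  # synthesized, stable across chunks
        normalized.append({**raw, 'index': index, 'id': entry['id']})
    return normalized
```

Keeping the id in `state` means later argument-only deltas for the same index are attributed to the same call rather than dropped.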
class TestProcessThinkingContent:
"""Test _process_thinking_content method"""
@@ -0,0 +1,146 @@
"""Unit tests for LocalAgentRunner._inject_inbound_attachments.
Covers the user -> sandbox attachment path added for the Box attachment
round-trip:
* materialized descriptors are stashed on the query and described to the model
via an appended text note (in-sandbox paths + outbox convention);
* non-image file parts (file_base64 / file_url) are stripped from the user
message content because OpenAI-compatible chat models reject them, while
image and text parts are kept for vision models;
* the helper is a no-op when the box service is unavailable or yields nothing,
and never raises into the chat turn on materialization failure.
"""
from __future__ import annotations
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock
import pytest
import langbot_plugin.api.entities.builtin.provider.message as provider_message
from langbot.pkg.provider.runners.localagent import LocalAgentRunner
def _make_runner(box_service) -> LocalAgentRunner:
runner = LocalAgentRunner.__new__(LocalAgentRunner)
runner.ap = SimpleNamespace(logger=Mock(), box_service=box_service)
return runner
def _make_query():
return SimpleNamespace(variables={}, query_id='q-123')
def _box_service(attachments):
svc = SimpleNamespace(
available=True,
OUTBOX_MOUNT_DIR='/outbox',
materialize_inbound_attachments=AsyncMock(return_value=attachments),
)
return svc
@pytest.mark.asyncio
async def test_inject_strips_file_parts_and_appends_note():
box = _box_service([{'type': 'Voice', 'path': '/inbox/q-123/voice.wav', 'size': 176000}])
runner = _make_runner(box)
query = _make_query()
user_message = provider_message.Message(
role='user',
content=[
provider_message.ContentElement.from_text('transcribe this'),
provider_message.ContentElement.from_file_base64('data:audio/wav;base64,AAAA', 'voice.wav'),
],
)
await runner._inject_inbound_attachments(query, user_message)
types = [getattr(ce, 'type', None) for ce in user_message.content]
# file_base64 dropped; text kept; sandbox-path note appended as text
assert 'file_base64' not in types
assert types.count('text') == 2
note = user_message.content[-1].text
assert '/inbox/q-123/voice.wav' in note
assert '/outbox/q-123' in note
# descriptors stashed for downstream stages
assert query.variables['_sandbox_inbound_attachments'] == box.materialize_inbound_attachments.return_value
@pytest.mark.asyncio
async def test_inject_keeps_image_parts():
box = _box_service([{'type': 'Image', 'path': '/inbox/q-123/pic.png', 'size': 1234}])
runner = _make_runner(box)
query = _make_query()
user_message = provider_message.Message(
role='user',
content=[
provider_message.ContentElement.from_text('what is this'),
provider_message.ContentElement.from_image_base64('data:image/png;base64,iVBORw0K'),
],
)
await runner._inject_inbound_attachments(query, user_message)
types = [getattr(ce, 'type', None) for ce in user_message.content]
assert 'image_base64' in types # vision part preserved
assert types[-1] == 'text' # note appended last
@pytest.mark.asyncio
async def test_inject_promotes_string_content_to_list_with_note():
box = _box_service([{'type': 'File', 'path': '/inbox/q-123/data.csv', 'size': 42}])
runner = _make_runner(box)
query = _make_query()
user_message = provider_message.Message(role='user', content='clean this csv')
await runner._inject_inbound_attachments(query, user_message)
assert isinstance(user_message.content, list)
assert [getattr(ce, 'type', None) for ce in user_message.content] == ['text', 'text']
assert user_message.content[0].text == 'clean this csv'
assert '/inbox/q-123/data.csv' in user_message.content[1].text
@pytest.mark.asyncio
async def test_inject_noop_without_box_service():
runner = _make_runner(box_service=None)
query = _make_query()
user_message = provider_message.Message(role='user', content='hello')
await runner._inject_inbound_attachments(query, user_message)
assert user_message.content == 'hello'
assert '_sandbox_inbound_attachments' not in query.variables
@pytest.mark.asyncio
async def test_inject_noop_when_no_attachments():
box = _box_service([])
runner = _make_runner(box)
query = _make_query()
user_message = provider_message.Message(role='user', content='hello')
await runner._inject_inbound_attachments(query, user_message)
assert user_message.content == 'hello'
assert '_sandbox_inbound_attachments' not in query.variables
@pytest.mark.asyncio
async def test_inject_swallows_materialization_error():
box = SimpleNamespace(
available=True,
OUTBOX_MOUNT_DIR='/outbox',
materialize_inbound_attachments=AsyncMock(side_effect=RuntimeError('disk full')),
)
runner = _make_runner(box)
query = _make_query()
user_message = provider_message.Message(role='user', content='hello')
# must not raise
await runner._inject_inbound_attachments(query, user_message)
assert user_message.content == 'hello'
runner.ap.logger.warning.assert_called_once()
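The message rewrite these tests describe (promote string content to a list, strip file parts, append a sandbox-path note) can be sketched on plain dicts. Everything here is illustrative: `inject_attachment_note` is a hypothetical helper, the note wording is invented, and the real runner mutates a `Message` entity rather than returning a new list.

```python
def inject_attachment_note(content, attachments, outbox_dir):
    """Return content with file parts removed and a sandbox note appended."""
    if not attachments:
        return content  # nothing materialized: leave the message untouched

    if isinstance(content, str):
        parts = [{'type': 'text', 'text': content}]
    else:
        parts = list(content)
    # Image and text parts stay for vision models; file parts are dropped.
    parts = [p for p in parts if p.get('type') not in ('file_base64', 'file_url')]

    lines = ['Attachments available in the sandbox:']
    lines += [f"- {a['type']}: {a['path']} ({a['size']} bytes)" for a in attachments]
    lines.append(f'Write any files to return to the user under {outbox_dir}.')
    parts.append({'type': 'text', 'text': '\n'.join(lines)})
    return parts
```

Appending the note as a final text part keeps the user's own text first, which matches the ordering the tests assert.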
@@ -180,7 +180,7 @@ class TestMCPServerBoxConfig:
assert cfg.host_path is None
assert cfg.host_path_mode == 'ro'
assert cfg.env == {}
-assert cfg.startup_timeout_sec == 120
+assert cfg.startup_timeout_sec == 300
assert cfg.cpus is None
assert cfg.memory_mb is None
assert cfg.pids_limit is None
@@ -494,6 +494,84 @@ class TestBuildBoxProcessPayload:
assert payload['args'] == ['/opt/other/server.py', '--flag']
# ── Python Workspace Preparation ────────────────────────────────────
class TestPythonWorkspacePreparation:
def test_requirements_workspace_uses_venv_bootstrap(self, mcp_module, tmp_path):
host_path = tmp_path / 'mcp-source'
host_path.mkdir()
(host_path / 'requirements.txt').write_text('mcp==1.26.0\n', encoding='utf-8')
command = mcp_module.BoxStdioSessionRuntime.detect_install_command(
str(host_path),
'/workspace/.mcp/u1/workspace',
)
assert command is not None
assert '_LB_SYSTEM_PYTHON="$(command -v python3 || command -v python || true)"' in command
assert '"$_LB_SYSTEM_PYTHON" -m venv "$_LB_VENV_DIR"' in command
assert 'python -m pip install -r "/workspace/.mcp/u1/workspace/requirements.txt"' in command
assert 'pip install --no-cache-dir -r' not in command
def test_staging_refresh_removes_stale_source_files_but_preserves_runtime_dirs(self, mcp_module, tmp_path):
source = tmp_path / 'source'
source.mkdir()
(source / 'server.py').write_text('print("new")\n', encoding='utf-8')
(source / 'requirements.txt').write_text('mcp==1.26.0\n', encoding='utf-8')
(source / '.env').write_text('TOKEN=new\n', encoding='utf-8')
process_root = tmp_path / 'shared' / '.mcp' / 'u1'
workspace = process_root / 'workspace'
(workspace / '.venv' / 'bin').mkdir(parents=True)
(workspace / '.venv' / 'bin' / 'python').write_text('', encoding='utf-8')
(workspace / '.langbot').mkdir()
(workspace / '.langbot' / 'python-env.lock').mkdir()
(workspace / '.env').write_text('TOKEN=old\n', encoding='utf-8')
(workspace / 'server.py').write_text('print("old")\n', encoding='utf-8')
(workspace / 'removed.py').write_text('stale\n', encoding='utf-8')
(workspace / 'removed_dir').mkdir()
(workspace / 'removed_dir' / 'old.txt').write_text('stale\n', encoding='utf-8')
mcp_module.BoxStdioSessionRuntime._copy_workspace_tree(str(source), str(process_root), str(workspace))
assert (workspace / 'server.py').read_text(encoding='utf-8') == 'print("new")\n'
assert (workspace / 'requirements.txt').read_text(encoding='utf-8') == 'mcp==1.26.0\n'
assert (workspace / '.env').read_text(encoding='utf-8') == 'TOKEN=new\n'
assert not (workspace / 'removed.py').exists()
assert not (workspace / 'removed_dir').exists()
assert (workspace / '.venv' / 'bin' / 'python').exists()
assert (workspace / '.langbot' / 'python-env.lock').is_dir()
def test_staging_refresh_ignores_unlink_race(self, mcp_module, tmp_path, monkeypatch):
mcp_stdio_module = sys.modules['langbot.pkg.provider.tools.loaders.mcp_stdio']
source = tmp_path / 'source'
source.mkdir()
(source / 'server.py').write_text('print("new")\n', encoding='utf-8')
process_root = tmp_path / 'shared' / '.mcp' / 'u1'
workspace = process_root / 'workspace'
workspace.mkdir(parents=True)
stale_file = workspace / 'removed.py'
stale_file.write_text('stale\n', encoding='utf-8')
real_unlink = os.unlink
def unlink_with_race(path):
if os.fspath(path) == str(stale_file):
real_unlink(path)
raise FileNotFoundError(path)
real_unlink(path)
monkeypatch.setattr(mcp_stdio_module.os, 'unlink', unlink_with_race)
mcp_module.BoxStdioSessionRuntime._copy_workspace_tree(str(source), str(process_root), str(workspace))
assert not stale_file.exists()
assert (workspace / 'server.py').read_text(encoding='utf-8') == 'print("new")\n'
# ── get_runtime_info_dict ───────────────────────────────────────────
@@ -1,5 +1,6 @@
from __future__ import annotations
import asyncio
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock
@@ -88,6 +89,28 @@ def test_token_manager_next_token_ignores_empty_token_list():
assert token_mgr.using_token_index == 0
@pytest.mark.asyncio
async def test_model_manager_initialize_skips_space_sync_after_timeout():
ap = SimpleNamespace()
ap.discover = SimpleNamespace(get_components_by_kind=Mock(return_value=[]))
ap.instance_config = SimpleNamespace(data={'space': {'models_sync_timeout': 0.01}})
ap.logger = Mock()
mgr = ModelManager(ap)
mgr.load_models_from_db = AsyncMock()
async def slow_sync():
await asyncio.sleep(1)
mgr.sync_new_models_from_space = AsyncMock(side_effect=slow_sync)
await mgr.initialize()
mgr.load_models_from_db.assert_awaited_once()
mgr.sync_new_models_from_space.assert_awaited_once()
ap.logger.warning.assert_any_call('LangBot Space model sync timed out after 0.01s, skipping startup sync.')
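A bounded startup sync of the kind this test exercises is commonly written with `asyncio.wait_for`. The sketch below is an assumption about the approach, not `ModelManager.initialize`: `sync_with_timeout` and its `warn` callback are hypothetical, and the warning text is shortened.

```python
import asyncio


async def sync_with_timeout(sync, timeout, warn):
    """Await sync() for at most timeout seconds; warn and carry on if it overruns."""
    try:
        await asyncio.wait_for(sync(), timeout=timeout)
    except asyncio.TimeoutError:
        warn(f'model sync timed out after {timeout}s, skipping startup sync.')
        return False
    return True
```

`asyncio.wait_for` cancels the slow coroutine on timeout, so startup continues without leaving the sync running in the background.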
@pytest.mark.asyncio
async def test_updated_llm_model_is_immediately_usable_by_local_agent_pipeline():
from langbot.pkg.api.http.service.model import LLMModelsService
@@ -0,0 +1,172 @@
"""Unit tests for provider_specific_fields round-trip in LiteLLMRequester.
This tests the fix for GitHub issue #1899: Gemini requires thought_signature
to be preserved across tool call rounds for function calls to work correctly.
"""
import langbot_plugin.api.entities.builtin.provider.message as provider_message
from langbot.pkg.provider.modelmgr.requesters.litellmchat import LiteLLMRequester
def _make_requester() -> LiteLLMRequester:
# _convert_messages and _normalize_stream_tool_calls do not touch instance config.
return LiteLLMRequester.__new__(LiteLLMRequester)
def test_convert_messages_preserves_tool_call_provider_specific_fields():
"""Tool calls should retain provider_specific_fields through _convert_messages."""
req = _make_requester()
msg = provider_message.Message(
role='assistant',
content=None,
tool_calls=[
provider_message.ToolCall(
id='call_123',
type='function',
function=provider_message.FunctionCall(
name='search',
arguments='{"query": "test"}',
),
provider_specific_fields={
'thought_signature': 'c2tpcF90aG91Z2h0X3NpZ25hdHVyZQ==',
},
),
],
)
out = req._convert_messages([msg])
assert len(out) == 1
assert out[0]['tool_calls'] is not None
assert len(out[0]['tool_calls']) == 1
tc = out[0]['tool_calls'][0]
assert tc['id'] == 'call_123'
assert tc['function']['name'] == 'search'
assert 'provider_specific_fields' in tc
assert tc['provider_specific_fields']['thought_signature'] == 'c2tpcF90aG91Z2h0X3NpZ25hdHVyZQ=='
def test_convert_messages_preserves_message_provider_specific_fields():
"""Messages should retain provider_specific_fields through _convert_messages."""
req = _make_requester()
msg = provider_message.Message(
role='assistant',
content='Hello',
provider_specific_fields={
'thought_signatures': ['sig1', 'sig2'],
},
)
out = req._convert_messages([msg])
assert len(out) == 1
assert 'provider_specific_fields' in out[0]
assert out[0]['provider_specific_fields']['thought_signatures'] == ['sig1', 'sig2']
def test_normalize_stream_tool_calls_preserves_provider_specific_fields():
"""Streaming tool calls should retain provider_specific_fields."""
req = _make_requester()
tool_call_state: dict[int, dict] = {}
# Simulate first chunk with id and type
raw_tool_calls_1 = [
{
'index': 0,
'id': 'call_abc',
'type': 'function',
'function': {
'name': 'get_weather',
'arguments': '',
},
'provider_specific_fields': {
'thought_signature': 'dGVzdF9zaWduYXR1cmU=',
},
},
]
result_1 = req._normalize_stream_tool_calls(raw_tool_calls_1, tool_call_state)
assert result_1 is not None
assert len(result_1) == 1
assert result_1[0]['provider_specific_fields']['thought_signature'] == 'dGVzdF9zaWduYXR1cmU='
# Simulate second chunk without provider_specific_fields (should be retained from state)
raw_tool_calls_2 = [
{
'index': 0,
'function': {
'arguments': '{"city": "Tokyo"}',
},
},
]
result_2 = req._normalize_stream_tool_calls(raw_tool_calls_2, tool_call_state)
assert result_2 is not None
assert len(result_2) == 1
# Should retain the provider_specific_fields from the first chunk
assert result_2[0]['provider_specific_fields']['thought_signature'] == 'dGVzdF9zaWduYXR1cmU='
assert result_2[0]['function']['arguments'] == '{"city": "Tokyo"}'
def test_normalize_stream_tool_calls_merges_function_level_psf():
"""Function-level provider_specific_fields should be merged into tool-level."""
req = _make_requester()
tool_call_state: dict[int, dict] = {}
raw_tool_calls = [
{
'index': 0,
'id': 'call_xyz',
'type': 'function',
'function': {
'name': 'search',
'arguments': '{}',
'provider_specific_fields': {
'thought_signature': 'ZnVuY19sZXZlbF9zaWc=',
},
},
},
]
result = req._normalize_stream_tool_calls(raw_tool_calls, tool_call_state)
assert result is not None
assert result[0]['provider_specific_fields']['thought_signature'] == 'ZnVuY19sZXZlbF9zaWc='
def test_tool_call_roundtrip_through_message_entity():
"""Full round-trip: LiteLLM response dict -> Message entity -> _convert_messages."""
# Simulate what LiteLLM returns for a Gemini tool call response
message_data = {
'role': 'assistant',
'content': None,
'tool_calls': [
{
'id': 'call_gemini_123',
'type': 'function',
'function': {
'name': 'web_search',
'arguments': '{"query": "test"}',
},
'provider_specific_fields': {
'thought_signature': 'Z2VtaW5pX3NpZ25hdHVyZQ==',
},
},
],
'provider_specific_fields': {
'thought_signatures': ['Z2VtaW5pX3NpZ25hdHVyZQ=='],
},
}
# Parse into Message entity (this is what invoke_llm does)
msg = provider_message.Message(**message_data)
# Verify the entity has the fields
assert msg.tool_calls is not None
assert len(msg.tool_calls) == 1
assert msg.tool_calls[0].provider_specific_fields is not None
assert msg.tool_calls[0].provider_specific_fields['thought_signature'] == 'Z2VtaW5pX3NpZ25hdHVyZQ=='
assert msg.provider_specific_fields is not None
assert msg.provider_specific_fields['thought_signatures'] == ['Z2VtaW5pX3NpZ25hdHVyZQ==']
# Convert back to dict for LiteLLM (this is what _convert_messages does)
req = _make_requester()
out = req._convert_messages([msg])
# Verify the fields are preserved in the output
assert out[0]['tool_calls'][0]['provider_specific_fields']['thought_signature'] == 'Z2VtaW5pX3NpZ25hdHVyZQ=='
assert out[0]['provider_specific_fields']['thought_signatures'] == ['Z2VtaW5pX3NpZ25hdHVyZQ==']
+30 -3
@@ -193,6 +193,29 @@ class TestSkillPathHelpers:
assert list(result.keys()) == ['visible']
def test_restore_activated_skills_uses_caller_provided_names_and_visibility(self):
from langbot.pkg.provider.tools.loaders.skill import (
ACTIVATED_SKILLS_KEY,
PIPELINE_BOUND_SKILLS_KEY,
get_activated_skill_names,
restore_activated_skills,
)
ap = _make_ap()
ap.skill_mgr = SimpleNamespace(
skills={
'visible': _make_skill_data(name='visible'),
'hidden': _make_skill_data(name='hidden'),
}
)
query = SimpleNamespace(variables={PIPELINE_BOUND_SKILLS_KEY: ['visible']})
restored = restore_activated_skills(ap, query, ['visible', 'hidden', 'visible', ''])
assert restored == ['visible']
assert list(query.variables[ACTIVATED_SKILLS_KEY].keys()) == ['visible']
assert get_activated_skill_names(query) == ['visible']
def test_resolve_virtual_skill_path_allows_visible_skill_reads(self):
from langbot.pkg.provider.tools.loaders.skill import (
PIPELINE_BOUND_SKILLS_KEY,
@@ -245,7 +268,8 @@ class TestSkillPathHelpers:
command = wrap_skill_command_with_python_env('python scripts/run.py')
-assert 'python -m venv "$_LB_VENV_DIR"' in command
+assert '_LB_SYSTEM_PYTHON="$(command -v python3 || command -v python || true)"' in command
+assert '"$_LB_SYSTEM_PYTHON" -m venv "$_LB_VENV_DIR"' in command
assert 'export VIRTUAL_ENV="$_LB_VENV_DIR"' in command
assert command.rstrip().endswith('python scripts/run.py')
@@ -281,6 +305,7 @@ class TestSkillToolLoader:
assert result['activated'] is True
assert result['skill_name'] == 'demo'
assert result['mount_path'] == '/workspace/.skills/demo'
assert result['activated_skill_names'] == ['demo']
assert 'Step 1' in result['content']
assert set(query.variables[ACTIVATED_SKILLS_KEY].keys()) == {'demo'}
@@ -456,7 +481,9 @@ class TestNativeToolLoaderSkillPaths:
SimpleNamespace(query_id='q1', variables={PIPELINE_BOUND_SKILLS_KEY: ['demo']}),
)
-assert result == {'ok': True, 'content': 'demo instructions'}
+assert result['ok'] is True
+assert result['content'] == 'demo instructions'
+assert result['truncated'] is False
@pytest.mark.asyncio
async def test_exec_in_activated_skill_mount_rewrites_command_and_refreshes(self):
@@ -485,7 +512,7 @@ class TestNativeToolLoaderSkillPaths:
query,
)
-assert result == {'ok': True}
+assert result['ok'] is True
tool_parameters = ap.box_service.execute_tool.await_args.args[0]
assert tool_parameters['command'] == 'python /workspace/.skills/demo/scripts/run.py'
assert tool_parameters['workdir'] == '/workspace/.skills/demo'
@@ -1,5 +1,6 @@
from __future__ import annotations
import base64
import os
import tempfile
from types import SimpleNamespace
@@ -189,6 +190,78 @@ async def test_write_creates_subdirectories():
assert f.read() == 'nested'
@pytest.mark.asyncio
async def test_read_binary_file_as_base64_chunk():
with tempfile.TemporaryDirectory() as tmpdir:
loader, _ = _make_loader_with_workspace(tmpdir)
with open(os.path.join(tmpdir, 'blob.bin'), 'wb') as f:
f.write(b'\x00\x01\x02\x03\x04')
result = await loader.invoke_tool(
'read',
{
'path': '/workspace/blob.bin',
'encoding': 'base64',
'byte_offset': 1,
'max_bytes': 2,
},
_make_query(),
)
assert result['ok'] is True
assert result['content'] == base64.b64encode(b'\x01\x02').decode('ascii')
assert result['encoding'] == 'base64'
assert result['byte_offset'] == 1
assert result['length'] == 2
assert result['size_bytes'] == 5
assert result['has_more'] is True
assert result['next_byte_offset'] == 3
@pytest.mark.asyncio
async def test_write_base64_file_append():
with tempfile.TemporaryDirectory() as tmpdir:
loader, _ = _make_loader_with_workspace(tmpdir)
first = base64.b64encode(b'\x00\x01').decode('ascii')
second = base64.b64encode(b'\x02\x03').decode('ascii')
await loader.invoke_tool(
'write',
{'path': '/workspace/blob.bin', 'content': first, 'encoding': 'base64'},
_make_query(),
)
result = await loader.invoke_tool(
'write',
{
'path': '/workspace/blob.bin',
'content': second,
'encoding': 'base64',
'mode': 'append',
},
_make_query(),
)
assert result['ok'] is True
with open(os.path.join(tmpdir, 'blob.bin'), 'rb') as f:
assert f.read() == b'\x00\x01\x02\x03'
@pytest.mark.asyncio
async def test_write_base64_rejects_invalid_content():
with tempfile.TemporaryDirectory() as tmpdir:
loader, _ = _make_loader_with_workspace(tmpdir)
result = await loader.invoke_tool(
'write',
{'path': '/workspace/blob.bin', 'content': 'not base64!', 'encoding': 'base64'},
_make_query(),
)
assert result['ok'] is False
assert 'invalid base64' in result['error']
assert not os.path.exists(os.path.join(tmpdir, 'blob.bin'))
@pytest.mark.asyncio
async def test_edit_replaces_unique_string():
with tempfile.TemporaryDirectory() as tmpdir:
@@ -248,3 +321,135 @@ async def test_path_escape_blocked():
with pytest.raises(ValueError, match='escapes'):
await loader.invoke_tool('read', {'path': '/workspace/../../etc/passwd'}, _make_query())
@pytest.mark.asyncio
async def test_box_availability_helper_handles_unavailable_and_errors():
from langbot.pkg.provider.tools.loaders.availability import is_box_backend_available
assert await is_box_backend_available(SimpleNamespace()) is False
assert await is_box_backend_available(SimpleNamespace(box_service=SimpleNamespace(available=False))) is False
unavailable_backend = SimpleNamespace(
available=True,
get_status=AsyncMock(return_value={'backend': {'available': False}}),
)
assert await is_box_backend_available(SimpleNamespace(box_service=unavailable_backend)) is False
failing_backend = SimpleNamespace(
available=True,
get_status=AsyncMock(side_effect=RuntimeError('box unavailable')),
)
assert await is_box_backend_available(SimpleNamespace(box_service=failing_backend)) is False
@pytest.mark.asyncio
async def test_read_file_supports_offset_limit_and_truncation_metadata():
with tempfile.TemporaryDirectory() as tmpdir:
loader, _ = _make_loader_with_workspace(tmpdir)
with open(os.path.join(tmpdir, 'lines.txt'), 'w', encoding='utf-8') as f:
f.write('one\ntwo\nthree\nfour\n')
result = await loader.invoke_tool(
'read',
{'path': '/workspace/lines.txt', 'offset': 2, 'limit': 2},
_make_query(),
)
assert result == {
'ok': True,
'content': 'two\nthree',
'truncated': True,
'truncated_by': 'lines',
'start_line': 2,
'end_line': 3,
'next_offset': 4,
'max_lines': 2,
'max_bytes': 50 * 1024,
}
@pytest.mark.asyncio
async def test_read_file_handles_line_larger_than_byte_limit():
with tempfile.TemporaryDirectory() as tmpdir:
loader, _ = _make_loader_with_workspace(tmpdir)
with open(os.path.join(tmpdir, 'long-line.txt'), 'w', encoding='utf-8') as f:
f.write('abcdef\n')
result = await loader.invoke_tool(
'read',
{'path': '/workspace/long-line.txt', 'max_bytes': 3},
_make_query(),
)
assert result['ok'] is True
assert result['truncated'] is True
assert result['truncated_by'] == 'bytes'
assert result['next_offset'] == 1
assert 'exceeds the 3B read limit' in result['content']
@pytest.mark.asyncio
async def test_exec_result_is_capped_and_exposes_preview_metadata():
with tempfile.TemporaryDirectory() as tmpdir:
box_service = SimpleNamespace(
available=True,
default_workspace=tmpdir,
execute_tool=AsyncMock(
return_value={
'ok': True,
'stdout': 'a' * 60000,
'stderr': 'b' * 60000,
'exit_code': 0,
}
),
)
loader = NativeToolLoader(SimpleNamespace(box_service=box_service, logger=Mock()))
result = await loader.invoke_tool('exec', {'command': 'python -V'}, _make_query())
assert result['ok'] is True
assert len(result['stdout'].encode('utf-8')) == 50 * 1024
assert len(result['stderr'].encode('utf-8')) == 50 * 1024
assert len(result['preview'].encode('utf-8')) == 50 * 1024
assert result['stdout_truncated'] is True
assert result['stderr_truncated'] is True
assert result['truncated'] is True
assert result['truncated_by'] == 'bytes'
@pytest.mark.asyncio
async def test_glob_caps_match_count_and_returns_preview():
with tempfile.TemporaryDirectory() as tmpdir:
loader, _ = _make_loader_with_workspace(tmpdir)
for index in range(105):
with open(os.path.join(tmpdir, f'file-{index:03d}.txt'), 'w', encoding='utf-8') as f:
f.write(str(index))
result = await loader.invoke_tool('glob', {'path': '/workspace', 'pattern': '*.txt'}, _make_query())
assert result['ok'] is True
assert result['total'] == 105
assert len(result['matches']) == 100
assert result['preview'] == '\n'.join(result['matches'])
assert result['truncated'] is True
assert result['truncated_by'] == 'matches'
@pytest.mark.asyncio
async def test_grep_reports_invalid_regex_and_truncates_long_matching_lines():
with tempfile.TemporaryDirectory() as tmpdir:
loader, _ = _make_loader_with_workspace(tmpdir)
with open(os.path.join(tmpdir, 'data.txt'), 'w', encoding='utf-8') as f:
f.write('needle ' + ('x' * 600) + '\n')
invalid = await loader.invoke_tool('grep', {'path': '/workspace', 'pattern': '['}, _make_query())
result = await loader.invoke_tool('grep', {'path': '/workspace', 'pattern': 'needle'}, _make_query())
assert invalid['ok'] is False
assert 'Invalid regex' in invalid['error']
assert result['ok'] is True
assert result['truncated'] is True
assert result['truncated_by'] == 'line'
assert result['matches'][0]['file'] == '/workspace/data.txt'
assert result['matches'][0]['content'].endswith('... [truncated]')
Generated
+4 -4
@@ -2082,7 +2082,7 @@ requires-dist = [
{ name = "ebooklib", specifier = ">=0.18" },
{ name = "gewechat-client", specifier = ">=0.1.5" },
{ name = "html2text", specifier = ">=2024.2.26" },
-{ name = "langbot-plugin", specifier = "==0.4.4" },
+{ name = "langbot-plugin", specifier = "==0.4.5" },
{ name = "langchain", specifier = ">=0.2.0" },
{ name = "langchain-core", specifier = ">=1.3.3" },
{ name = "langchain-text-splitters", specifier = ">=1.1.2" },
@@ -2146,7 +2146,7 @@ dev = [
[[package]]
name = "langbot-plugin"
-version = "0.4.4"
+version = "0.4.5"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aiofiles" },
@@ -2167,9 +2167,9 @@ dependencies = [
{ name = "watchdog" },
{ name = "websockets" },
]
-sdist = { url = "https://files.pythonhosted.org/packages/68/1a/636c057f6e07a0c87dc7b9c1a373d73df82787b7706ba3ba1a95f633ce7c/langbot_plugin-0.4.4.tar.gz", hash = "sha256:8fdad2d22fe8360d2911557fac17f258f57e85f1a36bd50cd488cb44f61225a4", size = 312741, upload-time = "2026-06-13T11:59:36.772Z" }
+sdist = { url = "https://files.pythonhosted.org/packages/f3/db/db33ec42b3242ea7de0c93b0523a8d32a3df76b13de177fd31671db0ba3f/langbot_plugin-0.4.5.tar.gz", hash = "sha256:3cafa5694f31e9e4b4a3d870c1bc23ee7ac6e8d271a0140c5198993471f220ec", size = 326504, upload-time = "2026-06-19T14:53:51.687Z" }
wheels = [
-{ url = "https://files.pythonhosted.org/packages/f9/c6/3c313e4ec431fca68326f348bd2c7a61777d43c940bb46ae6c8ebfb66973/langbot_plugin-0.4.4-py3-none-any.whl", hash = "sha256:c91f082ca431539f34790e497e2f056f4e7030e46e0d2bf01a6114b055dd2feb", size = 214164, upload-time = "2026-06-13T11:59:38.053Z" },
+{ url = "https://files.pythonhosted.org/packages/81/92/8a08f8793de479fffa12a1906a25b6ff5b67a018520fa72d981569e1a6e4/langbot_plugin-0.4.5-py3-none-any.whl", hash = "sha256:12ab9aff0fb2459f75a11ba6999d2b5dfc753dcc7d265b078777b24e04b23c83", size = 215602, upload-time = "2026-06-19T14:53:50.021Z" },
]
[[package]]
+9
@@ -74,6 +74,15 @@
}
}
/* Hide scrollbar while keeping scroll behaviour (horizontal tag/filter rows). */
.scrollbar-hide {
-ms-overflow-style: none; /* IE / Edge */
scrollbar-width: none; /* Firefox */
}
.scrollbar-hide::-webkit-scrollbar {
display: none; /* Chrome / Safari / WebKit */
}
@custom-variant dark (&:is(.dark *));
@theme inline {
@@ -310,6 +310,7 @@ function SingleSelectField({
{options.map((opt) => (
<div key={opt.id}>
<button
type="button"
onClick={() => onChange(opt.id)}
className={`w-full text-left text-sm px-3 py-2 rounded-lg border transition-colors ${
value === opt.id
@@ -361,8 +362,16 @@ function MultiSelectField({
const selected = value.includes(opt.id);
return (
<div key={opt.id}>
-<button
+<div
+role="button"
+tabIndex={0}
onClick={() => toggle(opt.id)}
+onKeyDown={(e) => {
+if (e.key === 'Enter' || e.key === ' ') {
+e.preventDefault();
+toggle(opt.id);
+}
+}}
className={`w-full text-left text-sm px-3 py-2 rounded-lg border transition-colors flex items-center gap-2 ${
selected
? 'border-primary bg-primary/5 text-primary'
@@ -371,7 +380,7 @@ function MultiSelectField({
>
<Checkbox checked={selected} className="pointer-events-none" />
{getI18nText(opt.label)}
-</button>
+</div>
{opt.has_input && selected && (
<input
type="text"
@@ -15,6 +15,7 @@ import {
At,
Quote,
Voice,
File as FileComponent,
Source,
} from '@/app/infra/entities/message';
import { toast } from 'sonner';
@@ -64,7 +65,12 @@ export default function DebugDialog({
const [isHovering, setIsHovering] = useState(false);
const [isConnected, setIsConnected] = useState(false);
const [selectedImages, setSelectedImages] = useState<
-Array<{ file: File; preview: string; fileKey?: string }>
+Array<{
+file: File;
+preview: string;
+fileKey?: string;
+kind: 'image' | 'voice' | 'file';
+}>
>([]);
const [isUploading, setIsUploading] = useState(false);
const [previewImageUrl, setPreviewImageUrl] = useState<string>('');
@@ -292,23 +298,38 @@ export default function DebugDialog({
const files = e.target.files;
if (!files || files.length === 0) return;
-const newImages: Array<{ file: File; preview: string }> = [];
+const newImages: Array<{
+file: File;
+preview: string;
+kind: 'image' | 'voice' | 'file';
+}> = [];
for (let i = 0; i < files.length; i++) {
const file = files[i];
if (file.type.startsWith('image/')) {
-const preview = URL.createObjectURL(file);
-newImages.push({ file, preview });
+newImages.push({
+file,
+preview: URL.createObjectURL(file),
+kind: 'image',
+});
+} else if (file.type.startsWith('audio/')) {
+newImages.push({ file, preview: '', kind: 'voice' });
+} else {
+newImages.push({ file, preview: '', kind: 'file' });
}
}
setSelectedImages((prev) => [...prev, ...newImages]);
+// reset the input so selecting the same file again re-triggers onChange
+e.target.value = '';
};
const handleRemoveImage = (index: number) => {
setSelectedImages((prev) => {
const newImages = [...prev];
-URL.revokeObjectURL(newImages[index].preview);
+if (newImages[index].preview) {
+URL.revokeObjectURL(newImages[index].preview);
+}
newImages.splice(index, 1);
return newImages;
});
@@ -372,19 +393,33 @@ export default function DebugDialog({
});
}
-// Upload images and add to message chain
-for (const image of selectedImages) {
+// Upload attachments and add to message chain
+for (const attachment of selectedImages) {
try {
-const result = await httpClient.uploadWebSocketImage(
-selectedPipelineId,
-image.file,
-);
-messageChain.push({
-type: 'Image',
-path: result.file_key,
-});
+if (attachment.kind === 'image') {
+const result = await httpClient.uploadWebSocketImage(
+selectedPipelineId,
+attachment.file,
+);
+messageChain.push({
+type: 'Image',
+path: result.file_key,
+});
+} else {
+// Voice / File go through the generic document upload endpoint,
+// which returns a storage key the backend resolves into the
+// sandbox inbox just like images.
+const result = await httpClient.uploadDocumentFile(attachment.file);
+messageChain.push({
+type: attachment.kind === 'voice' ? 'Voice' : 'File',
+path: result.file_id,
+...(attachment.kind === 'file'
+? { name: attachment.file.name }
+: {}),
+});
+}
} catch (error) {
-console.error('Image upload failed:', error);
+console.error('Attachment upload failed:', error);
toast.error(t('pipelines.debugDialog.imageUploadFailed'));
}
}
@@ -393,7 +428,9 @@ export default function DebugDialog({
setInputValue('');
setHasAt(false);
setQuotedMessage(null);
-selectedImages.forEach((img) => URL.revokeObjectURL(img.preview));
+selectedImages.forEach((img) => {
+if (img.preview) URL.revokeObjectURL(img.preview);
+});
setSelectedImages([]);
// Send message via WebSocket
@@ -460,13 +497,29 @@ export default function DebugDialog({
}
case 'File': {
-const file = component as MessageChainComponent & { name?: string };
+const file = component as FileComponent;
+const downloadHref = file.base64
+? file.base64.startsWith('data:')
+? file.base64
+: `data:application/octet-stream;base64,${file.base64}`
+: file.url || '';
+const fileName = file.name || 'Unknown';
return (
<div key={index} className="my-2 flex items-center gap-2 text-sm">
<Paperclip className="size-4" />
-<span>
-[{t('pipelines.debugDialog.file')}] {file.name || 'Unknown'}
-</span>
+{downloadHref ? (
+<a
+href={downloadHref}
+download={fileName}
+className="text-primary underline hover:opacity-80"
+>
+[{t('pipelines.debugDialog.file')}] {fileName}
+</a>
+) : (
+<span>
+[{t('pipelines.debugDialog.file')}] {fileName}
+</span>
+)}
</div>
);
}
@@ -844,17 +897,30 @@ export default function DebugDialog({
</div>
)}
-{/* Image preview area */}
+{/* Attachment preview area */}
{selectedImages.length > 0 && (
<div className="px-4 pb-2">
<div className="flex gap-2 flex-wrap">
{selectedImages.map((image, index) => (
<div key={index} className="relative group">
-<img
-src={image.preview}
-alt={`preview-${index}`}
-className="w-20 h-20 object-cover rounded-lg border"
-/>
+{image.kind === 'image' ? (
+<img
+src={image.preview}
+alt={`preview-${index}`}
+className="w-20 h-20 object-cover rounded-lg border"
+/>
+) : (
+<div className="w-36 h-20 px-2 rounded-lg border bg-muted/40 flex items-center gap-2 overflow-hidden">
+{image.kind === 'voice' ? (
+<Music className="size-5 shrink-0 text-muted-foreground" />
+) : (
+<Paperclip className="size-5 shrink-0 text-muted-foreground" />
+)}
+<span className="text-xs text-muted-foreground truncate">
+{image.file.name}
+</span>
+</div>
+)}
<button
type="button"
onClick={() => handleRemoveImage(index)}
@@ -883,7 +949,7 @@ export default function DebugDialog({
<input
ref={fileInputRef}
type="file"
-accept="image/*"
+accept="image/*,audio/*,*/*"
multiple
onChange={handleImageSelect}
className="hidden"
@@ -787,38 +787,42 @@ function MarketPageContent({
</div>
</div>
-{/* Quick filtering using real tags */}
-<div className="mx-auto flex w-full max-w-4xl items-center gap-2 overflow-x-auto pb-1 sm:flex-wrap sm:justify-center sm:overflow-visible">
-<Button
-type="button"
-variant={selectedTags.length === 0 ? 'secondary' : 'ghost'}
-size="sm"
-className="h-8 shrink-0"
-onClick={() => handleTagsChange([])}
->
-{t('market.allExtensions')}
-</Button>
-{availableTags.map((tag) => {
-const selected = selectedTags.includes(tag.tag);
-return (
-<Button
-key={tag.tag}
-type="button"
-variant={selected ? 'secondary' : 'ghost'}
-size="sm"
-className="h-8 shrink-0"
-onClick={() => {
-const newTags = selected
-? selectedTags.filter((t) => t !== tag.tag)
-: [...selectedTags, tag.tag];
-handleTagsChange(newTags);
-}}
->
-{tagNames[tag.tag] || tag.tag}
-{selected && <X className="h-3.5 w-3.5" />}
-</Button>
-);
-})}
+{/* Quick filtering using real tags: always a single horizontally scrolling row, so tags do not wrap and misalign as their number grows */}
+<div className="relative mx-auto w-full max-w-4xl">
+<div className="scrollbar-hide flex items-center gap-1.5 overflow-x-auto pb-1 pr-6">
+<Button
+type="button"
+variant={selectedTags.length === 0 ? 'secondary' : 'ghost'}
+size="sm"
+className="h-7 shrink-0 px-2.5 text-xs"
+onClick={() => handleTagsChange([])}
+>
+{t('market.allExtensions')}
+</Button>
+{availableTags.map((tag) => {
+const selected = selectedTags.includes(tag.tag);
+return (
+<Button
+key={tag.tag}
+type="button"
+variant={selected ? 'secondary' : 'ghost'}
+size="sm"
+className="h-7 shrink-0 px-2.5 text-xs"
+onClick={() => {
+const newTags = selected
+? selectedTags.filter((t) => t !== tag.tag)
+: [...selectedTags, tag.tag];
+handleTagsChange(newTags);
+}}
+>
+{tagNames[tag.tag] || tag.tag}
+{selected && <X className="h-3 w-3" />}
+</Button>
+);
+})}
+</div>
+{/* Fade on the right edge, hinting that more tags can be scrolled into view horizontally */}
+<div className="pointer-events-none absolute right-0 top-0 bottom-1 w-8 bg-gradient-to-l from-background to-transparent" />
</div>
</div>
@@ -64,6 +64,8 @@ export interface File extends MessageComponent {
name?: string;
size?: number;
url?: string;
path?: string;
base64?: string;
}
// Unknown component