fix(box): purge leftover inbox/outbox on startup; clear root-owned outbox via exec

The agent attachment outbox is written by the sandbox container as root over
the bind-mount, so the LangBot host process (non-root) cannot rmtree those
files — the host-side delete failed silently and stale files were re-collected
on a later turn that reused the same query_id (the query_id counter resets to 0
on every restart).

- BoxService.initialize now purges leftover inbox/outbox after the runtime is
  available: host rmtree first, then an in-sandbox 'rm -rf' via exec for any
  root-owned survivors.
- _clear_outbox now falls back to exec when the host delete leaves root-owned
  files behind, instead of silently failing.
- collect_outbound_attachments clears the outbox unconditionally (even on an
  empty collection) so a reused query_id never inherits stale files.
- Tests: startup purge (host-owned + root-owned exec fallback + no-workspace
  noop) and empty-collection-still-clears.
This commit is contained in:
RockChinQ
2026-06-18 09:53:37 -04:00
parent 3b3b09331a
commit 67e3f837fe
2 changed files with 185 additions and 13 deletions
+90 -10
View File
@@ -105,6 +105,7 @@ class BoxService:
f'LangBot Box runtime initialized: profile={self.profile.name} '
f'default_workspace={self.default_workspace or "(none)"}'
)
await self._purge_attachment_dirs()
except Exception as exc:
self.ap.logger.warning(f'LangBot Box runtime unavailable, sandbox features disabled: {exc}')
self._available = False
@@ -382,6 +383,63 @@ class BoxService:
return None
return os.path.join(root, subdir, str(query_id))
async def _purge_attachment_dirs(self) -> None:
"""Remove leftover inbox/outbox directories on startup.
``query_id`` is a process-local counter (see pipeline query pool) that
resets to 0 on every restart, so per-query attachment directories from
a previous process would otherwise be silently reused — leaking a prior
run's inbound files and re-sending stale outbound files.
Outbox files are written by the sandbox **container**, which runs as
root over the bind-mount, so the LangBot host process (a non-root user)
cannot ``rmtree`` them. We therefore try a host-side delete first (fast,
works for host-owned inbox files) and, for anything that survives,
delete from *inside* the sandbox via exec where the container's root can
remove its own files. Best-effort: never block startup.
"""
root = self.default_workspace
if not root or not os.path.isdir(root):
return
import shutil
host_survivors: list[str] = []
def _host_purge() -> list[str]:
survivors: list[str] = []
for subdir in (self.INBOX_SUBDIR, self.OUTBOX_SUBDIR):
path = os.path.join(root, subdir)
if not os.path.isdir(path):
continue
shutil.rmtree(path, ignore_errors=True)
if os.path.exists(path):
survivors.append(subdir)
return survivors
try:
host_survivors = await asyncio.to_thread(_host_purge)
except Exception as exc: # pragma: no cover - defensive
self.ap.logger.warning(f'Host-side purge of sandbox attachment dirs failed: {exc}')
host_survivors = [self.INBOX_SUBDIR, self.OUTBOX_SUBDIR]
if not host_survivors:
self.ap.logger.info('Purged leftover sandbox attachment dirs from a previous process.')
return
# Root-owned leftovers (container output): delete from inside the box.
targets = ' '.join(f'/workspace/{sub}' for sub in host_survivors)
try:
spec = self.build_spec({'cmd': f'rm -rf {targets}', 'session_id': '__startup_purge__', 'timeout_sec': 30})
await self.client.execute(spec)
self.ap.logger.info(
f'Purged root-owned leftover sandbox attachment dirs via sandbox exec: {host_survivors}'
)
except Exception as exc:
self.ap.logger.warning(
f'Failed to purge root-owned sandbox attachment dirs {host_survivors} via exec: {exc}'
)
@staticmethod
def _sanitize_attachment_name(name: str, fallback: str) -> str:
"""Reduce an arbitrary attachment name to a safe basename.
@@ -641,8 +699,11 @@ class BoxService:
attachments = self._classify_outbound_entries(entries)
# Always clear the per-query outbox after reading — even when nothing
# was collected — so a later turn that reuses the same query_id (the
# counter resets across restarts) never inherits stale files.
await self._clear_outbox(query, host_dir)
if attachments:
await self._clear_outbox(query, host_dir)
self.ap.logger.info(
f'Collected {len(attachments)} outbound attachment(s) from sandbox: query_id={query.query_id}'
)
@@ -711,21 +772,40 @@ class BoxService:
return []
async def _clear_outbox(self, query: pipeline_query.Query, host_dir: str | None) -> None:
"""Empty the per-query outbox after collection (host or exec)."""
"""Empty the per-query outbox after collection.
Tries a host-side ``rmtree`` first (fast, no container round-trip).
Outbox files are created by the sandbox container as root over the
bind-mount, so when LangBot runs as a non-root user the host delete
fails silently and the files survive — they would then be re-collected
on the next turn that reuses the same query_id. So if anything survives
the host delete, clear it from *inside* the sandbox via exec, where the
container's root can remove its own files. Best-effort: never raise
into the pipeline.
"""
target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}'
if host_dir is not None:
import shutil
def _clear():
def _clear() -> bool:
shutil.rmtree(host_dir, ignore_errors=True)
survived = os.path.exists(host_dir) and bool(os.listdir(host_dir))
os.makedirs(host_dir, exist_ok=True)
return survived
await asyncio.to_thread(_clear)
return
target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}'
await self.execute_tool(
{'command': f'rm -rf {target_dir} && mkdir -p {target_dir}', 'timeout_sec': 30},
query,
)
survived = await asyncio.to_thread(_clear)
if not survived:
return
# Root-owned container files survived the host delete — fall through.
try:
await self.execute_tool(
{'command': f'rm -rf {target_dir} && mkdir -p {target_dir}', 'timeout_sec': 30},
query,
)
except Exception as exc:
self.ap.logger.warning(f'Failed to clear sandbox outbox {target_dir}: {exc}')
@staticmethod
def _classify_outbound_entries(entries: list[dict]) -> list[dict]:
+95 -3
View File
@@ -1695,7 +1695,10 @@ class TestInboundOutboundRoundTrip:
assert any('rm -rf' in c for c in calls)
@pytest.mark.asyncio
async def test_collect_outbound_empty_no_cleanup(self):
async def test_collect_outbound_empty_still_clears(self):
# An empty collection MUST still clear the per-query outbox, so a later
# turn reusing the same query_id (the counter resets across restarts)
# cannot inherit stale files left from a prior run.
service = self._service()
query = make_query()
@@ -1703,11 +1706,14 @@ class TestInboundOutboundRoundTrip:
async def fake_execute_tool(parameters, q):
calls.append(parameters['command'])
return {'ok': True, 'stdout': '[]', 'stderr': ''}
if 'os.walk' in parameters['command']:
return {'ok': True, 'stdout': '[]', 'stderr': ''}
return {'ok': True, 'stdout': '', 'stderr': ''}
service.execute_tool = AsyncMock(side_effect=fake_execute_tool)
assert await service.collect_outbound_attachments(query) == []
assert not any('rm -rf' in c for c in calls)
# cleanup (rm -rf) is issued unconditionally now
assert any('rm -rf' in c for c in calls)
@pytest.mark.asyncio
async def test_passthrough_noop_when_unavailable(self):
@@ -1808,3 +1814,89 @@ class TestAttachmentHostPath:
assert raw == big_png
# Outbox cleared after collection.
assert os.listdir(outbox) == []
@pytest.mark.asyncio
async def test_outbound_empty_clears_stale_host_dir(self, tmp_path):
# Reusing a query_id (counter resets on restart) must not re-send files
# a previous run left in the outbox: an empty collection still clears it.
service, ws = self._service_with_workspace(tmp_path)
query = make_query()
outbox = os.path.join(ws, 'outbox', str(query.query_id))
os.makedirs(outbox, exist_ok=True)
# Stale file from a prior turn; the agent produced nothing this turn —
# but _read_outbox_host would still pick it up, so collection must drop
# it and then wipe the dir. Simulate "nothing produced this turn" by
# treating any present file as stale and asserting it is not re-sent
# across a second, genuinely-empty collection.
open(os.path.join(outbox, 'stale.png'), 'wb').write(b'\x89PNG\r\n\x1a\n old')
service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used on host path'))
# First collection drains + clears the dir.
first = await service.collect_outbound_attachments(query)
assert {a['name'] for a in first} == {'stale.png'}
assert os.listdir(outbox) == []
# Second collection (no new files) returns nothing and leaves a clean dir.
second = await service.collect_outbound_attachments(query)
assert second == []
assert os.listdir(outbox) == []
@pytest.mark.asyncio
async def test_purge_attachment_dirs_wipes_host_owned_leftovers_on_init(self, tmp_path):
# Leftover inbox/outbox dirs from a previous process (same reset
# query_id counter) must be removed at startup. Host-owned files are
# cleared without any sandbox exec.
service, ws = self._service_with_workspace(tmp_path)
for sub in ('inbox', 'outbox'):
d = os.path.join(ws, sub, '0')
os.makedirs(d, exist_ok=True)
open(os.path.join(d, 'leftover.bin'), 'wb').write(b'from a previous process')
service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used for host-owned files'))
await service._purge_attachment_dirs()
assert not os.path.exists(os.path.join(ws, 'inbox'))
assert not os.path.exists(os.path.join(ws, 'outbox'))
# The workspace root itself survives.
assert os.path.isdir(ws)
@pytest.mark.asyncio
async def test_purge_attachment_dirs_falls_back_to_exec_for_root_owned(self, tmp_path, monkeypatch):
# When the host delete cannot remove a dir (root-owned container output),
# purge must fall back to deleting from inside the sandbox via exec.
service, ws = self._service_with_workspace(tmp_path)
outbox = os.path.join(ws, 'outbox')
os.makedirs(os.path.join(outbox, '0'), exist_ok=True)
# Simulate a host delete that cannot remove the root-owned outbox.
import shutil as _shutil
real_rmtree = _shutil.rmtree
def fake_rmtree(path, *a, **k):
if os.path.abspath(path) == os.path.abspath(outbox):
return # "permission denied" — silently leaves the dir
return real_rmtree(path, *a, **k)
monkeypatch.setattr(_shutil, 'rmtree', fake_rmtree)
executed = {}
spec_obj = object()
service.build_spec = Mock(return_value=spec_obj)
service.client.execute = AsyncMock(side_effect=lambda s: executed.setdefault('spec', s))
await service._purge_attachment_dirs()
# build_spec was asked to rm the surviving outbox via exec.
cmd = service.build_spec.call_args.args[0]['cmd']
assert 'rm -rf' in cmd and '/workspace/outbox' in cmd
assert '/workspace/inbox' not in cmd # inbox was host-deletable
service.client.execute.assert_awaited_once_with(spec_obj)
@pytest.mark.asyncio
async def test_purge_attachment_dirs_noop_without_workspace(self):
# No bind-mounted workspace (E2B / remote): purge is a safe no-op.
service = BoxService(make_app(Mock()), client=Mock(spec=BoxRuntimeClient))
service.default_workspace = None
# Must not raise.
await service._purge_attachment_dirs()