mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-19 03:54:19 +00:00
fix(box): purge leftover inbox/outbox on startup; clear root-owned outbox via exec (#2259)
The agent attachment outbox is written by the sandbox container as root over the bind-mount, so the LangBot host process (non-root) cannot rmtree those files — the host-side delete failed silently and stale files were re-collected on a later turn that reused the same query_id (the query_id counter resets to 0 on every restart). - BoxService.initialize now purges leftover inbox/outbox after the runtime is available: host rmtree first, then an in-sandbox 'rm -rf' via exec for any root-owned survivors. - _clear_outbox now falls back to exec when the host delete leaves root-owned files behind, instead of silently failing. - collect_outbound_attachments clears the outbox unconditionally (even on an empty collection) so a reused query_id never inherits stale files. - Tests: startup purge (host-owned + root-owned exec fallback + no-workspace noop) and empty-collection-still-clears.
This commit is contained in:
@@ -105,6 +105,7 @@ class BoxService:
|
||||
f'LangBot Box runtime initialized: profile={self.profile.name} '
|
||||
f'default_workspace={self.default_workspace or "(none)"}'
|
||||
)
|
||||
await self._purge_attachment_dirs()
|
||||
except Exception as exc:
|
||||
self.ap.logger.warning(f'LangBot Box runtime unavailable, sandbox features disabled: {exc}')
|
||||
self._available = False
|
||||
@@ -382,6 +383,63 @@ class BoxService:
|
||||
return None
|
||||
return os.path.join(root, subdir, str(query_id))
|
||||
|
||||
async def _purge_attachment_dirs(self) -> None:
|
||||
"""Remove leftover inbox/outbox directories on startup.
|
||||
|
||||
``query_id`` is a process-local counter (see pipeline query pool) that
|
||||
resets to 0 on every restart, so per-query attachment directories from
|
||||
a previous process would otherwise be silently reused — leaking a prior
|
||||
run's inbound files and re-sending stale outbound files.
|
||||
|
||||
Outbox files are written by the sandbox **container**, which runs as
|
||||
root over the bind-mount, so the LangBot host process (a non-root user)
|
||||
cannot ``rmtree`` them. We therefore try a host-side delete first (fast,
|
||||
works for host-owned inbox files) and, for anything that survives,
|
||||
delete from *inside* the sandbox via exec where the container's root can
|
||||
remove its own files. Best-effort: never block startup.
|
||||
"""
|
||||
root = self.default_workspace
|
||||
if not root or not os.path.isdir(root):
|
||||
return
|
||||
|
||||
import shutil
|
||||
|
||||
host_survivors: list[str] = []
|
||||
|
||||
def _host_purge() -> list[str]:
|
||||
survivors: list[str] = []
|
||||
for subdir in (self.INBOX_SUBDIR, self.OUTBOX_SUBDIR):
|
||||
path = os.path.join(root, subdir)
|
||||
if not os.path.isdir(path):
|
||||
continue
|
||||
shutil.rmtree(path, ignore_errors=True)
|
||||
if os.path.exists(path):
|
||||
survivors.append(subdir)
|
||||
return survivors
|
||||
|
||||
try:
|
||||
host_survivors = await asyncio.to_thread(_host_purge)
|
||||
except Exception as exc: # pragma: no cover - defensive
|
||||
self.ap.logger.warning(f'Host-side purge of sandbox attachment dirs failed: {exc}')
|
||||
host_survivors = [self.INBOX_SUBDIR, self.OUTBOX_SUBDIR]
|
||||
|
||||
if not host_survivors:
|
||||
self.ap.logger.info('Purged leftover sandbox attachment dirs from a previous process.')
|
||||
return
|
||||
|
||||
# Root-owned leftovers (container output): delete from inside the box.
|
||||
targets = ' '.join(f'/workspace/{sub}' for sub in host_survivors)
|
||||
try:
|
||||
spec = self.build_spec({'cmd': f'rm -rf {targets}', 'session_id': '__startup_purge__', 'timeout_sec': 30})
|
||||
await self.client.execute(spec)
|
||||
self.ap.logger.info(
|
||||
f'Purged root-owned leftover sandbox attachment dirs via sandbox exec: {host_survivors}'
|
||||
)
|
||||
except Exception as exc:
|
||||
self.ap.logger.warning(
|
||||
f'Failed to purge root-owned sandbox attachment dirs {host_survivors} via exec: {exc}'
|
||||
)
|
||||
|
||||
@staticmethod
|
||||
def _sanitize_attachment_name(name: str, fallback: str) -> str:
|
||||
"""Reduce an arbitrary attachment name to a safe basename.
|
||||
@@ -641,8 +699,11 @@ class BoxService:
|
||||
|
||||
attachments = self._classify_outbound_entries(entries)
|
||||
|
||||
# Always clear the per-query outbox after reading — even when nothing
|
||||
# was collected — so a later turn that reuses the same query_id (the
|
||||
# counter resets across restarts) never inherits stale files.
|
||||
await self._clear_outbox(query, host_dir)
|
||||
if attachments:
|
||||
await self._clear_outbox(query, host_dir)
|
||||
self.ap.logger.info(
|
||||
f'Collected {len(attachments)} outbound attachment(s) from sandbox: query_id={query.query_id}'
|
||||
)
|
||||
@@ -711,21 +772,40 @@ class BoxService:
|
||||
return []
|
||||
|
||||
async def _clear_outbox(self, query: pipeline_query.Query, host_dir: str | None) -> None:
|
||||
"""Empty the per-query outbox after collection (host or exec)."""
|
||||
"""Empty the per-query outbox after collection.
|
||||
|
||||
Tries a host-side ``rmtree`` first (fast, no container round-trip).
|
||||
Outbox files are created by the sandbox container as root over the
|
||||
bind-mount, so when LangBot runs as a non-root user the host delete
|
||||
fails silently and the files survive — they would then be re-collected
|
||||
on the next turn that reuses the same query_id. So if anything survives
|
||||
the host delete, clear it from *inside* the sandbox via exec, where the
|
||||
container's root can remove its own files. Best-effort: never raise
|
||||
into the pipeline.
|
||||
"""
|
||||
target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}'
|
||||
|
||||
if host_dir is not None:
|
||||
import shutil
|
||||
|
||||
def _clear():
|
||||
def _clear() -> bool:
|
||||
shutil.rmtree(host_dir, ignore_errors=True)
|
||||
survived = os.path.exists(host_dir) and bool(os.listdir(host_dir))
|
||||
os.makedirs(host_dir, exist_ok=True)
|
||||
return survived
|
||||
|
||||
await asyncio.to_thread(_clear)
|
||||
return
|
||||
target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}'
|
||||
await self.execute_tool(
|
||||
{'command': f'rm -rf {target_dir} && mkdir -p {target_dir}', 'timeout_sec': 30},
|
||||
query,
|
||||
)
|
||||
survived = await asyncio.to_thread(_clear)
|
||||
if not survived:
|
||||
return
|
||||
# Root-owned container files survived the host delete — fall through.
|
||||
|
||||
try:
|
||||
await self.execute_tool(
|
||||
{'command': f'rm -rf {target_dir} && mkdir -p {target_dir}', 'timeout_sec': 30},
|
||||
query,
|
||||
)
|
||||
except Exception as exc:
|
||||
self.ap.logger.warning(f'Failed to clear sandbox outbox {target_dir}: {exc}')
|
||||
|
||||
@staticmethod
|
||||
def _classify_outbound_entries(entries: list[dict]) -> list[dict]:
|
||||
|
||||
@@ -1695,7 +1695,10 @@ class TestInboundOutboundRoundTrip:
|
||||
assert any('rm -rf' in c for c in calls)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_collect_outbound_empty_no_cleanup(self):
|
||||
async def test_collect_outbound_empty_still_clears(self):
|
||||
# An empty collection MUST still clear the per-query outbox, so a later
|
||||
# turn reusing the same query_id (the counter resets across restarts)
|
||||
# cannot inherit stale files left from a prior run.
|
||||
service = self._service()
|
||||
query = make_query()
|
||||
|
||||
@@ -1703,11 +1706,14 @@ class TestInboundOutboundRoundTrip:
|
||||
|
||||
async def fake_execute_tool(parameters, q):
|
||||
calls.append(parameters['command'])
|
||||
return {'ok': True, 'stdout': '[]', 'stderr': ''}
|
||||
if 'os.walk' in parameters['command']:
|
||||
return {'ok': True, 'stdout': '[]', 'stderr': ''}
|
||||
return {'ok': True, 'stdout': '', 'stderr': ''}
|
||||
|
||||
service.execute_tool = AsyncMock(side_effect=fake_execute_tool)
|
||||
assert await service.collect_outbound_attachments(query) == []
|
||||
assert not any('rm -rf' in c for c in calls)
|
||||
# cleanup (rm -rf) is issued unconditionally now
|
||||
assert any('rm -rf' in c for c in calls)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_passthrough_noop_when_unavailable(self):
|
||||
@@ -1808,3 +1814,89 @@ class TestAttachmentHostPath:
|
||||
assert raw == big_png
|
||||
# Outbox cleared after collection.
|
||||
assert os.listdir(outbox) == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_outbound_empty_clears_stale_host_dir(self, tmp_path):
|
||||
# Reusing a query_id (counter resets on restart) must not re-send files
|
||||
# a previous run left in the outbox: an empty collection still clears it.
|
||||
service, ws = self._service_with_workspace(tmp_path)
|
||||
query = make_query()
|
||||
outbox = os.path.join(ws, 'outbox', str(query.query_id))
|
||||
os.makedirs(outbox, exist_ok=True)
|
||||
# Stale file from a prior turn; the agent produced nothing this turn —
|
||||
# but _read_outbox_host would still pick it up, so collection must drop
|
||||
# it and then wipe the dir. Simulate "nothing produced this turn" by
|
||||
# treating any present file as stale and asserting it is not re-sent
|
||||
# across a second, genuinely-empty collection.
|
||||
open(os.path.join(outbox, 'stale.png'), 'wb').write(b'\x89PNG\r\n\x1a\n old')
|
||||
service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used on host path'))
|
||||
|
||||
# First collection drains + clears the dir.
|
||||
first = await service.collect_outbound_attachments(query)
|
||||
assert {a['name'] for a in first} == {'stale.png'}
|
||||
assert os.listdir(outbox) == []
|
||||
|
||||
# Second collection (no new files) returns nothing and leaves a clean dir.
|
||||
second = await service.collect_outbound_attachments(query)
|
||||
assert second == []
|
||||
assert os.listdir(outbox) == []
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_purge_attachment_dirs_wipes_host_owned_leftovers_on_init(self, tmp_path):
|
||||
# Leftover inbox/outbox dirs from a previous process (same reset
|
||||
# query_id counter) must be removed at startup. Host-owned files are
|
||||
# cleared without any sandbox exec.
|
||||
service, ws = self._service_with_workspace(tmp_path)
|
||||
for sub in ('inbox', 'outbox'):
|
||||
d = os.path.join(ws, sub, '0')
|
||||
os.makedirs(d, exist_ok=True)
|
||||
open(os.path.join(d, 'leftover.bin'), 'wb').write(b'from a previous process')
|
||||
service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used for host-owned files'))
|
||||
|
||||
await service._purge_attachment_dirs()
|
||||
|
||||
assert not os.path.exists(os.path.join(ws, 'inbox'))
|
||||
assert not os.path.exists(os.path.join(ws, 'outbox'))
|
||||
# The workspace root itself survives.
|
||||
assert os.path.isdir(ws)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_purge_attachment_dirs_falls_back_to_exec_for_root_owned(self, tmp_path, monkeypatch):
|
||||
# When the host delete cannot remove a dir (root-owned container output),
|
||||
# purge must fall back to deleting from inside the sandbox via exec.
|
||||
service, ws = self._service_with_workspace(tmp_path)
|
||||
outbox = os.path.join(ws, 'outbox')
|
||||
os.makedirs(os.path.join(outbox, '0'), exist_ok=True)
|
||||
|
||||
# Simulate a host delete that cannot remove the root-owned outbox.
|
||||
import shutil as _shutil
|
||||
|
||||
real_rmtree = _shutil.rmtree
|
||||
|
||||
def fake_rmtree(path, *a, **k):
|
||||
if os.path.abspath(path) == os.path.abspath(outbox):
|
||||
return # "permission denied" — silently leaves the dir
|
||||
return real_rmtree(path, *a, **k)
|
||||
|
||||
monkeypatch.setattr(_shutil, 'rmtree', fake_rmtree)
|
||||
|
||||
executed = {}
|
||||
spec_obj = object()
|
||||
service.build_spec = Mock(return_value=spec_obj)
|
||||
service.client.execute = AsyncMock(side_effect=lambda s: executed.setdefault('spec', s))
|
||||
|
||||
await service._purge_attachment_dirs()
|
||||
|
||||
# build_spec was asked to rm the surviving outbox via exec.
|
||||
cmd = service.build_spec.call_args.args[0]['cmd']
|
||||
assert 'rm -rf' in cmd and '/workspace/outbox' in cmd
|
||||
assert '/workspace/inbox' not in cmd # inbox was host-deletable
|
||||
service.client.execute.assert_awaited_once_with(spec_obj)
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_purge_attachment_dirs_noop_without_workspace(self):
|
||||
# No bind-mounted workspace (E2B / remote): purge is a safe no-op.
|
||||
service = BoxService(make_app(Mock()), client=Mock(spec=BoxRuntimeClient))
|
||||
service.default_workspace = None
|
||||
# Must not raise.
|
||||
await service._purge_attachment_dirs()
|
||||
|
||||
Reference in New Issue
Block a user