From 67e3f837fe1960adcf7c56ad80ac53a78bf73bf6 Mon Sep 17 00:00:00 2001 From: RockChinQ Date: Thu, 18 Jun 2026 09:53:37 -0400 Subject: [PATCH] fix(box): purge leftover inbox/outbox on startup; clear root-owned outbox via exec MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The agent attachment outbox is written by the sandbox container as root over the bind-mount, so the LangBot host process (non-root) cannot rmtree those files — the host-side delete failed silently and stale files were re-collected on a later turn that reused the same query_id (the query_id counter resets to 0 on every restart). - BoxService.initialize now purges leftover inbox/outbox after the runtime is available: host rmtree first, then an in-sandbox 'rm -rf' via exec for any root-owned survivors. - _clear_outbox now falls back to exec when the host delete leaves root-owned files behind, instead of silently failing. - collect_outbound_attachments clears the outbox unconditionally (even on an empty collection) so a reused query_id never inherits stale files. - Tests: startup purge (host-owned + root-owned exec fallback + no-workspace noop) and empty-collection-still-clears. --- src/langbot/pkg/box/service.py | 100 ++++++++++++++++++++--- tests/unit_tests/box/test_box_service.py | 98 +++++++++++++++++++++- 2 files changed, 185 insertions(+), 13 deletions(-) diff --git a/src/langbot/pkg/box/service.py b/src/langbot/pkg/box/service.py index dc6c317e..336971b6 100644 --- a/src/langbot/pkg/box/service.py +++ b/src/langbot/pkg/box/service.py @@ -105,6 +105,7 @@ class BoxService: f'LangBot Box runtime initialized: profile={self.profile.name} ' f'default_workspace={self.default_workspace or "(none)"}' ) + await self._purge_attachment_dirs() except Exception as exc: self.ap.logger.warning(f'LangBot Box runtime unavailable, sandbox features disabled: {exc}') self._available = False @@ -382,6 +383,63 @@ class BoxService: return None return os.path.join(root, subdir, str(query_id)) + async def _purge_attachment_dirs(self) -> None: + """Remove leftover inbox/outbox directories on startup. + + ``query_id`` is a process-local counter (see pipeline query pool) that + resets to 0 on every restart, so per-query attachment directories from + a previous process would otherwise be silently reused — leaking a prior + run's inbound files and re-sending stale outbound files. + + Outbox files are written by the sandbox **container**, which runs as + root over the bind-mount, so the LangBot host process (a non-root user) + cannot ``rmtree`` them. We therefore try a host-side delete first (fast, + works for host-owned inbox files) and, for anything that survives, + delete from *inside* the sandbox via exec where the container's root can + remove its own files. Best-effort: never block startup. + """ + root = self.default_workspace + if not root or not os.path.isdir(root): + return + + import shutil + + host_survivors: list[str] = [] + + def _host_purge() -> list[str]: + survivors: list[str] = [] + for subdir in (self.INBOX_SUBDIR, self.OUTBOX_SUBDIR): + path = os.path.join(root, subdir) + if not os.path.isdir(path): + continue + shutil.rmtree(path, ignore_errors=True) + if os.path.exists(path): + survivors.append(subdir) + return survivors + + try: + host_survivors = await asyncio.to_thread(_host_purge) + except Exception as exc: # pragma: no cover - defensive + self.ap.logger.warning(f'Host-side purge of sandbox attachment dirs failed: {exc}') + host_survivors = [self.INBOX_SUBDIR, self.OUTBOX_SUBDIR] + + if not host_survivors: + self.ap.logger.info('Purged leftover sandbox attachment dirs from a previous process.') + return + + # Root-owned leftovers (container output): delete from inside the box. + targets = ' '.join(f'/workspace/{sub}' for sub in host_survivors) + try: + spec = self.build_spec({'cmd': f'rm -rf {targets}', 'session_id': '__startup_purge__', 'timeout_sec': 30}) + await self.client.execute(spec) + self.ap.logger.info( + f'Purged root-owned leftover sandbox attachment dirs via sandbox exec: {host_survivors}' + ) + except Exception as exc: + self.ap.logger.warning( + f'Failed to purge root-owned sandbox attachment dirs {host_survivors} via exec: {exc}' + ) + @staticmethod def _sanitize_attachment_name(name: str, fallback: str) -> str: """Reduce an arbitrary attachment name to a safe basename. @@ -641,8 +699,11 @@ class BoxService: attachments = self._classify_outbound_entries(entries) + # Always clear the per-query outbox after reading — even when nothing + # was collected — so a later turn that reuses the same query_id (the + # counter resets across restarts) never inherits stale files. + await self._clear_outbox(query, host_dir) if attachments: - await self._clear_outbox(query, host_dir) self.ap.logger.info( f'Collected {len(attachments)} outbound attachment(s) from sandbox: query_id={query.query_id}' ) @@ -711,21 +772,40 @@ class BoxService: return [] async def _clear_outbox(self, query: pipeline_query.Query, host_dir: str | None) -> None: - """Empty the per-query outbox after collection (host or exec).""" + """Empty the per-query outbox after collection. + + Tries a host-side ``rmtree`` first (fast, no container round-trip). + Outbox files are created by the sandbox container as root over the + bind-mount, so when LangBot runs as a non-root user the host delete + fails silently and the files survive — they would then be re-collected + on the next turn that reuses the same query_id. So if anything survives + the host delete, clear it from *inside* the sandbox via exec, where the + container's root can remove its own files. Best-effort: never raise + into the pipeline. + """ + target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}' + if host_dir is not None: import shutil - def _clear(): + def _clear() -> bool: shutil.rmtree(host_dir, ignore_errors=True) + survived = os.path.exists(host_dir) and bool(os.listdir(host_dir)) os.makedirs(host_dir, exist_ok=True) + return survived - await asyncio.to_thread(_clear) - return - target_dir = f'{self.OUTBOX_MOUNT_DIR}/{query.query_id}' - await self.execute_tool( - {'command': f'rm -rf {target_dir} && mkdir -p {target_dir}', 'timeout_sec': 30}, - query, - ) + survived = await asyncio.to_thread(_clear) + if not survived: + return + # Root-owned container files survived the host delete — fall through. + + try: + await self.execute_tool( + {'command': f'rm -rf {target_dir} && mkdir -p {target_dir}', 'timeout_sec': 30}, + query, + ) + except Exception as exc: + self.ap.logger.warning(f'Failed to clear sandbox outbox {target_dir}: {exc}') @staticmethod def _classify_outbound_entries(entries: list[dict]) -> list[dict]: diff --git a/tests/unit_tests/box/test_box_service.py b/tests/unit_tests/box/test_box_service.py index 0b7183ba..88f1927e 100644 --- a/tests/unit_tests/box/test_box_service.py +++ b/tests/unit_tests/box/test_box_service.py @@ -1695,7 +1695,10 @@ class TestInboundOutboundRoundTrip: assert any('rm -rf' in c for c in calls) @pytest.mark.asyncio - async def test_collect_outbound_empty_no_cleanup(self): + async def test_collect_outbound_empty_still_clears(self): + # An empty collection MUST still clear the per-query outbox, so a later + # turn reusing the same query_id (the counter resets across restarts) + # cannot inherit stale files left from a prior run. service = self._service() query = make_query() @@ -1703,11 +1706,14 @@ class TestInboundOutboundRoundTrip: async def fake_execute_tool(parameters, q): calls.append(parameters['command']) - return {'ok': True, 'stdout': '[]', 'stderr': ''} + if 'os.walk' in parameters['command']: + return {'ok': True, 'stdout': '[]', 'stderr': ''} + return {'ok': True, 'stdout': '', 'stderr': ''} service.execute_tool = AsyncMock(side_effect=fake_execute_tool) assert await service.collect_outbound_attachments(query) == [] - assert not any('rm -rf' in c for c in calls) + # cleanup (rm -rf) is issued unconditionally now + assert any('rm -rf' in c for c in calls) @pytest.mark.asyncio async def test_passthrough_noop_when_unavailable(self): @@ -1808,3 +1814,89 @@ class TestAttachmentHostPath: assert raw == big_png # Outbox cleared after collection. assert os.listdir(outbox) == [] + + @pytest.mark.asyncio + async def test_outbound_empty_clears_stale_host_dir(self, tmp_path): + # Reusing a query_id (counter resets on restart) must not re-send files + # a previous run left in the outbox: an empty collection still clears it. + service, ws = self._service_with_workspace(tmp_path) + query = make_query() + outbox = os.path.join(ws, 'outbox', str(query.query_id)) + os.makedirs(outbox, exist_ok=True) + # Stale file from a prior turn; the agent produced nothing this turn — + # but _read_outbox_host would still pick it up, so collection must drop + # it and then wipe the dir. Simulate "nothing produced this turn" by + # treating any present file as stale and asserting it is not re-sent + # across a second, genuinely-empty collection. + open(os.path.join(outbox, 'stale.png'), 'wb').write(b'\x89PNG\r\n\x1a\n old') + service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used on host path')) + + # First collection drains + clears the dir. + first = await service.collect_outbound_attachments(query) + assert {a['name'] for a in first} == {'stale.png'} + assert os.listdir(outbox) == [] + + # Second collection (no new files) returns nothing and leaves a clean dir. + second = await service.collect_outbound_attachments(query) + assert second == [] + assert os.listdir(outbox) == [] + + @pytest.mark.asyncio + async def test_purge_attachment_dirs_wipes_host_owned_leftovers_on_init(self, tmp_path): + # Leftover inbox/outbox dirs from a previous process (same reset + # query_id counter) must be removed at startup. Host-owned files are + # cleared without any sandbox exec. + service, ws = self._service_with_workspace(tmp_path) + for sub in ('inbox', 'outbox'): + d = os.path.join(ws, sub, '0') + os.makedirs(d, exist_ok=True) + open(os.path.join(d, 'leftover.bin'), 'wb').write(b'from a previous process') + service.execute_tool = AsyncMock(side_effect=AssertionError('exec must not be used for host-owned files')) + + await service._purge_attachment_dirs() + + assert not os.path.exists(os.path.join(ws, 'inbox')) + assert not os.path.exists(os.path.join(ws, 'outbox')) + # The workspace root itself survives. + assert os.path.isdir(ws) + + @pytest.mark.asyncio + async def test_purge_attachment_dirs_falls_back_to_exec_for_root_owned(self, tmp_path, monkeypatch): + # When the host delete cannot remove a dir (root-owned container output), + # purge must fall back to deleting from inside the sandbox via exec. + service, ws = self._service_with_workspace(tmp_path) + outbox = os.path.join(ws, 'outbox') + os.makedirs(os.path.join(outbox, '0'), exist_ok=True) + + # Simulate a host delete that cannot remove the root-owned outbox. + import shutil as _shutil + + real_rmtree = _shutil.rmtree + + def fake_rmtree(path, *a, **k): + if os.path.abspath(path) == os.path.abspath(outbox): + return # "permission denied" — silently leaves the dir + return real_rmtree(path, *a, **k) + + monkeypatch.setattr(_shutil, 'rmtree', fake_rmtree) + + executed = {} + spec_obj = object() + service.build_spec = Mock(return_value=spec_obj) + service.client.execute = AsyncMock(side_effect=lambda s: executed.setdefault('spec', s)) + + await service._purge_attachment_dirs() + + # build_spec was asked to rm the surviving outbox via exec. + cmd = service.build_spec.call_args.args[0]['cmd'] + assert 'rm -rf' in cmd and '/workspace/outbox' in cmd + assert '/workspace/inbox' not in cmd # inbox was host-deletable + service.client.execute.assert_awaited_once_with(spec_obj) + + @pytest.mark.asyncio + async def test_purge_attachment_dirs_noop_without_workspace(self): + # No bind-mounted workspace (E2B / remote): purge is a safe no-op. + service = BoxService(make_app(Mock()), client=Mock(spec=BoxRuntimeClient)) + service.default_workspace = None + # Must not raise. + await service._purge_attachment_dirs()