LangBot/tests/unit_tests/provider/test_localagent_sandbox_exec.py
huanghuoguoguo 9ecb587ac0 refactor(provider): use LiteLLM as unified LLM requester backend (#2150)
* refactor(provider): use LiteLLM as unified LLM requester backend

  - Replace 23+ individual requester implementations with unified litellmchat.py
  - Add litellm_provider field to 27 YAML manifests for provider routing
  - Delete redundant requester subclasses
  - Add unit tests for LiteLLMRequester (29 tests)
  - Fix num_retries parameter name (was max_retries)
  - Fix exception handling order for subclass exceptions

  LiteLLM provides unified API for 100+ providers, eliminating need for
  provider-specific requesters.

* fix: ruff format provider.py

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* refactor(provider): simplify LiteLLM requester usage handling

  - Remove unused Anthropic-specific tool schema generation
  - Share completion argument construction between normal and streaming calls
  - Use LiteLLM/OpenAI native usage fields for monitoring
  - Collect stream token usage from LiteLLM stream_options
  - Update LiteLLM requester tests for unified usage fields

* restore: restore deleted provider requester files

Restore individual provider requester implementations that were
removed in de61b5d3. These files coexist with the unified
litellmchat.py backend.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>

* feat: update requesters and improve provider selection UI

- Added `litellm_provider` field to various requesters' YAML configurations.
- Removed obsolete Python requester files for OpenRouter, PPIO, QHAIGC, ShengSuanYun, SiliconFlow, Space, TokenPony, VolcArk, and Xai.
- Introduced new requesters for Tencent and Together AI with corresponding YAML configurations and SVG icons.
- Enhanced the ProviderForm component to include a searchable dropdown for selecting providers, improving user experience.
- Updated localization files to include search provider text for both English and Chinese.

* fix(provider): align litellm rebase with master

* fix(provider): capture streaming token usage; add token observability

The LiteLLM streaming requester only captured usage when a chunk had an
empty `choices` list. Many OpenAI-compatible gateways (e.g. new-api) and
providers send the final usage payload in a chunk that still carries an
empty-delta choice, so streamed calls always recorded 0 tokens in the
monitoring logs/dashboard (non-streaming worked).

- Capture stream usage whenever a chunk carries it, regardless of choices
- Add robust _normalize_usage (dict/obj shapes, derive missing total_tokens)
- Register litellm in bootutils/deps.py (was in pyproject only)
- Add MonitoringService.get_token_statistics + /monitoring/token-statistics
  endpoint: summary, per-model breakdown, token timeseries, and a
  zero-token-success data-quality signal
- Add TokenMonitoring dashboard tab (summary tiles, stacked token chart,
  per-model table) + i18n (en/zh)
- Regression tests for stream usage capture and usage normalization

Verified end-to-end against a real OpenAI-compatible endpoint with
gpt-5.5 and claude-opus-4-8: tokens now recorded non-zero for both
streaming and non-streaming paths.
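
The usage-capture fix described above can be sketched roughly as follows. This is a minimal illustration, not the code in litellmchat.py: `normalize_usage` and `collect_stream_usage` are hypothetical names, and the chunk shape (a `usage` attribute that is either a dict or an object) is an assumption.

```python
from types import SimpleNamespace
from typing import Any


def normalize_usage(usage: Any) -> dict[str, int]:
    """Hypothetical helper: coerce a dict- or object-shaped usage payload to ints."""

    def read(name: str) -> int:
        if isinstance(usage, dict):
            value = usage.get(name)
        else:
            value = getattr(usage, name, None)
        return int(value or 0)

    prompt = read('prompt_tokens')
    completion = read('completion_tokens')
    # Derive total_tokens when the provider omits it.
    total = read('total_tokens') or prompt + completion
    return {'prompt_tokens': prompt, 'completion_tokens': completion, 'total_tokens': total}


def collect_stream_usage(chunks) -> dict[str, int]:
    """Keep the last usage payload seen, whether or not `choices` is empty."""
    usage = normalize_usage(None)
    for chunk in chunks:
        chunk_usage = getattr(chunk, 'usage', None)
        if chunk_usage is not None:
            usage = normalize_usage(chunk_usage)
    return usage
```

The point of the sketch is that the check is on the presence of `usage`, not on `choices` being empty, so a final chunk that still carries an empty-delta choice is counted.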

* refactor(provider): simplify litellm capabilities

* style: simplify wrapped expressions

* feat(models): persist context metadata

* fix(provider): handle dict embeddings and openai-compatible rerank in LiteLLMRequester

- invoke_embedding: support both object- and dict-shaped response.data
  entries (OpenAI-compatible gateways like new-api return dicts)
- invoke_rerank: litellm.arerank rejects the 'openai' provider, so for
  openai-compatible (or unspecified) providers call the standard
  Jina/Cohere-style POST /v1/rerank endpoint directly over HTTP
- accept both 'relevance_score' and 'score' fields in rerank results
- add unit tests for the openai-compatible HTTP rerank path
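
A rough sketch of the shape tolerance described in the list above, assuming response entries are either dicts or attribute-style objects and that rerank results carry an `index` field. The helper names `field`, `extract_embeddings`, and `extract_rerank_scores` are hypothetical and do not come from LiteLLMRequester.

```python
from types import SimpleNamespace
from typing import Any


def field(entry: Any, name: str, default: Any = None) -> Any:
    """Read `name` from either a dict-shaped or an object-shaped entry."""
    if isinstance(entry, dict):
        return entry.get(name, default)
    return getattr(entry, name, default)


def extract_embeddings(data: list[Any]) -> list[list[float]]:
    """Collect embedding vectors from mixed dict/object response entries."""
    return [list(field(item, 'embedding')) for item in data]


def extract_rerank_scores(results: list[Any]) -> list[tuple[int, float]]:
    """Return (index, score) pairs, accepting 'relevance_score' or 'score'."""
    scores = []
    for item in results:
        score = field(item, 'relevance_score')
        if score is None:
            # Some gateways use the shorter field name (assumed fallback).
            score = field(item, 'score', 0.0)
        scores.append((int(field(item, 'index')), float(score)))
    return scores
```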

* feat(provider): enforce requester support_type when adding models

- frontend: AddModelPopover only shows model-type tabs (llm/embedding/
  rerank) that the provider's requester declares in its manifest
  support_type; ModelsDialog fetches requester manifests and maps
  requester -> support_type, passed down through ProviderCard
- backend: add _validate_provider_supports guard in create_llm_model /
  create_embedding_model / create_rerank_model so a model cannot be
  attached to a provider whose requester does not support that type,
  even if the frontend restriction is bypassed (manifests without
  support_type are allowed for backward compatibility)
- manifests: correct support_type for providers that do not offer all
  three model types:
  - llm only: anthropic, deepseek, groq, moonshot, openrouter, xai
  - llm + text-embedding: openai, gemini, mistral
  - add rerank to new-api (verified working via /v1/rerank)
  - set llm + text-embedding + rerank for aggregator/unknown gateways

* feat(provider): add searchable alias to requester manifests

- add a free-text 'alias' field to every requester manifest spec,
  containing the vendor's English/Chinese names, pinyin, common
  nicknames and flagship model-series names (e.g. moonshot -> kimi,
  月之暗面; zhipu -> glm, 智谱清言)
- frontend: ProviderForm requester search now also matches against
  alias (substring/contains), so searching 'kimi' surfaces Moonshot,
  '硅基' surfaces SiliconFlow, etc.
- also fix support_type: openrouter (relay) supports embedding+rerank;
  LangBot Space gains rerank (coming soon)

* fix(provider): make support_type guard defensive against incomplete model_mgr

- _validate_provider_supports now uses getattr to gracefully skip when
  model_mgr / provider_dict / manifest lookup is unavailable, instead of
  raising AttributeError (fixes unit tests that mock ap.model_mgr as a
  bare SimpleNamespace)
- add TestValidateProviderSupports covering: allow supported type,
  reject unsupported type, allow when support_type missing, allow when
  provider unknown, degrade safely when model_mgr is incomplete
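
The defensive behaviour listed above might look something like the sketch below. It is an assumption-laden illustration: the real `_validate_provider_supports` signature is not shown here, and the layout used (a `provider_dict` mapping provider UUIDs to objects with a `support_type` list) is hypothetical.

```python
from types import SimpleNamespace


def validate_provider_supports(ap, provider_uuid: str, model_type: str) -> None:
    """Hypothetical guard: raise only when a manifest positively excludes model_type."""
    model_mgr = getattr(ap, 'model_mgr', None)
    provider_dict = getattr(model_mgr, 'provider_dict', None)
    if not isinstance(provider_dict, dict):
        return  # model_mgr incomplete (e.g. a bare SimpleNamespace in tests)

    provider = provider_dict.get(provider_uuid)
    if provider is None:
        return  # unknown provider: nothing to check against

    support_type = getattr(provider, 'support_type', None)
    if not support_type:
        return  # manifest without support_type: allowed for compatibility

    if model_type not in support_type:
        raise ValueError(f'provider {provider_uuid} does not support model type {model_type}')
```

Every lookup that can be missing returns early, so the only path that raises is the one where the manifest declares its types and the requested type is absent.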

* fix(persistence): guard 0004 migration against missing llm_models table

The 0004_add_llm_model_context_length migration called
inspector.get_columns('llm_models') unconditionally, raising
NoSuchTableError when the table does not exist (e.g. migrating a
fresh/empty DB, as exercised by the integration tests where
create_all() registers no tables because the ORM models are not
imported). Every other migration guards with a table-existence check
first; add the same guard here for both upgrade and downgrade.

Also restore the test head assertion to 0004 (it had been lowered to
0003 to mask this failure).
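
A minimal sketch of the table-existence guard described above. `column_exists` is a hypothetical helper, not the migration's actual code; it only relies on the `get_table_names()` and `get_columns()` methods of a SQLAlchemy-style inspector, and the column name used in the usage note is an assumption.

```python
def column_exists(inspector, table: str, column: str) -> bool:
    """Return False for a missing table instead of letting get_columns() raise."""
    if table not in inspector.get_table_names():
        return False
    # get_columns() yields one dict per column, each with a 'name' key.
    return any(col['name'] == column for col in inspector.get_columns(table))
```

An upgrade step could then call `column_exists(inspector, 'llm_models', 'context_length')` and skip the `ALTER TABLE` when the table is absent, and the downgrade step could use the same check before dropping the column.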

* Merge branch 'master' into feat/litellm

Resolve conflicts:
- uv.lock: regenerated via 'uv lock' to reconcile litellm/fastuuid
  (ours) with openai bump (master).
- Alembic migrations: master added 0004_add_mcp_readme while this
  branch added 0004_add_llm_model_context_length, both as children of
  0003 (would create multiple heads). Re-chain the litellm migration as
  0005_add_llm_model_context_length with down_revision=0004_add_mcp_readme
  for a single linear head. Update test head assertion accordingly.

* fix(persistence): shorten migration revision id to fit varchar(32)

PostgreSQL stores alembic_version.version_num as varchar(32).
'0005_add_llm_model_context_length' (33 chars) overflowed it, raising
StringDataRightTruncationError in the PG migration tests. Rename the
revision (and file) to '0005_add_llm_context_length' (27 chars) and
update the head assertions in both SQLite and PostgreSQL migration
tests.
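
The length constraint above is easy to check mechanically. The sketch below is illustrative only; `fits_version_num` is a hypothetical helper and the 32-character limit is taken from the description above.

```python
ALEMBIC_VERSION_NUM_MAX = 32  # width of alembic_version.version_num, per the text above


def fits_version_num(revision_id: str, limit: int = ALEMBIC_VERSION_NUM_MAX) -> bool:
    """Return True when a revision id fits in the version_num column."""
    return len(revision_id) <= limit


old_id = '0005_add_llm_model_context_length'
new_id = '0005_add_llm_context_length'
print(len(old_id), fits_version_num(old_id))
print(len(new_id), fits_version_num(new_id))
```

A check like this could run in the migration tests so that an over-long revision id fails on SQLite as well, rather than only on PostgreSQL.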

---------

Co-authored-by: Claude Opus 4.7 <noreply@anthropic.com>
Co-authored-by: fdc310 <2213070223@qq.com>
Co-authored-by: RockChinQ <rockchinq@gmail.com>
2026-06-13 16:59:48 +08:00

282 lines
9.5 KiB
Python

from __future__ import annotations

import json
from types import SimpleNamespace
from unittest.mock import AsyncMock, Mock

import pytest

import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
import langbot_plugin.api.entities.builtin.provider.message as provider_message
import langbot_plugin.api.entities.builtin.provider.session as provider_session

from langbot.pkg.provider.runners.localagent import LocalAgentRunner, _StreamAccumulator


class RecordingProvider:
    def __init__(self):
        self.requests: list[dict] = []

    async def invoke_llm(self, query, model, messages, funcs, extra_args=None, remove_think=None):
        self.requests.append(
            {
                'messages': list(messages),
                'funcs': list(funcs),
                'remove_think': remove_think,
            }
        )

        if len(self.requests) == 1:
            return provider_message.Message(
                role='assistant',
                content='Let me calculate that exactly.',
                tool_calls=[
                    provider_message.ToolCall(
                        id='call-1',
                        type='function',
                        function=provider_message.FunctionCall(
                            name='exec',
                            arguments=json.dumps(
                                {'command': ("python - <<'PY'\nnums = [1, 2, 3, 4]\nprint(sum(nums) / len(nums))\nPY")}
                            ),
                        ),
                    )
                ],
            )

        tool_result = json.loads(messages[-1].content)
        return provider_message.Message(
            role='assistant',
            content=f'The average is {tool_result["stdout"]}.',
        )


class RecordingStreamProvider:
    def __init__(self):
        self.stream_requests: list[dict] = []

    def invoke_llm_stream(self, query, model, messages, funcs, extra_args=None, remove_think=None):
        self.stream_requests.append(
            {
                'messages': list(messages),
                'funcs': list(funcs),
                'remove_think': remove_think,
            }
        )

        async def _stream():
            if len(self.stream_requests) == 1:
                yield provider_message.MessageChunk(
                    role='assistant',
                    tool_calls=[
                        provider_message.ToolCall(
                            id='call-1',
                            type='function',
                            function=provider_message.FunctionCall(
                                name='exec',
                                arguments=json.dumps({'command': "python -c 'print(1)'"}),
                            ),
                        )
                    ],
                    is_final=True,
                )
                return

            yield provider_message.MessageChunk(
                role='assistant',
                content='Tool execution failed.',
                is_final=True,
            )

        return _stream()


def make_query() -> pipeline_query.Query:
    adapter = AsyncMock()
    adapter.is_stream_output_supported = AsyncMock(return_value=False)

    return pipeline_query.Query.model_construct(
        query_id='avg-query',
        launcher_type=provider_session.LauncherTypes.PERSON,
        launcher_id=12345,
        sender_id=12345,
        message_chain=[],
        message_event=None,
        adapter=adapter,
        pipeline_uuid='pipeline-uuid',
        bot_uuid='bot-uuid',
        pipeline_config={
            'ai': {
                'runner': {'runner': 'local-agent'},
                'local-agent': {'model': {'primary': 'test-model-uuid', 'fallbacks': []}, 'prompt': 'test-prompt'},
            },
            'output': {'misc': {'remove-think': False}},
        },
        prompt=SimpleNamespace(messages=[]),
        messages=[],
        user_message=provider_message.Message(
            role='user',
            content='Please calculate the average of 1, 2, 3, and 4.',
        ),
        use_funcs=[SimpleNamespace(name='exec')],
        use_llm_model_uuid='test-model-uuid',
        variables={},
    )


def test_stream_accumulator_merges_fragmented_tool_call_arguments():
    accumulator = _StreamAccumulator(msg_sequence=1)

    assert (
        accumulator.add(
            provider_message.MessageChunk(
                role='assistant',
                tool_calls=[
                    provider_message.ToolCall(
                        id='call-1',
                        type='function',
                        function=provider_message.FunctionCall(name='exec', arguments='{"command":'),
                    )
                ],
            )
        )
        is None
    )

    emitted = accumulator.add(
        provider_message.MessageChunk(
            role='assistant',
            tool_calls=[
                provider_message.ToolCall(
                    id='call-1',
                    type='function',
                    function=provider_message.FunctionCall(name='exec', arguments='"pwd"}'),
                )
            ],
            is_final=True,
        )
    )

    assert emitted is not None
    final_msg = accumulator.final_message()
    assert final_msg.tool_calls[0].function.name == 'exec'
    assert final_msg.tool_calls[0].function.arguments == '{"command":"pwd"}'


@pytest.mark.asyncio
async def test_localagent_uses_exec_for_exact_calculation():
    provider = RecordingProvider()
    model = SimpleNamespace(
        provider=provider,
        model_entity=SimpleNamespace(
            uuid='test-model-uuid',
            name='test-model',
            abilities=['func_call'],
            extra_args={},
        ),
    )

    tool_manager = SimpleNamespace(
        execute_func_call=AsyncMock(
            return_value={
                'session_id': 'avg-query',
                'backend': 'podman',
                'status': 'completed',
                'ok': True,
                'exit_code': 0,
                'stdout': '2.5',
                'stderr': '',
                'duration_ms': 18,
            }
        )
    )

    app = SimpleNamespace(
        logger=Mock(),
        model_mgr=SimpleNamespace(get_model_by_uuid=AsyncMock(return_value=model)),
        tool_mgr=tool_manager,
        rag_mgr=SimpleNamespace(),
        box_service=SimpleNamespace(
            get_system_guidance=Mock(
                return_value=(
                    'When the exec tool is available, use it for exact calculations, statistics, '
                    'structured data parsing, and code execution instead of estimating mentally. '
                    'Unless the user explicitly asks for the script, code, or implementation details, '
                    'do not include the generated script in the final answer. '
                    'A default workspace is mounted at /workspace for file tasks.'
                )
            ),
        ),
        skill_mgr=SimpleNamespace(
            get_skills_for_pipeline=AsyncMock(return_value=[]),
            detect_skill_activation=AsyncMock(return_value=None),
            build_activation_prompt=Mock(return_value=None),
        ),
    )

    runner = LocalAgentRunner(app, pipeline_config={})
    query = make_query()

    results = [message async for message in runner.run(query)]

    assert [message.role for message in results] == ['assistant', 'tool', 'assistant']
    assert results[-1].content == 'The average is 2.5.'

    tool_manager.execute_func_call.assert_awaited_once()
    tool_name, tool_parameters = tool_manager.execute_func_call.await_args.args[:2]
    assert tool_name == 'exec'
    assert 'print(sum(nums) / len(nums))' in tool_parameters['command']

    first_request = provider.requests[0]
    assert any(
        message.role == 'system'
        and 'exec' in str(message.content)
        and 'exact calculations' in str(message.content)
        and 'Unless the user explicitly asks for the script' in str(message.content)
        and '/workspace' in str(message.content)
        for message in first_request['messages']
    )
    assert [tool.name for tool in first_request['funcs']] == ['exec']


@pytest.mark.asyncio
async def test_localagent_streaming_tool_error_yields_message_chunks():
    provider = RecordingStreamProvider()
    model = SimpleNamespace(
        provider=provider,
        model_entity=SimpleNamespace(
            uuid='test-model-uuid',
            name='test-model',
            abilities=['func_call'],
            extra_args={},
        ),
    )

    adapter = AsyncMock()
    adapter.is_stream_output_supported = AsyncMock(return_value=True)
    query = make_query()
    query.adapter = adapter

    app = SimpleNamespace(
        logger=Mock(),
        model_mgr=SimpleNamespace(get_model_by_uuid=AsyncMock(return_value=model)),
        tool_mgr=SimpleNamespace(execute_func_call=AsyncMock(side_effect=RuntimeError('boom'))),
        rag_mgr=SimpleNamespace(),
        box_service=SimpleNamespace(
            get_system_guidance=Mock(return_value='sandbox guidance'),
        ),
        skill_mgr=SimpleNamespace(
            get_skills_for_pipeline=AsyncMock(return_value=[]),
            detect_skill_activation=AsyncMock(return_value=None),
            build_activation_prompt=Mock(return_value=None),
        ),
    )

    runner = LocalAgentRunner(app, pipeline_config={})

    results = [message async for message in runner.run(query)]

    assert all(isinstance(message, provider_message.MessageChunk) for message in results)
    assert any(message.role == 'tool' and message.content == 'err: boom' for message in results)