mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-07 14:26:03 +00:00
feat(agent-runner): enrich plugin runner host context
This commit is contained in:
@@ -122,6 +122,7 @@ class AgentRunContextV1(typing.TypedDict):
|
||||
actor: dict[str, typing.Any] | None
|
||||
subject: dict[str, typing.Any] | None
|
||||
messages: list[dict[str, typing.Any]]
|
||||
prompt: list[dict[str, typing.Any]]
|
||||
input: AgentInput
|
||||
params: dict[str, typing.Any]
|
||||
resources: AgentResources
|
||||
@@ -221,6 +222,9 @@ class AgentRunContextBuilder:
|
||||
descriptor.id,
|
||||
)
|
||||
|
||||
streaming_supported = await self._is_stream_output_supported(query)
|
||||
remove_think = query.pipeline_config.get('output', {}).get('misc', {}).get('remove-think', False)
|
||||
|
||||
# Build runtime context
|
||||
runtime: AgentRuntimeContext = {
|
||||
'langbot_version': self.ap.ver_mgr.get_current_version(),
|
||||
@@ -231,6 +235,8 @@ class AgentRunContextBuilder:
|
||||
'metadata': {
|
||||
'bot_name': query.variables.get('_monitoring_bot_name', 'Unknown'),
|
||||
'pipeline_name': query.variables.get('_monitoring_pipeline_name', 'Unknown'),
|
||||
'streaming_supported': streaming_supported,
|
||||
'remove_think': remove_think,
|
||||
},
|
||||
}
|
||||
|
||||
@@ -243,6 +249,7 @@ class AgentRunContextBuilder:
|
||||
'actor': self._build_actor(query),
|
||||
'subject': self._build_subject(query),
|
||||
'messages': messages,
|
||||
'prompt': self._build_prompt(query),
|
||||
'input': input,
|
||||
'params': params,
|
||||
'resources': resources,
|
||||
@@ -256,6 +263,7 @@ class AgentRunContextBuilder:
|
||||
def _build_input(self, query: pipeline_query.Query) -> AgentInput:
|
||||
"""Build AgentInput from query."""
|
||||
text = None
|
||||
text_parts: list[str] = []
|
||||
contents: list[dict[str, typing.Any]] = []
|
||||
|
||||
if query.user_message:
|
||||
@@ -264,12 +272,17 @@ class AgentRunContextBuilder:
|
||||
for elem in query.user_message.content:
|
||||
contents.append(elem.model_dump(mode='json'))
|
||||
if elem.type == 'text':
|
||||
text = getattr(elem, 'text', None)
|
||||
elem_text = getattr(elem, 'text', None)
|
||||
if elem_text:
|
||||
text_parts.append(elem_text)
|
||||
else:
|
||||
# Single string content
|
||||
text = str(query.user_message.content)
|
||||
contents.append({'type': 'text', 'text': text})
|
||||
|
||||
if text_parts:
|
||||
text = ''.join(text_parts)
|
||||
|
||||
# Include message_chain for platform-specific format
|
||||
message_chain_dict = None
|
||||
if query.message_chain:
|
||||
@@ -473,6 +486,29 @@ class AgentRunContextBuilder:
|
||||
|
||||
return int(time.time() + timeout_seconds)
|
||||
|
||||
async def _is_stream_output_supported(self, query: pipeline_query.Query) -> bool:
    """Check whether the current adapter can consume streaming chunks.

    Args:
        query: Pipeline query whose ``adapter`` is probed by awaiting its
            ``is_stream_output_supported()`` method.

    Returns:
        Whatever the adapter reports, or ``False`` when the probe raises
        for any reason.
    """
    try:
        return await query.adapter.is_stream_output_supported()
    except Exception:
        # Best-effort probe. AttributeError (no adapter, or an adapter
        # without this method) is an Exception subclass, so a single
        # handler covers it together with every other failure.
        return False
|
||||
|
||||
def _build_prompt(self, query: pipeline_query.Query) -> list[dict[str, typing.Any]]:
    """Build effective prompt messages from query.prompt after preprocessing.

    Args:
        query: Pipeline query. Its ``prompt`` attribute and that prompt's
            ``messages`` attribute are both read defensively, so either
            may be absent or ``None``.

    Returns:
        One ``model_dump(mode='json')`` dict per prompt message, in order;
        an empty list when there is no prompt or it has no messages.
    """
    messages = getattr(getattr(query, 'prompt', None), 'messages', None)
    if not messages:
        return []

    return [msg.model_dump(mode='json') for msg in messages]
|
||||
|
||||
def _build_messages(self, query: pipeline_query.Query) -> list[dict[str, typing.Any]]:
|
||||
"""Build messages list from query."""
|
||||
messages: list[dict[str, typing.Any]] = []
|
||||
|
||||
@@ -71,7 +71,7 @@ class AgentResourceBuilder:
|
||||
|
||||
# Build each resource category in parallel
|
||||
models, tools, knowledge_bases = await asyncio.gather(
|
||||
self._build_models(manifest_perms, query),
|
||||
self._build_models(manifest_perms, runner_config, descriptor, query),
|
||||
self._build_tools(manifest_perms, bound_plugins, bound_mcp_servers, query),
|
||||
self._build_knowledge_bases(manifest_perms, runner_config, query),
|
||||
)
|
||||
@@ -89,10 +89,13 @@ class AgentResourceBuilder:
|
||||
async def _build_models(
|
||||
self,
|
||||
manifest_perms: dict[str, list[str]],
|
||||
runner_config: dict[str, typing.Any],
|
||||
descriptor: AgentRunnerDescriptor,
|
||||
query: typing.Any,
|
||||
) -> list[ModelResource]:
|
||||
"""Build models list with SDK v1 field names."""
|
||||
models: list[ModelResource] = []
|
||||
seen_model_ids: set[str] = set()
|
||||
|
||||
# Check manifest permission
|
||||
model_perms = manifest_perms.get('models', [])
|
||||
@@ -101,8 +104,72 @@ class AgentResourceBuilder:
|
||||
|
||||
# Get model from query (preproc already resolved this)
|
||||
model_uuid = getattr(query, 'use_llm_model_uuid', None)
|
||||
if not model_uuid:
|
||||
return models
|
||||
if model_uuid:
|
||||
await self._append_llm_model_resource(models, seen_model_ids, model_uuid)
|
||||
|
||||
# Add fallback models if present
|
||||
fallback_uuids = query.variables.get('_fallback_model_uuids', [])
|
||||
for fb_uuid in fallback_uuids:
|
||||
await self._append_llm_model_resource(models, seen_model_ids, fb_uuid)
|
||||
|
||||
# Add model resources referenced by the runner binding config schema.
|
||||
# This makes authorization generic for AgentRunner plugins instead of
|
||||
# hard-coding only local-agent's primary/fallback model path.
|
||||
await self._append_config_declared_model_resources(
|
||||
models=models,
|
||||
seen_model_ids=seen_model_ids,
|
||||
descriptor=descriptor,
|
||||
runner_config=runner_config,
|
||||
)
|
||||
|
||||
return models
|
||||
|
||||
async def _append_config_declared_model_resources(
    self,
    models: list[ModelResource],
    seen_model_ids: set[str],
    descriptor: AgentRunnerDescriptor,
    runner_config: dict[str, typing.Any],
) -> None:
    """Authorize model-like values selected through DynamicForm fields.

    Walks ``descriptor.config_schema`` and, for every dict entry whose
    ``name`` is present in ``runner_config``, forwards the configured model
    UUID(s) to the matching ``_append_*_model_resource`` helper:

    * ``model-fallback-selector``: a plain string, or a dict with a
      ``primary`` string and a ``fallbacks`` list (non-string items skipped).
    * ``llm-model-selector``: a plain string.
    * ``rerank-model-selector``: a plain string, sent to the rerank helper.

    Any other field type, and any value of an unexpected shape, is ignored.
    """
    for schema_field in descriptor.config_schema or []:
        if not isinstance(schema_field, dict):
            continue

        name = schema_field.get('name')
        kind = schema_field.get('type')
        if not name or name not in runner_config:
            continue

        selected = runner_config.get(name)

        if kind == 'rerank-model-selector':
            if isinstance(selected, str):
                await self._append_rerank_model_resource(models, seen_model_ids, selected)
            continue

        if kind not in ('model-fallback-selector', 'llm-model-selector'):
            continue

        # Collect the LLM UUIDs this field contributes, primary first.
        llm_uuids: list[str] = []
        if isinstance(selected, str):
            llm_uuids.append(selected)
        elif kind == 'model-fallback-selector' and isinstance(selected, dict):
            primary = selected.get('primary')
            if isinstance(primary, str):
                llm_uuids.append(primary)
            fallbacks = selected.get('fallbacks', [])
            if isinstance(fallbacks, list):
                llm_uuids.extend(entry for entry in fallbacks if isinstance(entry, str))

        for llm_uuid in llm_uuids:
            await self._append_llm_model_resource(models, seen_model_ids, llm_uuid)
|
||||
|
||||
async def _append_llm_model_resource(
|
||||
self,
|
||||
models: list[ModelResource],
|
||||
seen_model_ids: set[str],
|
||||
model_uuid: str | None,
|
||||
) -> None:
|
||||
"""Append an LLM model resource if it exists and has not been added."""
|
||||
if not model_uuid or model_uuid == '__none__' or model_uuid in seen_model_ids:
|
||||
return
|
||||
|
||||
try:
|
||||
model = await self.ap.model_mgr.get_model_by_uuid(model_uuid)
|
||||
@@ -112,24 +179,31 @@ class AgentResourceBuilder:
|
||||
'model_type': getattr(model.model_entity, 'model_type', None),
|
||||
'provider': getattr(model.provider_entity, 'name', None) if hasattr(model, 'provider_entity') else None,
|
||||
})
|
||||
seen_model_ids.add(model_uuid)
|
||||
except Exception as e:
|
||||
self.ap.logger.warning(f'Failed to build model resource {model_uuid}: {e}')
|
||||
self.ap.logger.warning(f'Failed to build LLM model resource {model_uuid}: {e}')
|
||||
|
||||
# Add fallback models if present
|
||||
fallback_uuids = query.variables.get('_fallback_model_uuids', [])
|
||||
for fb_uuid in fallback_uuids:
|
||||
try:
|
||||
model = await self.ap.model_mgr.get_model_by_uuid(fb_uuid)
|
||||
if model and model.model_entity:
|
||||
models.append({
|
||||
'model_id': fb_uuid,
|
||||
'model_type': model.model_entity.model_type,
|
||||
'provider': model.provider_entity.name if hasattr(model, 'provider_entity') else None,
|
||||
})
|
||||
except Exception as e:
|
||||
self.ap.logger.warning(f'Failed to build fallback model resource {fb_uuid}: {e}')
|
||||
async def _append_rerank_model_resource(
|
||||
self,
|
||||
models: list[ModelResource],
|
||||
seen_model_ids: set[str],
|
||||
model_uuid: str | None,
|
||||
) -> None:
|
||||
"""Append a rerank model resource if it exists and has not been added."""
|
||||
if not model_uuid or model_uuid == '__none__' or model_uuid in seen_model_ids:
|
||||
return
|
||||
|
||||
return models
|
||||
try:
|
||||
model = await self.ap.model_mgr.get_rerank_model_by_uuid(model_uuid)
|
||||
if model and model.model_entity:
|
||||
models.append({
|
||||
'model_id': model_uuid,
|
||||
'model_type': getattr(model.model_entity, 'model_type', 'rerank') or 'rerank',
|
||||
'provider': getattr(model.provider_entity, 'name', None) if hasattr(model, 'provider_entity') else None,
|
||||
})
|
||||
seen_model_ids.add(model_uuid)
|
||||
except Exception as e:
|
||||
self.ap.logger.warning(f'Failed to build rerank model resource {model_uuid}: {e}')
|
||||
|
||||
async def _build_tools(
|
||||
self,
|
||||
|
||||
@@ -154,6 +154,51 @@ async def _validate_run_authorization(
|
||||
return session, None
|
||||
|
||||
|
||||
def _get_cached_query(ap: app.Application, query_id: int | None) -> Any | None:
    """Return a cached pipeline Query for runtime actions when available.

    Looks ``query_id`` up in ``ap.query_pool.cached_queries``. Returns
    ``None`` when no id is given, when the id is not cached, or when the
    lookup itself raises (the failure is swallowed on purpose).
    """
    cached = None
    if query_id is not None:
        try:
            cached = ap.query_pool.cached_queries.get(query_id)
        except Exception:
            cached = None
    return cached
|
||||
|
||||
|
||||
def _resolve_action_query(data: dict[str, Any], session: Any | None, ap: app.Application) -> Any | None:
    """Resolve the current Query from an AgentRunner session or action payload.

    The session's ``query_id`` wins when a session is given and carries one;
    otherwise the ``query_id`` key of the action payload ``data`` is used.
    The chosen id is then resolved through ``_get_cached_query``.
    """
    session_query_id = session.get('query_id') if session else None
    query_id = data.get('query_id') if session_query_id is None else session_query_id
    return _get_cached_query(ap, query_id)
|
||||
|
||||
|
||||
def _resolve_remove_think(data: dict[str, Any], query: Any | None) -> bool:
    """Resolve remove-think using explicit action override, then pipeline config.

    A ``remove_think`` key in ``data`` always decides the result. Without
    it, the value at ``output.misc.remove-think`` of the query's
    ``pipeline_config`` is used. Defaults to ``False`` when neither source
    is available.
    """
    if 'remove_think' not in data:
        pipeline_config = getattr(query, 'pipeline_config', None) if query else None
        if not pipeline_config:
            return False
        misc_section = pipeline_config.get('output', {}).get('misc', {})
        return bool(misc_section.get('remove-think', False))

    return bool(data.get('remove_think'))
|
||||
|
||||
|
||||
def _merge_model_extra_args(model: Any, call_extra_args: Any) -> dict[str, Any]:
    """Merge persisted model extra_args with action-level overrides.

    Starts from ``model.model_entity.extra_args`` and layers
    ``call_extra_args`` on top, so call-level keys win on conflict. A layer
    that is not a dict is ignored. The result is always a new dict.
    """
    entity = getattr(model, 'model_entity', None)
    persisted_args = getattr(entity, 'extra_args', None)

    combined: dict[str, Any] = {}
    # Order matters: later layers override earlier ones.
    for layer in (persisted_args, call_extra_args):
        if isinstance(layer, dict):
            combined.update(layer)
    return combined
|
||||
|
||||
|
||||
class RuntimeConnectionHandler(handler.Handler):
|
||||
"""Runtime connection handler"""
|
||||
|
||||
@@ -449,6 +494,7 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
extra_args = data.get('extra_args', {})
|
||||
run_id = data.get('run_id') # Optional: present for AgentRunner calls
|
||||
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
|
||||
session = None
|
||||
|
||||
# Permission validation for AgentRunner calls
|
||||
if run_id:
|
||||
@@ -473,13 +519,18 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
pass
|
||||
|
||||
funcs_obj = [resource_tool.LLMTool.model_validate({**func, 'func': _placeholder_func}) for func in funcs]
|
||||
query = _resolve_action_query(data, session, self.ap)
|
||||
effective_extra_args = _merge_model_extra_args(llm_model, extra_args)
|
||||
remove_think = _resolve_remove_think(data, query)
|
||||
effective_funcs = funcs_obj if 'func_call' in (llm_model.model_entity.abilities or []) else []
|
||||
|
||||
result = await llm_model.provider.invoke_llm(
|
||||
query=None,
|
||||
query=query,
|
||||
model=llm_model,
|
||||
messages=messages_obj,
|
||||
funcs=funcs_obj,
|
||||
extra_args=extra_args,
|
||||
funcs=effective_funcs,
|
||||
extra_args=effective_extra_args,
|
||||
remove_think=remove_think,
|
||||
)
|
||||
|
||||
return handler.ActionResponse.success(
|
||||
@@ -501,6 +552,7 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
extra_args = data.get('extra_args', {})
|
||||
run_id = data.get('run_id') # Optional: present for AgentRunner calls
|
||||
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
|
||||
session = None
|
||||
|
||||
# Permission validation for AgentRunner calls
|
||||
if run_id:
|
||||
@@ -526,13 +578,18 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
pass
|
||||
|
||||
funcs_obj = [resource_tool.LLMTool.model_validate({**func, 'func': _placeholder_func}) for func in funcs]
|
||||
query = _resolve_action_query(data, session, self.ap)
|
||||
effective_extra_args = _merge_model_extra_args(llm_model, extra_args)
|
||||
remove_think = _resolve_remove_think(data, query)
|
||||
effective_funcs = funcs_obj if 'func_call' in (llm_model.model_entity.abilities or []) else []
|
||||
|
||||
async for chunk in llm_model.provider.invoke_llm_stream(
|
||||
query=None,
|
||||
query=query,
|
||||
model=llm_model,
|
||||
messages=messages_obj,
|
||||
funcs=funcs_obj,
|
||||
extra_args=extra_args,
|
||||
funcs=effective_funcs,
|
||||
extra_args=effective_extra_args,
|
||||
remove_think=remove_think,
|
||||
):
|
||||
yield handler.ActionResponse.success(
|
||||
data={
|
||||
@@ -558,6 +615,7 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
caller_plugin_identity = data.get('caller_plugin_identity') # Optional: for cross-plugin validation
|
||||
# session_data = data['session']
|
||||
# query_id = data['query_id']
|
||||
session = None
|
||||
|
||||
# Permission validation for AgentRunner calls
|
||||
if run_id:
|
||||
@@ -571,10 +629,11 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
# In real implementation, you would reconstruct the full session
|
||||
# For now, we'll call the tool manager's execute method
|
||||
try:
|
||||
query = _resolve_action_query(data, session, self.ap)
|
||||
result = await self.ap.tool_mgr.execute_func_call(
|
||||
name=tool_name,
|
||||
parameters=parameters,
|
||||
query=None, # TODO: reconstruct query from session_data if needed
|
||||
query=query,
|
||||
)
|
||||
# Return both 'tool_response' (LangBotAPIProxy) and 'result' (AgentRunAPIProxy)
|
||||
# LangBotAPIProxy expects 'tool_response', AgentRunAPIProxy expects 'result'
|
||||
@@ -872,10 +931,11 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
query = data['query']
|
||||
documents = data['documents']
|
||||
top_k = data.get('top_k')
|
||||
caller_plugin_identity = data.get('caller_plugin_identity')
|
||||
|
||||
# Validate run authorization
|
||||
session, error = await _validate_run_authorization(
|
||||
run_id, 'model', rerank_model_uuid, self.ap
|
||||
run_id, 'model', rerank_model_uuid, self.ap, caller_plugin_identity
|
||||
)
|
||||
if error:
|
||||
return error
|
||||
@@ -895,6 +955,7 @@ class RuntimeConnectionHandler(handler.Handler):
|
||||
model=rerank_model,
|
||||
query=query,
|
||||
documents=documents_capped,
|
||||
extra_args=_merge_model_extra_args(rerank_model, data.get('extra_args', {})),
|
||||
)
|
||||
|
||||
# Sort by relevance score descending
|
||||
|
||||
Reference in New Issue
Block a user