feat: add MCP server selection to pipeline extensions (#1754)

* Initial plan

* Backend: Add MCP server selection support to pipeline extensions

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* Frontend: Add MCP server selection UI to pipeline extensions

Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>

* perf: ui

* perf: ui

* perf: desc for extension page

---------

Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com>
Co-authored-by: RockChinQ <45992437+RockChinQ@users.noreply.github.com>
Co-authored-by: Junyan Qin <rockchinq@gmail.com>
This commit is contained in:
Copilot
2025-11-06 19:38:12 +08:00
committed by GitHub
parent 68eb0290e0
commit cb48221ed3
12 changed files with 334 additions and 84 deletions

View File

@@ -56,18 +56,24 @@ class PipelinesRouterGroup(group.RouterGroup):
return self.http_status(404, -1, 'pipeline not found')
plugins = await self.ap.plugin_connector.list_plugins()
mcp_servers = await self.ap.mcp_service.get_mcp_servers(contain_runtime_info=True)
return self.success(
data={
'bound_plugins': pipeline.get('extensions_preferences', {}).get('plugins', []),
'available_plugins': plugins,
'bound_mcp_servers': pipeline.get('extensions_preferences', {}).get('mcp_servers', []),
'available_mcp_servers': mcp_servers,
}
)
elif quart.request.method == 'PUT':
# Update bound plugins for this pipeline
# Update bound plugins and MCP servers for this pipeline
json_data = await quart.request.json
bound_plugins = json_data.get('bound_plugins', [])
bound_mcp_servers = json_data.get('bound_mcp_servers', [])
await self.ap.pipeline_service.update_pipeline_extensions(pipeline_uuid, bound_plugins)
await self.ap.pipeline_service.update_pipeline_extensions(
pipeline_uuid, bound_plugins, bound_mcp_servers
)
return self.success()

View File

@@ -137,8 +137,8 @@ class PipelineService:
)
await self.ap.pipeline_mgr.remove_pipeline(pipeline_uuid)
async def update_pipeline_extensions(self, pipeline_uuid: str, bound_plugins: list[dict]) -> None:
"""Update the bound plugins for a pipeline"""
async def update_pipeline_extensions(self, pipeline_uuid: str, bound_plugins: list[dict], bound_mcp_servers: list[str] = None) -> None:
"""Update the bound plugins and MCP servers for a pipeline"""
# Get current pipeline
result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.select(persistence_pipeline.LegacyPipeline).where(
@@ -153,6 +153,8 @@ class PipelineService:
# Update extensions_preferences
extensions_preferences = pipeline.extensions_preferences or {}
extensions_preferences['plugins'] = bound_plugins
if bound_mcp_servers is not None:
extensions_preferences['mcp_servers'] = bound_mcp_servers
await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(persistence_pipeline.LegacyPipeline)

View File

@@ -71,6 +71,9 @@ class RuntimePipeline:
bound_plugins: list[str]
"""绑定到此流水线的插件列表格式author/plugin_name"""
bound_mcp_servers: list[str]
"""绑定到此流水线的MCP服务器列表格式uuid"""
def __init__(
self,
@@ -82,15 +85,19 @@ class RuntimePipeline:
self.pipeline_entity = pipeline_entity
self.stage_containers = stage_containers
# Extract bound plugins from extensions_preferences
# Extract bound plugins and MCP servers from extensions_preferences
extensions_prefs = pipeline_entity.extensions_preferences or {}
plugin_list = extensions_prefs.get('plugins', [])
self.bound_plugins = [f"{p['author']}/{p['name']}" for p in plugin_list] if plugin_list else []
mcp_server_list = extensions_prefs.get('mcp_servers', [])
self.bound_mcp_servers = mcp_server_list if mcp_server_list else []
async def run(self, query: pipeline_query.Query):
query.pipeline_config = self.pipeline_entity.config
# Store bound plugins in query for filtering
# Store bound plugins and MCP servers in query for filtering
query.variables['_pipeline_bound_plugins'] = self.bound_plugins
query.variables['_pipeline_bound_mcp_servers'] = self.bound_mcp_servers
await self.process_query(query)
async def _check_output(self, query: pipeline_query.Query, result: pipeline_entities.StageProcessResult):

View File

@@ -65,9 +65,14 @@ class PreProcessor(stage.PipelineStage):
query.use_llm_model_uuid = llm_model.model_entity.uuid
if llm_model.model_entity.abilities.__contains__('func_call'):
# Get bound plugins for filtering tools
# Get bound plugins and MCP servers for filtering tools
bound_plugins = query.variables.get('_pipeline_bound_plugins', None)
query.use_funcs = await self.ap.tool_mgr.get_all_tools(bound_plugins)
bound_mcp_servers = query.variables.get('_pipeline_bound_mcp_servers', None)
query.use_funcs = await self.ap.tool_mgr.get_all_tools(bound_plugins, bound_mcp_servers)
self.ap.logger.debug(f'Bound plugins: {bound_plugins}')
self.ap.logger.debug(f'Bound MCP servers: {bound_mcp_servers}')
self.ap.logger.debug(f'Use funcs: {query.use_funcs}')
variables = {
'session_id': f'{query.session.launcher_type.value}_{query.session.launcher_id}',

View File

@@ -30,6 +30,8 @@ class RuntimeMCPSession:
server_name: str
server_uuid: str
server_config: dict
session: ClientSession
@@ -43,7 +45,6 @@ class RuntimeMCPSession:
# connected: bool
status: MCPSessionStatus
_lifecycle_task: asyncio.Task | None
_shutdown_event: asyncio.Event
@@ -52,6 +53,7 @@ class RuntimeMCPSession:
def __init__(self, server_name: str, server_config: dict, enable: bool, ap: app.Application):
self.server_name = server_name
self.server_uuid = server_config.get('uuid', '')
self.server_config = server_config
self.ap = ap
self.enable = enable
@@ -286,12 +288,14 @@ class MCPLoader(loader.ToolLoader):
"""
name = server_config['name']
uuid = server_config['uuid']
mode = server_config['mode']
enable = server_config['enable']
extra_args = server_config.get('extra_args', {})
mixed_config = {
'name': name,
'uuid': uuid,
'mode': mode,
'enable': enable,
**extra_args,
@@ -301,11 +305,17 @@ class MCPLoader(loader.ToolLoader):
return session
async def get_tools(self, bound_plugins: list[str] | None = None) -> list[resource_tool.LLMTool]:
async def get_tools(self, bound_mcp_servers: list[str] | None = None) -> list[resource_tool.LLMTool]:
all_functions = []
for session in self.sessions.values():
all_functions.extend(session.get_tools())
# If bound_mcp_servers is specified, only include tools from those servers
if bound_mcp_servers is not None:
if session.server_uuid in bound_mcp_servers:
all_functions.extend(session.get_tools())
else:
# If no bound servers specified, include all tools
all_functions.extend(session.get_tools())
self._last_listed_functions = all_functions

View File

@@ -28,12 +28,12 @@ class ToolManager:
self.mcp_tool_loader = mcp_loader.MCPLoader(self.ap)
await self.mcp_tool_loader.initialize()
async def get_all_tools(self, bound_plugins: list[str] | None = None) -> list[resource_tool.LLMTool]:
async def get_all_tools(self, bound_plugins: list[str] | None = None, bound_mcp_servers: list[str] | None = None) -> list[resource_tool.LLMTool]:
"""获取所有函数"""
all_functions: list[resource_tool.LLMTool] = []
all_functions.extend(await self.plugin_tool_loader.get_tools(bound_plugins))
all_functions.extend(await self.mcp_tool_loader.get_tools(bound_plugins))
all_functions.extend(await self.mcp_tool_loader.get_tools(bound_mcp_servers))
return all_functions