perf: mcp server status checking logic

This commit is contained in:
Junyan Qin
2025-11-04 17:32:05 +08:00
parent 3ee7736361
commit 1afecf01e4
11 changed files with 182 additions and 90 deletions

View File

@@ -3,6 +3,7 @@ from __future__ import annotations
import sqlalchemy
import uuid
import traceback
import asyncio
from ....core import app
from ....entity.persistence import mcp as persistence_mcp
@@ -124,7 +125,6 @@ class MCPService:
async def create_mcp_server(self, server_data: dict) -> str:
server_data['uuid'] = str(uuid.uuid4())
print('server_data:', server_data)
await self.ap.persistence_mgr.execute_async(sqlalchemy.insert(persistence_mcp.MCPServer).values(server_data))
result = await self.ap.persistence_mgr.execute_async(
@@ -134,7 +134,8 @@ class MCPService:
if server_entity:
server_config = self.ap.persistence_mgr.serialize_model(persistence_mcp.MCPServer, server_entity)
if self.ap.tool_mgr.mcp_tool_loader:
await self.ap.tool_mgr.mcp_tool_loader.load_mcp_server(server_config)
task = asyncio.create_task(self.ap.tool_mgr.mcp_tool_loader.host_mcp_server(server_config))
self.ap.tool_mgr.mcp_tool_loader._hosted_mcp_tasks.append(task)
return server_data['uuid']
@@ -175,7 +176,9 @@ class MCPService:
if updated_server:
# convert entity to config dict
server_config = self.ap.persistence_mgr.serialize_model(persistence_mcp.MCPServer, updated_server)
await self.ap.tool_mgr.mcp_tool_loader.load_mcp_server(server_config)
# await self.ap.tool_mgr.mcp_tool_loader.host_mcp_server(server_config)
task = asyncio.create_task(self.ap.tool_mgr.mcp_tool_loader.host_mcp_server(server_config))
self.ap.tool_mgr.mcp_tool_loader._hosted_mcp_tasks.append(task)
async def delete_mcp_server(self, server_uuid: str) -> None:
result = await self.ap.persistence_mgr.execute_async(

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import enum
import typing
from contextlib import AsyncExitStack
import traceback
@@ -16,6 +17,12 @@ import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
from ....entity.persistence import mcp as persistence_mcp
class MCPSessionStatus(enum.Enum):
CONNECTING = 'connecting'
CONNECTED = 'connected'
ERROR = 'error'
class RuntimeMCPSession:
"""运行时 MCP 会话"""
@@ -33,7 +40,8 @@ class RuntimeMCPSession:
enable: bool
connected: bool
# connected: bool
status: MCPSessionStatus
last_test_error_message: str
@@ -47,7 +55,7 @@ class RuntimeMCPSession:
self.exit_stack = AsyncExitStack()
self.functions = []
self.connected = False
self.status = MCPSessionStatus.CONNECTING
self.last_test_error_message = ''
async def _init_stdio_python_server(self):
@@ -117,10 +125,10 @@ class RuntimeMCPSession:
)
)
self.connected = True
self.status = MCPSessionStatus.CONNECTED
self.last_test_error_message = ''
except Exception as e:
self.connected = False
self.status = MCPSessionStatus.ERROR
self.last_test_error_message = str(e)
raise e
@@ -129,7 +137,7 @@ class RuntimeMCPSession:
def get_runtime_info_dict(self) -> dict:
return {
'connected': self.connected,
'status': self.status.value,
'error_message': self.last_test_error_message,
'tool_count': len(self.get_tools()),
'tools': [
@@ -163,13 +171,13 @@ class MCPLoader(loader.ToolLoader):
_last_listed_functions: list[resource_tool.LLMTool]
_startup_load_tasks: list[asyncio.Task]
_hosted_mcp_tasks: list[asyncio.Task]
def __init__(self, ap: app.Application):
super().__init__(ap)
self.sessions = {}
self._last_listed_functions = []
self._startup_load_tasks = []
self._hosted_mcp_tasks = []
async def initialize(self):
await self.load_mcp_servers_from_db()
@@ -185,30 +193,30 @@ class MCPLoader(loader.ToolLoader):
for server in servers:
config = self.ap.persistence_mgr.serialize_model(persistence_mcp.MCPServer, server)
async def load_mcp_server_task(server_config: dict):
self.ap.logger.debug(f'Loading MCP server {server_config}')
try:
session = await self.load_mcp_server(server_config)
self.sessions[server_config['name']] = session
except Exception as e:
self.ap.logger.error(
f'Failed to load MCP server from db: {server_config["name"]}({server_config["uuid"]}): {e}\n{traceback.format_exc()}'
)
return
task = asyncio.create_task(self.host_mcp_server(config))
self._hosted_mcp_tasks.append(task)
self.ap.logger.debug(f'Starting MCP server {server_config["name"]}({server_config["uuid"]})')
try:
await session.start()
except Exception as e:
self.ap.logger.error(
f'Failed to start MCP server {server_config["name"]}({server_config["uuid"]}): {e}\n{traceback.format_exc()}'
)
return
async def host_mcp_server(self, server_config: dict):
self.ap.logger.debug(f'Loading MCP server {server_config}')
try:
session = await self.load_mcp_server(server_config)
self.sessions[server_config['name']] = session
except Exception as e:
self.ap.logger.error(
f'Failed to load MCP server from db: {server_config["name"]}({server_config["uuid"]}): {e}\n{traceback.format_exc()}'
)
return
self.ap.logger.debug(f'Started MCP server {server_config["name"]}({server_config["uuid"]})')
self.ap.logger.debug(f'Starting MCP server {server_config["name"]}({server_config["uuid"]})')
try:
await session.start()
except Exception as e:
self.ap.logger.error(
f'Failed to start MCP server {server_config["name"]}({server_config["uuid"]}): {e}\n{traceback.format_exc()}'
)
return
task = asyncio.create_task(load_mcp_server_task(config))
self._startup_load_tasks.append(task)
self.ap.logger.debug(f'Started MCP server {server_config["name"]}({server_config["uuid"]})')
async def load_mcp_server(self, server_config: dict) -> RuntimeMCPSession:
"""加载 MCP 服务器到运行时
@@ -271,20 +279,6 @@ class MCPLoader(loader.ToolLoader):
raise ValueError(f'Tool not found: {name}')
async def reload_mcp_server(self, server_config: dict):
"""重新加载 MCP 服务器(先移除再加载)
Args:
server_config: 服务器配置字典,必须包含 name 字段
"""
server_name = server_config['name']
if server_name in self.sessions:
await self.remove_mcp_server(server_name)
# 重新加载
await self.load_mcp_server(server_config)
async def remove_mcp_server(self, server_name: str):
"""移除 MCP 服务器"""
if server_name not in self.sessions: