From 88132dff8ae82912d148299b24cddc61760258c6 Mon Sep 17 00:00:00 2001 From: Junyan Chin Date: Fri, 27 Feb 2026 20:09:03 +0800 Subject: [PATCH] perf: reduce memory usage by ~200MB+ at startup (#2013) * perf: reduce memory usage by ~200MB+ at startup Two key optimizations: 1. Use importlib.util.find_spec() instead of __import__() in dependency checking. find_spec() only locates modules without executing them, avoiding loading all 36 dependencies (~222MB) into memory at startup. 2. Introduce shared aiohttp.ClientSession via httpclient module. Previously, every HTTP request created a new ClientSession, which creates a new TCPConnector and SSL context, loading system root certificates each time (~270MB total allocations observed via memray). Now all HTTP client code reuses shared sessions. - satori.py and coze_server_api/client.py are left unchanged as they create one session per adapter lifecycle (not per-request). Profiling data (memray): - Peak memory: 403MB - SSL context creation: 270MB / 6.7M allocations (67% of total) - Dependency import: 222MB (55% of peak) - Expected reduction: 150-350MB at startup * fix: remove unused aiohttp imports (ruff F401) * style: ruff format --- .../libs/wechatpad_api/util/http_util.py | 28 ++-- src/langbot/pkg/api/http/service/space.py | 82 +++++----- src/langbot/pkg/core/bootutils/deps.py | 8 +- .../cntfilter/filters/baiduexamine.py | 83 +++++----- src/langbot/pkg/platform/sources/discord.py | 60 +++---- src/langbot/pkg/platform/sources/kook.py | 68 ++++---- src/langbot/pkg/platform/sources/lark.py | 24 +-- .../pkg/platform/sources/legacy/gewechat.py | 18 +-- src/langbot/pkg/platform/sources/telegram.py | 20 ++- src/langbot/pkg/platform/webhook_pusher.py | 34 ++-- src/langbot/pkg/provider/runners/n8nsvapi.py | 76 ++++----- src/langbot/pkg/utils/httpclient.py | 43 +++++ src/langbot/pkg/utils/image.py | 147 +++++++++--------- 13 files changed, 370 insertions(+), 321 deletions(-) create mode 100644 src/langbot/pkg/utils/httpclient.py diff --git a/src/langbot/libs/wechatpad_api/util/http_util.py b/src/langbot/libs/wechatpad_api/util/http_util.py index 447c29df..7390f43e 100644 --- a/src/langbot/libs/wechatpad_api/util/http_util.py +++ b/src/langbot/libs/wechatpad_api/util/http_util.py @@ -1,5 +1,5 @@ import requests -import aiohttp +from langbot.pkg.utils import httpclient def post_json(base_url, token, data=None): @@ -63,16 +63,16 @@ async def async_request( """ headers = {'Content-Type': 'application/json'} url = f'{base_url}?key={token_key}' - async with aiohttp.ClientSession() as session: - async with session.request( - method=method, url=url, params=params, headers=headers, data=data, json=json - ) as response: - response.raise_for_status() # 如果状态码不是200,抛出异常 - result = await response.json() - # print(result) - return result - # if result.get('Code') == 200: - # - # return await result - # else: - # raise RuntimeError("请求失败",response.text) + session = httpclient.get_session() + async with session.request( + method=method, url=url, params=params, headers=headers, data=data, json=json + ) as response: + response.raise_for_status() # 如果状态码不是200,抛出异常 + result = await response.json() + # print(result) + return result + # if result.get('Code') == 200: + # + # return await result + # else: + # raise RuntimeError("请求失败",response.text) diff --git a/src/langbot/pkg/api/http/service/space.py b/src/langbot/pkg/api/http/service/space.py index cd694883..c05e4896 100644 --- a/src/langbot/pkg/api/http/service/space.py +++ b/src/langbot/pkg/api/http/service/space.py @@ -1,6 +1,6 @@ from __future__ import annotations -import aiohttp +from langbot.pkg.utils import httpclient import typing import datetime import time @@ -99,49 +99,49 @@ class SpaceService: space_config = self._get_space_config() space_url = space_config['url'] - async with aiohttp.ClientSession() as session: - async with session.post( - f'{space_url}/api/v1/accounts/oauth/token', - json={'code': code, 'instance_id': constants.instance_id}, - ) as response: - if response.status != 200: - raise ValueError(f'Failed to exchange OAuth code: {await response.text()}') - data = await response.json() - if data.get('code') != 0: - raise ValueError(f'Failed to exchange OAuth code: {data.get("msg")}') - return data.get('data', {}) + session = httpclient.get_session() + async with session.post( + f'{space_url}/api/v1/accounts/oauth/token', + json={'code': code, 'instance_id': constants.instance_id}, + ) as response: + if response.status != 200: + raise ValueError(f'Failed to exchange OAuth code: {await response.text()}') + data = await response.json() + if data.get('code') != 0: + raise ValueError(f'Failed to exchange OAuth code: {data.get("msg")}') + return data.get('data', {}) async def refresh_token(self, refresh_token: str) -> typing.Dict: """Refresh Space access token""" space_config = self._get_space_config() space_url = space_config['url'] - async with aiohttp.ClientSession() as session: - async with session.post( - f'{space_url}/api/v1/accounts/token/refresh', json={'refresh_token': refresh_token} - ) as response: - if response.status != 200: - raise ValueError(f'Failed to refresh token: {await response.text()}') - data = await response.json() - if data.get('code') != 0: - raise ValueError(f'Failed to refresh token: {data.get("msg")}') - return data.get('data', {}) + session = httpclient.get_session() + async with session.post( + f'{space_url}/api/v1/accounts/token/refresh', json={'refresh_token': refresh_token} + ) as response: + if response.status != 200: + raise ValueError(f'Failed to refresh token: {await response.text()}') + data = await response.json() + if data.get('code') != 0: + raise ValueError(f'Failed to refresh token: {data.get("msg")}') + return data.get('data', {}) async def get_user_info_raw(self, access_token: str) -> typing.Dict: """Get user info from Space using access token (no validation)""" space_config = self._get_space_config() space_url = space_config['url'] - async with aiohttp.ClientSession() as session: - async with session.get( - f'{space_url}/api/v1/accounts/me', headers={'Authorization': f'Bearer {access_token}'} - ) as response: - if response.status != 200: - raise ValueError(f'Failed to get user info: {await response.text()}') - data = await response.json() - if data.get('code') != 0: - raise ValueError(f'Failed to get user info: {data.get("msg")}') - return data.get('data', {}) + session = httpclient.get_session() + async with session.get( + f'{space_url}/api/v1/accounts/me', headers={'Authorization': f'Bearer {access_token}'} + ) as response: + if response.status != 200: + raise ValueError(f'Failed to get user info: {await response.text()}') + data = await response.json() + if data.get('code') != 0: + raise ValueError(f'Failed to get user info: {data.get("msg")}') + return data.get('data', {}) # === API calls with token validation === @@ -178,12 +178,12 @@ class SpaceService: space_config = self._get_space_config() space_url = space_config['url'] - async with aiohttp.ClientSession() as session: - async with session.get(f'{space_url}/api/v1/models') as response: - if response.status != 200: - raise ValueError(f'Failed to get models: {await response.text()}') - data = await response.json() - if data.get('code') != 0: - raise ValueError(f'Failed to get models: {data.get("msg")}') - models_data = data.get('data', {}).get('models', []) - return [SpaceModel.model_validate(model_dict) for model_dict in models_data] + session = httpclient.get_session() + async with session.get(f'{space_url}/api/v1/models') as response: + if response.status != 200: + raise ValueError(f'Failed to get models: {await response.text()}') + data = await response.json() + if data.get('code') != 0: + raise ValueError(f'Failed to get models: {data.get("msg")}') + models_data = data.get('data', {}).get('models', []) + return [SpaceModel.model_validate(model_dict) for model_dict in models_data] diff --git a/src/langbot/pkg/core/bootutils/deps.py b/src/langbot/pkg/core/bootutils/deps.py index b2508b22..1f653037 100644 --- a/src/langbot/pkg/core/bootutils/deps.py +++ b/src/langbot/pkg/core/bootutils/deps.py @@ -1,3 +1,4 @@ +import importlib.util import pip import os from ...utils import pkgmgr @@ -49,9 +50,10 @@ async def check_deps() -> list[str]: missing_deps = [] for dep in required_deps: - try: - __import__(dep) - except ImportError: + # Use find_spec instead of __import__ to avoid actually loading + # all modules into memory. find_spec only checks if the module + # can be found, without executing module-level code. + if importlib.util.find_spec(dep) is None: missing_deps.append(dep) return missing_deps diff --git a/src/langbot/pkg/pipeline/cntfilter/filters/baiduexamine.py b/src/langbot/pkg/pipeline/cntfilter/filters/baiduexamine.py index 4213e662..a376310f 100644 --- a/src/langbot/pkg/pipeline/cntfilter/filters/baiduexamine.py +++ b/src/langbot/pkg/pipeline/cntfilter/filters/baiduexamine.py @@ -1,10 +1,9 @@ from __future__ import annotations -import aiohttp - from .. import entities from .. import filter as filter_model import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query +from langbot.pkg.utils import httpclient BAIDU_EXAMINE_URL = 'https://aip.baidubce.com/rest/2.0/solution/v1/text_censor/v2/user_defined?access_token={}' BAIDU_EXAMINE_TOKEN_URL = 'https://aip.baidubce.com/oauth/2.0/token' @@ -15,50 +14,50 @@ class BaiduCloudExamine(filter_model.ContentFilter): """百度云内容审核""" async def _get_token(self) -> str: - async with aiohttp.ClientSession() as session: - async with session.post( - BAIDU_EXAMINE_TOKEN_URL, - params={ - 'grant_type': 'client_credentials', - 'client_id': self.ap.pipeline_cfg.data['baidu-cloud-examine']['api-key'], - 'client_secret': self.ap.pipeline_cfg.data['baidu-cloud-examine']['api-secret'], - }, - ) as resp: - return (await resp.json())['access_token'] + session = httpclient.get_session() + async with session.post( + BAIDU_EXAMINE_TOKEN_URL, + params={ + 'grant_type': 'client_credentials', + 'client_id': self.ap.pipeline_cfg.data['baidu-cloud-examine']['api-key'], + 'client_secret': self.ap.pipeline_cfg.data['baidu-cloud-examine']['api-secret'], + }, + ) as resp: + return (await resp.json())['access_token'] async def process(self, query: pipeline_query.Query, message: str) -> entities.FilterResult: - async with aiohttp.ClientSession() as session: - async with session.post( - BAIDU_EXAMINE_URL.format(await self._get_token()), - headers={ - 'Content-Type': 'application/x-www-form-urlencoded', - 'Accept': 'application/json', - }, - data=f'text={message}'.encode('utf-8'), - ) as resp: - result = await resp.json() + session = httpclient.get_session() + async with session.post( + BAIDU_EXAMINE_URL.format(await self._get_token()), + headers={ + 'Content-Type': 'application/x-www-form-urlencoded', + 'Accept': 'application/json', + }, + data=f'text={message}'.encode('utf-8'), + ) as resp: + result = await resp.json() - if 'error_code' in result: + if 'error_code' in result: + return entities.FilterResult( + level=entities.ResultLevel.BLOCK, + replacement=message, + user_notice='', + console_notice=f'百度云判定出错,错误信息:{result["error_msg"]}', + ) + else: + conclusion = result['conclusion'] + + if conclusion in ('合规'): + return entities.FilterResult( + level=entities.ResultLevel.PASS, + replacement=message, + user_notice='', + console_notice=f'百度云判定结果:{conclusion}', + ) + else: return entities.FilterResult( level=entities.ResultLevel.BLOCK, replacement=message, - user_notice='', - console_notice=f'百度云判定出错,错误信息:{result["error_msg"]}', + user_notice='消息中存在不合适的内容, 请修改', + console_notice=f'百度云判定结果:{conclusion}', ) - else: - conclusion = result['conclusion'] - - if conclusion in ('合规'): - return entities.FilterResult( - level=entities.ResultLevel.PASS, - replacement=message, - user_notice='', - console_notice=f'百度云判定结果:{conclusion}', - ) - else: - return entities.FilterResult( - level=entities.ResultLevel.BLOCK, - replacement=message, - user_notice='消息中存在不合适的内容, 请修改', - console_notice=f'百度云判定结果:{conclusion}', - ) diff --git a/src/langbot/pkg/platform/sources/discord.py b/src/langbot/pkg/platform/sources/discord.py index cb80ce48..e9cc7a37 100644 --- a/src/langbot/pkg/platform/sources/discord.py +++ b/src/langbot/pkg/platform/sources/discord.py @@ -14,7 +14,7 @@ import io import asyncio from enum import Enum -import aiohttp +from langbot.pkg.utils import httpclient import pydantic import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter @@ -622,23 +622,23 @@ class DiscordMessageConverter(abstract_platform_adapter.AbstractMessageConverter image_bytes = base64.b64decode(base64_data) elif ele.url: # 从URL下载图片 - async with aiohttp.ClientSession() as session: - async with session.get(ele.url) as response: - image_bytes = await response.read() - # 从URL或Content-Type推断文件类型 - content_type = response.headers.get('Content-Type', '') - if 'jpeg' in content_type or 'jpg' in content_type: - filename = f'{uuid.uuid4()}.jpg' - elif 'gif' in content_type: - filename = f'{uuid.uuid4()}.gif' - elif 'webp' in content_type: - filename = f'{uuid.uuid4()}.webp' - elif ele.url.lower().endswith(('.jpg', '.jpeg')): - filename = f'{uuid.uuid4()}.jpg' - elif ele.url.lower().endswith('.gif'): - filename = f'{uuid.uuid4()}.gif' - elif ele.url.lower().endswith('.webp'): - filename = f'{uuid.uuid4()}.webp' + session = httpclient.get_session() + async with session.get(ele.url) as response: + image_bytes = await response.read() + # 从URL或Content-Type推断文件类型 + content_type = response.headers.get('Content-Type', '') + if 'jpeg' in content_type or 'jpg' in content_type: + filename = f'{uuid.uuid4()}.jpg' + elif 'gif' in content_type: + filename = f'{uuid.uuid4()}.gif' + elif 'webp' in content_type: + filename = f'{uuid.uuid4()}.webp' + elif ele.url.lower().endswith(('.jpg', '.jpeg')): + filename = f'{uuid.uuid4()}.jpg' + elif ele.url.lower().endswith('.gif'): + filename = f'{uuid.uuid4()}.gif' + elif ele.url.lower().endswith('.webp'): + filename = f'{uuid.uuid4()}.webp' elif ele.path: # 从文件路径读取图片 # 确保路径没有空字节 @@ -702,9 +702,9 @@ class DiscordMessageConverter(abstract_platform_adapter.AbstractMessageConverter file_base64 = ele.base64.split(',')[-1] file_bytes = base64.b64decode(file_base64) elif ele.url: - async with aiohttp.ClientSession() as session: - async with session.get(ele.url) as response: - file_bytes = await response.read() + session = httpclient.get_session() + async with session.get(ele.url) as response: + file_bytes = await response.read() if file_bytes: files.append(discord.File(fp=io.BytesIO(file_bytes), filename=filename)) elif isinstance(ele, platform_message.File): @@ -717,9 +717,9 @@ class DiscordMessageConverter(abstract_platform_adapter.AbstractMessageConverter else: file_bytes = base64.b64decode(ele.base64) elif ele.url: - async with aiohttp.ClientSession() as session: - async with session.get(ele.url) as response: - file_bytes = await response.read() + session = httpclient.get_session() + async with session.get(ele.url) as response: + file_bytes = await response.read() if file_bytes: files.append(discord.File(fp=io.BytesIO(file_bytes), filename=filename)) elif isinstance(ele, platform_message.Forward): @@ -775,12 +775,12 @@ class DiscordMessageConverter(abstract_platform_adapter.AbstractMessageConverter # attachments for attachment in message.attachments: - async with aiohttp.ClientSession(trust_env=True) as session: - async with session.get(attachment.url) as response: - image_data = await response.read() - image_base64 = base64.b64encode(image_data).decode('utf-8') - image_format = response.headers['Content-Type'] - element_list.append(platform_message.Image(base64=f'data:{image_format};base64,{image_base64}')) + session = httpclient.get_session(trust_env=True) + async with session.get(attachment.url) as response: + image_data = await response.read() + image_base64 = base64.b64encode(image_data).decode('utf-8') + image_format = response.headers['Content-Type'] + element_list.append(platform_message.Image(base64=f'data:{image_format};base64,{image_base64}')) return platform_message.MessageChain(element_list) diff --git a/src/langbot/pkg/platform/sources/kook.py b/src/langbot/pkg/platform/sources/kook.py index 17777a95..5a6bade3 100644 --- a/src/langbot/pkg/platform/sources/kook.py +++ b/src/langbot/pkg/platform/sources/kook.py @@ -9,6 +9,8 @@ import traceback import time import aiohttp + +from langbot.pkg.utils import httpclient import websockets import pydantic @@ -120,16 +122,16 @@ class KookMessageConverter(abstract_platform_adapter.AbstractMessageConverter): if content: # Download image and convert to base64 try: - async with aiohttp.ClientSession() as session: - async with session.get(content) as response: - if response.status == 200: - image_bytes = await response.read() - image_base64 = base64.b64encode(image_bytes).decode('utf-8') - # Detect image format - content_type = response.headers.get('Content-Type', 'image/png') - components.append( - platform_message.Image(base64=f'data:{content_type};base64,{image_base64}') - ) + session = httpclient.get_session() + async with session.get(content) as response: + if response.status == 200: + image_bytes = await response.read() + image_base64 = base64.b64encode(image_bytes).decode('utf-8') + # Detect image format + content_type = response.headers.get('Content-Type', 'image/png') + components.append( + platform_message.Image(base64=f'data:{content_type};base64,{image_base64}') + ) except Exception: # If download fails, just add as plain text components.append(platform_message.Plain(text=f'[Image: {content}]')) @@ -295,17 +297,17 @@ class KookAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): 'Authorization': f'Bot {self.config["token"]}', } - async with aiohttp.ClientSession() as session: - async with session.get(base_url, params=params, headers=headers) as response: - if response.status == 200: - data = await response.json() - if data.get('code') == 0: - gateway_url = data['data']['url'] - return gateway_url - else: - raise Exception(f'Failed to get gateway URL: {data.get("message")}') + session = httpclient.get_session() + async with session.get(base_url, params=params, headers=headers) as response: + if response.status == 200: + data = await response.json() + if data.get('code') == 0: + gateway_url = data['data']['url'] + return gateway_url else: - raise Exception(f'Failed to get gateway URL: HTTP {response.status}') + raise Exception(f'Failed to get gateway URL: {data.get("message")}') + else: + raise Exception(f'Failed to get gateway URL: HTTP {response.status}') async def _get_bot_user_info(self) -> dict: """Get bot's own user information from KOOK API""" @@ -315,17 +317,17 @@ class KookAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): 'Authorization': f'Bot {self.config["token"]}', } - async with aiohttp.ClientSession() as session: - async with session.get(base_url, headers=headers) as response: - if response.status == 200: - data = await response.json() - if data.get('code') == 0: - user_info = data['data'] - return user_info - else: - raise Exception(f'Failed to get bot user info: {data.get("message")}') + session = httpclient.get_session() + async with session.get(base_url, headers=headers) as response: + if response.status == 200: + data = await response.json() + if data.get('code') == 0: + user_info = data['data'] + return user_info else: - raise Exception(f'Failed to get bot user info: HTTP {response.status}') + raise Exception(f'Failed to get bot user info: {data.get("message")}') + else: + raise Exception(f'Failed to get bot user info: HTTP {response.status}') async def _handle_hello(self, data: dict): """Handle HELLO signal (signal 1)""" @@ -510,7 +512,7 @@ class KookAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): try: if not self.http_session: - self.http_session = aiohttp.ClientSession() + self.http_session = httpclient.get_session() async with self.http_session.post(url, json=payload, headers=headers) as response: if response.status == 200: @@ -576,7 +578,7 @@ class KookAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): try: if not self.http_session: - self.http_session = aiohttp.ClientSession() + self.http_session = httpclient.get_session() async with self.http_session.post(url, json=payload, headers=headers) as response: if response.status == 200: @@ -624,7 +626,7 @@ class KookAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): try: # Create HTTP session - self.http_session = aiohttp.ClientSession() + self.http_session = httpclient.get_session() await self.logger.info('Starting KOOK adapter') diff --git a/src/langbot/pkg/platform/sources/lark.py b/src/langbot/pkg/platform/sources/lark.py index ce527731..3ce4280c 100644 --- a/src/langbot/pkg/platform/sources/lark.py +++ b/src/langbot/pkg/platform/sources/lark.py @@ -17,7 +17,7 @@ import tempfile import os import mimetypes -import aiohttp +from langbot.pkg.utils import httpclient import lark_oapi.ws.exception import quart from lark_oapi.api.im.v1 import * @@ -78,13 +78,13 @@ class LarkMessageConverter(abstract_platform_adapter.AbstractMessageConverter): return None elif msg.url: try: - async with aiohttp.ClientSession() as session: - async with session.get(msg.url) as response: - if response.status == 200: - image_bytes = await response.read() - else: - print(f'Failed to download image from {msg.url}: HTTP {response.status}') - return None + session = httpclient.get_session() + async with session.get(msg.url) as response: + if response.status == 200: + image_bytes = await response.read() + else: + print(f'Failed to download image from {msg.url}: HTTP {response.status}') + return None except Exception as e: print(f'Failed to download image from {msg.url}: {e}') traceback.print_exc() @@ -208,10 +208,10 @@ class LarkMessageConverter(abstract_platform_adapter.AbstractMessageConverter): pass elif msg.url: try: - async with aiohttp.ClientSession() as session: - async with session.get(msg.url) as resp: - if resp.status == 200: - data = await resp.read() + session = httpclient.get_session() + async with session.get(msg.url) as resp: + if resp.status == 200: + data = await resp.read() except Exception: pass elif msg.path: diff --git a/src/langbot/pkg/platform/sources/legacy/gewechat.py b/src/langbot/pkg/platform/sources/legacy/gewechat.py index 93bef53c..68e1bded 100644 --- a/src/langbot/pkg/platform/sources/legacy/gewechat.py +++ b/src/langbot/pkg/platform/sources/legacy/gewechat.py @@ -9,7 +9,7 @@ import copy import threading import quart -import aiohttp +from langbot.pkg.utils import httpclient import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter from ....core import app @@ -639,14 +639,14 @@ class GeWeChatAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter): async def run_async(self): if not self.config['token']: - async with aiohttp.ClientSession() as session: - async with session.post( - f'{self.config["gewechat_url"]}/v2/api/tools/getTokenId', - json={'app_id': self.config['app_id']}, - ) as response: - if response.status != 200: - raise Exception(f'获取gewechat token失败: {await response.text()}') - self.config['token'] = (await response.json())['data'] + session = httpclient.get_session() + async with session.post( + f'{self.config["gewechat_url"]}/v2/api/tools/getTokenId', + json={'app_id': self.config['app_id']}, + ) as response: + if response.status != 200: + raise Exception(f'获取gewechat token失败: {await response.text()}') + self.config['token'] = (await response.json())['data'] self.bot = gewechat_client.GewechatClient(f'{self.config["gewechat_url"]}/v2/api', self.config['token']) diff --git a/src/langbot/pkg/platform/sources/telegram.py b/src/langbot/pkg/platform/sources/telegram.py index c2f6cd6d..7e7c8d9a 100644 --- a/src/langbot/pkg/platform/sources/telegram.py +++ b/src/langbot/pkg/platform/sources/telegram.py @@ -9,9 +9,9 @@ import telegramify_markdown import typing import traceback import base64 -import aiohttp import pydantic +from langbot.pkg.utils import httpclient import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter import langbot_plugin.api.entities.builtin.platform.message as platform_message import langbot_plugin.api.entities.builtin.platform.events as platform_events @@ -33,9 +33,9 @@ class TelegramMessageConverter(abstract_platform_adapter.AbstractMessageConverte if component.base64: photo_bytes = base64.b64decode(component.base64) elif component.url: - async with aiohttp.ClientSession() as session: - async with session.get(component.url) as response: - photo_bytes = await response.read() + session = httpclient.get_session() + async with session.get(component.url) as response: + photo_bytes = await response.read() elif component.path: with open(component.path, 'rb') as f: photo_bytes = f.read() @@ -74,10 +74,9 @@ class TelegramMessageConverter(abstract_platform_adapter.AbstractMessageConverte file_bytes = None file_format = '' - async with aiohttp.ClientSession(trust_env=True) as session: - async with session.get(file.file_path) as response: - file_bytes = await response.read() - file_format = 'image/jpeg' + async with httpclient.get_session(trust_env=True).get(file.file_path) as response: + file_bytes = await response.read() + file_format = 'image/jpeg' message_components.append( platform_message.Image( @@ -94,9 +93,8 @@ class TelegramMessageConverter(abstract_platform_adapter.AbstractMessageConverte file_bytes = None file_format = message.voice.mime_type or 'audio/ogg' - async with aiohttp.ClientSession(trust_env=True) as session: - async with session.get(file.file_path) as response: - file_bytes = await response.read() + async with httpclient.get_session(trust_env=True).get(file.file_path) as response: + file_bytes = await response.read() message_components.append( platform_message.Voice( diff --git a/src/langbot/pkg/platform/webhook_pusher.py b/src/langbot/pkg/platform/webhook_pusher.py index 5a8d2564..f3cf39b2 100644 --- a/src/langbot/pkg/platform/webhook_pusher.py +++ b/src/langbot/pkg/platform/webhook_pusher.py @@ -3,6 +3,8 @@ from __future__ import annotations import asyncio import logging import aiohttp + +from langbot.pkg.utils import httpclient import uuid from typing import TYPE_CHECKING @@ -119,23 +121,23 @@ class WebhookPusher: dict | None: The response JSON if successful, None otherwise """ try: - async with aiohttp.ClientSession() as session: - async with session.post( - url, - json=payload, - headers={'Content-Type': 'application/json'}, - timeout=aiohttp.ClientTimeout(total=15), - ) as response: - if response.status >= 400: - self.logger.warning(f'Webhook {url} returned status {response.status}') + session = httpclient.get_session() + async with session.post( + url, + json=payload, + headers={'Content-Type': 'application/json'}, + timeout=aiohttp.ClientTimeout(total=15), + ) as response: + if response.status >= 400: + self.logger.warning(f'Webhook {url} returned status {response.status}') + return None + else: + self.logger.debug(f'Successfully pushed to webhook {url}') + try: + return await response.json() + except Exception as json_error: + self.logger.debug(f'Failed to parse JSON response from webhook {url}: {json_error}') return None - else: - self.logger.debug(f'Successfully pushed to webhook {url}') - try: - return await response.json() - except Exception as json_error: - self.logger.debug(f'Failed to parse JSON response from webhook {url}: {json_error}') - return None except asyncio.TimeoutError: self.logger.warning(f'Timeout pushing to webhook {url}') return None diff --git a/src/langbot/pkg/provider/runners/n8nsvapi.py b/src/langbot/pkg/provider/runners/n8nsvapi.py index d7ec3ccb..d177d6b8 100644 --- a/src/langbot/pkg/provider/runners/n8nsvapi.py +++ b/src/langbot/pkg/provider/runners/n8nsvapi.py @@ -5,6 +5,8 @@ import json import uuid import aiohttp +from langbot.pkg.utils import httpclient + from .. import runner from ...core import app import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query @@ -217,50 +219,50 @@ class N8nServiceAPIRunner(runner.RequestRunner): self.ap.logger.debug('no auth') # 调用webhook - async with aiohttp.ClientSession() as session: - if is_stream: - # 流式请求 - async with session.post( - self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout - ) as response: + session = httpclient.get_session() + if is_stream: + # 流式请求 + async with session.post( + self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout + ) as response: + if response.status != 200: + error_text = await response.text() + self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}') + raise Exception(f'n8n webhook call failed: {response.status}, {error_text}') + + # 处理流式响应 + async for chunk in self._process_stream_response(response): + yield chunk + else: + async with session.post( + self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout + ) as response: + try: + async for chunk in self._process_stream_response(response): + output_content = chunk.content if chunk.is_final else '' + except: + # 非流式请求(保持原有逻辑) if response.status != 200: error_text = await response.text() self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}') raise Exception(f'n8n webhook call failed: {response.status}, {error_text}') - # 处理流式响应 - async for chunk in self._process_stream_response(response): - yield chunk - else: - async with session.post( - self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout - ) as response: - try: - async for chunk in self._process_stream_response(response): - output_content = chunk.content if chunk.is_final else '' - except: - # 非流式请求(保持原有逻辑) - if response.status != 200: - error_text = await response.text() - self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}') - raise Exception(f'n8n webhook call failed: {response.status}, {error_text}') + # 解析响应 + response_data = await response.json() + self.ap.logger.debug(f'n8n webhook response: {response_data}') - # 解析响应 - response_data = await response.json() - self.ap.logger.debug(f'n8n webhook response: {response_data}') + # 从响应中提取输出 + if self.output_key in response_data: + output_content = response_data[self.output_key] + else: + # 如果没有指定的输出键,则使用整个响应 + output_content = json.dumps(response_data, ensure_ascii=False) - # 从响应中提取输出 - if self.output_key in response_data: - output_content = response_data[self.output_key] - else: - # 如果没有指定的输出键,则使用整个响应 - output_content = json.dumps(response_data, ensure_ascii=False) - - # 返回消息 - yield provider_message.Message( - role='assistant', - content=output_content, - ) + # 返回消息 + yield provider_message.Message( + role='assistant', + content=output_content, + ) except Exception as e: self.ap.logger.error(f'n8n webhook call exception: {str(e)}') raise N8nAPIError(f'n8n webhook call exception: {str(e)}') diff --git a/src/langbot/pkg/utils/httpclient.py b/src/langbot/pkg/utils/httpclient.py new file mode 100644 index 00000000..e9c04b34 --- /dev/null +++ b/src/langbot/pkg/utils/httpclient.py @@ -0,0 +1,43 @@ +"""Shared aiohttp.ClientSession to avoid repeated SSL context creation. + +Each call to `aiohttp.ClientSession()` creates a new `TCPConnector` which in turn +creates a new `ssl.SSLContext` and loads all system root certificates. This is +extremely expensive in both CPU and memory (~270MB total allocations observed via +memray profiling). + +This module provides a shared session pool so that all HTTP client code in LangBot +reuses the same underlying SSL context and connection pool. +""" + +from __future__ import annotations + +import aiohttp + +_sessions: dict[str, aiohttp.ClientSession] = {} + + +def get_session(*, trust_env: bool = False) -> aiohttp.ClientSession: + """Get or create a shared aiohttp.ClientSession. + + Args: + trust_env: Whether to trust environment variables for proxy settings. + + Returns: + A shared aiohttp.ClientSession instance. + """ + key = f'trust_env={trust_env}' + + session = _sessions.get(key) + if session is None or session.closed: + session = aiohttp.ClientSession(trust_env=trust_env) + _sessions[key] = session + + return session + + +async def close_all(): + """Close all shared sessions. Call on application shutdown.""" + for session in _sessions.values(): + if not session.closed: + await session.close() + _sessions.clear() diff --git a/src/langbot/pkg/utils/image.py b/src/langbot/pkg/utils/image.py index e07caec6..5716b07d 100644 --- a/src/langbot/pkg/utils/image.py +++ b/src/langbot/pkg/utils/image.py @@ -5,6 +5,8 @@ from urllib.parse import urlparse, parse_qs import ssl import aiohttp + +from langbot.pkg.utils import httpclient import PIL.Image import httpx @@ -47,53 +49,54 @@ async def get_gewechat_image_base64( ) try: - async with aiohttp.ClientSession(timeout=timeout) as session: - # 获取图片下载链接 - try: - async with session.post( - f'{gewechat_url}/v2/api/message/downloadImage', - headers=headers, - json={'appId': app_id, 'type': image_type, 'xml': xml_content}, - ) as response: - if response.status != 200: - # print(response) - raise Exception(f'获取gewechat图片下载失败: {await response.text()}') + session = httpclient.get_session() + # 获取图片下载链接 + try: + async with session.post( + f'{gewechat_url}/v2/api/message/downloadImage', + headers=headers, + json={'appId': app_id, 'type': image_type, 'xml': xml_content}, + timeout=timeout, + ) as response: + if response.status != 200: + # print(response) + raise Exception(f'获取gewechat图片下载失败: {await response.text()}') - resp_data = await response.json() - if resp_data.get('ret') != 200: - raise Exception(f'获取gewechat图片下载链接失败: {resp_data}') + resp_data = await response.json() + if resp_data.get('ret') != 200: + raise Exception(f'获取gewechat图片下载链接失败: {resp_data}') - file_url = resp_data['data']['fileUrl'] - except asyncio.TimeoutError: - raise Exception('获取图片下载链接超时') - except aiohttp.ClientError as e: - raise Exception(f'获取图片下载链接网络错误: {str(e)}') + file_url = resp_data['data']['fileUrl'] + except asyncio.TimeoutError: + raise Exception('获取图片下载链接超时') + except aiohttp.ClientError as e: + raise Exception(f'获取图片下载链接网络错误: {str(e)}') - # 解析原始URL并替换端口 - base_url = gewechat_file_url - download_url = f'{base_url}/download/{file_url}' + # 解析原始URL并替换端口 + base_url = gewechat_file_url + download_url = f'{base_url}/download/{file_url}' - # 下载图片 - try: - async with session.get(download_url) as img_response: - if img_response.status != 200: - raise Exception(f'下载图片失败: {await img_response.text()}, URL: {download_url}') + # 下载图片 + try: + async with session.get(download_url) as img_response: + if img_response.status != 200: + raise Exception(f'下载图片失败: {await img_response.text()}, URL: {download_url}') - image_data = await img_response.read() + image_data = await img_response.read() - content_type = img_response.headers.get('Content-Type', '') - if content_type: - image_format = content_type.split('/')[-1] - else: - image_format = file_url.split('.')[-1] + content_type = img_response.headers.get('Content-Type', '') + if content_type: + image_format = content_type.split('/')[-1] + else: + image_format = file_url.split('.')[-1] - base64_str = base64.b64encode(image_data).decode('utf-8') + base64_str = base64.b64encode(image_data).decode('utf-8') - return base64_str, image_format - except asyncio.TimeoutError: - raise Exception(f'下载图片超时, URL: {download_url}') - except aiohttp.ClientError as e: - raise Exception(f'下载图片网络错误: {str(e)}, URL: {download_url}') + return base64_str, image_format + except asyncio.TimeoutError: + raise Exception(f'下载图片超时, URL: {download_url}') + except aiohttp.ClientError as e: + raise Exception(f'下载图片网络错误: {str(e)}, URL: {download_url}') except Exception as e: raise Exception(f'获取图片失败: {str(e)}') from e @@ -104,24 +107,24 @@ async def get_wecom_image_base64(pic_url: str) -> tuple[str, str]: :param pic_url: 企业微信图片URL :return: (base64_str, image_format) """ - async with aiohttp.ClientSession() as session: - async with session.get(pic_url) as response: - if response.status != 200: - raise Exception(f'Failed to download image: {response.status}') + session = httpclient.get_session() + async with session.get(pic_url) as response: + if response.status != 200: + raise Exception(f'Failed to download image: {response.status}') - # 读取图片数据 - image_data = await response.read() + # 读取图片数据 + image_data = await response.read() - # 获取图片格式 - content_type = response.headers.get('Content-Type', '') - image_format = content_type.split('/')[-1] # 例如 'image/jpeg' -> 'jpeg' + # 获取图片格式 + content_type = response.headers.get('Content-Type', '') + image_format = content_type.split('/')[-1] # 例如 'image/jpeg' -> 'jpeg' - # 转换为 base64 - import base64 + # 转换为 base64 + import base64 - image_base64 = base64.b64encode(image_data).decode('utf-8') + image_base64 = base64.b64encode(image_data).decode('utf-8') - return image_base64, image_format + return image_base64, image_format async def get_qq_official_image_base64(pic_url: str, content_type: str) -> tuple[str, str]: @@ -152,21 +155,19 @@ async def get_qq_image_bytes(image_url: str, query: dict = {}) -> tuple[bytes, s ssl_context = ssl.create_default_context() ssl_context.check_hostname = False ssl_context.verify_mode = ssl.CERT_NONE - async with aiohttp.ClientSession(trust_env=False) as session: - async with session.get( - image_url, params=query, ssl=ssl_context, timeout=aiohttp.ClientTimeout(total=30.0) - ) as resp: - resp.raise_for_status() - file_bytes = await resp.read() - content_type = resp.headers.get('Content-Type') - if not content_type: - image_format = 'jpeg' - elif not content_type.startswith('image/'): - pil_img = PIL.Image.open(io.BytesIO(file_bytes)) - image_format = pil_img.format.lower() - else: - image_format = content_type.split('/')[-1] - return file_bytes, image_format + session = httpclient.get_session() + async with session.get(image_url, params=query, ssl=ssl_context, timeout=aiohttp.ClientTimeout(total=30.0)) as resp: + resp.raise_for_status() + file_bytes = await resp.read() + content_type = resp.headers.get('Content-Type') + if not content_type: + image_format = 'jpeg' + elif not content_type.startswith('image/'): + pil_img = PIL.Image.open(io.BytesIO(file_bytes)) + image_format = pil_img.format.lower() + else: + image_format = content_type.split('/')[-1] + return file_bytes, image_format async def qq_image_url_to_base64(image_url: str) -> typing.Tuple[str, str]: @@ -204,11 +205,11 @@ async def extract_b64_and_format(image_base64_data: str) -> typing.Tuple[str, st async def get_slack_image_to_base64(pic_url: str, bot_token: str): headers = {'Authorization': f'Bearer {bot_token}'} try: - async with aiohttp.ClientSession() as session: - async with session.get(pic_url, headers=headers) as resp: - mime_type = resp.headers.get('Content-Type', 'application/octet-stream') - file_bytes = await resp.read() - base64_str = base64.b64encode(file_bytes).decode('utf-8') - return f'data:{mime_type};base64,{base64_str}' + session = httpclient.get_session() + async with session.get(pic_url, headers=headers) as resp: + mime_type = resp.headers.get('Content-Type', 'application/octet-stream') + file_bytes = await resp.read() + base64_str = base64.b64encode(file_bytes).decode('utf-8') + return f'data:{mime_type};base64,{base64_str}' except Exception as e: raise (e)