mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-08 14:56:03 +00:00
perf: reduce memory usage by ~200MB+ at startup (#2013)
* perf: reduce memory usage by ~200MB+ at startup
Two key optimizations:
1. Use importlib.util.find_spec() instead of __import__() in dependency
checking. find_spec() only locates modules without executing them,
avoiding loading all 36 dependencies (~222MB) into memory at startup.
2. Introduce shared aiohttp.ClientSession via httpclient module.
Previously, every HTTP request created a new ClientSession, which
creates a new TCPConnector and SSL context, loading system root
certificates each time (~270MB total allocations observed via memray).
Now all HTTP client code reuses shared sessions.
- satori.py and coze_server_api/client.py are left unchanged as they
create one session per adapter lifecycle (not per-request).
Profiling data (memray):
- Peak memory: 403MB
- SSL context creation: 270MB / 6.7M allocations (67% of total)
- Dependency import: 222MB (55% of peak)
- Expected reduction: 150-350MB at startup
* fix: remove unused aiohttp imports (ruff F401)
* style: ruff format
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
import requests
|
||||
import aiohttp
|
||||
from langbot.pkg.utils import httpclient
|
||||
|
||||
|
||||
def post_json(base_url, token, data=None):
|
||||
@@ -63,16 +63,16 @@ async def async_request(
|
||||
"""
|
||||
headers = {'Content-Type': 'application/json'}
|
||||
url = f'{base_url}?key={token_key}'
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.request(
|
||||
method=method, url=url, params=params, headers=headers, data=data, json=json
|
||||
) as response:
|
||||
response.raise_for_status() # 如果状态码不是200,抛出异常
|
||||
result = await response.json()
|
||||
# print(result)
|
||||
return result
|
||||
# if result.get('Code') == 200:
|
||||
#
|
||||
# return await result
|
||||
# else:
|
||||
# raise RuntimeError("请求失败",response.text)
|
||||
session = httpclient.get_session()
|
||||
async with session.request(
|
||||
method=method, url=url, params=params, headers=headers, data=data, json=json
|
||||
) as response:
|
||||
response.raise_for_status() # 如果状态码不是200,抛出异常
|
||||
result = await response.json()
|
||||
# print(result)
|
||||
return result
|
||||
# if result.get('Code') == 200:
|
||||
#
|
||||
# return await result
|
||||
# else:
|
||||
# raise RuntimeError("请求失败",response.text)
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import aiohttp
|
||||
from langbot.pkg.utils import httpclient
|
||||
import typing
|
||||
import datetime
|
||||
import time
|
||||
@@ -99,49 +99,49 @@ class SpaceService:
|
||||
space_config = self._get_space_config()
|
||||
space_url = space_config['url']
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
f'{space_url}/api/v1/accounts/oauth/token',
|
||||
json={'code': code, 'instance_id': constants.instance_id},
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
raise ValueError(f'Failed to exchange OAuth code: {await response.text()}')
|
||||
data = await response.json()
|
||||
if data.get('code') != 0:
|
||||
raise ValueError(f'Failed to exchange OAuth code: {data.get("msg")}')
|
||||
return data.get('data', {})
|
||||
session = httpclient.get_session()
|
||||
async with session.post(
|
||||
f'{space_url}/api/v1/accounts/oauth/token',
|
||||
json={'code': code, 'instance_id': constants.instance_id},
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
raise ValueError(f'Failed to exchange OAuth code: {await response.text()}')
|
||||
data = await response.json()
|
||||
if data.get('code') != 0:
|
||||
raise ValueError(f'Failed to exchange OAuth code: {data.get("msg")}')
|
||||
return data.get('data', {})
|
||||
|
||||
async def refresh_token(self, refresh_token: str) -> typing.Dict:
|
||||
"""Refresh Space access token"""
|
||||
space_config = self._get_space_config()
|
||||
space_url = space_config['url']
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
f'{space_url}/api/v1/accounts/token/refresh', json={'refresh_token': refresh_token}
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
raise ValueError(f'Failed to refresh token: {await response.text()}')
|
||||
data = await response.json()
|
||||
if data.get('code') != 0:
|
||||
raise ValueError(f'Failed to refresh token: {data.get("msg")}')
|
||||
return data.get('data', {})
|
||||
session = httpclient.get_session()
|
||||
async with session.post(
|
||||
f'{space_url}/api/v1/accounts/token/refresh', json={'refresh_token': refresh_token}
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
raise ValueError(f'Failed to refresh token: {await response.text()}')
|
||||
data = await response.json()
|
||||
if data.get('code') != 0:
|
||||
raise ValueError(f'Failed to refresh token: {data.get("msg")}')
|
||||
return data.get('data', {})
|
||||
|
||||
async def get_user_info_raw(self, access_token: str) -> typing.Dict:
|
||||
"""Get user info from Space using access token (no validation)"""
|
||||
space_config = self._get_space_config()
|
||||
space_url = space_config['url']
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(
|
||||
f'{space_url}/api/v1/accounts/me', headers={'Authorization': f'Bearer {access_token}'}
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
raise ValueError(f'Failed to get user info: {await response.text()}')
|
||||
data = await response.json()
|
||||
if data.get('code') != 0:
|
||||
raise ValueError(f'Failed to get user info: {data.get("msg")}')
|
||||
return data.get('data', {})
|
||||
session = httpclient.get_session()
|
||||
async with session.get(
|
||||
f'{space_url}/api/v1/accounts/me', headers={'Authorization': f'Bearer {access_token}'}
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
raise ValueError(f'Failed to get user info: {await response.text()}')
|
||||
data = await response.json()
|
||||
if data.get('code') != 0:
|
||||
raise ValueError(f'Failed to get user info: {data.get("msg")}')
|
||||
return data.get('data', {})
|
||||
|
||||
# === API calls with token validation ===
|
||||
|
||||
@@ -178,12 +178,12 @@ class SpaceService:
|
||||
space_config = self._get_space_config()
|
||||
space_url = space_config['url']
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(f'{space_url}/api/v1/models') as response:
|
||||
if response.status != 200:
|
||||
raise ValueError(f'Failed to get models: {await response.text()}')
|
||||
data = await response.json()
|
||||
if data.get('code') != 0:
|
||||
raise ValueError(f'Failed to get models: {data.get("msg")}')
|
||||
models_data = data.get('data', {}).get('models', [])
|
||||
return [SpaceModel.model_validate(model_dict) for model_dict in models_data]
|
||||
session = httpclient.get_session()
|
||||
async with session.get(f'{space_url}/api/v1/models') as response:
|
||||
if response.status != 200:
|
||||
raise ValueError(f'Failed to get models: {await response.text()}')
|
||||
data = await response.json()
|
||||
if data.get('code') != 0:
|
||||
raise ValueError(f'Failed to get models: {data.get("msg")}')
|
||||
models_data = data.get('data', {}).get('models', [])
|
||||
return [SpaceModel.model_validate(model_dict) for model_dict in models_data]
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
import importlib.util
|
||||
import pip
|
||||
import os
|
||||
from ...utils import pkgmgr
|
||||
@@ -49,9 +50,10 @@ async def check_deps() -> list[str]:
|
||||
|
||||
missing_deps = []
|
||||
for dep in required_deps:
|
||||
try:
|
||||
__import__(dep)
|
||||
except ImportError:
|
||||
# Use find_spec instead of __import__ to avoid actually loading
|
||||
# all modules into memory. find_spec only checks if the module
|
||||
# can be found, without executing module-level code.
|
||||
if importlib.util.find_spec(dep) is None:
|
||||
missing_deps.append(dep)
|
||||
return missing_deps
|
||||
|
||||
|
||||
@@ -1,10 +1,9 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import aiohttp
|
||||
|
||||
from .. import entities
|
||||
from .. import filter as filter_model
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
from langbot.pkg.utils import httpclient
|
||||
|
||||
BAIDU_EXAMINE_URL = 'https://aip.baidubce.com/rest/2.0/solution/v1/text_censor/v2/user_defined?access_token={}'
|
||||
BAIDU_EXAMINE_TOKEN_URL = 'https://aip.baidubce.com/oauth/2.0/token'
|
||||
@@ -15,50 +14,50 @@ class BaiduCloudExamine(filter_model.ContentFilter):
|
||||
"""百度云内容审核"""
|
||||
|
||||
async def _get_token(self) -> str:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
BAIDU_EXAMINE_TOKEN_URL,
|
||||
params={
|
||||
'grant_type': 'client_credentials',
|
||||
'client_id': self.ap.pipeline_cfg.data['baidu-cloud-examine']['api-key'],
|
||||
'client_secret': self.ap.pipeline_cfg.data['baidu-cloud-examine']['api-secret'],
|
||||
},
|
||||
) as resp:
|
||||
return (await resp.json())['access_token']
|
||||
session = httpclient.get_session()
|
||||
async with session.post(
|
||||
BAIDU_EXAMINE_TOKEN_URL,
|
||||
params={
|
||||
'grant_type': 'client_credentials',
|
||||
'client_id': self.ap.pipeline_cfg.data['baidu-cloud-examine']['api-key'],
|
||||
'client_secret': self.ap.pipeline_cfg.data['baidu-cloud-examine']['api-secret'],
|
||||
},
|
||||
) as resp:
|
||||
return (await resp.json())['access_token']
|
||||
|
||||
async def process(self, query: pipeline_query.Query, message: str) -> entities.FilterResult:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
BAIDU_EXAMINE_URL.format(await self._get_token()),
|
||||
headers={
|
||||
'Content-Type': 'application/x-www-form-urlencoded',
|
||||
'Accept': 'application/json',
|
||||
},
|
||||
data=f'text={message}'.encode('utf-8'),
|
||||
) as resp:
|
||||
result = await resp.json()
|
||||
session = httpclient.get_session()
|
||||
async with session.post(
|
||||
BAIDU_EXAMINE_URL.format(await self._get_token()),
|
||||
headers={
|
||||
'Content-Type': 'application/x-www-form-urlencoded',
|
||||
'Accept': 'application/json',
|
||||
},
|
||||
data=f'text={message}'.encode('utf-8'),
|
||||
) as resp:
|
||||
result = await resp.json()
|
||||
|
||||
if 'error_code' in result:
|
||||
if 'error_code' in result:
|
||||
return entities.FilterResult(
|
||||
level=entities.ResultLevel.BLOCK,
|
||||
replacement=message,
|
||||
user_notice='',
|
||||
console_notice=f'百度云判定出错,错误信息:{result["error_msg"]}',
|
||||
)
|
||||
else:
|
||||
conclusion = result['conclusion']
|
||||
|
||||
if conclusion in ('合规'):
|
||||
return entities.FilterResult(
|
||||
level=entities.ResultLevel.PASS,
|
||||
replacement=message,
|
||||
user_notice='',
|
||||
console_notice=f'百度云判定结果:{conclusion}',
|
||||
)
|
||||
else:
|
||||
return entities.FilterResult(
|
||||
level=entities.ResultLevel.BLOCK,
|
||||
replacement=message,
|
||||
user_notice='',
|
||||
console_notice=f'百度云判定出错,错误信息:{result["error_msg"]}',
|
||||
user_notice='消息中存在不合适的内容, 请修改',
|
||||
console_notice=f'百度云判定结果:{conclusion}',
|
||||
)
|
||||
else:
|
||||
conclusion = result['conclusion']
|
||||
|
||||
if conclusion in ('合规'):
|
||||
return entities.FilterResult(
|
||||
level=entities.ResultLevel.PASS,
|
||||
replacement=message,
|
||||
user_notice='',
|
||||
console_notice=f'百度云判定结果:{conclusion}',
|
||||
)
|
||||
else:
|
||||
return entities.FilterResult(
|
||||
level=entities.ResultLevel.BLOCK,
|
||||
replacement=message,
|
||||
user_notice='消息中存在不合适的内容, 请修改',
|
||||
console_notice=f'百度云判定结果:{conclusion}',
|
||||
)
|
||||
|
||||
@@ -14,7 +14,7 @@ import io
|
||||
import asyncio
|
||||
from enum import Enum
|
||||
|
||||
import aiohttp
|
||||
from langbot.pkg.utils import httpclient
|
||||
import pydantic
|
||||
|
||||
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
|
||||
@@ -622,23 +622,23 @@ class DiscordMessageConverter(abstract_platform_adapter.AbstractMessageConverter
|
||||
image_bytes = base64.b64decode(base64_data)
|
||||
elif ele.url:
|
||||
# 从URL下载图片
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(ele.url) as response:
|
||||
image_bytes = await response.read()
|
||||
# 从URL或Content-Type推断文件类型
|
||||
content_type = response.headers.get('Content-Type', '')
|
||||
if 'jpeg' in content_type or 'jpg' in content_type:
|
||||
filename = f'{uuid.uuid4()}.jpg'
|
||||
elif 'gif' in content_type:
|
||||
filename = f'{uuid.uuid4()}.gif'
|
||||
elif 'webp' in content_type:
|
||||
filename = f'{uuid.uuid4()}.webp'
|
||||
elif ele.url.lower().endswith(('.jpg', '.jpeg')):
|
||||
filename = f'{uuid.uuid4()}.jpg'
|
||||
elif ele.url.lower().endswith('.gif'):
|
||||
filename = f'{uuid.uuid4()}.gif'
|
||||
elif ele.url.lower().endswith('.webp'):
|
||||
filename = f'{uuid.uuid4()}.webp'
|
||||
session = httpclient.get_session()
|
||||
async with session.get(ele.url) as response:
|
||||
image_bytes = await response.read()
|
||||
# 从URL或Content-Type推断文件类型
|
||||
content_type = response.headers.get('Content-Type', '')
|
||||
if 'jpeg' in content_type or 'jpg' in content_type:
|
||||
filename = f'{uuid.uuid4()}.jpg'
|
||||
elif 'gif' in content_type:
|
||||
filename = f'{uuid.uuid4()}.gif'
|
||||
elif 'webp' in content_type:
|
||||
filename = f'{uuid.uuid4()}.webp'
|
||||
elif ele.url.lower().endswith(('.jpg', '.jpeg')):
|
||||
filename = f'{uuid.uuid4()}.jpg'
|
||||
elif ele.url.lower().endswith('.gif'):
|
||||
filename = f'{uuid.uuid4()}.gif'
|
||||
elif ele.url.lower().endswith('.webp'):
|
||||
filename = f'{uuid.uuid4()}.webp'
|
||||
elif ele.path:
|
||||
# 从文件路径读取图片
|
||||
# 确保路径没有空字节
|
||||
@@ -702,9 +702,9 @@ class DiscordMessageConverter(abstract_platform_adapter.AbstractMessageConverter
|
||||
file_base64 = ele.base64.split(',')[-1]
|
||||
file_bytes = base64.b64decode(file_base64)
|
||||
elif ele.url:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(ele.url) as response:
|
||||
file_bytes = await response.read()
|
||||
session = httpclient.get_session()
|
||||
async with session.get(ele.url) as response:
|
||||
file_bytes = await response.read()
|
||||
if file_bytes:
|
||||
files.append(discord.File(fp=io.BytesIO(file_bytes), filename=filename))
|
||||
elif isinstance(ele, platform_message.File):
|
||||
@@ -717,9 +717,9 @@ class DiscordMessageConverter(abstract_platform_adapter.AbstractMessageConverter
|
||||
else:
|
||||
file_bytes = base64.b64decode(ele.base64)
|
||||
elif ele.url:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(ele.url) as response:
|
||||
file_bytes = await response.read()
|
||||
session = httpclient.get_session()
|
||||
async with session.get(ele.url) as response:
|
||||
file_bytes = await response.read()
|
||||
if file_bytes:
|
||||
files.append(discord.File(fp=io.BytesIO(file_bytes), filename=filename))
|
||||
elif isinstance(ele, platform_message.Forward):
|
||||
@@ -775,12 +775,12 @@ class DiscordMessageConverter(abstract_platform_adapter.AbstractMessageConverter
|
||||
|
||||
# attachments
|
||||
for attachment in message.attachments:
|
||||
async with aiohttp.ClientSession(trust_env=True) as session:
|
||||
async with session.get(attachment.url) as response:
|
||||
image_data = await response.read()
|
||||
image_base64 = base64.b64encode(image_data).decode('utf-8')
|
||||
image_format = response.headers['Content-Type']
|
||||
element_list.append(platform_message.Image(base64=f'data:{image_format};base64,{image_base64}'))
|
||||
session = httpclient.get_session(trust_env=True)
|
||||
async with session.get(attachment.url) as response:
|
||||
image_data = await response.read()
|
||||
image_base64 = base64.b64encode(image_data).decode('utf-8')
|
||||
image_format = response.headers['Content-Type']
|
||||
element_list.append(platform_message.Image(base64=f'data:{image_format};base64,{image_base64}'))
|
||||
|
||||
return platform_message.MessageChain(element_list)
|
||||
|
||||
|
||||
@@ -9,6 +9,8 @@ import traceback
|
||||
import time
|
||||
|
||||
import aiohttp
|
||||
|
||||
from langbot.pkg.utils import httpclient
|
||||
import websockets
|
||||
import pydantic
|
||||
|
||||
@@ -120,16 +122,16 @@ class KookMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
|
||||
if content:
|
||||
# Download image and convert to base64
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(content) as response:
|
||||
if response.status == 200:
|
||||
image_bytes = await response.read()
|
||||
image_base64 = base64.b64encode(image_bytes).decode('utf-8')
|
||||
# Detect image format
|
||||
content_type = response.headers.get('Content-Type', 'image/png')
|
||||
components.append(
|
||||
platform_message.Image(base64=f'data:{content_type};base64,{image_base64}')
|
||||
)
|
||||
session = httpclient.get_session()
|
||||
async with session.get(content) as response:
|
||||
if response.status == 200:
|
||||
image_bytes = await response.read()
|
||||
image_base64 = base64.b64encode(image_bytes).decode('utf-8')
|
||||
# Detect image format
|
||||
content_type = response.headers.get('Content-Type', 'image/png')
|
||||
components.append(
|
||||
platform_message.Image(base64=f'data:{content_type};base64,{image_base64}')
|
||||
)
|
||||
except Exception:
|
||||
# If download fails, just add as plain text
|
||||
components.append(platform_message.Plain(text=f'[Image: {content}]'))
|
||||
@@ -295,17 +297,17 @@ class KookAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
'Authorization': f'Bot {self.config["token"]}',
|
||||
}
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(base_url, params=params, headers=headers) as response:
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
if data.get('code') == 0:
|
||||
gateway_url = data['data']['url']
|
||||
return gateway_url
|
||||
else:
|
||||
raise Exception(f'Failed to get gateway URL: {data.get("message")}')
|
||||
session = httpclient.get_session()
|
||||
async with session.get(base_url, params=params, headers=headers) as response:
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
if data.get('code') == 0:
|
||||
gateway_url = data['data']['url']
|
||||
return gateway_url
|
||||
else:
|
||||
raise Exception(f'Failed to get gateway URL: HTTP {response.status}')
|
||||
raise Exception(f'Failed to get gateway URL: {data.get("message")}')
|
||||
else:
|
||||
raise Exception(f'Failed to get gateway URL: HTTP {response.status}')
|
||||
|
||||
async def _get_bot_user_info(self) -> dict:
|
||||
"""Get bot's own user information from KOOK API"""
|
||||
@@ -315,17 +317,17 @@ class KookAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
'Authorization': f'Bot {self.config["token"]}',
|
||||
}
|
||||
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(base_url, headers=headers) as response:
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
if data.get('code') == 0:
|
||||
user_info = data['data']
|
||||
return user_info
|
||||
else:
|
||||
raise Exception(f'Failed to get bot user info: {data.get("message")}')
|
||||
session = httpclient.get_session()
|
||||
async with session.get(base_url, headers=headers) as response:
|
||||
if response.status == 200:
|
||||
data = await response.json()
|
||||
if data.get('code') == 0:
|
||||
user_info = data['data']
|
||||
return user_info
|
||||
else:
|
||||
raise Exception(f'Failed to get bot user info: HTTP {response.status}')
|
||||
raise Exception(f'Failed to get bot user info: {data.get("message")}')
|
||||
else:
|
||||
raise Exception(f'Failed to get bot user info: HTTP {response.status}')
|
||||
|
||||
async def _handle_hello(self, data: dict):
|
||||
"""Handle HELLO signal (signal 1)"""
|
||||
@@ -510,7 +512,7 @@ class KookAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
|
||||
try:
|
||||
if not self.http_session:
|
||||
self.http_session = aiohttp.ClientSession()
|
||||
self.http_session = httpclient.get_session()
|
||||
|
||||
async with self.http_session.post(url, json=payload, headers=headers) as response:
|
||||
if response.status == 200:
|
||||
@@ -576,7 +578,7 @@ class KookAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
|
||||
try:
|
||||
if not self.http_session:
|
||||
self.http_session = aiohttp.ClientSession()
|
||||
self.http_session = httpclient.get_session()
|
||||
|
||||
async with self.http_session.post(url, json=payload, headers=headers) as response:
|
||||
if response.status == 200:
|
||||
@@ -624,7 +626,7 @@ class KookAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
|
||||
try:
|
||||
# Create HTTP session
|
||||
self.http_session = aiohttp.ClientSession()
|
||||
self.http_session = httpclient.get_session()
|
||||
|
||||
await self.logger.info('Starting KOOK adapter')
|
||||
|
||||
|
||||
@@ -17,7 +17,7 @@ import tempfile
|
||||
import os
|
||||
import mimetypes
|
||||
|
||||
import aiohttp
|
||||
from langbot.pkg.utils import httpclient
|
||||
import lark_oapi.ws.exception
|
||||
import quart
|
||||
from lark_oapi.api.im.v1 import *
|
||||
@@ -78,13 +78,13 @@ class LarkMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
|
||||
return None
|
||||
elif msg.url:
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(msg.url) as response:
|
||||
if response.status == 200:
|
||||
image_bytes = await response.read()
|
||||
else:
|
||||
print(f'Failed to download image from {msg.url}: HTTP {response.status}')
|
||||
return None
|
||||
session = httpclient.get_session()
|
||||
async with session.get(msg.url) as response:
|
||||
if response.status == 200:
|
||||
image_bytes = await response.read()
|
||||
else:
|
||||
print(f'Failed to download image from {msg.url}: HTTP {response.status}')
|
||||
return None
|
||||
except Exception as e:
|
||||
print(f'Failed to download image from {msg.url}: {e}')
|
||||
traceback.print_exc()
|
||||
@@ -208,10 +208,10 @@ class LarkMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
|
||||
pass
|
||||
elif msg.url:
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(msg.url) as resp:
|
||||
if resp.status == 200:
|
||||
data = await resp.read()
|
||||
session = httpclient.get_session()
|
||||
async with session.get(msg.url) as resp:
|
||||
if resp.status == 200:
|
||||
data = await resp.read()
|
||||
except Exception:
|
||||
pass
|
||||
elif msg.path:
|
||||
|
||||
@@ -9,7 +9,7 @@ import copy
|
||||
import threading
|
||||
|
||||
import quart
|
||||
import aiohttp
|
||||
from langbot.pkg.utils import httpclient
|
||||
|
||||
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
|
||||
from ....core import app
|
||||
@@ -639,14 +639,14 @@ class GeWeChatAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
|
||||
|
||||
async def run_async(self):
|
||||
if not self.config['token']:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
f'{self.config["gewechat_url"]}/v2/api/tools/getTokenId',
|
||||
json={'app_id': self.config['app_id']},
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
raise Exception(f'获取gewechat token失败: {await response.text()}')
|
||||
self.config['token'] = (await response.json())['data']
|
||||
session = httpclient.get_session()
|
||||
async with session.post(
|
||||
f'{self.config["gewechat_url"]}/v2/api/tools/getTokenId',
|
||||
json={'app_id': self.config['app_id']},
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
raise Exception(f'获取gewechat token失败: {await response.text()}')
|
||||
self.config['token'] = (await response.json())['data']
|
||||
|
||||
self.bot = gewechat_client.GewechatClient(f'{self.config["gewechat_url"]}/v2/api', self.config['token'])
|
||||
|
||||
|
||||
@@ -9,9 +9,9 @@ import telegramify_markdown
|
||||
import typing
|
||||
import traceback
|
||||
import base64
|
||||
import aiohttp
|
||||
import pydantic
|
||||
|
||||
from langbot.pkg.utils import httpclient
|
||||
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
|
||||
import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
||||
import langbot_plugin.api.entities.builtin.platform.events as platform_events
|
||||
@@ -33,9 +33,9 @@ class TelegramMessageConverter(abstract_platform_adapter.AbstractMessageConverte
|
||||
if component.base64:
|
||||
photo_bytes = base64.b64decode(component.base64)
|
||||
elif component.url:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(component.url) as response:
|
||||
photo_bytes = await response.read()
|
||||
session = httpclient.get_session()
|
||||
async with session.get(component.url) as response:
|
||||
photo_bytes = await response.read()
|
||||
elif component.path:
|
||||
with open(component.path, 'rb') as f:
|
||||
photo_bytes = f.read()
|
||||
@@ -74,10 +74,9 @@ class TelegramMessageConverter(abstract_platform_adapter.AbstractMessageConverte
|
||||
file_bytes = None
|
||||
file_format = ''
|
||||
|
||||
async with aiohttp.ClientSession(trust_env=True) as session:
|
||||
async with session.get(file.file_path) as response:
|
||||
file_bytes = await response.read()
|
||||
file_format = 'image/jpeg'
|
||||
async with httpclient.get_session(trust_env=True).get(file.file_path) as response:
|
||||
file_bytes = await response.read()
|
||||
file_format = 'image/jpeg'
|
||||
|
||||
message_components.append(
|
||||
platform_message.Image(
|
||||
@@ -94,9 +93,8 @@ class TelegramMessageConverter(abstract_platform_adapter.AbstractMessageConverte
|
||||
file_bytes = None
|
||||
file_format = message.voice.mime_type or 'audio/ogg'
|
||||
|
||||
async with aiohttp.ClientSession(trust_env=True) as session:
|
||||
async with session.get(file.file_path) as response:
|
||||
file_bytes = await response.read()
|
||||
async with httpclient.get_session(trust_env=True).get(file.file_path) as response:
|
||||
file_bytes = await response.read()
|
||||
|
||||
message_components.append(
|
||||
platform_message.Voice(
|
||||
|
||||
@@ -3,6 +3,8 @@ from __future__ import annotations
|
||||
import asyncio
|
||||
import logging
|
||||
import aiohttp
|
||||
|
||||
from langbot.pkg.utils import httpclient
|
||||
import uuid
|
||||
from typing import TYPE_CHECKING
|
||||
|
||||
@@ -119,23 +121,23 @@ class WebhookPusher:
|
||||
dict | None: The response JSON if successful, None otherwise
|
||||
"""
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.post(
|
||||
url,
|
||||
json=payload,
|
||||
headers={'Content-Type': 'application/json'},
|
||||
timeout=aiohttp.ClientTimeout(total=15),
|
||||
) as response:
|
||||
if response.status >= 400:
|
||||
self.logger.warning(f'Webhook {url} returned status {response.status}')
|
||||
session = httpclient.get_session()
|
||||
async with session.post(
|
||||
url,
|
||||
json=payload,
|
||||
headers={'Content-Type': 'application/json'},
|
||||
timeout=aiohttp.ClientTimeout(total=15),
|
||||
) as response:
|
||||
if response.status >= 400:
|
||||
self.logger.warning(f'Webhook {url} returned status {response.status}')
|
||||
return None
|
||||
else:
|
||||
self.logger.debug(f'Successfully pushed to webhook {url}')
|
||||
try:
|
||||
return await response.json()
|
||||
except Exception as json_error:
|
||||
self.logger.debug(f'Failed to parse JSON response from webhook {url}: {json_error}')
|
||||
return None
|
||||
else:
|
||||
self.logger.debug(f'Successfully pushed to webhook {url}')
|
||||
try:
|
||||
return await response.json()
|
||||
except Exception as json_error:
|
||||
self.logger.debug(f'Failed to parse JSON response from webhook {url}: {json_error}')
|
||||
return None
|
||||
except asyncio.TimeoutError:
|
||||
self.logger.warning(f'Timeout pushing to webhook {url}')
|
||||
return None
|
||||
|
||||
@@ -5,6 +5,8 @@ import json
|
||||
import uuid
|
||||
import aiohttp
|
||||
|
||||
from langbot.pkg.utils import httpclient
|
||||
|
||||
from .. import runner
|
||||
from ...core import app
|
||||
import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query
|
||||
@@ -217,50 +219,50 @@ class N8nServiceAPIRunner(runner.RequestRunner):
|
||||
self.ap.logger.debug('no auth')
|
||||
|
||||
# 调用webhook
|
||||
async with aiohttp.ClientSession() as session:
|
||||
if is_stream:
|
||||
# 流式请求
|
||||
async with session.post(
|
||||
self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout
|
||||
) as response:
|
||||
session = httpclient.get_session()
|
||||
if is_stream:
|
||||
# 流式请求
|
||||
async with session.post(
|
||||
self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
error_text = await response.text()
|
||||
self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}')
|
||||
raise Exception(f'n8n webhook call failed: {response.status}, {error_text}')
|
||||
|
||||
# 处理流式响应
|
||||
async for chunk in self._process_stream_response(response):
|
||||
yield chunk
|
||||
else:
|
||||
async with session.post(
|
||||
self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout
|
||||
) as response:
|
||||
try:
|
||||
async for chunk in self._process_stream_response(response):
|
||||
output_content = chunk.content if chunk.is_final else ''
|
||||
except:
|
||||
# 非流式请求(保持原有逻辑)
|
||||
if response.status != 200:
|
||||
error_text = await response.text()
|
||||
self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}')
|
||||
raise Exception(f'n8n webhook call failed: {response.status}, {error_text}')
|
||||
|
||||
# 处理流式响应
|
||||
async for chunk in self._process_stream_response(response):
|
||||
yield chunk
|
||||
else:
|
||||
async with session.post(
|
||||
self.webhook_url, json=payload, headers=headers, auth=auth, timeout=self.timeout
|
||||
) as response:
|
||||
try:
|
||||
async for chunk in self._process_stream_response(response):
|
||||
output_content = chunk.content if chunk.is_final else ''
|
||||
except:
|
||||
# 非流式请求(保持原有逻辑)
|
||||
if response.status != 200:
|
||||
error_text = await response.text()
|
||||
self.ap.logger.error(f'n8n webhook call failed: {response.status}, {error_text}')
|
||||
raise Exception(f'n8n webhook call failed: {response.status}, {error_text}')
|
||||
# 解析响应
|
||||
response_data = await response.json()
|
||||
self.ap.logger.debug(f'n8n webhook response: {response_data}')
|
||||
|
||||
# 解析响应
|
||||
response_data = await response.json()
|
||||
self.ap.logger.debug(f'n8n webhook response: {response_data}')
|
||||
# 从响应中提取输出
|
||||
if self.output_key in response_data:
|
||||
output_content = response_data[self.output_key]
|
||||
else:
|
||||
# 如果没有指定的输出键,则使用整个响应
|
||||
output_content = json.dumps(response_data, ensure_ascii=False)
|
||||
|
||||
# 从响应中提取输出
|
||||
if self.output_key in response_data:
|
||||
output_content = response_data[self.output_key]
|
||||
else:
|
||||
# 如果没有指定的输出键,则使用整个响应
|
||||
output_content = json.dumps(response_data, ensure_ascii=False)
|
||||
|
||||
# 返回消息
|
||||
yield provider_message.Message(
|
||||
role='assistant',
|
||||
content=output_content,
|
||||
)
|
||||
# 返回消息
|
||||
yield provider_message.Message(
|
||||
role='assistant',
|
||||
content=output_content,
|
||||
)
|
||||
except Exception as e:
|
||||
self.ap.logger.error(f'n8n webhook call exception: {str(e)}')
|
||||
raise N8nAPIError(f'n8n webhook call exception: {str(e)}')
|
||||
|
||||
43
src/langbot/pkg/utils/httpclient.py
Normal file
43
src/langbot/pkg/utils/httpclient.py
Normal file
@@ -0,0 +1,43 @@
|
||||
"""Shared aiohttp.ClientSession to avoid repeated SSL context creation.
|
||||
|
||||
Each call to `aiohttp.ClientSession()` creates a new `TCPConnector` which in turn
|
||||
creates a new `ssl.SSLContext` and loads all system root certificates. This is
|
||||
extremely expensive in both CPU and memory (~270MB total allocations observed via
|
||||
memray profiling).
|
||||
|
||||
This module provides a shared session pool so that all HTTP client code in LangBot
|
||||
reuses the same underlying SSL context and connection pool.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import aiohttp
|
||||
|
||||
_sessions: dict[str, aiohttp.ClientSession] = {}
|
||||
|
||||
|
||||
def get_session(*, trust_env: bool = False) -> aiohttp.ClientSession:
|
||||
"""Get or create a shared aiohttp.ClientSession.
|
||||
|
||||
Args:
|
||||
trust_env: Whether to trust environment variables for proxy settings.
|
||||
|
||||
Returns:
|
||||
A shared aiohttp.ClientSession instance.
|
||||
"""
|
||||
key = f'trust_env={trust_env}'
|
||||
|
||||
session = _sessions.get(key)
|
||||
if session is None or session.closed:
|
||||
session = aiohttp.ClientSession(trust_env=trust_env)
|
||||
_sessions[key] = session
|
||||
|
||||
return session
|
||||
|
||||
|
||||
async def close_all():
|
||||
"""Close all shared sessions. Call on application shutdown."""
|
||||
for session in _sessions.values():
|
||||
if not session.closed:
|
||||
await session.close()
|
||||
_sessions.clear()
|
||||
@@ -5,6 +5,8 @@ from urllib.parse import urlparse, parse_qs
|
||||
import ssl
|
||||
|
||||
import aiohttp
|
||||
|
||||
from langbot.pkg.utils import httpclient
|
||||
import PIL.Image
|
||||
import httpx
|
||||
|
||||
@@ -47,53 +49,54 @@ async def get_gewechat_image_base64(
|
||||
)
|
||||
|
||||
try:
|
||||
async with aiohttp.ClientSession(timeout=timeout) as session:
|
||||
# 获取图片下载链接
|
||||
try:
|
||||
async with session.post(
|
||||
f'{gewechat_url}/v2/api/message/downloadImage',
|
||||
headers=headers,
|
||||
json={'appId': app_id, 'type': image_type, 'xml': xml_content},
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
# print(response)
|
||||
raise Exception(f'获取gewechat图片下载失败: {await response.text()}')
|
||||
session = httpclient.get_session()
|
||||
# 获取图片下载链接
|
||||
try:
|
||||
async with session.post(
|
||||
f'{gewechat_url}/v2/api/message/downloadImage',
|
||||
headers=headers,
|
||||
json={'appId': app_id, 'type': image_type, 'xml': xml_content},
|
||||
timeout=timeout,
|
||||
) as response:
|
||||
if response.status != 200:
|
||||
# print(response)
|
||||
raise Exception(f'获取gewechat图片下载失败: {await response.text()}')
|
||||
|
||||
resp_data = await response.json()
|
||||
if resp_data.get('ret') != 200:
|
||||
raise Exception(f'获取gewechat图片下载链接失败: {resp_data}')
|
||||
resp_data = await response.json()
|
||||
if resp_data.get('ret') != 200:
|
||||
raise Exception(f'获取gewechat图片下载链接失败: {resp_data}')
|
||||
|
||||
file_url = resp_data['data']['fileUrl']
|
||||
except asyncio.TimeoutError:
|
||||
raise Exception('获取图片下载链接超时')
|
||||
except aiohttp.ClientError as e:
|
||||
raise Exception(f'获取图片下载链接网络错误: {str(e)}')
|
||||
file_url = resp_data['data']['fileUrl']
|
||||
except asyncio.TimeoutError:
|
||||
raise Exception('获取图片下载链接超时')
|
||||
except aiohttp.ClientError as e:
|
||||
raise Exception(f'获取图片下载链接网络错误: {str(e)}')
|
||||
|
||||
# 解析原始URL并替换端口
|
||||
base_url = gewechat_file_url
|
||||
download_url = f'{base_url}/download/{file_url}'
|
||||
# 解析原始URL并替换端口
|
||||
base_url = gewechat_file_url
|
||||
download_url = f'{base_url}/download/{file_url}'
|
||||
|
||||
# 下载图片
|
||||
try:
|
||||
async with session.get(download_url) as img_response:
|
||||
if img_response.status != 200:
|
||||
raise Exception(f'下载图片失败: {await img_response.text()}, URL: {download_url}')
|
||||
# 下载图片
|
||||
try:
|
||||
async with session.get(download_url) as img_response:
|
||||
if img_response.status != 200:
|
||||
raise Exception(f'下载图片失败: {await img_response.text()}, URL: {download_url}')
|
||||
|
||||
image_data = await img_response.read()
|
||||
image_data = await img_response.read()
|
||||
|
||||
content_type = img_response.headers.get('Content-Type', '')
|
||||
if content_type:
|
||||
image_format = content_type.split('/')[-1]
|
||||
else:
|
||||
image_format = file_url.split('.')[-1]
|
||||
content_type = img_response.headers.get('Content-Type', '')
|
||||
if content_type:
|
||||
image_format = content_type.split('/')[-1]
|
||||
else:
|
||||
image_format = file_url.split('.')[-1]
|
||||
|
||||
base64_str = base64.b64encode(image_data).decode('utf-8')
|
||||
base64_str = base64.b64encode(image_data).decode('utf-8')
|
||||
|
||||
return base64_str, image_format
|
||||
except asyncio.TimeoutError:
|
||||
raise Exception(f'下载图片超时, URL: {download_url}')
|
||||
except aiohttp.ClientError as e:
|
||||
raise Exception(f'下载图片网络错误: {str(e)}, URL: {download_url}')
|
||||
return base64_str, image_format
|
||||
except asyncio.TimeoutError:
|
||||
raise Exception(f'下载图片超时, URL: {download_url}')
|
||||
except aiohttp.ClientError as e:
|
||||
raise Exception(f'下载图片网络错误: {str(e)}, URL: {download_url}')
|
||||
except Exception as e:
|
||||
raise Exception(f'获取图片失败: {str(e)}') from e
|
||||
|
||||
@@ -104,24 +107,24 @@ async def get_wecom_image_base64(pic_url: str) -> tuple[str, str]:
|
||||
:param pic_url: 企业微信图片URL
|
||||
:return: (base64_str, image_format)
|
||||
"""
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(pic_url) as response:
|
||||
if response.status != 200:
|
||||
raise Exception(f'Failed to download image: {response.status}')
|
||||
session = httpclient.get_session()
|
||||
async with session.get(pic_url) as response:
|
||||
if response.status != 200:
|
||||
raise Exception(f'Failed to download image: {response.status}')
|
||||
|
||||
# 读取图片数据
|
||||
image_data = await response.read()
|
||||
# 读取图片数据
|
||||
image_data = await response.read()
|
||||
|
||||
# 获取图片格式
|
||||
content_type = response.headers.get('Content-Type', '')
|
||||
image_format = content_type.split('/')[-1] # 例如 'image/jpeg' -> 'jpeg'
|
||||
# 获取图片格式
|
||||
content_type = response.headers.get('Content-Type', '')
|
||||
image_format = content_type.split('/')[-1] # 例如 'image/jpeg' -> 'jpeg'
|
||||
|
||||
# 转换为 base64
|
||||
import base64
|
||||
# 转换为 base64
|
||||
import base64
|
||||
|
||||
image_base64 = base64.b64encode(image_data).decode('utf-8')
|
||||
image_base64 = base64.b64encode(image_data).decode('utf-8')
|
||||
|
||||
return image_base64, image_format
|
||||
return image_base64, image_format
|
||||
|
||||
|
||||
async def get_qq_official_image_base64(pic_url: str, content_type: str) -> tuple[str, str]:
|
||||
@@ -152,21 +155,19 @@ async def get_qq_image_bytes(image_url: str, query: dict = {}) -> tuple[bytes, s
|
||||
ssl_context = ssl.create_default_context()
|
||||
ssl_context.check_hostname = False
|
||||
ssl_context.verify_mode = ssl.CERT_NONE
|
||||
async with aiohttp.ClientSession(trust_env=False) as session:
|
||||
async with session.get(
|
||||
image_url, params=query, ssl=ssl_context, timeout=aiohttp.ClientTimeout(total=30.0)
|
||||
) as resp:
|
||||
resp.raise_for_status()
|
||||
file_bytes = await resp.read()
|
||||
content_type = resp.headers.get('Content-Type')
|
||||
if not content_type:
|
||||
image_format = 'jpeg'
|
||||
elif not content_type.startswith('image/'):
|
||||
pil_img = PIL.Image.open(io.BytesIO(file_bytes))
|
||||
image_format = pil_img.format.lower()
|
||||
else:
|
||||
image_format = content_type.split('/')[-1]
|
||||
return file_bytes, image_format
|
||||
session = httpclient.get_session()
|
||||
async with session.get(image_url, params=query, ssl=ssl_context, timeout=aiohttp.ClientTimeout(total=30.0)) as resp:
|
||||
resp.raise_for_status()
|
||||
file_bytes = await resp.read()
|
||||
content_type = resp.headers.get('Content-Type')
|
||||
if not content_type:
|
||||
image_format = 'jpeg'
|
||||
elif not content_type.startswith('image/'):
|
||||
pil_img = PIL.Image.open(io.BytesIO(file_bytes))
|
||||
image_format = pil_img.format.lower()
|
||||
else:
|
||||
image_format = content_type.split('/')[-1]
|
||||
return file_bytes, image_format
|
||||
|
||||
|
||||
async def qq_image_url_to_base64(image_url: str) -> typing.Tuple[str, str]:
|
||||
@@ -204,11 +205,11 @@ async def extract_b64_and_format(image_base64_data: str) -> typing.Tuple[str, st
|
||||
async def get_slack_image_to_base64(pic_url: str, bot_token: str):
|
||||
headers = {'Authorization': f'Bearer {bot_token}'}
|
||||
try:
|
||||
async with aiohttp.ClientSession() as session:
|
||||
async with session.get(pic_url, headers=headers) as resp:
|
||||
mime_type = resp.headers.get('Content-Type', 'application/octet-stream')
|
||||
file_bytes = await resp.read()
|
||||
base64_str = base64.b64encode(file_bytes).decode('utf-8')
|
||||
return f'data:{mime_type};base64,{base64_str}'
|
||||
session = httpclient.get_session()
|
||||
async with session.get(pic_url, headers=headers) as resp:
|
||||
mime_type = resp.headers.get('Content-Type', 'application/octet-stream')
|
||||
file_bytes = await resp.read()
|
||||
base64_str = base64.b64encode(file_bytes).decode('utf-8')
|
||||
return f'data:{mime_type};base64,{base64_str}'
|
||||
except Exception as e:
|
||||
raise (e)
|
||||
|
||||
Reference in New Issue
Block a user