Files
LangBot/libs/wechatpad_api/util/http_util.py
T
2025-07-05 21:56:54 +08:00

79 lines
2.2 KiB
Python

import requests
import aiohttp
def post_json(base_url, token, data=None):
headers = {'Content-Type': 'application/json'}
url = base_url + f'?key={token}'
try:
response = requests.post(url, json=data, headers=headers, timeout=60)
response.raise_for_status()
result = response.json()
if result:
return result
else:
raise RuntimeError(response.text)
except Exception as e:
print(f'http请求失败, url={url}, exception={e}')
raise RuntimeError(str(e))
def get_json(base_url, token):
headers = {'Content-Type': 'application/json'}
url = base_url + f'?key={token}'
try:
response = requests.get(url, headers=headers, timeout=60)
response.raise_for_status()
result = response.json()
if result:
return result
else:
raise RuntimeError(response.text)
except Exception as e:
print(f'http请求失败, url={url}, exception={e}')
raise RuntimeError(str(e))
async def async_request(
base_url: str,
token_key: str,
method: str = 'POST',
params: dict = None,
# headers: dict = None,
data: dict = None,
json: dict = None,
):
"""
通用异步请求函数
:param base_url: 请求URL
:param token_key: 请求token
:param method: HTTP方法 (GET, POST, PUT, DELETE等)
:param params: URL查询参数
# :param headers: 请求头
:param data: 表单数据
:param json: JSON数据
:return: 响应文本
"""
headers = {'Content-Type': 'application/json'}
url = f'{base_url}?key={token_key}'
async with aiohttp.ClientSession() as session:
async with session.request(
method=method, url=url, params=params, headers=headers, data=data, json=json
) as response:
response.raise_for_status() # 如果状态码不是200,抛出异常
result = await response.json()
# print(result)
return result
# if result.get('Code') == 200:
#
# return await result
# else:
# raise RuntimeError("请求失败",response.text)