mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-23 14:04:19 +00:00
b96f209b98
- Added module-level storage for pending forms to manage state across sessions. - Introduced functions to set, get, and clear pending forms with expiration handling. - Enhanced DifyServiceAPIRunner to support resuming paused workflows via form actions. - Implemented logic to yield human input requests and display appropriate messages. - Updated workflow submission methods to handle paused states and resume actions. - Ensured proper merging of pending form actions with user inputs for seamless interaction.
211 lines
6.8 KiB
Python
211 lines
6.8 KiB
Python
from __future__ import annotations
|
|
|
|
import httpx
|
|
import typing
|
|
import json
|
|
|
|
from .errors import DifyAPIError
|
|
from pathlib import Path
|
|
import os
|
|
|
|
|
|
class AsyncDifyServiceClient:
|
|
"""Dify Service API 客户端"""
|
|
|
|
api_key: str
|
|
base_url: str
|
|
|
|
def __init__(
|
|
self,
|
|
api_key: str,
|
|
base_url: str = 'https://api.dify.ai/v1',
|
|
) -> None:
|
|
self.api_key = api_key
|
|
self.base_url = base_url
|
|
|
|
async def chat_messages(
|
|
self,
|
|
inputs: dict[str, typing.Any],
|
|
query: str,
|
|
user: str,
|
|
response_mode: str = 'streaming', # 当前不支持 blocking
|
|
conversation_id: str = '',
|
|
files: list[dict[str, typing.Any]] = [],
|
|
timeout: float = 30.0,
|
|
model_config: dict[str, typing.Any] | None = None,
|
|
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
|
|
"""发送消息"""
|
|
if response_mode != 'streaming':
|
|
raise DifyAPIError('当前仅支持 streaming 模式')
|
|
|
|
async with httpx.AsyncClient(
|
|
base_url=self.base_url,
|
|
trust_env=True,
|
|
timeout=timeout,
|
|
) as client:
|
|
payload = {
|
|
'inputs': inputs,
|
|
'query': query,
|
|
'user': user,
|
|
'response_mode': response_mode,
|
|
'conversation_id': conversation_id,
|
|
'files': files,
|
|
'model_config': model_config or {},
|
|
}
|
|
|
|
async with client.stream(
|
|
'POST',
|
|
'/chat-messages',
|
|
headers={
|
|
'Authorization': f'Bearer {self.api_key}',
|
|
'Content-Type': 'application/json',
|
|
},
|
|
json=payload,
|
|
) as r:
|
|
async for chunk in r.aiter_lines():
|
|
if r.status_code != 200:
|
|
raise DifyAPIError(f'{r.status_code} {chunk}')
|
|
if chunk.strip() == '':
|
|
continue
|
|
if chunk.startswith('data:'):
|
|
yield json.loads(chunk[5:])
|
|
|
|
async def workflow_run(
|
|
self,
|
|
inputs: dict[str, typing.Any],
|
|
user: str,
|
|
response_mode: str = 'streaming', # 当前不支持 blocking
|
|
files: list[dict[str, typing.Any]] = [],
|
|
timeout: float = 30.0,
|
|
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
|
|
"""运行工作流"""
|
|
if response_mode != 'streaming':
|
|
raise DifyAPIError('当前仅支持 streaming 模式')
|
|
|
|
async with httpx.AsyncClient(
|
|
base_url=self.base_url,
|
|
trust_env=True,
|
|
timeout=timeout,
|
|
) as client:
|
|
async with client.stream(
|
|
'POST',
|
|
'/workflows/run',
|
|
headers={
|
|
'Authorization': f'Bearer {self.api_key}',
|
|
'Content-Type': 'application/json',
|
|
},
|
|
json={
|
|
'inputs': inputs,
|
|
'user': user,
|
|
'response_mode': response_mode,
|
|
'files': files,
|
|
},
|
|
) as r:
|
|
async for chunk in r.aiter_lines():
|
|
if r.status_code != 200:
|
|
raise DifyAPIError(f'{r.status_code} {chunk}')
|
|
if chunk.strip() == '':
|
|
continue
|
|
if chunk.startswith('data:'):
|
|
yield json.loads(chunk[5:])
|
|
|
|
async def workflow_submit(
|
|
self,
|
|
form_token: str,
|
|
workflow_run_id: str,
|
|
inputs: dict[str, typing.Any],
|
|
user: str,
|
|
action: str = '',
|
|
timeout: float = 120.0,
|
|
) -> typing.AsyncGenerator[dict[str, typing.Any], None]:
|
|
"""Submit human input to resume a paused workflow, then stream events.
|
|
|
|
1. POST /form/human_input/{form_token} to submit the form
|
|
2. GET /workflow/{task_id}/events to stream the resumed workflow events
|
|
"""
|
|
|
|
headers = {
|
|
'Authorization': f'Bearer {self.api_key}',
|
|
'Content-Type': 'application/json',
|
|
}
|
|
|
|
async with httpx.AsyncClient(
|
|
base_url=self.base_url,
|
|
trust_env=True,
|
|
timeout=timeout,
|
|
) as client:
|
|
# Step 1: Submit the form
|
|
payload: dict[str, typing.Any] = {
|
|
'inputs': inputs if isinstance(inputs, dict) else {},
|
|
'user': user,
|
|
'action': action,
|
|
}
|
|
|
|
submit_resp = await client.post(
|
|
f'/form/human_input/{form_token}',
|
|
headers=headers,
|
|
json=payload,
|
|
)
|
|
if submit_resp.status_code != 200:
|
|
raise DifyAPIError(f'{submit_resp.status_code} {submit_resp.text}')
|
|
|
|
# Step 2: Stream resumed workflow events
|
|
async with client.stream(
|
|
'GET',
|
|
f'/workflow/{workflow_run_id}/events',
|
|
headers={'Authorization': f'Bearer {self.api_key}'},
|
|
params={'user': user},
|
|
) as r:
|
|
async for chunk in r.aiter_lines():
|
|
if r.status_code != 200:
|
|
raise DifyAPIError(f'{r.status_code} {chunk}')
|
|
if chunk.strip() == '':
|
|
continue
|
|
if chunk.startswith('data:'):
|
|
yield json.loads(chunk[5:])
|
|
|
|
async def upload_file(
|
|
self,
|
|
file: httpx._types.FileTypes,
|
|
user: str,
|
|
timeout: float = 30.0,
|
|
) -> str:
|
|
# 处理 Path 对象
|
|
if isinstance(file, Path):
|
|
if not file.exists():
|
|
raise ValueError(f'File not found: {file}')
|
|
with open(file, 'rb') as f:
|
|
file = f.read()
|
|
|
|
# 处理文件路径字符串
|
|
elif isinstance(file, str):
|
|
if not os.path.isfile(file):
|
|
raise ValueError(f'File not found: {file}')
|
|
with open(file, 'rb') as f:
|
|
file = f.read()
|
|
|
|
# 处理文件对象
|
|
elif hasattr(file, 'read'):
|
|
file = file.read()
|
|
async with httpx.AsyncClient(
|
|
base_url=self.base_url,
|
|
trust_env=True,
|
|
timeout=timeout,
|
|
) as client:
|
|
# multipart/form-data
|
|
response = await client.post(
|
|
'/files/upload',
|
|
headers={'Authorization': f'Bearer {self.api_key}'},
|
|
files={
|
|
'file': file,
|
|
},
|
|
data={
|
|
'user': (None, user),
|
|
},
|
|
)
|
|
|
|
if response.status_code != 201:
|
|
raise DifyAPIError(f'{response.status_code} {response.text}')
|
|
|
|
return response.json()
|