# Metadata captured from the repository file viewer (not part of the module):
# Files
# LangBot/pkg/plugin/handler.py
# 2025-07-13 17:44:20 +08:00
#
# 187 lines
# 5.6 KiB
# Python

from __future__ import annotations
import typing
from typing import Any
import sqlalchemy
from langbot_plugin.runtime.io import handler
from langbot_plugin.runtime.io.connection import Connection
from langbot_plugin.entities.io.actions.enums import (
CommonAction,
RuntimeToLangBotAction,
LangBotToRuntimeAction,
PluginToRuntimeAction,
)
import langbot_plugin.api.entities.builtin.platform.message as platform_message
from ..entity.persistence import plugin as persistence_plugin
from ..core import app
class RuntimeConnectionHandler(handler.Handler):
    """Runtime connection handler.

    The constructor decorates three coroutines with ``self.action(...)`` so
    they answer ``GET_PLUGIN_SETTINGS``, ``REPLY_MESSAGE`` and ``GET_BOT_UUID``
    requests. The public methods below are thin wrappers that send a request
    through ``call_action`` / ``call_action_generator`` and unwrap the reply.
    """

    ap: app.Application

    def __init__(
        self,
        connection: Connection,
        disconnect_callback: typing.Callable[[], typing.Coroutine[typing.Any, typing.Any, bool]],
        ap: app.Application,
    ):
        """Initialise the base handler, keep ``ap`` and declare the inbound actions.

        Args:
            connection: Passed unchanged to the base ``Handler``.
            disconnect_callback: Passed unchanged to the base ``Handler``.
            ap: Application object; the inbound actions read its
                ``persistence_mgr`` and ``query_pool``.
        """
        super().__init__(connection, disconnect_callback)
        self.ap = ap

        def _query_not_found(query_id: Any) -> handler.ActionResponse:
            """Build the error response shared by the query-based actions."""
            return handler.ActionResponse.error(
                message=f'Query with query_id {query_id} not found',
            )

        @self.action(RuntimeToLangBotAction.GET_PLUGIN_SETTINGS)
        async def get_plugin_settings(data: dict[str, Any]) -> handler.ActionResponse:
            """Return ``enabled``/``priority``/``plugin_config`` for one plugin.

            The plugin is identified by ``data['plugin_author']`` and
            ``data['plugin_name']``. When no ``PluginSetting`` row matches, the
            response carries the defaults: disabled, priority 0, empty config.
            """
            author, name = data['plugin_author'], data['plugin_name']

            setting_model = persistence_plugin.PluginSetting
            statement = sqlalchemy.select(setting_model)
            statement = statement.where(setting_model.plugin_author == author)
            statement = statement.where(setting_model.plugin_name == name)

            result = await self.ap.persistence_mgr.execute_async(statement)
            row = result.first()

            if row is None:
                settings = {'enabled': False, 'priority': 0, 'plugin_config': {}}
            else:
                settings = {
                    'enabled': row.enabled,
                    'priority': row.priority,
                    'plugin_config': row.config,
                }
            return handler.ActionResponse.success(data=settings)

        @self.action(PluginToRuntimeAction.REPLY_MESSAGE)
        async def reply_message(data: dict[str, Any]) -> handler.ActionResponse:
            """Reply to the message event of a cached query.

            Reads ``query_id``, ``message_chain`` and ``quote_origin`` from
            ``data``. Responds with an error when the query id is not in the
            query pool's cache; otherwise validates the chain, sends it through
            the query's adapter and responds with an empty payload.
            """
            query_id, raw_chain, quote_origin = (
                data['query_id'],
                data['message_chain'],
                data['quote_origin'],
            )

            cached = self.ap.query_pool.cached_queries
            if query_id not in cached:
                return _query_not_found(query_id)

            query = cached[query_id]
            chain = platform_message.MessageChain.model_validate(raw_chain)
            await query.adapter.reply_message(query.message_event, chain, quote_origin)
            return handler.ActionResponse.success(data={})

        @self.action(PluginToRuntimeAction.GET_BOT_UUID)
        async def get_bot_uuid(data: dict[str, Any]) -> handler.ActionResponse:
            """Return the ``bot_uuid`` of the cached query named by ``data['query_id']``.

            Responds with an error when the query id is not in the cache.
            """
            query_id = data['query_id']

            cached = self.ap.query_pool.cached_queries
            if query_id not in cached:
                return _query_not_found(query_id)

            return handler.ActionResponse.success(
                data={'bot_uuid': cached[query_id].bot_uuid},
            )

    async def ping(self) -> dict[str, Any]:
        """Send ``CommonAction.PING`` with an empty payload and return the reply."""
        reply = await self.call_action(CommonAction.PING, {}, timeout=10)
        return reply

    async def list_plugins(self) -> list[dict[str, Any]]:
        """Request ``LIST_PLUGINS`` and return the ``plugins`` entry of the reply."""
        reply = await self.call_action(LangBotToRuntimeAction.LIST_PLUGINS, {}, timeout=10)
        return reply['plugins']

    async def emit_event(
        self,
        event_context: dict[str, Any],
    ) -> dict[str, Any]:
        """Send ``EMIT_EVENT`` carrying ``event_context`` and return the whole reply."""
        payload = {'event_context': event_context}
        return await self.call_action(LangBotToRuntimeAction.EMIT_EVENT, payload, timeout=10)

    async def list_tools(self) -> list[dict[str, Any]]:
        """Request ``LIST_TOOLS`` and return the ``tools`` entry of the reply."""
        reply = await self.call_action(LangBotToRuntimeAction.LIST_TOOLS, {}, timeout=10)
        return reply['tools']

    async def call_tool(self, tool_name: str, parameters: dict[str, Any]) -> dict[str, Any]:
        """Send ``CALL_TOOL`` and return the ``tool_response`` entry of the reply.

        ``parameters`` is sent under the ``tool_parameters`` key. This call
        uses ``timeout=30`` where the other requests use ``timeout=10``.
        """
        payload = {
            'tool_name': tool_name,
            'tool_parameters': parameters,
        }
        reply = await self.call_action(LangBotToRuntimeAction.CALL_TOOL, payload, timeout=30)
        return reply['tool_response']

    async def list_commands(self) -> list[dict[str, Any]]:
        """Request ``LIST_COMMANDS`` and return the ``commands`` entry of the reply."""
        reply = await self.call_action(LangBotToRuntimeAction.LIST_COMMANDS, {}, timeout=10)
        return reply['commands']

    async def execute_command(self, command_context: dict[str, Any]) -> typing.AsyncGenerator[dict[str, Any], None]:
        """Send ``EXECUTE_COMMAND`` and yield each item of the reply stream as it arrives."""
        async for item in self.call_action_generator(
            LangBotToRuntimeAction.EXECUTE_COMMAND,
            {'command_context': command_context},
            timeout=10,
        ):
            yield item