from __future__ import annotations

import typing
from typing import Any

import sqlalchemy

from langbot_plugin.runtime.io import handler
from langbot_plugin.runtime.io.connection import Connection
from langbot_plugin.entities.io.actions.enums import (
    CommonAction,
    RuntimeToLangBotAction,
    LangBotToRuntimeAction,
    PluginToRuntimeAction,
)
import langbot_plugin.api.entities.context as event_context_module
import langbot_plugin.api.entities.builtin.platform.message as platform_message

from ..entity.persistence import plugin as persistence_plugin
from ..core import app


class RuntimeConnectionHandler(handler.Handler):
    """Handler for the connection to the plugin runtime.

    On construction it registers the actions answered on this side
    (plugin settings lookup, message reply) and it exposes coroutine
    helpers that forward requests to the runtime via ``call_action`` /
    ``call_action_generator``.
    """

    ap: app.Application

    def __init__(
        self,
        connection: Connection,
        disconnect_callback: typing.Callable[[], typing.Coroutine[typing.Any, typing.Any, bool]],
        ap: app.Application,
    ):
        """Store the application object and register the inbound action handlers.

        Args:
            connection: Connection passed through to the base handler.
            disconnect_callback: Coroutine factory passed through to the base handler.
            ap: Application object; its ``persistence_mgr`` is used for the settings query.
        """
        super().__init__(connection, disconnect_callback)
        self.ap = ap

        @self.action(RuntimeToLangBotAction.GET_PLUGIN_SETTINGS)
        async def get_plugin_settings(data: dict[str, Any]) -> handler.ActionResponse:
            """Look up the stored settings of one plugin.

            Reads ``plugin_author`` and ``plugin_name`` from the request and
            answers with ``enabled``, ``priority`` and ``plugin_config``.
            When no row matches, the defaults ``False`` / ``0`` / ``{}`` are sent.
            """
            author = data['plugin_author']
            name = data['plugin_name']

            setting_model = persistence_plugin.PluginSetting
            query = (
                sqlalchemy.select(setting_model)
                .where(setting_model.plugin_author == author)
                .where(setting_model.plugin_name == name)
            )
            result = await self.ap.persistence_mgr.execute_async(query)

            row = result.first()
            if row is None:
                # Nothing stored for this plugin: answer with the defaults.
                payload = {
                    'enabled': False,
                    'priority': 0,
                    'plugin_config': {},
                }
            else:
                payload = {
                    'enabled': row.enabled,
                    'priority': row.priority,
                    'plugin_config': row.config,
                }

            return handler.ActionResponse.success(data=payload)

        @self.action(PluginToRuntimeAction.REPLY_MESSAGE)
        async def reply_message(data: dict[str, Any]) -> handler.ActionResponse:
            """Send a reply through the adapter of a cached event context.

            Reads ``eid``, ``message_chain`` and ``quote_origin`` from the
            request. Answers with an error response when ``eid`` is not in the
            event-context cache, otherwise with an empty success payload.
            """
            # All three keys are read up front, before the cache lookup.
            eid = data['eid']
            raw_chain = data['message_chain']
            quote_origin = data['quote_origin']

            contexts = event_context_module.cached_event_contexts
            if eid not in contexts:
                return handler.ActionResponse.error(
                    message=f'Event context with eid {eid} not found',
                )
            ctx = contexts[eid]

            chain = platform_message.MessageChain.model_validate(raw_chain)
            query = ctx.event.query
            await query.adapter.reply_message(query.message_event, chain, quote_origin)

            return handler.ActionResponse.success(data={})

    async def ping(self) -> dict[str, Any]:
        """Send the common PING action and return its response."""
        pong = await self.call_action(CommonAction.PING, {}, timeout=10)
        return pong

    async def list_plugins(self) -> list[dict[str, Any]]:
        """Request the plugin list and return the ``plugins`` entry of the response."""
        response = await self.call_action(LangBotToRuntimeAction.LIST_PLUGINS, {}, timeout=10)
        return response['plugins']

    async def emit_event(
        self,
        event_context: dict[str, Any],
    ) -> dict[str, Any]:
        """Forward an event context to the runtime and return the whole response."""
        request = {'event_context': event_context}
        return await self.call_action(LangBotToRuntimeAction.EMIT_EVENT, request, timeout=10)

    async def list_tools(self) -> list[dict[str, Any]]:
        """Request the tool list and return the ``tools`` entry of the response."""
        response = await self.call_action(LangBotToRuntimeAction.LIST_TOOLS, {}, timeout=10)
        return response['tools']

    async def call_tool(self, tool_name: str, parameters: dict[str, Any]) -> dict[str, Any]:
        """Invoke a tool by name and return the ``tool_response`` entry of the response."""
        request = {
            'tool_name': tool_name,
            'tool_parameters': parameters,
        }
        # This call passes timeout=30, while the other requests here pass 10.
        response = await self.call_action(LangBotToRuntimeAction.CALL_TOOL, request, timeout=30)
        return response['tool_response']

    async def list_commands(self) -> list[dict[str, Any]]:
        """Request the command list and return the ``commands`` entry of the response."""
        response = await self.call_action(LangBotToRuntimeAction.LIST_COMMANDS, {}, timeout=10)
        return response['commands']

    async def execute_command(self, command_context: dict[str, Any]) -> typing.AsyncGenerator[dict[str, Any], None]:
        """Run a command and yield every item produced by the action generator."""
        request = {'command_context': command_context}
        async for item in self.call_action_generator(
            LangBotToRuntimeAction.EXECUTE_COMMAND,
            request,
            timeout=10,
        ):
            yield item