from __future__ import annotations import typing from typing import Any import base64 import traceback import sqlalchemy from langbot_plugin.runtime.io import handler from langbot_plugin.runtime.io.connection import Connection from langbot_plugin.entities.io.actions.enums import ( CommonAction, RuntimeToLangBotAction, LangBotToRuntimeAction, PluginToRuntimeAction, ) import langbot_plugin.api.entities.builtin.platform.message as platform_message import langbot_plugin.api.entities.builtin.provider.message as provider_message import langbot_plugin.api.entities.builtin.resource.tool as resource_tool from ..entity.persistence import plugin as persistence_plugin from ..entity.persistence import bstorage as persistence_bstorage from ..core import app from ..utils import constants def _make_rag_error_response(error: Exception, error_type: str, **extra_context) -> handler.ActionResponse: """Create a clean error response for RAG operations. Args: error: The caught exception. error_type: A category string like 'EmbeddingError', 'VectorStoreError'. **extra_context: Additional context fields for the error message. """ context_parts = [f'{k}={v}' for k, v in extra_context.items()] context_str = f' [{", ".join(context_parts)}]' if context_parts else '' message = f'[{error_type}/{type(error).__name__}]{context_str} {str(error)}' return handler.ActionResponse.error(message=message) class RuntimeConnectionHandler(handler.Handler): """Runtime connection handler""" ap: app.Application def __init__( self, connection: Connection, disconnect_callback: typing.Callable[[], typing.Coroutine[typing.Any, typing.Any, bool]], ap: app.Application, ): super().__init__(connection, disconnect_callback) self.ap = ap @self.action(RuntimeToLangBotAction.INITIALIZE_PLUGIN_SETTINGS) async def initialize_plugin_settings(data: dict[str, Any]) -> handler.ActionResponse: """Initialize plugin settings""" # check if exists plugin setting plugin_author = data['plugin_author'] plugin_name = data['plugin_name'] install_source = data['install_source'] install_info = data['install_info'] try: result = await self.ap.persistence_mgr.execute_async( sqlalchemy.select(persistence_plugin.PluginSetting) .where(persistence_plugin.PluginSetting.plugin_author == plugin_author) .where(persistence_plugin.PluginSetting.plugin_name == plugin_name) ) setting = result.first() if setting is not None: # delete plugin setting await self.ap.persistence_mgr.execute_async( sqlalchemy.delete(persistence_plugin.PluginSetting) .where(persistence_plugin.PluginSetting.plugin_author == plugin_author) .where(persistence_plugin.PluginSetting.plugin_name == plugin_name) ) # create plugin setting await self.ap.persistence_mgr.execute_async( sqlalchemy.insert(persistence_plugin.PluginSetting).values( plugin_author=plugin_author, plugin_name=plugin_name, install_source=install_source, install_info=install_info, # inherit from existing setting enabled=setting.enabled if setting is not None else True, priority=setting.priority if setting is not None else 0, config=setting.config if setting is not None else {}, # noqa: F821 ) ) return handler.ActionResponse.success( data={}, ) except Exception as e: traceback.print_exc() return handler.ActionResponse.error( message=f'Failed to initialize plugin settings: {e}', ) @self.action(RuntimeToLangBotAction.GET_PLUGIN_SETTINGS) async def get_plugin_settings(data: dict[str, Any]) -> handler.ActionResponse: """Get plugin settings""" plugin_author = data['plugin_author'] plugin_name = data['plugin_name'] result = await self.ap.persistence_mgr.execute_async( sqlalchemy.select(persistence_plugin.PluginSetting) .where(persistence_plugin.PluginSetting.plugin_author == plugin_author) .where(persistence_plugin.PluginSetting.plugin_name == plugin_name) ) data = { 'enabled': True, 'priority': 0, 'plugin_config': {}, 'install_source': 'local', 'install_info': {}, } setting = result.first() if setting is not None: data['enabled'] = setting.enabled data['priority'] = setting.priority data['plugin_config'] = setting.config data['install_source'] = setting.install_source data['install_info'] = setting.install_info return handler.ActionResponse.success( data=data, ) @self.action(PluginToRuntimeAction.REPLY_MESSAGE) async def reply_message(data: dict[str, Any]) -> handler.ActionResponse: """Reply message""" query_id = data['query_id'] message_chain = data['message_chain'] quote_origin = data['quote_origin'] if query_id not in self.ap.query_pool.cached_queries: return handler.ActionResponse.error( message=f'Query with query_id {query_id} not found', ) query = self.ap.query_pool.cached_queries[query_id] message_chain_obj = platform_message.MessageChain.model_validate(message_chain) self.ap.logger.debug(f'Reply message: {message_chain_obj.model_dump(serialize_as_any=False)}') await query.adapter.reply_message( query.message_event, message_chain_obj, quote_origin, ) return handler.ActionResponse.success( data={}, ) @self.action(PluginToRuntimeAction.GET_BOT_UUID) async def get_bot_uuid(data: dict[str, Any]) -> handler.ActionResponse: """Get bot uuid""" query_id = data['query_id'] if query_id not in self.ap.query_pool.cached_queries: return handler.ActionResponse.error( message=f'Query with query_id {query_id} not found', ) query = self.ap.query_pool.cached_queries[query_id] return handler.ActionResponse.success( data={ 'bot_uuid': query.bot_uuid, }, ) @self.action(PluginToRuntimeAction.SET_QUERY_VAR) async def set_query_var(data: dict[str, Any]) -> handler.ActionResponse: """Set query var""" query_id = data['query_id'] key = data['key'] value = data['value'] if query_id not in self.ap.query_pool.cached_queries: return handler.ActionResponse.error( message=f'Query with query_id {query_id} not found', ) query = self.ap.query_pool.cached_queries[query_id] query.variables[key] = value return handler.ActionResponse.success( data={}, ) @self.action(PluginToRuntimeAction.GET_QUERY_VAR) async def get_query_var(data: dict[str, Any]) -> handler.ActionResponse: """Get query var""" query_id = data['query_id'] key = data['key'] if query_id not in self.ap.query_pool.cached_queries: return handler.ActionResponse.error( message=f'Query with query_id {query_id} not found', ) query = self.ap.query_pool.cached_queries[query_id] return handler.ActionResponse.success( data={ 'value': query.variables[key], }, ) @self.action(PluginToRuntimeAction.GET_QUERY_VARS) async def get_query_vars(data: dict[str, Any]) -> handler.ActionResponse: """Get query vars""" query_id = data['query_id'] if query_id not in self.ap.query_pool.cached_queries: return handler.ActionResponse.error( message=f'Query with query_id {query_id} not found', ) query = self.ap.query_pool.cached_queries[query_id] return handler.ActionResponse.success( data={ 'vars': query.variables, }, ) @self.action(PluginToRuntimeAction.CREATE_NEW_CONVERSATION) async def create_new_conversation(data: dict[str, Any]) -> handler.ActionResponse: """Create new conversation""" query_id = data['query_id'] if query_id not in self.ap.query_pool.cached_queries: return handler.ActionResponse.error( message=f'Query with query_id {query_id} not found', ) query = self.ap.query_pool.cached_queries[query_id] query.session.using_conversation = None return handler.ActionResponse.success( data={}, ) @self.action(PluginToRuntimeAction.GET_LANGBOT_VERSION) async def get_langbot_version(data: dict[str, Any]) -> handler.ActionResponse: """Get langbot version""" return handler.ActionResponse.success( data={ 'version': constants.semantic_version, }, ) @self.action(PluginToRuntimeAction.GET_BOTS) async def get_bots(data: dict[str, Any]) -> handler.ActionResponse: """Get bots""" bots = await self.ap.bot_service.get_bots(include_secret=False) return handler.ActionResponse.success( data={ 'bots': bots, }, ) @self.action(PluginToRuntimeAction.GET_BOT_INFO) async def get_bot_info(data: dict[str, Any]) -> handler.ActionResponse: """Get bot info""" bot_uuid = data['bot_uuid'] bot = await self.ap.bot_service.get_runtime_bot_info(bot_uuid, include_secret=False) return handler.ActionResponse.success( data={ 'bot': bot, }, ) @self.action(PluginToRuntimeAction.SEND_MESSAGE) async def send_message(data: dict[str, Any]) -> handler.ActionResponse: """Send message""" bot_uuid = data['bot_uuid'] target_type = data['target_type'] target_id = data['target_id'] message_chain = data['message_chain'] # Use custom deserializer that properly handles Forward messages message_chain_obj = platform_message.MessageChain.model_validate(message_chain) bot = await self.ap.platform_mgr.get_bot_by_uuid(bot_uuid) if bot is None: return handler.ActionResponse.error( message=f'Bot with bot_uuid {bot_uuid} not found', ) await bot.adapter.send_message( target_type, target_id, message_chain_obj, ) return handler.ActionResponse.success( data={}, ) @self.action(PluginToRuntimeAction.GET_LLM_MODELS) async def get_llm_models(data: dict[str, Any]) -> handler.ActionResponse: """Get llm models, returns list of UUID strings""" llm_models = await self.ap.llm_model_service.get_llm_models(include_secret=False) return handler.ActionResponse.success( data={ 'llm_models': [m['uuid'] for m in llm_models], }, ) @self.action(PluginToRuntimeAction.INVOKE_LLM) async def invoke_llm(data: dict[str, Any]) -> handler.ActionResponse: """Invoke llm""" llm_model_uuid = data['llm_model_uuid'] messages = data['messages'] funcs = data.get('funcs', []) extra_args = data.get('extra_args', {}) llm_model = await self.ap.model_mgr.get_model_by_uuid(llm_model_uuid) if llm_model is None: return handler.ActionResponse.error( message=f'LLM model with llm_model_uuid {llm_model_uuid} not found', ) messages_obj = [provider_message.Message.model_validate(message) for message in messages] # The func field is excluded during model_dump() in plugin side (marked as exclude=True), # but it's a required field for LLMTool validation. We need to provide a placeholder # function when reconstructing the LLMTool objects from serialized data. async def _placeholder_func(**kwargs): pass funcs_obj = [resource_tool.LLMTool.model_validate({**func, 'func': _placeholder_func}) for func in funcs] result = await llm_model.provider.invoke_llm( query=None, model=llm_model, messages=messages_obj, funcs=funcs_obj, extra_args=extra_args, ) return handler.ActionResponse.success( data={ 'message': result.model_dump(), }, ) @self.action(PluginToRuntimeAction.INVOKE_LLM_STREAM) async def invoke_llm_stream(data: dict[str, Any]): """Invoke llm with streaming response""" llm_model_uuid = data['llm_model_uuid'] messages = data['messages'] funcs = data.get('funcs', []) extra_args = data.get('extra_args', {}) llm_model = await self.ap.model_mgr.get_model_by_uuid(llm_model_uuid) if llm_model is None: yield handler.ActionResponse.error( message=f'LLM model with llm_model_uuid {llm_model_uuid} not found', ) return messages_obj = [provider_message.Message.model_validate(message) for message in messages] funcs_obj = [resource_tool.LLMTool.model_validate(func) for func in funcs] async for chunk in llm_model.provider.invoke_llm_stream( query=None, model=llm_model, messages=messages_obj, funcs=funcs_obj, extra_args=extra_args, ): yield handler.ActionResponse.success( data={ 'chunk': chunk.model_dump(), }, ) @self.action(PluginToRuntimeAction.CALL_TOOL) async def call_tool(data: dict[str, Any]) -> handler.ActionResponse: """Call a tool""" tool_name = data['tool_name'] parameters = data['parameters'] # session_data = data['session'] # query_id = data['query_id'] # Convert session_data to Session object (simplified) # In real implementation, you would reconstruct the full session # For now, we'll call the tool manager's execute method try: result = await self.ap.tool_mgr.execute_func_call( name=tool_name, parameters=parameters, query=None, # TODO: reconstruct query from session_data if needed ) return handler.ActionResponse.success( data={ 'result': result, }, ) except Exception as e: traceback.print_exc() return handler.ActionResponse.error( message=f'Failed to execute tool {tool_name}: {e}', ) @self.action(PluginToRuntimeAction.RETRIEVE_KNOWLEDGE) async def retrieve_knowledge(data: dict[str, Any]) -> handler.ActionResponse: """Retrieve knowledge from a knowledge base""" kb_uuid = data['kb_uuid'] query = data['query'] top_k = data.get('top_k', 5) try: kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid) if kb is None: return handler.ActionResponse.error( message=f'Knowledge base with uuid {kb_uuid} not found', ) results = await kb.retrieve(query=query, top_k=top_k) # Convert results to dict format results_data = [ { 'id': r.id, 'content': [c.model_dump() for c in r.content], 'metadata': r.metadata, } for r in results ] return handler.ActionResponse.success( data={ 'results': results_data, }, ) except Exception as e: traceback.print_exc() return handler.ActionResponse.error( message=f'Failed to retrieve knowledge: {e}', ) @self.action(PluginToRuntimeAction.INVOKE_EMBEDDING) async def invoke_embedding(data: dict[str, Any]) -> handler.ActionResponse: """Invoke an embedding model""" embedding_model_uuid = data['embedding_model_uuid'] texts = data['texts'] try: embedding_model = await self.ap.model_mgr.get_embedding_model_by_uuid(embedding_model_uuid) if embedding_model is None: return handler.ActionResponse.error( message=f'Embedding model with uuid {embedding_model_uuid} not found', ) # Call embedding model to generate embeddings embeddings = [] for text in texts: embedding = await embedding_model.provider.invoke_embedding( model=embedding_model, text=text, ) embeddings.append(embedding) return handler.ActionResponse.success( data={ 'embeddings': embeddings, }, ) except Exception as e: traceback.print_exc() return handler.ActionResponse.error( message=f'Failed to invoke embedding model: {e}', ) @self.action(RuntimeToLangBotAction.SET_BINARY_STORAGE) async def set_binary_storage(data: dict[str, Any]) -> handler.ActionResponse: """Set binary storage""" key = data['key'] owner_type = data['owner_type'] owner = data['owner'] value = base64.b64decode(data['value_base64']) max_value_bytes = ( self.ap.instance_config.data.get('plugin', {}) .get('binary_storage', {}) .get( 'max_value_bytes', 10 * 1024 * 1024, ) ) try: max_value_bytes = int(max_value_bytes) except (TypeError, ValueError): max_value_bytes = 10 * 1024 * 1024 if max_value_bytes >= 0 and len(value) > max_value_bytes: return handler.ActionResponse.error( message=f'Binary storage value exceeds limit ({len(value)} > {max_value_bytes} bytes)', ) result = await self.ap.persistence_mgr.execute_async( sqlalchemy.select(persistence_bstorage.BinaryStorage) .where(persistence_bstorage.BinaryStorage.key == key) .where(persistence_bstorage.BinaryStorage.owner_type == owner_type) .where(persistence_bstorage.BinaryStorage.owner == owner) ) if result.first() is not None: await self.ap.persistence_mgr.execute_async( sqlalchemy.update(persistence_bstorage.BinaryStorage) .where(persistence_bstorage.BinaryStorage.key == key) .where(persistence_bstorage.BinaryStorage.owner_type == owner_type) .where(persistence_bstorage.BinaryStorage.owner == owner) .values(value=value) ) else: await self.ap.persistence_mgr.execute_async( sqlalchemy.insert(persistence_bstorage.BinaryStorage).values( unique_key=f'{owner_type}:{owner}:{key}', key=key, owner_type=owner_type, owner=owner, value=value, ) ) return handler.ActionResponse.success( data={}, ) @self.action(RuntimeToLangBotAction.GET_BINARY_STORAGE) async def get_binary_storage(data: dict[str, Any]) -> handler.ActionResponse: """Get binary storage""" key = data['key'] owner_type = data['owner_type'] owner = data['owner'] result = await self.ap.persistence_mgr.execute_async( sqlalchemy.select(persistence_bstorage.BinaryStorage) .where(persistence_bstorage.BinaryStorage.key == key) .where(persistence_bstorage.BinaryStorage.owner_type == owner_type) .where(persistence_bstorage.BinaryStorage.owner == owner) ) storage = result.first() if storage is None: return handler.ActionResponse.error( message=f'Storage with key {key} not found', ) return handler.ActionResponse.success( data={ 'value_base64': base64.b64encode(storage.value).decode('utf-8'), }, ) @self.action(RuntimeToLangBotAction.DELETE_BINARY_STORAGE) async def delete_binary_storage(data: dict[str, Any]) -> handler.ActionResponse: """Delete binary storage""" key = data['key'] owner_type = data['owner_type'] owner = data['owner'] await self.ap.persistence_mgr.execute_async( sqlalchemy.delete(persistence_bstorage.BinaryStorage) .where(persistence_bstorage.BinaryStorage.key == key) .where(persistence_bstorage.BinaryStorage.owner_type == owner_type) .where(persistence_bstorage.BinaryStorage.owner == owner) ) return handler.ActionResponse.success( data={}, ) @self.action(RuntimeToLangBotAction.GET_BINARY_STORAGE_KEYS) async def get_binary_storage_keys(data: dict[str, Any]) -> handler.ActionResponse: """Get binary storage keys""" owner_type = data['owner_type'] owner = data['owner'] result = await self.ap.persistence_mgr.execute_async( sqlalchemy.select(persistence_bstorage.BinaryStorage.key) .where(persistence_bstorage.BinaryStorage.owner_type == owner_type) .where(persistence_bstorage.BinaryStorage.owner == owner) ) return handler.ActionResponse.success( data={ 'keys': result.scalars().all(), }, ) @self.action(PluginToRuntimeAction.GET_CONFIG_FILE) async def get_config_file(data: dict[str, Any]) -> handler.ActionResponse: """Get a config file by file key""" file_key = data['file_key'] try: # Load file from storage file_bytes = await self.ap.storage_mgr.storage_provider.load(file_key) return handler.ActionResponse.success( data={ 'file_base64': base64.b64encode(file_bytes).decode('utf-8'), }, ) except Exception as e: return handler.ActionResponse.error( message=f'Failed to load config file {file_key}: {e}', ) # ================= RAG Capability Handlers ================= @self.action(PluginToRuntimeAction.INVOKE_EMBEDDING) async def invoke_embedding(data: dict[str, Any]) -> handler.ActionResponse: embedding_model_uuid = data['embedding_model_uuid'] texts = data['texts'] embedding_model = await self.ap.model_mgr.get_embedding_model_by_uuid(embedding_model_uuid) if embedding_model is None: return handler.ActionResponse.error( message=f'Embedding model with embedding_model_uuid {embedding_model_uuid} not found', ) try: vectors = await embedding_model.provider.invoke_embedding(embedding_model, texts) return handler.ActionResponse.success(data={'vectors': vectors}) except Exception as e: return _make_rag_error_response(e, 'EmbeddingError', embedding_model_uuid=embedding_model_uuid) @self.action(PluginToRuntimeAction.VECTOR_UPSERT) async def vector_upsert(data: dict[str, Any]) -> handler.ActionResponse: collection_id = data['collection_id'] vectors = data['vectors'] ids = data['ids'] metadata = data.get('metadata') documents = data.get('documents') if len(vectors) != len(ids): return handler.ActionResponse.error(message='vectors and ids must have same length') if metadata and len(metadata) != len(vectors): return handler.ActionResponse.error(message='metadata must match vectors length') if documents and len(documents) != len(vectors): return handler.ActionResponse.error(message='documents must match vectors length') try: await self.ap.rag_runtime_service.vector_upsert( collection_id, vectors, ids, metadata, documents, ) return handler.ActionResponse.success(data={}) except Exception as e: return _make_rag_error_response(e, 'VectorStoreError', collection_id=collection_id) @self.action(PluginToRuntimeAction.VECTOR_SEARCH) async def vector_search(data: dict[str, Any]) -> handler.ActionResponse: collection_id = data['collection_id'] query_vector = data['query_vector'] top_k = data['top_k'] filters = data.get('filters') search_type = data.get('search_type', 'vector') query_text = data.get('query_text', '') vector_weight = data.get('vector_weight') try: results = await self.ap.rag_runtime_service.vector_search( collection_id, query_vector, top_k, filters, search_type, query_text, vector_weight=vector_weight, ) return handler.ActionResponse.success(data={'results': results}) except Exception as e: return _make_rag_error_response(e, 'VectorStoreError', collection_id=collection_id) @self.action(PluginToRuntimeAction.VECTOR_DELETE) async def vector_delete(data: dict[str, Any]) -> handler.ActionResponse: collection_id = data['collection_id'] file_ids = data.get('file_ids') filters = data.get('filters') try: count = await self.ap.rag_runtime_service.vector_delete(collection_id, file_ids, filters) return handler.ActionResponse.success(data={'count': count}) except Exception as e: return _make_rag_error_response(e, 'VectorStoreError', collection_id=collection_id) @self.action(PluginToRuntimeAction.VECTOR_LIST) async def vector_list(data: dict[str, Any]) -> handler.ActionResponse: collection_id = data['collection_id'] filters = data.get('filters') limit = data.get('limit', 20) offset = data.get('offset', 0) try: items, total = await self.ap.rag_runtime_service.vector_list(collection_id, filters, limit, offset) return handler.ActionResponse.success(data={'items': items, 'total': total}) except Exception as e: return _make_rag_error_response(e, 'VectorStoreError', collection_id=collection_id) @self.action(PluginToRuntimeAction.GET_KNOWLEDEGE_FILE_STREAM) async def get_knowledge_file_stream(data: dict[str, Any]) -> handler.ActionResponse: storage_path = data['storage_path'] try: content_bytes = await self.ap.rag_runtime_service.get_file_stream(storage_path) file_key = await self.send_file(content_bytes, '') return handler.ActionResponse.success(data={'file_key': file_key}) except Exception as e: return _make_rag_error_response(e, 'FileServiceError', storage_path=storage_path) @self.action(PluginToRuntimeAction.LIST_PARSERS) async def list_parsers(data: dict[str, Any]) -> handler.ActionResponse: """Plugin requests host to list available parser plugins.""" mime_type = data.get('mime_type') try: parsers = await self.ap.knowledge_service.list_parsers(mime_type) return handler.ActionResponse.success(data={'parsers': parsers}) except Exception as e: return _make_rag_error_response(e, 'ParserDiscoveryError', mime_type=mime_type) @self.action(PluginToRuntimeAction.INVOKE_PARSER) async def invoke_parser(data: dict[str, Any]) -> handler.ActionResponse: """Plugin requests host to invoke a parser plugin.""" plugin_author = data['plugin_author'] plugin_name = data['plugin_name'] storage_path = data['storage_path'] mime_type = data.get('mime_type', 'application/octet-stream') filename = data.get('filename', '') metadata = data.get('metadata', {}) try: # Read file from storage file_bytes = await self.ap.rag_runtime_service.get_file_stream(storage_path) context_data = { 'mime_type': mime_type, 'filename': filename, 'metadata': metadata, } result = await self.ap.plugin_connector.call_parser( f'{plugin_author}/{plugin_name}', context_data, file_bytes ) return handler.ActionResponse.success(data=result) except Exception as e: return _make_rag_error_response(e, 'ParserError') # ================= Knowledge Base Query APIs ================= @self.action(PluginToRuntimeAction.LIST_KNOWLEDGE_BASES) async def list_knowledge_bases(data: dict[str, Any]) -> handler.ActionResponse: """List all knowledge bases available in the LangBot instance (unrestricted).""" knowledge_bases = [] for kb_uuid, kb in self.ap.rag_mgr.knowledge_bases.items(): knowledge_bases.append( { 'uuid': kb.get_uuid(), 'name': kb.get_name(), 'description': kb.knowledge_base_entity.description or '', } ) return handler.ActionResponse.success(data={'knowledge_bases': knowledge_bases}) @self.action(PluginToRuntimeAction.RETRIEVE_KNOWLEDGE) async def retrieve_knowledge(data: dict[str, Any]) -> handler.ActionResponse: """Retrieve documents from any knowledge base (unrestricted).""" kb_id = data['kb_id'] query_text = data['query_text'] top_k = data.get('top_k', 5) filters = data.get('filters', {}) kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_id) if not kb: return handler.ActionResponse.error( message=f'Knowledge base {kb_id} not found', ) try: entries = await kb.retrieve( query_text, settings={ 'top_k': top_k, 'filters': filters, }, ) results = [entry.model_dump(mode='json') for entry in entries] return handler.ActionResponse.success(data={'results': results}) except Exception as e: return _make_rag_error_response(e, 'RetrievalError', kb_id=kb_id) @self.action(PluginToRuntimeAction.LIST_PIPELINE_KNOWLEDGE_BASES) async def list_pipeline_knowledge_bases(data: dict[str, Any]) -> handler.ActionResponse: """List knowledge bases configured for the current query's pipeline.""" query_id = data['query_id'] if query_id not in self.ap.query_pool.cached_queries: return handler.ActionResponse.error( message=f'Query with query_id {query_id} not found', ) query = self.ap.query_pool.cached_queries[query_id] kb_uuids = [] if query.pipeline_config: local_agent_config = query.pipeline_config.get('ai', {}).get('local-agent', {}) kb_uuids = local_agent_config.get('knowledge-bases', []) # Backward compatibility if not kb_uuids: old_kb_uuid = local_agent_config.get('knowledge-base', '') if old_kb_uuid and old_kb_uuid != '__none__': kb_uuids = [old_kb_uuid] knowledge_bases = [] for kb_uuid in kb_uuids: kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_uuid) if kb: knowledge_bases.append( { 'uuid': kb.get_uuid(), 'name': kb.get_name(), 'description': kb.knowledge_base_entity.description or '', } ) return handler.ActionResponse.success(data={'knowledge_bases': knowledge_bases}) @self.action(PluginToRuntimeAction.RETRIEVE_KNOWLEDGE_BASE) async def retrieve_knowledge_base(data: dict[str, Any]) -> handler.ActionResponse: """Retrieve documents from a knowledge base within the pipeline's scope.""" query_id = data['query_id'] kb_id = data['kb_id'] query_text = data['query_text'] top_k = data.get('top_k', 5) filters = data.get('filters', {}) if query_id not in self.ap.query_pool.cached_queries: return handler.ActionResponse.error( message=f'Query with query_id {query_id} not found', ) query = self.ap.query_pool.cached_queries[query_id] # Validate kb_id is in pipeline's allowed list allowed_kb_uuids = [] if query.pipeline_config: local_agent_config = query.pipeline_config.get('ai', {}).get('local-agent', {}) allowed_kb_uuids = local_agent_config.get('knowledge-bases', []) if not allowed_kb_uuids: old_kb_uuid = local_agent_config.get('knowledge-base', '') if old_kb_uuid and old_kb_uuid != '__none__': allowed_kb_uuids = [old_kb_uuid] if kb_id not in allowed_kb_uuids: return handler.ActionResponse.error( message=f'Knowledge base {kb_id} is not configured for this pipeline', ) kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_id) if not kb: return handler.ActionResponse.error( message=f'Knowledge base {kb_id} not found', ) try: session_name = f'{query.session.launcher_type.value}_{query.session.launcher_id}' entries = await kb.retrieve( query_text, settings={ 'top_k': top_k, 'filters': filters, 'session_name': session_name, 'bot_uuid': query.bot_uuid or '', 'sender_id': str(query.sender_id), }, ) results = [entry.model_dump(mode='json') for entry in entries] return handler.ActionResponse.success(data={'results': results}) except Exception as e: return _make_rag_error_response(e, 'RetrievalError', kb_id=kb_id) @self.action(CommonAction.PING) async def ping(data: dict[str, Any]) -> handler.ActionResponse: """Ping""" return handler.ActionResponse.success( data={ 'pong': 'pong', }, ) async def ping(self) -> dict[str, Any]: """Ping the runtime""" return await self.call_action( CommonAction.PING, {}, timeout=10, ) async def install_plugin( self, install_source: str, install_info: dict[str, Any] ) -> typing.AsyncGenerator[dict[str, Any], None]: """Install plugin""" gen = self.call_action_generator( LangBotToRuntimeAction.INSTALL_PLUGIN, { 'install_source': install_source, 'install_info': install_info, }, timeout=120, ) async for ret in gen: yield ret async def upgrade_plugin(self, plugin_author: str, plugin_name: str) -> typing.AsyncGenerator[dict[str, Any], None]: """Upgrade plugin""" gen = self.call_action_generator( LangBotToRuntimeAction.UPGRADE_PLUGIN, { 'plugin_author': plugin_author, 'plugin_name': plugin_name, }, timeout=120, ) async for ret in gen: yield ret async def delete_plugin(self, plugin_author: str, plugin_name: str) -> typing.AsyncGenerator[dict[str, Any], None]: """Delete plugin""" gen = self.call_action_generator( LangBotToRuntimeAction.DELETE_PLUGIN, { 'plugin_author': plugin_author, 'plugin_name': plugin_name, }, ) async for ret in gen: yield ret async def list_plugins(self) -> list[dict[str, Any]]: """List plugins""" result = await self.call_action( LangBotToRuntimeAction.LIST_PLUGINS, {}, timeout=10, ) return result['plugins'] async def get_plugin_info(self, author: str, plugin_name: str) -> dict[str, Any]: """Get plugin""" result = await self.call_action( LangBotToRuntimeAction.GET_PLUGIN_INFO, { 'author': author, 'plugin_name': plugin_name, }, timeout=10, ) return result['plugin'] async def set_plugin_config(self, plugin_author: str, plugin_name: str, config: dict[str, Any]) -> dict[str, Any]: """Set plugin config""" # update plugin setting await self.ap.persistence_mgr.execute_async( sqlalchemy.update(persistence_plugin.PluginSetting) .where(persistence_plugin.PluginSetting.plugin_author == plugin_author) .where(persistence_plugin.PluginSetting.plugin_name == plugin_name) .values(config=config) ) # restart plugin gen = self.call_action_generator( LangBotToRuntimeAction.RESTART_PLUGIN, { 'plugin_author': plugin_author, 'plugin_name': plugin_name, }, ) async for ret in gen: pass return {} async def emit_event( self, event_context: dict[str, Any], include_plugins: list[str] | None = None, ) -> dict[str, Any]: """Emit event""" result = await self.call_action( LangBotToRuntimeAction.EMIT_EVENT, { 'event_context': event_context, 'include_plugins': include_plugins, }, timeout=180, ) return result async def list_tools(self, include_plugins: list[str] | None = None) -> list[dict[str, Any]]: """List tools""" result = await self.call_action( LangBotToRuntimeAction.LIST_TOOLS, { 'include_plugins': include_plugins, }, timeout=20, ) return result['tools'] async def get_plugin_icon(self, plugin_author: str, plugin_name: str) -> dict[str, Any]: """Get plugin icon""" result = await self.call_action( LangBotToRuntimeAction.GET_PLUGIN_ICON, { 'plugin_author': plugin_author, 'plugin_name': plugin_name, }, ) plugin_icon_file_key = result['plugin_icon_file_key'] mime_type = result['mime_type'] plugin_icon_bytes = await self.read_local_file(plugin_icon_file_key) await self.delete_local_file(plugin_icon_file_key) return { 'plugin_icon_base64': base64.b64encode(plugin_icon_bytes).decode('utf-8'), 'mime_type': mime_type, } async def get_plugin_readme(self, plugin_author: str, plugin_name: str, language: str = 'en') -> str: """Get plugin readme""" try: result = await self.call_action( LangBotToRuntimeAction.GET_PLUGIN_README, { 'plugin_author': plugin_author, 'plugin_name': plugin_name, 'language': language, }, timeout=20, ) except Exception: traceback.print_exc() return '' readme_file_key = result.get('readme_file_key') if not readme_file_key: return '' readme_bytes = await self.read_local_file(readme_file_key) await self.delete_local_file(readme_file_key) return readme_bytes.decode('utf-8') async def get_plugin_assets(self, plugin_author: str, plugin_name: str, filepath: str) -> dict[str, Any]: """Get plugin assets""" result = await self.call_action( LangBotToRuntimeAction.GET_PLUGIN_ASSETS_FILE, { 'plugin_author': plugin_author, 'plugin_name': plugin_name, 'file_path': filepath, }, timeout=20, ) asset_file_key = result['file_file_key'] if not asset_file_key: return { 'asset_base64': '', 'mime_type': '', } mime_type = result['mime_type'] asset_bytes = await self.read_local_file(asset_file_key) await self.delete_local_file(asset_file_key) return { 'asset_base64': base64.b64encode(asset_bytes).decode('utf-8'), 'mime_type': mime_type, } async def handle_page_api( self, plugin_author: str, plugin_name: str, page_id: str, endpoint: str, method: str, body: Any = None, ) -> dict[str, Any]: """Forward a page API call to the plugin via runtime.""" result = await self.call_action( LangBotToRuntimeAction.PAGE_API, { 'plugin_author': plugin_author, 'plugin_name': plugin_name, 'page_id': page_id, 'endpoint': endpoint, 'method': method, 'body': body, }, timeout=30, ) return result async def cleanup_plugin_data(self, plugin_author: str, plugin_name: str) -> None: """Cleanup plugin settings and binary storage""" # Delete plugin settings await self.ap.persistence_mgr.execute_async( sqlalchemy.delete(persistence_plugin.PluginSetting) .where(persistence_plugin.PluginSetting.plugin_author == plugin_author) .where(persistence_plugin.PluginSetting.plugin_name == plugin_name) ) # Delete all binary storage for this plugin owner = f'{plugin_author}/{plugin_name}' await self.ap.persistence_mgr.execute_async( sqlalchemy.delete(persistence_bstorage.BinaryStorage) .where(persistence_bstorage.BinaryStorage.owner_type == 'plugin') .where(persistence_bstorage.BinaryStorage.owner == owner) ) async def call_tool( self, tool_name: str, parameters: dict[str, Any], session: dict[str, Any], query_id: int, include_plugins: list[str] | None = None, ) -> dict[str, Any]: """Call tool""" result = await self.call_action( LangBotToRuntimeAction.CALL_TOOL, { 'tool_name': tool_name, 'tool_parameters': parameters, 'session': session, 'query_id': query_id, 'include_plugins': include_plugins, }, timeout=180, ) return result['tool_response'] async def list_commands(self, include_plugins: list[str] | None = None) -> list[dict[str, Any]]: """List commands""" result = await self.call_action( LangBotToRuntimeAction.LIST_COMMANDS, { 'include_plugins': include_plugins, }, timeout=10, ) return result['commands'] async def execute_command( self, command_context: dict[str, Any], include_plugins: list[str] | None = None ) -> typing.AsyncGenerator[dict[str, Any], None]: """Execute command""" gen = self.call_action_generator( LangBotToRuntimeAction.EXECUTE_COMMAND, { 'command_context': command_context, 'include_plugins': include_plugins, }, timeout=180, ) async for ret in gen: yield ret async def retrieve_knowledge( self, plugin_author: str, plugin_name: str, retriever_name: str, retrieval_context: dict[str, Any], ) -> dict[str, Any]: """Retrieve knowledge""" result = await self.call_action( LangBotToRuntimeAction.RETRIEVE_KNOWLEDGE, { 'plugin_author': plugin_author, 'plugin_name': plugin_name, 'retriever_name': retriever_name, 'retrieval_context': retrieval_context, }, timeout=30, ) return result async def get_debug_info(self) -> dict[str, Any]: """Get debug information including debug key and WS URL""" result = await self.call_action( LangBotToRuntimeAction.GET_DEBUG_INFO, {}, timeout=10, ) return result # ================= RAG Capability Callers (LangBot -> Runtime) ================= async def rag_ingest_document( self, plugin_author: str, plugin_name: str, context_data: dict[str, Any] ) -> dict[str, Any]: """Send INGEST_DOCUMENT action to runtime.""" result = await self.call_action( LangBotToRuntimeAction.RAG_INGEST_DOCUMENT, {'plugin_author': plugin_author, 'plugin_name': plugin_name, 'context': context_data}, timeout=1200, # Ingestion can be slow for large documents ) return result async def rag_delete_document(self, plugin_author: str, plugin_name: str, document_id: str, kb_id: str) -> bool: result = await self.call_action( LangBotToRuntimeAction.RAG_DELETE_DOCUMENT, {'plugin_author': plugin_author, 'plugin_name': plugin_name, 'document_id': document_id, 'kb_id': kb_id}, timeout=30, ) return result.get('success', False) async def rag_on_kb_create( self, plugin_author: str, plugin_name: str, kb_id: str, config: dict[str, Any] ) -> dict[str, Any]: """Notify plugin about KB creation.""" result = await self.call_action( LangBotToRuntimeAction.RAG_ON_KB_CREATE, {'plugin_author': plugin_author, 'plugin_name': plugin_name, 'kb_id': kb_id, 'config': config}, timeout=30, ) return result async def rag_on_kb_delete(self, plugin_author: str, plugin_name: str, kb_id: str) -> dict[str, Any]: """Notify plugin about KB deletion.""" result = await self.call_action( LangBotToRuntimeAction.RAG_ON_KB_DELETE, {'plugin_author': plugin_author, 'plugin_name': plugin_name, 'kb_id': kb_id}, timeout=30, ) return result async def get_rag_creation_schema(self, plugin_author: str, plugin_name: str) -> dict[str, Any]: return await self.call_action( LangBotToRuntimeAction.GET_RAG_CREATION_SETTINGS_SCHEMA, {'plugin_author': plugin_author, 'plugin_name': plugin_name}, timeout=10, ) async def get_rag_retrieval_schema(self, plugin_author: str, plugin_name: str) -> dict[str, Any]: return await self.call_action( LangBotToRuntimeAction.GET_RAG_RETRIEVAL_SETTINGS_SCHEMA, {'plugin_author': plugin_author, 'plugin_name': plugin_name}, timeout=10, ) async def list_knowledge_engines(self) -> list[dict[str, Any]]: """List all available Knowledge Engines from plugins.""" result = await self.call_action(LangBotToRuntimeAction.LIST_KNOWLEDGE_ENGINES, {}, timeout=60) return result.get('engines', []) # ================= Parser Capability Callers (LangBot -> Runtime) ================= async def list_parsers(self) -> list[dict[str, Any]]: """List all available parsers from plugins.""" result = await self.call_action(LangBotToRuntimeAction.LIST_PARSERS, {}, timeout=60) return result.get('parsers', []) async def parse_document( self, plugin_author: str, plugin_name: str, context_data: dict[str, Any], file_bytes: bytes ) -> dict[str, Any]: """Send PARSE_DOCUMENT action to runtime. Sends file content via chunked FILE_CHUNK transfer, then invokes the PARSE_DOCUMENT action with a file_key reference. """ # Send file to runtime via chunked transfer file_key = await self.send_file(file_bytes, '') # Include file_key in context_data for the runtime to read context_data['file_key'] = file_key result = await self.call_action( LangBotToRuntimeAction.PARSE_DOCUMENT, {'plugin_author': plugin_author, 'plugin_name': plugin_name, 'context': context_data}, timeout=300, ) return result