mirror of
https://github.com/langbot-app/LangBot.git
synced 2026-06-02 12:05:54 +00:00
* fix: add storage retention cleanup * fix: prune completed tasks on completion * fix: complete storage analysis i18n
1178 lines
46 KiB
Python
1178 lines
46 KiB
Python
from __future__ import annotations
|
|
|
|
import typing
|
|
from typing import Any
|
|
import base64
|
|
import traceback
|
|
|
|
import sqlalchemy
|
|
|
|
from langbot_plugin.runtime.io import handler
|
|
from langbot_plugin.runtime.io.connection import Connection
|
|
from langbot_plugin.entities.io.actions.enums import (
|
|
CommonAction,
|
|
RuntimeToLangBotAction,
|
|
LangBotToRuntimeAction,
|
|
PluginToRuntimeAction,
|
|
)
|
|
import langbot_plugin.api.entities.builtin.platform.message as platform_message
|
|
import langbot_plugin.api.entities.builtin.provider.message as provider_message
|
|
import langbot_plugin.api.entities.builtin.resource.tool as resource_tool
|
|
|
|
from ..entity.persistence import plugin as persistence_plugin
|
|
from ..entity.persistence import bstorage as persistence_bstorage
|
|
|
|
from ..core import app
|
|
from ..utils import constants
|
|
|
|
|
|
def _make_rag_error_response(error: Exception, error_type: str, **extra_context) -> handler.ActionResponse:
    """Build an error ``ActionResponse`` describing a failed RAG operation.

    The message has the form ``[<error_type>/<ExceptionClass>] [k=v, ...] <str(error)>``;
    the bracketed ``k=v`` list is left out when no extra context is supplied.

    Args:
        error: The caught exception.
        error_type: A category string like 'EmbeddingError', 'VectorStoreError'.
        **extra_context: Additional key/value pairs rendered into the message.

    Returns:
        The value of ``handler.ActionResponse.error`` for the formatted message.
    """
    rendered_context = ', '.join(f'{name}={value}' for name, value in extra_context.items())
    context_suffix = f' [{rendered_context}]' if extra_context else ''
    category = f'[{error_type}/{type(error).__name__}]'
    return handler.ActionResponse.error(message=f'{category}{context_suffix} {error!s}')
|
|
|
|
|
|
class RuntimeConnectionHandler(handler.Handler):
|
|
"""Runtime connection handler"""
|
|
|
|
ap: app.Application
|
|
|
|
def __init__(
    self,
    connection: Connection,
    disconnect_callback: typing.Callable[[], typing.Coroutine[typing.Any, typing.Any, bool]],
    ap: app.Application,
) -> None:
    """Initialise the base handler and keep a reference to the application.

    The rest of the constructor body registers the action handlers with
    ``@self.action(...)``.

    Args:
        connection: Forwarded to the base ``handler.Handler`` constructor.
        disconnect_callback: Forwarded to the base ``handler.Handler`` constructor.
        ap: Application instance; stored on ``self.ap`` and used by the handlers.
    """
    super().__init__(connection, disconnect_callback)
    self.ap = ap
|
|
|
|
@self.action(RuntimeToLangBotAction.INITIALIZE_PLUGIN_SETTINGS)
async def initialize_plugin_settings(data: dict[str, Any]) -> handler.ActionResponse:
    """Create (or re-create) the persisted settings row of a plugin.

    Reads ``plugin_author``, ``plugin_name``, ``install_source`` and ``install_info``
    from ``data``. When a row already exists it is deleted and inserted again with the
    new install fields while keeping its ``enabled``, ``priority`` and ``config``
    values; otherwise the row is inserted with ``enabled=True``, ``priority=0`` and an
    empty config. Any exception is printed and reported as an error response.
    """
    plugin_author = data['plugin_author']
    plugin_name = data['plugin_name']
    install_source = data['install_source']
    install_info = data['install_info']

    try:
        settings_table = persistence_plugin.PluginSetting

        lookup = await self.ap.persistence_mgr.execute_async(
            sqlalchemy.select(settings_table)
            .where(settings_table.plugin_author == plugin_author)
            .where(settings_table.plugin_name == plugin_name)
        )
        existing = lookup.first()

        if existing is not None:
            # Drop the old row first; its values are carried over into the new one below.
            await self.ap.persistence_mgr.execute_async(
                sqlalchemy.delete(settings_table)
                .where(settings_table.plugin_author == plugin_author)
                .where(settings_table.plugin_name == plugin_name)
            )

        if existing is None:
            inherited = {'enabled': True, 'priority': 0, 'config': {}}
        else:
            inherited = {
                'enabled': existing.enabled,
                'priority': existing.priority,
                'config': existing.config,
            }

        await self.ap.persistence_mgr.execute_async(
            sqlalchemy.insert(settings_table).values(
                plugin_author=plugin_author,
                plugin_name=plugin_name,
                install_source=install_source,
                install_info=install_info,
                **inherited,
            )
        )

        return handler.ActionResponse.success(data={})
    except Exception as e:
        traceback.print_exc()
        return handler.ActionResponse.error(message=f'Failed to initialize plugin settings: {e}')
|
|
|
|
@self.action(RuntimeToLangBotAction.GET_PLUGIN_SETTINGS)
async def get_plugin_settings(data: dict[str, Any]) -> handler.ActionResponse:
    """Return the persisted settings of a plugin, or defaults when none are stored.

    ``data`` must carry ``plugin_author`` and ``plugin_name``. The response payload has
    the keys ``enabled``, ``priority``, ``plugin_config``, ``install_source`` and
    ``install_info``.
    """
    plugin_author = data['plugin_author']
    plugin_name = data['plugin_name']

    settings_table = persistence_plugin.PluginSetting
    lookup = await self.ap.persistence_mgr.execute_async(
        sqlalchemy.select(settings_table)
        .where(settings_table.plugin_author == plugin_author)
        .where(settings_table.plugin_name == plugin_name)
    )
    stored = lookup.first()

    if stored is None:
        # Nothing persisted for this plugin: report the defaults.
        settings = {
            'enabled': True,
            'priority': 0,
            'plugin_config': {},
            'install_source': 'local',
            'install_info': {},
        }
    else:
        settings = {
            'enabled': stored.enabled,
            'priority': stored.priority,
            'plugin_config': stored.config,
            'install_source': stored.install_source,
            'install_info': stored.install_info,
        }

    return handler.ActionResponse.success(data=settings)
|
|
|
|
@self.action(PluginToRuntimeAction.REPLY_MESSAGE)
async def reply_message(data: dict[str, Any]) -> handler.ActionResponse:
    """Reply to the message event of a cached query.

    ``data`` must carry ``query_id``, ``message_chain`` and ``quote_origin``. The chain
    is validated into a ``MessageChain`` and handed to the query adapter's
    ``reply_message``. Responds with an error when the query id is not cached.
    """
    query_id = data['query_id']
    raw_chain = data['message_chain']
    quote_origin = data['quote_origin']

    cached_queries = self.ap.query_pool.cached_queries
    if query_id not in cached_queries:
        return handler.ActionResponse.error(message=f'Query with query_id {query_id} not found')
    query = cached_queries[query_id]

    chain = platform_message.MessageChain.model_validate(raw_chain)
    self.ap.logger.debug(f'Reply message: {chain.model_dump(serialize_as_any=False)}')

    await query.adapter.reply_message(query.message_event, chain, quote_origin)

    return handler.ActionResponse.success(data={})
|
|
|
|
@self.action(PluginToRuntimeAction.GET_BOT_UUID)
async def get_bot_uuid(data: dict[str, Any]) -> handler.ActionResponse:
    """Return the ``bot_uuid`` of the cached query named by ``data['query_id']``.

    Responds with an error when no cached query has that id.
    """
    query_id = data['query_id']

    cached_queries = self.ap.query_pool.cached_queries
    if query_id not in cached_queries:
        return handler.ActionResponse.error(message=f'Query with query_id {query_id} not found')

    bot_uuid = cached_queries[query_id].bot_uuid
    return handler.ActionResponse.success(data={'bot_uuid': bot_uuid})
|
|
|
|
@self.action(PluginToRuntimeAction.SET_QUERY_VAR)
async def set_query_var(data: dict[str, Any]) -> handler.ActionResponse:
    """Store ``data['value']`` under ``data['key']`` in a cached query's variables.

    Responds with an error when ``data['query_id']`` is not a cached query.
    """
    query_id = data['query_id']
    var_name = data['key']
    var_value = data['value']

    cached_queries = self.ap.query_pool.cached_queries
    if query_id not in cached_queries:
        return handler.ActionResponse.error(message=f'Query with query_id {query_id} not found')

    cached_queries[query_id].variables[var_name] = var_value
    return handler.ActionResponse.success(data={})
|
|
|
|
@self.action(PluginToRuntimeAction.GET_QUERY_VAR)
async def get_query_var(data: dict[str, Any]) -> handler.ActionResponse:
    """Return one variable of a cached query.

    ``data`` must carry ``query_id`` and ``key``. On success the payload is
    ``{'value': <variable value>}``. An error response is returned when the query id
    is not cached or when the query has no variable named ``key``.
    """
    query_id = data['query_id']
    key = data['key']

    if query_id not in self.ap.query_pool.cached_queries:
        return handler.ActionResponse.error(
            message=f'Query with query_id {query_id} not found',
        )

    query = self.ap.query_pool.cached_queries[query_id]

    # An unknown variable name used to surface as a bare KeyError; report it the same
    # way as an unknown query id instead.
    if key not in query.variables:
        return handler.ActionResponse.error(
            message=f'Query variable {key} not found in query {query_id}',
        )

    return handler.ActionResponse.success(
        data={
            'value': query.variables[key],
        },
    )
|
|
|
|
@self.action(PluginToRuntimeAction.GET_QUERY_VARS)
async def get_query_vars(data: dict[str, Any]) -> handler.ActionResponse:
    """Return all variables of the cached query named by ``data['query_id']``.

    The payload is ``{'vars': query.variables}``. Responds with an error when the query
    id is not cached.
    """
    query_id = data['query_id']

    cached_queries = self.ap.query_pool.cached_queries
    if query_id not in cached_queries:
        return handler.ActionResponse.error(message=f'Query with query_id {query_id} not found')

    all_vars = cached_queries[query_id].variables
    return handler.ActionResponse.success(data={'vars': all_vars})
|
|
|
|
@self.action(PluginToRuntimeAction.CREATE_NEW_CONVERSATION)
async def create_new_conversation(data: dict[str, Any]) -> handler.ActionResponse:
    """Clear the active conversation of a cached query's session.

    Sets ``query.session.using_conversation`` to ``None`` for the query named by
    ``data['query_id']``. Responds with an error when the query id is not cached.
    """
    query_id = data['query_id']

    cached_queries = self.ap.query_pool.cached_queries
    if query_id not in cached_queries:
        return handler.ActionResponse.error(message=f'Query with query_id {query_id} not found')

    session = cached_queries[query_id].session
    session.using_conversation = None

    return handler.ActionResponse.success(data={})
|
|
|
|
@self.action(PluginToRuntimeAction.GET_LANGBOT_VERSION)
async def get_langbot_version(data: dict[str, Any]) -> handler.ActionResponse:
    """Return ``constants.semantic_version`` as ``{'version': ...}``; ``data`` is not read."""
    version_payload = {'version': constants.semantic_version}
    return handler.ActionResponse.success(data=version_payload)
|
|
|
|
@self.action(PluginToRuntimeAction.GET_BOTS)
async def get_bots(data: dict[str, Any]) -> handler.ActionResponse:
    """Return the result of ``bot_service.get_bots(include_secret=False)`` as ``{'bots': ...}``."""
    bot_list = await self.ap.bot_service.get_bots(include_secret=False)
    return handler.ActionResponse.success(data={'bots': bot_list})
|
|
|
|
@self.action(PluginToRuntimeAction.GET_BOT_INFO)
async def get_bot_info(data: dict[str, Any]) -> handler.ActionResponse:
    """Return runtime info for the bot named by ``data['bot_uuid']``.

    The payload is ``{'bot': ...}``, taken from
    ``bot_service.get_runtime_bot_info(bot_uuid, include_secret=False)``.
    """
    requested_uuid = data['bot_uuid']
    bot_info = await self.ap.bot_service.get_runtime_bot_info(requested_uuid, include_secret=False)
    return handler.ActionResponse.success(data={'bot': bot_info})
|
|
|
|
@self.action(PluginToRuntimeAction.SEND_MESSAGE)
async def send_message(data: dict[str, Any]) -> handler.ActionResponse:
    """Send a message chain through a bot's adapter.

    ``data`` must carry ``bot_uuid``, ``target_type``, ``target_id`` and
    ``message_chain``. The chain is validated into a ``MessageChain`` before the bot is
    looked up; an error response is returned when the bot does not exist.
    """
    bot_uuid = data['bot_uuid']
    target_type = data['target_type']
    target_id = data['target_id']
    raw_chain = data['message_chain']

    chain = platform_message.MessageChain.model_validate(raw_chain)

    bot = await self.ap.platform_mgr.get_bot_by_uuid(bot_uuid)
    if bot is None:
        return handler.ActionResponse.error(message=f'Bot with bot_uuid {bot_uuid} not found')

    await bot.adapter.send_message(target_type, target_id, chain)

    return handler.ActionResponse.success(data={})
|
|
|
|
@self.action(PluginToRuntimeAction.GET_LLM_MODELS)
async def get_llm_models(data: dict[str, Any]) -> handler.ActionResponse:
    """Return the UUIDs of all LLM models as ``{'llm_models': [uuid, ...]}``."""
    model_records = await self.ap.llm_model_service.get_llm_models(include_secret=False)
    model_uuids = [record['uuid'] for record in model_records]
    return handler.ActionResponse.success(data={'llm_models': model_uuids})
|
|
|
|
@self.action(PluginToRuntimeAction.INVOKE_LLM)
async def invoke_llm(data: dict[str, Any]) -> handler.ActionResponse:
    """Invoke an LLM model on behalf of a plugin.

    ``data`` must carry ``llm_model_uuid`` and ``messages``; ``funcs`` (default ``[]``)
    and ``extra_args`` (default ``{}``) are optional. The provider is called with
    ``query=None`` and its result is returned as ``{'message': result.model_dump()}``.
    An error response is returned when the model does not exist.
    """
    llm_model_uuid = data['llm_model_uuid']
    raw_messages = data['messages']
    raw_funcs = data.get('funcs', [])
    extra_args = data.get('extra_args', {})

    llm_model = await self.ap.model_mgr.get_model_by_uuid(llm_model_uuid)
    if llm_model is None:
        return handler.ActionResponse.error(
            message=f'LLM model with llm_model_uuid {llm_model_uuid} not found',
        )

    parsed_messages = [provider_message.Message.model_validate(item) for item in raw_messages]

    # The plugin side excludes ``func`` when dumping an LLMTool, yet LLMTool validation
    # requires it, so a do-nothing coroutine stands in while rebuilding the tools.
    async def _placeholder_func(**kwargs):
        pass

    parsed_tools = [
        resource_tool.LLMTool.model_validate({**tool_spec, 'func': _placeholder_func}) for tool_spec in raw_funcs
    ]

    reply = await llm_model.provider.invoke_llm(
        query=None,
        model=llm_model,
        messages=parsed_messages,
        funcs=parsed_tools,
        extra_args=extra_args,
    )

    return handler.ActionResponse.success(data={'message': reply.model_dump()})
|
|
|
|
@self.action(RuntimeToLangBotAction.SET_BINARY_STORAGE)
async def set_binary_storage(data: dict[str, Any]) -> handler.ActionResponse:
    """Store a binary value under ``(owner_type, owner, key)``.

    ``data['value_base64']`` is base64-decoded first. The size limit comes from
    ``plugin.binary_storage.max_value_bytes`` in the instance config (10 MiB when the
    entry is absent or cannot be converted to ``int``); a negative limit disables the
    check. An existing row for the same key and owner is updated, otherwise a new row
    is inserted.
    """
    key = data['key']
    owner_type = data['owner_type']
    owner = data['owner']
    payload = base64.b64decode(data['value_base64'])

    fallback_limit = 10 * 1024 * 1024
    plugin_section = self.ap.instance_config.data.get('plugin', {})
    storage_section = plugin_section.get('binary_storage', {})
    configured_limit = storage_section.get('max_value_bytes', fallback_limit)
    try:
        limit = int(configured_limit)
    except (TypeError, ValueError):
        limit = fallback_limit

    if limit >= 0 and len(payload) > limit:
        return handler.ActionResponse.error(
            message=f'Binary storage value exceeds limit ({len(payload)} > {limit} bytes)',
        )

    storage_table = persistence_bstorage.BinaryStorage
    lookup = await self.ap.persistence_mgr.execute_async(
        sqlalchemy.select(storage_table)
        .where(storage_table.key == key)
        .where(storage_table.owner_type == owner_type)
        .where(storage_table.owner == owner)
    )

    if lookup.first() is None:
        await self.ap.persistence_mgr.execute_async(
            sqlalchemy.insert(storage_table).values(
                unique_key=f'{owner_type}:{owner}:{key}',
                key=key,
                owner_type=owner_type,
                owner=owner,
                value=payload,
            )
        )
    else:
        await self.ap.persistence_mgr.execute_async(
            sqlalchemy.update(storage_table)
            .where(storage_table.key == key)
            .where(storage_table.owner_type == owner_type)
            .where(storage_table.owner == owner)
            .values(value=payload)
        )

    return handler.ActionResponse.success(data={})
|
|
|
|
@self.action(RuntimeToLangBotAction.GET_BINARY_STORAGE)
async def get_binary_storage(data: dict[str, Any]) -> handler.ActionResponse:
    """Return the stored value for ``(owner_type, owner, key)`` as base64 text.

    The payload is ``{'value_base64': ...}``. An error response is returned when no
    matching row exists.
    """
    key = data['key']
    owner_type = data['owner_type']
    owner = data['owner']

    storage_table = persistence_bstorage.BinaryStorage
    lookup = await self.ap.persistence_mgr.execute_async(
        sqlalchemy.select(storage_table)
        .where(storage_table.key == key)
        .where(storage_table.owner_type == owner_type)
        .where(storage_table.owner == owner)
    )

    row = lookup.first()
    if row is None:
        return handler.ActionResponse.error(message=f'Storage with key {key} not found')

    encoded_value = base64.b64encode(row.value).decode('utf-8')
    return handler.ActionResponse.success(data={'value_base64': encoded_value})
|
|
|
|
@self.action(RuntimeToLangBotAction.DELETE_BINARY_STORAGE)
async def delete_binary_storage(data: dict[str, Any]) -> handler.ActionResponse:
    """Delete the binary storage rows matching ``(owner_type, owner, key)``.

    Always responds with success and an empty payload once the delete statement ran.
    """
    key = data['key']
    owner_type = data['owner_type']
    owner = data['owner']

    storage_table = persistence_bstorage.BinaryStorage
    delete_stmt = (
        sqlalchemy.delete(storage_table)
        .where(storage_table.key == key)
        .where(storage_table.owner_type == owner_type)
        .where(storage_table.owner == owner)
    )
    await self.ap.persistence_mgr.execute_async(delete_stmt)

    return handler.ActionResponse.success(data={})
|
|
|
|
@self.action(RuntimeToLangBotAction.GET_BINARY_STORAGE_KEYS)
async def get_binary_storage_keys(data: dict[str, Any]) -> handler.ActionResponse:
    """Return every storage key owned by ``(owner_type, owner)`` as ``{'keys': [...]}``."""
    owner_type = data['owner_type']
    owner = data['owner']

    storage_table = persistence_bstorage.BinaryStorage
    key_query = (
        sqlalchemy.select(storage_table.key)
        .where(storage_table.owner_type == owner_type)
        .where(storage_table.owner == owner)
    )
    lookup = await self.ap.persistence_mgr.execute_async(key_query)

    return handler.ActionResponse.success(data={'keys': lookup.scalars().all()})
|
|
|
|
@self.action(PluginToRuntimeAction.GET_CONFIG_FILE)
async def get_config_file(data: dict[str, Any]) -> handler.ActionResponse:
    """Load a config file from the storage provider and return it base64-encoded.

    ``data['file_key']`` names the file. The payload is ``{'file_base64': ...}``; any
    exception while loading or encoding is reported as an error response.
    """
    file_key = data['file_key']

    try:
        raw_bytes = await self.ap.storage_mgr.storage_provider.load(file_key)
        encoded_file = base64.b64encode(raw_bytes).decode('utf-8')
        return handler.ActionResponse.success(data={'file_base64': encoded_file})
    except Exception as e:
        return handler.ActionResponse.error(message=f'Failed to load config file {file_key}: {e}')
|
|
|
|
# ================= RAG Capability Handlers =================
|
|
|
|
@self.action(PluginToRuntimeAction.INVOKE_EMBEDDING)
async def invoke_embedding(data: dict[str, Any]) -> handler.ActionResponse:
    """Embed ``data['texts']`` with the model named by ``data['embedding_model_uuid']``.

    The payload is ``{'vectors': ...}``. An error response is returned when the model
    does not exist or when the provider call raises.
    """
    embedding_model_uuid = data['embedding_model_uuid']
    texts = data['texts']

    model = await self.ap.model_mgr.get_embedding_model_by_uuid(embedding_model_uuid)
    if model is None:
        return handler.ActionResponse.error(
            message=f'Embedding model with embedding_model_uuid {embedding_model_uuid} not found',
        )

    try:
        embedded = await model.provider.invoke_embedding(model, texts)
        return handler.ActionResponse.success(data={'vectors': embedded})
    except Exception as e:
        return _make_rag_error_response(e, 'EmbeddingError', embedding_model_uuid=embedding_model_uuid)
|
|
|
|
@self.action(PluginToRuntimeAction.VECTOR_UPSERT)
async def vector_upsert(data: dict[str, Any]) -> handler.ActionResponse:
    """Upsert vectors into a collection through ``rag_runtime_service.vector_upsert``.

    ``data`` must carry ``collection_id``, ``vectors`` and ``ids``; ``metadata`` and
    ``documents`` are optional. ``ids`` must be as long as ``vectors``, and so must
    ``metadata`` / ``documents`` when they are non-empty; otherwise an error response is
    returned before the service is called.
    """
    collection_id = data['collection_id']
    vectors = data['vectors']
    ids = data['ids']
    metadata = data.get('metadata')
    documents = data.get('documents')

    vector_count = len(vectors)
    if vector_count != len(ids):
        return handler.ActionResponse.error(message='vectors and ids must have same length')
    if metadata and len(metadata) != vector_count:
        return handler.ActionResponse.error(message='metadata must match vectors length')
    if documents and len(documents) != vector_count:
        return handler.ActionResponse.error(message='documents must match vectors length')

    try:
        await self.ap.rag_runtime_service.vector_upsert(collection_id, vectors, ids, metadata, documents)
        return handler.ActionResponse.success(data={})
    except Exception as e:
        return _make_rag_error_response(e, 'VectorStoreError', collection_id=collection_id)
|
|
|
|
@self.action(PluginToRuntimeAction.VECTOR_SEARCH)
async def vector_search(data: dict[str, Any]) -> handler.ActionResponse:
    """Search a collection through ``rag_runtime_service.vector_search``.

    ``data`` must carry ``collection_id``, ``query_vector`` and ``top_k``. Optional
    entries: ``filters``, ``search_type`` (default ``'vector'``), ``query_text``
    (default ``''``) and ``vector_weight``. The payload is ``{'results': ...}``.
    """
    collection_id = data['collection_id']
    query_vector = data['query_vector']
    top_k = data['top_k']
    filters = data.get('filters')
    search_type = data.get('search_type', 'vector')
    query_text = data.get('query_text', '')
    vector_weight = data.get('vector_weight')

    positional_args = (collection_id, query_vector, top_k, filters, search_type, query_text)
    try:
        hits = await self.ap.rag_runtime_service.vector_search(*positional_args, vector_weight=vector_weight)
        return handler.ActionResponse.success(data={'results': hits})
    except Exception as e:
        return _make_rag_error_response(e, 'VectorStoreError', collection_id=collection_id)
|
|
|
|
@self.action(PluginToRuntimeAction.VECTOR_DELETE)
async def vector_delete(data: dict[str, Any]) -> handler.ActionResponse:
    """Delete vectors from a collection through ``rag_runtime_service.vector_delete``.

    ``data`` must carry ``collection_id``; ``file_ids`` and ``filters`` are optional.
    The payload is ``{'count': ...}`` with the value returned by the service.
    """
    collection_id = data['collection_id']
    file_ids = data.get('file_ids')
    filters = data.get('filters')

    try:
        deleted = await self.ap.rag_runtime_service.vector_delete(collection_id, file_ids, filters)
        return handler.ActionResponse.success(data={'count': deleted})
    except Exception as e:
        return _make_rag_error_response(e, 'VectorStoreError', collection_id=collection_id)
|
|
|
|
@self.action(PluginToRuntimeAction.VECTOR_LIST)
async def vector_list(data: dict[str, Any]) -> handler.ActionResponse:
    """List vectors of a collection through ``rag_runtime_service.vector_list``.

    ``data`` must carry ``collection_id``; ``filters``, ``limit`` (default 20) and
    ``offset`` (default 0) are optional. The payload is ``{'items': ..., 'total': ...}``.
    """
    collection_id = data['collection_id']
    filters = data.get('filters')
    limit = data.get('limit', 20)
    offset = data.get('offset', 0)

    try:
        listing = await self.ap.rag_runtime_service.vector_list(collection_id, filters, limit, offset)
        items, total = listing
        return handler.ActionResponse.success(data={'items': items, 'total': total})
    except Exception as e:
        return _make_rag_error_response(e, 'VectorStoreError', collection_id=collection_id)
|
|
|
|
@self.action(PluginToRuntimeAction.GET_KNOWLEDEGE_FILE_STREAM)
async def get_knowledge_file_stream(data: dict[str, Any]) -> handler.ActionResponse:
    """Fetch a stored file and pass it on via ``self.send_file``.

    The bytes for ``data['storage_path']`` come from
    ``rag_runtime_service.get_file_stream``; the key returned by ``send_file`` is sent
    back as ``{'file_key': ...}``.
    """
    storage_path = data['storage_path']

    try:
        file_bytes = await self.ap.rag_runtime_service.get_file_stream(storage_path)
        sent_key = await self.send_file(file_bytes, '')
        return handler.ActionResponse.success(data={'file_key': sent_key})
    except Exception as e:
        return _make_rag_error_response(e, 'FileServiceError', storage_path=storage_path)
|
|
|
|
@self.action(PluginToRuntimeAction.LIST_PARSERS)
async def list_parsers(data: dict[str, Any]) -> handler.ActionResponse:
    """Plugin requests host to list available parser plugins.

    The optional ``data['mime_type']`` is passed to ``knowledge_service.list_parsers``;
    the payload is ``{'parsers': ...}``.
    """
    mime_type = data.get('mime_type')

    try:
        available = await self.ap.knowledge_service.list_parsers(mime_type)
        return handler.ActionResponse.success(data={'parsers': available})
    except Exception as e:
        return _make_rag_error_response(e, 'ParserDiscoveryError', mime_type=mime_type)
|
|
|
|
@self.action(PluginToRuntimeAction.INVOKE_PARSER)
async def invoke_parser(data: dict[str, Any]) -> handler.ActionResponse:
    """Plugin requests host to invoke a parser plugin.

    ``data`` must carry ``plugin_author``, ``plugin_name`` and ``storage_path``.
    Optional entries: ``mime_type`` (default ``'application/octet-stream'``),
    ``filename`` (default ``''``) and ``metadata`` (default ``{}``). The file bytes are
    read with ``rag_runtime_service.get_file_stream`` and handed, together with the
    context, to ``plugin_connector.call_parser``; its result becomes the response
    payload. Failures are reported as a ``ParserError`` response that names the parser
    plugin and the storage path.
    """
    plugin_author = data['plugin_author']
    plugin_name = data['plugin_name']
    storage_path = data['storage_path']
    mime_type = data.get('mime_type', 'application/octet-stream')
    filename = data.get('filename', '')
    metadata = data.get('metadata', {})
    try:
        # Read file from storage
        file_bytes = await self.ap.rag_runtime_service.get_file_stream(storage_path)
        context_data = {
            'mime_type': mime_type,
            'filename': filename,
            'metadata': metadata,
        }
        result = await self.ap.plugin_connector.call_parser(
            f'{plugin_author}/{plugin_name}', context_data, file_bytes
        )
        return handler.ActionResponse.success(data=result)
    except Exception as e:
        # Pass identifying context, as the other RAG handlers in this file do, so the
        # error says which parser and which file failed.
        return _make_rag_error_response(
            e,
            'ParserError',
            plugin_author=plugin_author,
            plugin_name=plugin_name,
            storage_path=storage_path,
        )
|
|
|
|
# ================= Knowledge Base Query APIs =================
|
|
|
|
@self.action(PluginToRuntimeAction.LIST_KNOWLEDGE_BASES)
async def list_knowledge_bases(data: dict[str, Any]) -> handler.ActionResponse:
    """List all knowledge bases available in the LangBot instance (unrestricted).

    Each entry of ``rag_mgr.knowledge_bases`` is reported with ``uuid``, ``name`` and
    ``description`` (an empty string when the entity has no description).
    """
    summaries = [
        {
            'uuid': kb.get_uuid(),
            'name': kb.get_name(),
            'description': kb.knowledge_base_entity.description or '',
        }
        for kb in self.ap.rag_mgr.knowledge_bases.values()
    ]
    return handler.ActionResponse.success(data={'knowledge_bases': summaries})
|
|
|
|
@self.action(PluginToRuntimeAction.RETRIEVE_KNOWLEDGE)
async def retrieve_knowledge(data: dict[str, Any]) -> handler.ActionResponse:
    """Retrieve documents from any knowledge base (unrestricted).

    ``data`` must carry ``kb_id`` and ``query_text``; ``top_k`` (default 5) and
    ``filters`` (default ``{}``) are optional. The payload is ``{'results': [...]}``
    with each entry dumped in JSON mode. An error response is returned when the
    knowledge base is not found or retrieval raises.
    """
    kb_id = data['kb_id']
    query_text = data['query_text']
    top_k = data.get('top_k', 5)
    filters = data.get('filters', {})

    knowledge_base = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_id)
    if not knowledge_base:
        return handler.ActionResponse.error(message=f'Knowledge base {kb_id} not found')

    retrieval_settings = {'top_k': top_k, 'filters': filters}
    try:
        entries = await knowledge_base.retrieve(query_text, settings=retrieval_settings)
        dumped = [entry.model_dump(mode='json') for entry in entries]
        return handler.ActionResponse.success(data={'results': dumped})
    except Exception as e:
        return _make_rag_error_response(e, 'RetrievalError', kb_id=kb_id)
|
|
|
|
@self.action(PluginToRuntimeAction.LIST_PIPELINE_KNOWLEDGE_BASES)
async def list_pipeline_knowledge_bases(data: dict[str, Any]) -> handler.ActionResponse:
    """List knowledge bases configured for the current query's pipeline.

    The UUIDs come from ``ai.local-agent.knowledge-bases`` in the query's pipeline
    config, falling back to the single legacy ``knowledge-base`` entry (ignored when it
    is empty or ``'__none__'``). UUIDs that resolve to no knowledge base are left out.
    Responds with an error when ``data['query_id']`` is not a cached query.
    """
    query_id = data['query_id']

    cached_queries = self.ap.query_pool.cached_queries
    if query_id not in cached_queries:
        return handler.ActionResponse.error(message=f'Query with query_id {query_id} not found')
    query = cached_queries[query_id]

    configured_uuids = []
    if query.pipeline_config:
        agent_config = query.pipeline_config.get('ai', {}).get('local-agent', {})
        configured_uuids = agent_config.get('knowledge-bases', [])
        if not configured_uuids:
            # Backward compatibility with the single-knowledge-base setting.
            legacy_uuid = agent_config.get('knowledge-base', '')
            if legacy_uuid and legacy_uuid != '__none__':
                configured_uuids = [legacy_uuid]

    summaries = []
    for configured_uuid in configured_uuids:
        kb = await self.ap.rag_mgr.get_knowledge_base_by_uuid(configured_uuid)
        if not kb:
            continue
        summaries.append(
            {
                'uuid': kb.get_uuid(),
                'name': kb.get_name(),
                'description': kb.knowledge_base_entity.description or '',
            }
        )

    return handler.ActionResponse.success(data={'knowledge_bases': summaries})
|
|
|
|
@self.action(PluginToRuntimeAction.RETRIEVE_KNOWLEDGE_BASE)
async def retrieve_knowledge_base(data: dict[str, Any]) -> handler.ActionResponse:
    """Retrieve documents from a knowledge base within the pipeline's scope.

    ``data`` must carry ``query_id``, ``kb_id`` and ``query_text``; ``top_k``
    (default 5) and ``filters`` (default ``{}``) are optional. ``kb_id`` has to be among
    the knowledge bases configured for the query's pipeline
    (``ai.local-agent.knowledge-bases``, or the legacy ``knowledge-base`` entry).
    Retrieval settings also carry ``session_name``, ``bot_uuid`` and ``sender_id``
    taken from the query. The payload is ``{'results': [...]}``.
    """
    query_id = data['query_id']
    kb_id = data['kb_id']
    query_text = data['query_text']
    top_k = data.get('top_k', 5)
    filters = data.get('filters', {})

    cached_queries = self.ap.query_pool.cached_queries
    if query_id not in cached_queries:
        return handler.ActionResponse.error(message=f'Query with query_id {query_id} not found')
    query = cached_queries[query_id]

    # Work out which knowledge bases this pipeline may use.
    permitted_uuids = []
    if query.pipeline_config:
        agent_config = query.pipeline_config.get('ai', {}).get('local-agent', {})
        permitted_uuids = agent_config.get('knowledge-bases', [])
        if not permitted_uuids:
            legacy_uuid = agent_config.get('knowledge-base', '')
            if legacy_uuid and legacy_uuid != '__none__':
                permitted_uuids = [legacy_uuid]

    if kb_id not in permitted_uuids:
        return handler.ActionResponse.error(
            message=f'Knowledge base {kb_id} is not configured for this pipeline',
        )

    knowledge_base = await self.ap.rag_mgr.get_knowledge_base_by_uuid(kb_id)
    if not knowledge_base:
        return handler.ActionResponse.error(message=f'Knowledge base {kb_id} not found')

    try:
        session = query.session
        retrieval_settings = {
            'top_k': top_k,
            'filters': filters,
            'session_name': f'{session.launcher_type.value}_{session.launcher_id}',
            'bot_uuid': query.bot_uuid or '',
            'sender_id': str(query.sender_id),
        }
        entries = await knowledge_base.retrieve(query_text, settings=retrieval_settings)
        dumped = [entry.model_dump(mode='json') for entry in entries]
        return handler.ActionResponse.success(data={'results': dumped})
    except Exception as e:
        return _make_rag_error_response(e, 'RetrievalError', kb_id=kb_id)
|
|
|
|
@self.action(CommonAction.PING)
async def ping(data: dict[str, Any]) -> handler.ActionResponse:
    """Answer a ping with ``{'pong': 'pong'}``; ``data`` is not read."""
    pong_payload = {'pong': 'pong'}
    return handler.ActionResponse.success(data=pong_payload)
|
|
|
|
async def ping(self) -> dict[str, Any]:
    """Send ``CommonAction.PING`` with an empty payload (10 s timeout) and return the reply."""
    reply = await self.call_action(CommonAction.PING, {}, timeout=10)
    return reply
|
|
|
|
async def install_plugin(
    self, install_source: str, install_info: dict[str, Any]
) -> typing.AsyncGenerator[dict[str, Any], None]:
    """Ask the runtime to install a plugin and relay each item it yields.

    Args:
        install_source: Sent as ``install_source`` in the request payload.
        install_info: Sent as ``install_info`` in the request payload.

    Yields:
        Every item produced by the ``INSTALL_PLUGIN`` action generator (120 s timeout).
    """
    request = {
        'install_source': install_source,
        'install_info': install_info,
    }
    updates = self.call_action_generator(LangBotToRuntimeAction.INSTALL_PLUGIN, request, timeout=120)

    async for update in updates:
        yield update
|
|
|
|
async def upgrade_plugin(self, plugin_author: str, plugin_name: str) -> typing.AsyncGenerator[dict[str, Any], None]:
    """Ask the runtime to upgrade a plugin and relay each item it yields.

    Yields:
        Every item produced by the ``UPGRADE_PLUGIN`` action generator (120 s timeout).
    """
    request = {
        'plugin_author': plugin_author,
        'plugin_name': plugin_name,
    }
    updates = self.call_action_generator(LangBotToRuntimeAction.UPGRADE_PLUGIN, request, timeout=120)

    async for update in updates:
        yield update
|
|
|
|
async def delete_plugin(self, plugin_author: str, plugin_name: str) -> typing.AsyncGenerator[dict[str, Any], None]:
    """Ask the runtime to delete a plugin and relay each item it yields.

    No explicit timeout is passed for this action.

    Yields:
        Every item produced by the ``DELETE_PLUGIN`` action generator.
    """
    request = {
        'plugin_author': plugin_author,
        'plugin_name': plugin_name,
    }
    updates = self.call_action_generator(LangBotToRuntimeAction.DELETE_PLUGIN, request)

    async for update in updates:
        yield update
|
|
|
|
async def list_plugins(self) -> list[dict[str, Any]]:
    """Call ``LIST_PLUGINS`` (10 s timeout) and return the ``plugins`` entry of the reply."""
    reply = await self.call_action(LangBotToRuntimeAction.LIST_PLUGINS, {}, timeout=10)
    return reply['plugins']
|
|
|
|
async def get_plugin_info(self, author: str, plugin_name: str) -> dict[str, Any]:
    """Call ``GET_PLUGIN_INFO`` (10 s timeout) and return the ``plugin`` entry of the reply.

    Args:
        author: Sent as ``author`` in the request payload.
        plugin_name: Sent as ``plugin_name`` in the request payload.
    """
    request = {
        'author': author,
        'plugin_name': plugin_name,
    }
    reply = await self.call_action(LangBotToRuntimeAction.GET_PLUGIN_INFO, request, timeout=10)
    return reply['plugin']
|
|
|
|
async def set_plugin_config(self, plugin_author: str, plugin_name: str, config: dict[str, Any]) -> dict[str, Any]:
    """Persist a plugin's config and restart the plugin.

    The ``config`` column of the matching ``PluginSetting`` row is updated, then the
    ``RESTART_PLUGIN`` action generator is consumed to completion; the items it yields
    are discarded.

    Returns:
        Always an empty dict.
    """
    settings_table = persistence_plugin.PluginSetting
    update_stmt = (
        sqlalchemy.update(settings_table)
        .where(settings_table.plugin_author == plugin_author)
        .where(settings_table.plugin_name == plugin_name)
        .values(config=config)
    )
    await self.ap.persistence_mgr.execute_async(update_stmt)

    restart_request = {
        'plugin_author': plugin_author,
        'plugin_name': plugin_name,
    }
    restart_updates = self.call_action_generator(LangBotToRuntimeAction.RESTART_PLUGIN, restart_request)
    async for _ in restart_updates:
        # Only completion matters here; progress items are ignored.
        pass

    return {}
|
|
|
|
async def emit_event(
    self,
    event_context: dict[str, Any],
    include_plugins: list[str] | None = None,
) -> dict[str, Any]:
    """Call ``EMIT_EVENT`` (180 s timeout) and return the reply unchanged.

    Args:
        event_context: Sent as ``event_context`` in the request payload.
        include_plugins: Sent as ``include_plugins`` in the request payload.
    """
    request = {
        'event_context': event_context,
        'include_plugins': include_plugins,
    }
    reply = await self.call_action(LangBotToRuntimeAction.EMIT_EVENT, request, timeout=180)
    return reply
|
|
|
|
async def list_tools(self, include_plugins: list[str] | None = None) -> list[dict[str, Any]]:
    """Call ``LIST_TOOLS`` (20 s timeout) and return the ``tools`` entry of the reply.

    Args:
        include_plugins: Sent as ``include_plugins`` in the request payload.
    """
    request = {'include_plugins': include_plugins}
    reply = await self.call_action(LangBotToRuntimeAction.LIST_TOOLS, request, timeout=20)
    return reply['tools']
|
|
|
|
async def get_plugin_icon(self, plugin_author: str, plugin_name: str) -> dict[str, Any]:
    """Fetch a plugin's icon and return it base64-encoded.

    The ``GET_PLUGIN_ICON`` reply supplies ``plugin_icon_file_key`` and ``mime_type``.
    The file behind the key is read with ``read_local_file`` and then removed with
    ``delete_local_file``.

    Returns:
        ``{'plugin_icon_base64': ..., 'mime_type': ...}``.
    """
    request = {
        'plugin_author': plugin_author,
        'plugin_name': plugin_name,
    }
    reply = await self.call_action(LangBotToRuntimeAction.GET_PLUGIN_ICON, request)

    icon_file_key = reply['plugin_icon_file_key']
    mime_type = reply['mime_type']

    icon_bytes = await self.read_local_file(icon_file_key)
    await self.delete_local_file(icon_file_key)

    encoded_icon = base64.b64encode(icon_bytes).decode('utf-8')
    return {
        'plugin_icon_base64': encoded_icon,
        'mime_type': mime_type,
    }
|
|
|
|
async def get_plugin_readme(self, plugin_author: str, plugin_name: str, language: str = 'en') -> str:
    """Fetch a plugin's README as text.

    Calls ``GET_PLUGIN_README`` (20 s timeout). If the call raises, the traceback is
    printed and ``''`` is returned; ``''`` is also returned when the reply has no
    ``readme_file_key``. Otherwise the file behind the key is read with
    ``read_local_file``, removed with ``delete_local_file`` and decoded as UTF-8.
    """
    request = {
        'plugin_author': plugin_author,
        'plugin_name': plugin_name,
        'language': language,
    }
    try:
        reply = await self.call_action(LangBotToRuntimeAction.GET_PLUGIN_README, request, timeout=20)
    except Exception:
        traceback.print_exc()
        return ''

    readme_file_key = reply.get('readme_file_key')
    if not readme_file_key:
        return ''

    raw_readme = await self.read_local_file(readme_file_key)
    await self.delete_local_file(readme_file_key)

    return raw_readme.decode('utf-8')
|
|
|
|
async def get_plugin_assets(self, plugin_author: str, plugin_name: str, filepath: str) -> dict[str, Any]:
    """Fetch one plugin asset file through the runtime, base64-encoded.

    Args:
        plugin_author: Sent to the runtime under 'plugin_author'.
        plugin_name: Sent to the runtime under 'plugin_name'.
        filepath: Sent to the runtime under 'file_path'.

    Returns:
        A dict with 'asset_base64' and 'mime_type'. Both are empty strings
        when the response's 'file_file_key' is falsy; otherwise the file
        behind that key is read, deleted, and returned as base64 text
        together with the MIME type from the response.
    """
    request = {
        'plugin_author': plugin_author,
        'plugin_name': plugin_name,
        'file_path': filepath,
    }
    reply = await self.call_action(
        LangBotToRuntimeAction.GET_PLUGIN_ASSETS_FILE,
        request,
        timeout=20,
    )

    file_key = reply['file_file_key']
    if not file_key:
        # Nothing to read: return the empty-asset shape.
        return {'asset_base64': '', 'mime_type': ''}

    asset_mime = reply['mime_type']
    raw_asset = await self.read_local_file(file_key)
    await self.delete_local_file(file_key)

    encoded_asset = base64.b64encode(raw_asset).decode('utf-8')
    return {'asset_base64': encoded_asset, 'mime_type': asset_mime}
|
|
|
|
async def handle_page_api(
    self,
    plugin_author: str,
    plugin_name: str,
    page_id: str,
    endpoint: str,
    method: str,
    body: Any = None,
) -> dict[str, Any]:
    """Relay a page API request to the runtime using the PAGE_API action.

    Every argument is forwarded unchanged under a key of the same name.

    Args:
        plugin_author: Author part of the target plugin's identity.
        plugin_name: Name part of the target plugin's identity.
        page_id: Forwarded under 'page_id'.
        endpoint: Forwarded under 'endpoint'.
        method: Forwarded under 'method'.
        body: Forwarded under 'body'; defaults to None.

    Returns:
        Whatever ``call_action`` returns for this action, unmodified.
    """
    request = {
        'plugin_author': plugin_author,
        'plugin_name': plugin_name,
        'page_id': page_id,
        'endpoint': endpoint,
        'method': method,
        'body': body,
    }
    return await self.call_action(LangBotToRuntimeAction.PAGE_API, request, timeout=30)
|
|
|
|
async def cleanup_plugin_data(self, plugin_author: str, plugin_name: str) -> None:
    """Delete a plugin's stored settings and its binary storage rows.

    Runs two DELETE statements through ``self.ap.persistence_mgr``: first
    against ``PluginSetting`` (matched on author and name), then against
    ``BinaryStorage`` (matched on owner_type 'plugin' and owner
    "<author>/<name>").

    Args:
        plugin_author: Author part of the plugin's identity.
        plugin_name: Name part of the plugin's identity.
    """
    persistence = self.ap.persistence_mgr

    # Step 1: plugin settings.
    setting_model = persistence_plugin.PluginSetting
    drop_settings = (
        sqlalchemy.delete(setting_model)
        .where(setting_model.plugin_author == plugin_author)
        .where(setting_model.plugin_name == plugin_name)
    )
    await persistence.execute_async(drop_settings)

    # Step 2: all binary storage owned by this plugin.
    storage_model = persistence_bstorage.BinaryStorage
    owner_key = f'{plugin_author}/{plugin_name}'
    drop_storage = (
        sqlalchemy.delete(storage_model)
        .where(storage_model.owner_type == 'plugin')
        .where(storage_model.owner == owner_key)
    )
    await persistence.execute_async(drop_storage)
|
|
|
|
async def call_tool(
    self,
    tool_name: str,
    parameters: dict[str, Any],
    session: dict[str, Any],
    query_id: int,
    include_plugins: list[str] | None = None,
) -> dict[str, Any]:
    """Invoke a tool through the runtime using the CALL_TOOL action.

    Args:
        tool_name: Forwarded under 'tool_name'.
        parameters: Forwarded under 'tool_parameters' (note the key differs
            from the argument name).
        session: Forwarded under 'session'.
        query_id: Forwarded under 'query_id'.
        include_plugins: Forwarded under 'include_plugins' (presumably a
            plugin filter; confirm against the runtime).

    Returns:
        The value stored under 'tool_response' in the runtime's response.
    """
    request = {
        'tool_name': tool_name,
        'tool_parameters': parameters,
        'session': session,
        'query_id': query_id,
        'include_plugins': include_plugins,
    }
    response = await self.call_action(LangBotToRuntimeAction.CALL_TOOL, request, timeout=180)
    return response['tool_response']
|
|
|
|
async def list_commands(self, include_plugins: list[str] | None = None) -> list[dict[str, Any]]:
    """Request the command list from the runtime via the LIST_COMMANDS action.

    Args:
        include_plugins: Forwarded unchanged under the 'include_plugins' key
            (presumably a plugin filter; confirm against the runtime).

    Returns:
        The value stored under 'commands' in the runtime's response.
    """
    request = {'include_plugins': include_plugins}
    response = await self.call_action(LangBotToRuntimeAction.LIST_COMMANDS, request, timeout=10)
    return response['commands']
|
|
|
|
async def execute_command(
    self, command_context: dict[str, Any], include_plugins: list[str] | None = None
) -> typing.AsyncGenerator[dict[str, Any], None]:
    """Run a command through the runtime and stream back its results.

    Uses the EXECUTE_COMMAND action via ``call_action_generator`` and
    re-yields each item it produces, in order.

    Args:
        command_context: Forwarded under 'command_context'.
        include_plugins: Forwarded under 'include_plugins' (presumably a
            plugin filter; confirm against the runtime).

    Yields:
        Each item produced by ``call_action_generator``, unmodified.
    """
    request = {
        'command_context': command_context,
        'include_plugins': include_plugins,
    }
    stream = self.call_action_generator(
        LangBotToRuntimeAction.EXECUTE_COMMAND,
        request,
        timeout=180,
    )
    async for item in stream:
        yield item
|
|
|
|
async def retrieve_knowledge(
    self,
    plugin_author: str,
    plugin_name: str,
    retriever_name: str,
    retrieval_context: dict[str, Any],
) -> dict[str, Any]:
    """Send a RETRIEVE_KNOWLEDGE action to the runtime.

    Args:
        plugin_author: Author part of the target plugin's identity.
        plugin_name: Name part of the target plugin's identity.
        retriever_name: Forwarded under 'retriever_name'.
        retrieval_context: Forwarded under 'retrieval_context'.

    Returns:
        Whatever ``call_action`` returns for this action, unmodified.
    """
    request = {
        'plugin_author': plugin_author,
        'plugin_name': plugin_name,
        'retriever_name': retriever_name,
        'retrieval_context': retrieval_context,
    }
    return await self.call_action(LangBotToRuntimeAction.RETRIEVE_KNOWLEDGE, request, timeout=30)
|
|
|
|
async def get_debug_info(self) -> dict[str, Any]:
    """Query the runtime for debug information (debug key and WS URL).

    Sends GET_DEBUG_INFO with an empty payload.

    Returns:
        Whatever ``call_action`` returns for this action, unmodified.
    """
    return await self.call_action(LangBotToRuntimeAction.GET_DEBUG_INFO, {}, timeout=10)
|
|
|
|
# ================= RAG Capability Callers (LangBot -> Runtime) =================
|
|
|
|
async def rag_ingest_document(
    self, plugin_author: str, plugin_name: str, context_data: dict[str, Any]
) -> dict[str, Any]:
    """Ask the runtime to ingest a document via RAG_INGEST_DOCUMENT.

    Args:
        plugin_author: Author part of the target plugin's identity.
        plugin_name: Name part of the target plugin's identity.
        context_data: Forwarded unchanged under the 'context' key.

    Returns:
        Whatever ``call_action`` returns for this action, unmodified.
    """
    request = {
        'plugin_author': plugin_author,
        'plugin_name': plugin_name,
        'context': context_data,
    }
    # Large documents can take a long time to ingest, hence the generous timeout.
    return await self.call_action(
        LangBotToRuntimeAction.RAG_INGEST_DOCUMENT,
        request,
        timeout=1200,
    )
|
|
|
|
async def rag_delete_document(self, plugin_author: str, plugin_name: str, document_id: str, kb_id: str) -> bool:
    """Ask the runtime to delete a document via RAG_DELETE_DOCUMENT.

    Args:
        plugin_author: Author part of the target plugin's identity.
        plugin_name: Name part of the target plugin's identity.
        document_id: Forwarded under 'document_id'.
        kb_id: Forwarded under 'kb_id'.

    Returns:
        The response's 'success' value, or False when that key is absent.
    """
    request = {
        'plugin_author': plugin_author,
        'plugin_name': plugin_name,
        'document_id': document_id,
        'kb_id': kb_id,
    }
    response = await self.call_action(LangBotToRuntimeAction.RAG_DELETE_DOCUMENT, request, timeout=30)
    return response.get('success', False)
|
|
|
|
async def rag_on_kb_create(
    self, plugin_author: str, plugin_name: str, kb_id: str, config: dict[str, Any]
) -> dict[str, Any]:
    """Tell the plugin, via the runtime, that a knowledge base was created.

    Args:
        plugin_author: Author part of the target plugin's identity.
        plugin_name: Name part of the target plugin's identity.
        kb_id: Forwarded under 'kb_id'.
        config: Forwarded under 'config'.

    Returns:
        Whatever ``call_action`` returns for RAG_ON_KB_CREATE, unmodified.
    """
    request = {
        'plugin_author': plugin_author,
        'plugin_name': plugin_name,
        'kb_id': kb_id,
        'config': config,
    }
    return await self.call_action(LangBotToRuntimeAction.RAG_ON_KB_CREATE, request, timeout=30)
|
|
|
|
async def rag_on_kb_delete(self, plugin_author: str, plugin_name: str, kb_id: str) -> dict[str, Any]:
    """Tell the plugin, via the runtime, that a knowledge base was deleted.

    Args:
        plugin_author: Author part of the target plugin's identity.
        plugin_name: Name part of the target plugin's identity.
        kb_id: Forwarded under 'kb_id'.

    Returns:
        Whatever ``call_action`` returns for RAG_ON_KB_DELETE, unmodified.
    """
    request = {
        'plugin_author': plugin_author,
        'plugin_name': plugin_name,
        'kb_id': kb_id,
    }
    return await self.call_action(LangBotToRuntimeAction.RAG_ON_KB_DELETE, request, timeout=30)
|
|
|
|
async def get_rag_creation_schema(self, plugin_author: str, plugin_name: str) -> dict[str, Any]:
    """Fetch a plugin's RAG creation settings schema from the runtime.

    Args:
        plugin_author: Author part of the target plugin's identity.
        plugin_name: Name part of the target plugin's identity.

    Returns:
        Whatever ``call_action`` returns for
        GET_RAG_CREATION_SETTINGS_SCHEMA, unmodified.
    """
    identity = {'plugin_author': plugin_author, 'plugin_name': plugin_name}
    schema = await self.call_action(
        LangBotToRuntimeAction.GET_RAG_CREATION_SETTINGS_SCHEMA,
        identity,
        timeout=10,
    )
    return schema
|
|
|
|
async def get_rag_retrieval_schema(self, plugin_author: str, plugin_name: str) -> dict[str, Any]:
    """Fetch a plugin's RAG retrieval settings schema from the runtime.

    Args:
        plugin_author: Author part of the target plugin's identity.
        plugin_name: Name part of the target plugin's identity.

    Returns:
        Whatever ``call_action`` returns for
        GET_RAG_RETRIEVAL_SETTINGS_SCHEMA, unmodified.
    """
    identity = {'plugin_author': plugin_author, 'plugin_name': plugin_name}
    schema = await self.call_action(
        LangBotToRuntimeAction.GET_RAG_RETRIEVAL_SETTINGS_SCHEMA,
        identity,
        timeout=10,
    )
    return schema
|
|
|
|
async def list_knowledge_engines(self) -> list[dict[str, Any]]:
    """Ask the runtime for the Knowledge Engines offered by plugins.

    Sends LIST_KNOWLEDGE_ENGINES with an empty payload.

    Returns:
        The response's 'engines' value, or an empty list when that key is
        absent.
    """
    response = await self.call_action(
        LangBotToRuntimeAction.LIST_KNOWLEDGE_ENGINES,
        {},
        timeout=60,
    )
    engines = response.get('engines', [])
    return engines
|
|
|
|
# ================= Parser Capability Callers (LangBot -> Runtime) =================
|
|
|
|
async def list_parsers(self) -> list[dict[str, Any]]:
    """Ask the runtime for the parsers offered by plugins.

    Sends LIST_PARSERS with an empty payload.

    Returns:
        The response's 'parsers' value, or an empty list when that key is
        absent.
    """
    response = await self.call_action(
        LangBotToRuntimeAction.LIST_PARSERS,
        {},
        timeout=60,
    )
    parsers = response.get('parsers', [])
    return parsers
|
|
|
|
async def parse_document(
    self, plugin_author: str, plugin_name: str, context_data: dict[str, Any], file_bytes: bytes
) -> dict[str, Any]:
    """Send PARSE_DOCUMENT action to runtime.

    Sends file content via chunked FILE_CHUNK transfer, then invokes
    the PARSE_DOCUMENT action with a file_key reference.

    Args:
        plugin_author: Author part of the target plugin's identity.
        plugin_name: Name part of the target plugin's identity.
        context_data: Parse context forwarded under the 'context' key. It is
            NOT modified; a copy carrying the extra 'file_key' entry is sent.
        file_bytes: Raw file content uploaded to the runtime before parsing.

    Returns:
        Whatever ``call_action`` returns for this action, unmodified.
    """
    # Send file to runtime via chunked transfer.
    # NOTE(review): the meaning of the empty second argument is not visible
    # here — confirm against send_file.
    file_key = await self.send_file(file_bytes, '')

    # Attach file_key on a fresh dict so the caller's context_data is left
    # untouched (a caller reusing the dict must not see a stale file_key).
    context = {**context_data, 'file_key': file_key}

    result = await self.call_action(
        LangBotToRuntimeAction.PARSE_DOCUMENT,
        {'plugin_author': plugin_author, 'plugin_name': plugin_name, 'context': context},
        timeout=300,
    )
    return result
|