From 1a10b40b179f898d34cb30a6bc363734d9fea50b Mon Sep 17 00:00:00 2001 From: Junyan Qin Date: Wed, 2 Jul 2025 12:46:30 +0800 Subject: [PATCH] refactor: use `emit_event` from connector --- pkg/pipeline/pipelinemgr.py | 11 +---- pkg/pipeline/preproc/preproc.py | 19 +++---- pkg/pipeline/process/handlers/chat.py | 21 +++----- pkg/pipeline/process/handlers/command.py | 27 ++++------ pkg/pipeline/wrapper/wrapper.py | 63 ++++++++++-------------- pkg/plugin/connector.py | 10 +++- 6 files changed, 59 insertions(+), 92 deletions(-) diff --git a/pkg/pipeline/pipelinemgr.py b/pkg/pipeline/pipelinemgr.py index 6abb3972..5719b9e6 100644 --- a/pkg/pipeline/pipelinemgr.py +++ b/pkg/pipeline/pipelinemgr.py @@ -16,7 +16,6 @@ from ..utils import importutil import langbot_plugin.api.entities.builtin.provider.session as provider_session import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query -import langbot_plugin.api.entities.context as event_context from . import ( resprule, @@ -191,15 +190,7 @@ class RuntimePipeline: message_chain=query.message_chain, ) - event_ctx = event_context.EventContext( - event=event_obj, - ) - - event_ctx_data = event_ctx.model_dump(serialize_as_any=True) - - event_ctx_result = await self.ap.plugin_connector.handler.emit_event(event_ctx_data) - - event_ctx = event_context.EventContext.parse_from_dict(event_ctx_result['event_context']) + event_ctx = await self.ap.plugin_connector.emit_event(event_obj) if event_ctx.is_prevented_default(): return diff --git a/pkg/pipeline/preproc/preproc.py b/pkg/pipeline/preproc/preproc.py index 344a136c..5b82bdf4 100644 --- a/pkg/pipeline/preproc/preproc.py +++ b/pkg/pipeline/preproc/preproc.py @@ -7,7 +7,6 @@ from langbot_plugin.api.entities.builtin.provider import message as provider_mes import langbot_plugin.api.entities.events as events import langbot_plugin.api.entities.builtin.platform.message as platform_message import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query -import langbot_plugin.api.entities.context as event_context @stage.stage_class('PreProcessor') @@ -109,20 +108,14 @@ class PreProcessor(stage.PipelineStage): query.user_message = provider_message.Message(role='user', content=content_list) # =========== 触发事件 PromptPreProcessing - event_ctx = event_context.EventContext( - event=events.PromptPreProcessing( - session_name=f'{query.session.launcher_type.value}_{query.session.launcher_id}', - default_prompt=query.prompt.messages, - prompt=query.messages, - query=query, - ) + event = events.PromptPreProcessing( + session_name=f'{query.session.launcher_type.value}_{query.session.launcher_id}', + default_prompt=query.prompt.messages, + prompt=query.messages, + query=query, ) - event_ctx_result = await self.ap.plugin_connector.handler.emit_event( - event_ctx.model_dump(serialize_as_any=True) - ) - - event_ctx = event_context.EventContext.parse_from_dict(event_ctx_result['event_context']) + event_ctx = await self.ap.plugin_connector.emit_event(event) query.prompt.messages = event_ctx.event.default_prompt query.messages = event_ctx.event.prompt diff --git a/pkg/pipeline/process/handlers/chat.py b/pkg/pipeline/process/handlers/chat.py index 24f4553d..5bb5f07b 100644 --- a/pkg/pipeline/process/handlers/chat.py +++ b/pkg/pipeline/process/handlers/chat.py @@ -14,7 +14,6 @@ from ....utils import importutil from ....provider import runners import langbot_plugin.api.entities.builtin.provider.session as provider_session import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query -import langbot_plugin.api.entities.context as event_context importutil.import_modules_in_pkg(runners) @@ -36,21 +35,15 @@ class ChatMessageHandler(handler.MessageHandler): else events.GroupNormalMessageReceived ) - event_ctx = event_context.EventContext( - event=event_class( - launcher_type=query.launcher_type.value, - launcher_id=query.launcher_id, - sender_id=query.sender_id, - text_message=str(query.message_chain), - query=query, - ) + event = event_class( + launcher_type=query.launcher_type.value, + launcher_id=query.launcher_id, + sender_id=query.sender_id, + text_message=str(query.message_chain), + query=query, ) - event_ctx_result = await self.ap.plugin_connector.handler.emit_event( - event_ctx.model_dump(serialize_as_any=True) - ) - - event_ctx = event_context.EventContext.parse_from_dict(event_ctx_result['event_context']) + event_ctx = await self.ap.plugin_connector.emit_event(event) if event_ctx.is_prevented_default(): if event_ctx.event.reply is not None: diff --git a/pkg/pipeline/process/handlers/command.py b/pkg/pipeline/process/handlers/command.py index cc659955..7a2ec08f 100644 --- a/pkg/pipeline/process/handlers/command.py +++ b/pkg/pipeline/process/handlers/command.py @@ -8,7 +8,6 @@ import langbot_plugin.api.entities.builtin.provider.message as provider_message import langbot_plugin.api.entities.builtin.platform.message as platform_message import langbot_plugin.api.entities.builtin.provider.session as provider_session import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query -import langbot_plugin.api.entities.context as event_context import langbot_plugin.api.entities.events as events @@ -34,24 +33,18 @@ class CommandHandler(handler.MessageHandler): else events.GroupCommandSent ) - event_ctx = event_context.EventContext( - event=event_class( - launcher_type=query.launcher_type.value, - launcher_id=query.launcher_id, - sender_id=query.sender_id, - command=spt[0], - params=spt[1:] if len(spt) > 1 else [], - text_message=str(query.message_chain), - is_admin=(privilege == 2), - query=query, - ) + event = event_class( + launcher_type=query.launcher_type.value, + launcher_id=query.launcher_id, + sender_id=query.sender_id, + command=spt[0], + params=spt[1:] if len(spt) > 1 else [], + text_message=str(query.message_chain), + is_admin=(privilege == 2), + query=query, ) - event_ctx_result = await self.ap.plugin_connector.handler.emit_event( - event_ctx.model_dump(serialize_as_any=True) - ) - - event_ctx = event_context.EventContext.parse_from_dict(event_ctx_result['event_context']) + event_ctx = await self.ap.plugin_connector.emit_event(event) if event_ctx.is_prevented_default(): if event_ctx.event.reply is not None: diff --git a/pkg/pipeline/wrapper/wrapper.py b/pkg/pipeline/wrapper/wrapper.py index ba17091c..f3323622 100644 --- a/pkg/pipeline/wrapper/wrapper.py +++ b/pkg/pipeline/wrapper/wrapper.py @@ -7,7 +7,6 @@ from .. import stage import langbot_plugin.api.entities.builtin.platform.message as platform_message import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query -import langbot_plugin.api.entities.context as event_context import langbot_plugin.api.entities.events as events @@ -59,27 +58,21 @@ class ResponseWrapper(stage.PipelineStage): reply_text = str(result.get_content_platform_message_chain()) # ============= 触发插件事件 =============== - event_ctx = event_context.EventContext( - event=events.NormalMessageResponded( - launcher_type=query.launcher_type.value, - launcher_id=query.launcher_id, - sender_id=query.sender_id, - session=session, - prefix='', - response_text=reply_text, - finish_reason='stop', - funcs_called=[fc.function.name for fc in result.tool_calls] - if result.tool_calls is not None - else [], - query=query, - ) + event = events.NormalMessageResponded( + launcher_type=query.launcher_type.value, + launcher_id=query.launcher_id, + sender_id=query.sender_id, + session=session, + prefix='', + response_text=reply_text, + finish_reason='stop', + funcs_called=[fc.function.name for fc in result.tool_calls] + if result.tool_calls is not None + else [], + query=query, ) - serialized_event_ctx = event_ctx.model_dump(serialize_as_any=True) - - event_ctx_result = await self.ap.plugin_connector.handler.emit_event(serialized_event_ctx) - - event_ctx = event_context.EventContext.parse_from_dict(event_ctx_result['event_context']) + event_ctx = await self.ap.plugin_connector.emit_event(event) if event_ctx.is_prevented_default(): yield entities.StageProcessResult( @@ -108,25 +101,21 @@ class ResponseWrapper(stage.PipelineStage): ) if query.pipeline_config['output']['misc']['track-function-calls']: - event_ctx = event_context.EventContext( - event=events.NormalMessageResponded( - launcher_type=query.launcher_type.value, - launcher_id=query.launcher_id, - sender_id=query.sender_id, - session=session, - prefix='', - response_text=reply_text, - finish_reason='stop', - funcs_called=[fc.function.name for fc in result.tool_calls] - if result.tool_calls is not None - else [], - query=query, - ) + event = events.NormalMessageResponded( + launcher_type=query.launcher_type.value, + launcher_id=query.launcher_id, + sender_id=query.sender_id, + session=session, + prefix='', + response_text=reply_text, + finish_reason='stop', + funcs_called=[fc.function.name for fc in result.tool_calls] + if result.tool_calls is not None + else [], + query=query, ) - event_ctx_result = await self.ap.plugin_connector.handler.emit_event(serialized_event_ctx) - - event_ctx = event_context.EventContext.parse_from_dict(event_ctx_result['event_context']) + event_ctx = await self.ap.plugin_connector.emit_event(event) if event_ctx.is_prevented_default(): yield entities.StageProcessResult( diff --git a/pkg/plugin/connector.py b/pkg/plugin/connector.py index 26cf6fb7..302ee600 100644 --- a/pkg/plugin/connector.py +++ b/pkg/plugin/connector.py @@ -66,4 +66,12 @@ class PluginRuntimeConnector: self, event: events.BaseEventModel, ) -> context.EventContext: - pass + event_ctx = context.EventContext( + event=event, + ) + + event_ctx_result = await self.handler.emit_event(event_ctx.model_dump(serialize_as_any=True)) + + event_ctx = context.EventContext.parse_from_dict(event_ctx_result['event_context']) + + return event_ctx