Compare commits

...

10 Commits

Author SHA1 Message Date
Junyan Qin
4fd488b97a chore: Bump version to 4.8.6 in pyproject.toml, uv.lock, and __init__.py 2026-02-26 22:54:13 +08:00
Junyan Qin
422a34ead4 fix: plugins in recommendation cannot be installed 2026-02-26 22:53:29 +08:00
Junyan Qin
02a1036d63 chore: Bump version to 4.8.5 in pyproject.toml and __init__.py 2026-02-26 14:34:23 +08:00
Junyan Chin
2d837c9cb4 feat: add in-product survey system (#2008)
* feat: add in-product survey system

- SurveyManager: event-based trigger, Space API communication
- Trigger on first successful non-WebSocket response
- Backend API: /api/v1/survey/{pending,respond,dismiss}
- Frontend: floating survey widget with progressive questions
- Flat radio/checkbox style (not dropdown Select)

* fix: persist triggered survey events to disk across restarts

Store triggered events in data/survey_triggered_events.json so that
restarting the process doesn't re-query Space for already-triggered events.

* fix: use metadata table for survey event persistence instead of file

Store triggered events in the existing metadata KV table
(key='survey_triggered_events') instead of a standalone JSON file.

* fix: ruff format and prettier fixes
2026-02-26 13:50:14 +08:00
Junyan Chin
2ded774747 docs: add LangBot Cloud references to all READMEs (#2007) 2026-02-25 22:18:22 +08:00
Junyan Chin
d9a630b8c1 feat: add session message monitoring tab to bot detail dialog (#2005)
* feat: add session message monitoring tab to bot detail dialog

Add a new "Sessions" tab in the bot detail dialog that displays
sent & received messages grouped by sessions. Users can select
any session to view its messages in a chat-bubble style layout.

Backend changes:
- Add sessionId filter to monitoring messages endpoint
- Add role column to MonitoringMessage (user/assistant)
- Record bot responses in monitoring via record_query_response()
- Add DB migration (dbm019) for the new role column

Frontend changes:
- New BotSessionMonitor component with session list + message viewer
- Add Sessions sidebar tab to BotDetailDialog
- Add getBotSessions/getSessionMessages API methods to BackendClient
- Add i18n translations (en-US, zh-Hans, zh-Hant, ja-JP)

Generated with [Claude Code](https://claude.ai/code)
via [Happy](https://happy.engineering)

Co-Authored-By: Claude <noreply@anthropic.com>
Co-Authored-By: Happy <yesreply@happy.engineering>

* refactor: remove outdated version comment from PipelineManager class

* fix: bump required_database_version to 19 to trigger monitoring_messages.role migration

* fix: prevent session message auto-scroll from pushing dialog content out of view

Replace scrollIntoView (which scrolls all ancestor containers) with
direct scrollTop manipulation on the ScrollArea viewport. This keeps
the scroll contained within the messages panel only.

* ui: redesign BotSessionMonitor with polished chat UI

- Wider session list (w-72) with avatar circles and cleaner layout
- Richer chat header with avatar, platform info, and active indicator
- User messages now use blue-500 (solid) instead of blue-100 for
  clear visual distinction
- Metadata (time, runner) shown on hover below bubbles, not inside
- Proper empty state illustrations for both panels
- Better spacing, rounded corners, and shadow treatment
- Consistent dark mode styling

* fix: infinite re-render loop in DynamicFormComponent

The useEffect depended on onSubmit which was a new closure every
parent render. Calling onSubmit inside the effect triggered parent
state update → re-render → new onSubmit ref → effect re-runs → loop.

Fix: use useRef to hold a stable reference to onSubmit, removing it
from the useEffect dependency array.

Also add DialogDescription to BotDetailDialog to suppress Radix
aria-describedby warning.

* fix: remove .html suffix from docs.langbot.app links (Mintlify migration)

* style: fix prettier and ruff formatting

---------

Co-authored-by: Claude <noreply@anthropic.com>
Co-authored-by: Happy <yesreply@happy.engineering>
2026-02-25 21:56:24 +08:00
Guanchao Wang
b8df0dbd7f feat: message aggregator (#1985)
* feat: aggregator

* fix: resolve deadlock, mutation, and safety issues in message aggregator

- Fix deadlock: don't await cancelled timer tasks inside the lock;
  _flush_buffer acquires the same lock, causing a deadlock cycle
- Fix message_event mutation: keep original message_event unmodified
  to preserve message_id/metadata for reply/quote; only pass merged
  message_chain separately
- Fix Plain positional arg: Plain('\n') → Plain(text='\n')
- Fix float() ValueError: wrap delay cast in try/except
- Add MAX_BUFFER_MESSAGES (10) cap to prevent unbounded buffer growth
- Default enabled to false to avoid surprising latency on upgrade
- Fix flush_all: cancel all timers under one lock acquisition, then
  flush outside the lock to avoid deadlock

---------

Co-authored-by: RockChinQ <rockchinq@gmail.com>
2026-02-25 14:20:34 +08:00
Dongze Yang
298437f352 feat(platform): add Forward message support for aiocqhttp adapter (#2003)
* feat(platform): add Forward message support for aiocqhttp adapter

- Add _send_forward_message method to send merged forward cards via OneBot API
- Support NapCat's send_forward_msg API with fallback to send_group_forward_msg
- Fix MessageChain deserialization for Forward messages in handler.py
- Properly deserialize nested ForwardMessageNode.message_chain to preserve data

This enables plugins to send QQ merged forward cards through the standard
LangBot send_message API using the Forward message component.

* style: fix ruff lint and format issues

- Remove f-string prefix from log message without placeholders
- Apply ruff format to aiocqhttp.py and handler.py

* refactor: remove custom deserializer, rely on SDK for Forward deserialization

- Remove _deserialize_message_chain from handler.py; use standard
  MessageChain.model_validate() (Forward handling fixed in SDK via
  langbot-app/langbot-plugin-sdk#38)
- Fix group_id type: use int instead of str for OneBot compatibility
- Add warning log when Forward message is used with non-group target

* chore: bump langbot-plugin to 0.2.7 (Forward deserialization fix)

---------

Co-authored-by: RockChinQ <rockchinq@gmail.com>
2026-02-25 14:03:17 +08:00
Dongze Yang
94d72c378c fix(web): emit initial form values on mount to prevent saving empty config (#2004)
DynamicFormComponent uses form.watch(callback) to notify parent of form
values, but react-hook-form's watch callback only fires on subsequent
changes, not on mount. This causes PluginForm's currentFormValues to
remain as {} if the user saves without modifying any field, overwriting
the existing plugin config with an empty object in the database.
2026-02-25 13:34:52 +08:00
fdc310
f09ba6a0e3 fix: Add the file upload function and optimize the media message proc… (#2002)
* fix: Add the file upload function and optimize the media message processing

* fix: Optimize the message processing logic, improve the concatenation of text elements and the sending of media messages

* fix: Simplify the file request construction and message processing logic to enhance code readability
2026-02-25 12:24:16 +08:00
43 changed files with 2125 additions and 71 deletions

View File

@@ -19,9 +19,10 @@ English / [简体中文](README_CN.md) / [繁體中文](README_TW.md) / [日本
[![GitHub stars](https://img.shields.io/github/stars/langbot-app/LangBot?style=social)](https://github.com/langbot-app/LangBot/stargazers)
<a href="https://langbot.app">Website</a>
<a href="https://docs.langbot.app/en/insight/features.html">Features</a>
<a href="https://docs.langbot.app/en/insight/guide.html">Docs</a>
<a href="https://docs.langbot.app/en/tags/readme.html">API</a>
<a href="https://docs.langbot.app/en/insight/features">Features</a>
<a href="https://docs.langbot.app/en/insight/guide">Docs</a>
<a href="https://docs.langbot.app/en/tags/readme">API</a>
<a href="https://space.langbot.app/cloud">Cloud</a>
<a href="https://space.langbot.app">Plugin Market</a>
<a href="https://langbot.featurebase.app/roadmap">Roadmap</a>
@@ -44,12 +45,16 @@ LangBot is an **open-source, production-grade platform** for building AI-powered
- **Web Management Panel** — Configure, manage, and monitor your bots through an intuitive browser interface. No YAML editing required.
- **Multi-Pipeline Architecture** — Different bots for different scenarios, with comprehensive monitoring and exception handling.
[→ Learn more about all features](https://docs.langbot.app/en/insight/features.html)
[→ Learn more about all features](https://docs.langbot.app/en/insight/features)
---
## Quick Start
### ☁️ LangBot Cloud (Recommended)
**[LangBot Cloud](https://space.langbot.app/cloud)** — Zero deployment, ready to use.
### One-Line Launch
```bash
@@ -71,7 +76,7 @@ docker compose up -d
[![Deploy on Zeabur](https://zeabur.com/button.svg)](https://zeabur.com/en-US/templates/ZKTBDH)
[![Deploy on Railway](https://railway.com/button.svg)](https://railway.app/template/yRrAyL?referralCode=vogKPF)
**More options:** [Docker](https://docs.langbot.app/en/deploy/langbot/docker.html) · [Manual](https://docs.langbot.app/en/deploy/langbot/manual.html) · [BTPanel](https://docs.langbot.app/en/deploy/langbot/one-click/bt.html) · [Kubernetes](./docker/README_K8S.md)
**More options:** [Docker](https://docs.langbot.app/en/deploy/langbot/docker) · [Manual](https://docs.langbot.app/en/deploy/langbot/manual) · [BTPanel](https://docs.langbot.app/en/deploy/langbot/one-click/bt) · [Kubernetes](./docker/README_K8S.md)
---
@@ -119,7 +124,7 @@ docker compose up -d
| [接口 AI](https://jiekou.ai/) | Gateway | ✅ |
| [302.AI](https://share.302.ai/SuTG99) | Gateway | ✅ |
[→ View all integrations](https://docs.langbot.app/en/insight/features.html)
[→ View all integrations](https://docs.langbot.app/en/insight/features)
---

View File

@@ -24,6 +24,7 @@
<a href="https://docs.langbot.app/zh/insight/features.html">特性</a>
<a href="https://docs.langbot.app/zh/insight/guide.html">文档</a>
<a href="https://docs.langbot.app/zh/tags/readme.html">API</a>
<a href="https://space.langbot.app/cloud">Cloud</a>
<a href="https://space.langbot.app">插件市场</a>
<a href="https://langbot.featurebase.app/roadmap">路线图</a>
@@ -52,6 +53,10 @@ LangBot 是一个**开源的生产级平台**,用于构建 AI 驱动的即时
## 快速开始
### ☁️ LangBot Cloud推荐
**[LangBot Cloud](https://space.langbot.app/cloud)** — 免部署,开箱即用。
### 一键启动
```bash

View File

@@ -50,6 +50,10 @@ LangBot es una **plataforma de código abierto y grado de producción** para con
## Inicio Rápido
### ☁️ LangBot Cloud (Recomendado)
**[LangBot Cloud](https://space.langbot.app/cloud)** — Sin despliegue, listo para usar.
### Lanzamiento en una línea
```bash

View File

@@ -50,6 +50,10 @@ LangBot est une **plateforme open-source de niveau production** pour créer des
## Démarrage Rapide
### ☁️ LangBot Cloud (Recommandé)
**[LangBot Cloud](https://space.langbot.app/cloud)** — Sans déploiement, prêt à utiliser.
### Lancement en une ligne
```bash

View File

@@ -50,6 +50,10 @@ LangBot は、AI搭載のインスタントメッセージングボットを構
## クイックスタート
### ☁️ LangBot Cloud推奨
**[LangBot Cloud](https://space.langbot.app/cloud)** — デプロイ不要、すぐに使えます。
### ワンライン起動
```bash

View File

@@ -50,6 +50,10 @@ LangBot은 AI 기반 인스턴트 메시징 봇을 구축하기 위한 **오픈
## 빠른 시작
### ☁️ LangBot Cloud (추천)
**[LangBot Cloud](https://space.langbot.app/cloud)** — 배포 없이 바로 사용.
### 원라인 실행
```bash

View File

@@ -50,6 +50,10 @@ LangBot — это **платформа с открытым исходным к
## Быстрый старт
### ☁️ LangBot Cloud (Рекомендуется)
**[LangBot Cloud](https://space.langbot.app/cloud)** — Без развёртывания, готово к использованию.
### Запуск одной командой
```bash

View File

@@ -52,6 +52,10 @@ LangBot 是一個**開源的生產級平台**,用於建構 AI 驅動的即時
## 快速開始
### ☁️ LangBot Cloud推薦
**[LangBot Cloud](https://space.langbot.app/cloud)** — 免部署,開箱即用。
### 一鍵啟動
```bash

View File

@@ -50,6 +50,10 @@ LangBot là một **nền tảng mã nguồn mở, cấp sản xuất** để x
## Bắt đầu nhanh
### ☁️ LangBot Cloud (Khuyên dùng)
**[LangBot Cloud](https://space.langbot.app/cloud)** — Không cần triển khai, sẵn sàng sử dụng.
### Khởi chạy một dòng
```bash

View File

@@ -1,6 +1,6 @@
[project]
name = "langbot"
version = "4.8.4"
version = "4.8.6"
description = "Production-grade platform for building agentic IM bots"
readme = "README.md"
license-files = ["LICENSE"]
@@ -64,7 +64,7 @@ dependencies = [
"chromadb>=0.4.24",
"qdrant-client (>=1.15.1,<2.0.0)",
"pyseekdb==1.0.0b7",
"langbot-plugin==0.2.6",
"langbot-plugin==0.2.7",
"asyncpg>=0.30.0",
"line-bot-sdk>=3.19.0",
"tboxsdk>=0.0.10",

View File

@@ -1,3 +1,3 @@
"""LangBot - Production-grade platform for building agentic IM bots"""
__version__ = '4.8.4'
__version__ = '4.8.6'

View File

@@ -52,6 +52,7 @@ class MonitoringRouterGroup(group.RouterGroup):
# Parse query parameters
bot_ids = quart.request.args.getlist('botId')
pipeline_ids = quart.request.args.getlist('pipelineId')
session_ids = quart.request.args.getlist('sessionId')
start_time_str = quart.request.args.get('startTime')
end_time_str = quart.request.args.get('endTime')
limit = int(quart.request.args.get('limit', 100))
@@ -64,6 +65,7 @@ class MonitoringRouterGroup(group.RouterGroup):
messages, total = await self.ap.monitoring_service.get_messages(
bot_ids=bot_ids if bot_ids else None,
pipeline_ids=pipeline_ids if pipeline_ids else None,
session_ids=session_ids if session_ids else None,
start_time=start_time,
end_time=end_time,
limit=limit,

View File

@@ -0,0 +1,47 @@
import quart
from .. import group
@group.group_class('survey', '/api/v1/survey')
class SurveyRouterGroup(group.RouterGroup):
async def initialize(self) -> None:
@self.route('/pending', methods=['GET'], auth_type=group.AuthType.USER_TOKEN)
async def _get_pending() -> str:
"""Get pending survey for the frontend to display."""
survey = self.ap.survey.get_pending_survey() if self.ap.survey else None
return self.success(data={'survey': survey})
@self.route('/respond', methods=['POST'], auth_type=group.AuthType.USER_TOKEN)
async def _respond() -> str:
"""Submit survey response."""
json_data = await quart.request.json
survey_id = json_data.get('survey_id')
answers = json_data.get('answers', {})
completed = json_data.get('completed', True)
if not survey_id:
return self.fail(1, 'survey_id required')
if self.ap.survey:
ok = await self.ap.survey.submit_response(survey_id, answers, completed)
if ok:
return self.success()
return self.fail(2, 'Failed to submit response')
return self.fail(3, 'Survey not available')
@self.route('/dismiss', methods=['POST'], auth_type=group.AuthType.USER_TOKEN)
async def _dismiss() -> str:
"""Dismiss survey."""
json_data = await quart.request.json
survey_id = json_data.get('survey_id')
if not survey_id:
return self.fail(1, 'survey_id required')
if self.ap.survey:
ok = await self.ap.survey.dismiss_survey(survey_id)
if ok:
return self.success()
return self.fail(2, 'Failed to dismiss')
return self.fail(3, 'Survey not available')

View File

@@ -32,6 +32,7 @@ class MonitoringService:
user_id: str | None = None,
runner_name: str | None = None,
variables: str | None = None,
role: str = 'user',
) -> str:
"""Record a message"""
message_id = str(uuid.uuid4())
@@ -50,6 +51,7 @@ class MonitoringService:
'user_id': user_id,
'runner_name': runner_name,
'variables': variables,
'role': role,
}
await self.ap.persistence_mgr.execute_async(
@@ -355,6 +357,7 @@ class MonitoringService:
self,
bot_ids: list[str] | None = None,
pipeline_ids: list[str] | None = None,
session_ids: list[str] | None = None,
start_time: datetime.datetime | None = None,
end_time: datetime.datetime | None = None,
limit: int = 100,
@@ -367,6 +370,8 @@ class MonitoringService:
conditions.append(persistence_monitoring.MonitoringMessage.bot_id.in_(bot_ids))
if pipeline_ids:
conditions.append(persistence_monitoring.MonitoringMessage.pipeline_id.in_(pipeline_ids))
if session_ids:
conditions.append(persistence_monitoring.MonitoringMessage.session_id.in_(session_ids))
if start_time:
conditions.append(persistence_monitoring.MonitoringMessage.timestamp >= start_time)
if end_time:

View File

@@ -15,6 +15,7 @@ from ..command import cmdmgr
from ..plugin import connector as plugin_connector
from ..pipeline import pool
from ..pipeline import controller, pipelinemgr
from ..pipeline import aggregator as message_aggregator
from ..utils import version as version_mgr, proxy as proxy_mgr
from ..persistence import mgr as persistencemgr
from ..api.http.controller import main as http_controller
@@ -38,6 +39,7 @@ from . import entities as core_entities
from ..rag.knowledge import kbmgr as rag_mgr
from ..vector import mgr as vectordb_mgr
from ..telemetry import telemetry as telemetry_module
from ..survey import manager as survey_module
class Application:
@@ -96,6 +98,8 @@ class Application:
query_pool: pool.QueryPool = None
msg_aggregator: message_aggregator.MessageAggregator = None
ctrl: controller.Controller = None
pipeline_mgr: pipelinemgr.PipelineManager = None
@@ -144,6 +148,8 @@ class Application:
telemetry: telemetry_module.TelemetryManager = None
survey: survey_module.SurveyManager = None
monitoring_service: monitoring_service.MonitoringService = None
def __init__(self):

View File

@@ -5,6 +5,7 @@ import asyncio
from .. import stage, app
from ...utils import version, proxy
from ...pipeline import pool, controller, pipelinemgr
from ...pipeline import aggregator as message_aggregator
from ...plugin import connector as plugin_connector
from ...command import cmdmgr
from ...provider.session import sessionmgr as llm_session_mgr
@@ -33,6 +34,7 @@ from ...utils import logcache
from ...vector import mgr as vectordb_mgr
from .. import taskmgr
from ...telemetry import telemetry as telemetry_module
from ...survey import manager as survey_module
@stage.stage_class('BuildAppStage')
@@ -109,6 +111,11 @@ class BuildAppStage(stage.BootingStage):
await telemetry_inst.initialize()
ap.telemetry = telemetry_inst
# Survey manager
survey_inst = survey_module.SurveyManager(ap)
await survey_inst.initialize()
ap.survey = survey_inst
cmd_mgr_inst = cmdmgr.CommandManager(ap)
await cmd_mgr_inst.initialize()
ap.cmd_mgr = cmd_mgr_inst
@@ -137,6 +144,10 @@ class BuildAppStage(stage.BootingStage):
await pipeline_mgr.initialize()
ap.pipeline_mgr = pipeline_mgr
# Initialize message aggregator (after pipeline_mgr, as it needs pipeline config)
msg_aggregator_inst = message_aggregator.MessageAggregator(ap)
ap.msg_aggregator = msg_aggregator_inst
rag_mgr_inst = rag_mgr.RAGManager(ap)
await rag_mgr_inst.initialize()
ap.rag_mgr = rag_mgr_inst

View File

@@ -22,6 +22,7 @@ class MonitoringMessage(Base):
user_id = sqlalchemy.Column(sqlalchemy.String(255), nullable=True)
runner_name = sqlalchemy.Column(sqlalchemy.String(255), nullable=True) # Runner name for this query
variables = sqlalchemy.Column(sqlalchemy.Text, nullable=True) # Query variables as JSON string
role = sqlalchemy.Column(sqlalchemy.String(50), nullable=True, default='user') # user, assistant
class MonitoringLLMCall(Base):

View File

@@ -0,0 +1,24 @@
import sqlalchemy
from .. import migration
@migration.migration_class(19)
class DBMigrateMonitoringMessageRole(migration.DBMigration):
"""Add role column to monitoring_messages table"""
async def upgrade(self):
"""Upgrade"""
try:
sql_text = sqlalchemy.text("ALTER TABLE monitoring_messages ADD COLUMN role VARCHAR(50) DEFAULT 'user'")
await self.ap.persistence_mgr.execute_async(sql_text)
except Exception:
# Column may already exist
pass
async def downgrade(self):
"""Downgrade"""
try:
sql_text = sqlalchemy.text('ALTER TABLE monitoring_messages DROP COLUMN role')
await self.ap.persistence_mgr.execute_async(sql_text)
except Exception:
pass

View File

@@ -0,0 +1,289 @@
"""Message Aggregator Module
This module provides message aggregation/debounce functionality.
When users send multiple messages consecutively, the aggregator will wait
for a configurable delay period and merge them into a single message
before processing.
"""
from __future__ import annotations
import asyncio
import time
import typing
from dataclasses import dataclass, field
import langbot_plugin.api.entities.builtin.platform.message as platform_message
import langbot_plugin.api.entities.builtin.platform.events as platform_events
import langbot_plugin.api.entities.builtin.provider.session as provider_session
import langbot_plugin.api.definition.abstract.platform.adapter as abstract_platform_adapter
if typing.TYPE_CHECKING:
from ..core import app
# Maximum number of messages to buffer before forcing a flush
MAX_BUFFER_MESSAGES = 10
@dataclass
class PendingMessage:
"""A pending message waiting to be aggregated"""
bot_uuid: str
launcher_type: provider_session.LauncherTypes
launcher_id: typing.Union[int, str]
sender_id: typing.Union[int, str]
message_event: platform_events.MessageEvent
message_chain: platform_message.MessageChain
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter
pipeline_uuid: typing.Optional[str]
timestamp: float = field(default_factory=time.time)
@dataclass
class SessionBuffer:
"""Buffer for a single session's pending messages"""
session_id: str
messages: list[PendingMessage] = field(default_factory=list)
timer_task: typing.Optional[asyncio.Task] = None
last_message_time: float = field(default_factory=time.time)
class MessageAggregator:
"""Message aggregator that buffers and merges consecutive messages
This class implements a debounce mechanism for incoming messages.
When a message arrives, it starts a timer. If more messages arrive
before the timer expires, they are buffered. When the timer expires,
all buffered messages are merged and sent to the query pool.
"""
ap: app.Application
buffers: dict[str, SessionBuffer]
"""Session ID -> SessionBuffer mapping"""
lock: asyncio.Lock
"""Lock for thread-safe buffer operations"""
def __init__(self, ap: app.Application):
self.ap = ap
self.buffers = {}
self.lock = asyncio.Lock()
def _get_session_id(
self,
bot_uuid: str,
launcher_type: provider_session.LauncherTypes,
launcher_id: typing.Union[int, str],
) -> str:
"""Generate a unique session ID"""
return f'{bot_uuid}:{launcher_type.value}:{launcher_id}'
async def _get_aggregation_config(self, pipeline_uuid: typing.Optional[str]) -> tuple[bool, float]:
"""Get aggregation configuration for a pipeline
Returns:
tuple: (enabled, delay_seconds)
"""
default_enabled = False
default_delay = 1.5
if pipeline_uuid is None:
return default_enabled, default_delay
# Get pipeline from pipeline manager
pipeline = await self.ap.pipeline_mgr.get_pipeline_by_uuid(pipeline_uuid)
if pipeline is None:
return default_enabled, default_delay
config = pipeline.pipeline_entity.config or {}
trigger_config = config.get('trigger', {})
aggregation_config = trigger_config.get('message-aggregation', {})
enabled = aggregation_config.get('enabled', default_enabled)
delay_raw = aggregation_config.get('delay', default_delay)
try:
delay = float(delay_raw)
except (TypeError, ValueError):
delay = default_delay
# Clamp delay to valid range
delay = max(1.0, min(10.0, delay))
return enabled, delay
async def add_message(
self,
bot_uuid: str,
launcher_type: provider_session.LauncherTypes,
launcher_id: typing.Union[int, str],
sender_id: typing.Union[int, str],
message_event: platform_events.MessageEvent,
message_chain: platform_message.MessageChain,
adapter: abstract_platform_adapter.AbstractMessagePlatformAdapter,
pipeline_uuid: typing.Optional[str] = None,
) -> None:
"""Add a message to the aggregation buffer
If aggregation is disabled for the pipeline, the message is sent
directly to the query pool. Otherwise, it's buffered and will be
merged with other messages from the same session.
"""
enabled, delay = await self._get_aggregation_config(pipeline_uuid)
if not enabled:
# Aggregation disabled, send directly to query pool
await self.ap.query_pool.add_query(
bot_uuid=bot_uuid,
launcher_type=launcher_type,
launcher_id=launcher_id,
sender_id=sender_id,
message_event=message_event,
message_chain=message_chain,
adapter=adapter,
pipeline_uuid=pipeline_uuid,
)
return
session_id = self._get_session_id(bot_uuid, launcher_type, launcher_id)
pending_msg = PendingMessage(
bot_uuid=bot_uuid,
launcher_type=launcher_type,
launcher_id=launcher_id,
sender_id=sender_id,
message_event=message_event,
message_chain=message_chain,
adapter=adapter,
pipeline_uuid=pipeline_uuid,
)
force_flush = False
async with self.lock:
if session_id in self.buffers:
buffer = self.buffers[session_id]
# Cancel existing timer (just cancel, don't await inside lock)
if buffer.timer_task and not buffer.timer_task.done():
buffer.timer_task.cancel()
buffer.messages.append(pending_msg)
else:
buffer = SessionBuffer(
session_id=session_id,
messages=[pending_msg],
)
self.buffers[session_id] = buffer
buffer.last_message_time = time.time()
# Check if buffer reached max capacity
if len(buffer.messages) >= MAX_BUFFER_MESSAGES:
force_flush = True
else:
# Start new timer
buffer.timer_task = asyncio.create_task(self._delayed_flush(session_id, delay))
if force_flush:
await self._flush_buffer(session_id)
async def _delayed_flush(self, session_id: str, delay: float) -> None:
"""Wait for delay then flush the buffer"""
try:
await asyncio.sleep(delay)
await self._flush_buffer(session_id)
except asyncio.CancelledError:
# Timer was cancelled, new message arrived
pass
async def _flush_buffer(self, session_id: str) -> None:
"""Flush the buffer for a session, merging all messages"""
async with self.lock:
buffer = self.buffers.pop(session_id, None)
if buffer is None or not buffer.messages:
return
if len(buffer.messages) == 1:
# Only one message, no need to merge
msg = buffer.messages[0]
await self.ap.query_pool.add_query(
bot_uuid=msg.bot_uuid,
launcher_type=msg.launcher_type,
launcher_id=msg.launcher_id,
sender_id=msg.sender_id,
message_event=msg.message_event,
message_chain=msg.message_chain,
adapter=msg.adapter,
pipeline_uuid=msg.pipeline_uuid,
)
return
# Merge multiple messages
merged_msg = self._merge_messages(buffer.messages)
await self.ap.query_pool.add_query(
bot_uuid=merged_msg.bot_uuid,
launcher_type=merged_msg.launcher_type,
launcher_id=merged_msg.launcher_id,
sender_id=merged_msg.sender_id,
message_event=merged_msg.message_event,
message_chain=merged_msg.message_chain,
adapter=merged_msg.adapter,
pipeline_uuid=merged_msg.pipeline_uuid,
)
def _merge_messages(self, messages: list[PendingMessage]) -> PendingMessage:
"""Merge multiple messages into one
The merged message uses the first message as base and combines
all message chains with newline separators.
The original message_event is kept unmodified to preserve
message metadata (message_id, etc.) for reply/quote.
"""
if len(messages) == 1:
return messages[0]
base_msg = messages[0]
# Build merged message chain
merged_chain = platform_message.MessageChain([])
for i, msg in enumerate(messages):
if i > 0:
# Add newline separator between messages
merged_chain.append(platform_message.Plain(text='\n'))
# Copy all components from this message
for component in msg.message_chain:
merged_chain.append(component)
# Keep message_event unmodified (preserves original message_id and
# metadata for reply/quote), only pass merged chain separately
return PendingMessage(
bot_uuid=base_msg.bot_uuid,
launcher_type=base_msg.launcher_type,
launcher_id=base_msg.launcher_id,
sender_id=base_msg.sender_id,
message_event=base_msg.message_event,
message_chain=merged_chain,
adapter=base_msg.adapter,
pipeline_uuid=base_msg.pipeline_uuid,
)
async def flush_all(self) -> None:
"""Flush all pending buffers immediately
This is useful during shutdown to ensure no messages are lost.
"""
# Snapshot session IDs and cancel all timers under lock
async with self.lock:
session_ids = list(self.buffers.keys())
for sid in session_ids:
buffer = self.buffers.get(sid)
if buffer and buffer.timer_task and not buffer.timer_task.done():
buffer.timer_task.cancel()
# Flush each buffer outside the lock
for session_id in session_ids:
await self._flush_buffer(session_id)

View File

@@ -114,6 +114,60 @@ class MonitoringHelper:
except Exception as e:
ap.logger.error(f'Failed to record query success: {e}')
@staticmethod
async def record_query_response(
ap: app.Application,
query: pipeline_query.Query,
bot_id: str,
bot_name: str,
pipeline_id: str,
pipeline_name: str,
runner_name: str | None = None,
):
"""Record bot response message to monitoring"""
try:
session_id = f'{query.launcher_type}_{query.launcher_id}'
# Extract response content from resp_message_chain
if hasattr(query, 'resp_message_chain') and query.resp_message_chain:
# Serialize the last response message chain
last_resp = query.resp_message_chain[-1]
if hasattr(last_resp, 'model_dump'):
message_content = json.dumps(last_resp.model_dump(), ensure_ascii=False)
else:
message_content = str(last_resp)
elif hasattr(query, 'resp_messages') and query.resp_messages:
last_resp = query.resp_messages[-1]
if hasattr(last_resp, 'get_content_platform_message_chain'):
chain = last_resp.get_content_platform_message_chain()
if hasattr(chain, 'model_dump'):
message_content = json.dumps(chain.model_dump(), ensure_ascii=False)
else:
message_content = str(chain)
else:
message_content = str(last_resp)
else:
return # No response to record
await ap.monitoring_service.record_message(
bot_id=bot_id,
bot_name=bot_name,
pipeline_id=pipeline_id,
pipeline_name=pipeline_name,
message_content=message_content,
session_id=session_id,
status='success',
level='info',
platform=query.launcher_type.value
if hasattr(query.launcher_type, 'value')
else str(query.launcher_type),
user_id=query.sender_id,
runner_name=runner_name,
role='assistant',
)
except Exception as e:
ap.logger.error(f'Failed to record query response: {e}')
@staticmethod
async def record_query_error(
ap: app.Application,

View File

@@ -339,6 +339,20 @@ class RuntimePipeline:
except Exception as e:
self.ap.logger.error(f'Failed to record query success: {e}')
# Record bot response message
try:
await monitoring_helper.MonitoringHelper.record_query_response(
ap=self.ap,
query=query,
bot_id=query.bot_uuid or 'unknown',
bot_name=bot_name,
pipeline_id=self.pipeline_entity.uuid,
pipeline_name=pipeline_name,
runner_name=runner_name,
)
except Exception as e:
self.ap.logger.error(f'Failed to record query response: {e}')
except Exception as e:
inst_name = query.current_stage_name if query.current_stage_name else 'unknown'
self.ap.logger.error(f'Error processing query {query.query_id} stage={inst_name} : {e}')
@@ -369,8 +383,6 @@ class RuntimePipeline:
class PipelineManager:
"""流水线管理器"""
# ====== 4.0 ======
ap: app.Application
pipelines: list[RuntimePipeline]

View File

@@ -200,6 +200,11 @@ class ChatMessageHandler(handler.MessageHandler):
# Send telemetry asynchronously and do not block pipeline via app's telemetry manager
await self.ap.telemetry.start_send_task(payload)
# Trigger survey event on first successful non-WebSocket response
if not locals().get('error_info') and adapter_name and 'WebSocket' not in adapter_name:
if self.ap.survey:
await self.ap.survey.trigger_event('first_bot_response_success')
except Exception as ex:
# Ensure telemetry issues do not affect normal flow
self.ap.logger.warning(f'Failed to send telemetry: {ex}')

View File

@@ -82,7 +82,7 @@ class RuntimeBot:
if custom_launcher_id:
launcher_id = custom_launcher_id
await self.ap.query_pool.add_query(
await self.ap.msg_aggregator.add_message(
bot_uuid=self.bot_entity.uuid,
launcher_type=provider_session.LauncherTypes.PERSON,
launcher_id=launcher_id,
@@ -125,7 +125,7 @@ class RuntimeBot:
if custom_launcher_id:
launcher_id = custom_launcher_id
await self.ap.query_pool.add_query(
await self.ap.msg_aggregator.add_message(
bot_uuid=self.bot_entity.uuid,
launcher_type=provider_session.LauncherTypes.GROUP,
launcher_id=launcher_id,

View File

@@ -375,6 +375,18 @@ class AiocqhttpAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter)
self.bot = aiocqhttp.CQHttp()
async def send_message(self, target_type: str, target_id: str, message: platform_message.MessageChain):
# Check if message contains a Forward component
forward_msg = message.get_first(platform_message.Forward)
if forward_msg:
if target_type == 'group':
# Send as merged forward message via OneBot API
await self._send_forward_message(int(target_id), forward_msg)
return
else:
await self.logger.warning(
f'Forward message is only supported for group targets, got target_type={target_type}. Falling through to normal send.'
)
aiocq_msg = (await AiocqhttpMessageConverter.yiri2target(message))[0]
if target_type == 'group':
@@ -382,6 +394,90 @@ class AiocqhttpAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter)
elif target_type == 'person':
await self.bot.send_private_msg(user_id=int(target_id), message=aiocq_msg)
async def _send_forward_message(self, group_id: int, forward: platform_message.Forward):
"""Send a merged forward message to a group using NapCat extended API."""
messages = []
for node in forward.node_list:
# Build content for each node
content = []
if node.message_chain:
for component in node.message_chain:
if isinstance(component, platform_message.Plain):
if component.text:
content.append({'type': 'text', 'data': {'text': component.text}})
elif isinstance(component, platform_message.Image):
img_data = {}
if component.base64:
b64 = component.base64
if b64.startswith('data:'):
b64 = b64.split(',', 1)[-1] if ',' in b64 else b64
img_data['file'] = f'base64://{b64}'
elif component.url:
img_data['file'] = component.url
elif component.path:
img_data['file'] = str(component.path)
if img_data:
content.append({'type': 'image', 'data': img_data})
if not content:
continue
# Build node data - use user_id and nickname format for NapCat
user_id = str(node.sender_id) if node.sender_id else str(self.bot_account_id or '10000')
node_data = {
'type': 'node',
'data': {
'user_id': user_id,
'nickname': node.sender_name or '未知',
'content': content,
},
}
messages.append(node_data)
if not messages:
return
# Build the full message payload for NapCat's send_forward_msg API
# This matches the format used by GiveMeSetuPlugin
bot_id = str(self.bot_account_id) if self.bot_account_id else '10000'
payload = {
'group_id': group_id,
'user_id': bot_id, # Required by NapCat for display
'messages': messages,
}
# Add display settings if available
if forward.display:
if forward.display.title:
payload['news'] = [{'text': forward.display.title}]
if forward.display.brief:
payload['prompt'] = forward.display.brief
if forward.display.summary:
payload['summary'] = forward.display.summary
if forward.display.source:
payload['source'] = forward.display.source
try:
# Use send_forward_msg (NapCat extended API) instead of send_group_forward_msg
await self.logger.info(
f'Sending forward message to group {group_id} with {len(messages)} nodes, payload keys: {list(payload.keys())}'
)
result = await self.bot.call_action('send_forward_msg', **payload)
await self.logger.info(f'Forward message sent to group {group_id}, result: {result}')
except Exception as e:
await self.logger.error(f'Failed to send forward message to group {group_id}: {e}')
# Fallback: try standard OneBot API with integer group_id
try:
await self.logger.info('Trying fallback API send_group_forward_msg')
await self.bot.call_action('send_group_forward_msg', group_id=group_id, messages=messages)
await self.logger.info(f'Forward message sent via fallback API to group {group_id}')
except Exception as e2:
await self.logger.error(f'Fallback also failed: {e2}')
raise
async def reply_message(
self,
message_source: platform_events.MessageEvent,

View File

@@ -1,7 +1,7 @@
from __future__ import annotations
import lark_oapi
from lark_oapi.api.im.v1 import CreateImageRequest, CreateImageRequestBody
from lark_oapi.api.im.v1 import CreateImageRequest, CreateImageRequestBody, CreateFileRequest, CreateFileRequestBody
import traceback
import typing
import asyncio
@@ -141,6 +141,88 @@ class LarkMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
traceback.print_exc()
return None
@staticmethod
async def upload_file_to_lark(
file_bytes: bytes,
api_client: lark_oapi.Client,
file_type: str,
file_name: str = 'file',
duration: typing.Optional[int] = None,
) -> typing.Optional[str]:
"""Upload a file to Lark and return the file_key, or None if upload fails.
Args:
file_bytes: Raw file bytes.
api_client: Lark API client.
file_type: Lark file type, e.g. 'opus', 'mp4', 'pdf', 'doc', etc.
file_name: Display name for the file.
duration: Duration in milliseconds (for audio files).
"""
try:
with tempfile.NamedTemporaryFile(delete=False) as temp_file:
temp_file.write(file_bytes)
temp_file_path = temp_file.name
try:
body_builder = (
CreateFileRequestBody.builder()
.file_type(file_type)
.file_name(file_name)
.file(open(temp_file_path, 'rb'))
)
if duration is not None:
body_builder = body_builder.duration(duration)
request = CreateFileRequest.builder().request_body(body_builder.build()).build()
response = await api_client.im.v1.file.acreate(request)
if not response.success():
print(
f'client.im.v1.file.create failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}'
)
return None
return response.data.file_key
finally:
os.unlink(temp_file_path)
except Exception as e:
print(f'Failed to upload file to Lark: {e}')
traceback.print_exc()
return None
@staticmethod
async def _get_media_bytes(
msg: typing.Union[platform_message.Voice, platform_message.File],
) -> typing.Optional[bytes]:
"""Get bytes from a Voice or File message (base64, url, or path)."""
data = None
if msg.base64:
try:
base64_str = msg.base64
if ',' in base64_str:
base64_str = base64_str.split(',', 1)[1]
data = base64.b64decode(base64_str)
except Exception:
pass
elif msg.url:
try:
async with aiohttp.ClientSession() as session:
async with session.get(msg.url) as resp:
if resp.status == 200:
data = await resp.read()
except Exception:
pass
elif msg.path:
try:
with open(msg.path, 'rb') as f:
data = f.read()
except Exception:
pass
return data
@staticmethod
async def yiri2target(
message_chain: platform_message.MessageChain, api_client: lark_oapi.Client
@@ -150,10 +232,10 @@ class LarkMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
Returns:
Tuple of (text_elements, image_keys):
- text_elements: List of paragraphs for post message format
- image_keys: List of image_key strings for separate image messages
- media_items: List of dicts with 'msg_type' and 'content' for separate media messages
"""
message_elements = []
image_keys = []
media_items = []
pending_paragraph = []
# Regex pattern to match Markdown image syntax: ![alt](url)
@@ -196,40 +278,77 @@ class LarkMessageConverter(abstract_platform_adapter.AbstractMessageConverter):
# Check for and extract Markdown images from text
cleaned_text, extracted_urls = await process_text_with_images(text)
# Add cleaned text if not empty
# Split by blank lines to create separate paragraphs for Lark post format.
# Lark truncates md elements at the first \n\n, so we must use the
# post format's native paragraph structure instead.
if cleaned_text:
pending_paragraph.append({'tag': 'md', 'text': cleaned_text})
segments = re.split(r'\n\s*\n', cleaned_text)
for i, segment in enumerate(segments):
segment = segment.strip()
if not segment:
continue
if i > 0 and pending_paragraph:
message_elements.append(pending_paragraph)
pending_paragraph = []
pending_paragraph.append({'tag': 'md', 'text': segment})
# Process extracted image URLs
for url in extracted_urls:
# Create a temporary Image message to upload
temp_image = platform_message.Image(url=url)
image_key = await LarkMessageConverter.upload_image_to_lark(temp_image, api_client)
if image_key:
image_keys.append(image_key)
media_items.append({'msg_type': 'image', 'content': {'image_key': image_key}})
elif isinstance(msg, platform_message.At):
pending_paragraph.append({'tag': 'at', 'user_id': msg.target, 'style': []})
elif isinstance(msg, platform_message.AtAll):
pending_paragraph.append({'tag': 'at', 'user_id': 'all', 'style': []})
elif isinstance(msg, platform_message.Image):
# Upload image and get image_key
image_key = await LarkMessageConverter.upload_image_to_lark(msg, api_client)
if image_key:
# Store image_key for separate image message
image_keys.append(image_key)
media_items.append({'msg_type': 'image', 'content': {'image_key': image_key}})
elif isinstance(msg, platform_message.Voice):
data = await LarkMessageConverter._get_media_bytes(msg)
if data:
duration = int(msg.length * 1000) if msg.length else None
file_key = await LarkMessageConverter.upload_file_to_lark(
data, api_client, file_type='opus', file_name='voice.opus', duration=duration
)
if file_key:
media_items.append({'msg_type': 'audio', 'content': {'file_key': file_key}})
elif isinstance(msg, platform_message.File):
data = await LarkMessageConverter._get_media_bytes(msg)
if data:
file_name = msg.name or 'file'
# Guess file_type from extension
ext = os.path.splitext(file_name)[1].lstrip('.').lower() if file_name else ''
file_type_map = {
'opus': 'opus',
'mp4': 'mp4',
'pdf': 'pdf',
'doc': 'doc',
'docx': 'doc',
'xls': 'xls',
'xlsx': 'xls',
'ppt': 'ppt',
'pptx': 'ppt',
}
file_type = file_type_map.get(ext, 'stream')
file_key = await LarkMessageConverter.upload_file_to_lark(
data, api_client, file_type=file_type, file_name=file_name
)
if file_key:
media_items.append({'msg_type': 'file', 'content': {'file_key': file_key}})
elif isinstance(msg, platform_message.Forward):
for node in msg.node_list:
sub_elements, sub_image_keys = await LarkMessageConverter.yiri2target(
node.message_chain, api_client
)
sub_elements, sub_media = await LarkMessageConverter.yiri2target(node.message_chain, api_client)
message_elements.extend(sub_elements)
image_keys.extend(sub_image_keys)
media_items.extend(sub_media)
if pending_paragraph:
message_elements.append(pending_paragraph)
return message_elements, image_keys
return message_elements, media_items
@staticmethod
async def target2yiri(
@@ -917,23 +1036,40 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
):
# 不再需要了因为message_id已经被包含到message_chain中
# lark_event = await self.event_converter.yiri2target(message_source)
text_elements, image_keys = await self.message_converter.yiri2target(message, self.api_client)
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
# Send text message if there are text elements
if text_elements:
final_content = {
'zh_Hans': {
'title': '',
'content': text_elements,
},
}
# Determine msg_type based on content: use 'post' if at mentions
# are present (requires post paragraph structure), otherwise 'text'
needs_post = any(ele['tag'] == 'at' for paragraph in text_elements for ele in paragraph)
if needs_post:
msg_type = 'post'
final_content = json.dumps(
{
'zh_Hans': {
'title': '',
'content': text_elements,
},
}
)
else:
msg_type = 'text'
parts = []
for paragraph in text_elements:
para_text = ''.join(ele.get('text', '') for ele in paragraph)
if para_text:
parts.append(para_text)
final_content = json.dumps({'text': '\n\n'.join(parts)})
request: ReplyMessageRequest = (
ReplyMessageRequest.builder()
.message_id(message_source.message_chain.message_id)
.request_body(
ReplyMessageRequestBody.builder()
.content(json.dumps(final_content))
.msg_type('post')
.content(final_content)
.msg_type(msg_type)
.reply_in_thread(False)
.uuid(str(uuid.uuid4()))
.build()
@@ -963,17 +1099,15 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
f'client.im.v1.message.reply failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
)
# Send image messages separately using msg_type='image'
for image_key in image_keys:
image_content = json.dumps({'image_key': image_key})
# Send media messages separately (image, audio, file, etc.)
for media in media_items:
request: ReplyMessageRequest = (
ReplyMessageRequest.builder()
.message_id(message_source.message_chain.message_id)
.request_body(
ReplyMessageRequestBody.builder()
.content(image_content)
.msg_type('image')
.content(json.dumps(media['content']))
.msg_type(media['msg_type'])
.reply_in_thread(False)
.uuid(str(uuid.uuid4()))
.build()
@@ -1000,7 +1134,7 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
if not response.success():
raise Exception(
f'client.im.v1.message.reply (image) failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {response.code}, msg: {response.msg}, log_id: {response.get_log_id()}, resp: \n{json.dumps(json.loads(response.raw.content), indent=4, ensure_ascii=False)}'
)
async def reply_message_chunk(
@@ -1018,15 +1152,16 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
message_id = bot_message.resp_message_id
msg_seq = bot_message.msg_sequence
if msg_seq % 8 == 0 or is_final:
text_elements, image_keys = await self.message_converter.yiri2target(message, self.api_client)
text_elements, media_items = await self.message_converter.yiri2target(message, self.api_client)
text_message = ''
if text_elements:
for ele in text_elements[0]:
if ele['tag'] == 'text':
text_message += ele['text']
elif ele['tag'] == 'md':
text_message += ele['text']
parts = []
for paragraph in text_elements:
para_text = ''.join(ele['text'] for ele in paragraph if ele['tag'] in ('text', 'md'))
if para_text:
parts.append(para_text)
text_message = '\n\n'.join(parts)
# content = {
# 'type': 'card_json',
@@ -1076,6 +1211,30 @@ class LarkAdapter(abstract_platform_adapter.AbstractMessagePlatformAdapter):
)
return
# Send media messages when streaming is done
if is_final and media_items:
for media in media_items:
media_request: ReplyMessageRequest = (
ReplyMessageRequest.builder()
.message_id(message_source.message_chain.message_id)
.request_body(
ReplyMessageRequestBody.builder()
.content(json.dumps(media['content']))
.msg_type(media['msg_type'])
.reply_in_thread(False)
.uuid(str(uuid.uuid4()))
.build()
)
.build()
)
media_response: ReplyMessageResponse = await self.api_client.im.v1.message.areply(
media_request, req_opt
)
if not media_response.success():
raise Exception(
f'client.im.v1.message.reply ({media["msg_type"]}) failed, code: {media_response.code}, msg: {media_response.msg}, log_id: {media_response.get_log_id()}'
)
async def is_muted(self, group_id: int) -> bool:
return False

View File

@@ -279,6 +279,7 @@ class RuntimeConnectionHandler(handler.Handler):
target_id = data['target_id']
message_chain = data['message_chain']
# Use custom deserializer that properly handles Forward messages
message_chain_obj = platform_message.MessageChain.model_validate(message_chain)
bot = await self.ap.platform_mgr.get_bot_by_uuid(bot_uuid)

View File

@@ -0,0 +1 @@
"""Survey module for in-product surveys triggered by events."""

View File

@@ -0,0 +1,148 @@
"""Survey manager: tracks events, communicates with Space to fetch/submit surveys."""
from __future__ import annotations
import asyncio
import json
import typing
import httpx
import sqlalchemy
from ..core import app as core_app
from ..entity.persistence.metadata import Metadata
from ..utils import constants
SURVEY_TRIGGERED_KEY = 'survey_triggered_events'
class SurveyManager:
"""Manages survey lifecycle: event tracking, pending survey fetch, submission."""
def __init__(self, ap: core_app.Application):
self.ap = ap
self._triggered_events: set[str] = set()
self._pending_survey: typing.Optional[dict] = None
self._space_url: str = ''
async def initialize(self):
space_config = self.ap.instance_config.data.get('space', {})
self._space_url = space_config.get('url', '').rstrip('/')
await self._load_triggered_events()
async def _load_triggered_events(self):
"""Load previously triggered events from metadata table."""
try:
result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.select(Metadata).where(Metadata.key == SURVEY_TRIGGERED_KEY)
)
row = result.first()
if row:
self._triggered_events = set(json.loads(row[0].value))
except Exception:
self._triggered_events = set()
async def _save_triggered_events(self):
"""Persist triggered events to metadata table."""
try:
value = json.dumps(list(self._triggered_events))
result = await self.ap.persistence_mgr.execute_async(
sqlalchemy.select(Metadata).where(Metadata.key == SURVEY_TRIGGERED_KEY)
)
if result.first():
await self.ap.persistence_mgr.execute_async(
sqlalchemy.update(Metadata).where(Metadata.key == SURVEY_TRIGGERED_KEY).values(value=value)
)
else:
await self.ap.persistence_mgr.execute_async(
sqlalchemy.insert(Metadata).values(key=SURVEY_TRIGGERED_KEY, value=value)
)
except Exception as e:
self.ap.logger.debug(f'Failed to save survey triggered events: {e}')
def _is_space_configured(self) -> bool:
space_config = self.ap.instance_config.data.get('space', {})
if space_config.get('disable_telemetry', False):
return False
return bool(self._space_url)
async def trigger_event(self, event: str):
"""Called when an event occurs. Checks Space for a pending survey."""
if event in self._triggered_events:
return
if not self._is_space_configured():
return
self._triggered_events.add(event)
await self._save_triggered_events()
# Check for pending survey asynchronously
asyncio.create_task(self._fetch_pending_survey(event))
async def _fetch_pending_survey(self, event: str):
"""Fetch pending survey from Space for this event."""
try:
url = f'{self._space_url}/api/v1/survey/pending'
payload = {
'instance_id': constants.instance_id,
'event': event,
}
async with httpx.AsyncClient(timeout=httpx.Timeout(10)) as client:
resp = await client.post(url, json=payload)
if resp.status_code == 200:
data = resp.json()
if data.get('code') == 0 and data.get('data', {}).get('survey'):
self._pending_survey = data['data']['survey']
self.ap.logger.info(f'Survey pending: {self._pending_survey.get("survey_id")}')
except Exception as e:
self.ap.logger.debug(f'Failed to fetch pending survey: {e}')
def get_pending_survey(self) -> typing.Optional[dict]:
"""Return the current pending survey (if any) for the frontend to display."""
return self._pending_survey
def clear_pending_survey(self):
"""Clear the pending survey (after user responds or dismisses)."""
self._pending_survey = None
async def submit_response(self, survey_id: str, answers: dict, completed: bool = True) -> bool:
"""Submit a survey response to Space."""
if not self._is_space_configured():
return False
try:
url = f'{self._space_url}/api/v1/survey/respond'
payload = {
'survey_id': survey_id,
'instance_id': constants.instance_id,
'answers': answers,
'metadata': {
'version': constants.semantic_version,
},
'completed': completed,
}
async with httpx.AsyncClient(timeout=httpx.Timeout(10)) as client:
resp = await client.post(url, json=payload)
if resp.status_code == 200:
self.clear_pending_survey()
return True
except Exception as e:
self.ap.logger.warning(f'Failed to submit survey response: {e}')
return False
async def dismiss_survey(self, survey_id: str) -> bool:
"""Dismiss a survey."""
if not self._is_space_configured():
return False
try:
url = f'{self._space_url}/api/v1/survey/dismiss'
payload = {
'survey_id': survey_id,
'instance_id': constants.instance_id,
}
async with httpx.AsyncClient(timeout=httpx.Timeout(10)) as client:
resp = await client.post(url, json=payload)
if resp.status_code == 200:
self.clear_pending_survey()
return True
except Exception as e:
self.ap.logger.warning(f'Failed to dismiss survey: {e}')
return False

View File

@@ -2,7 +2,7 @@ import langbot
semantic_version = f'v{langbot.__version__}'
required_database_version = 18
required_database_version = 19
"""Tag the version of the database schema, used to check if the database needs to be migrated"""
debug_mode = False

View File

@@ -17,6 +17,10 @@
"prefix": [],
"regexp": []
},
"message-aggregation": {
"enabled": false,
"delay": 1.5
},
"misc": {
"combine-quote-message": true
}

View File

@@ -123,6 +123,34 @@ stages:
type: array[string]
required: true
default: []
- name: message-aggregation
label:
en_US: Message Aggregation
zh_Hans: 消息聚合
description:
en_US: When a user sends multiple messages consecutively, wait for a period and merge them into one before processing
zh_Hans: 当用户连续发送多条消息时,等待一段时间后合并为一条消息再处理(防抖)
config:
- name: enabled
label:
en_US: Enable Message Aggregation
zh_Hans: 启用消息聚合
description:
en_US: If enabled, consecutive messages from the same user will be merged after a delay
zh_Hans: 如果启用,同一用户连续发送的消息将在延迟后合并处理
type: boolean
required: true
default: false
- name: delay
label:
en_US: Aggregation Delay (seconds)
zh_Hans: 聚合延迟(秒)
description:
en_US: 'Wait time before merging messages. Range: 1.0-10.0 seconds.'
zh_Hans: '合并消息前的等待时间。范围1.0-10.0 秒。'
type: float
required: true
default: 1.5
- name: misc
label:
en_US: Misc

10
uv.lock generated
View File

@@ -1799,7 +1799,7 @@ wheels = [
[[package]]
name = "langbot"
version = "4.8.3"
version = "4.8.6"
source = { editable = "." }
dependencies = [
{ name = "aiocqhttp" },
@@ -1902,7 +1902,7 @@ requires-dist = [
{ name = "ebooklib", specifier = ">=0.18" },
{ name = "gewechat-client", specifier = ">=0.1.5" },
{ name = "html2text", specifier = ">=2024.2.26" },
{ name = "langbot-plugin", specifier = "==0.2.5" },
{ name = "langbot-plugin", specifier = "==0.2.7" },
{ name = "langchain", specifier = ">=0.2.0" },
{ name = "langchain-text-splitters", specifier = ">=0.0.1" },
{ name = "lark-oapi", specifier = ">=1.4.15" },
@@ -1958,7 +1958,7 @@ dev = [
[[package]]
name = "langbot-plugin"
version = "0.2.5"
version = "0.2.7"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "aiofiles" },
@@ -1976,9 +1976,9 @@ dependencies = [
{ name = "watchdog" },
{ name = "websockets" },
]
sdist = { url = "https://files.pythonhosted.org/packages/28/0e/117dfc00f36323cce2369be5176d5cd5247ff52edb34791413af9623f290/langbot_plugin-0.2.5.tar.gz", hash = "sha256:a1bf04c1c07b30c72fb9b28e1330372bb4a43ae2db309394435fc088c513cfd5", size = 103910, upload-time = "2026-01-29T13:55:34.328Z" }
sdist = { url = "https://files.pythonhosted.org/packages/9e/a0/babd76596e5de38149da67b8da20e0519cc5f10080de9dc2b16919486f29/langbot_plugin-0.2.7.tar.gz", hash = "sha256:5c8ad1820283901a33356f79a56c84b4744712a463e1c7aecc6e9defe4db4446", size = 162458, upload-time = "2026-02-25T06:00:52.512Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/b1/0e/19b9a427206fa46aafbff03437296e38f425365c9ea6a97cbcfa791da2f8/langbot_plugin-0.2.5-py3-none-any.whl", hash = "sha256:b784248fc1f4754cd143bd9a16a7abd89a5c9735a4aa2b03c1c1e771b7d361e9", size = 133362, upload-time = "2026-01-29T13:55:32.486Z" },
{ url = "https://files.pythonhosted.org/packages/32/2a/6575cf5d5babb7a9400a8aca243e4b8341d83b673e5e9c0394c0393f1c3e/langbot_plugin-0.2.7-py3-none-any.whl", hash = "sha256:17344e61537a5bb97fc77cd83812b5db926f29005e92fefbcbaca5bb47bf55f0", size = 133476, upload-time = "2026-02-25T06:00:50.988Z" },
]
[[package]]

View File

@@ -5,6 +5,7 @@ import {
Dialog,
DialogContent,
DialogHeader,
DialogDescription,
DialogTitle,
DialogFooter,
} from '@/components/ui/dialog';
@@ -21,6 +22,7 @@ import {
import { Button } from '@/components/ui/button';
import BotForm from '@/app/home/bots/components/bot-form/BotForm';
import { BotLogListComponent } from '@/app/home/bots/components/bot-log/view/BotLogListComponent';
import BotSessionMonitor from '@/app/home/bots/components/bot-session/BotSessionMonitor';
import { useTranslation } from 'react-i18next';
import { z } from 'zod';
import { httpClient } from '@/app/infra/http/HttpClient';
@@ -82,6 +84,19 @@ export default function BotDetailDialog({
</svg>
),
},
{
key: 'sessions',
label: t('bots.sessionMonitor.title'),
icon: (
<svg
xmlns="http://www.w3.org/2000/svg"
viewBox="0 0 24 24"
fill="currentColor"
>
<path d="M2 22C2 17.5817 5.58172 14 10 14C14.4183 14 18 17.5817 18 22H16C16 18.6863 13.3137 16 10 16C6.68629 16 4 18.6863 4 22H2ZM10 13C6.685 13 4 10.315 4 7C4 3.685 6.685 1 10 1C13.315 1 16 3.685 16 7C16 10.315 13.315 13 10 13ZM10 11C12.21 11 14 9.21 14 7C14 4.79 12.21 3 10 3C7.79 3 6 4.79 6 7C6 9.21 7.79 11 10 11ZM18.2837 14.7028C21.0644 15.9561 23 18.752 23 22H21C21 19.564 19.5483 17.4671 17.4628 16.5271L18.2837 14.7028ZM17.5962 3.41321C19.5944 4.23703 21 6.20361 21 8.5C21 11.3702 18.8042 13.7252 16 13.9776V11.9646C17.6967 11.7222 19 10.264 19 8.5C19 7.11935 18.2016 5.92603 17.041 5.35635L17.5962 3.41321Z"></path>
</svg>
),
},
];
// eslint-disable-next-line @typescript-eslint/no-explicit-any
@@ -122,6 +137,9 @@ export default function BotDetailDialog({
<main className="flex flex-1 flex-col h-[70vh]">
<DialogHeader className="px-6 pt-6 pb-4 shrink-0">
<DialogTitle>{t('bots.createBot')}</DialogTitle>
<DialogDescription className="sr-only">
{t('bots.createBot')}
</DialogDescription>
</DialogHeader>
<div className="flex-1 overflow-y-auto px-6 pb-6">
<BotForm
@@ -155,7 +173,7 @@ export default function BotDetailDialog({
return (
<>
<Dialog open={open} onOpenChange={onOpenChange}>
<DialogContent className="overflow-hidden p-0 !max-w-[50rem] max-h-[75vh] flex">
<DialogContent className="overflow-hidden p-0 !max-w-[70rem] max-h-[75vh] flex">
<SidebarProvider className="items-start w-full flex">
<Sidebar
collapsible="none"
@@ -189,10 +207,25 @@ export default function BotDetailDialog({
<DialogTitle>
{activeMenu === 'config'
? t('bots.editBot')
: t('bots.botLogTitle')}
: activeMenu === 'logs'
? t('bots.botLogTitle')
: t('bots.sessionMonitor.title')}
</DialogTitle>
<DialogDescription className="sr-only">
{activeMenu === 'config'
? t('bots.editBot')
: activeMenu === 'logs'
? t('bots.botLogTitle')
: t('bots.sessionMonitor.title')}
</DialogDescription>
</DialogHeader>
<div className="flex-1 overflow-y-auto px-6 pb-6">
<div
className={
activeMenu === 'sessions'
? 'flex-1 min-h-0'
: 'flex-1 overflow-y-auto px-6 pb-6'
}
>
{activeMenu === 'config' && (
<BotForm
initBotId={botId}
@@ -204,6 +237,9 @@ export default function BotDetailDialog({
{activeMenu === 'logs' && botId && (
<BotLogListComponent botId={botId} />
)}
{activeMenu === 'sessions' && botId && (
<BotSessionMonitor botId={botId} />
)}
</div>
{activeMenu === 'config' && (
<DialogFooter className="px-6 py-4 border-t shrink-0">
@@ -238,6 +274,9 @@ export default function BotDetailDialog({
<DialogContent>
<DialogHeader>
<DialogTitle>{t('common.confirmDelete')}</DialogTitle>
<DialogDescription className="sr-only">
{t('bots.deleteConfirmation')}
</DialogDescription>
</DialogHeader>
<div className="py-4">{t('bots.deleteConfirmation')}</div>
<DialogFooter>

View File

@@ -0,0 +1,502 @@
'use client';
import React, { useState, useEffect, useRef, useCallback } from 'react';
import { useTranslation } from 'react-i18next';
import { httpClient } from '@/app/infra/http/HttpClient';
import { ScrollArea } from '@/components/ui/scroll-area';
import { Button } from '@/components/ui/button';
import { cn } from '@/lib/utils';
import {
MessageChainComponent,
Plain,
At,
Image,
Quote,
Voice,
} from '@/app/infra/entities/message';
interface SessionInfo {
session_id: string;
bot_id: string;
bot_name: string;
pipeline_id: string;
pipeline_name: string;
message_count: number;
start_time: string;
last_activity: string;
is_active: boolean;
platform?: string | null;
user_id?: string | null;
}
interface SessionMessage {
id: string;
timestamp: string;
bot_id: string;
bot_name: string;
pipeline_id: string;
pipeline_name: string;
message_content: string;
session_id: string;
status: string;
level: string;
platform?: string | null;
user_id?: string | null;
runner_name?: string | null;
variables?: string | null;
role?: string | null;
}
interface BotSessionMonitorProps {
botId: string;
}
export default function BotSessionMonitor({ botId }: BotSessionMonitorProps) {
const { t } = useTranslation();
const [sessions, setSessions] = useState<SessionInfo[]>([]);
const [selectedSessionId, setSelectedSessionId] = useState<string | null>(
null,
);
const [messages, setMessages] = useState<SessionMessage[]>([]);
const [loadingSessions, setLoadingSessions] = useState(false);
const [loadingMessages, setLoadingMessages] = useState(false);
const messagesContainerRef = useRef<HTMLDivElement>(null);
const loadSessions = useCallback(async () => {
setLoadingSessions(true);
try {
const response = await httpClient.getBotSessions(botId);
setSessions(response.sessions ?? []);
} catch (error) {
console.error('Failed to load sessions:', error);
} finally {
setLoadingSessions(false);
}
}, [botId]);
const loadMessages = useCallback(async (sessionId: string) => {
setLoadingMessages(true);
try {
const response = await httpClient.getSessionMessages(sessionId);
const sorted = (response.messages ?? []).sort(
(a, b) =>
new Date(a.timestamp).getTime() - new Date(b.timestamp).getTime(),
);
setMessages(sorted);
} catch (error) {
console.error('Failed to load session messages:', error);
} finally {
setLoadingMessages(false);
}
}, []);
useEffect(() => {
loadSessions();
}, [loadSessions]);
useEffect(() => {
if (selectedSessionId) {
loadMessages(selectedSessionId);
} else {
setMessages([]);
}
}, [selectedSessionId, loadMessages]);
useEffect(() => {
const container = messagesContainerRef.current;
if (container) {
const viewport = container.querySelector(
'[data-radix-scroll-area-viewport]',
);
const scrollTarget = viewport || container;
scrollTarget.scrollTop = scrollTarget.scrollHeight;
}
}, [messages]);
const parseMessageChain = (content: string): MessageChainComponent[] => {
try {
const parsed = JSON.parse(content);
if (Array.isArray(parsed)) {
return parsed as MessageChainComponent[];
}
} catch {
// Not JSON, return as plain text
}
return [{ type: 'Plain', text: content } as Plain];
};
const isUserMessage = (msg: SessionMessage): boolean => {
if (msg.role === 'assistant') return false;
if (msg.role === 'user') return true;
return !msg.runner_name;
};
const renderMessageComponent = (
component: MessageChainComponent,
index: number,
) => {
switch (component.type) {
case 'Plain':
return <span key={index}>{(component as Plain).text}</span>;
case 'At': {
const atComponent = component as At;
const displayName =
atComponent.display || atComponent.target?.toString() || '';
return (
<span
key={index}
className="inline-flex align-middle mx-0.5 px-1.5 py-0.5 bg-blue-200/60 dark:bg-blue-800/60 text-blue-700 dark:text-blue-300 rounded-md text-xs font-medium"
>
@{displayName}
</span>
);
}
case 'AtAll':
return (
<span
key={index}
className="inline-flex align-middle mx-0.5 px-1.5 py-0.5 bg-blue-200/60 dark:bg-blue-800/60 text-blue-700 dark:text-blue-300 rounded-md text-xs font-medium"
>
@All
</span>
);
case 'Image': {
const img = component as Image;
const imageUrl = img.url || (img.base64 ? img.base64 : '');
if (!imageUrl) {
return (
<span
key={index}
className="inline-flex items-center gap-1 text-muted-foreground text-xs"
>
[Image]
</span>
);
}
return (
<div key={index} className="my-1.5">
<img
src={imageUrl}
alt="Image"
className="max-w-full max-h-52 rounded-lg"
/>
</div>
);
}
case 'Voice': {
const voice = component as Voice;
const voiceUrl = voice.url || (voice.base64 ? voice.base64 : '');
if (!voiceUrl) {
return (
<span
key={index}
className="inline-flex items-center gap-1 text-muted-foreground text-xs"
>
🎙 [Voice]
</span>
);
}
return (
<div key={index} className="my-1">
<audio controls src={voiceUrl} className="h-8 max-w-[220px]" />
</div>
);
}
case 'Quote': {
const quote = component as Quote;
return (
<div
key={index}
className="mb-2 pl-2.5 border-l-2 border-gray-300 dark:border-gray-600 opacity-80"
>
<div className="text-sm">
{quote.origin?.map((comp, idx) =>
renderMessageComponent(comp as MessageChainComponent, idx),
)}
</div>
</div>
);
}
case 'Source':
return null;
case 'File': {
const file = component as MessageChainComponent & { name?: string };
return (
<span key={index} className="text-muted-foreground text-xs">
📎 {file.name || 'File'}
</span>
);
}
default:
return (
<span key={index} className="text-muted-foreground text-xs">
[{component.type}]
</span>
);
}
};
const renderMessageContent = (msg: SessionMessage) => {
const chain = parseMessageChain(msg.message_content);
return (
<div className="whitespace-pre-wrap break-words">
{chain.map((component, index) =>
renderMessageComponent(component, index),
)}
</div>
);
};
const formatTime = (timestamp: string): string => {
if (!timestamp) return '';
const date = new Date(timestamp);
const hours = date.getHours().toString().padStart(2, '0');
const minutes = date.getMinutes().toString().padStart(2, '0');
return `${hours}:${minutes}`;
};
const formatRelativeTime = (timestamp: string): string => {
if (!timestamp) return '';
const date = new Date(timestamp);
const now = new Date();
const diffMs = now.getTime() - date.getTime();
const diffMins = Math.floor(diffMs / 60000);
const diffHours = Math.floor(diffMs / 3600000);
const diffDays = Math.floor(diffMs / 86400000);
if (diffMins < 1) return '<1m';
if (diffMins < 60) return `${diffMins}m`;
if (diffHours < 24) return `${diffHours}h`;
return `${diffDays}d`;
};
const selectedSession = sessions.find(
(s) => s.session_id === selectedSessionId,
);
return (
<div className="flex h-full min-h-0">
{/* Left Panel: Session List */}
<div className="w-64 flex-shrink-0 border-r flex flex-col min-h-0">
{/* Refresh Button */}
<div className="px-2 py-2 border-b shrink-0">
<Button
variant="ghost"
className="w-full h-9 text-sm text-muted-foreground"
onClick={loadSessions}
disabled={loadingSessions}
>
<svg
xmlns="http://www.w3.org/2000/svg"
viewBox="0 0 24 24"
fill="none"
stroke="currentColor"
strokeWidth={2}
strokeLinecap="round"
strokeLinejoin="round"
className={cn(
'w-3.5 h-3.5 mr-1.5',
loadingSessions && 'animate-spin',
)}
>
<path d="M21.5 2v6h-6M2.5 22v-6h6M2 11.5a10 10 0 0 1 18.8-4.3M22 12.5a10 10 0 0 1-18.8 4.2" />
</svg>
{t('bots.sessionMonitor.refresh')}
</Button>
</div>
{/* Session List */}
<ScrollArea className="flex-1 min-h-0">
{loadingSessions && sessions.length === 0 ? (
<div className="flex items-center justify-center py-12 text-sm text-muted-foreground">
{t('bots.sessionMonitor.loading')}
</div>
) : sessions.length === 0 ? (
<div className="text-center text-muted-foreground py-12 text-sm">
{t('bots.sessionMonitor.noSessions')}
</div>
) : (
<div className="p-1">
{sessions.map((session) => {
const isSelected = selectedSessionId === session.session_id;
return (
<button
key={session.session_id}
className={cn(
'w-full text-left px-3 py-2.5 rounded-md transition-colors',
isSelected ? 'bg-accent' : 'hover:bg-accent/50',
)}
onClick={() => setSelectedSessionId(session.session_id)}
>
<div className="flex items-center justify-between mb-0.5">
<span className="text-sm font-medium truncate mr-2">
{session.user_id || session.session_id.slice(0, 12)}
</span>
<span className="text-[11px] text-muted-foreground tabular-nums flex-shrink-0">
{formatRelativeTime(session.last_activity)}
</span>
</div>
<div className="flex items-center gap-1.5 text-xs text-muted-foreground">
{session.platform && (
<span className="px-1 py-0.5 rounded bg-muted text-[10px]">
{session.platform}
</span>
)}
{session.is_active && (
<span className="flex items-center gap-0.5 text-green-600 dark:text-green-400">
<span className="w-1.5 h-1.5 rounded-full bg-green-500 inline-block" />
</span>
)}
<span>{session.pipeline_name}</span>
</div>
</button>
);
})}
</div>
)}
</ScrollArea>
</div>
{/* Right Panel: Messages */}
<div className="flex-1 flex flex-col min-h-0 min-w-0">
{!selectedSessionId ? (
<div className="text-center text-muted-foreground py-12 text-lg flex-1 flex items-center justify-center">
{t('bots.sessionMonitor.selectSession')}
</div>
) : (
<>
{/* Chat Header */}
<div className="px-6 py-3 border-b shrink-0 flex items-center justify-between">
<div className="min-w-0">
<div className="text-sm font-medium truncate">
{selectedSession?.user_id || selectedSessionId.slice(0, 20)}
</div>
<div className="flex items-center gap-2 text-xs text-muted-foreground">
{selectedSession?.platform && (
<span>{selectedSession.platform}</span>
)}
{selectedSession?.pipeline_name && (
<>
{selectedSession?.platform && <span>·</span>}
<span>{selectedSession.pipeline_name}</span>
</>
)}
{selectedSession?.is_active && (
<>
<span>·</span>
<span className="flex items-center gap-1 text-green-600 dark:text-green-400">
<span className="w-1.5 h-1.5 rounded-full bg-green-500 inline-block" />
Active
</span>
</>
)}
</div>
</div>
<Button
variant="ghost"
size="icon"
className="w-8 h-8"
onClick={() => loadMessages(selectedSessionId)}
disabled={loadingMessages}
>
<svg
xmlns="http://www.w3.org/2000/svg"
viewBox="0 0 24 24"
fill="none"
stroke="currentColor"
strokeWidth={2}
strokeLinecap="round"
strokeLinejoin="round"
className={cn('w-4 h-4', loadingMessages && 'animate-spin')}
>
<path d="M21.5 2v6h-6M2.5 22v-6h6M2 11.5a10 10 0 0 1 18.8-4.3M22 12.5a10 10 0 0 1-18.8 4.2" />
</svg>
</Button>
</div>
{/* Messages Area — matches DebugDialog style */}
<ScrollArea
ref={messagesContainerRef}
className="flex-1 p-6 overflow-y-auto min-h-0 bg-white dark:bg-black"
>
<div className="space-y-6">
{loadingMessages ? (
<div className="text-center text-muted-foreground py-12 text-lg">
{t('bots.sessionMonitor.loading')}
</div>
) : messages.length === 0 ? (
<div className="text-center text-muted-foreground py-12 text-lg">
{t('bots.sessionMonitor.noMessages')}
</div>
) : (
messages.map((msg) => {
const isUser = isUserMessage(msg);
return (
<div
key={msg.id}
className={cn(
'flex',
isUser ? 'justify-end' : 'justify-start',
)}
>
<div
className={cn(
'max-w-3xl px-5 py-3 rounded-2xl',
isUser
? 'bg-blue-100 dark:bg-blue-900 text-gray-900 dark:text-gray-100 rounded-br-none'
: 'bg-gray-100 dark:bg-gray-800 text-gray-900 dark:text-gray-100 rounded-bl-none',
msg.status === 'error' && 'ring-1 ring-red-400/50',
)}
>
{renderMessageContent(msg)}
{/* Role label + timestamp inside bubble, matching DebugDialog */}
<div
className={cn(
'text-xs mt-2 flex items-center gap-2',
isUser
? 'text-gray-600 dark:text-gray-300'
: 'text-gray-500 dark:text-gray-400',
)}
>
<span>
{isUser
? t('bots.sessionMonitor.userMessage', {
defaultValue: 'User',
})
: t('bots.sessionMonitor.botMessage', {
defaultValue: 'Assistant',
})}
</span>
<span className="tabular-nums">
{formatTime(msg.timestamp)}
</span>
{msg.status === 'error' && (
<span className="text-red-500">error</span>
)}
{msg.runner_name && (
<span className="opacity-70">
{msg.runner_name}
</span>
)}
</div>
</div>
</div>
);
})
)}
</div>
</ScrollArea>
</>
)}
</div>
</div>
);
}

View File

@@ -141,10 +141,27 @@ export default function DynamicFormComponent({
}
}, [initialValues, form, itemConfigList]);
// Stable ref for onSubmit to avoid re-triggering the effect when the
// parent passes a new closure on every render.
const onSubmitRef = useRef(onSubmit);
onSubmitRef.current = onSubmit;
// 监听表单值变化
useEffect(() => {
// Emit initial form values immediately so the parent always has a valid snapshot,
// even if the user saves without modifying any field.
// form.watch(callback) only fires on subsequent changes, not on mount.
const formValues = form.getValues();
const initialFinalValues = itemConfigList.reduce(
(acc, item) => {
acc[item.name] = formValues[item.name] ?? item.default;
return acc;
},
{} as Record<string, object>,
);
onSubmitRef.current?.(initialFinalValues);
const subscription = form.watch(() => {
// 获取完整的表单值,确保包含所有默认值
const formValues = form.getValues();
const finalValues = itemConfigList.reduce(
(acc, item) => {
@@ -153,10 +170,10 @@ export default function DynamicFormComponent({
},
{} as Record<string, object>,
);
onSubmit?.(finalValues);
onSubmitRef.current?.(finalValues);
});
return () => subscription.unsubscribe();
}, [form, onSubmit, itemConfigList]);
}, [form, itemConfigList]);
return (
<Form {...form}>

View File

@@ -0,0 +1,391 @@
'use client';
import React, { useState, useEffect, useCallback } from 'react';
import { httpClient } from '@/app/infra/http/HttpClient';
import type {
SurveyQuestion,
SurveyOption,
} from '@/app/infra/http/BackendClient';
import { X, ChevronRight, ChevronLeft, MessageSquare } from 'lucide-react';
import { Button } from '@/components/ui/button';
import { Checkbox } from '@/components/ui/checkbox';
/**
* Get i18n text from a Record<string, string> based on browser locale.
*/
function getI18nText(obj?: Record<string, string> | null): string {
if (!obj) return '';
const lang = typeof navigator !== 'undefined' ? navigator.language : 'en';
if (lang.startsWith('zh'))
return obj['zh_Hans'] || obj['en_US'] || Object.values(obj)[0] || '';
if (lang.startsWith('ja'))
return obj['ja_JP'] || obj['en_US'] || Object.values(obj)[0] || '';
return obj['en_US'] || Object.values(obj)[0] || '';
}
interface SurveyData {
survey_id: string;
version: number;
title: Record<string, string>;
description: Record<string, string>;
questions: SurveyQuestion[];
}
export default function SurveyWidget() {
const [survey, setSurvey] = useState<SurveyData | null>(null);
const [visible, setVisible] = useState(false);
const [currentStep, setCurrentStep] = useState(0);
const [answers, setAnswers] = useState<Record<string, unknown>>({});
const [otherInputs, setOtherInputs] = useState<Record<string, string>>({});
const [submitted, setSubmitted] = useState(false);
const [collapsed, setCollapsed] = useState(false);
// Poll for pending survey
useEffect(() => {
let timer: NodeJS.Timeout;
let cancelled = false;
const checkSurvey = async () => {
try {
const resp = await httpClient.getSurveyPending();
if (!cancelled && resp?.survey) {
setSurvey(resp.survey);
setVisible(true);
}
} catch {
// Silently ignore
}
};
// Check after 5 seconds, then every 60 seconds
timer = setTimeout(() => {
checkSurvey();
timer = setInterval(checkSurvey, 60000) as unknown as NodeJS.Timeout;
}, 5000);
return () => {
cancelled = true;
clearTimeout(timer);
clearInterval(timer);
};
}, []);
const handleDismiss = useCallback(async () => {
if (survey) {
try {
await httpClient.dismissSurvey(survey.survey_id);
} catch {
/* ignore */
}
}
setVisible(false);
}, [survey]);
const handleSubmit = useCallback(async () => {
if (!survey) return;
// Merge "other" text inputs into answers
const finalAnswers = { ...answers };
for (const [qId, text] of Object.entries(otherInputs)) {
if (text.trim()) {
const current = finalAnswers[qId];
if (Array.isArray(current)) {
// Replace 'other' with the text
finalAnswers[qId] = (current as string[]).map((v) =>
v === 'other' ? `other:${text}` : v,
);
} else if (current === 'other') {
finalAnswers[qId] = `other:${text}`;
}
}
}
try {
await httpClient.submitSurveyResponse(
survey.survey_id,
finalAnswers,
true,
);
setSubmitted(true);
setTimeout(() => setVisible(false), 2000);
} catch {
/* ignore */
}
}, [survey, answers, otherInputs]);
const setAnswer = useCallback((qId: string, value: unknown) => {
setAnswers((prev) => ({ ...prev, [qId]: value }));
}, []);
if (!visible || !survey) return null;
const questions = survey.questions || [];
const totalSteps = questions.length;
const currentQuestion = questions[currentStep];
if (submitted) {
return (
<div className="fixed bottom-6 right-6 z-50 w-80 bg-card border rounded-xl shadow-lg p-6 animate-in slide-in-from-bottom-4">
<div className="text-center">
<div className="text-3xl mb-2">🎉</div>
<p className="text-sm font-medium">
{getI18nText({
zh_Hans: '感谢你的反馈!',
en_US: 'Thanks for your feedback!',
})}
</p>
</div>
</div>
);
}
if (collapsed) {
return (
<button
onClick={() => setCollapsed(false)}
className="fixed bottom-6 right-6 z-50 w-12 h-12 bg-primary text-primary-foreground rounded-full shadow-lg flex items-center justify-center hover:scale-105 transition-transform"
>
<MessageSquare className="w-5 h-5" />
</button>
);
}
return (
<div className="fixed bottom-6 right-6 z-50 w-[340px] bg-card border rounded-xl shadow-lg animate-in slide-in-from-bottom-4">
{/* Header */}
<div className="flex items-center justify-between px-4 py-3 border-b">
<div className="flex items-center gap-2">
<MessageSquare className="w-4 h-4 text-primary" />
<span className="text-sm font-medium">
{getI18nText(survey.title)}
</span>
</div>
<div className="flex items-center gap-1">
<button
onClick={() => setCollapsed(true)}
className="p-1 hover:bg-accent rounded"
>
<ChevronRight className="w-4 h-4 text-muted-foreground" />
</button>
<button
onClick={handleDismiss}
className="p-1 hover:bg-accent rounded"
>
<X className="w-4 h-4 text-muted-foreground" />
</button>
</div>
</div>
{/* Progress */}
<div className="px-4 pt-3">
<div className="flex gap-1">
{questions.map((_, i) => (
<div
key={i}
className={`h-1 flex-1 rounded-full transition-colors ${
i <= currentStep ? 'bg-primary' : 'bg-secondary'
}`}
/>
))}
</div>
<span className="text-xs text-muted-foreground mt-1 block">
{currentStep + 1} / {totalSteps}
</span>
</div>
{/* Question */}
<div className="px-4 py-3">
<p className="text-sm font-medium mb-1">
{getI18nText(currentQuestion?.title)}
</p>
{currentQuestion?.subtitle && (
<p className="text-xs text-muted-foreground mb-3">
{getI18nText(currentQuestion.subtitle)}
</p>
)}
<div className="space-y-2 max-h-[260px] overflow-y-auto">
{currentQuestion?.type === 'single_select' &&
currentQuestion.options && (
<SingleSelectField
options={currentQuestion.options}
value={answers[currentQuestion.id] as string}
onChange={(v) => setAnswer(currentQuestion.id, v)}
otherText={otherInputs[currentQuestion.id] || ''}
onOtherChange={(t) =>
setOtherInputs((prev) => ({
...prev,
[currentQuestion.id]: t,
}))
}
/>
)}
{currentQuestion?.type === 'multi_select' &&
currentQuestion.options && (
<MultiSelectField
options={currentQuestion.options}
value={(answers[currentQuestion.id] as string[]) || []}
onChange={(v) => setAnswer(currentQuestion.id, v)}
otherText={otherInputs[currentQuestion.id] || ''}
onOtherChange={(t) =>
setOtherInputs((prev) => ({
...prev,
[currentQuestion.id]: t,
}))
}
/>
)}
{currentQuestion?.type === 'text' && (
<textarea
className="w-full h-20 text-sm border rounded-lg p-2 bg-background resize-none focus:outline-none focus:ring-1 focus:ring-primary"
placeholder={getI18nText(currentQuestion.placeholder)}
maxLength={currentQuestion.max_length || 500}
value={(answers[currentQuestion.id] as string) || ''}
onChange={(e) => setAnswer(currentQuestion.id, e.target.value)}
/>
)}
</div>
</div>
{/* Footer */}
<div className="flex items-center justify-between px-4 py-3 border-t">
<Button
variant="ghost"
size="sm"
onClick={() => setCurrentStep(Math.max(0, currentStep - 1))}
disabled={currentStep === 0}
>
<ChevronLeft className="w-4 h-4" />
</Button>
<div className="flex gap-2">
{!currentQuestion?.required && currentStep < totalSteps - 1 && (
<Button
variant="ghost"
size="sm"
onClick={() => setCurrentStep(currentStep + 1)}
>
{getI18nText({ zh_Hans: '跳过', en_US: 'Skip' })}
</Button>
)}
{currentStep < totalSteps - 1 ? (
<Button
size="sm"
onClick={() => setCurrentStep(currentStep + 1)}
disabled={
currentQuestion?.required && !answers[currentQuestion?.id]
}
>
{getI18nText({ zh_Hans: '下一题', en_US: 'Next' })}
</Button>
) : (
<Button size="sm" onClick={handleSubmit}>
{getI18nText({ zh_Hans: '提交', en_US: 'Submit' })}
</Button>
)}
</div>
</div>
</div>
);
}
// ---- Sub-components for flat radio/checkbox style ----
function SingleSelectField({
options,
value,
onChange,
otherText,
onOtherChange,
}: {
options: SurveyOption[];
value?: string;
onChange: (v: string) => void;
otherText: string;
onOtherChange: (t: string) => void;
}) {
return (
<div className="space-y-1.5">
{options.map((opt) => (
<div key={opt.id}>
<button
onClick={() => onChange(opt.id)}
className={`w-full text-left text-sm px-3 py-2 rounded-lg border transition-colors ${
value === opt.id
? 'border-primary bg-primary/5 text-primary'
: 'border-border hover:bg-accent'
}`}
>
{getI18nText(opt.label)}
</button>
{opt.has_input && value === opt.id && (
<input
type="text"
className="mt-1 w-full text-sm border rounded-lg px-3 py-1.5 bg-background focus:outline-none focus:ring-1 focus:ring-primary"
placeholder="..."
value={otherText}
onChange={(e) => onOtherChange(e.target.value)}
/>
)}
</div>
))}
</div>
);
}
function MultiSelectField({
options,
value,
onChange,
otherText,
onOtherChange,
}: {
options: SurveyOption[];
value: string[];
onChange: (v: string[]) => void;
otherText: string;
onOtherChange: (t: string) => void;
}) {
const toggle = (id: string) => {
if (value.includes(id)) {
onChange(value.filter((v) => v !== id));
} else {
onChange([...value, id]);
}
};
return (
<div className="space-y-1.5">
{options.map((opt) => {
const selected = value.includes(opt.id);
return (
<div key={opt.id}>
<button
onClick={() => toggle(opt.id)}
className={`w-full text-left text-sm px-3 py-2 rounded-lg border transition-colors flex items-center gap-2 ${
selected
? 'border-primary bg-primary/5 text-primary'
: 'border-border hover:bg-accent'
}`}
>
<Checkbox checked={selected} className="pointer-events-none" />
{getI18nText(opt.label)}
</button>
{opt.has_input && selected && (
<input
type="text"
className="mt-1 w-full text-sm border rounded-lg px-3 py-1.5 bg-background focus:outline-none focus:ring-1 focus:ring-primary"
placeholder="..."
value={otherText}
onChange={(e) => onOtherChange(e.target.value)}
/>
)}
</div>
);
})}
</div>
);
}

View File

@@ -3,6 +3,7 @@
import styles from './layout.module.css';
import HomeSidebar from '@/app/home/components/home-sidebar/HomeSidebar';
import HomeTitleBar from '@/app/home/components/home-titlebar/HomeTitleBar';
import SurveyWidget from '@/app/home/components/survey/SurveyWidget';
import React, {
useState,
useCallback,
@@ -54,6 +55,8 @@ export default function HomeLayout({
<HomeTitleBar title={title} subtitle={subtitle} helpLink={helpLink} />
<main className={styles.mainContent}>{mainContent}</main>
<SurveyWidget />
</div>
</div>
);

View File

@@ -297,15 +297,6 @@ function MarketPageContent({
const handleInstallPlugin = useCallback(
async (author: string, pluginName: string) => {
try {
// Find the full plugin object from the list
const pluginVO = plugins.find(
(p) => p.author === author && p.pluginName === pluginName,
);
if (!pluginVO) {
console.error('Plugin not found:', author, pluginName);
return;
}
// Fetch full plugin details to get PluginV4 object
const response = await getCloudServiceClientSync().getPluginDetail(
author,

View File

@@ -339,6 +339,64 @@ export class BackendClient extends BaseHttpClient {
return this.post(`/api/v1/platform/bots/${botId}/logs`, request);
}
public getBotSessions(
botId: string,
limit: number = 100,
offset: number = 0,
): Promise<{
sessions: Array<{
session_id: string;
bot_id: string;
bot_name: string;
pipeline_id: string;
pipeline_name: string;
message_count: number;
start_time: string;
last_activity: string;
is_active: boolean;
platform: string | null;
user_id: string | null;
}>;
total: number;
}> {
const queryParams = new URLSearchParams();
queryParams.append('botId', botId);
queryParams.append('limit', limit.toString());
queryParams.append('offset', offset.toString());
return this.get(`/api/v1/monitoring/sessions?${queryParams.toString()}`);
}
public getSessionMessages(
sessionId: string,
limit: number = 200,
offset: number = 0,
): Promise<{
messages: Array<{
id: string;
timestamp: string;
bot_id: string;
bot_name: string;
pipeline_id: string;
pipeline_name: string;
message_content: string;
session_id: string;
status: string;
level: string;
platform: string | null;
user_id: string | null;
runner_name: string | null;
variables: string | null;
role: string | null;
}>;
total: number;
}> {
const queryParams = new URLSearchParams();
queryParams.append('sessionId', sessionId);
queryParams.append('limit', limit.toString());
queryParams.append('offset', offset.toString());
return this.get(`/api/v1/monitoring/messages?${queryParams.toString()}`);
}
// ============ File management API ============
public uploadDocumentFile(file: File): Promise<{ file_id: string }> {
const formData = new FormData();
@@ -949,4 +1007,50 @@ export class BackendClient extends BaseHttpClient {
return this.get(`/api/v1/monitoring/overview?${queryParams.toString()}`);
}
// ============ Survey API ============
public getSurveyPending(): Promise<{
survey: {
survey_id: string;
version: number;
title: Record<string, string>;
description: Record<string, string>;
questions: SurveyQuestion[];
} | null;
}> {
return this.get('/api/v1/survey/pending');
}
public submitSurveyResponse(
surveyId: string,
answers: Record<string, unknown>,
completed: boolean = true,
): Promise<object> {
return this.post('/api/v1/survey/respond', {
survey_id: surveyId,
answers,
completed,
});
}
public dismissSurvey(surveyId: string): Promise<object> {
return this.post('/api/v1/survey/dismiss', { survey_id: surveyId });
}
}
export interface SurveyQuestion {
id: string;
type: 'single_select' | 'multi_select' | 'text';
title: Record<string, string>;
subtitle?: Record<string, string>;
required: boolean;
options?: SurveyOption[];
placeholder?: Record<string, string>;
max_length?: number;
}
export interface SurveyOption {
id: string;
label: Record<string, string>;
has_input?: boolean;
}

View File

@@ -280,6 +280,25 @@ const enUS = {
viewDetails: 'Details',
collapse: 'Collapse',
imagesAttached: 'image(s) attached',
sessionMonitor: {
title: 'Sessions',
sessions: 'Sessions',
noSessions: 'No sessions found',
selectSession: 'Select a session to view messages',
noMessages: 'No messages in this session',
messages: 'messages',
messageCount: '{{count}} messages',
loading: 'Loading...',
loadingSessions: 'Loading sessions...',
loadingMessages: 'Loading messages...',
user: 'User',
variables: 'Variables',
platform: 'Platform',
lastActive: 'Last active',
refresh: 'Refresh',
active: 'Active',
inactive: 'Inactive',
},
},
plugins: {
title: 'Extensions',

View File

@@ -281,6 +281,25 @@ const jaJP = {
allLevels: 'すべてのレベル',
selectLevel: 'レベルを選択',
levelsSelected: 'レベル選択済み',
sessionMonitor: {
title: 'セッション監視',
sessions: 'セッション一覧',
noSessions: 'セッションが見つかりません',
selectSession: 'セッションを選択してメッセージを表示',
noMessages: 'このセッションにはメッセージがありません',
messages: '件のメッセージ',
messageCount: '{{count}} 件のメッセージ',
loading: '読み込み中...',
loadingSessions: 'セッションを読み込み中...',
loadingMessages: 'メッセージを読み込み中...',
user: 'ユーザー',
variables: '変数',
platform: 'プラットフォーム',
lastActive: '最終アクティブ',
refresh: '更新',
active: 'アクティブ',
inactive: '非アクティブ',
},
},
plugins: {
title: '拡張機能',

View File

@@ -269,6 +269,25 @@ const zhHans = {
viewDetails: '详情',
collapse: '收起',
imagesAttached: '张图片',
sessionMonitor: {
title: '会话监控',
sessions: '会话列表',
noSessions: '暂无会话',
selectSession: '选择一个会话查看消息',
noMessages: '该会话暂无消息',
messages: '条消息',
messageCount: '{{count}} 条消息',
loading: '加载中...',
loadingSessions: '加载会话中...',
loadingMessages: '加载消息中...',
user: '用户',
variables: '变量',
platform: '平台',
lastActive: '最近活跃',
refresh: '刷新',
active: '活跃',
inactive: '不活跃',
},
},
plugins: {
title: '插件扩展',

View File

@@ -264,6 +264,25 @@ const zhHant = {
allLevels: '全部級別',
selectLevel: '選擇級別',
levelsSelected: '個級別已選',
sessionMonitor: {
title: '會話監控',
sessions: '會話列表',
noSessions: '暫無會話',
selectSession: '選擇一個會話查看訊息',
noMessages: '該會話暫無訊息',
messages: '條訊息',
messageCount: '{{count}} 條訊息',
loading: '載入中...',
loadingSessions: '載入會話中...',
loadingMessages: '載入訊息中...',
user: '用戶',
variables: '變數',
platform: '平台',
lastActive: '最近活躍',
refresh: '重新整理',
active: '活躍',
inactive: '不活躍',
},
},
plugins: {
title: '外掛擴展',