From 9148e02679884461b1ba8829b2ca3676180dc746 Mon Sep 17 00:00:00 2001 From: doujianghub <131904691+doujianghub@users.noreply.github.com> Date: Mon, 9 Mar 2026 14:30:07 +0800 Subject: [PATCH] fix: centralized pipeline config type coercion to prevent string-type crashes (#2031) * fix: coerce pipeline config types at load time using metadata definitions Pipeline configs stored in SQLAlchemy JSON columns can have values turned into strings after UI edits (e.g. "120" instead of 120), causing runtime arithmetic/logic errors. Add centralized type coercion in load_pipeline() that leverages existing metadata YAML type definitions (integer, number, float, boolean) to convert values before they reach downstream stages. Co-Authored-By: Claude Opus 4.6 * fix: address review - defensive getattr + add unit tests for config_coercion - Use getattr with defaults for pipeline_config_meta_* attributes to avoid AttributeError when MockApplication lacks these fields - Add 18 unit tests for config_coercion module covering all code paths Co-Authored-By: Claude Opus 4.6 * feat: add dynamic form stage tracking and snapshot management * fix: standardize string formatting in config coercion and improve logging messages --------- Co-authored-by: KPC Co-authored-by: Claude Opus 4.6 Co-authored-by: Junyan Qin --- src/langbot/pkg/pipeline/config_coercion.py | 105 ++++++++++++++++ src/langbot/pkg/pipeline/pipelinemgr.py | 9 ++ .../pipeline/test_config_coercion.py | 113 ++++++++++++++++++ .../pipeline-form/PipelineFormComponent.tsx | 54 +++++---- 4 files changed, 260 insertions(+), 21 deletions(-) create mode 100644 src/langbot/pkg/pipeline/config_coercion.py create mode 100644 tests/unit_tests/pipeline/test_config_coercion.py diff --git a/src/langbot/pkg/pipeline/config_coercion.py b/src/langbot/pkg/pipeline/config_coercion.py new file mode 100644 index 00000000..649f9051 --- /dev/null +++ b/src/langbot/pkg/pipeline/config_coercion.py @@ -0,0 +1,105 @@ +from __future__ import annotations + +import logging + +logger = logging.getLogger(__name__) + +# metadata type -> coercion function +_COERCE_MAP = { + 'integer': lambda v: int(v), + 'number': lambda v: float(v), + 'float': lambda v: float(v), +} + + +def _coerce_bool(v): + if isinstance(v, bool): + return v + if isinstance(v, str): + if v.lower() == 'true': + return True + if v.lower() == 'false': + return False + raise ValueError(f'Cannot convert string {v!r} to bool') + return bool(v) + + +def _coerce_value(value, expected_type: str): + """Convert a single value to the expected type. + + Returns the converted value, or the original value if no conversion needed. + """ + if value is None: + return value + + if expected_type == 'boolean': + if isinstance(value, bool): + return value + return _coerce_bool(value) + + coerce_fn = _COERCE_MAP.get(expected_type) + if coerce_fn is None: + return value + + # Already the correct type + if expected_type == 'integer' and isinstance(value, int) and not isinstance(value, bool): + return value + if expected_type in ('number', 'float') and isinstance(value, (int, float)) and not isinstance(value, bool): + return float(value) + + return coerce_fn(value) + + +def coerce_pipeline_config( + config: dict, + *metadata_list: dict, +) -> None: + """Coerce pipeline config values according to metadata type definitions. + + Walks each metadata dict (trigger, safety, ai, output) and converts + config values in-place so that strings coming from the JSON column are + cast to their declared types (integer, number/float, boolean). + + Args: + config: The pipeline config dict to modify in-place. + *metadata_list: Metadata dicts loaded from the YAML templates. + """ + for meta in metadata_list: + section_name = meta.get('name') + if not section_name or section_name not in config: + continue + + section = config[section_name] + if not isinstance(section, dict): + continue + + for stage_def in meta.get('stages', []): + stage_name = stage_def.get('name') + if not stage_name or stage_name not in section: + continue + + stage_config = section[stage_name] + if not isinstance(stage_config, dict): + continue + + for field_def in stage_def.get('config', []): + field_name = field_def.get('name') + field_type = field_def.get('type') + if not field_name or not field_type or field_name not in stage_config: + continue + + old_value = stage_config[field_name] + try: + new_value = _coerce_value(old_value, field_type) + if new_value is not old_value: + stage_config[field_name] = new_value + except (ValueError, TypeError) as e: + logger.warning( + 'Failed to coerce config %s.%s.%s (%r) to %s: %s', + section_name, + stage_name, + field_name, + old_value, + field_type, + e, + ) diff --git a/src/langbot/pkg/pipeline/pipelinemgr.py b/src/langbot/pkg/pipeline/pipelinemgr.py index d56f626c..5d0012d1 100644 --- a/src/langbot/pkg/pipeline/pipelinemgr.py +++ b/src/langbot/pkg/pipeline/pipelinemgr.py @@ -13,6 +13,7 @@ import langbot_plugin.api.entities.builtin.platform.message as platform_message import langbot_plugin.api.entities.builtin.platform.events as platform_events import langbot_plugin.api.entities.events as events from ..utils import importutil +from .config_coercion import coerce_pipeline_config import langbot_plugin.api.entities.builtin.provider.session as provider_session import langbot_plugin.api.entities.builtin.pipeline.query as pipeline_query @@ -420,6 +421,14 @@ class PipelineManager: elif isinstance(pipeline_entity, dict): pipeline_entity = persistence_pipeline.LegacyPipeline(**pipeline_entity) + coerce_pipeline_config( + pipeline_entity.config, + getattr(self.ap, 'pipeline_config_meta_trigger', {'name': 'trigger', 'stages': []}), + getattr(self.ap, 'pipeline_config_meta_safety', {'name': 'safety', 'stages': []}), + getattr(self.ap, 'pipeline_config_meta_ai', {'name': 'ai', 'stages': []}), + getattr(self.ap, 'pipeline_config_meta_output', {'name': 'output', 'stages': []}), + ) + # initialize stage containers according to pipeline_entity.stages stage_containers: list[StageInstContainer] = [] for stage_name in pipeline_entity.stages: diff --git a/tests/unit_tests/pipeline/test_config_coercion.py b/tests/unit_tests/pipeline/test_config_coercion.py new file mode 100644 index 00000000..a23f54de --- /dev/null +++ b/tests/unit_tests/pipeline/test_config_coercion.py @@ -0,0 +1,113 @@ +"""Unit tests for config_coercion module""" + +from __future__ import annotations + +import pytest + +from langbot.pkg.pipeline.config_coercion import _coerce_value, coerce_pipeline_config + + +class TestCoerceValue: + """Tests for _coerce_value function""" + + def test_none_passthrough(self): + assert _coerce_value(None, 'integer') is None + assert _coerce_value(None, 'boolean') is None + + def test_string_to_integer(self): + assert _coerce_value('120', 'integer') == 120 + assert _coerce_value('0', 'integer') == 0 + assert _coerce_value('-5', 'integer') == -5 + + def test_integer_passthrough(self): + assert _coerce_value(42, 'integer') == 42 + + def test_string_to_float(self): + assert _coerce_value('3.14', 'number') == 3.14 + assert _coerce_value('3.14', 'float') == 3.14 + + def test_int_to_float(self): + assert _coerce_value(3, 'number') == 3.0 + assert isinstance(_coerce_value(3, 'number'), float) + + def test_float_passthrough(self): + assert _coerce_value(3.14, 'float') == 3.14 + + def test_string_to_bool(self): + assert _coerce_value('true', 'boolean') is True + assert _coerce_value('True', 'boolean') is True + assert _coerce_value('false', 'boolean') is False + assert _coerce_value('False', 'boolean') is False + + def test_bool_passthrough(self): + assert _coerce_value(True, 'boolean') is True + assert _coerce_value(False, 'boolean') is False + + def test_invalid_bool_string_raises(self): + with pytest.raises(ValueError): + _coerce_value('notabool', 'boolean') + + def test_unknown_type_passthrough(self): + assert _coerce_value('hello', 'string') == 'hello' + assert _coerce_value('hello', 'unknown') == 'hello' + + def test_invalid_integer_raises(self): + with pytest.raises(ValueError): + _coerce_value('abc', 'integer') + + +class TestCoercePipelineConfig: + """Tests for coerce_pipeline_config function""" + + def _make_meta(self, section_name: str, stage_name: str, fields: list[dict]) -> dict: + return { + 'name': section_name, + 'stages': [{'name': stage_name, 'config': fields}], + } + + def test_coerce_integer_in_config(self): + config = {'trigger': {'misc': {'timeout': '120'}}} + meta = self._make_meta('trigger', 'misc', [{'name': 'timeout', 'type': 'integer'}]) + coerce_pipeline_config(config, meta) + assert config['trigger']['misc']['timeout'] == 120 + + def test_coerce_boolean_in_config(self): + config = {'output': {'misc': {'at-sender': 'true'}}} + meta = self._make_meta('output', 'misc', [{'name': 'at-sender', 'type': 'boolean'}]) + coerce_pipeline_config(config, meta) + assert config['output']['misc']['at-sender'] is True + + def test_missing_section_skipped(self): + config = {'ai': {}} + meta = self._make_meta('trigger', 'misc', [{'name': 'x', 'type': 'integer'}]) + coerce_pipeline_config(config, meta) # should not raise + + def test_missing_field_skipped(self): + config = {'trigger': {'misc': {}}} + meta = self._make_meta('trigger', 'misc', [{'name': 'nonexistent', 'type': 'integer'}]) + coerce_pipeline_config(config, meta) # should not raise + + def test_invalid_value_logs_warning(self, caplog): + config = {'trigger': {'misc': {'timeout': 'abc'}}} + meta = self._make_meta('trigger', 'misc', [{'name': 'timeout', 'type': 'integer'}]) + import logging + + with caplog.at_level(logging.WARNING): + coerce_pipeline_config(config, meta) + assert config['trigger']['misc']['timeout'] == 'abc' # unchanged + assert 'Failed to coerce' in caplog.text + + def test_empty_metadata(self): + config = {'trigger': {'misc': {'timeout': '120'}}} + coerce_pipeline_config(config) # no metadata args, should not raise + + def test_multiple_metadata(self): + config = { + 'trigger': {'misc': {'timeout': '120'}}, + 'output': {'misc': {'at-sender': 'false'}}, + } + meta_trigger = self._make_meta('trigger', 'misc', [{'name': 'timeout', 'type': 'integer'}]) + meta_output = self._make_meta('output', 'misc', [{'name': 'at-sender', 'type': 'boolean'}]) + coerce_pipeline_config(config, meta_trigger, meta_output) + assert config['trigger']['misc']['timeout'] == 120 + assert config['output']['misc']['at-sender'] is False diff --git a/web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx b/web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx index 517d0f76..392430d3 100644 --- a/web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx +++ b/web/src/app/home/pipelines/components/pipeline-form/PipelineFormComponent.tsx @@ -120,6 +120,8 @@ export default function PipelineFormComponent({ // Track unsaved changes by comparing current form values against a saved snapshot const savedSnapshotRef = useRef(''); + // Track which dynamic form stages have completed their initial mount emission. + const initializedStagesRef = useRef>(new Set()); const watchedValues = form.watch(); const hasUnsavedChanges = useMemo(() => { if (!isEditMode || !savedSnapshotRef.current) return false; @@ -160,6 +162,7 @@ export default function PipelineFormComponent({ }; form.reset(loadedValues); savedSnapshotRef.current = JSON.stringify(loadedValues); + initializedStagesRef.current.clear(); }); } }, []); @@ -235,6 +238,33 @@ export default function PipelineFormComponent({ }); } + // Called from DynamicFormComponent/N8nAuthFormComponent onSubmit callbacks. + // On the first emission for a stage (mount-time default filling), the + // snapshot is synchronously re-captured so that hasUnsavedChanges stays false. + function handleDynamicFormEmit( + formName: keyof FormValues, + stageName: string, + values: object, + ) { + const stageKey = `${String(formName)}.${stageName}`; + const isFirstEmission = !initializedStagesRef.current.has(stageKey); + + const currentValues = + // eslint-disable-next-line @typescript-eslint/no-explicit-any + (form.getValues(formName) as Record) || {}; + form.setValue(formName, { + ...currentValues, + [stageName]: values, + }); + + if (isFirstEmission) { + initializedStagesRef.current.add(stageKey); + // Synchronously re-capture snapshot so that the useMemo comparison + // in the same render cycle still returns false. + savedSnapshotRef.current = JSON.stringify(form.getValues()); + } + } + function renderDynamicForms( stage: PipelineConfigStage, formName: keyof FormValues, @@ -264,13 +294,7 @@ export default function PipelineFormComponent({ {} } onSubmit={(values) => { - const currentValues = - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (form.getValues(formName) as Record) || {}; - form.setValue(formName, { - ...currentValues, - [stage.name]: values, - }); + handleDynamicFormEmit(formName, stage.name, values); }} /> @@ -302,13 +326,7 @@ export default function PipelineFormComponent({ {} } onSubmit={(values) => { - const currentValues = - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (form.getValues(formName) as Record) || {}; - form.setValue(formName, { - ...currentValues, - [stage.name]: values, - }); + handleDynamicFormEmit(formName, stage.name, values); }} /> @@ -333,13 +351,7 @@ export default function PipelineFormComponent({ (form.watch(formName) as Record)?.[stage.name] || {} } onSubmit={(values) => { - const currentValues = - // eslint-disable-next-line @typescript-eslint/no-explicit-any - (form.getValues(formName) as Record) || {}; - form.setValue(formName, { - ...currentValues, - [stage.name]: values, - }); + handleDynamicFormEmit(formName, stage.name, values); }} />