From 9b0f5b36f3ede6d4d11ff6007aa0975675f1db34 Mon Sep 17 00:00:00 2001 From: huanghuoguoguo <60681390+huanghuoguoguo@users.noreply.github.com> Date: Thu, 25 Jun 2026 13:34:30 +0800 Subject: [PATCH] test(skills): add debug chat timing and isolation probes --- skills/docs/user-guide.md | 4 + .../ensure-fake-provider-cross-pipelines.mjs | 203 +++++ skills/scripts/e2e/fake-openai-provider.mjs | 98 +- skills/skills.index.json | 40 + ...r-debug-chat-cross-pipeline-isolation.yaml | 81 ++ .../probes/langbot-debug-chat-concurrency.mjs | 77 +- ...ot-debug-chat-cross-pipeline-isolation.mjs | 861 ++++++++++++++++++ .../probes/lib/fake-provider-timing.mjs | 134 +++ .../performance-reliability-testing.md | 24 +- .../suites/langbot-debug-chat-load-gate.yaml | 1 + 10 files changed, 1501 insertions(+), 22 deletions(-) create mode 100644 skills/scripts/e2e/ensure-fake-provider-cross-pipelines.mjs create mode 100644 skills/skills/langbot-testing/cases/langbot-fake-provider-debug-chat-cross-pipeline-isolation.yaml create mode 100644 skills/skills/langbot-testing/probes/langbot-debug-chat-cross-pipeline-isolation.mjs create mode 100644 skills/skills/langbot-testing/probes/lib/fake-provider-timing.mjs diff --git a/skills/docs/user-guide.md b/skills/docs/user-guide.md index 007d28d94..ae39ef294 100644 --- a/skills/docs/user-guide.md +++ b/skills/docs/user-guide.md @@ -123,6 +123,7 @@ Controlled Debug Chat message-path load gate: bin/lbs suite plan langbot-debug-chat-load-gate bin/lbs test run langbot-fake-provider-debug-chat-load --run-id langbot-fake-load-local bin/lbs test run langbot-fake-provider-debug-chat-slow-load --run-id langbot-fake-slow-local +bin/lbs test run langbot-fake-provider-debug-chat-cross-pipeline-isolation --run-id langbot-fake-cross-pipeline-local bin/lbs test run langbot-fake-provider-debug-chat-fault-recovery --run-id langbot-fake-fault-local bin/lbs test run langbot-space-debug-chat-concurrency-smoke --run-id langbot-space-smoke-local ``` @@ -132,6 +133,9 @@ OpenAI-compatible fake provider, creates the matching provider/model/pipeline, then sends concurrent WebSocket Debug Chat messages through the real backend. Use `langbot-fake-provider-debug-chat-slow-load` to measure the same path under deterministic streaming latency. Use +`langbot-fake-provider-debug-chat-cross-pipeline-isolation` to verify that +concurrent Debug Chat traffic on two pipelines does not leak assistant +responses across pipeline boundaries. Use `langbot-fake-provider-debug-chat-fault-recovery` to inject bounded provider HTTP failures and confirm later Debug Chat requests recover. Use `langbot-space-debug-chat-concurrency-smoke` only as a low-volume live diff --git a/skills/scripts/e2e/ensure-fake-provider-cross-pipelines.mjs b/skills/scripts/e2e/ensure-fake-provider-cross-pipelines.mjs new file mode 100644 index 000000000..9d2ff295b --- /dev/null +++ b/skills/scripts/e2e/ensure-fake-provider-cross-pipelines.mjs @@ -0,0 +1,203 @@ +#!/usr/bin/env node + +import { spawn } from "node:child_process"; +import { mkdir, readFile, writeFile } from "node:fs/promises"; +import { dirname, resolve } from "node:path"; +import { env } from "node:process"; +import { + appendLine, + ensureEvidence, + evidencePaths, + loadEnvFiles, + redact, + writeResult, +} from "./lib/langbot-e2e.mjs"; + +const caseId = "ensure-fake-provider-cross-pipelines"; +const DEFAULT_PIPELINE_A_NAME = "Agent QA Fake Provider Debug Chat A"; +const DEFAULT_PIPELINE_B_NAME = "Agent QA Fake Provider Debug Chat B"; + +await loadEnvFiles(); +const paths = evidencePaths(caseId); +await ensureEvidence(paths); + +const writeEnv = process.argv.includes("--write-env"); +const envLocalPath = resolve("skills/.env.local"); +const pipelineAName = env.LANGBOT_FAKE_PROVIDER_PIPELINE_A_NAME || DEFAULT_PIPELINE_A_NAME; +const pipelineBName = env.LANGBOT_FAKE_PROVIDER_PIPELINE_B_NAME || DEFAULT_PIPELINE_B_NAME; + +const result = { + source: "setup_automation", + case_id: caseId, + run_id: paths.runId, + status: "fail", + reason: "", + pipeline_a: { + name: pipelineAName, + id: "", + url: "", + }, + pipeline_b: { + name: pipelineBName, + id: "", + url: "", + }, + fake_provider: { + url: "", + base_url: "", + pid: null, + }, + wrote_env: false, + evidence: { + console_log: paths.consoleLog, + automation_result_json: paths.automationResultJson, + result_json: paths.resultJson, + }, + evidence_collected: ["api_diagnostic", "filesystem"], +}; + +try { + if (pipelineAName === pipelineBName) { + throw new Error("LANGBOT_FAKE_PROVIDER_PIPELINE_A_NAME and LANGBOT_FAKE_PROVIDER_PIPELINE_B_NAME must be different."); + } + + const setupA = await runPipelineSetup(pipelineAName, "A"); + const setupB = await runPipelineSetup(pipelineBName, "B"); + result.pipeline_a = { + name: setupA.pipeline_name || pipelineAName, + id: setupA.pipeline_id || "", + url: setupA.pipeline_url || "", + }; + result.pipeline_b = { + name: setupB.pipeline_name || pipelineBName, + id: setupB.pipeline_id || "", + url: setupB.pipeline_url || "", + }; + result.fake_provider = { + url: setupB.fake_provider?.url || setupA.fake_provider?.url || "", + base_url: setupB.fake_provider?.base_url || setupA.fake_provider?.base_url || "", + pid: setupB.fake_provider?.pid ?? setupA.fake_provider?.pid ?? null, + }; + + if (!result.pipeline_a.url || !result.pipeline_b.url || !result.fake_provider.url) { + throw new Error("Cross-pipeline fake provider setup did not return both pipeline URLs and provider URL."); + } + + if (writeEnv) { + await upsertEnvLocal(envLocalPath, { + LANGBOT_FAKE_PROVIDER_URL: result.fake_provider.url, + LANGBOT_FAKE_PROVIDER_BASE_URL: result.fake_provider.base_url, + LANGBOT_FAKE_PROVIDER_PID: result.fake_provider.pid ? String(result.fake_provider.pid) : "", + LANGBOT_FAKE_PROVIDER_PIPELINE_A_URL: result.pipeline_a.url, + LANGBOT_FAKE_PROVIDER_PIPELINE_A_NAME: result.pipeline_a.name, + LANGBOT_FAKE_PROVIDER_PIPELINE_B_URL: result.pipeline_b.url, + LANGBOT_FAKE_PROVIDER_PIPELINE_B_NAME: result.pipeline_b.name, + }); + result.wrote_env = true; + } + + result.status = "pass"; + result.reason = "Fake provider cross-pipeline fixtures are configured."; +} catch (error) { + result.status = looksLikeEnvIssue(error) ? "env_issue" : "fail"; + result.reason = safeReason(error.message); +} finally { + await writeResult(paths, result); + console.log(JSON.stringify(result, null, 2)); +} + +process.exit(result.status === "pass" ? 0 : result.status === "env_issue" ? 2 : 1); + +function runPipelineSetup(pipelineName, label) { + return new Promise((resolvePromise, rejectPromise) => { + const child = spawn(process.execPath, ["scripts/e2e/ensure-fake-provider-pipeline.mjs"], { + cwd: resolve("."), + env: { + ...env, + LANGBOT_FAKE_PROVIDER_PIPELINE_NAME: pipelineName, + LANGBOT_FAKE_PROVIDER_FIRST_TOKEN_DELAY_MS: env.LANGBOT_FAKE_PROVIDER_FIRST_TOKEN_DELAY_MS || "25", + LANGBOT_FAKE_PROVIDER_CHUNK_DELAY_MS: env.LANGBOT_FAKE_PROVIDER_CHUNK_DELAY_MS || "10", + LANGBOT_FAKE_PROVIDER_CHUNK_COUNT: env.LANGBOT_FAKE_PROVIDER_CHUNK_COUNT || "0", + LANGBOT_FAKE_PROVIDER_FAIL_FIRST_N: "0", + LANGBOT_FAKE_PROVIDER_FAIL_EVERY_N: "0", + LANGBOT_FAKE_PROVIDER_FAULT_STATUS: env.LANGBOT_FAKE_PROVIDER_FAULT_STATUS || "500", + LANGBOT_FAKE_PROVIDER_FAIL_AFTER_FIRST_CHUNK: "false", + LANGBOT_FAKE_PROVIDER_DYNAMIC_RESPONSE: "true", + }, + stdio: ["ignore", "pipe", "pipe"], + }); + + let stdout = ""; + let stderr = ""; + child.stdout.on("data", (chunk) => { + const text = chunk.toString(); + stdout += text; + appendLine(paths.consoleLog, `[setup ${label} stdout] ${text.trimEnd()}`).catch(() => {}); + }); + child.stderr.on("data", (chunk) => { + const text = chunk.toString(); + stderr += text; + appendLine(paths.consoleLog, `[setup ${label} stderr] ${text.trimEnd()}`).catch(() => {}); + }); + child.on("error", rejectPromise); + child.on("close", (code) => { + const parsed = parseJsonOutput(stdout); + if (code !== 0 || parsed.status !== "pass") { + rejectPromise(new Error(parsed.reason || stderr || `Fake provider pipeline setup ${label} exited with ${code}.`)); + return; + } + resolvePromise(parsed); + }); + }); +} + +function parseJsonOutput(text) { + const trimmed = String(text || "").trim(); + if (!trimmed) return {}; + try { + return JSON.parse(trimmed); + } catch { + const start = trimmed.indexOf("{"); + const end = trimmed.lastIndexOf("}"); + if (start >= 0 && end > start) { + try { + return JSON.parse(trimmed.slice(start, end + 1)); + } catch { + return {}; + } + } + return {}; + } +} + +async function upsertEnvLocal(path, updates) { + await mkdir(dirname(path), { recursive: true }); + let text = ""; + try { + text = await readFile(path, "utf8"); + } catch { + text = ""; + } + const lines = text.split(/\r?\n/); + const seen = new Set(); + const next = lines.map((line) => { + const trimmed = line.trim(); + const match = trimmed.match(/^([A-Z][A-Z0-9_]*)=/); + if (!match || updates[match[1]] === undefined) return line; + seen.add(match[1]); + return `${match[1]}=${updates[match[1]]}`; + }); + for (const [key, value] of Object.entries(updates)) { + if (!seen.has(key)) next.push(`${key}=${value}`); + } + await writeFile(path, `${next.join("\n").replace(/\n+$/, "")}\n`, "utf8"); +} + +function looksLikeEnvIssue(error) { + const message = String(error?.message || error || ""); + return /fetch failed|ECONNREFUSED|ENOTFOUND|LANGBOT_.*not configured|Could not read recovery_key|Backend did not respond/i.test(message); +} + +function safeReason(value) { + return redact(String(value || "")).slice(0, 1000); +} diff --git a/skills/scripts/e2e/fake-openai-provider.mjs b/skills/scripts/e2e/fake-openai-provider.mjs index 7a4853f42..1cca9c46b 100644 --- a/skills/scripts/e2e/fake-openai-provider.mjs +++ b/skills/scripts/e2e/fake-openai-provider.mjs @@ -28,6 +28,8 @@ const recentRequests = []; const server = createServer(async (request, response) => { const startedAt = Date.now(); + const startedPerf = performance.now(); + let requestRecord = null; const url = new URL(request.url || "/", `http://${request.headers.host || `${host}:${port}`}`); try { if (request.method === "GET" && url.pathname === "/healthz") { @@ -98,13 +100,24 @@ const server = createServer(async (request, response) => { const requestId = `chatcmpl-langbot-fake-${requestCount}`; const shouldFail = requestCount <= config.fail_first_n || (config.fail_every_n > 0 && requestCount % config.fail_every_n === 0); - recordRequest({ + const replyText = responseTextForBody(body); + requestRecord = recordRequest({ id: requestId, + request_number: requestCount, path: url.pathname, stream: Boolean(body.stream), model: body.model || "", message_count: Array.isArray(body.messages) ? body.messages.length : 0, should_fail: shouldFail, + status: "running", + http_status: null, + expected_text: replyText, + response_text_preview: previewText(replyText), + started_at: new Date(startedAt).toISOString(), + started_epoch_ms: startedAt, + configured_first_token_delay_ms: config.first_token_delay_ms, + configured_chunk_delay_ms: config.chunk_delay_ms, + configured_chunk_count: config.chunk_count, }); if (shouldFail) { @@ -116,17 +129,21 @@ const server = createServer(async (request, response) => { code: "fake_provider_fault", }, }); + finishRequestRecord(requestRecord, startedPerf, { + status: "http_fault", + http_status: config.fault_status, + }); return; } - const replyText = responseTextForBody(body); - if (body.stream) { await streamCompletion(response, { requestId, model: body.model || modelName, content: replyText, failAfterFirstChunk: config.fail_after_first_chunk, + requestRecord, + startedPerf, }); } else { await sleep(config.first_token_delay_ms + config.chunk_delay_ms); @@ -135,6 +152,13 @@ const server = createServer(async (request, response) => { model: body.model || modelName, content: replyText, })); + markRequestTiming(requestRecord, "first_chunk", startedPerf); + markRequestTiming(requestRecord, "first_content_chunk", startedPerf); + requestRecord.content_chunk_count = 1; + finishRequestRecord(requestRecord, startedPerf, { + status: "ok", + http_status: 200, + }); } return; } @@ -146,6 +170,13 @@ const server = createServer(async (request, response) => { }, }); } catch (error) { + if (requestRecord) { + finishRequestRecord(requestRecord, startedPerf, { + status: "fake_provider_error", + http_status: 500, + error: error instanceof Error ? error.message : String(error), + }); + } sendJson(response, 500, { error: { message: error instanceof Error ? error.message : String(error), @@ -264,7 +295,14 @@ function completionPayload({ requestId, model, content }) { }; } -async function streamCompletion(response, { requestId, model, content, failAfterFirstChunk: failMidStream }) { +async function streamCompletion(response, { + requestId, + model, + content, + failAfterFirstChunk: failMidStream, + requestRecord, + startedPerf, +}) { response.writeHead(200, { "content-type": "text/event-stream; charset=utf-8", "cache-control": "no-cache", @@ -272,6 +310,7 @@ async function streamCompletion(response, { requestId, model, content, failAfter }); await sleep(config.first_token_delay_ms); + markRequestTiming(requestRecord, "first_chunk", startedPerf); writeSse(response, { id: requestId, object: "chat.completion.chunk", @@ -283,6 +322,8 @@ async function streamCompletion(response, { requestId, model, content, failAfter const chunks = splitContent(content); for (let index = 0; index < chunks.length; index += 1) { await sleep(config.chunk_delay_ms); + if (index === 0) markRequestTiming(requestRecord, "first_content_chunk", startedPerf); + requestRecord.content_chunk_count = (requestRecord.content_chunk_count || 0) + 1; writeSse(response, { id: requestId, object: "chat.completion.chunk", @@ -291,6 +332,10 @@ async function streamCompletion(response, { requestId, model, content, failAfter choices: [{ index: 0, delta: { content: chunks[index] }, finish_reason: null }], }); if (failMidStream && index === 0) { + finishRequestRecord(requestRecord, startedPerf, { + status: "mid_stream_disconnect", + http_status: 200, + }); response.destroy(new Error("LangBot fake provider injected mid-stream disconnect")); return; } @@ -312,6 +357,10 @@ async function streamCompletion(response, { requestId, model, content, failAfter }); response.write("data: [DONE]\n\n"); response.end(); + finishRequestRecord(requestRecord, startedPerf, { + status: "ok", + http_status: 200, + }); } function writeSse(response, payload) { @@ -365,11 +414,48 @@ function flattenContent(content) { } function recordRequest(entry) { - recentRequests.push({ + const item = { ...entry, at: new Date().toISOString(), - }); + finished_at: null, + finished_epoch_ms: null, + duration_ms: null, + first_chunk_at: null, + first_chunk_epoch_ms: null, + first_chunk_ms: null, + first_content_chunk_at: null, + first_content_chunk_epoch_ms: null, + first_content_chunk_ms: null, + content_chunk_count: 0, + }; + recentRequests.push(item); while (recentRequests.length > config.request_log_limit) recentRequests.shift(); + return item; +} + +function markRequestTiming(entry, key, startedPerf) { + if (!entry || entry[`${key}_at`]) return; + const now = Date.now(); + entry[`${key}_at`] = new Date(now).toISOString(); + entry[`${key}_epoch_ms`] = now; + entry[`${key}_ms`] = rounded(performance.now() - startedPerf); +} + +function finishRequestRecord(entry, startedPerf, updates = {}) { + if (!entry || entry.finished_at) return; + const now = Date.now(); + Object.assign(entry, updates); + entry.finished_at = new Date(now).toISOString(); + entry.finished_epoch_ms = now; + entry.duration_ms = rounded(performance.now() - startedPerf); +} + +function rounded(value) { + return Number(value.toFixed(3)); +} + +function previewText(value) { + return String(value || "").slice(0, 120); } function resetRequestState() { diff --git a/skills/skills.index.json b/skills/skills.index.json index f1b14ecf8..39acc97f1 100644 --- a/skills/skills.index.json +++ b/skills/skills.index.json @@ -151,6 +151,7 @@ "agent-runner-release-preflight", "agent-runner-runtime-chaos", "dify-agent-debug-chat", + "langbot-fake-provider-debug-chat-cross-pipeline-isolation", "langbot-fake-provider-debug-chat-fault-recovery", "langbot-fake-provider-debug-chat-load", "langbot-fake-provider-debug-chat-slow-load", @@ -497,6 +498,44 @@ "backend_log" ] }, + { + "id": "langbot-fake-provider-debug-chat-cross-pipeline-isolation", + "title": "LangBot Debug Chat fake-provider cross-pipeline isolation probe", + "mode": "probe", + "area": "reliability", + "type": "reliability", + "priority": "p1", + "risk": "high", + "ci_eligible": false, + "tags": [ + "reliability", + "debug-chat", + "websocket", + "fake-provider", + "isolation", + "concurrency", + "metrics" + ], + "automation": "skills/langbot-testing/probes/langbot-debug-chat-cross-pipeline-isolation.mjs", + "setup_automation": [ + "node:scripts/e2e/ensure-fake-provider-cross-pipelines.mjs --write-env" + ], + "setup_provides_env": [ + "LANGBOT_FAKE_PROVIDER_URL", + "LANGBOT_FAKE_PROVIDER_BASE_URL", + "LANGBOT_FAKE_PROVIDER_PID", + "LANGBOT_FAKE_PROVIDER_PIPELINE_A_URL", + "LANGBOT_FAKE_PROVIDER_PIPELINE_A_NAME", + "LANGBOT_FAKE_PROVIDER_PIPELINE_B_URL", + "LANGBOT_FAKE_PROVIDER_PIPELINE_B_NAME" + ], + "evidence_required": [ + "metrics", + "network", + "api_diagnostic", + "filesystem" + ] + }, { "id": "langbot-fake-provider-debug-chat-fault-recovery", "title": "LangBot Debug Chat fake-provider fault recovery probe", @@ -1456,6 +1495,7 @@ "cases": [ "langbot-fake-provider-debug-chat-load", "langbot-fake-provider-debug-chat-slow-load", + "langbot-fake-provider-debug-chat-cross-pipeline-isolation", "langbot-fake-provider-debug-chat-fault-recovery", "langbot-space-debug-chat-concurrency-smoke" ] diff --git a/skills/skills/langbot-testing/cases/langbot-fake-provider-debug-chat-cross-pipeline-isolation.yaml b/skills/skills/langbot-testing/cases/langbot-fake-provider-debug-chat-cross-pipeline-isolation.yaml new file mode 100644 index 000000000..acf1d7f0f --- /dev/null +++ b/skills/skills/langbot-testing/cases/langbot-fake-provider-debug-chat-cross-pipeline-isolation.yaml @@ -0,0 +1,81 @@ +id: langbot-fake-provider-debug-chat-cross-pipeline-isolation +title: "LangBot Debug Chat fake-provider cross-pipeline isolation probe" +mode: probe +area: reliability +type: reliability +priority: p1 +risk: high +ci_eligible: false +tags: + - reliability + - debug-chat + - websocket + - fake-provider + - isolation + - concurrency + - metrics +skills: + - langbot-env-setup + - langbot-testing +env: + - LANGBOT_BACKEND_URL + - LANGBOT_FRONTEND_URL + - LANGBOT_E2E_LOGIN_USER +automation: skills/langbot-testing/probes/langbot-debug-chat-cross-pipeline-isolation.mjs +automation_env: + - LANGBOT_BACKEND_URL + - LANGBOT_E2E_LOGIN_USER + - LANGBOT_FAKE_PROVIDER_URL + - LANGBOT_FAKE_PROVIDER_PIPELINE_A_URL + - LANGBOT_FAKE_PROVIDER_PIPELINE_A_NAME + - LANGBOT_FAKE_PROVIDER_PIPELINE_B_URL + - LANGBOT_FAKE_PROVIDER_PIPELINE_B_NAME +automation_debug_chat_load_requests: "6" +automation_debug_chat_load_concurrency: "4" +automation_debug_chat_load_timeout_ms: "30000" +automation_debug_chat_load_response_p95_ms: "5000" +automation_debug_chat_load_max_error_rate: "0" +automation_debug_chat_load_prompt_template: '请只回复 "{expected}",不要解释,不要添加其他字符。' +automation_debug_chat_load_stream: "true" +automation_debug_chat_load_reset: "true" +metrics_thresholds_json: '{"cross_pipeline_leak_count":{"max":0},"response_p95_ms":{"max":5000},"error_rate":{"max":0}}' +load_profile_json: '{"requests_per_pipeline":6,"pipelines":2,"concurrency":4,"path":"Pipeline Debug Chat WebSocket","provider":"controlled fake OpenAI-compatible provider","metric":"cross-pipeline response isolation and send-to-final-assistant-response"}' +setup_automation: + - "node:scripts/e2e/ensure-fake-provider-cross-pipelines.mjs --write-env" +setup_provides_env: + - LANGBOT_FAKE_PROVIDER_URL + - LANGBOT_FAKE_PROVIDER_BASE_URL + - LANGBOT_FAKE_PROVIDER_PID + - LANGBOT_FAKE_PROVIDER_PIPELINE_A_URL + - LANGBOT_FAKE_PROVIDER_PIPELINE_A_NAME + - LANGBOT_FAKE_PROVIDER_PIPELINE_B_URL + - LANGBOT_FAKE_PROVIDER_PIPELINE_B_NAME +steps: + - "Start or reuse the local fake OpenAI-compatible provider." + - "Create or update two local-agent pipelines that both point at the controlled fake provider." + - "Reset both Debug Chat sessions and the fake-provider request log." + - "Open concurrent WebSocket Debug Chat connections to both pipelines and send unique pipeline-scoped response tokens." +checks: + - "automation-result.json status is pass only when every request receives its own expected token and cross_pipeline_leak_count is zero." + - "metrics_summary includes by_pipeline status counts, fake-provider request count, and LangBot/provider timing estimates." + - "samples.json contains per-request pipeline labels so any leak can be attributed to the receiving pipeline." +evidence_required: + - metrics + - network + - api_diagnostic + - filesystem +diagnostics: + - "This probe targets Debug Chat isolation under concurrent traffic from two pipelines." + - "It is designed to expose regressions where global pipeline state causes one pipeline's assistant response to be delivered to another pipeline's Debug Chat session." + - "Same-pipeline foreign responses are tolerated because Debug Chat intentionally broadcasts within the same pipeline/session; cross-pipeline tokens are never tolerated." +success_patterns: + - "Debug Chat cross-pipeline isolation probe passed" +failure_patterns: + - "cross_pipeline_leak" + - "Timed out after" + - "WebSocket connection error" + - "Final assistant response did not include" +troubleshooting: + - backend-not-listening + - debug-chat-history-contaminates-automation + - local-agent-model-route-unavailable diff --git a/skills/skills/langbot-testing/probes/langbot-debug-chat-concurrency.mjs b/skills/skills/langbot-testing/probes/langbot-debug-chat-concurrency.mjs index 707fab6b0..af5153dbf 100644 --- a/skills/skills/langbot-testing/probes/langbot-debug-chat-concurrency.mjs +++ b/skills/skills/langbot-testing/probes/langbot-debug-chat-concurrency.mjs @@ -17,6 +17,10 @@ import { resetAndAuthLocalUser, writeResult, } from "../../../scripts/e2e/lib/langbot-e2e.mjs"; +import { + buildProviderTimingMetrics, + summarizeFakeProviderState, +} from "./lib/fake-provider-timing.mjs"; const DEFAULT_LOCAL_PASSWORD = "LangBotE2ELocalPass!2026"; @@ -179,11 +183,18 @@ try { error_rate: metrics.error_rate, response_p50_ms: metrics.response_duration_ms.p50, response_p95_ms: metrics.response_duration_ms.p95, + first_assistant_event_p95_ms: metrics.first_assistant_event_ms.p95, + first_assistant_content_p95_ms: metrics.first_assistant_content_ms.p95, first_response_p95_ms: metrics.first_response_ms.p95, throughput_rps: metrics.throughput_rps, status_counts: metrics.status_counts, fake_provider_request_count: metrics.fake_provider?.request_count ?? null, fake_provider_fault_count: metrics.fake_provider?.fault_count ?? null, + fake_provider_duration_p95_ms: metrics.provider_timing?.provider_duration_ms.p95 ?? null, + langbot_overhead_estimate_p95_ms: metrics.provider_timing?.langbot_overhead_estimate_ms.p95 ?? null, + send_to_provider_start_p95_ms: metrics.provider_timing?.send_to_provider_start_ms.p95 ?? null, + provider_finish_to_ws_final_p95_ms: metrics.provider_timing?.provider_finish_to_ws_final_ms.p95 ?? null, + provider_timing_matched_request_count: metrics.provider_timing?.matched_request_count ?? null, }; result.thresholds_summary = thresholds; result.artifacts = { @@ -391,9 +402,25 @@ function runSingleRequest({ expected_text: expected, prompt, response_text: "", + started_at: new Date().toISOString(), + started_epoch_ms: Date.now(), + connected_at: null, + connected_epoch_ms: null, + sent_at: null, + sent_epoch_ms: null, + first_assistant_event_at: null, + first_assistant_event_epoch_ms: null, + first_assistant_event_ms: null, + first_assistant_content_at: null, + first_assistant_content_epoch_ms: null, + first_assistant_content_ms: null, + first_response_at: null, + first_response_epoch_ms: null, connected_ms: null, first_response_ms: null, response_duration_ms: null, + finished_at: null, + finished_epoch_ms: null, event_count: 0, foreign_response_count: 0, last_foreign_response_text: "", @@ -413,6 +440,9 @@ function runSingleRequest({ client = openRawWebSocket(wsUrl, { onOpen() { connectedAt = performance.now(); + const now = Date.now(); + sample.connected_at = new Date(now).toISOString(); + sample.connected_epoch_ms = now; sample.connected_ms = rounded(connectedAt - startedAt); }, onMessage(text) { @@ -435,6 +465,9 @@ function runSingleRequest({ if (data.type === "connected") { sentAt = performance.now(); + const now = Date.now(); + sample.sent_at = new Date(now).toISOString(); + sample.sent_epoch_ms = now; client.send(JSON.stringify({ type: "message", message: [{ type: "Plain", text: prompt }], @@ -449,7 +482,15 @@ function runSingleRequest({ if (data.type !== "response" || data.data?.role !== "assistant") return; const content = String(data.data.content || ""); + markFirstAssistantEvent(sample, sentAt); if (content) sample.response_text = content; + if (content) markFirstAssistantContent(sample, sentAt); + if (content.includes(expected) && sample.first_response_ms === null && sentAt > 0) { + const now = Date.now(); + sample.first_response_at = new Date(now).toISOString(); + sample.first_response_epoch_ms = now; + sample.first_response_ms = rounded(performance.now() - sentAt); + } if (data.data.is_final === true) { const ok = sample.response_text.includes(expected); if (ok) { @@ -488,6 +529,9 @@ function runSingleRequest({ : reason || ""; if (sentAt > 0) sample.response_duration_ms = rounded(performance.now() - sentAt); else sample.response_duration_ms = rounded(performance.now() - startedAt); + const now = Date.now(); + sample.finished_at = new Date(now).toISOString(); + sample.finished_epoch_ms = now; try { client?.close(); } catch { @@ -498,6 +542,22 @@ function runSingleRequest({ }); } +function markFirstAssistantEvent(sample, sentAt) { + if (sample.first_assistant_event_ms !== null || sentAt <= 0) return; + const now = Date.now(); + sample.first_assistant_event_at = new Date(now).toISOString(); + sample.first_assistant_event_epoch_ms = now; + sample.first_assistant_event_ms = rounded(performance.now() - sentAt); +} + +function markFirstAssistantContent(sample, sentAt) { + if (sample.first_assistant_content_ms !== null || sentAt <= 0) return; + const now = Date.now(); + sample.first_assistant_content_at = new Date(now).toISOString(); + sample.first_assistant_content_epoch_ms = now; + sample.first_assistant_content_ms = rounded(performance.now() - sentAt); +} + function containsLoadToken(text, prefix) { const escaped = String(prefix).replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); return new RegExp(`${escaped}-\\d{4}`).test(String(text || "")); @@ -709,27 +769,16 @@ function buildMetrics({ samples, totalRequests, concurrency, timeoutMs, loadDura throughput_rps: loadDurationMs <= 0 ? 0 : rounded(okSamples.length / (loadDurationMs / 1000)), status_counts: statusCounts, connected_ms: stats(samples.map((sample) => sample.connected_ms).filter(Number.isFinite)), + first_assistant_event_ms: stats(samples.map((sample) => sample.first_assistant_event_ms).filter(Number.isFinite)), + first_assistant_content_ms: stats(samples.map((sample) => sample.first_assistant_content_ms).filter(Number.isFinite)), first_response_ms: stats(okSamples.map((sample) => sample.first_response_ms).filter(Number.isFinite)), response_duration_ms: stats(okSamples.map((sample) => sample.response_duration_ms).filter(Number.isFinite)), fake_provider: summarizeFakeProviderState(fakeProviderState), + provider_timing: buildProviderTimingMetrics(samples, fakeProviderState), samples, }; } -function summarizeFakeProviderState(state) { - if (!state) return null; - const recentRequests = Array.isArray(state.recent_requests) ? state.recent_requests : []; - return { - status: state.status || "unknown", - url: state.url || "", - request_count: Number.isFinite(state.request_count) ? state.request_count : recentRequests.length, - recent_request_count: recentRequests.length, - fault_count: recentRequests.filter((request) => request?.should_fail === true).length, - streamed_request_count: recentRequests.filter((request) => request?.stream === true).length, - config: state.config || {}, - }; -} - function buildThresholds(metrics) { const thresholds = { error_rate: { actual: metrics.error_rate, max: maxErrorRate, pass: metrics.error_rate <= maxErrorRate }, diff --git a/skills/skills/langbot-testing/probes/langbot-debug-chat-cross-pipeline-isolation.mjs b/skills/skills/langbot-testing/probes/langbot-debug-chat-cross-pipeline-isolation.mjs new file mode 100644 index 000000000..b83f6161d --- /dev/null +++ b/skills/skills/langbot-testing/probes/langbot-debug-chat-cross-pipeline-isolation.mjs @@ -0,0 +1,861 @@ +#!/usr/bin/env node + +import crypto from "node:crypto"; +import net from "node:net"; +import tls from "node:tls"; +import { mkdir, writeFile } from "node:fs/promises"; +import { resolve } from "node:path"; +import { env, exit } from "node:process"; +import { + apiJson, + appendLine, + ensureEvidence, + evidencePaths, + loadEnvFiles, + localIsoWithOffset, + redact, + resetAndAuthLocalUser, + writeResult, +} from "../../../scripts/e2e/lib/langbot-e2e.mjs"; +import { + buildProviderTimingMetrics, + summarizeFakeProviderState, +} from "./lib/fake-provider-timing.mjs"; + +const DEFAULT_LOCAL_PASSWORD = "LangBotE2ELocalPass!2026"; + +await loadEnvFiles(); +const caseId = env.LBS_CASE_ID || "langbot-debug-chat-cross-pipeline-isolation"; +const paths = evidencePaths(caseId); +await ensureEvidence(paths); + +const startedAt = new Date(); +const metricsPath = resolve(paths.evidenceDir, "metrics.json"); +const samplesPath = resolve(paths.evidenceDir, "samples.json"); +const fakeProviderStatePath = resolve(paths.evidenceDir, "fake-provider-state.json"); +const resetDiagnosticPath = resolve(paths.evidenceDir, "debug-chat-reset-diagnostic.json"); +const backendUrl = env.LANGBOT_BACKEND_URL || ""; +const fakeProviderUrl = env.LANGBOT_FAKE_PROVIDER_URL || ""; +const sessionType = env.LANGBOT_DEBUG_CHAT_LOAD_SESSION_TYPE || env.LANGBOT_E2E_DEBUG_CHAT_SESSION_TYPE || "person"; +const requestsPerPipeline = positiveInteger(env.LANGBOT_DEBUG_CHAT_LOAD_REQUESTS, 6); +const concurrency = Math.min(requestsPerPipeline * 2, positiveInteger(env.LANGBOT_DEBUG_CHAT_LOAD_CONCURRENCY, 4)); +const timeoutMs = positiveInteger(env.LANGBOT_DEBUG_CHAT_LOAD_TIMEOUT_MS, 30_000); +const stream = bool(env.LANGBOT_DEBUG_CHAT_LOAD_STREAM, true); +const resetBeforeRun = bool(env.LANGBOT_DEBUG_CHAT_LOAD_RESET, true); +const responseP95BudgetMs = positiveNumber(env.LANGBOT_DEBUG_CHAT_LOAD_RESPONSE_P95_MS, 5_000); +const maxErrorRate = positiveNumber(env.LANGBOT_DEBUG_CHAT_LOAD_MAX_ERROR_RATE, 0); +const promptTemplate = env.LANGBOT_DEBUG_CHAT_LOAD_PROMPT_TEMPLATE + || "请只回复 \"{expected}\",不要解释,不要添加其他字符。"; +const failureSignals = textList(env.LANGBOT_E2E_FAILURE_SIGNALS || env.LANGBOT_DEBUG_CHAT_LOAD_FAILURE_SIGNALS || ""); + +const pipelineTargets = [ + { + label: "A", + expectedPrefix: "PIPEA", + otherPrefix: "PIPEB", + url: env.LANGBOT_FAKE_PROVIDER_PIPELINE_A_URL || "", + name: env.LANGBOT_FAKE_PROVIDER_PIPELINE_A_NAME || "", + }, + { + label: "B", + expectedPrefix: "PIPEB", + otherPrefix: "PIPEA", + url: env.LANGBOT_FAKE_PROVIDER_PIPELINE_B_URL || "", + name: env.LANGBOT_FAKE_PROVIDER_PIPELINE_B_NAME || "", + }, +]; + +const result = { + source: "automation", + case_id: caseId, + run_id: paths.runId, + status: "fail", + reason: "", + started_at: startedAt.toISOString(), + started_at_local: localIsoWithOffset(startedAt), + finished_at: "", + finished_at_local: "", + duration_ms: 0, + backend_url: backendUrl, + session_type: sessionType, + pipelines: [], + load_profile: { + requests_per_pipeline: requestsPerPipeline, + total_requests: requestsPerPipeline * 2, + concurrency, + timeout_ms: timeoutMs, + stream, + reset_before_run: resetBeforeRun, + }, + evidence: { + network_log: paths.networkLog, + metrics_json: metricsPath, + samples_json: samplesPath, + fake_provider_state_json: fakeProviderStatePath, + debug_chat_reset_diagnostic_json: resetDiagnosticPath, + automation_result_json: paths.automationResultJson, + result_json: paths.resultJson, + }, + evidence_collected: ["metrics", "network", "api_diagnostic", "filesystem"], +}; + +try { + if (!backendUrl) { + result.status = "env_issue"; + throw new Error("LANGBOT_BACKEND_URL is not configured."); + } + if (!["person", "group"].includes(sessionType)) { + throw new Error(`LANGBOT_DEBUG_CHAT_LOAD_SESSION_TYPE must be person or group, got ${sessionType}.`); + } + for (const target of pipelineTargets) { + if (!target.url && !target.name) { + result.status = "env_issue"; + throw new Error(`Set LANGBOT_FAKE_PROVIDER_PIPELINE_${target.label}_URL or LANGBOT_FAKE_PROVIDER_PIPELINE_${target.label}_NAME.`); + } + } + + const backendReady = await backendReachable(backendUrl); + if (!backendReady) { + result.status = "env_issue"; + throw new Error(`Backend did not respond at ${backendUrl}.`); + } + + const user = env.LANGBOT_E2E_LOGIN_USER || ""; + const password = env.LANGBOT_E2E_LOGIN_PASSWORD || DEFAULT_LOCAL_PASSWORD; + if (!user) { + result.status = "env_issue"; + throw new Error("LANGBOT_E2E_LOGIN_USER is required so this probe can resolve/reset Debug Chat sessions."); + } + const auth = await resetAndAuthLocalUser({ backendUrl, user, password }); + const pipelines = []; + for (const target of pipelineTargets) { + const pipeline = await resolvePipeline({ + backendUrl, + token: auth.token, + pipelineUrl: target.url, + pipelineName: target.name, + }); + pipelines.push({ + ...target, + id: pipeline.id, + name: pipeline.name || target.name, + wsUrl: websocketUrl(backendUrl, pipeline.id, sessionType), + }); + } + result.pipelines = pipelines.map((pipeline) => ({ + label: pipeline.label, + id: pipeline.id, + name: pipeline.name, + url: pipeline.url, + })); + + if (resetBeforeRun) { + const resetDiagnostics = []; + for (const pipeline of pipelines) { + const reset = await apiJson(backendUrl, `/api/v1/pipelines/${encodeURIComponent(pipeline.id)}/ws/reset/${encodeURIComponent(sessionType)}`, { + method: "POST", + token: auth.token, + }); + resetDiagnostics.push({ + pipeline_label: pipeline.label, + pipeline_id: pipeline.id, + status: isApiFailure(reset) ? "fail" : "ready", + http_status: reset.status, + code: reset.json.code ?? null, + reason: isApiFailure(reset) ? reset.json.msg || "Debug Chat reset failed." : "Debug Chat session reset.", + }); + } + await writeFile(resetDiagnosticPath, `${JSON.stringify(resetDiagnostics, null, 2)}\n`, "utf8"); + const failedReset = resetDiagnostics.find((item) => item.status === "fail"); + if (failedReset) throw new Error(failedReset.reason); + } + await resetFakeProvider(fakeProviderUrl); + + const jobs = []; + for (let index = 0; index < requestsPerPipeline; index += 1) { + for (const pipeline of pipelines) { + jobs.push({ ...pipeline, index }); + } + } + + const loadStartedAt = performance.now(); + const samples = await runLoad({ + jobs, + concurrency, + timeoutMs, + promptTemplate, + stream, + failureSignals, + }); + const loadDurationMs = performance.now() - loadStartedAt; + const fakeProviderState = await readFakeProviderState(fakeProviderUrl); + if (fakeProviderState) { + await writeFile(fakeProviderStatePath, `${JSON.stringify(fakeProviderState, null, 2)}\n`, "utf8"); + } + const metrics = buildMetrics({ + samples, + requestsPerPipeline, + concurrency, + timeoutMs, + loadDurationMs, + backendUrl, + sessionType, + fakeProviderState, + }); + const thresholds = buildThresholds(metrics); + const passed = Object.values(thresholds).every((item) => item.pass); + result.status = passed ? "pass" : "fail"; + result.reason = passed + ? "Debug Chat cross-pipeline isolation probe passed all thresholds." + : "Debug Chat cross-pipeline isolation probe found leaks, errors, or latency threshold breaches."; + result.metrics_summary = { + requests_per_pipeline: metrics.requests_per_pipeline, + total_requests: metrics.total_requests, + concurrency: metrics.concurrency, + ok_count: metrics.ok_count, + error_count: metrics.error_count, + cross_pipeline_leak_count: metrics.cross_pipeline_leak_count, + timeout_count: metrics.timeout_count, + error_rate: metrics.error_rate, + response_p95_ms: metrics.response_duration_ms.p95, + first_response_p95_ms: metrics.first_response_ms.p95, + throughput_rps: metrics.throughput_rps, + status_counts: metrics.status_counts, + by_pipeline: metrics.by_pipeline, + fake_provider_request_count: metrics.fake_provider?.request_count ?? null, + fake_provider_duration_p95_ms: metrics.provider_timing?.provider_duration_ms.p95 ?? null, + langbot_overhead_estimate_p95_ms: metrics.provider_timing?.langbot_overhead_estimate_ms.p95 ?? null, + send_to_provider_start_p95_ms: metrics.provider_timing?.send_to_provider_start_ms.p95 ?? null, + provider_finish_to_ws_final_p95_ms: metrics.provider_timing?.provider_finish_to_ws_final_ms.p95 ?? null, + }; + result.thresholds_summary = thresholds; + result.artifacts = { + metrics_json: metricsPath, + samples_json: samplesPath, + fake_provider_state_json: fakeProviderState ? fakeProviderStatePath : "", + network_log: paths.networkLog, + automation_result_json: paths.automationResultJson, + result_json: paths.resultJson, + }; + + await writeFile(metricsPath, `${JSON.stringify({ ...metrics, thresholds }, null, 2)}\n`, "utf8"); + await writeFile(samplesPath, `${JSON.stringify(samples, null, 2)}\n`, "utf8"); +} catch (error) { + if (!["env_issue", "blocked"].includes(result.status)) { + result.status = looksLikeEnvIssue(error) ? "env_issue" : "fail"; + } + result.reason = result.reason || safeReason(error.message); +} finally { + const finishedAt = new Date(); + result.finished_at = finishedAt.toISOString(); + result.finished_at_local = localIsoWithOffset(finishedAt); + result.duration_ms = finishedAt.getTime() - startedAt.getTime(); + await mkdir(paths.evidenceDir, { recursive: true }); + await writeResult(paths, result); + console.log(JSON.stringify(result, null, 2)); +} + +exit(result.status === "pass" ? 0 : result.status === "env_issue" || result.status === "blocked" ? 2 : 1); + +async function backendReachable(baseUrl) { + try { + const response = await fetch(`${baseUrl.replace(/\/$/, "")}/healthz`, { + signal: AbortSignal.timeout(3000), + }); + return response.status < 500; + } catch { + return false; + } +} + +async function resetFakeProvider(rootUrl) { + if (!rootUrl) return; + try { + await fetch(`${normalizeProviderRootUrl(rootUrl)}/__qa/reset`, { + method: "POST", + signal: AbortSignal.timeout(3000), + }); + } catch { + // Missing fake-provider diagnostics should not hide the isolation result. + } +} + +async function readFakeProviderState(rootUrl) { + if (!rootUrl) return null; + try { + const response = await fetch(`${normalizeProviderRootUrl(rootUrl)}/__qa/config`, { + signal: AbortSignal.timeout(3000), + }); + const json = await response.json().catch(() => ({})); + return { + status: response.ok && json.ok === true ? "loaded" : "unavailable", + url: normalizeProviderRootUrl(rootUrl), + http_status: response.status, + model: json.model || "", + config: json.config || {}, + request_count: Number.isFinite(json.request_count) ? json.request_count : null, + recent_requests: Array.isArray(json.recent_requests) ? json.recent_requests : [], + }; + } catch (error) { + return { + status: "unavailable", + url: normalizeProviderRootUrl(rootUrl), + reason: safeReason(error.message), + request_count: null, + recent_requests: [], + }; + } +} + +function normalizeProviderRootUrl(value) { + const trimmed = String(value || "").trim().replace(/\/$/, ""); + return trimmed.endsWith("/v1") ? trimmed.slice(0, -3) : trimmed; +} + +function pipelineIdFromUrl(url) { + if (!url) return ""; + try { + const parsed = new URL(url); + return parsed.searchParams.get("id") || ""; + } catch { + return ""; + } +} + +async function resolvePipeline({ backendUrl, token, pipelineUrl, pipelineName }) { + const idFromUrl = pipelineIdFromUrl(pipelineUrl); + if (idFromUrl) { + const response = await apiJson(backendUrl, `/api/v1/pipelines/${encodeURIComponent(idFromUrl)}`, { token }); + const pipeline = response.json.data?.pipeline; + if (isApiFailure(response) || !pipeline?.uuid) { + throw new Error(response.json.msg || `Could not load pipeline ${idFromUrl}.`); + } + return { id: pipeline.uuid, name: pipeline.name || "" }; + } + if (!pipelineName) { + throw new Error("Set pipeline URL or name before running this probe."); + } + const response = await apiJson(backendUrl, "/api/v1/pipelines", { token }); + if (isApiFailure(response)) { + throw new Error(response.json.msg || "Failed to list pipelines."); + } + const pipeline = (response.json.data?.pipelines || []).find((item) => item.name === pipelineName); + if (!pipeline?.uuid) { + throw new Error(`Could not find pipeline named ${pipelineName}.`); + } + return { id: pipeline.uuid, name: pipeline.name || pipelineName }; +} + +function isApiFailure(response) { + return response.status >= 400 || (response.json.code !== undefined && response.json.code !== 0); +} + +function websocketUrl(baseUrl, pipelineId, sessionTypeValue) { + const parsed = new URL(baseUrl); + parsed.protocol = parsed.protocol === "https:" ? "wss:" : "ws:"; + parsed.pathname = `/api/v1/pipelines/${encodeURIComponent(pipelineId)}/ws/connect`; + parsed.search = `?session_type=${encodeURIComponent(sessionTypeValue)}`; + return parsed.toString(); +} + +async function runLoad(options) { + const samples = []; + const queue = [...options.jobs]; + const workers = Array.from({ length: options.concurrency }, async () => { + while (queue.length > 0) { + const job = queue.shift(); + if (!job) continue; + const sample = await runSingleRequest({ ...options, job }); + samples.push(sample); + } + }); + await Promise.all(workers); + return samples.sort((left, right) => ( + left.pipeline_label.localeCompare(right.pipeline_label) || left.index - right.index + )); +} + +function expectedForIndex(prefix, index) { + return `${prefix}-${String(index + 1).padStart(4, "0")}`; +} + +function promptForIndex(template, expected) { + return template.replaceAll("{expected}", expected); +} + +function runSingleRequest({ + job, + timeoutMs, + promptTemplate, + stream, + failureSignals, +}) { + return new Promise((resolvePromise) => { + const expected = expectedForIndex(job.expectedPrefix, job.index); + const prompt = promptForIndex(promptTemplate, expected); + const sample = { + index: job.index, + pipeline_label: job.label, + pipeline_id: job.id, + pipeline_name: job.name, + status: "running", + ok: false, + expected_text: expected, + expected_prefix: job.expectedPrefix, + other_prefix: job.otherPrefix, + prompt, + response_text: "", + started_at: new Date().toISOString(), + started_epoch_ms: Date.now(), + connected_at: null, + connected_epoch_ms: null, + sent_at: null, + sent_epoch_ms: null, + first_assistant_event_at: null, + first_assistant_event_epoch_ms: null, + first_assistant_event_ms: null, + first_assistant_content_at: null, + first_assistant_content_epoch_ms: null, + first_assistant_content_ms: null, + first_response_at: null, + first_response_epoch_ms: null, + connected_ms: null, + first_response_ms: null, + response_duration_ms: null, + finished_at: null, + finished_epoch_ms: null, + event_count: 0, + same_pipeline_foreign_response_count: 0, + cross_pipeline_leak_count: 0, + last_foreign_response_text: "", + error: "", + close_code: null, + close_reason: "", + }; + let closed = false; + let connectedAt = 0; + let sentAt = 0; + const startedPerf = performance.now(); + let client = null; + const timer = setTimeout(() => { + finish("timeout", `Timed out after ${timeoutMs} ms.`); + }, timeoutMs); + + client = openRawWebSocket(job.wsUrl, { + onOpen() { + connectedAt = performance.now(); + const now = Date.now(); + sample.connected_at = new Date(now).toISOString(); + sample.connected_epoch_ms = now; + sample.connected_ms = rounded(connectedAt - startedPerf); + }, + onMessage(text) { + sample.event_count += 1; + let data; + try { + data = JSON.parse(String(text || "")); + } catch (error) { + finish("error", `Invalid WebSocket JSON: ${error.message}`); + return; + } + appendLine(paths.networkLog, JSON.stringify({ + pipeline_label: job.label, + request_index: job.index, + type: data.type, + session_type: data.session_type || "", + role: data.data?.role || "", + is_final: data.data?.is_final ?? null, + content_preview: redact(String(data.data?.content || data.message || "").slice(0, 200)), + })).catch(() => {}); + + if (data.type === "connected") { + sentAt = performance.now(); + const now = Date.now(); + sample.sent_at = new Date(now).toISOString(); + sample.sent_epoch_ms = now; + client.send(JSON.stringify({ + type: "message", + message: [{ type: "Plain", text: prompt }], + stream, + })); + return; + } + if (data.type === "error") { + finish("error", data.message || "WebSocket error message."); + return; + } + if (data.type !== "response" || data.data?.role !== "assistant") return; + + const content = String(data.data.content || ""); + markFirstAssistantEvent(sample, sentAt); + if (content) sample.response_text = content; + if (content) markFirstAssistantContent(sample, sentAt); + if (containsPipelineToken(content, job.otherPrefix)) { + sample.cross_pipeline_leak_count += 1; + finish("cross_pipeline_leak", `Pipeline ${job.label} received response from ${job.otherPrefix}: ${content}`); + return; + } + if (content.includes(expected) && sample.first_response_ms === null && sentAt > 0) { + const now = Date.now(); + sample.first_response_at = new Date(now).toISOString(); + sample.first_response_epoch_ms = now; + sample.first_response_ms = rounded(performance.now() - sentAt); + } + if (data.data.is_final === true) { + const ok = sample.response_text.includes(expected); + if (ok) { + if (sample.first_response_ms === null && sentAt > 0) { + const now = Date.now(); + sample.first_response_at = new Date(now).toISOString(); + sample.first_response_epoch_ms = now; + sample.first_response_ms = rounded(performance.now() - sentAt); + } + finish("pass", ""); + } else if (matchesFailureSignal(sample.response_text, failureSignals)) { + finish("app_error", `Assistant final response matched a failure signal: ${sample.response_text}`); + } else if (containsPipelineToken(sample.response_text, job.expectedPrefix)) { + sample.same_pipeline_foreign_response_count += 1; + sample.last_foreign_response_text = sample.response_text; + } else { + finish("mismatch", `Final assistant response did not include ${expected}: ${sample.response_text}`); + } + } + }, + onError(error) { + finish("connection_error", `WebSocket connection error: ${error.message}`); + }, + onClose(event) { + sample.close_code = event.code; + sample.close_reason = event.reason || ""; + if (!closed) finish("closed", `WebSocket closed before final assistant response: ${event.code}`); + }, + }); + + function finish(status, reason) { + if (closed) return; + closed = true; + clearTimeout(timer); + sample.status = status; + sample.ok = status === "pass"; + sample.error = status === "timeout" && sample.same_pipeline_foreign_response_count > 0 + ? `${reason || ""} Saw ${sample.same_pipeline_foreign_response_count} same-pipeline foreign assistant response(s); last=${sample.last_foreign_response_text}` + : reason || ""; + if (sentAt > 0) sample.response_duration_ms = rounded(performance.now() - sentAt); + else sample.response_duration_ms = rounded(performance.now() - startedPerf); + const now = Date.now(); + sample.finished_at = new Date(now).toISOString(); + sample.finished_epoch_ms = now; + try { + client?.close(); + } catch { + // Closing a failed socket should not hide the sample result. + } + resolvePromise(sample); + } + }); +} + +function markFirstAssistantEvent(sample, sentAt) { + if (sample.first_assistant_event_ms !== null || sentAt <= 0) return; + const now = Date.now(); + sample.first_assistant_event_at = new Date(now).toISOString(); + sample.first_assistant_event_epoch_ms = now; + sample.first_assistant_event_ms = rounded(performance.now() - sentAt); +} + +function markFirstAssistantContent(sample, sentAt) { + if (sample.first_assistant_content_ms !== null || sentAt <= 0) return; + const now = Date.now(); + sample.first_assistant_content_at = new Date(now).toISOString(); + sample.first_assistant_content_epoch_ms = now; + sample.first_assistant_content_ms = rounded(performance.now() - sentAt); +} + +function containsPipelineToken(text, prefix) { + const escaped = String(prefix).replace(/[.*+?^${}()|[\]\\]/g, "\\$&"); + return new RegExp(`${escaped}-\\d{4}`).test(String(text || "")); +} + +function matchesFailureSignal(text, signals) { + const lower = String(text || "").toLowerCase(); + return signals.some((signal) => lower.includes(signal.toLowerCase())); +} + +function openRawWebSocket(wsUrl, handlers) { + const parsed = new URL(wsUrl); + const secure = parsed.protocol === "wss:"; + const port = Number(parsed.port || (secure ? 443 : 80)); + const host = parsed.hostname; + const path = `${parsed.pathname}${parsed.search}`; + const key = crypto.randomBytes(16).toString("base64"); + const socket = secure + ? tls.connect({ host, port, servername: host }) + : net.connect({ host, port }); + let opened = false; + let closed = false; + let buffer = Buffer.alloc(0); + + socket.setNoDelay(true); + socket.on("connect", () => { + const originProtocol = secure ? "https" : "http"; + const request = [ + `GET ${path} HTTP/1.1`, + `Host: ${parsed.host}`, + "Upgrade: websocket", + "Connection: Upgrade", + `Sec-WebSocket-Key: ${key}`, + "Sec-WebSocket-Version: 13", + `Origin: ${originProtocol}://${parsed.host}`, + "", + "", + ].join("\r\n"); + socket.write(request); + }); + socket.on("data", (chunk) => { + buffer = Buffer.concat([buffer, chunk]); + if (!opened) { + const headerEnd = buffer.indexOf("\r\n\r\n"); + if (headerEnd === -1) return; + const headerText = buffer.slice(0, headerEnd).toString("utf8"); + buffer = buffer.slice(headerEnd + 4); + if (!/^HTTP\/1\.1 101\b/i.test(headerText)) { + handlers.onError(new Error(`Handshake failed: ${headerText.split("\r\n")[0] || "missing status"}`)); + socket.destroy(); + return; + } + opened = true; + handlers.onOpen(); + } + processFrames(); + }); + socket.on("error", (error) => { + if (!closed) handlers.onError(error); + }); + socket.on("close", () => { + if (closed) return; + closed = true; + handlers.onClose({ code: null, reason: "" }); + }); + + function processFrames() { + while (true) { + const frame = readFrame(buffer); + if (!frame) return; + buffer = buffer.slice(frame.consumed); + if (frame.opcode === 0x1) { + handlers.onMessage(frame.payload.toString("utf8")); + } else if (frame.opcode === 0x8) { + const code = frame.payload.length >= 2 ? frame.payload.readUInt16BE(0) : null; + const reason = frame.payload.length > 2 ? frame.payload.slice(2).toString("utf8") : ""; + closed = true; + handlers.onClose({ code, reason }); + socket.end(); + return; + } else if (frame.opcode === 0x9) { + writeFrame(socket, 0xA, frame.payload); + } + } + } + + return { + send(text) { + if (closed || !opened) return; + writeFrame(socket, 0x1, Buffer.from(text, "utf8")); + }, + close() { + if (closed) return; + closed = true; + if (!socket.destroyed) { + if (opened) writeFrame(socket, 0x8, Buffer.alloc(0)); + setTimeout(() => socket.end(), 50).unref(); + } + }, + }; +} + +function readFrame(buffer) { + if (buffer.length < 2) return null; + const first = buffer[0]; + const second = buffer[1]; + const opcode = first & 0x0f; + const masked = Boolean(second & 0x80); + let length = second & 0x7f; + let offset = 2; + if (length === 126) { + if (buffer.length < offset + 2) return null; + length = buffer.readUInt16BE(offset); + offset += 2; + } else if (length === 127) { + if (buffer.length < offset + 8) return null; + const high = buffer.readUInt32BE(offset); + const low = buffer.readUInt32BE(offset + 4); + length = high * 2 ** 32 + low; + offset += 8; + } + let mask = null; + if (masked) { + if (buffer.length < offset + 4) return null; + mask = buffer.slice(offset, offset + 4); + offset += 4; + } + if (buffer.length < offset + length) return null; + let payload = buffer.slice(offset, offset + length); + if (mask) { + payload = Buffer.from(payload); + for (let index = 0; index < payload.length; index += 1) { + payload[index] ^= mask[index % 4]; + } + } + return { + opcode, + payload, + consumed: offset + length, + }; +} + +function writeFrame(socket, opcode, payload) { + const body = Buffer.isBuffer(payload) ? payload : Buffer.from(payload || ""); + const mask = crypto.randomBytes(4); + const headerLength = body.length < 126 ? 2 : body.length <= 0xffff ? 4 : 10; + const header = Buffer.alloc(headerLength); + header[0] = 0x80 | opcode; + if (body.length < 126) { + header[1] = 0x80 | body.length; + } else if (body.length <= 0xffff) { + header[1] = 0x80 | 126; + header.writeUInt16BE(body.length, 2); + } else { + header[1] = 0x80 | 127; + header.writeUInt32BE(Math.floor(body.length / 2 ** 32), 2); + header.writeUInt32BE(body.length >>> 0, 6); + } + const masked = Buffer.from(body); + for (let index = 0; index < masked.length; index += 1) { + masked[index] ^= mask[index % 4]; + } + socket.write(Buffer.concat([header, mask, masked])); +} + +function buildMetrics({ samples, requestsPerPipeline, concurrency, timeoutMs, loadDurationMs, backendUrl, sessionType, fakeProviderState }) { + const okSamples = samples.filter((sample) => sample.ok); + const statusCounts = {}; + const byPipeline = {}; + for (const sample of samples) { + statusCounts[sample.status] = (statusCounts[sample.status] || 0) + 1; + if (!byPipeline[sample.pipeline_label]) { + byPipeline[sample.pipeline_label] = { + ok_count: 0, + error_count: 0, + cross_pipeline_leak_count: 0, + timeout_count: 0, + }; + } + if (sample.ok) byPipeline[sample.pipeline_label].ok_count += 1; + else byPipeline[sample.pipeline_label].error_count += 1; + byPipeline[sample.pipeline_label].cross_pipeline_leak_count += sample.cross_pipeline_leak_count || 0; + if (sample.status === "timeout") byPipeline[sample.pipeline_label].timeout_count += 1; + } + const errorCount = samples.length - okSamples.length; + return { + probe: caseId, + backend_url: backendUrl, + session_type: sessionType, + requests_per_pipeline: requestsPerPipeline, + total_requests: requestsPerPipeline * 2, + completed_requests: samples.length, + concurrency, + timeout_ms: timeoutMs, + ok_count: okSamples.length, + error_count: errorCount, + timeout_count: samples.filter((sample) => sample.status === "timeout").length, + cross_pipeline_leak_count: samples.reduce((count, sample) => count + (sample.cross_pipeline_leak_count || 0), 0), + error_rate: samples.length === 0 ? 1 : rounded(errorCount / samples.length), + load_duration_ms: rounded(loadDurationMs), + throughput_rps: loadDurationMs <= 0 ? 0 : rounded(okSamples.length / (loadDurationMs / 1000)), + status_counts: statusCounts, + by_pipeline: byPipeline, + connected_ms: stats(samples.map((sample) => sample.connected_ms).filter(Number.isFinite)), + first_assistant_event_ms: stats(samples.map((sample) => sample.first_assistant_event_ms).filter(Number.isFinite)), + first_assistant_content_ms: stats(samples.map((sample) => sample.first_assistant_content_ms).filter(Number.isFinite)), + first_response_ms: stats(okSamples.map((sample) => sample.first_response_ms).filter(Number.isFinite)), + response_duration_ms: stats(okSamples.map((sample) => sample.response_duration_ms).filter(Number.isFinite)), + fake_provider: summarizeFakeProviderState(fakeProviderState), + provider_timing: buildProviderTimingMetrics(samples, fakeProviderState), + samples, + }; +} + +function buildThresholds(metrics) { + return { + cross_pipeline_leak_count: { + actual: metrics.cross_pipeline_leak_count, + max: 0, + pass: metrics.cross_pipeline_leak_count === 0, + }, + error_rate: { + actual: metrics.error_rate, + max: maxErrorRate, + pass: metrics.error_rate <= maxErrorRate, + }, + response_p95_ms: { + actual: metrics.response_duration_ms.p95, + max: responseP95BudgetMs, + pass: metrics.ok_count > 0 && metrics.response_duration_ms.p95 <= responseP95BudgetMs, + }, + }; +} + +function positiveInteger(value, fallback) { + const parsed = Number.parseInt(String(value || ""), 10); + return Number.isInteger(parsed) && parsed > 0 ? parsed : fallback; +} + +function positiveNumber(value, fallback) { + const parsed = Number(value || ""); + return Number.isFinite(parsed) && parsed >= 0 ? parsed : fallback; +} + +function bool(value, fallback) { + if (value === undefined || value === "") return fallback; + if (/^(1|true|yes|on)$/i.test(String(value))) return true; + if (/^(0|false|no|off)$/i.test(String(value))) return false; + return fallback; +} + +function textList(value) { + return String(value || "") + .split(/\r?\n|,/) + .map((item) => item.trim()) + .filter(Boolean); +} + +function rounded(value) { + return Number(value.toFixed(3)); +} + +function percentile(values, percentileValue) { + if (values.length === 0) return 0; + const sorted = [...values].sort((a, b) => a - b); + const index = Math.min(sorted.length - 1, Math.ceil((percentileValue / 100) * sorted.length) - 1); + return rounded(sorted[index]); +} + +function stats(values) { + if (values.length === 0) return { min: 0, p50: 0, p95: 0, p99: 0, max: 0 }; + return { + min: rounded(Math.min(...values)), + p50: percentile(values, 50), + p95: percentile(values, 95), + p99: percentile(values, 99), + max: rounded(Math.max(...values)), + }; +} + +function looksLikeEnvIssue(error) { + const message = String(error?.message || error || ""); + return /fetch failed|ECONNREFUSED|ENOTFOUND|LANGBOT_.*not configured|Could not read recovery_key|Backend did not respond/i.test(message); +} + +function safeReason(value) { + return redact(String(value || "")).slice(0, 1000); +} diff --git a/skills/skills/langbot-testing/probes/lib/fake-provider-timing.mjs b/skills/skills/langbot-testing/probes/lib/fake-provider-timing.mjs new file mode 100644 index 000000000..b383b2663 --- /dev/null +++ b/skills/skills/langbot-testing/probes/lib/fake-provider-timing.mjs @@ -0,0 +1,134 @@ +export function summarizeFakeProviderState(state) { + if (!state) return null; + const recentRequests = Array.isArray(state.recent_requests) ? state.recent_requests : []; + const chatRequests = recentRequests.filter((request) => String(request?.path || "").includes("/chat/completions")); + const successfulRequests = chatRequests.filter((request) => request?.status === "ok"); + const faultRequests = chatRequests.filter((request) => ( + request?.should_fail === true + || request?.status === "http_fault" + || (Number.isFinite(request?.http_status) && request.http_status >= 400) + )); + + return { + status: state.status || "unknown", + url: state.url || "", + request_count: Number.isFinite(state.request_count) ? state.request_count : recentRequests.length, + recent_request_count: recentRequests.length, + chat_request_count: chatRequests.length, + fault_count: faultRequests.length, + streamed_request_count: chatRequests.filter((request) => request?.stream === true).length, + duration_ms: stats(chatRequests.map((request) => numberOrNull(request?.duration_ms)).filter(Number.isFinite)), + successful_duration_ms: stats(successfulRequests.map((request) => numberOrNull(request?.duration_ms)).filter(Number.isFinite)), + first_chunk_ms: stats(successfulRequests.map((request) => numberOrNull(request?.first_chunk_ms)).filter(Number.isFinite)), + first_content_chunk_ms: stats(successfulRequests.map((request) => numberOrNull(request?.first_content_chunk_ms)).filter(Number.isFinite)), + content_chunk_count: stats(successfulRequests.map((request) => numberOrNull(request?.content_chunk_count)).filter(Number.isFinite)), + config: state.config || {}, + }; +} + +export function buildProviderTimingMetrics(samples, state) { + const recentRequests = Array.isArray(state?.recent_requests) ? state.recent_requests : []; + const byExpectedText = new Map(); + for (const request of recentRequests) { + const expected = String(request?.expected_text || ""); + if (!expected) continue; + if (!byExpectedText.has(expected)) byExpectedText.set(expected, []); + byExpectedText.get(expected).push(request); + } + + const segments = []; + const missingExpectedText = []; + for (const sample of samples) { + const expected = String(sample?.expected_text || ""); + if (!expected) continue; + const request = (byExpectedText.get(expected) || []).shift(); + if (!request) { + missingExpectedText.push(expected); + continue; + } + const segment = buildTimingSegment(sample, request); + if (segment) segments.push(segment); + } + + const values = (key) => segments.map((segment) => numberOrNull(segment[key])).filter(Number.isFinite); + return { + matched_request_count: segments.length, + missing_provider_match_count: missingExpectedText.length, + missing_expected_text: missingExpectedText.slice(0, 20), + send_to_provider_start_ms: stats(values("send_to_provider_start_ms")), + provider_duration_ms: stats(values("provider_duration_ms")), + provider_finish_to_ws_final_ms: stats(values("provider_finish_to_ws_final_ms")), + langbot_overhead_estimate_ms: stats(values("langbot_overhead_estimate_ms")), + e2e_minus_provider_ms: stats(values("e2e_minus_provider_ms")), + provider_first_content_to_ws_first_content_ms: stats(values("provider_first_content_to_ws_first_content_ms")), + segments, + }; +} + +function buildTimingSegment(sample, request) { + const sentEpochMs = numberOrNull(sample.sent_epoch_ms); + const finishedEpochMs = numberOrNull(sample.finished_epoch_ms); + const providerStartedEpochMs = numberOrNull(request.started_epoch_ms); + const providerFinishedEpochMs = numberOrNull(request.finished_epoch_ms); + const providerFirstContentEpochMs = numberOrNull(request.first_content_chunk_epoch_ms); + const wsFirstContentEpochMs = numberOrNull(sample.first_assistant_content_epoch_ms); + const responseDurationMs = numberOrNull(sample.response_duration_ms); + const providerDurationMs = numberOrNull(request.duration_ms); + + const sendToProviderStartMs = finiteDelta(providerStartedEpochMs, sentEpochMs); + const providerFinishToWsFinalMs = finiteDelta(finishedEpochMs, providerFinishedEpochMs); + const e2eMinusProviderMs = Number.isFinite(responseDurationMs) && Number.isFinite(providerDurationMs) + ? rounded(responseDurationMs - providerDurationMs) + : null; + const overheadEstimateMs = Number.isFinite(sendToProviderStartMs) && Number.isFinite(providerFinishToWsFinalMs) + ? rounded(sendToProviderStartMs + providerFinishToWsFinalMs) + : e2eMinusProviderMs; + + return { + sample_index: sample.index, + pipeline_label: sample.pipeline_label || "", + expected_text: sample.expected_text || "", + provider_request_id: request.id || "", + provider_request_number: request.request_number ?? null, + response_duration_ms: responseDurationMs, + provider_duration_ms: providerDurationMs, + send_to_provider_start_ms: sendToProviderStartMs, + provider_finish_to_ws_final_ms: providerFinishToWsFinalMs, + langbot_overhead_estimate_ms: overheadEstimateMs, + e2e_minus_provider_ms: e2eMinusProviderMs, + provider_first_content_to_ws_first_content_ms: finiteDelta(wsFirstContentEpochMs, providerFirstContentEpochMs), + provider_status: request.status || "", + provider_http_status: request.http_status ?? null, + }; +} + +function finiteDelta(left, right) { + return Number.isFinite(left) && Number.isFinite(right) ? rounded(left - right) : null; +} + +export function stats(values) { + if (values.length === 0) return { min: 0, p50: 0, p95: 0, p99: 0, max: 0 }; + return { + min: rounded(Math.min(...values)), + p50: percentile(values, 50), + p95: percentile(values, 95), + p99: percentile(values, 99), + max: rounded(Math.max(...values)), + }; +} + +export function percentile(values, percentileValue) { + if (values.length === 0) return 0; + const sorted = [...values].sort((a, b) => a - b); + const index = Math.min(sorted.length - 1, Math.ceil((percentileValue / 100) * sorted.length) - 1); + return rounded(sorted[index]); +} + +export function rounded(value) { + return Number(value.toFixed(3)); +} + +function numberOrNull(value) { + const number = Number(value); + return Number.isFinite(number) ? number : null; +} diff --git a/skills/skills/langbot-testing/references/performance-reliability-testing.md b/skills/skills/langbot-testing/references/performance-reliability-testing.md index 54592f6f7..8bbaa2fb4 100644 --- a/skills/skills/langbot-testing/references/performance-reliability-testing.md +++ b/skills/skills/langbot-testing/references/performance-reliability-testing.md @@ -144,8 +144,26 @@ request because Debug Chat broadcasts messages to every connection in the same session; unique tokens prevent one connection from counting another connection's response as its own. +When the fake provider is used, reports also include provider-side timing in +`metrics.json`: + +- `fake_provider.duration_ms` and `fake_provider.first_content_chunk_ms` + measure the controlled provider itself. +- `provider_timing.send_to_provider_start_ms` estimates WebSocket ingress, + pipeline dispatch, runner setup, and requester time before the provider + receives the request. +- `provider_timing.provider_finish_to_ws_final_ms` estimates the path from + provider completion back to the final Debug Chat WebSocket response. +- `provider_timing.langbot_overhead_estimate_ms` is the sum of those two + LangBot-side segments when wall-clock timestamps can be matched by the + unique expected response token. + After the baseline passes, run `langbot-fake-provider-debug-chat-slow-load` to keep the same live backend path while injecting deterministic streaming latency. +Run `langbot-fake-provider-debug-chat-cross-pipeline-isolation` to open +concurrent Debug Chat connections against two fake-provider pipelines and fail +if one pipeline receives the other pipeline's response token. This targets +global pipeline-state regressions in the WebSocket Debug Chat path. Run `langbot-fake-provider-debug-chat-fault-recovery` to inject bounded HTTP provider failures and require both observed failures and later successful requests. The fault-recovery case is deliberately sequential because failed @@ -165,6 +183,7 @@ Useful commands: ```bash rtk bin/lbs test run langbot-fake-provider-debug-chat-load --run-id langbot-fake-load-local rtk bin/lbs test run langbot-fake-provider-debug-chat-slow-load --run-id langbot-fake-slow-local +rtk bin/lbs test run langbot-fake-provider-debug-chat-cross-pipeline-isolation --run-id langbot-fake-cross-pipeline-local rtk bin/lbs test run langbot-fake-provider-debug-chat-fault-recovery --run-id langbot-fake-fault-local rtk bin/lbs test run langbot-space-debug-chat-concurrency-smoke --run-id langbot-space-smoke-local rtk bin/lbs suite run langbot-debug-chat-load-gate --run-id langbot-debug-chat-load-local --include-manual-check @@ -184,8 +203,9 @@ Use the smallest gate that answers the quality question: starting with Pipeline Debug Chat send-to-visible-completion latency. Run it only when the browser profile and target pipeline are ready. - `langbot-debug-chat-load-gate`: WebSocket Debug Chat load checks, starting - with controlled fake-provider baseline, slow-provider, and fault-recovery - profiles, plus an optional low-volume real Space-provider smoke. + with controlled fake-provider baseline, slow-provider, cross-pipeline + isolation, and fault-recovery profiles, plus an optional low-volume real + Space-provider smoke. - `langbot-performance-reliability-gate`: combined starter gate for synthetic contracts plus live backend checks. diff --git a/skills/skills/langbot-testing/suites/langbot-debug-chat-load-gate.yaml b/skills/skills/langbot-testing/suites/langbot-debug-chat-load-gate.yaml index 280a8dd47..4d9d90510 100644 --- a/skills/skills/langbot-testing/suites/langbot-debug-chat-load-gate.yaml +++ b/skills/skills/langbot-testing/suites/langbot-debug-chat-load-gate.yaml @@ -11,5 +11,6 @@ tags: cases: - langbot-fake-provider-debug-chat-load - langbot-fake-provider-debug-chat-slow-load + - langbot-fake-provider-debug-chat-cross-pipeline-isolation - langbot-fake-provider-debug-chat-fault-recovery - langbot-space-debug-chat-concurrency-smoke