test(skills): add debug chat timing and isolation probes

This commit is contained in:
huanghuoguoguo
2026-06-25 13:34:30 +08:00
parent 7e36869494
commit 9b0f5b36f3
10 changed files with 1501 additions and 22 deletions
+4
View File
@@ -123,6 +123,7 @@ Controlled Debug Chat message-path load gate:
bin/lbs suite plan langbot-debug-chat-load-gate
bin/lbs test run langbot-fake-provider-debug-chat-load --run-id langbot-fake-load-local
bin/lbs test run langbot-fake-provider-debug-chat-slow-load --run-id langbot-fake-slow-local
bin/lbs test run langbot-fake-provider-debug-chat-cross-pipeline-isolation --run-id langbot-fake-cross-pipeline-local
bin/lbs test run langbot-fake-provider-debug-chat-fault-recovery --run-id langbot-fake-fault-local
bin/lbs test run langbot-space-debug-chat-concurrency-smoke --run-id langbot-space-smoke-local
```
@@ -132,6 +133,9 @@ OpenAI-compatible fake provider, creates the matching provider/model/pipeline,
then sends concurrent WebSocket Debug Chat messages through the real backend.
Use `langbot-fake-provider-debug-chat-slow-load` to measure the same path under
deterministic streaming latency. Use
`langbot-fake-provider-debug-chat-cross-pipeline-isolation` to verify that
concurrent Debug Chat traffic on two pipelines does not leak assistant
responses across pipeline boundaries. Use
`langbot-fake-provider-debug-chat-fault-recovery` to inject bounded provider
HTTP failures and confirm later Debug Chat requests recover.
Use `langbot-space-debug-chat-concurrency-smoke` only as a low-volume live
@@ -0,0 +1,203 @@
#!/usr/bin/env node
import { spawn } from "node:child_process";
import { mkdir, readFile, writeFile } from "node:fs/promises";
import { dirname, resolve } from "node:path";
import { env } from "node:process";
import {
appendLine,
ensureEvidence,
evidencePaths,
loadEnvFiles,
redact,
writeResult,
} from "./lib/langbot-e2e.mjs";
// Setup automation: provisions two fake-provider pipelines (A and B) by running the
// single-pipeline setup script once per pipeline, records the outcome as a result
// document, and optionally persists the resolved values to skills/.env.local.
const caseId = "ensure-fake-provider-cross-pipelines";
// Pipeline names used when the LANGBOT_FAKE_PROVIDER_PIPELINE_{A,B}_NAME overrides are unset.
const DEFAULT_PIPELINE_A_NAME = "Agent QA Fake Provider Debug Chat A";
const DEFAULT_PIPELINE_B_NAME = "Agent QA Fake Provider Debug Chat B";
await loadEnvFiles();
const paths = evidencePaths(caseId);
await ensureEvidence(paths);
// Passing --write-env makes this script upsert the resolved fixture values into envLocalPath.
const writeEnv = process.argv.includes("--write-env");
const envLocalPath = resolve("skills/.env.local");
const pipelineAName = env.LANGBOT_FAKE_PROVIDER_PIPELINE_A_NAME || DEFAULT_PIPELINE_A_NAME;
const pipelineBName = env.LANGBOT_FAKE_PROVIDER_PIPELINE_B_NAME || DEFAULT_PIPELINE_B_NAME;
// Result document; starts as "fail" and is only flipped to "pass" after every step succeeded.
const result = {
source: "setup_automation",
case_id: caseId,
run_id: paths.runId,
status: "fail",
reason: "",
pipeline_a: {
name: pipelineAName,
id: "",
url: "",
},
pipeline_b: {
name: pipelineBName,
id: "",
url: "",
},
fake_provider: {
url: "",
base_url: "",
pid: null,
},
wrote_env: false,
evidence: {
console_log: paths.consoleLog,
automation_result_json: paths.automationResultJson,
result_json: paths.resultJson,
},
evidence_collected: ["api_diagnostic", "filesystem"],
};
try {
// Both setups are keyed by pipeline name, so identical names cannot describe two pipelines.
if (pipelineAName === pipelineBName) {
throw new Error("LANGBOT_FAKE_PROVIDER_PIPELINE_A_NAME and LANGBOT_FAKE_PROVIDER_PIPELINE_B_NAME must be different.");
}
// The two setups run one after the other (B starts only after A finished).
const setupA = await runPipelineSetup(pipelineAName, "A");
const setupB = await runPipelineSetup(pipelineBName, "B");
result.pipeline_a = {
name: setupA.pipeline_name || pipelineAName,
id: setupA.pipeline_id || "",
url: setupA.pipeline_url || "",
};
result.pipeline_b = {
name: setupB.pipeline_name || pipelineBName,
id: setupB.pipeline_id || "",
url: setupB.pipeline_url || "",
};
// Provider details reported by the later run (B) take precedence; A's values are the fallback.
result.fake_provider = {
url: setupB.fake_provider?.url || setupA.fake_provider?.url || "",
base_url: setupB.fake_provider?.base_url || setupA.fake_provider?.base_url || "",
pid: setupB.fake_provider?.pid ?? setupA.fake_provider?.pid ?? null,
};
if (!result.pipeline_a.url || !result.pipeline_b.url || !result.fake_provider.url) {
throw new Error("Cross-pipeline fake provider setup did not return both pipeline URLs and provider URL.");
}
if (writeEnv) {
await upsertEnvLocal(envLocalPath, {
LANGBOT_FAKE_PROVIDER_URL: result.fake_provider.url,
LANGBOT_FAKE_PROVIDER_BASE_URL: result.fake_provider.base_url,
// A missing pid is written as an empty value rather than the string "null".
LANGBOT_FAKE_PROVIDER_PID: result.fake_provider.pid ? String(result.fake_provider.pid) : "",
LANGBOT_FAKE_PROVIDER_PIPELINE_A_URL: result.pipeline_a.url,
LANGBOT_FAKE_PROVIDER_PIPELINE_A_NAME: result.pipeline_a.name,
LANGBOT_FAKE_PROVIDER_PIPELINE_B_URL: result.pipeline_b.url,
LANGBOT_FAKE_PROVIDER_PIPELINE_B_NAME: result.pipeline_b.name,
});
result.wrote_env = true;
}
result.status = "pass";
result.reason = "Fake provider cross-pipeline fixtures are configured.";
} catch (error) {
// Failures whose message matches looksLikeEnvIssue are reported as "env_issue"; the rest as "fail".
result.status = looksLikeEnvIssue(error) ? "env_issue" : "fail";
result.reason = safeReason(error.message);
} finally {
// The result is always written and printed as JSON, whether the setup passed or failed.
await writeResult(paths, result);
console.log(JSON.stringify(result, null, 2));
}
// Exit codes: 0 = pass, 2 = env_issue, 1 = any other status.
process.exit(result.status === "pass" ? 0 : result.status === "env_issue" ? 2 : 1);
/**
 * Runs scripts/e2e/ensure-fake-provider-pipeline.mjs in a child Node process for one
 * pipeline and resolves with the JSON result object the child printed on stdout.
 *
 * The child inherits the current environment, with the pipeline name set and the
 * fake-provider fault-injection knobs forced off; latency knobs keep any value already
 * present in the environment and otherwise get small defaults.
 *
 * @param {string} pipelineName - Value passed as LANGBOT_FAKE_PROVIDER_PIPELINE_NAME.
 * @param {string} label - Short tag ("A"/"B") used in console-log prefixes and error text.
 * @returns {Promise<object>} Parsed child result; only resolves when the child exited
 *   with code 0 and reported status "pass".
 * @throws {Error} (as rejection) when the child cannot be spawned, exits non-zero, is
 *   terminated by a signal, or reports a non-pass status.
 */
function runPipelineSetup(pipelineName, label) {
  return new Promise((resolvePromise, rejectPromise) => {
    const child = spawn(process.execPath, ["scripts/e2e/ensure-fake-provider-pipeline.mjs"], {
      cwd: resolve("."),
      env: {
        ...env,
        LANGBOT_FAKE_PROVIDER_PIPELINE_NAME: pipelineName,
        LANGBOT_FAKE_PROVIDER_FIRST_TOKEN_DELAY_MS: env.LANGBOT_FAKE_PROVIDER_FIRST_TOKEN_DELAY_MS || "25",
        LANGBOT_FAKE_PROVIDER_CHUNK_DELAY_MS: env.LANGBOT_FAKE_PROVIDER_CHUNK_DELAY_MS || "10",
        LANGBOT_FAKE_PROVIDER_CHUNK_COUNT: env.LANGBOT_FAKE_PROVIDER_CHUNK_COUNT || "0",
        LANGBOT_FAKE_PROVIDER_FAIL_FIRST_N: "0",
        LANGBOT_FAKE_PROVIDER_FAIL_EVERY_N: "0",
        LANGBOT_FAKE_PROVIDER_FAULT_STATUS: env.LANGBOT_FAKE_PROVIDER_FAULT_STATUS || "500",
        LANGBOT_FAKE_PROVIDER_FAIL_AFTER_FIRST_CHUNK: "false",
        LANGBOT_FAKE_PROVIDER_DYNAMIC_RESPONSE: "true",
      },
      stdio: ["ignore", "pipe", "pipe"],
    });
    // Decode at the stream level so a multi-byte UTF-8 character that straddles two
    // chunks is not corrupted (per-chunk Buffer#toString() would mangle it).
    child.stdout.setEncoding("utf8");
    child.stderr.setEncoding("utf8");
    let stdout = "";
    let stderr = "";
    child.stdout.on("data", (text) => {
      stdout += text;
      // Console logging is best-effort evidence; a log write failure must not fail the setup.
      appendLine(paths.consoleLog, `[setup ${label} stdout] ${text.trimEnd()}`).catch(() => {});
    });
    child.stderr.on("data", (text) => {
      stderr += text;
      appendLine(paths.consoleLog, `[setup ${label} stderr] ${text.trimEnd()}`).catch(() => {});
    });
    child.on("error", rejectPromise);
    child.on("close", (code, signal) => {
      // Guard against a JSON `null` payload so the property reads below cannot throw
      // inside this event handler (which would surface as an uncaught exception).
      const parsed = parseJsonOutput(stdout) ?? {};
      if (code !== 0 || parsed.status !== "pass") {
        const exitDetail = code === null && signal
          ? `Fake provider pipeline setup ${label} was terminated by ${signal}.`
          : `Fake provider pipeline setup ${label} exited with ${code}.`;
        rejectPromise(new Error(parsed.reason || stderr || exitDetail));
        return;
      }
      resolvePromise(parsed);
    });
  });
}
/**
 * Extracts a JSON object from a child process's stdout text.
 *
 * First tries to parse the whole (trimmed) text. If that is not valid JSON, falls back
 * to the span between the first "{" and the last "}", which tolerates log lines printed
 * before or after the JSON document.
 *
 * @param {unknown} text - Raw stdout text (anything falsy is treated as empty).
 * @returns {object} The parsed plain object, or `{}` when nothing usable was found.
 *   Always returns a non-null, non-array object so callers can read properties
 *   without guarding against `null`, primitives or arrays.
 */
function parseJsonOutput(text) {
  // Valid JSON is not necessarily an object ("null", "42", "[]"); normalise those to {}.
  const asPlainObject = (value) => (
    value !== null && typeof value === "object" && !Array.isArray(value) ? value : {}
  );
  const trimmed = String(text || "").trim();
  if (!trimmed) return {};
  try {
    return asPlainObject(JSON.parse(trimmed));
  } catch {
    // Not pure JSON: fall through to the brace-delimited fallback below.
  }
  const start = trimmed.indexOf("{");
  const end = trimmed.lastIndexOf("}");
  if (start < 0 || end <= start) return {};
  try {
    return asPlainObject(JSON.parse(trimmed.slice(start, end + 1)));
  } catch {
    // The brace-delimited span is not valid JSON either; report "no result".
    return {};
  }
}
/**
 * Inserts or replaces `KEY=value` lines in a dotenv-style file.
 *
 * Existing lines whose key appears in `updates` are rewritten in place; keys not yet
 * present are appended at the end. All other lines (comments, unrelated keys) are kept.
 * The parent directory is created when missing, and the file is written with "\n"
 * line endings and exactly one trailing newline.
 *
 * @param {string} path - File to update; created if it does not exist.
 * @param {Record<string, unknown>} updates - Keys to set. Entries whose value is
 *   `undefined` are ignored.
 * @returns {Promise<void>}
 * @throws Propagates filesystem errors, including read errors other than "file not found".
 */
async function upsertEnvLocal(path, updates) {
  await mkdir(dirname(path), { recursive: true });
  let text = "";
  try {
    text = await readFile(path, "utf8");
  } catch (error) {
    // Only a missing file means "start empty". Any other read failure must surface,
    // otherwise the write below could replace existing content with just the updates.
    if (error?.code !== "ENOENT") throw error;
  }
  const pending = new Map(Object.entries(updates).filter(([, value]) => value !== undefined));
  const lines = text === "" ? [] : text.split(/\r?\n/);
  // Drop trailing blank lines so appended keys follow the existing content directly
  // instead of accumulating an extra empty line on every run.
  while (lines.length > 0 && lines[lines.length - 1].trim() === "") lines.pop();
  const seen = new Set();
  const next = lines.map((line) => {
    const match = line.trim().match(/^([A-Z][A-Z0-9_]*)=/);
    if (!match || !pending.has(match[1])) return line;
    seen.add(match[1]);
    return `${match[1]}=${pending.get(match[1])}`;
  });
  for (const [key, value] of pending) {
    if (!seen.has(key)) next.push(`${key}=${value}`);
  }
  await writeFile(path, `${next.join("\n").replace(/\n+$/, "")}\n`, "utf8");
}
/**
 * Tells whether a failure looks like an environment problem (unreachable backend,
 * missing configuration) rather than a defect in the setup itself.
 *
 * @param {unknown} error - Error object, string, or anything stringifiable.
 * @returns {boolean} True when the failure text matches a known environment signature.
 */
function looksLikeEnvIssue(error) {
  const envIssuePattern = /fetch failed|ECONNREFUSED|ENOTFOUND|LANGBOT_.*not configured|Could not read recovery_key|Backend did not respond/i;
  // Prefer the error's message; fall back to the value itself, then to an empty string.
  const detail = error?.message || error || "";
  return envIssuePattern.test(String(detail));
}
/**
 * Produces a result-safe failure reason: the text is passed through `redact` and
 * then capped at 1000 characters.
 *
 * @param {unknown} value - Raw reason; falsy values become an empty string.
 * @returns {string} Redacted reason, at most 1000 characters long.
 */
function safeReason(value) {
  const text = String(value || "");
  const redacted = redact(text);
  return redacted.slice(0, 1000);
}
+92 -6
View File
@@ -28,6 +28,8 @@ const recentRequests = [];
const server = createServer(async (request, response) => {
const startedAt = Date.now();
const startedPerf = performance.now();
let requestRecord = null;
const url = new URL(request.url || "/", `http://${request.headers.host || `${host}:${port}`}`);
try {
if (request.method === "GET" && url.pathname === "/healthz") {
@@ -98,13 +100,24 @@ const server = createServer(async (request, response) => {
const requestId = `chatcmpl-langbot-fake-${requestCount}`;
const shouldFail = requestCount <= config.fail_first_n
|| (config.fail_every_n > 0 && requestCount % config.fail_every_n === 0);
recordRequest({
const replyText = responseTextForBody(body);
requestRecord = recordRequest({
id: requestId,
request_number: requestCount,
path: url.pathname,
stream: Boolean(body.stream),
model: body.model || "",
message_count: Array.isArray(body.messages) ? body.messages.length : 0,
should_fail: shouldFail,
status: "running",
http_status: null,
expected_text: replyText,
response_text_preview: previewText(replyText),
started_at: new Date(startedAt).toISOString(),
started_epoch_ms: startedAt,
configured_first_token_delay_ms: config.first_token_delay_ms,
configured_chunk_delay_ms: config.chunk_delay_ms,
configured_chunk_count: config.chunk_count,
});
if (shouldFail) {
@@ -116,17 +129,21 @@ const server = createServer(async (request, response) => {
code: "fake_provider_fault",
},
});
finishRequestRecord(requestRecord, startedPerf, {
status: "http_fault",
http_status: config.fault_status,
});
return;
}
const replyText = responseTextForBody(body);
if (body.stream) {
await streamCompletion(response, {
requestId,
model: body.model || modelName,
content: replyText,
failAfterFirstChunk: config.fail_after_first_chunk,
requestRecord,
startedPerf,
});
} else {
await sleep(config.first_token_delay_ms + config.chunk_delay_ms);
@@ -135,6 +152,13 @@ const server = createServer(async (request, response) => {
model: body.model || modelName,
content: replyText,
}));
markRequestTiming(requestRecord, "first_chunk", startedPerf);
markRequestTiming(requestRecord, "first_content_chunk", startedPerf);
requestRecord.content_chunk_count = 1;
finishRequestRecord(requestRecord, startedPerf, {
status: "ok",
http_status: 200,
});
}
return;
}
@@ -146,6 +170,13 @@ const server = createServer(async (request, response) => {
},
});
} catch (error) {
if (requestRecord) {
finishRequestRecord(requestRecord, startedPerf, {
status: "fake_provider_error",
http_status: 500,
error: error instanceof Error ? error.message : String(error),
});
}
sendJson(response, 500, {
error: {
message: error instanceof Error ? error.message : String(error),
@@ -264,7 +295,14 @@ function completionPayload({ requestId, model, content }) {
};
}
async function streamCompletion(response, { requestId, model, content, failAfterFirstChunk: failMidStream }) {
async function streamCompletion(response, {
requestId,
model,
content,
failAfterFirstChunk: failMidStream,
requestRecord,
startedPerf,
}) {
response.writeHead(200, {
"content-type": "text/event-stream; charset=utf-8",
"cache-control": "no-cache",
@@ -272,6 +310,7 @@ async function streamCompletion(response, { requestId, model, content, failAfter
});
await sleep(config.first_token_delay_ms);
markRequestTiming(requestRecord, "first_chunk", startedPerf);
writeSse(response, {
id: requestId,
object: "chat.completion.chunk",
@@ -283,6 +322,8 @@ async function streamCompletion(response, { requestId, model, content, failAfter
const chunks = splitContent(content);
for (let index = 0; index < chunks.length; index += 1) {
await sleep(config.chunk_delay_ms);
if (index === 0) markRequestTiming(requestRecord, "first_content_chunk", startedPerf);
requestRecord.content_chunk_count = (requestRecord.content_chunk_count || 0) + 1;
writeSse(response, {
id: requestId,
object: "chat.completion.chunk",
@@ -291,6 +332,10 @@ async function streamCompletion(response, { requestId, model, content, failAfter
choices: [{ index: 0, delta: { content: chunks[index] }, finish_reason: null }],
});
if (failMidStream && index === 0) {
finishRequestRecord(requestRecord, startedPerf, {
status: "mid_stream_disconnect",
http_status: 200,
});
response.destroy(new Error("LangBot fake provider injected mid-stream disconnect"));
return;
}
@@ -312,6 +357,10 @@ async function streamCompletion(response, { requestId, model, content, failAfter
});
response.write("data: [DONE]\n\n");
response.end();
finishRequestRecord(requestRecord, startedPerf, {
status: "ok",
http_status: 200,
});
}
function writeSse(response, payload) {
@@ -365,11 +414,48 @@ function flattenContent(content) {
}
function recordRequest(entry) {
recentRequests.push({
const item = {
...entry,
at: new Date().toISOString(),
});
finished_at: null,
finished_epoch_ms: null,
duration_ms: null,
first_chunk_at: null,
first_chunk_epoch_ms: null,
first_chunk_ms: null,
first_content_chunk_at: null,
first_content_chunk_epoch_ms: null,
first_content_chunk_ms: null,
content_chunk_count: 0,
};
recentRequests.push(item);
while (recentRequests.length > config.request_log_limit) recentRequests.shift();
return item;
}
/**
 * Stamps a named timing milestone on a request record, once.
 *
 * Writes `<key>_at` (ISO timestamp), `<key>_epoch_ms` (wall-clock ms) and `<key>_ms`
 * (elapsed ms since `startedPerf`). Does nothing when there is no record or the
 * milestone's `<key>_at` is already set.
 *
 * @param {object|null} entry - Request record to update in place.
 * @param {string} key - Milestone name used as the field prefix.
 * @param {number} startedPerf - `performance.now()` reading taken at request start.
 */
function markRequestTiming(entry, key, startedPerf) {
  const atField = `${key}_at`;
  const alreadyMarked = Boolean(entry?.[atField]);
  if (!entry || alreadyMarked) return;
  const epochMs = Date.now();
  entry[atField] = new Date(epochMs).toISOString();
  entry[`${key}_epoch_ms`] = epochMs;
  const elapsedMs = performance.now() - startedPerf;
  entry[`${key}_ms`] = rounded(elapsedMs);
}
/**
 * Closes a request record, once: merges `updates` into it and then stamps
 * `finished_at`, `finished_epoch_ms` and `duration_ms`. Does nothing when there is no
 * record or it already has a `finished_at` value.
 *
 * @param {object|null} entry - Request record to update in place.
 * @param {number} startedPerf - `performance.now()` reading taken at request start.
 * @param {object} [updates] - Extra fields to merge before the finish stamps are written.
 */
function finishRequestRecord(entry, startedPerf, updates = {}) {
  if (!entry) return;
  if (entry.finished_at) return;
  const epochMs = Date.now();
  Object.assign(entry, updates);
  // The finish stamps are applied after the merge, so they win over same-named update keys.
  const elapsedMs = performance.now() - startedPerf;
  Object.assign(entry, {
    finished_at: new Date(epochMs).toISOString(),
    finished_epoch_ms: epochMs,
    duration_ms: rounded(elapsedMs),
  });
}
/**
 * Rounds a number to at most three decimal places.
 *
 * @param {number} value - Number to round.
 * @returns {number} The value formatted with three decimals and converted back to a number.
 */
function rounded(value) {
  const fixed = value.toFixed(3);
  return Number(fixed);
}
/**
 * Returns a short preview of a value: its string form cut to the first 120 characters.
 *
 * @param {unknown} value - Value to preview; falsy values become an empty string.
 * @returns {string} Preview text, at most 120 characters long.
 */
function previewText(value) {
  const text = String(value || "");
  return text.slice(0, 120);
}
function resetRequestState() {
+40
View File
@@ -151,6 +151,7 @@
"agent-runner-release-preflight",
"agent-runner-runtime-chaos",
"dify-agent-debug-chat",
"langbot-fake-provider-debug-chat-cross-pipeline-isolation",
"langbot-fake-provider-debug-chat-fault-recovery",
"langbot-fake-provider-debug-chat-load",
"langbot-fake-provider-debug-chat-slow-load",
@@ -497,6 +498,44 @@
"backend_log"
]
},
{
"id": "langbot-fake-provider-debug-chat-cross-pipeline-isolation",
"title": "LangBot Debug Chat fake-provider cross-pipeline isolation probe",
"mode": "probe",
"area": "reliability",
"type": "reliability",
"priority": "p1",
"risk": "high",
"ci_eligible": false,
"tags": [
"reliability",
"debug-chat",
"websocket",
"fake-provider",
"isolation",
"concurrency",
"metrics"
],
"automation": "skills/langbot-testing/probes/langbot-debug-chat-cross-pipeline-isolation.mjs",
"setup_automation": [
"node:scripts/e2e/ensure-fake-provider-cross-pipelines.mjs --write-env"
],
"setup_provides_env": [
"LANGBOT_FAKE_PROVIDER_URL",
"LANGBOT_FAKE_PROVIDER_BASE_URL",
"LANGBOT_FAKE_PROVIDER_PID",
"LANGBOT_FAKE_PROVIDER_PIPELINE_A_URL",
"LANGBOT_FAKE_PROVIDER_PIPELINE_A_NAME",
"LANGBOT_FAKE_PROVIDER_PIPELINE_B_URL",
"LANGBOT_FAKE_PROVIDER_PIPELINE_B_NAME"
],
"evidence_required": [
"metrics",
"network",
"api_diagnostic",
"filesystem"
]
},
{
"id": "langbot-fake-provider-debug-chat-fault-recovery",
"title": "LangBot Debug Chat fake-provider fault recovery probe",
@@ -1456,6 +1495,7 @@
"cases": [
"langbot-fake-provider-debug-chat-load",
"langbot-fake-provider-debug-chat-slow-load",
"langbot-fake-provider-debug-chat-cross-pipeline-isolation",
"langbot-fake-provider-debug-chat-fault-recovery",
"langbot-space-debug-chat-concurrency-smoke"
]
@@ -0,0 +1,81 @@
id: langbot-fake-provider-debug-chat-cross-pipeline-isolation
title: "LangBot Debug Chat fake-provider cross-pipeline isolation probe"
mode: probe
area: reliability
type: reliability
priority: p1
risk: high
ci_eligible: false
tags:
- reliability
- debug-chat
- websocket
- fake-provider
- isolation
- concurrency
- metrics
skills:
- langbot-env-setup
- langbot-testing
env:
- LANGBOT_BACKEND_URL
- LANGBOT_FRONTEND_URL
- LANGBOT_E2E_LOGIN_USER
automation: skills/langbot-testing/probes/langbot-debug-chat-cross-pipeline-isolation.mjs
automation_env:
- LANGBOT_BACKEND_URL
- LANGBOT_E2E_LOGIN_USER
- LANGBOT_FAKE_PROVIDER_URL
- LANGBOT_FAKE_PROVIDER_PIPELINE_A_URL
- LANGBOT_FAKE_PROVIDER_PIPELINE_A_NAME
- LANGBOT_FAKE_PROVIDER_PIPELINE_B_URL
- LANGBOT_FAKE_PROVIDER_PIPELINE_B_NAME
automation_debug_chat_load_requests: "6"
automation_debug_chat_load_concurrency: "4"
automation_debug_chat_load_timeout_ms: "30000"
automation_debug_chat_load_response_p95_ms: "5000"
automation_debug_chat_load_max_error_rate: "0"
automation_debug_chat_load_prompt_template: '请只回复 "{expected}",不要解释,不要添加其他字符。'
automation_debug_chat_load_stream: "true"
automation_debug_chat_load_reset: "true"
metrics_thresholds_json: '{"cross_pipeline_leak_count":{"max":0},"response_p95_ms":{"max":5000},"error_rate":{"max":0}}'
load_profile_json: '{"requests_per_pipeline":6,"pipelines":2,"concurrency":4,"path":"Pipeline Debug Chat WebSocket","provider":"controlled fake OpenAI-compatible provider","metric":"cross-pipeline response isolation and send-to-final-assistant-response"}'
setup_automation:
- "node:scripts/e2e/ensure-fake-provider-cross-pipelines.mjs --write-env"
setup_provides_env:
- LANGBOT_FAKE_PROVIDER_URL
- LANGBOT_FAKE_PROVIDER_BASE_URL
- LANGBOT_FAKE_PROVIDER_PID
- LANGBOT_FAKE_PROVIDER_PIPELINE_A_URL
- LANGBOT_FAKE_PROVIDER_PIPELINE_A_NAME
- LANGBOT_FAKE_PROVIDER_PIPELINE_B_URL
- LANGBOT_FAKE_PROVIDER_PIPELINE_B_NAME
steps:
- "Start or reuse the local fake OpenAI-compatible provider."
- "Create or update two local-agent pipelines that both point at the controlled fake provider."
- "Reset both Debug Chat sessions and the fake-provider request log."
- "Open concurrent WebSocket Debug Chat connections to both pipelines and send unique pipeline-scoped response tokens."
checks:
- "automation-result.json status is pass only when every request receives its own expected token and cross_pipeline_leak_count is zero."
- "metrics_summary includes by_pipeline status counts, fake-provider request count, and LangBot/provider timing estimates."
- "samples.json contains per-request pipeline labels so any leak can be attributed to the receiving pipeline."
evidence_required:
- metrics
- network
- api_diagnostic
- filesystem
diagnostics:
- "This probe targets Debug Chat isolation under concurrent traffic from two pipelines."
- "It is designed to expose regressions where global pipeline state causes one pipeline's assistant response to be delivered to another pipeline's Debug Chat session."
- "Same-pipeline foreign responses are tolerated because Debug Chat intentionally broadcasts within the same pipeline/session; cross-pipeline tokens are never tolerated."
success_patterns:
- "Debug Chat cross-pipeline isolation probe passed"
failure_patterns:
- "cross_pipeline_leak"
- "Timed out after"
- "WebSocket connection error"
- "Final assistant response did not include"
troubleshooting:
- backend-not-listening
- debug-chat-history-contaminates-automation
- local-agent-model-route-unavailable
@@ -17,6 +17,10 @@ import {
resetAndAuthLocalUser,
writeResult,
} from "../../../scripts/e2e/lib/langbot-e2e.mjs";
import {
buildProviderTimingMetrics,
summarizeFakeProviderState,
} from "./lib/fake-provider-timing.mjs";
const DEFAULT_LOCAL_PASSWORD = "LangBotE2ELocalPass!2026";
@@ -179,11 +183,18 @@ try {
error_rate: metrics.error_rate,
response_p50_ms: metrics.response_duration_ms.p50,
response_p95_ms: metrics.response_duration_ms.p95,
first_assistant_event_p95_ms: metrics.first_assistant_event_ms.p95,
first_assistant_content_p95_ms: metrics.first_assistant_content_ms.p95,
first_response_p95_ms: metrics.first_response_ms.p95,
throughput_rps: metrics.throughput_rps,
status_counts: metrics.status_counts,
fake_provider_request_count: metrics.fake_provider?.request_count ?? null,
fake_provider_fault_count: metrics.fake_provider?.fault_count ?? null,
fake_provider_duration_p95_ms: metrics.provider_timing?.provider_duration_ms.p95 ?? null,
langbot_overhead_estimate_p95_ms: metrics.provider_timing?.langbot_overhead_estimate_ms.p95 ?? null,
send_to_provider_start_p95_ms: metrics.provider_timing?.send_to_provider_start_ms.p95 ?? null,
provider_finish_to_ws_final_p95_ms: metrics.provider_timing?.provider_finish_to_ws_final_ms.p95 ?? null,
provider_timing_matched_request_count: metrics.provider_timing?.matched_request_count ?? null,
};
result.thresholds_summary = thresholds;
result.artifacts = {
@@ -391,9 +402,25 @@ function runSingleRequest({
expected_text: expected,
prompt,
response_text: "",
started_at: new Date().toISOString(),
started_epoch_ms: Date.now(),
connected_at: null,
connected_epoch_ms: null,
sent_at: null,
sent_epoch_ms: null,
first_assistant_event_at: null,
first_assistant_event_epoch_ms: null,
first_assistant_event_ms: null,
first_assistant_content_at: null,
first_assistant_content_epoch_ms: null,
first_assistant_content_ms: null,
first_response_at: null,
first_response_epoch_ms: null,
connected_ms: null,
first_response_ms: null,
response_duration_ms: null,
finished_at: null,
finished_epoch_ms: null,
event_count: 0,
foreign_response_count: 0,
last_foreign_response_text: "",
@@ -413,6 +440,9 @@ function runSingleRequest({
client = openRawWebSocket(wsUrl, {
onOpen() {
connectedAt = performance.now();
const now = Date.now();
sample.connected_at = new Date(now).toISOString();
sample.connected_epoch_ms = now;
sample.connected_ms = rounded(connectedAt - startedAt);
},
onMessage(text) {
@@ -435,6 +465,9 @@ function runSingleRequest({
if (data.type === "connected") {
sentAt = performance.now();
const now = Date.now();
sample.sent_at = new Date(now).toISOString();
sample.sent_epoch_ms = now;
client.send(JSON.stringify({
type: "message",
message: [{ type: "Plain", text: prompt }],
@@ -449,7 +482,15 @@ function runSingleRequest({
if (data.type !== "response" || data.data?.role !== "assistant") return;
const content = String(data.data.content || "");
markFirstAssistantEvent(sample, sentAt);
if (content) sample.response_text = content;
if (content) markFirstAssistantContent(sample, sentAt);
if (content.includes(expected) && sample.first_response_ms === null && sentAt > 0) {
const now = Date.now();
sample.first_response_at = new Date(now).toISOString();
sample.first_response_epoch_ms = now;
sample.first_response_ms = rounded(performance.now() - sentAt);
}
if (data.data.is_final === true) {
const ok = sample.response_text.includes(expected);
if (ok) {
@@ -488,6 +529,9 @@ function runSingleRequest({
: reason || "";
if (sentAt > 0) sample.response_duration_ms = rounded(performance.now() - sentAt);
else sample.response_duration_ms = rounded(performance.now() - startedAt);
const now = Date.now();
sample.finished_at = new Date(now).toISOString();
sample.finished_epoch_ms = now;
try {
client?.close();
} catch {
@@ -498,6 +542,22 @@ function runSingleRequest({
});
}
/**
 * Records, once per sample, when the first assistant response event arrived.
 * Skipped when the milestone is already recorded or no send time is known yet
 * (`sentAt` is zero or negative).
 *
 * @param {object} sample - Per-request sample updated in place.
 * @param {number} sentAt - `performance.now()` reading taken when the message was sent.
 */
function markFirstAssistantEvent(sample, sentAt) {
  const alreadyRecorded = sample.first_assistant_event_ms !== null;
  if (alreadyRecorded || sentAt <= 0) return;
  const epochMs = Date.now();
  Object.assign(sample, {
    first_assistant_event_at: new Date(epochMs).toISOString(),
    first_assistant_event_epoch_ms: epochMs,
    first_assistant_event_ms: rounded(performance.now() - sentAt),
  });
}
/**
 * Records, once per sample, when the first assistant response carrying content arrived.
 * Skipped when the milestone is already recorded or no send time is known yet
 * (`sentAt` is zero or negative).
 *
 * @param {object} sample - Per-request sample updated in place.
 * @param {number} sentAt - `performance.now()` reading taken when the message was sent.
 */
function markFirstAssistantContent(sample, sentAt) {
  const alreadyRecorded = sample.first_assistant_content_ms !== null;
  if (alreadyRecorded || sentAt <= 0) return;
  const epochMs = Date.now();
  Object.assign(sample, {
    first_assistant_content_at: new Date(epochMs).toISOString(),
    first_assistant_content_epoch_ms: epochMs,
    first_assistant_content_ms: rounded(performance.now() - sentAt),
  });
}
function containsLoadToken(text, prefix) {
const escaped = String(prefix).replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
return new RegExp(`${escaped}-\\d{4}`).test(String(text || ""));
@@ -709,27 +769,16 @@ function buildMetrics({ samples, totalRequests, concurrency, timeoutMs, loadDura
throughput_rps: loadDurationMs <= 0 ? 0 : rounded(okSamples.length / (loadDurationMs / 1000)),
status_counts: statusCounts,
connected_ms: stats(samples.map((sample) => sample.connected_ms).filter(Number.isFinite)),
first_assistant_event_ms: stats(samples.map((sample) => sample.first_assistant_event_ms).filter(Number.isFinite)),
first_assistant_content_ms: stats(samples.map((sample) => sample.first_assistant_content_ms).filter(Number.isFinite)),
first_response_ms: stats(okSamples.map((sample) => sample.first_response_ms).filter(Number.isFinite)),
response_duration_ms: stats(okSamples.map((sample) => sample.response_duration_ms).filter(Number.isFinite)),
fake_provider: summarizeFakeProviderState(fakeProviderState),
provider_timing: buildProviderTimingMetrics(samples, fakeProviderState),
samples,
};
}
/**
 * Condenses a fake-provider state snapshot into the counters reported in metrics.
 *
 * @param {object|null|undefined} state - Snapshot object; may be absent.
 * @returns {object|null} Summary with status, url, request counts, fault and streamed
 *   request counts and the provider config, or `null` when no state was given.
 */
function summarizeFakeProviderState(state) {
  if (!state) return null;
  const entries = Array.isArray(state.recent_requests) ? state.recent_requests : [];
  let faultCount = 0;
  let streamedCount = 0;
  for (const entry of entries) {
    if (entry?.should_fail === true) faultCount += 1;
    if (entry?.stream === true) streamedCount += 1;
  }
  // Prefer the provider's own total; fall back to the number of logged requests.
  const requestCount = Number.isFinite(state.request_count) ? state.request_count : entries.length;
  return {
    status: state.status || "unknown",
    url: state.url || "",
    request_count: requestCount,
    recent_request_count: entries.length,
    fault_count: faultCount,
    streamed_request_count: streamedCount,
    config: state.config || {},
  };
}
function buildThresholds(metrics) {
const thresholds = {
error_rate: { actual: metrics.error_rate, max: maxErrorRate, pass: metrics.error_rate <= maxErrorRate },
@@ -0,0 +1,861 @@
#!/usr/bin/env node
import crypto from "node:crypto";
import net from "node:net";
import tls from "node:tls";
import { mkdir, writeFile } from "node:fs/promises";
import { resolve } from "node:path";
import { env, exit } from "node:process";
import {
apiJson,
appendLine,
ensureEvidence,
evidencePaths,
loadEnvFiles,
localIsoWithOffset,
redact,
resetAndAuthLocalUser,
writeResult,
} from "../../../scripts/e2e/lib/langbot-e2e.mjs";
import {
buildProviderTimingMetrics,
summarizeFakeProviderState,
} from "./lib/fake-provider-timing.mjs";
const DEFAULT_LOCAL_PASSWORD = "LangBotE2ELocalPass!2026";
await loadEnvFiles();
const caseId = env.LBS_CASE_ID || "langbot-debug-chat-cross-pipeline-isolation";
const paths = evidencePaths(caseId);
await ensureEvidence(paths);
const startedAt = new Date();
const metricsPath = resolve(paths.evidenceDir, "metrics.json");
const samplesPath = resolve(paths.evidenceDir, "samples.json");
const fakeProviderStatePath = resolve(paths.evidenceDir, "fake-provider-state.json");
const resetDiagnosticPath = resolve(paths.evidenceDir, "debug-chat-reset-diagnostic.json");
const backendUrl = env.LANGBOT_BACKEND_URL || "";
const fakeProviderUrl = env.LANGBOT_FAKE_PROVIDER_URL || "";
const sessionType = env.LANGBOT_DEBUG_CHAT_LOAD_SESSION_TYPE || env.LANGBOT_E2E_DEBUG_CHAT_SESSION_TYPE || "person";
const requestsPerPipeline = positiveInteger(env.LANGBOT_DEBUG_CHAT_LOAD_REQUESTS, 6);
const concurrency = Math.min(requestsPerPipeline * 2, positiveInteger(env.LANGBOT_DEBUG_CHAT_LOAD_CONCURRENCY, 4));
const timeoutMs = positiveInteger(env.LANGBOT_DEBUG_CHAT_LOAD_TIMEOUT_MS, 30_000);
const stream = bool(env.LANGBOT_DEBUG_CHAT_LOAD_STREAM, true);
const resetBeforeRun = bool(env.LANGBOT_DEBUG_CHAT_LOAD_RESET, true);
const responseP95BudgetMs = positiveNumber(env.LANGBOT_DEBUG_CHAT_LOAD_RESPONSE_P95_MS, 5_000);
const maxErrorRate = positiveNumber(env.LANGBOT_DEBUG_CHAT_LOAD_MAX_ERROR_RATE, 0);
const promptTemplate = env.LANGBOT_DEBUG_CHAT_LOAD_PROMPT_TEMPLATE
|| "请只回复 \"{expected}\",不要解释,不要添加其他字符。";
const failureSignals = textList(env.LANGBOT_E2E_FAILURE_SIGNALS || env.LANGBOT_DEBUG_CHAT_LOAD_FAILURE_SIGNALS || "");
// The two pipelines exercised by this probe. Each target is located by URL and/or name
// taken from the environment (empty string when unset) and carries two token prefixes:
// its own (expectedPrefix) and the other pipeline's (otherPrefix).
// NOTE(review): the prefixes are presumably used to build per-pipeline response tokens
// and to recognise the other pipeline's tokens as leaks — confirm against the code that
// consumes expectedPrefix/otherPrefix.
const pipelineTargets = [
{
label: "A",
expectedPrefix: "PIPEA",
otherPrefix: "PIPEB",
url: env.LANGBOT_FAKE_PROVIDER_PIPELINE_A_URL || "",
name: env.LANGBOT_FAKE_PROVIDER_PIPELINE_A_NAME || "",
},
{
label: "B",
expectedPrefix: "PIPEB",
otherPrefix: "PIPEA",
url: env.LANGBOT_FAKE_PROVIDER_PIPELINE_B_URL || "",
name: env.LANGBOT_FAKE_PROVIDER_PIPELINE_B_NAME || "",
},
];
const result = {
source: "automation",
case_id: caseId,
run_id: paths.runId,
status: "fail",
reason: "",
started_at: startedAt.toISOString(),
started_at_local: localIsoWithOffset(startedAt),
finished_at: "",
finished_at_local: "",
duration_ms: 0,
backend_url: backendUrl,
session_type: sessionType,
pipelines: [],
load_profile: {
requests_per_pipeline: requestsPerPipeline,
total_requests: requestsPerPipeline * 2,
concurrency,
timeout_ms: timeoutMs,
stream,
reset_before_run: resetBeforeRun,
},
evidence: {
network_log: paths.networkLog,
metrics_json: metricsPath,
samples_json: samplesPath,
fake_provider_state_json: fakeProviderStatePath,
debug_chat_reset_diagnostic_json: resetDiagnosticPath,
automation_result_json: paths.automationResultJson,
result_json: paths.resultJson,
},
evidence_collected: ["metrics", "network", "api_diagnostic", "filesystem"],
};
try {
if (!backendUrl) {
result.status = "env_issue";
throw new Error("LANGBOT_BACKEND_URL is not configured.");
}
if (!["person", "group"].includes(sessionType)) {
throw new Error(`LANGBOT_DEBUG_CHAT_LOAD_SESSION_TYPE must be person or group, got ${sessionType}.`);
}
for (const target of pipelineTargets) {
if (!target.url && !target.name) {
result.status = "env_issue";
throw new Error(`Set LANGBOT_FAKE_PROVIDER_PIPELINE_${target.label}_URL or LANGBOT_FAKE_PROVIDER_PIPELINE_${target.label}_NAME.`);
}
}
const backendReady = await backendReachable(backendUrl);
if (!backendReady) {
result.status = "env_issue";
throw new Error(`Backend did not respond at ${backendUrl}.`);
}
const user = env.LANGBOT_E2E_LOGIN_USER || "";
const password = env.LANGBOT_E2E_LOGIN_PASSWORD || DEFAULT_LOCAL_PASSWORD;
if (!user) {
result.status = "env_issue";
throw new Error("LANGBOT_E2E_LOGIN_USER is required so this probe can resolve/reset Debug Chat sessions.");
}
const auth = await resetAndAuthLocalUser({ backendUrl, user, password });
const pipelines = [];
for (const target of pipelineTargets) {
const pipeline = await resolvePipeline({
backendUrl,
token: auth.token,
pipelineUrl: target.url,
pipelineName: target.name,
});
pipelines.push({
...target,
id: pipeline.id,
name: pipeline.name || target.name,
wsUrl: websocketUrl(backendUrl, pipeline.id, sessionType),
});
}
result.pipelines = pipelines.map((pipeline) => ({
label: pipeline.label,
id: pipeline.id,
name: pipeline.name,
url: pipeline.url,
}));
if (resetBeforeRun) {
const resetDiagnostics = [];
for (const pipeline of pipelines) {
const reset = await apiJson(backendUrl, `/api/v1/pipelines/${encodeURIComponent(pipeline.id)}/ws/reset/${encodeURIComponent(sessionType)}`, {
method: "POST",
token: auth.token,
});
resetDiagnostics.push({
pipeline_label: pipeline.label,
pipeline_id: pipeline.id,
status: isApiFailure(reset) ? "fail" : "ready",
http_status: reset.status,
code: reset.json.code ?? null,
reason: isApiFailure(reset) ? reset.json.msg || "Debug Chat reset failed." : "Debug Chat session reset.",
});
}
await writeFile(resetDiagnosticPath, `${JSON.stringify(resetDiagnostics, null, 2)}\n`, "utf8");
const failedReset = resetDiagnostics.find((item) => item.status === "fail");
if (failedReset) throw new Error(failedReset.reason);
}
await resetFakeProvider(fakeProviderUrl);
const jobs = [];
for (let index = 0; index < requestsPerPipeline; index += 1) {
for (const pipeline of pipelines) {
jobs.push({ ...pipeline, index });
}
}
const loadStartedAt = performance.now();
const samples = await runLoad({
jobs,
concurrency,
timeoutMs,
promptTemplate,
stream,
failureSignals,
});
const loadDurationMs = performance.now() - loadStartedAt;
const fakeProviderState = await readFakeProviderState(fakeProviderUrl);
if (fakeProviderState) {
await writeFile(fakeProviderStatePath, `${JSON.stringify(fakeProviderState, null, 2)}\n`, "utf8");
}
const metrics = buildMetrics({
samples,
requestsPerPipeline,
concurrency,
timeoutMs,
loadDurationMs,
backendUrl,
sessionType,
fakeProviderState,
});
const thresholds = buildThresholds(metrics);
const passed = Object.values(thresholds).every((item) => item.pass);
result.status = passed ? "pass" : "fail";
result.reason = passed
? "Debug Chat cross-pipeline isolation probe passed all thresholds."
: "Debug Chat cross-pipeline isolation probe found leaks, errors, or latency threshold breaches.";
result.metrics_summary = {
requests_per_pipeline: metrics.requests_per_pipeline,
total_requests: metrics.total_requests,
concurrency: metrics.concurrency,
ok_count: metrics.ok_count,
error_count: metrics.error_count,
cross_pipeline_leak_count: metrics.cross_pipeline_leak_count,
timeout_count: metrics.timeout_count,
error_rate: metrics.error_rate,
response_p95_ms: metrics.response_duration_ms.p95,
first_response_p95_ms: metrics.first_response_ms.p95,
throughput_rps: metrics.throughput_rps,
status_counts: metrics.status_counts,
by_pipeline: metrics.by_pipeline,
fake_provider_request_count: metrics.fake_provider?.request_count ?? null,
fake_provider_duration_p95_ms: metrics.provider_timing?.provider_duration_ms.p95 ?? null,
langbot_overhead_estimate_p95_ms: metrics.provider_timing?.langbot_overhead_estimate_ms.p95 ?? null,
send_to_provider_start_p95_ms: metrics.provider_timing?.send_to_provider_start_ms.p95 ?? null,
provider_finish_to_ws_final_p95_ms: metrics.provider_timing?.provider_finish_to_ws_final_ms.p95 ?? null,
};
result.thresholds_summary = thresholds;
result.artifacts = {
metrics_json: metricsPath,
samples_json: samplesPath,
fake_provider_state_json: fakeProviderState ? fakeProviderStatePath : "",
network_log: paths.networkLog,
automation_result_json: paths.automationResultJson,
result_json: paths.resultJson,
};
await writeFile(metricsPath, `${JSON.stringify({ ...metrics, thresholds }, null, 2)}\n`, "utf8");
await writeFile(samplesPath, `${JSON.stringify(samples, null, 2)}\n`, "utf8");
} catch (error) {
if (!["env_issue", "blocked"].includes(result.status)) {
result.status = looksLikeEnvIssue(error) ? "env_issue" : "fail";
}
result.reason = result.reason || safeReason(error.message);
} finally {
const finishedAt = new Date();
result.finished_at = finishedAt.toISOString();
result.finished_at_local = localIsoWithOffset(finishedAt);
result.duration_ms = finishedAt.getTime() - startedAt.getTime();
await mkdir(paths.evidenceDir, { recursive: true });
await writeResult(paths, result);
console.log(JSON.stringify(result, null, 2));
}
exit(result.status === "pass" ? 0 : result.status === "env_issue" || result.status === "blocked" ? 2 : 1);
/**
 * Check whether the backend answers its `/healthz` endpoint.
 *
 * @param {string} baseUrl - Backend root URL; one trailing slash is tolerated.
 * @returns {Promise<boolean>} `true` for any HTTP status below 500, `false`
 *   when the request throws (including the 3 second abort timeout).
 */
async function backendReachable(baseUrl) {
  let healthResponse;
  try {
    const healthUrl = `${baseUrl.replace(/\/$/, "")}/healthz`;
    healthResponse = await fetch(healthUrl, { signal: AbortSignal.timeout(3000) });
  } catch {
    return false;
  }
  // Client errors (4xx) still prove that something is listening.
  return healthResponse.status < 500;
}
/**
 * Best-effort POST to the fake provider's `/__qa/reset` endpoint.
 *
 * Does nothing when `rootUrl` is empty. Any request failure is ignored on
 * purpose so that provider diagnostics never mask the probe outcome.
 *
 * @param {string} rootUrl - Fake provider URL, with or without a `/v1` suffix.
 * @returns {Promise<void>}
 */
async function resetFakeProvider(rootUrl) {
  if (rootUrl) {
    try {
      const resetEndpoint = `${normalizeProviderRootUrl(rootUrl)}/__qa/reset`;
      const requestOptions = {
        method: "POST",
        signal: AbortSignal.timeout(3000),
      };
      await fetch(resetEndpoint, requestOptions);
    } catch {
      // Missing fake-provider diagnostics should not hide the isolation result.
    }
  }
}
/**
 * Fetch the fake provider's `/__qa/config` snapshot.
 *
 * @param {string} rootUrl - Fake provider URL, with or without a `/v1` suffix.
 * @returns {Promise<object|null>} `null` when no URL is given. Otherwise an
 *   object whose `status` is "loaded" only when the HTTP response is ok and
 *   the JSON body has `ok === true`; any thrown error yields an "unavailable"
 *   record carrying a redacted `reason`.
 */
async function readFakeProviderState(rootUrl) {
  if (!rootUrl) return null;
  try {
    const configEndpoint = `${normalizeProviderRootUrl(rootUrl)}/__qa/config`;
    const response = await fetch(configEndpoint, { signal: AbortSignal.timeout(3000) });
    // A body that is not valid JSON is treated as an empty object.
    const body = await response.json().catch(() => ({}));
    const loaded = response.ok && body.ok === true;
    return {
      status: loaded ? "loaded" : "unavailable",
      url: normalizeProviderRootUrl(rootUrl),
      http_status: response.status,
      model: body.model || "",
      config: body.config || {},
      request_count: Number.isFinite(body.request_count) ? body.request_count : null,
      recent_requests: Array.isArray(body.recent_requests) ? body.recent_requests : [],
    };
  } catch (error) {
    const unavailable = {
      status: "unavailable",
      url: normalizeProviderRootUrl(rootUrl),
      reason: safeReason(error.message),
      request_count: null,
      recent_requests: [],
    };
    return unavailable;
  }
}
/**
 * Reduce a provider URL to its root: trim whitespace, drop one trailing
 * slash, then drop a trailing `/v1` segment if present.
 *
 * @param {*} value - URL-like value; falsy values become the empty string.
 * @returns {string} The normalized root URL.
 */
function normalizeProviderRootUrl(value) {
  const apiSuffix = "/v1";
  const withoutTrailingSlash = String(value || "").trim().replace(/\/$/, "");
  if (!withoutTrailingSlash.endsWith(apiSuffix)) return withoutTrailingSlash;
  return withoutTrailingSlash.slice(0, -apiSuffix.length);
}
/**
 * Extract the `id` query parameter from a pipeline URL.
 *
 * @param {string} url - Absolute pipeline URL.
 * @returns {string} The `id` value, or "" when the URL is empty, cannot be
 *   parsed, or has no non-empty `id` parameter.
 */
function pipelineIdFromUrl(url) {
  if (!url) return "";
  let pipelineId = null;
  try {
    pipelineId = new URL(url).searchParams.get("id");
  } catch {
    return "";
  }
  return pipelineId || "";
}
/**
 * Resolve a pipeline to `{ id, name }`.
 *
 * When `pipelineUrl` carries an `id` query parameter the pipeline is loaded
 * directly by that id; otherwise the pipeline list is fetched and searched
 * for an exact `pipelineName` match.
 *
 * @param {object} options
 * @param {string} options.backendUrl - Backend root URL passed to `apiJson`.
 * @param {string} options.token - Auth token passed to `apiJson`.
 * @param {string} [options.pipelineUrl] - URL that may contain `?id=...`.
 * @param {string} [options.pipelineName] - Name used when no id is available.
 * @returns {Promise<{id: string, name: string}>}
 * @throws {Error} When neither a URL id nor a name is given, when the API
 *   call fails, or when no matching pipeline with a `uuid` is found.
 */
async function resolvePipeline({ backendUrl, token, pipelineUrl, pipelineName }) {
  const explicitId = pipelineIdFromUrl(pipelineUrl);

  if (!explicitId) {
    if (!pipelineName) {
      throw new Error("Set pipeline URL or name before running this probe.");
    }
    const listing = await apiJson(backendUrl, "/api/v1/pipelines", { token });
    if (isApiFailure(listing)) {
      throw new Error(listing.json.msg || "Failed to list pipelines.");
    }
    const candidates = listing.json.data?.pipelines || [];
    const match = candidates.find((item) => item.name === pipelineName);
    if (!match?.uuid) {
      throw new Error(`Could not find pipeline named ${pipelineName}.`);
    }
    return { id: match.uuid, name: match.name || pipelineName };
  }

  const detail = await apiJson(backendUrl, `/api/v1/pipelines/${encodeURIComponent(explicitId)}`, { token });
  const loaded = detail.json.data?.pipeline;
  if (isApiFailure(detail) || !loaded?.uuid) {
    throw new Error(detail.json.msg || `Could not load pipeline ${explicitId}.`);
  }
  return { id: loaded.uuid, name: loaded.name || "" };
}
/**
 * Decide whether an API response represents a failure.
 *
 * @param {{status: number, json: object}} response - Parsed API response.
 * @returns {boolean} `true` for HTTP status >= 400, or when the body has a
 *   `code` field that is present and not exactly `0`.
 */
function isApiFailure(response) {
  if (response.status >= 400) return true;
  const code = response.json.code;
  return code !== undefined && code !== 0;
}
/**
 * Build the Debug Chat WebSocket URL for a pipeline from the backend URL.
 *
 * @param {string} baseUrl - Backend root URL (`http:` or `https:`).
 * @param {string} pipelineId - Pipeline id; percent-encoded into the path.
 * @param {string} sessionTypeValue - Value of the `session_type` parameter.
 * @returns {string} `wss:` URL when the base is `https:`, otherwise `ws:`.
 */
function websocketUrl(baseUrl, pipelineId, sessionTypeValue) {
  const target = new URL(baseUrl);
  const secureBase = target.protocol === "https:";
  const encodedId = encodeURIComponent(pipelineId);
  const encodedSession = encodeURIComponent(sessionTypeValue);
  target.protocol = secureBase ? "wss:" : "ws:";
  // Any path or query on the base URL is replaced, not extended.
  target.pathname = `/api/v1/pipelines/${encodedId}/ws/connect`;
  target.search = `?session_type=${encodedSession}`;
  return target.toString();
}
/**
 * Run every job in `options.jobs` through `runSingleRequest` using a fixed
 * number of concurrent workers that pull from one shared queue.
 *
 * @param {object} options - Load options; forwarded to `runSingleRequest`
 *   with the current `job` added.
 * @param {object[]} options.jobs - Jobs to execute (copied, not mutated).
 * @param {number} options.concurrency - Number of workers to start.
 * @returns {Promise<object[]>} Samples sorted by `pipeline_label`, then by
 *   `index`.
 */
async function runLoad(options) {
  const pending = [...options.jobs];
  const collected = [];

  async function drainQueue() {
    while (pending.length > 0) {
      const nextJob = pending.shift();
      if (!nextJob) continue;
      collected.push(await runSingleRequest({ ...options, job: nextJob }));
    }
  }

  const pool = Array.from({ length: options.concurrency }, () => drainQueue());
  await Promise.all(pool);

  const byLabelThenIndex = (left, right) => {
    const labelOrder = left.pipeline_label.localeCompare(right.pipeline_label);
    return labelOrder || left.index - right.index;
  };
  return collected.sort(byLabelThenIndex);
}
/**
 * Build the unique response token for a request: `<prefix>-NNNN`, where NNNN
 * is the 1-based request number left-padded with zeros to four digits.
 *
 * @param {string} prefix - Pipeline-specific token prefix.
 * @param {number} index - Zero-based request index.
 * @returns {string} The expected token.
 */
function expectedForIndex(prefix, index) {
  const sequence = String(index + 1).padStart(4, "0");
  return `${prefix}-${sequence}`;
}
/**
 * Substitute every `{expected}` placeholder in a prompt template.
 *
 * A replacer function is used instead of a replacement string so that `$`
 * sequences in the token (`$&`, `$$`, `` $` ``, ...) are inserted literally
 * rather than being interpreted as replacement patterns.
 *
 * @param {string} template - Prompt template containing `{expected}`.
 * @param {string} expected - Token to insert.
 * @returns {string} The prompt with all placeholders replaced.
 */
function promptForIndex(template, expected) {
  return template.replaceAll("{expected}", () => expected);
}
/**
 * Send one Debug Chat prompt over a fresh WebSocket and record the outcome.
 *
 * The returned promise always resolves (it is never rejected) with the sample
 * object once `finish` has been called exactly once, by the timeout timer, a
 * socket error/close, or the message handler.
 *
 * @param {object} options
 * @param {object} options.job - Job with `wsUrl`, `index`, `label`, `id`,
 *   `name`, `expectedPrefix` and `otherPrefix`.
 * @param {number} options.timeoutMs - Overall per-request timeout.
 * @param {string} options.promptTemplate - Template containing `{expected}`.
 * @param {boolean} options.stream - Forwarded as the `stream` message field.
 * @param {string[]} options.failureSignals - Substrings that mark a final
 *   assistant response as an application error.
 * @returns {Promise<object>} The populated sample record.
 */
function runSingleRequest({
job,
timeoutMs,
promptTemplate,
stream,
failureSignals,
}) {
return new Promise((resolvePromise) => {
// Unique token this request expects to see echoed in the assistant response.
const expected = expectedForIndex(job.expectedPrefix, job.index);
const prompt = promptForIndex(promptTemplate, expected);
const sample = {
index: job.index,
pipeline_label: job.label,
pipeline_id: job.id,
pipeline_name: job.name,
status: "running",
ok: false,
expected_text: expected,
expected_prefix: job.expectedPrefix,
other_prefix: job.otherPrefix,
prompt,
response_text: "",
started_at: new Date().toISOString(),
started_epoch_ms: Date.now(),
connected_at: null,
connected_epoch_ms: null,
sent_at: null,
sent_epoch_ms: null,
first_assistant_event_at: null,
first_assistant_event_epoch_ms: null,
first_assistant_event_ms: null,
first_assistant_content_at: null,
first_assistant_content_epoch_ms: null,
first_assistant_content_ms: null,
first_response_at: null,
first_response_epoch_ms: null,
connected_ms: null,
first_response_ms: null,
response_duration_ms: null,
finished_at: null,
finished_epoch_ms: null,
event_count: 0,
same_pipeline_foreign_response_count: 0,
cross_pipeline_leak_count: 0,
last_foreign_response_text: "",
error: "",
close_code: null,
close_reason: "",
};
// `closed` guards finish(); connectedAt/sentAt are performance.now() marks (0 = not yet).
let closed = false;
let connectedAt = 0;
let sentAt = 0;
const startedPerf = performance.now();
let client = null;
const timer = setTimeout(() => {
finish("timeout", `Timed out after ${timeoutMs} ms.`);
}, timeoutMs);
client = openRawWebSocket(job.wsUrl, {
onOpen() {
connectedAt = performance.now();
const now = Date.now();
sample.connected_at = new Date(now).toISOString();
sample.connected_epoch_ms = now;
sample.connected_ms = rounded(connectedAt - startedPerf);
},
onMessage(text) {
sample.event_count += 1;
let data;
try {
data = JSON.parse(String(text || ""));
} catch (error) {
finish("error", `Invalid WebSocket JSON: ${error.message}`);
return;
}
// Fire-and-forget trace line with a redacted, truncated preview; log write failures are ignored.
appendLine(paths.networkLog, JSON.stringify({
pipeline_label: job.label,
request_index: job.index,
type: data.type,
session_type: data.session_type || "",
role: data.data?.role || "",
is_final: data.data?.is_final ?? null,
content_preview: redact(String(data.data?.content || data.message || "").slice(0, 200)),
})).catch(() => {});
// The prompt is sent only after the server's "connected" event; latency is measured from here.
if (data.type === "connected") {
sentAt = performance.now();
const now = Date.now();
sample.sent_at = new Date(now).toISOString();
sample.sent_epoch_ms = now;
client.send(JSON.stringify({
type: "message",
message: [{ type: "Plain", text: prompt }],
stream,
}));
return;
}
if (data.type === "error") {
finish("error", data.message || "WebSocket error message.");
return;
}
// Everything except assistant responses is ignored from here on.
if (data.type !== "response" || data.data?.role !== "assistant") return;
const content = String(data.data.content || "");
markFirstAssistantEvent(sample, sentAt);
// Each non-empty event overwrites response_text with that event's content.
if (content) sample.response_text = content;
if (content) markFirstAssistantContent(sample, sentAt);
// A token carrying the other pipeline's prefix is a leak: fail this request immediately.
if (containsPipelineToken(content, job.otherPrefix)) {
sample.cross_pipeline_leak_count += 1;
finish("cross_pipeline_leak", `Pipeline ${job.label} received response from ${job.otherPrefix}: ${content}`);
return;
}
// Record the first moment this request's own token shows up.
if (content.includes(expected) && sample.first_response_ms === null && sentAt > 0) {
const now = Date.now();
sample.first_response_at = new Date(now).toISOString();
sample.first_response_epoch_ms = now;
sample.first_response_ms = rounded(performance.now() - sentAt);
}
if (data.data.is_final === true) {
const ok = sample.response_text.includes(expected);
if (ok) {
if (sample.first_response_ms === null && sentAt > 0) {
const now = Date.now();
sample.first_response_at = new Date(now).toISOString();
sample.first_response_epoch_ms = now;
sample.first_response_ms = rounded(performance.now() - sentAt);
}
finish("pass", "");
} else if (matchesFailureSignal(sample.response_text, failureSignals)) {
finish("app_error", `Assistant final response matched a failure signal: ${sample.response_text}`);
} else if (containsPipelineToken(sample.response_text, job.expectedPrefix)) {
// Final response with this pipeline's prefix but a different token: count it and keep waiting
// (no finish call), so only the timeout or a later event ends the request.
sample.same_pipeline_foreign_response_count += 1;
sample.last_foreign_response_text = sample.response_text;
} else {
finish("mismatch", `Final assistant response did not include ${expected}: ${sample.response_text}`);
}
}
},
onError(error) {
finish("connection_error", `WebSocket connection error: ${error.message}`);
},
onClose(event) {
sample.close_code = event.code;
sample.close_reason = event.reason || "";
if (!closed) finish("closed", `WebSocket closed before final assistant response: ${event.code}`);
},
});
// Finalize the sample once: later calls are no-ops because `closed` is already set.
function finish(status, reason) {
if (closed) return;
closed = true;
clearTimeout(timer);
sample.status = status;
sample.ok = status === "pass";
// A timeout that saw same-pipeline foreign responses reports them in the error text.
sample.error = status === "timeout" && sample.same_pipeline_foreign_response_count > 0
? `${reason || ""} Saw ${sample.same_pipeline_foreign_response_count} same-pipeline foreign assistant response(s); last=${sample.last_foreign_response_text}`
: reason || "";
// Duration is measured from the send mark when the prompt was sent, otherwise from the start.
if (sentAt > 0) sample.response_duration_ms = rounded(performance.now() - sentAt);
else sample.response_duration_ms = rounded(performance.now() - startedPerf);
const now = Date.now();
sample.finished_at = new Date(now).toISOString();
sample.finished_epoch_ms = now;
try {
client?.close();
} catch {
// Closing a failed socket should not hide the sample result.
}
resolvePromise(sample);
}
});
}
/**
 * Stamp the first assistant event on a sample, once.
 *
 * No-op when the sample is already stamped or when nothing has been sent yet
 * (`sentAt <= 0`).
 *
 * @param {object} sample - Sample record; mutated in place.
 * @param {number} sentAt - `performance.now()` mark taken when the prompt was sent.
 * @returns {void}
 */
function markFirstAssistantEvent(sample, sentAt) {
  const alreadyStamped = sample.first_assistant_event_ms !== null;
  if (alreadyStamped || sentAt <= 0) return;
  const wallClockMs = Date.now();
  Object.assign(sample, {
    first_assistant_event_at: new Date(wallClockMs).toISOString(),
    first_assistant_event_epoch_ms: wallClockMs,
    first_assistant_event_ms: rounded(performance.now() - sentAt),
  });
}
/**
 * Stamp the first assistant event that carried content on a sample, once.
 *
 * No-op when the sample is already stamped or when nothing has been sent yet
 * (`sentAt <= 0`).
 *
 * @param {object} sample - Sample record; mutated in place.
 * @param {number} sentAt - `performance.now()` mark taken when the prompt was sent.
 * @returns {void}
 */
function markFirstAssistantContent(sample, sentAt) {
  const alreadyStamped = sample.first_assistant_content_ms !== null;
  if (alreadyStamped || sentAt <= 0) return;
  const wallClockMs = Date.now();
  Object.assign(sample, {
    first_assistant_content_at: new Date(wallClockMs).toISOString(),
    first_assistant_content_epoch_ms: wallClockMs,
    first_assistant_content_ms: rounded(performance.now() - sentAt),
  });
}
/**
 * Test whether `text` contains a token of the form `<prefix>-DDDD` (the
 * prefix matched literally, followed by a dash and four digits).
 *
 * @param {*} text - Text to search; falsy values are treated as "".
 * @param {*} prefix - Token prefix; regex metacharacters are escaped.
 * @returns {boolean}
 */
function containsPipelineToken(text, prefix) {
  const literalPrefix = String(prefix).replace(/[.*+?^${}()|[\]\\]/g, "\\$&");
  const tokenPattern = new RegExp(`${literalPrefix}-\\d{4}`);
  const haystack = String(text || "");
  return haystack.search(tokenPattern) !== -1;
}
/**
 * Case-insensitive check for any failure signal inside `text`.
 *
 * @param {*} text - Text to search; falsy values are treated as "".
 * @param {string[]} signals - Substrings to look for.
 * @returns {boolean} `true` as soon as one signal is found.
 */
function matchesFailureSignal(text, signals) {
  const haystack = String(text || "").toLowerCase();
  for (const signal of signals) {
    if (haystack.includes(signal.toLowerCase())) return true;
  }
  return false;
}
/**
 * Open a minimal WebSocket client on top of a raw TCP or TLS socket.
 *
 * The client performs the HTTP upgrade handshake itself, parses incoming
 * frames with `readFrame`, and writes masked frames with `writeFrame`.
 * Text messages are delivered to `handlers.onMessage`, including messages
 * the server splits into several fragments (a non-final text frame followed
 * by continuation frames): the fragments are reassembled and delivered once,
 * when the final fragment arrives. Binary and pong frames are ignored; pings
 * are answered with a pong carrying the same payload.
 *
 * @param {string} wsUrl - `ws:` or `wss:` URL to connect to.
 * @param {object} handlers
 * @param {function(): void} handlers.onOpen - Called after a 101 handshake.
 * @param {function(string): void} handlers.onMessage - Called per text message.
 * @param {function(Error): void} handlers.onError - Called on a socket error
 *   (unless already closed) or a failed handshake.
 * @param {function({code: ?number, reason: string}): void} handlers.onClose -
 *   Called once, either for a received close frame or for socket closure.
 * @returns {{send: function(string): void, close: function(): void}} `send`
 *   is a no-op before the handshake completes or after closing; `close`
 *   sends a close frame (when open) and ends the socket shortly afterwards.
 */
function openRawWebSocket(wsUrl, handlers) {
  const parsed = new URL(wsUrl);
  const secure = parsed.protocol === "wss:";
  const port = Number(parsed.port || (secure ? 443 : 80));
  const host = parsed.hostname;
  const path = `${parsed.pathname}${parsed.search}`;
  const key = crypto.randomBytes(16).toString("base64");
  const socket = secure
    ? tls.connect({ host, port, servername: host })
    : net.connect({ host, port });
  let opened = false;
  let closed = false;
  let buffer = Buffer.alloc(0);
  // Payload chunks of a fragmented text message still being received, or
  // null when no fragmented text message is in progress.
  let pendingTextFragments = null;
  socket.setNoDelay(true);

  socket.on("connect", () => {
    const originProtocol = secure ? "https" : "http";
    const request = [
      `GET ${path} HTTP/1.1`,
      `Host: ${parsed.host}`,
      "Upgrade: websocket",
      "Connection: Upgrade",
      `Sec-WebSocket-Key: ${key}`,
      "Sec-WebSocket-Version: 13",
      `Origin: ${originProtocol}://${parsed.host}`,
      "",
      "",
    ].join("\r\n");
    socket.write(request);
  });

  socket.on("data", (chunk) => {
    buffer = Buffer.concat([buffer, chunk]);
    if (!opened) {
      // Wait until the full HTTP response header has arrived.
      const headerEnd = buffer.indexOf("\r\n\r\n");
      if (headerEnd === -1) return;
      const headerText = buffer.subarray(0, headerEnd).toString("utf8");
      buffer = buffer.subarray(headerEnd + 4);
      if (!/^HTTP\/1\.1 101\b/i.test(headerText)) {
        handlers.onError(new Error(`Handshake failed: ${headerText.split("\r\n")[0] || "missing status"}`));
        socket.destroy();
        return;
      }
      opened = true;
      handlers.onOpen();
    }
    // Bytes that followed the header in the same chunk may already be frames.
    processFrames();
  });

  socket.on("error", (error) => {
    if (!closed) handlers.onError(error);
  });

  socket.on("close", () => {
    if (closed) return;
    closed = true;
    handlers.onClose({ code: null, reason: "" });
  });

  function processFrames() {
    while (true) {
      const frame = readFrame(buffer);
      if (!frame) return;
      // readFrame only succeeds with a complete header, so buffer[0] is this
      // frame's first byte; its top bit is FIN (last fragment of a message).
      const fin = (buffer[0] & 0x80) !== 0;
      buffer = buffer.subarray(frame.consumed);
      if (frame.opcode === 0x1) {
        if (fin) {
          handlers.onMessage(frame.payload.toString("utf8"));
        } else {
          // First fragment: hold it back until the final continuation frame.
          pendingTextFragments = [frame.payload];
        }
      } else if (frame.opcode === 0x0) {
        // Continuation frames outside a text message (e.g. a fragmented
        // binary message) are dropped, like unfragmented binary frames.
        if (pendingTextFragments) {
          pendingTextFragments.push(frame.payload);
          if (fin) {
            // Decode after joining so multi-byte characters split across
            // fragments are decoded correctly.
            const text = Buffer.concat(pendingTextFragments).toString("utf8");
            pendingTextFragments = null;
            handlers.onMessage(text);
          }
        }
      } else if (frame.opcode === 0x8) {
        const code = frame.payload.length >= 2 ? frame.payload.readUInt16BE(0) : null;
        const reason = frame.payload.length > 2 ? frame.payload.subarray(2).toString("utf8") : "";
        closed = true;
        pendingTextFragments = null;
        handlers.onClose({ code, reason });
        socket.end();
        return;
      } else if (frame.opcode === 0x9) {
        writeFrame(socket, 0xA, frame.payload);
      }
    }
  }

  return {
    send(text) {
      if (closed || !opened) return;
      writeFrame(socket, 0x1, Buffer.from(text, "utf8"));
    },
    close() {
      if (closed) return;
      closed = true;
      if (!socket.destroyed) {
        if (opened) writeFrame(socket, 0x8, Buffer.alloc(0));
        // Give the close frame a moment to flush; unref so the timer never
        // keeps the process alive.
        setTimeout(() => socket.end(), 50).unref();
      }
    },
  };
}
/**
 * Parse one WebSocket frame from the start of `buffer`.
 *
 * @param {Buffer} buffer - Bytes received so far.
 * @returns {?{opcode: number, payload: Buffer, consumed: number}} `null`
 *   when the buffer does not yet hold a complete frame. Otherwise the opcode
 *   (low four bits of the first byte), the payload (unmasked into a copy when
 *   the frame is masked, otherwise a view into `buffer`), and the total
 *   number of bytes the frame occupies.
 */
function readFrame(buffer) {
  if (buffer.length < 2) return null;
  const opcode = buffer[0] & 0x0f;
  const hasMask = (buffer[1] & 0x80) !== 0;
  let payloadLength = buffer[1] & 0x7f;
  let cursor = 2;

  // 126 and 127 are markers for 16-bit and 64-bit extended lengths.
  switch (payloadLength) {
    case 126:
      if (buffer.length < cursor + 2) return null;
      payloadLength = buffer.readUInt16BE(cursor);
      cursor += 2;
      break;
    case 127: {
      if (buffer.length < cursor + 8) return null;
      const upperWord = buffer.readUInt32BE(cursor);
      const lowerWord = buffer.readUInt32BE(cursor + 4);
      payloadLength = upperWord * 2 ** 32 + lowerWord;
      cursor += 8;
      break;
    }
    default:
      break;
  }

  let maskKey = null;
  if (hasMask) {
    if (buffer.length < cursor + 4) return null;
    maskKey = buffer.subarray(cursor, cursor + 4);
    cursor += 4;
  }

  const frameEnd = cursor + payloadLength;
  if (buffer.length < frameEnd) return null;

  let payload = buffer.subarray(cursor, frameEnd);
  if (maskKey) {
    // Unmask into a copy so the caller's receive buffer is left untouched.
    const unmasked = Buffer.from(payload);
    for (const [position, byte] of payload.entries()) {
      unmasked[position] = byte ^ maskKey[position % 4];
    }
    payload = unmasked;
  }
  return { opcode, payload, consumed: frameEnd };
}
/**
 * Write one final (FIN set), masked WebSocket frame to `socket`.
 *
 * @param {{write: function(Buffer): *}} socket - Destination socket.
 * @param {number} opcode - Frame opcode (low four bits of the first byte).
 * @param {Buffer|string|null|undefined} payload - Frame payload; non-Buffer
 *   values go through `Buffer.from(payload || "")`.
 * @returns {void}
 */
function writeFrame(socket, opcode, payload) {
  const body = Buffer.isBuffer(payload) ? payload : Buffer.from(payload || "");
  const mask = crypto.randomBytes(4);
  const firstByte = 0x80 | opcode;

  let header;
  if (body.length < 126) {
    header = Buffer.from([firstByte, 0x80 | body.length]);
  } else if (body.length <= 0xffff) {
    header = Buffer.alloc(4);
    header[0] = firstByte;
    header[1] = 0x80 | 126;
    header.writeUInt16BE(body.length, 2);
  } else {
    // 64-bit length written as two 32-bit halves.
    header = Buffer.alloc(10);
    header[0] = firstByte;
    header[1] = 0x80 | 127;
    header.writeUInt32BE(Math.floor(body.length / 2 ** 32), 2);
    header.writeUInt32BE(body.length >>> 0, 6);
  }

  // Mask into a fresh buffer so the caller's payload is not modified.
  const maskedBody = Buffer.alloc(body.length);
  for (const [position, byte] of body.entries()) {
    maskedBody[position] = byte ^ mask[position % 4];
  }
  socket.write(Buffer.concat([header, mask, maskedBody]));
}
/**
 * Aggregate request samples into the probe's metrics report.
 *
 * Latency statistics for `first_response_ms` and `response_duration_ms` use
 * passing samples only; the other timing statistics use every sample with a
 * finite value. `total_requests` is computed as `requestsPerPipeline * 2`.
 *
 * @param {object} input
 * @param {object[]} input.samples - Sample records from the load run.
 * @param {number} input.requestsPerPipeline - Requests issued per pipeline.
 * @param {number} input.concurrency - Worker count used for the run.
 * @param {number} input.timeoutMs - Per-request timeout.
 * @param {number} input.loadDurationMs - Wall time of the whole load run.
 * @param {string} input.backendUrl - Backend under test.
 * @param {string} input.sessionType - Debug Chat session type.
 * @param {?object} input.fakeProviderState - Fake provider snapshot, if any.
 * @returns {object} Metrics object, including the raw `samples`.
 */
function buildMetrics({ samples, requestsPerPipeline, concurrency, timeoutMs, loadDurationMs, backendUrl, sessionType, fakeProviderState }) {
  const passing = samples.filter((sample) => sample.ok);
  const failedTotal = samples.length - passing.length;
  const statusCounts = {};
  const byPipeline = {};

  samples.forEach((sample) => {
    const { status, pipeline_label: label } = sample;
    statusCounts[status] = (statusCounts[status] || 0) + 1;
    byPipeline[label] ??= {
      ok_count: 0,
      error_count: 0,
      cross_pipeline_leak_count: 0,
      timeout_count: 0,
    };
    const bucket = byPipeline[label];
    bucket[sample.ok ? "ok_count" : "error_count"] += 1;
    bucket.cross_pipeline_leak_count += sample.cross_pipeline_leak_count || 0;
    if (status === "timeout") bucket.timeout_count += 1;
  });

  // Statistics over one numeric field, skipping samples where it is not finite.
  const timing = (source, field) => stats(
    source.map((sample) => sample[field]).filter((value) => Number.isFinite(value)),
  );
  const timedOut = samples.filter((sample) => sample.status === "timeout");
  const leakTotal = samples.reduce(
    (running, sample) => running + (sample.cross_pipeline_leak_count || 0),
    0,
  );
  const loadSeconds = loadDurationMs / 1000;

  return {
    probe: caseId,
    backend_url: backendUrl,
    session_type: sessionType,
    requests_per_pipeline: requestsPerPipeline,
    total_requests: requestsPerPipeline * 2,
    completed_requests: samples.length,
    concurrency,
    timeout_ms: timeoutMs,
    ok_count: passing.length,
    error_count: failedTotal,
    timeout_count: timedOut.length,
    cross_pipeline_leak_count: leakTotal,
    // With no samples at all the run counts as fully failed.
    error_rate: samples.length === 0 ? 1 : rounded(failedTotal / samples.length),
    load_duration_ms: rounded(loadDurationMs),
    throughput_rps: loadDurationMs <= 0 ? 0 : rounded(passing.length / loadSeconds),
    status_counts: statusCounts,
    by_pipeline: byPipeline,
    connected_ms: timing(samples, "connected_ms"),
    first_assistant_event_ms: timing(samples, "first_assistant_event_ms"),
    first_assistant_content_ms: timing(samples, "first_assistant_content_ms"),
    first_response_ms: timing(passing, "first_response_ms"),
    response_duration_ms: timing(passing, "response_duration_ms"),
    fake_provider: summarizeFakeProviderState(fakeProviderState),
    provider_timing: buildProviderTimingMetrics(samples, fakeProviderState),
    samples,
  };
}
/**
 * Evaluate the probe's pass/fail thresholds against computed metrics.
 *
 * @param {object} metrics - Result of `buildMetrics`.
 * @returns {object} One `{ actual, max, pass }` entry each for
 *   `cross_pipeline_leak_count` (must be 0), `error_rate` (at most
 *   `maxErrorRate`) and `response_p95_ms` (at most `responseP95BudgetMs`,
 *   and only when at least one request passed).
 */
function buildThresholds(metrics) {
  const verdict = (actual, max, pass) => ({ actual, max, pass });
  const leakCount = metrics.cross_pipeline_leak_count;
  const errorRate = metrics.error_rate;
  const responseP95 = metrics.response_duration_ms.p95;
  // With zero passing requests the p95 is a placeholder, so it cannot pass.
  const latencyOk = metrics.ok_count > 0 && responseP95 <= responseP95BudgetMs;
  return {
    cross_pipeline_leak_count: verdict(leakCount, 0, leakCount === 0),
    error_rate: verdict(errorRate, maxErrorRate, errorRate <= maxErrorRate),
    response_p95_ms: verdict(responseP95, responseP95BudgetMs, latencyOk),
  };
}
/**
 * Parse a strictly positive base-10 integer, falling back otherwise.
 *
 * Parsing uses `Number.parseInt`, so leading digits are accepted
 * ("12abc" gives 12) and fractions are truncated.
 *
 * @param {*} value - Raw value, typically an environment string.
 * @param {*} fallback - Returned when parsing fails or the result is <= 0.
 * @returns {*} The parsed integer or `fallback`.
 */
function positiveInteger(value, fallback) {
  const candidate = Number.parseInt(String(value || ""), 10);
  if (!Number.isInteger(candidate)) return fallback;
  return candidate > 0 ? candidate : fallback;
}
/**
 * Parse a finite, non-negative number, falling back otherwise.
 *
 * A missing or blank value returns `fallback`. Blank input must be checked
 * before conversion because `Number("")` is `0`, which would otherwise be
 * accepted and silently replace the configured default. An explicit `0`
 * (number or string) is still accepted.
 *
 * @param {*} value - Raw value, typically an environment string.
 * @param {*} fallback - Returned for missing, blank, non-numeric, non-finite
 *   or negative input.
 * @returns {*} The parsed number or `fallback`.
 */
function positiveNumber(value, fallback) {
  const text = String(value ?? "").trim();
  if (text === "") return fallback;
  const parsed = Number(text);
  return Number.isFinite(parsed) && parsed >= 0 ? parsed : fallback;
}
/**
 * Parse a boolean flag from text.
 *
 * @param {*} value - Raw value; `undefined` and "" mean "not set".
 * @param {*} fallback - Returned when the value is not set or unrecognized.
 * @returns {*} `true` for 1/true/yes/on, `false` for 0/false/no/off
 *   (case-insensitive, whole-string match), otherwise `fallback`.
 */
function bool(value, fallback) {
  const notSet = value === undefined || value === "";
  if (notSet) return fallback;
  const text = String(value);
  const truthy = /^(1|true|yes|on)$/i.test(text);
  if (truthy) return true;
  const falsy = /^(0|false|no|off)$/i.test(text);
  return falsy ? false : fallback;
}
/**
 * Split text on newlines or commas into trimmed, non-empty entries.
 *
 * @param {*} value - Raw text; falsy values give an empty list.
 * @returns {string[]} Entries in their original order.
 */
function textList(value) {
  const entries = [];
  for (const piece of String(value || "").split(/\r?\n|,/)) {
    const cleaned = piece.trim();
    if (cleaned) entries.push(cleaned);
  }
  return entries;
}
/**
 * Round a number to three decimal places via `toFixed`.
 *
 * @param {number} value - Number to round.
 * @returns {number} The rounded value.
 */
function rounded(value) {
  const fixedText = value.toFixed(3);
  return Number(fixedText);
}
/**
 * Nearest-rank percentile of a list of numbers, rounded to three decimals.
 *
 * @param {number[]} values - Input numbers; not mutated.
 * @param {number} percentileValue - Percentile in the range 0-100.
 * @returns {number} 0 for an empty list; otherwise the value at the
 *   nearest rank. A percentile of 0 returns the minimum.
 */
function percentile(values, percentileValue) {
  if (values.length === 0) return 0;
  const sorted = [...values].sort((a, b) => a - b);
  const rank = Math.ceil((percentileValue / 100) * sorted.length) - 1;
  // Clamp at both ends: percentile 0 would otherwise give index -1 and
  // values above 100 would run past the end of the array.
  const index = Math.min(sorted.length - 1, Math.max(0, rank));
  return rounded(sorted[index]);
}
/**
 * Summarize a list of numbers as min, p50, p95, p99 and max.
 *
 * @param {number[]} values - Input numbers.
 * @returns {{min: number, p50: number, p95: number, p99: number, max: number}}
 *   All zeros for an empty list.
 */
function stats(values) {
  if (values.length === 0) return { min: 0, p50: 0, p95: 0, p99: 0, max: 0 };
  const smallest = rounded(Math.min(...values));
  const [p50, p95, p99] = [50, 95, 99].map((level) => percentile(values, level));
  const largest = rounded(Math.max(...values));
  return { min: smallest, p50, p95, p99, max: largest };
}
/**
 * Guess whether an error stems from the environment (unreachable services or
 * missing configuration) rather than from the behavior under test.
 *
 * @param {*} error - Error object or message-like value.
 * @returns {boolean} `true` when the message matches a known environment
 *   failure pattern.
 */
function looksLikeEnvIssue(error) {
  const detail = String(error?.message || error || "");
  const envFailurePattern = /fetch failed|ECONNREFUSED|ENOTFOUND|LANGBOT_.*not configured|Could not read recovery_key|Backend did not respond/i;
  return envFailurePattern.test(detail);
}
/**
 * Turn an arbitrary value into a redacted reason string of at most 1000
 * characters (redaction happens before truncation).
 *
 * @param {*} value - Raw reason; falsy values become "".
 * @returns {string}
 */
function safeReason(value) {
  const rawText = String(value || "");
  const scrubbed = redact(rawText);
  return scrubbed.slice(0, 1000);
}
@@ -0,0 +1,134 @@
/**
 * Condense a fake-provider state snapshot into counters and timing stats.
 *
 * Only requests whose `path` contains "/chat/completions" are considered.
 * A request counts as a fault when `should_fail` is true, its `status` is
 * "http_fault", or its `http_status` is a finite number >= 400.
 *
 * @param {?object} state - Snapshot with `recent_requests`, `request_count`,
 *   `status`, `url` and `config`.
 * @returns {?object} `null` when no state is given, otherwise the summary.
 */
export function summarizeFakeProviderState(state) {
  if (!state) return null;

  const recent = Array.isArray(state.recent_requests) ? state.recent_requests : [];
  const chat = recent.filter((request) => String(request?.path || "").includes("/chat/completions"));
  const succeeded = chat.filter((request) => request?.status === "ok");
  const isFault = (request) => {
    if (request?.should_fail === true) return true;
    if (request?.status === "http_fault") return true;
    return Number.isFinite(request?.http_status) && request.http_status >= 400;
  };
  const faults = chat.filter(isFault);
  const streamed = chat.filter((request) => request?.stream === true);
  // Statistics over one numeric field, skipping requests where it is missing.
  const fieldStats = (requests, field) => stats(
    requests.map((request) => numberOrNull(request?.[field])).filter((value) => Number.isFinite(value)),
  );

  return {
    status: state.status || "unknown",
    url: state.url || "",
    request_count: Number.isFinite(state.request_count) ? state.request_count : recent.length,
    recent_request_count: recent.length,
    chat_request_count: chat.length,
    fault_count: faults.length,
    streamed_request_count: streamed.length,
    duration_ms: fieldStats(chat, "duration_ms"),
    successful_duration_ms: fieldStats(succeeded, "duration_ms"),
    first_chunk_ms: fieldStats(succeeded, "first_chunk_ms"),
    first_content_chunk_ms: fieldStats(succeeded, "first_content_chunk_ms"),
    content_chunk_count: fieldStats(succeeded, "content_chunk_count"),
    config: state.config || {},
  };
}
/**
 * Pair each sample with a fake-provider request by their shared
 * `expected_text` token and summarize the resulting timing segments.
 *
 * Requests with the same token are consumed first-in, first-out, so each
 * provider request is matched to at most one sample. Samples without a
 * token are skipped; samples with no remaining request are reported in
 * `missing_expected_text` (first 20 only).
 *
 * @param {object[]} samples - Sample records from the load run.
 * @param {?object} state - Fake-provider snapshot with `recent_requests`.
 * @returns {object} Match counters, per-metric statistics and raw `segments`.
 */
export function buildProviderTimingMetrics(samples, state) {
  const recent = Array.isArray(state?.recent_requests) ? state.recent_requests : [];

  // token -> queue of provider requests carrying that token, in arrival order
  const queues = new Map();
  recent.forEach((request) => {
    const token = String(request?.expected_text || "");
    if (!token) return;
    const queue = queues.get(token);
    if (queue) queue.push(request);
    else queues.set(token, [request]);
  });

  const segments = [];
  const unmatchedTokens = [];
  for (const sample of samples) {
    const token = String(sample?.expected_text || "");
    if (!token) continue;
    const match = (queues.get(token) || []).shift();
    if (!match) {
      unmatchedTokens.push(token);
      continue;
    }
    const segment = buildTimingSegment(sample, match);
    if (segment) segments.push(segment);
  }

  const summarize = (field) => stats(
    segments.map((segment) => numberOrNull(segment[field])).filter((value) => Number.isFinite(value)),
  );
  return {
    matched_request_count: segments.length,
    missing_provider_match_count: unmatchedTokens.length,
    missing_expected_text: unmatchedTokens.slice(0, 20),
    send_to_provider_start_ms: summarize("send_to_provider_start_ms"),
    provider_duration_ms: summarize("provider_duration_ms"),
    provider_finish_to_ws_final_ms: summarize("provider_finish_to_ws_final_ms"),
    langbot_overhead_estimate_ms: summarize("langbot_overhead_estimate_ms"),
    e2e_minus_provider_ms: summarize("e2e_minus_provider_ms"),
    provider_first_content_to_ws_first_content_ms: summarize("provider_first_content_to_ws_first_content_ms"),
    segments,
  };
}
/**
 * Combine one sample with its matched provider request into a timing record.
 *
 * `langbot_overhead_estimate_ms` is the sum of the two wall-clock segments
 * (send to provider start, provider finish to WebSocket final) when both are
 * available; otherwise it falls back to end-to-end duration minus provider
 * duration, which may itself be `null`.
 *
 * @param {object} sample - Client-side sample record.
 * @param {object} request - Provider-side request record.
 * @returns {object} Timing segment; unavailable values are `null`.
 */
function buildTimingSegment(sample, request) {
  const clientSent = numberOrNull(sample.sent_epoch_ms);
  const clientFinished = numberOrNull(sample.finished_epoch_ms);
  const providerStarted = numberOrNull(request.started_epoch_ms);
  const providerFinished = numberOrNull(request.finished_epoch_ms);
  const providerFirstContent = numberOrNull(request.first_content_chunk_epoch_ms);
  const clientFirstContent = numberOrNull(sample.first_assistant_content_epoch_ms);
  const responseDuration = numberOrNull(sample.response_duration_ms);
  const providerDuration = numberOrNull(request.duration_ms);

  const inbound = finiteDelta(providerStarted, clientSent);
  const outbound = finiteDelta(clientFinished, providerFinished);

  let endToEndMinusProvider = null;
  if (Number.isFinite(responseDuration) && Number.isFinite(providerDuration)) {
    endToEndMinusProvider = rounded(responseDuration - providerDuration);
  }

  let overheadEstimate = endToEndMinusProvider;
  if (Number.isFinite(inbound) && Number.isFinite(outbound)) {
    overheadEstimate = rounded(inbound + outbound);
  }

  return {
    sample_index: sample.index,
    pipeline_label: sample.pipeline_label || "",
    expected_text: sample.expected_text || "",
    provider_request_id: request.id || "",
    provider_request_number: request.request_number ?? null,
    response_duration_ms: responseDuration,
    provider_duration_ms: providerDuration,
    send_to_provider_start_ms: inbound,
    provider_finish_to_ws_final_ms: outbound,
    langbot_overhead_estimate_ms: overheadEstimate,
    e2e_minus_provider_ms: endToEndMinusProvider,
    provider_first_content_to_ws_first_content_ms: finiteDelta(clientFirstContent, providerFirstContent),
    provider_status: request.status || "",
    provider_http_status: request.http_status ?? null,
  };
}
/**
 * Rounded difference `left - right`, or `null` unless both are finite numbers.
 *
 * @param {*} left - Minuend.
 * @param {*} right - Subtrahend.
 * @returns {?number}
 */
function finiteDelta(left, right) {
  if (!Number.isFinite(left) || !Number.isFinite(right)) return null;
  return rounded(left - right);
}
/**
 * Summarize a list of numbers as min, p50, p95, p99 and max.
 *
 * @param {number[]} values - Input numbers.
 * @returns {{min: number, p50: number, p95: number, p99: number, max: number}}
 *   All zeros for an empty list.
 */
export function stats(values) {
  if (values.length === 0) return { min: 0, p50: 0, p95: 0, p99: 0, max: 0 };
  const summary = { min: rounded(Math.min(...values)) };
  for (const level of [50, 95, 99]) {
    summary[`p${level}`] = percentile(values, level);
  }
  summary.max = rounded(Math.max(...values));
  return summary;
}
/**
 * Nearest-rank percentile of a list of numbers, rounded to three decimals.
 *
 * @param {number[]} values - Input numbers; not mutated.
 * @param {number} percentileValue - Percentile in the range 0-100.
 * @returns {number} 0 for an empty list; otherwise the value at the
 *   nearest rank. A percentile of 0 returns the minimum.
 */
export function percentile(values, percentileValue) {
  if (values.length === 0) return 0;
  const sorted = [...values].sort((a, b) => a - b);
  const rank = Math.ceil((percentileValue / 100) * sorted.length) - 1;
  // Clamp at both ends: percentile 0 would otherwise give index -1 and
  // values above 100 would run past the end of the array.
  const index = Math.min(sorted.length - 1, Math.max(0, rank));
  return rounded(sorted[index]);
}
/**
 * Round a number to three decimal places via `toFixed`.
 *
 * @param {number} value - Number to round.
 * @returns {number} The rounded value.
 */
export function rounded(value) {
  const threeDecimals = value.toFixed(3);
  return Number(threeDecimals);
}
/**
 * Convert a value to a finite number, or `null` when it is not one.
 *
 * Only numbers, bigints and non-blank strings are converted. `Number()`
 * maps `null`, `""`, `false` and `[]` to `0`, so passing those through
 * would turn a missing timestamp or duration into a real-looking zero and
 * distort every delta and statistic derived from it.
 *
 * @param {*} value - Candidate value.
 * @returns {?number} The finite number, or `null`.
 */
function numberOrNull(value) {
  if (typeof value === "string") {
    if (value.trim() === "") return null;
  } else if (typeof value !== "number" && typeof value !== "bigint") {
    return null;
  }
  const number = Number(value);
  return Number.isFinite(number) ? number : null;
}
@@ -144,8 +144,26 @@ request because Debug Chat broadcasts messages to every connection in the same
session; unique tokens prevent one connection from counting another
connection's response as its own.
When the fake provider is used, reports also include provider-side timing in
`metrics.json`:
- `fake_provider.duration_ms` and `fake_provider.first_content_chunk_ms`
measure the controlled provider itself.
- `provider_timing.send_to_provider_start_ms` estimates WebSocket ingress,
pipeline dispatch, runner setup, and requester time before the provider
receives the request.
- `provider_timing.provider_finish_to_ws_final_ms` estimates the path from
provider completion back to the final Debug Chat WebSocket response.
- `provider_timing.langbot_overhead_estimate_ms` is the sum of those two
LangBot-side segments when wall-clock timestamps can be matched by the
unique expected response token.
After the baseline passes, run `langbot-fake-provider-debug-chat-slow-load` to
keep the same live backend path while injecting deterministic streaming latency.
Run `langbot-fake-provider-debug-chat-cross-pipeline-isolation` to open
concurrent Debug Chat connections against two fake-provider pipelines and fail
if one pipeline receives the other pipeline's response token. This targets
global pipeline-state regressions in the WebSocket Debug Chat path.
Run `langbot-fake-provider-debug-chat-fault-recovery` to inject bounded HTTP
provider failures and require both observed failures and later successful
requests. The fault-recovery case is deliberately sequential because failed
@@ -165,6 +183,7 @@ Useful commands:
```bash
rtk bin/lbs test run langbot-fake-provider-debug-chat-load --run-id langbot-fake-load-local
rtk bin/lbs test run langbot-fake-provider-debug-chat-slow-load --run-id langbot-fake-slow-local
rtk bin/lbs test run langbot-fake-provider-debug-chat-cross-pipeline-isolation --run-id langbot-fake-cross-pipeline-local
rtk bin/lbs test run langbot-fake-provider-debug-chat-fault-recovery --run-id langbot-fake-fault-local
rtk bin/lbs test run langbot-space-debug-chat-concurrency-smoke --run-id langbot-space-smoke-local
rtk bin/lbs suite run langbot-debug-chat-load-gate --run-id langbot-debug-chat-load-local --include-manual-check
@@ -184,8 +203,9 @@ Use the smallest gate that answers the quality question:
starting with Pipeline Debug Chat send-to-visible-completion latency. Run it
only when the browser profile and target pipeline are ready.
- `langbot-debug-chat-load-gate`: WebSocket Debug Chat load checks, starting
with controlled fake-provider baseline, slow-provider, and fault-recovery
profiles, plus an optional low-volume real Space-provider smoke.
with controlled fake-provider baseline, slow-provider, cross-pipeline
isolation, and fault-recovery profiles, plus an optional low-volume real
Space-provider smoke.
- `langbot-performance-reliability-gate`: combined starter gate for synthetic
contracts plus live backend checks.
@@ -11,5 +11,6 @@ tags:
cases:
- langbot-fake-provider-debug-chat-load
- langbot-fake-provider-debug-chat-slow-load
- langbot-fake-provider-debug-chat-cross-pipeline-isolation
- langbot-fake-provider-debug-chat-fault-recovery
- langbot-space-debug-chat-concurrency-smoke