修改: app/api/bedrock.ts

修改:     app/client/platforms/bedrock.ts
	修改:     app/constant.ts
This commit is contained in:
glay
2024-11-22 06:33:39 +08:00
parent f60c237b16
commit bd68df1d9b
3 changed files with 309 additions and 78 deletions

View File

@@ -7,32 +7,117 @@ function parseEventData(chunk: Uint8Array): any {
const decoder = new TextDecoder();
const text = decoder.decode(chunk);
try {
return JSON.parse(text);
const parsed = JSON.parse(text);
// AWS Bedrock wraps the response in a 'body' field
if (typeof parsed.body === "string") {
try {
return JSON.parse(parsed.body);
} catch (e) {
return { output: parsed.body };
}
}
return parsed.body || parsed;
} catch (e) {
console.error("Error parsing event data:", e);
try {
// Handle base64 encoded responses
const base64Match = text.match(/:"([A-Za-z0-9+/=]+)"/);
if (base64Match) {
const decoded = Buffer.from(base64Match[1], "base64").toString("utf-8");
return JSON.parse(decoded);
try {
return JSON.parse(decoded);
} catch (e) {
return { output: decoded };
}
}
// Handle event-type responses
const eventMatch = text.match(/:event-type[^\{]+({.*})/);
if (eventMatch) {
return JSON.parse(eventMatch[1]);
try {
return JSON.parse(eventMatch[1]);
} catch (e) {
return { output: eventMatch[1] };
}
}
} catch (innerError) {}
// Handle plain text responses
if (text.trim()) {
// Clean up any malformed JSON characters
const cleanText = text.replace(/[\x00-\x1F\x7F-\x9F]/g, "");
return { output: cleanText };
}
} catch (innerError) {
console.error("Error in fallback parsing:", innerError);
}
}
return null;
}
async function* transformBedrockStream(stream: ReadableStream) {
async function* transformBedrockStream(
stream: ReadableStream,
modelId: string,
) {
const reader = stream.getReader();
let buffer = "";
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;
if (done) {
if (buffer) {
yield `data: ${JSON.stringify({
delta: { text: buffer },
})}\n\n`;
}
break;
}
const parsed = parseEventData(value);
if (parsed) {
if (!parsed) continue;
console.log("Parsed response:", JSON.stringify(parsed, null, 2));
// Handle Titan models
if (modelId.startsWith("amazon.titan")) {
const text = parsed.outputText || "";
if (text) {
yield `data: ${JSON.stringify({
delta: { text },
})}\n\n`;
}
}
// Handle LLaMA3 models
else if (modelId.startsWith("us.meta.llama3")) {
let text = "";
if (parsed.generation) {
text = parsed.generation;
} else if (parsed.output) {
text = parsed.output;
} else if (typeof parsed === "string") {
text = parsed;
}
if (text) {
// Clean up any control characters or invalid JSON characters
text = text.replace(/[\x00-\x1F\x7F-\x9F]/g, "");
yield `data: ${JSON.stringify({
delta: { text },
})}\n\n`;
}
}
// Handle Mistral models
else if (modelId.startsWith("mistral.mistral")) {
const text =
parsed.output || parsed.outputs?.[0]?.text || parsed.completion || "";
if (text) {
yield `data: ${JSON.stringify({
delta: { text },
})}\n\n`;
}
}
// Handle Claude models
else if (modelId.startsWith("anthropic.claude")) {
if (parsed.type === "content_block_delta") {
if (parsed.delta?.type === "text_delta") {
yield `data: ${JSON.stringify({
@@ -66,6 +151,8 @@ async function* transformBedrockStream(stream: ReadableStream) {
function validateRequest(body: any, modelId: string): void {
if (!modelId) throw new Error("Model ID is required");
const bodyContent = body.body || body;
if (modelId.startsWith("anthropic.claude")) {
if (
!body.anthropic_version ||
@@ -82,13 +169,14 @@ function validateRequest(body: any, modelId: string): void {
} else if (typeof body.prompt !== "string") {
throw new Error("prompt is required for Claude 2 and earlier");
}
} else if (modelId.startsWith("meta.llama")) {
if (!body.prompt) throw new Error("Llama requires a prompt");
} else if (modelId.startsWith("us.meta.llama3")) {
if (!bodyContent.prompt) {
throw new Error("prompt is required for LLaMA3 models");
}
} else if (modelId.startsWith("mistral.mistral")) {
if (!Array.isArray(body.messages))
throw new Error("Mistral requires a messages array");
if (!bodyContent.prompt) throw new Error("Mistral requires a prompt");
} else if (modelId.startsWith("amazon.titan")) {
if (!body.inputText) throw new Error("Titan requires inputText");
if (!bodyContent.inputText) throw new Error("Titan requires inputText");
}
}
@@ -114,14 +202,35 @@ async function requestBedrock(req: NextRequest) {
throw new Error("Failed to decrypt AWS credentials");
}
const endpoint = `https://bedrock-runtime.${awsRegion}.amazonaws.com/model/${modelId}/invoke-with-response-stream`;
// Construct the base endpoint
const baseEndpoint = `https://bedrock-runtime.${awsRegion}.amazonaws.com`;
// Set up timeout
const timeoutId = setTimeout(() => controller.abort(), 10 * 60 * 1000);
try {
// Determine the endpoint and request body based on model type
let endpoint;
let requestBody;
let additionalHeaders = {};
const bodyText = await req.clone().text();
if (!bodyText) {
throw new Error("Request body is empty");
}
const bodyJson = JSON.parse(bodyText);
validateRequest(bodyJson, modelId);
const canonicalBody = JSON.stringify(bodyJson);
// For all other models, use standard endpoint
endpoint = `${baseEndpoint}/model/${modelId}/invoke-with-response-stream`;
requestBody = JSON.stringify(bodyJson.body || bodyJson);
console.log("Request to AWS Bedrock:", {
endpoint,
modelId,
body: requestBody,
});
const headers = await sign({
method: "POST",
@@ -130,14 +239,17 @@ async function requestBedrock(req: NextRequest) {
accessKeyId: decryptedAccessKey,
secretAccessKey: decryptedSecretKey,
sessionToken: decryptedSessionToken,
body: canonicalBody,
body: requestBody,
service: "bedrock",
});
const res = await fetch(endpoint, {
method: "POST",
headers,
body: canonicalBody,
headers: {
...headers,
...additionalHeaders,
},
body: requestBody,
redirect: "manual",
// @ts-ignore
duplex: "half",
@@ -146,15 +258,20 @@ async function requestBedrock(req: NextRequest) {
if (!res.ok) {
const error = await res.text();
console.error("AWS Bedrock error response:", error);
try {
const errorJson = JSON.parse(error);
throw new Error(errorJson.message || error);
} catch {
throw new Error(error);
throw new Error(error || "Failed to get response from Bedrock");
}
}
const transformedStream = transformBedrockStream(res.body!);
if (!res.body) {
throw new Error("Empty response from Bedrock");
}
const transformedStream = transformBedrockStream(res.body, modelId);
const stream = new ReadableStream({
async start(controller) {
try {
@@ -163,6 +280,7 @@ async function requestBedrock(req: NextRequest) {
}
controller.close();
} catch (err) {
console.error("Stream error:", err);
controller.error(err);
}
},
@@ -177,6 +295,7 @@ async function requestBedrock(req: NextRequest) {
},
});
} catch (e) {
console.error("Request error:", e);
throw e;
} finally {
clearTimeout(timeoutId);
@@ -202,6 +321,7 @@ export async function handle(
try {
return await requestBedrock(req);
} catch (e) {
console.error("Handler error:", e);
return NextResponse.json(
{ error: true, msg: e instanceof Error ? e.message : "Unknown error" },
{ status: 500 },