diff --git a/pkg/provider/modelmgr/requesters/anthropicmsgs.py b/pkg/provider/modelmgr/requesters/anthropicmsgs.py index e0850c03..0c73068c 100644 --- a/pkg/provider/modelmgr/requesters/anthropicmsgs.py +++ b/pkg/provider/modelmgr/requesters/anthropicmsgs.py @@ -135,6 +135,14 @@ class AnthropicMessages(requester.ProviderAPIRequester): args['messages'] = req_messages + if args["thinking"]: + args['thinking'] = { + "type": "enabled", + "budget_tokens": 10000 + } + else: + args.pop('thinking') + if funcs: tools = await self.ap.tool_mgr.generate_tools_for_anthropic(funcs) @@ -148,12 +156,12 @@ class AnthropicMessages(requester.ProviderAPIRequester): 'content': '', 'role': resp.role, } - + print(type(resp)) assert type(resp) is anthropic.types.message.Message for block in resp.content: if not remove_think and block.type == 'thinking': - args['content'] = '' + block.thinking + '\n' + args['content'] + args['content'] = '\n' + block.thinking + '\n\n' + args['content'] elif block.type == 'text': args['content'] += block.text elif block.type == 'tool_use': @@ -177,3 +185,175 @@ class AnthropicMessages(requester.ProviderAPIRequester): raise errors.RequesterError(f'模型无效: {e.message}') else: raise errors.RequesterError(f'请求地址无效: {e.message}') + + + async def invoke_llm_stream( + self, + query: core_entities.Query, + model: requester.RuntimeLLMModel, + messages: typing.List[llm_entities.Message], + funcs: typing.List[tools_entities.LLMFunction] = None, + extra_args: dict[str, typing.Any] = {}, + remove_think: bool = False, + ) -> llm_entities.Message: + self.client.api_key = model.token_mgr.get_token() + + args = extra_args.copy() + args['model'] = model.model_entity.name + args['stream'] = True + + # 处理消息 + + # system + system_role_message = None + + for i, m in enumerate(messages): + if m.role == 'system': + system_role_message = m + + break + + if system_role_message: + messages.pop(i) + + if isinstance(system_role_message, llm_entities.Message) and isinstance(system_role_message.content, str): + args['system'] = system_role_message.content + + req_messages = [] + + for m in messages: + if m.role == 'tool': + tool_call_id = m.tool_call_id + + req_messages.append( + { + 'role': 'user', + 'content': [ + { + 'type': 'tool_result', + 'tool_use_id': tool_call_id, + 'content': m.content, + } + ], + } + ) + + continue + + msg_dict = m.dict(exclude_none=True) + + if isinstance(m.content, str) and m.content.strip() != '': + msg_dict['content'] = [{'type': 'text', 'text': m.content}] + elif isinstance(m.content, list): + for i, ce in enumerate(m.content): + if ce.type == 'image_base64': + image_b64, image_format = await image.extract_b64_and_format(ce.image_base64) + + alter_image_ele = { + 'type': 'image', + 'source': { + 'type': 'base64', + 'media_type': f'image/{image_format}', + 'data': image_b64, + }, + } + msg_dict['content'][i] = alter_image_ele + + if m.tool_calls: + for tool_call in m.tool_calls: + msg_dict['content'].append( + { + 'type': 'tool_use', + 'id': tool_call.id, + 'name': tool_call.function.name, + 'input': json.loads(tool_call.function.arguments), + } + ) + + del msg_dict['tool_calls'] + + req_messages.append(msg_dict) + if args["thinking"]: + args['thinking'] = { + "type": "enabled", + "budget_tokens": 10000 + } + else: + args.pop('thinking') + + args['messages'] = req_messages + + if funcs: + tools = await self.ap.tool_mgr.generate_tools_for_anthropic(funcs) + + if tools: + args['tools'] = tools + + try: + role = 'assistant' # 默认角色 + # chunk_idx = 0 + think_started = False + think_ended = False + finish_reason = False + content = '' + async for chunk in await self.client.messages.create(**args): + # print(chunk) + if isinstance(chunk, anthropic.types.raw_content_block_start_event.RawContentBlockStartEvent): # 记录开始 + if chunk.content_block.type == 'thinking' and not remove_think: + think_started = True + continue + elif chunk.content_block.type == 'text' and not remove_think: + think_ended = True + continue + elif isinstance(chunk, anthropic.types.raw_content_block_delta_event.RawContentBlockDeltaEvent): + if chunk.delta.type == "thinking_delta": + if think_started: + think_started = False + content = '\n' + chunk.delta.thinking + elif remove_think: + continue + else: + content = chunk.delta.thinking + elif chunk.delta.type == "text_delta": + if think_ended: + think_ended = False + content = '\n\n' + chunk.delta.text + else: + content = chunk.delta.text + elif isinstance(chunk, anthropic.types.raw_content_block_stop_event.RawContentBlockStopEvent): + continue # 记录raw_content_block结束的 + + elif isinstance(chunk, anthropic.types.raw_message_delta_event.RawMessageDeltaEvent): + if chunk.delta.stop_reason == "end_turn": + finish_reason = True + elif isinstance(chunk, anthropic.types.raw_message_stop_event.RawMessageStopEvent): + continue # 这个好像是完全结束 + else: + print(chunk) + continue + + + args = { + 'content': content, + 'role': role, + "is_final": finish_reason + } + # if chunk_idx == 0: + # chunk_idx += 1 + # continue + + # assert type(chunk) is anthropic.types.message.Chunk + + + yield llm_entities.MessageChunk(**args) + + # return llm_entities.Message(**args) + except anthropic.AuthenticationError as e: + raise errors.RequesterError(f'api-key 无效: {e.message}') + except anthropic.BadRequestError as e: + raise errors.RequesterError(str(e.message)) + except anthropic.NotFoundError as e: + if 'model: ' in str(e): + raise errors.RequesterError(f'模型无效: {e.message}') + else: + raise errors.RequesterError(f'请求地址无效: {e.message}')