Visão geral do projeto
Ao integrar DuckDuckGo APIs de pesquisa que fornecem ao modelo de linguagem comum grande um New Bing de acesso à Web em tempo real e recursos inteligentes de perguntas e respostas. Multiplexação de grandes modelos de aplicativos existentes Chamada de funçãopara processos eficientes e flexíveis de processamento de consultas e geração de respostas.
Realização do núcleo
O sistema usa o Function calling mecanismo para a integração perfeita do processamento de consultas e da geração de respostas:
- Análise e pesquisa de consultas: usando
claude-3-haikutalvezgpt-3.5-turboO modelo mais simples gera termos de pesquisa otimizados e chama a API de pesquisa por meio de chamadas de função. - Pesquisa na Web: chame a API do DuckDuckGo para realizar a recuperação de informações em tempo real.
- Geração inteligente de respostas: usando
claude-3-5-sonnnettalvezgpt-4oModelos mais complexos, como esses, combinam resultados de pesquisa e perguntas originais para gerar respostas precisas e abrangentes.
implementação do código
1. configurações de ambiente e definições de função
import os
import json
from openai import OpenAI
from duckduckgo_search import DDGS
API_KEY = os.getenv("OPENAI_API_KEY")
BASE_URL = os.getenv("OPENAI_BASE_URL")
client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
FUNCTIONS = [
{
"name": "search_duckduckgo",
"description": "使用DuckDuckGo搜索引擎查询信息。可以搜索最新新闻、文章、博客等内容。",
"parameters": {
"type": "object",
"properties": {
"keywords": {
"type": "array",
"items": {"type": "string"},
"description": "搜索的关键词列表。例如:['Python', '机器学习', '最新进展']。"
}
},
"required": ["keywords"]
}
}
]
2. funções auxiliares relacionadas
def search_duckduckgo(keywords):
search_term = " ".join(keywords)
with DDGS() as ddgs:
return list(ddgs.text(keywords=search_term, region="cn-zh", safesearch="on", max_results=5))
def print_search_results(results):
for result in results:
print(
f"标题: {result['title']}\n链接: {result['href']}\n摘要: {result['body']}\n---")
def get_openai_response(messages, model="gpt-3.5-turbo", functions=None, function_call=None):
try:
response = client.chat.completions.create(
model=model,
messages=messages,
functions=functions,
function_call=function_call
)
return response.choices[0].message
except Exception as e:
print(f"调用OpenAI API时出错: {str(e)}")
return None
def process_function_call(response_message):
function_name = response_message.function_call.name
function_args = json.loads(response_message.function_call.arguments)
print(f"\n模型选择调用函数: {function_name}")
if function_name == "search_duckduckgo":
keywords = function_args.get('keywords', [])
if not keywords:
print("错误:模型没有提供搜索关键词")
return None
print(f"关键词: {', '.join(keywords)}")
function_response = search_duckduckgo(keywords)
print("\nDuckDuckGo搜索返回结果:")
print_search_results(function_response)
return function_response
else:
print(f"未知的函数名称: {function_name}")
return None
3. principais funções de processamento
def main(question):
print(f"问题:{question}")
messages = [{"role": "user", "content": question}]
response_message = get_openai_response(
messages, functions=FUNCTIONS, function_call="auto")
if not response_message:
return
if response_message.function_call:
function_response = process_function_call(response_message)
if function_response:
messages.extend([
response_message.model_dump(),
{
"role": "function",
"name": response_message.function_call.name,
"content": json.dumps(function_response, ensure_ascii=False)
}
])
final_response = get_openai_response(messages, model="gpt-4o")
if final_response:
print("\n最终回答:")
print(final_response.content)
else:
print("\n模型直接回答:")
print(response_message.content)
realização
Dessa forma, modelos grandes podem ser capazes de responder a uma variedade de perguntas, por exemplo:
- Informações geográficas: "O PRD inclui Foshan?"
- Perguntas sobre os próprios recursos da IA: "O que você pode fazer?"
- Pergunta específica do domínio, "Quem é o autor de Plants vs. Zombies Hybrid?"
- Reflexão filosófica: "A vida tem duas palavras, quais são elas?"
- Eventos atuais relacionados a "What is Jumping Inequality?" (O que é a desigualdade saltitante?)
- Pergunta relacionada ao horário: "Qual é o horário atual de Pequim?

Código completo
import os
import json
from openai import OpenAI
from duckduckgo_search import DDGS
API_KEY = os.getenv("OPENAI_API_KEY")
BASE_URL = os.getenv("OPENAI_BASE_URL")
client = OpenAI(api_key=API_KEY, base_url=BASE_URL)
FUNCTIONS = [
{
"name": "search_duckduckgo",
"description": "使用DuckDuckGo搜索引擎查询信息。可以搜索最新新闻、文章、博客等内容。",
"parameters": {
"type": "object",
"properties": {
"keywords": {
"type": "array",
"items": {"type": "string"},
"description": "搜索的关键词列表。例如:['Python', '机器学习', '最新进展']。"
}
},
"required": ["keywords"]
}
}
]
def search_duckduckgo(keywords):
search_term = " ".join(keywords)
with DDGS() as ddgs:
return list(ddgs.text(keywords=search_term, region="cn-zh", safesearch="on", max_results=5))
def print_search_results(results):
for result in results:
print(
f"标题: {result['title']}\n链接: {result['href']}\n摘要: {result['body']}\n---")
def get_openai_response(messages, model="gpt-3.5-turbo", functions=None, function_call=None):
try:
response = client.chat.completions.create(
model=model,
messages=messages,
functions=functions,
function_call=function_call
)
return response.choices[0].message
except Exception as e:
print(f"调用OpenAI API时出错: {str(e)}")
return None
def process_function_call(response_message):
function_name = response_message.function_call.name
function_args = json.loads(response_message.function_call.arguments)
print(f"\n模型选择调用函数: {function_name}")
if function_name == "search_duckduckgo":
keywords = function_args.get('keywords', [])
if not keywords:
print("错误:模型没有提供搜索关键词")
return None
print(f"关键词: {', '.join(keywords)}")
function_response = search_duckduckgo(keywords)
print("\nDuckDuckGo搜索返回结果:")
print_search_results(function_response)
return function_response
else:
print(f"未知的函数名称: {function_name}")
return None
def main(question):
print(f"问题:{question}")
messages = [{"role": "user", "content": question}]
response_message = get_openai_response(
messages, functions=FUNCTIONS, function_call="auto")
if not response_message:
return
if response_message.function_call:
if not response_message.content:
response_message.content = ""
function_response = process_function_call(response_message)
if function_response:
messages.extend([
response_message.model_dump(),
{
"role": "function",
"name": response_message.function_call.name,
"content": json.dumps(function_response, ensure_ascii=False)
}
])
final_response = get_openai_response(messages, model="gpt-4o")
if final_response:
print("\n最终回答:")
print(final_response.content)
else:
print("\n模型直接回答:")
print(response_message.content)
if __name__ == "__main__":
main("植物大战僵尸杂交版的作者是谁?他是怎么想到做出来这个游戏的?")

































