全模态【Qwen-Omni】:图文输入、语音输出,重新定义AI助手
接下来,我们做一个好玩(wuliao)的事情,将使用Qwen3-Omni对图片生成的语音,结合原图片生成一个视频。
以下代码实现的功能特点:
-
完整流程:图片分析 → 语音生成 → 视频合成 -
自动时长:根据音频长度自动设置视频时长 -
多方法支持:使用OpenCV + ffmpeg -
错误处理:完善的异常处理和临时文件清理 -
灵活配置:可自定义视频参数
输出结果:
-
生成的视频将包含: -
静态的eagle.png图片作为视频画面 -
AI生成的语音描述作为音频 -
合适的视频时长(根据语音长度)
这样您就得到了一个结合了图片和AI语音描述的视频文件!
import base64
import soundfile as sf
import numpy as np
import os
import subprocess
import tempfile
from PIL import Image
import cv2
def analyze_local_image_and_generate_video(client, image_path, question="图中描绘的是什么景象?",
voice="Cherry", audio_format="wav",
video_filename="output_video.mp4",
video_duration=None,
print_text=True, samplerate=24000):
"""
分析本地图片,生成语音描述,并创建结合图片和音频的视频
参数:
client: OpenAI客户端实例
image_path (str): 本地图片文件路径
question (str): 对图片的提问
voice (str): 语音类型
audio_format (str): 音频格式
video_filename (str): 输出视频文件名
video_duration (int): 视频时长(秒),如果为None则根据音频长度自动计算
print_text (bool): 是否打印文本回复
samplerate (int): 音频采样率
返回:
dict: 包含文本回复、音频和视频信息的字典
"""
try:
# 第一步:分析图片并生成语音描述
print("=== 第一步:分析图片并生成语音描述 ===")
analysis_result = analyze_local_image(
client=client,
image_path=image_path,
question=question,
voice=voice,
audio_format=audio_format,
generate_audio=True,
save_audio=True,
audio_filename="temp_audio.wav",
print_text=print_text,
samplerate=samplerate,
stream=True
)
if not analysis_result['success']:
return analysis_result
# 第二步:创建视频
print("\n=== 第二步:创建视频 ===")
video_result = create_video_from_image_and_audio(
image_path=image_path,
audio_path="temp_audio.wav",
output_path=video_filename,
duration=video_duration
)
# 清理临时音频文件
if os.path.exists("temp_audio.wav"):
os.remove("temp_audio.wav")
# 合并结果
final_result = {
**analysis_result,
**video_result
}
return final_result
except Exception as e:
print(f"视频生成失败: {e}")
# 清理临时文件
if os.path.exists("temp_audio.wav"):
os.remove("temp_audio.wav")
return {
"success": False,
"error": str(e),
"text": analysis_result.get('text', '') if 'analysis_result' in locals() else '',
"video_created": False
}
def create_video_from_image_and_audio(image_path, audio_path, output_path, duration=None):
"""
使用图片和音频创建视频
参数:
image_path (str): 图片文件路径
audio_path (str): 音频文件路径
output_path (str): 输出视频路径
duration (int): 视频时长(秒),如果为None则使用音频长度
返回:
dict: 视频生成结果
"""
try:
# 检查输入文件
if not os.path.exists(image_path):
raise FileNotFoundError(f"图片文件不存在: {image_path}")
if not os.path.exists(audio_path):
raise FileNotFoundError(f"音频文件不存在: {audio_path}")
# 获取音频时长
audio_info = sf.info(audio_path)
audio_duration = audio_info.duration
# 使用音频时长作为视频时长(如果没有指定)
if duration is None:
duration = int(audio_duration) + 1 # 加1秒缓冲
print(f"音频时长: {audio_duration:.2f}秒")
print(f"视频时长: {duration}秒")
# 方法1:使用OpenCV创建视频(推荐,依赖较少)
return create_video_with_opencv(image_path, audio_path, output_path, duration)
except Exception as e:
print(f"视频创建失败: {e}")
return {
"success": False,
"video_created": False,
"error": str(e)
}
def create_video_with_opencv(image_path, audio_path, output_path, duration):
"""
使用OpenCV创建视频(静态图片+音频)
"""
try:
# 读取图片
img = cv2.imread(image_path)
if img is None:
raise ValueError(f"无法读取图片: {image_path}")
height, width, layers = img.shape
fps = 25 # 帧率
# 创建视频写入器
fourcc = cv2.VideoWriter_fourcc(*'mp4v')
temp_video_path = "temp_video_no_audio.mp4"
video_writer = cv2.VideoWriter(temp_video_path, fourcc, fps, (width, height))
# 写入帧
total_frames = int(duration * fps)
print(f"生成视频帧: {total_frames}帧")
for i in range(total_frames):
video_writer.write(img)
video_writer.release()
print("视频帧生成完成")
# 合并音频(使用ffmpeg)
if merge_audio_with_ffmpeg(temp_video_path, audio_path, output_path):
# 清理临时文件
if os.path.exists(temp_video_path):
os.remove(temp_video_path)
# 获取输出文件信息
file_size = os.path.getsize(output_path)
return {
"success": True,
"video_created": True,
"video_path": output_path,
"video_duration": duration,
"video_size": file_size,
"resolution": f"{width}x{height}",
"method": "opencv"
}
else:
return {
"success": False,
"video_created": False,
"error": "音频合并失败"
}
except Exception as e:
print(f"OpenCV视频创建失败: {e}")
return {
"success": False,
"video_created": False,
"error": str(e)
}
def merge_audio_with_ffmpeg(video_path, audio_path, output_path):
"""
使用ffmpeg合并视频和音频
"""
try:
# 检查ffmpeg是否可用
try:
subprocess.run(["ffmpeg", "-version"], capture_output=True, check=True)
except (subprocess.CalledProcessError, FileNotFoundError):
print("警告: ffmpeg未安装,无法添加音频")
# 如果没有ffmpeg,直接复制视频文件
import shutil
shutil.copy2(video_path, output_path)
return True
# 使用ffmpeg合并音频
cmd = [
"ffmpeg", "-y", # -y 覆盖输出文件
"-i", video_path,
"-i", audio_path,
"-c:v", "copy", # 复制视频流
"-c:a", "aac", # 使用AAC编码音频
"-shortest", # 以最短的流为准
output_path
]
result = subprocess.run(cmd, capture_output=True, text=True)
if result.returncode == 0:
print("音频合并成功")
return True
else:
print(f"ffmpeg错误: {result.stderr}")
return False
except Exception as e:
print(f"音频合并失败: {e}")
return False
def analyze_local_image(client, image_path, question="图中描绘的是什么景象?",
voice="Cherry", audio_format="wav",
generate_audio=True, save_audio=True,
audio_filename="image_description.wav",
print_text=True, samplerate=24000,
stream=True):
"""
(之前的analyze_local_image函数保持不变)
"""
try:
# 检查图片文件是否存在
if not os.path.exists(image_path):
raise FileNotFoundError(f"图片文件不存在: {image_path}")
# 读取图片文件并编码为base64
with open(image_path, "rb") as image_file:
image_data = image_file.read()
base64_image = base64.b64encode(image_data).decode('utf-8')
# 构建数据URL
file_ext = os.path.splitext(image_path)[1].lower()
mime_type = "image/png" if file_ext == '.png' else "image/jpeg"
image_url = f"data:{mime_type};base64,{base64_image}"
# 构建多模态消息内容
messages = [
{
"role": "user",
"content": [
{
"type": "image_url",
"image_url": {"url": image_url},
},
{"type": "text", "text": question},
],
}
]
# 准备请求参数
request_params = {
"model": "qwen3-omni-flash",
"messages": messages,
"stream": stream,
}
if generate_audio:
request_params["modalities"] = ["text", "audio"]
request_params["audio"] = {"voice": voice, "format": audio_format}
request_params["stream_options"] = {"include_usage": True}
else:
request_params["modalities"] = ["text"]
if not stream:
request_params["stream"] = False
# 发起聊天补全请求
completion = client.chat.completions.create(**request_params)
# 初始化变量
text_response = ""
audio_base64_string = ""
if print_text:
print("模型回复:")
if stream:
for chunk in completion:
if chunk.choices and chunk.choices[0].delta.content:
chunk_text = chunk.choices[0].delta.content
text_response += chunk_text
if print_text:
print(chunk_text, end="", flush=True)
if (generate_audio and chunk.choices and
hasattr(chunk.choices[0].delta, "audio") and
chunk.choices[0].delta.audio and
chunk.choices[0].delta.audio.get("data")):
audio_base64_string += chunk.choices[0].delta.audio["data"]
else:
text_response = completion.choices[0].message.content
if print_text:
print(text_response)
# 保存音频文件
audio_data = None
if generate_audio and audio_base64_string and save_audio:
wav_bytes = base64.b64decode(audio_base64_string)
audio_data = np.frombuffer(wav_bytes, dtype=np.int16)
sf.write(audio_filename, audio_data, samplerate=samplerate)
if print_text:
print(f"\n音频文件已保存至:{audio_filename}")
return {
"success": True,
"text": text_response,
"audio_base64": audio_base64_string if generate_audio else "",
"audio_data": audio_data if generate_audio else None,
"audio_saved": (generate_audio and save_audio and bool(audio_base64_string)),
"audio_filename": audio_filename if (generate_audio and save_audio and audio_base64_string) else None,
"generate_audio": generate_audio,
"image_path": image_path,
"question": question
}
except Exception as e:
print(f"图片分析请求失败: {e}")
return {
"success": False,
"text": "",
"audio_base64": "",
"audio_data": None,
"audio_saved": False,
"audio_filename": None,
"generate_audio": generate_audio,
"error": str(e),
"image_path": image_path,
"question": question
}
# 使用示例
if __name__ == "__main__":
# 生成视频
print("=== 开始生成图片描述视频 ===")
result = analyze_local_image_and_generate_video(
client=client,
image_path="eagle.png", # 替换为您的图片路径
question="请详细描述这张图片中的鹰,包括它的特征、姿态和环境",
voice="Cherry",
video_filename="eagle_video.mp4",
print_text=True
)
# 打印结果
print("\n=== 生成结果 ===")
if result['success'] and result['video_created']:
print(f"✅ 视频生成成功!")
print(f"📁 视频文件: {result['video_path']}")
print(f"⏱️ 视频时长: {result['video_duration']}秒")
print(f"📊 视频大小: {result['video_size']}字节")
print(f"🖼️ 分辨率: {result['resolution']}")
print(f"📝 描述文本: {result['text'][:100]}...")
else:
print(f"❌ 生成失败: {result.get('error', '未知错误')}")
输出为:
=== 开始生成图片描述视频 ===
=== 第一步:分析图片并生成语音描述 ===
模型回复:
这是一只白头海雕,它正展开双翼在空中翱翔。鹰的头部和尾部是白色的,与深棕色的身体形成鲜明对比。它的喙和爪子是黄色的,显得非常锐利。鹰的姿态非常优雅,翅膀完全展开,显示出强大的力量和自由。背景是蓝天白云,给人一种广阔无垠的感觉。
音频文件已保存至:temp_audio.wav
=== 第二步:创建视频 ===
音频时长: 22.56秒
视频时长: 23秒
生成视频帧: 575帧
视频帧生成完成
音频合并成功
=== 生成结果 ===
✅ 视频生成成功!
📁 视频文件: eagle_video.mp4
⏱️ 视频时长: 23秒
📊 视频大小: 7406633字节
🖼️ 分辨率: 1024x1024
📝 描述文本: 这是一只白头海雕,它正展开双翼在空中翱翔。鹰的头部和尾部是白色的,与深棕色的身体形成鲜明对比。它的喙和爪子是黄色的,显得非常锐利。鹰的姿态非常优雅,翅膀完全展开,显示出强大的力量和自由。背景是蓝天白云...
设想一下,如果我们是对视频进行理解后,然后将理解的文字或语音去重新生成一段新视频,将会更好玩一些。

