| import numpy as np |
| import soundfile as sf |
| import time |
|
|
| def audio_stream_generator(audio_file_path, chunk_size=4096, simulate_realtime=True): |
| """ |
| 音频流生成器,从音频文件中读取数据并以流的方式输出 |
| |
| 参数: |
| audio_file_path: 音频文件路径 |
| chunk_size: 每个数据块的大小(采样点数) |
| simulate_realtime: 是否模拟实时流处理的速度 |
| |
| 生成: |
| numpy.ndarray: 每次生成一个chunk_size大小的np.float32数据块 |
| """ |
| |
| audio_data, sample_rate = sf.read(audio_file_path) |
| |
| |
| if audio_data.dtype != np.float32: |
| audio_data = audio_data.astype(np.float32) |
| |
| |
| if len(audio_data.shape) > 1 and audio_data.shape[1] > 1: |
| audio_data = audio_data.mean(axis=1) |
| |
| print(f"已加载音频文件: {audio_file_path}") |
| print(f"采样率: {sample_rate} Hz") |
| print(f"音频长度: {len(audio_data)/sample_rate:.2f} 秒") |
| |
| |
| chunk_duration = chunk_size / sample_rate if simulate_realtime else 0 |
| |
| |
| audio_len = len(audio_data) |
| for pos in range(0, audio_len, chunk_size): |
| |
| end_pos = min(pos + chunk_size, audio_len) |
| chunk = audio_data[pos:end_pos] |
| |
| |
| if len(chunk) < chunk_size: |
| padded_chunk = np.zeros(chunk_size, dtype=np.float32) |
| padded_chunk[:len(chunk)] = chunk |
| chunk = padded_chunk |
| |
| |
| if simulate_realtime: |
| time.sleep(chunk_duration) |
| |
| yield chunk |
| |
| print("音频流处理完成") |