赞
踩
这一个模块是基本介绍. 下一个模块是多线程, 多进程调用, 让多个声道同时播放不同音乐
模块名: sounddevice_instructions.py
import os import sys import time import wave import numpy as np import array import sounddevice as sd from scipy.io import wavfile import soundfile class MyException(Exception): """ 自定义的异常类 """ def __init__(self, *args): self.args = args def preliminary_instruction(): """ 初步介绍 sounddevice 查询声卡相关操作 :return: """ # 首先获取与当前主机连接的声卡设备驱动信息 drivers_tuple = sd.query_hostapis() print(drivers_tuple) # 返回一个包含声卡驱动信息的元组, 元组的每个元素, 是一个个字典, 包含了每个驱动的详细信息 for driver_msg_dict in drivers_tuple: # 能够获取每个驱动的名字 print(driver_msg_dict['name'], end=", ") # MME, Windows DirectSound, ASIO, Windows WASAPI, Windows WDM-KS, # 查询当前主机能用的声卡声道 devices_list = sd.query_devices() # 返回一个列表 # 每个设备信息, 以字典形式呈现 for device_msg_dict in devices_list: print(device_msg_dict) # 下面两个函数, 是根据声卡声道名字, 获取声卡声道名称 及 id, 区分 输出 和 输入 两种声道 def get_input_device_id_by_name(channel_name): """ 功能: 根据声卡声道名字, 获取 输入 声道 id :return: 返回输入声道id """ devices_list = sd.query_devices() for index, device_msg_dict in enumerate(devices_list): if channel_name == device_msg_dict["name"] and device_msg_dict["max_input_channels"] > 0: return index else: raise MyException("找不到该设备!!!") def get_output_device_id_by_name(channel_name): """ 功能: 根据声卡声道名字, 获取 输出 声道 id :return: 返回输出声道id """ devices_list = sd.query_devices() for index, device_msg_dict in enumerate(devices_list): if channel_name == device_msg_dict["name"] and device_msg_dict["max_output_channels"] > 0: return index else: raise MyException("找不到该设备!!!") def read_data(audio_file_path, audio_channels): # wav格式文件与 raw(pcm) 格式区分开, 不同格式, 获取其数据内容的方式不一样 if audio_file_path.endswith(".wav"): data_array, sample_rate = soundfile.read(audio_file_path) return data_array elif audio_file_path.endswith(".pcm") or audio_file_path.endswith(".raw"): # 打开一个音频文件, 以 raw(pcm) 格式为例 data_array = array.array('h') with open(audio_file_path, "rb") as f: data_array.frombytes(f.read()) # 根据声道数, 来进行切分(我用的音频是双声道) data_array = data_array[::audio_channels] # 有几个声道, 切分的步长就是几, 这样就能单独切出来一个声道 return data_array def play_audio_file(audio_file_path, channel_id, audio_channels, sample_rate): # 将本机默认的输出声道, 改为 自己设定的声道 # sd.default.device 是一个列表, 第一个元素是: 默认的输入设备id; 第二个是默认的输出设备id sd.default.device[1] = channel_id # 常选参数, 一个数据, 一个采样率, 另外还有一个: blocking=True, 若设置, 则表示播放完毕当前音频再往下进行程序 data_array = read_data(audio_file_path, audio_channels) sd.play(data_array, sample_rate) sd.wait() # 表示等到此音频文件播放完毕之后再往下进行程序 # time.sleep(20) # 使用 time.sleep() ---> 休眠几秒, 音频文件就播放几秒, 时长自己控制 # 注: 如果没有 类似休眠 等延时操作, 则程序只会一闪而过, 不会播放音频 # 使用 sounddevice_example 录制音频, 提示也可以用多进程 def do_record(channel_id, file_path): # 首先设置默认录音声道id, id不同, 调用的录音声卡也会不同, 和 播放一样, 也支持 多进程+多线程, 多个声道同时录音 sd.default.device[0] = channel_id # 再调用函数录音 sample_rate = 44100 # 音频采样率 length = 10 # 时长, 单位秒 record_data = sd.rec(frames=length*sample_rate, samplerate=sample_rate, channels=1, blocking=True) # blocking=True, 能够让录音直到时长 wavfile.write(file_path, sample_rate, record_data) # 边录边播 def play_and_record(input_channel_id, output_channel_id, play_audio_file_path, rec_file_path, play_audio_channels=1, play_audio_fs=44100, rec_file_channels=1): # 首先设置默认输出和输入声道 sd.default.device[0] = input_channel_id sd.default.device[1] = output_channel_id # 开始边录边播 data_array = read_data(play_audio_file_path, play_audio_channels) rec_data = sd.playrec(data=data_array, samplerate=play_audio_fs, channels=rec_file_channels, blocking=True) # 存储录音文件 wavfile.write(rec_file_path, play_audio_fs, rec_data) if __name__ == "__main__": preliminary_instruction() # file_path = r"F:\CloudMusic\download\FIRBetterLife.raw" file_path = r"F:\CloudMusic\download\EpicScoreFireHead.raw" output_id = get_output_device_id_by_name("喇叭/耳机 (Realtek High Definition Audio(SST))") rec_file_path = os.getcwd() + "\\test_record.wav" input_id = get_input_device_id_by_name("麦克风阵列 (Realtek High Definition Audio(SST))") # play_audio_file(rec_file_path, output_id, 1, 44100) # 只播放 # do_record(channel_id, file_path) # 只录音 # play_and_record(input_id, output_id, file_path, rec_file_path, play_audio_channels=2) # 边录边播 # file_path = sys.argv[1] # output_id = int(sys.argv[2]) # audio_channels = int(sys.argv[3]) # fs = int(sys.argv[4]) # play_audio_file(file_path, output_id, audio_channels, fs) # 接收shell参数, 多进程多声道播放音乐
下面这个模块是多进程, 多线程调用, 调用的是上面那个模块
模块名: multi_play_audio.py
import subprocess from python_audio_packages.sounddevice_instructions import get_output_device_id_by_name import threading """ 注: 只有用多进程, 才能实现多声道同时播放不同音频, 一个进程 调用一个 sounddevice , 占用一个声卡声道; 调用这几个 子进程的时候, 可以用 多线程: 原因--->可以用在程序代码内部; 而 multiprocessing.Process 只能用在 if __name__ == "__main__": 之后(Windows系统下是这样) """ channel1 = "喇叭/耳机 (Realtek High Definit" channel2 = "喇叭/耳机 (Realtek High Definition Audio(SST))" channel_id_1 = str(get_output_device_id_by_name(channel1)) channel_id_2 = str(get_output_device_id_by_name(channel2)) audio_file1 = r"F:\CloudMusic\download\FIRBetterLife.raw" audio_file2 = r"F:\CloudMusic\download\EpicScoreFireHead.raw" cmd1 = "python sounddevice_instructions.py " + audio_file1 + " " + channel_id_1 + " 2 44100" cmd2 = "python sounddevice_instructions.py " + audio_file2 + " " + channel_id_2 + " 2 44100" # 使用多进程, 同时使用两个声道, 播放不同音频 def multi_channels_play1(): subprocess.run(cmd1, shell=True) def multi_channels_play2(): subprocess.run(cmd2, shell=True) if __name__ == "__main__": t1 = threading.Thread(target=multi_channels_play1) t2 = threading.Thread(target=multi_channels_play2) t1.start() t2.start()
Copyright © 2003-2013 www.wpsshop.cn 版权所有,并保留所有权利。