import pyttsx3 import threading import queue import time class VoicePrompt: def __init__(self, rate=150, volume=1.0, voice_id=None): """初始化语音提示组件 Args: rate: 语速 volume: 音量 (0.0 到 1.0) voice_id: 语音ID (None表示使用默认值) """ self.rate = rate self.volume = volume self.voice_id = voice_id # 创建语音消息队列 self.speech_queue = queue.Queue() # 启动单独的线程来处理语音播报 self.speech_thread = threading.Thread(target=self._speech_worker, daemon=True) self.speech_thread.start() # 防止多次初始化 self.initialized = False def _init_engine(self): """初始化语音引擎""" if not hasattr(self, 'engine') or self.engine is None: try: self.engine = pyttsx3.init() self.engine.setProperty('rate', self.rate) self.engine.setProperty('volume', self.volume) # 设置语音(如果提供了voice_id) if self.voice_id: self.engine.setProperty('voice', self.voice_id) else: # 尝试设置中文语音(如果有的话) voices = self.engine.getProperty('voices') for voice in voices: if 'chinese' in voice.id.lower() or 'zh' in voice.id.lower(): self.engine.setProperty('voice', voice.id) break self.initialized = True except Exception as e: print(f"初始化语音引擎失败: {e}") def _speech_worker(self): """后台线程,从队列中获取文本并播报""" # 初始化引擎(仅在这个区域创建和使用引擎) self._init_engine() while True: try: # 从队列中读取文本 text = self.speech_queue.get() if text == "__EXIT__": # 退出信号 break if not self.initialized: self._init_engine() # 仅当初始化成功时播放语音 if self.initialized: try: self.engine.say(text) self.engine.runAndWait() except Exception as e: print(f"语音播报错误: {e}") # 重新初始化引擎 try: self.engine = None self._init_engine() except Exception as e2: print(f"重新初始化引擎失败: {e2}") time.sleep(1) # 避免循环过快 # 通知队列任务完成 self.speech_queue.task_done() except Exception as e: print(f"语音工作线程错误: {e}") time.sleep(0.5) # 防止错误时CPU资源过度消耗 def speak(self, text, block=False): """将要播报的文本添加到队列 Args: text: 要播报的文本 block: 是否等待播报完成 """ try: # 将文本加入队列 self.speech_queue.put(text) # 如果是阻塞模式,等待这个任务完成 if block: self.speech_queue.join() except Exception as e: print(f"添加语音文本到队列失败: {e}") def get_available_voices(self): """获取所有可用的语音 Returns: list: 可用语音列表 """ voices = self.engine.getProperty('voices') return [(voice.id, voice.name) for voice in voices] # 测试代码 if __name__ == "__main__": voice = VoicePrompt() print("可用语音:") for voice_id, name in voice.get_available_voices(): print(f" - {name} ({voice_id})") voice.speak("您好,欢迎使用人脸识别系统", block=True)