Created December 25, 2025 15:26
# Taken and modified from https://developers.cyberagent.co.jp/blog/archives/44592/
# I think also mostly vibecoded circa 2025/07
import threading
from collections import deque
import time
import signal
import sys
from datetime import datetime

from audio_player import AudioPlayer
from local_llm import llama2
from stt import faster_whisper_speech_recognition_stt
#from stt import reazonspeech_stt
#from stt import reazonspeech_nemo_v2_stt
from tts import voicevox


class Main:
    def __init__(self, enable_interim_callback=False, stt_variant="faster_whisper") -> None:
        self.enable_interim_callback = enable_interim_callback
        self.stt_variant = stt_variant
        print("Main: Initializing Main class...")
        if self.stt_variant == "faster_whisper":
            stt_module = faster_whisper_speech_recognition_stt
        #elif self.stt_variant == "reazonspeech":
        #    stt_module = reazonspeech_stt
        #elif self.stt_variant == "reazonspeech-nemo-v2":
        #    stt_module = reazonspeech_nemo_v2_stt
        else:
            # ReazonSpeech variants are currently disabled; fall back to faster-whisper
            stt_module = faster_whisper_speech_recognition_stt
            #raise ValueError(f"Unknown STT variant: {self.stt_variant}")
        self.exit_event = threading.Event()
        self.user_utterance_event = threading.Event()  # Signals that a new utterance is queued
        # Route Ctrl-C to the handler so exit_event gets set and wait() can join threads cleanly
        signal.signal(signal.SIGINT, self._signal_handler)

        # Open the chat log before any worker thread starts, so _write_to_chat_log
        # can never run before self.chat_log_file exists
        self.chat_log_file_path = datetime.now().strftime("chat_log_%Y%m%d_%H%M%S.txt")
        try:
            self.chat_log_file = open(self.chat_log_file_path, "a", encoding="utf-8")
            print(f"Main: Chat log file opened at {self.chat_log_file_path}")
        except IOError as e:
            print(f"Error opening chat log file: {e}")
            self.chat_log_file = None

        stt_thread = threading.Thread(
            target=stt_module.main,
            args=(self.callback_interim if self.enable_interim_callback else None,
                  self.callback_final, self.exit_event, False,))
        print("Main: STT thread created.")
        self.llm = llama2.Llama2()
        self.user_utterance_queue = deque()
        print("Main: user_utterance_queue created.")
        self.processing_new_utterance = False  # True while _process_utterance is active
        self.audio_player = AudioPlayer()
        print("Main: AudioPlayer initialized.")
        stt_thread.start()
        print(f"STT thread started: {stt_thread.is_alive()}")
        main_process_thread = threading.Thread(target=self._main_process_loop)
        main_process_thread.start()
        print(f"Main process thread started: {main_process_thread.is_alive()}")
        print("Main class initialization complete. Threads started.")

    def _signal_handler(self, signum, frame):
        print(f"Signal {signum} received, setting exit event...")
        self.exit_event.set()
        # No sys.exit(0) here; wait() handles the exit after threads clean up

    def wait(self):
        print("Waiting for threads to complete...")
        # Block until the exit_event is set by the signal handler
        self.exit_event.wait()
        # Once exit_event is set, attempt to join all non-main threads
        thread_list = threading.enumerate()
        # Filter out the main thread and the current thread (if wait is called from a separate thread)
        threads_to_join = [t for t in thread_list
                           if t is not threading.main_thread() and t is not threading.current_thread()]
        for thread in threads_to_join:
            if thread.is_alive():
                print(f"Joining thread: {thread.name}")
                thread.join(timeout=5.0)  # Give threads a reasonable timeout to finish
                if thread.is_alive():
                    print(f"Warning: Thread {thread.name} did not terminate gracefully.")
        print("All threads joined. Closing audio player.")
        if self.chat_log_file:
            self.chat_log_file.close()
            print("Chat log file closed.")
        self.audio_player.close()
        sys.exit(0)  # Exit the process after threads have had a chance to clean up

    def callback_interim(self, user_utterance):
        print(f"Interim: {user_utterance}")

    def callback_final(self, user_utterance):
        print("final callback called with:", user_utterance)
        if self.processing_new_utterance:  # A response is already being generated/played
            self.audio_player.request_interruption()
            self.user_utterance_queue.clear()  # Drop stale utterances on interruption
        self.user_utterance_queue.append(user_utterance)
        self.user_utterance_event.set()  # Wake the main process loop

    def _main_process_loop(self):
        while not self.exit_event.is_set():
            # Wait for a new utterance; the timeout lets us periodically re-check exit_event
            self.user_utterance_event.wait(timeout=0.1)
            self.user_utterance_event.clear()  # Clear the event for the next utterance
            if self.exit_event.is_set():
                break
            if not self.user_utterance_queue:
                continue  # Timed out with nothing queued
            user_utterance = self.user_utterance_queue.popleft()
            self._process_utterance(user_utterance)

    def _write_to_chat_log(self, entry):
        if self.chat_log_file:
            timestamp = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
            self.chat_log_file.write(f"{timestamp} {entry}\n")
            self.chat_log_file.flush()  # Ensure it's written to disk immediately

    def _process_utterance(self, user_utterance):
        self.processing_new_utterance = True
        self._write_to_chat_log(f"User: {user_utterance}")
        full_llm_response = ""       # Everything the LLM has generated so far
        delivered_llm_response = ""  # Only the part actually sent to the audio player
        last_played_segment_end_index = 0
        try:
            llm_result = self.llm.get(user_utterance,
                                      interruption_check=lambda: self.audio_player.interruption_requested)
            for chunk in llm_result:
                if self.audio_player.interruption_requested:
                    # Interrupted mid-generation; delivered_llm_response holds what was played.
                    return  # The finally block records the delivered part.
                if not hasattr(chunk.choices[0].delta, "content") or chunk.choices[0].delta.content is None:
                    continue  # Skip chunks with no text (e.g. role or finish deltas)
                word = chunk.choices[0].delta.content
                word = word.replace("\r", " ").replace("\n", " ")  # Replace CR/LF with whitespace
                print(f"LLM Response: {word}")
                full_llm_response += word
                # Flush every complete sentence (ending in Japanese punctuation) to TTS
                for split_word in ["、", "。", "?", "!"]:
                    if split_word in full_llm_response[last_played_segment_end_index:]:
                        segment_end = full_llm_response.rfind(split_word) + len(split_word)
                        segment_to_play = full_llm_response[last_played_segment_end_index:segment_end]
                        start_time = time.time()
                        wav_data, _ = voicevox.get_audio_file_from_text(segment_to_play)
                        print(f"Voicevox TTS query-response time: {time.time() - start_time:.4f} seconds")
                        if self.audio_player.interruption_requested:
                            # Interrupted during TTS synthesis; this segment was never played.
                            return
                        self.audio_player.play_audio(wav_data)
                        delivered_llm_response += segment_to_play  # Count it only once it reaches the player
                        last_played_segment_end_index = segment_end
            # After the stream ends, synthesize any trailing text without closing punctuation
            remaining_segment = full_llm_response[last_played_segment_end_index:]
            if remaining_segment != "":
                start_time = time.time()
                wav_data, _ = voicevox.get_audio_file_from_text(remaining_segment)
                print(f"Voicevox TTS query-response time: {time.time() - start_time:.4f} seconds")
                if self.audio_player.interruption_requested:
                    return
                self.audio_player.play_audio(wav_data)
                delivered_llm_response += remaining_segment
        finally:
            # Always runs, interrupted or not: record only what the user actually heard.
            self.llm.set_agent_utterance(delivered_llm_response)
            self._write_to_chat_log(f"Agent: {delivered_llm_response}")
            self.processing_new_utterance = False
            self.audio_player.interruption_requested = False  # Reset the flag here


if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--stt_variant', type=str, default='reazonspeech-nemo-v2',
                        choices=['faster_whisper', 'reazonspeech', 'reazonspeech-nemo-v2'])
    args = parser.parse_args()
    ins = Main(enable_interim_callback=True, stt_variant=args.stt_variant)
    ins.wait()
Gemini explanation: https://gemini.google.com/share/2b2dc6710900
(full text pasted below for archival purposes)
User: explain this code: (code)
Gemini: This Python code defines the "orchestrator" for a real-time, voice-interactive AI agent. It integrates Speech-to-Text (STT), a Large Language Model (LLM), and Text-to-Speech (TTS) into a seamless pipeline that supports interruptible conversations.
Here is a breakdown of how the system works:

1. STT thread: stt_module.main runs in a background thread and invokes callback_final (and optionally callback_interim) whenever the user finishes an utterance.
2. Hand-off: callback_final appends the utterance to user_utterance_queue and sets user_utterance_event; if the agent is already mid-response, it first calls audio_player.request_interruption() and clears the queue.
3. Orchestrator loop: _main_process_loop waits on the event, pops the utterance, and runs _process_utterance.
4. Streaming LLM: self.llm.get() yields the response chunk by chunk, accumulating into full_llm_response.
5. Incremental TTS: whenever a sentence-ending character (、 。 ? !) appears, the pending segment is synthesized by VOICEVOX and handed to AudioPlayer, so speech starts before the LLM has finished.
6. Bookkeeping: a finally block records only delivered_llm_response, the text actually played, to both the LLM context (set_agent_utterance) and a timestamped chat log.
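
Stripped of the STT/LLM/TTS specifics, the concurrency skeleton is just a producer thread, a deque, and two threading.Events. A minimal self-contained sketch (illustrative only, not code from the gist):

import threading
import time
from collections import deque

queue = deque()
new_item_event = threading.Event()
exit_event = threading.Event()

def producer():
    # Stands in for the STT thread: pushes "utterances" and wakes the consumer
    for text in ["hello", "how are you", "goodbye"]:
        time.sleep(0.5)
        queue.append(text)
        new_item_event.set()
    exit_event.set()

def consumer():
    # Stands in for _main_process_loop: drains the queue until told to exit
    while not exit_event.is_set() or queue:
        new_item_event.wait(timeout=0.1)  # timeout lets us re-check exit_event
        new_item_event.clear()
        if not queue:
            continue
        print("processing:", queue.popleft())

t = threading.Thread(target=producer)
t.start()
consumer()
t.join()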
User: why this system is more responsive than regular llm piped through tts
Gemini: This system is significantly more responsive because it implements Parallel Streaming and Sentence-level Chunking.
In a "naive" pipeline, the system waits for the entire LLM response to finish, then sends that massive block of text to the TTS. This creates a "bottleneck" where the user sits in silence for several seconds.
Here is why this specific code is faster:

- Low time-to-first-audio: playback can begin as soon as the first sentence-ending character (、 。 ? !) shows up in the stream, so perceived latency is one sentence of generation plus one short VOICEVOX call, not the whole response.
- Pipelining: while AudioPlayer plays segment N, the loop keeps consuming LLM chunks and synthesizing segment N+1 (assuming play_audio queues audio rather than blocking until playback finishes).
- Cheap interruptions: because audio is delivered in small segments, a new utterance can cut in between segments instead of waiting out one monolithic clip.
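
The chunking logic can be tested in isolation. A minimal sketch that replaces the LLM stream with a plain iterator and TTS with print, mirroring the splitting in _process_utterance (same punctuation set, same rfind approach):

def stream_in_sentences(token_stream, delimiters=("、", "。", "?", "!")):
    # Yields a playable segment each time a sentence-ending character arrives
    buffer = ""
    played_to = 0
    for token in token_stream:
        buffer += token
        for d in delimiters:
            if d in buffer[played_to:]:
                end = buffer.rfind(d) + len(d)
                yield buffer[played_to:end]
                played_to = end
    if buffer[played_to:]:
        yield buffer[played_to:]  # trailing text without closing punctuation

# Tokens arrive a few characters at a time; "audio" can start at the first 。
for segment in stream_in_sentences(["こんに", "ちは。", "今日は", "いい天気", "ですね。"]):
    print("would send to TTS:", segment)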
User: does this system generate same text repeatedly
Gemini: Based on the code provided, the system does not inherently generate the same text repeatedly. However, its behavior depends entirely on how the llama2.Llama2() class and the llm.get() method are implemented in your external files.
Here is a breakdown of why it usually stays fresh, and the specific reasons why it might occasionally get stuck:

- Stays fresh: set_agent_utterance(delivered_llm_response) feeds back exactly what was spoken, so if Llama2 keeps a running conversation history, each call sees a different context and sampling should not loop.
- Might repeat if: the external Llama2 class keeps no history (every call sees the same bare prompt), decoding is greedy or temperature is 0, or repeated interruptions keep truncating the assistant's turns to near-identical short prefixes.
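
local_llm/llama2.py is not included in the gist, so the following is only a guess at its shape: a hypothetical wrapper around an OpenAI-compatible streaming endpoint (e.g. a llama.cpp server), where set_agent_utterance is what keeps the history honest after an interruption. Class name and base_url are assumptions.

from openai import OpenAI

class Llama2:
    # Hypothetical reconstruction; the real local_llm/llama2.py is not shown
    def __init__(self, base_url="http://localhost:8080/v1"):
        self.client = OpenAI(base_url=base_url, api_key="sk-no-key-required")
        self.messages = [{"role": "system", "content": "You are a helpful voice assistant."}]

    def get(self, user_utterance, interruption_check=None):
        # interruption_check is accepted for API compatibility; the real class
        # presumably uses it to abort generation early
        self.messages.append({"role": "user", "content": user_utterance})
        # Stream chunks so the caller can start TTS before generation finishes
        return self.client.chat.completions.create(
            model="local", messages=self.messages, stream=True)

    def set_agent_utterance(self, delivered_text):
        # Record only what was actually spoken; an interrupted, half-delivered
        # reply enters the history truncated, matching what the user heard
        self.messages.append({"role": "assistant", "content": delivered_text})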
User: what happens to chat log after interruption
Gemini: When an interruption occurs, the chat log is designed to reflect exactly what the user actually heard, rather than what the LLM originally planned to say.
This is handled by the interaction between the try...finally block and the delivered_llm_response variable. Here is the step-by-step breakdown:

1. The user's line is logged immediately via _write_to_chat_log(f"User: ...").
2. delivered_llm_response only grows after a segment has actually been handed to audio_player.play_audio; text that was generated or synthesized but never played is excluded.
3. On interruption, _process_utterance returns early, so nothing further is appended.
4. The finally block always runs and writes "Agent: {delivered_llm_response}" to the log, and set_agent_utterance stores the same truncated text in the LLM's context, keeping both consistent with what the user actually heard.
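
As a concrete (hypothetical) trace: suppose the user cut in after the first sentence of a reply had played, while the second segment was still being synthesized. The sentences and timestamp below are invented for illustration:

full_llm_response      = "今日は晴れです。気温は25度で、風は"   # generation stopped here
delivered_llm_response = "今日は晴れです。"                    # only the segment that was played
chat log entry         : [timestamp] Agent: 今日は晴れです。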