@numpad0
Created December 25, 2025 15:26
# Taken and modified from https://developers.cyberagent.co.jp/blog/archives/44592/
# I think also mostly vibecoded circa 2025/07
import threading
from collections import deque
import time
import signal
import sys
from datetime import datetime
from audio_player import AudioPlayer
from local_llm import llama2
from stt import faster_whisper_speech_recognition_stt
#from stt import reazonspeech_stt
#from stt import reazonspeech_nemo_v2_stt
from tts import voicevox
class Main():
    def __init__(self, enable_interim_callback=False, stt_variant="faster_whisper") -> None:
        self.enable_interim_callback = enable_interim_callback
        self.stt_variant = stt_variant
        print("Main: Initializing Main class...")
        if self.stt_variant == "faster_whisper":
            stt_module = faster_whisper_speech_recognition_stt
        #elif self.stt_variant == "reazonspeech":
        #    stt_module = reazonspeech_stt
        #elif self.stt_variant == "reazonspeech-nemo-v2":
        #    stt_module = reazonspeech_nemo_v2_stt
        else:
            stt_module = faster_whisper_speech_recognition_stt
            #raise ValueError(f"Unknown STT variant: {self.stt_variant}")
        self.exit_event = threading.Event()
        self.user_utterance_event = threading.Event()  # Event to signal new utterances
        # Register the handler so Ctrl+C / SIGINT sets exit_event and wait() can clean up
        signal.signal(signal.SIGINT, self._signal_handler)
        # Open the chat log before starting any thread that might write to it
        self.chat_log_file_path = datetime.now().strftime("chat_log_%Y%m%d_%H%M%S.txt")
        try:
            self.chat_log_file = open(self.chat_log_file_path, "a", encoding="utf-8")
            print(f"Main: Chat log file opened at {self.chat_log_file_path}")
        except IOError as e:
            print(f"Error opening chat log file: {e}")
            self.chat_log_file = None
        stt_thread = threading.Thread(target=stt_module.main, args=(self.callback_interim if self.enable_interim_callback else None, self.callback_final, self.exit_event, False,))
        print("Main: STT thread created.")
        self.llm = llama2.Llama2()
        self.user_utterance_queue = deque()
        print("Main: user_utterance_queue created.")
        self.processing_new_utterance = False  # Flag to indicate if main_process is active
        print("Main: processing_new_utterance set.")
        self.audio_player = AudioPlayer()
        print("Main: AudioPlayer initialized.")
        stt_thread.start()
        print(f"STT thread started: {stt_thread.is_alive()}")
        main_process_thread = threading.Thread(target=self._main_process_loop)
        main_process_thread.start()
        print(f"Main process thread started: {main_process_thread.is_alive()}")
        print("Main class initialization complete. Threads started.")

    def _signal_handler(self, signum, frame):
        print(f"Signal {signum} received, setting exit event...")
        self.exit_event.set()
        # No sys.exit(0) here; let wait() handle the exit after threads clean up

    def wait(self):
        print("Waiting for threads to complete...")
        # Wait indefinitely until the exit_event is set by the signal handler
        self.exit_event.wait()
        # Once exit_event is set, attempt to join all non-main threads
        thread_list = threading.enumerate()
        # Filter out the main thread and the current thread (if wait is called from a separate thread)
        threads_to_join = [t for t in thread_list if t is not threading.main_thread() and t is not threading.current_thread()]
        for thread in threads_to_join:
            if thread.is_alive():
                print(f"Joining thread: {thread.name}")
                thread.join(timeout=5.0)  # Give threads a reasonable timeout to finish
                if thread.is_alive():
                    print(f"Warning: Thread {thread.name} did not terminate gracefully.")
        print("All threads joined. Closing audio player.")
        if self.chat_log_file:
            self.chat_log_file.close()
            print("Chat log file closed.")
        self.audio_player.close()
        sys.exit(0)  # Now exit the process after threads have had a chance to clean up

    def callback_interim(self, user_utterance):
        print(f"Interim: {user_utterance}")

    def callback_final(self, user_utterance):
        print("final callback called with:", user_utterance)
        if self.processing_new_utterance:  # If a main_process is already running
            self.audio_player.request_interruption()
            self.user_utterance_queue.clear()  # Clear the queue on interruption
        self.user_utterance_queue.append(user_utterance)
        self.user_utterance_event.set()  # Signal the main process loop

    def _main_process_loop(self):
        while not self.exit_event.is_set():
            # Wait for a new utterance or for the exit event to be set
            self.user_utterance_event.wait(timeout=0.1)  # Small timeout to periodically check exit_event
            self.user_utterance_event.clear()  # Clear the event for the next utterance
            if self.exit_event.is_set():
                break  # Exit loop if exit event is set
            if not self.user_utterance_queue:
                # print("Main process loop: Queue is empty, continuing...")  # Too verbose
                continue  # Skip if no utterance is waiting
            user_utterance = self.user_utterance_queue.popleft()
            self._process_utterance(user_utterance)

    def _write_to_chat_log(self, entry):
        if self.chat_log_file:
            timestamp = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
            self.chat_log_file.write(f"{timestamp} {entry}\n")
            self.chat_log_file.flush()  # Ensure it's written to disk immediately

    def _process_utterance(self, user_utterance):
        self.processing_new_utterance = True
        self._write_to_chat_log(f"User: {user_utterance}")
        full_llm_response = ""  # Accumulate the full LLM response
        delivered_llm_response = ""  # Accumulate only the part that has been played
        last_played_segment_end_index = 0
        try:
            llm_result = self.llm.get(user_utterance, interruption_check=lambda: self.audio_player.interruption_requested)
            for chunk in llm_result:
                if self.audio_player.interruption_requested:
                    # Interruption occurred during LLM generation or TTS synthesis of a segment.
                    # 'full_llm_response' may be incomplete; 'delivered_llm_response' holds what was played.
                    return  # Exit early; the finally block records the delivered part.
                if not hasattr(chunk.choices[0].delta, "content") or chunk.choices[0].delta.content is None:
                    continue
                word = chunk.choices[0].delta.content
                word = word.replace("\r", " ").replace("\n", " ")  # Replace CR/LF with whitespace
                print(f"LLM Response: {word}")
                full_llm_response += word
                for split_word in ["、", "。", "?", "!"]:
                    if split_word in full_llm_response[last_played_segment_end_index:]:
                        segment_to_play = full_llm_response[last_played_segment_end_index : full_llm_response.rfind(split_word) + len(split_word)]
                        start_time = time.time()
                        wav_data, _ = voicevox.get_audio_file_from_text(segment_to_play)
                        end_time = time.time()
                        print(f"Voicevox TTS query-response time: {end_time - start_time:.4f} seconds")
                        if self.audio_player.interruption_requested:
                            # Interruption occurred during TTS synthesis. This segment was not fully played.
                            return  # Exit early; the finally block handles cleanup.
                        self.audio_player.play_audio(wav_data)
                        delivered_llm_response += segment_to_play  # Add to delivered only if successfully sent to the player
                        last_played_segment_end_index = full_llm_response.rfind(split_word) + len(split_word)
            # After the loop, handle any remaining segment
            if full_llm_response[last_played_segment_end_index:] != "":
                remaining_segment = full_llm_response[last_played_segment_end_index:]
                start_time = time.time()
                wav_data, _ = voicevox.get_audio_file_from_text(remaining_segment)
                end_time = time.time()
                print(f"Voicevox TTS query-response time: {end_time - start_time:.4f} seconds")
                if self.audio_player.interruption_requested:
                    # Interruption occurred during TTS synthesis of the remaining segment.
                    return  # Exit early; the finally block handles cleanup.
                self.audio_player.play_audio(wav_data)
                delivered_llm_response += remaining_segment  # Add to delivered if successfully sent to the player
        finally:
            # This block always executes, whether an interruption occurred or not.
            # Record only the part of the response that was actually delivered.
            self.llm.set_agent_utterance(delivered_llm_response)
            self._write_to_chat_log(f"Agent: {delivered_llm_response}")
            self.processing_new_utterance = False
            self.audio_player.interruption_requested = False  # Reset the flag here

if __name__ == '__main__':
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument('--stt_variant', type=str, default='reazonspeech-nemo-v2', choices=['faster_whisper', 'reazonspeech', 'reazonspeech-nemo-v2'])
    args = parser.parse_args()
    ins = Main(enable_interim_callback=True, stt_variant=args.stt_variant)
    ins.wait()
numpad0 commented Dec 25, 2025

Gemini explanation: https://gemini.google.com/share/2b2dc6710900

(full text pasted below for archival purposes)


User: explain this code: (code)

Gemini: This Python code defines the "orchestrator" for a real-time, voice-interactive AI agent. It integrates Speech-to-Text (STT), a Large Language Model (LLM), and Text-to-Speech (TTS) into a seamless pipeline that supports interruptible conversations.

Here is a breakdown of how the system works:

1. High-Level Architecture
The system operates using multiple threads so the main loop doesn't freeze while waiting for heavy AI processing.
    STT Thread: Constantly listens to the microphone and converts speech to text.
    Main Process Loop: A background thread that waits for new text from the STT thread and coordinates the LLM and TTS.
    Audio Player: A dedicated module to handle playing back the generated speech.

2. Key Components & Logic
Multithreading & Synchronization
The code uses threading.Event and collections.deque (whose single-item appends and pops are thread-safe) to communicate between threads; a minimal sketch of the handoff follows this list:
    self.exit_event: Used to shut down all threads gracefully when the user kills the program.
    self.user_utterance_event: A signal that tells the main loop "Hey, the user just said something, start processing!"
    self.user_utterance_queue: Stores the text of what the user said until the LLM is ready to read it.
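
To make that handoff concrete, here is a minimal, self-contained sketch of the same Event-plus-deque pattern; producer() and consumer() are invented stand-ins for the STT thread and _main_process_loop, not code from the gist:

```python
# Minimal sketch of the Event + deque handoff used in main.py.
import threading
import time
from collections import deque

utterances = deque()
new_utterance_event = threading.Event()
exit_event = threading.Event()

def producer():
    # Stands in for the STT thread calling callback_final.
    for text in ["hello", "how are you", "goodbye"]:
        time.sleep(0.5)                  # pretend we are transcribing speech
        utterances.append(text)
        new_utterance_event.set()        # wake the consumer
    exit_event.set()

def consumer():
    # Stands in for _main_process_loop.
    while not exit_event.is_set() or utterances:
        new_utterance_event.wait(timeout=0.1)   # short timeout so exit_event is re-checked
        new_utterance_event.clear()
        while utterances:
            print("processing:", utterances.popleft())

threading.Thread(target=producer).start()
consumer()
```

The short wait() timeout mirrors the 0.1-second timeout in _main_process_loop, which is what lets the loop notice exit_event promptly.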
The Interruptible Workflow (callback_final)
One of the most advanced features here is the ability to interrupt the AI. If the AI is currently talking (self.processing_new_utterance is True) and the user starts speaking again, the code calls self.audio_player.request_interruption(). This stops the current voice output immediately so the AI can listen to the new input.
Streaming & Chunking (_process_utterance)
To reduce "latency" (the time you wait for a response), the code doesn't wait for the LLM to finish the entire paragraph. Instead:
    It streams words from Llama 2.
    It looks for the Japanese delimiters in the split list: 、, 。, ?, and !.
    As soon as a full sentence is formed, it sends just that sentence to Voicevox (TTS) to be turned into audio.
    It plays that sentence while the LLM is still generating the next one.
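
Putting those steps together, the following standalone sketch shows the same buffer-and-split logic; fake_llm_stream() and speak() are invented stand-ins for the Llama 2 token stream and the Voicevox/AudioPlayer pair:

```python
# Standalone sketch of the buffer-and-split streaming logic.
DELIMITERS = ["、", "。", "?", "!"]

def fake_llm_stream():
    yield from ["こんにちは", "。", "今日は", "いい天気", "です", "ね", "。"]

def speak(segment):
    print("synthesize + play:", segment)

buffer = ""
played_up_to = 0
for token in fake_llm_stream():
    buffer += token
    for d in DELIMITERS:
        if d in buffer[played_up_to:]:                 # a new complete clause is available
            end = buffer.rfind(d) + len(d)
            speak(buffer[played_up_to:end])            # flush it without waiting for the rest
            played_up_to = end
if buffer[played_up_to:]:
    speak(buffer[played_up_to:])                       # flush any trailing fragment
```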
Logging and Cleanup
    Chat Logs: Every interaction is timestamped and saved to a .txt file.
    Graceful Exit: The wait() method ensures that when you stop the program (Ctrl+C), it finishes writing logs and closes audio streams properly before exiting.

3. Summary of Data Flow
| Step | Component | Action |
| ---- | --------- | ------ |
| 1 | STT (Whisper) | Converts user voice to text. |
| 2 | callback_final | Adds the text to the queue and triggers the processing event. |
| 3 | LLM (Llama2) | Generates a text response in chunks. |
| 4 | TTS (Voicevox) | Converts text segments into WAV data. |
| 5 | AudioPlayer | Plays the sound and monitors for interruptions. |

4. Technical Detail: Why use .rfind()?
In _process_utterance, the code uses full_llm_response.rfind(split_word) to locate the last occurrence of the current delimiter in everything generated so far, then sends the slice from the end of the last played segment up to and including that delimiter to TTS. Because tokens arrive a few characters at a time, each sentence is usually flushed as soon as its delimiter appears; if several delimiters have accumulated since the last playback, they are flushed together as one segment. See the worked example below.
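
A quick worked example of that slicing, with an invented string:

```python
full_llm_response = "こんにちは。元気ですか?"
last_played_segment_end_index = 0           # nothing played yet
split_word = "。"
end = full_llm_response.rfind(split_word) + len(split_word)
segment_to_play = full_llm_response[last_played_segment_end_index:end]
print(segment_to_play)                      # -> こんにちは。
print(full_llm_response[end:])              # -> 元気ですか?  (flushed later, when "?" is checked)
```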
Would you like me to explain how to modify the interruption sensitivity or how to switch to a different TTS engine?

User: why this system is more responsive than regular llm piped through tts

Gemini: This system is significantly more responsive because it implements Parallel Streaming and Sentence-level Chunking.

In a "naive" pipeline, the system waits for the entire LLM response to finish, then sends that massive block of text to the TTS. This creates a "bottleneck" where the user sits in silence for several seconds.
Here is why this specific code is faster:

1. Eliminating the "Linear Bottleneck"
In a standard serial pipeline, your wait time before hearing anything is Time_LLM + Time_TTS for the whole response. In this code, the wait is reduced to only Time_First_Sentence, i.e. generating and synthesizing the first sentence alone; a back-of-the-envelope comparison follows the list below.
    Standard Pipeline: LLM generates 100 words → TTS processes 100 words → Audio plays. (High Latency)
    This Code: LLM generates 10 words (first sentence) → TTS immediately processes those 10 words → Audio plays while the LLM is still generating the remaining 90 words. (Low Latency)
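
A back-of-the-envelope comparison; all the timings below are invented, purely to show the arithmetic:

```python
# Invented example numbers, just to show the arithmetic.
llm_time_per_sentence = 1.0   # seconds to generate one sentence
tts_time_per_sentence = 0.3   # seconds to synthesize one sentence
sentences = 10

serial_wait = sentences * llm_time_per_sentence + sentences * tts_time_per_sentence
streamed_wait = llm_time_per_sentence + tts_time_per_sentence  # only the first sentence

print(f"serial pipeline, time to first audio:   {serial_wait:.1f} s")    # 13.0 s
print(f"streamed pipeline, time to first audio: {streamed_wait:.1f} s")  # 1.3 s
```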

2. Sentence-Level Chunking
The code uses a "buffer and split" logic. It watches the incoming stream for Japanese punctuation:
```python
for split_word in ["、", "。", "?", "!"]:
    if split_word in full_llm_response[last_played_segment_end_index:]:
        # Grab just the sentence and send it to TTS immediately
```
By splitting at 。 (period) or ? (question mark), the system sends small, manageable bites to the Voicevox engine. Since TTS engines process short strings much faster than long paragraphs, the "Time to First Audio" is drastically reduced.

3. Asynchronous Audio Buffering
Because the AudioPlayer and LLM are running on different threads, the system can "look ahead."
While you are listening to Sentence A, the system is already:
    Receiving Sentence B from the LLM.
    Synthesizing Sentence B into a WAV file via Voicevox.
    Queuing Sentence B in the AudioPlayer.
This creates a seamless flow of speech without the "robotic pauses" usually found between sentences in simpler scripts.
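
audio_player.py is not included in the gist, so how it achieves this is not known; the sketch below is one plausible shape for a queue-fed, interruptible player. The internals are assumptions; only the play_audio / request_interruption / interruption_requested / close surface is taken from how main.py uses it:

```python
# Hypothetical sketch of the unseen AudioPlayer: play_audio() queues a segment and
# returns immediately, while a worker thread plays segments one after another.
import queue
import threading

class SketchAudioPlayer:
    def __init__(self):
        self.interruption_requested = False
        self._segments = queue.Queue()
        threading.Thread(target=self._worker, daemon=True).start()

    def play_audio(self, wav_data):
        self._segments.put(wav_data)         # non-blocking: caller can keep synthesizing

    def request_interruption(self):
        self.interruption_requested = True
        try:
            while True:                      # drop segments that were queued but not yet played
                self._segments.get_nowait()
        except queue.Empty:
            pass

    def close(self):
        pass                                 # a real player would release the audio device here

    def _worker(self):
        while True:
            wav_data = self._segments.get()
            if self.interruption_requested:
                continue                     # discard anything queued before the interruption
            self._play_blocking(wav_data)    # a real player would hand wav_data to an audio backend

    def _play_blocking(self, wav_data):
        pass                                 # placeholder: actual playback goes here
```

The key design point is that play_audio() only enqueues, so the caller can go straight back to synthesizing the next sentence.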

4. Immediate Interruption Handling
Most LLM pipelines are "deaf" while they are "speaking." If you try to correct a regular AI mid-sentence, it will finish its entire pre-planned script before listening to you.
In this code, the STT thread keeps listening during playback; when a new final transcript arrives while the agent is mid-response, callback_final does the following:
    It immediately calls audio_player.request_interruption().
    It clears the user_utterance_queue.
    The generation-and-playback loop in _process_utterance checks interruption_requested and abandons the current response immediately.
This makes the system feel "alive" because it reacts as soon as your new utterance is transcribed, rather than making you wait for it to finish a long-winded response.
Would you like to know how to tune the "split words" to make the response feel even more natural for English or other languages?

User: does this system generate same text repeatedly

Gemini: Based on the code provided, the system does not inherently generate the same text repeatedly. However, its behavior depends entirely on how the llama2.Llama2() class and the llm.get() method are implemented in your external files.

Here is a breakdown of why it usually stays fresh, and the specific reasons why it might occasionally get stuck:

1. The Context Memory (Why it shouldn't repeat)
In the finally block of the _process_utterance method, there is a critical line:
```python
self.llm.set_agent_utterance(delivered_llm_response)
```
This suggests that the llama2 class maintains a conversation history.
    Every time the AI finishes speaking, it "remembers" what it just said.
    In the next turn, it sends that history back to the LLM model.
    Because LLMs are designed to predict the next logical part of a conversation, having the previous context prevents it from looping the same greeting or answer over and over.
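
local_llm/llama2.py is not part of the gist, so its internals are unknown; the sketch below is one plausible implementation that matches how main.py calls it, assuming it wraps an OpenAI-compatible streaming endpoint (the class name, base_url, and model name are invented):

```python
# Hypothetical sketch of the unseen llama2.Llama2 class, assuming an
# OpenAI-compatible local server (e.g. a llama.cpp endpoint).
from openai import OpenAI

class SketchLlama2:
    def __init__(self, base_url="http://localhost:8080/v1", model="local-llama"):
        self.client = OpenAI(base_url=base_url, api_key="not-needed")
        self.model = model
        self.history = []          # running conversation memory

    def get(self, user_utterance, interruption_check=lambda: False):
        self.history.append({"role": "user", "content": user_utterance})
        stream = self.client.chat.completions.create(
            model=self.model, messages=self.history, stream=True)
        for chunk in stream:
            if interruption_check():
                break              # caller was interrupted; stop pulling tokens
            yield chunk            # chunks expose .choices[0].delta.content, as main.py expects

    def set_agent_utterance(self, delivered_text):
        # Remember only what was actually spoken, so the next turn's context is honest.
        self.history.append({"role": "assistant", "content": delivered_text})
```

Because set_agent_utterance() appends only the delivered text, the history the model sees next turn matches what the user actually heard.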

2. LLM "Stochasticity" (Randomness)
Most implementations of llama2 (like the one used in your self.llm.get call) use a parameter called Temperature.
    If Temperature is set above 0, the model chooses words with a degree of randomness.
    Even if you ask the exact same question twice, a temperature of 0.7 or 0.8 will likely result in two different phrasings.
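
If llama2.Llama2 does wrap an OpenAI-compatible client as sketched above (an assumption), the sampling knobs would be passed to the streaming call roughly like this; the endpoint, model name, and values are illustrative:

```python
# Illustrative sampling settings for an OpenAI-compatible streaming call.
from openai import OpenAI

client = OpenAI(base_url="http://localhost:8080/v1", api_key="not-needed")
stream = client.chat.completions.create(
    model="local-llama",
    messages=[{"role": "user", "content": "こんにちは"}],
    stream=True,
    temperature=0.8,        # > 0 adds randomness; 0 makes output deterministic for a given input
    frequency_penalty=0.5,  # discourages repeating the same tokens
    presence_penalty=0.3,   # discourages circling back to the same topics
)
for chunk in stream:
    if chunk.choices and chunk.choices[0].delta.content:
        print(chunk.choices[0].delta.content, end="")
```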

3. When might it repeat? (The Risks)
There are three specific scenarios where this code could result in repetitive text:
    Low Temperature (0): If the LLM is configured with temperature=0, it becomes deterministic. If you give it the same input, it will provide the exact same "best" output every time.
    The "Interruption Loop": If you interrupt the AI constantly, the code clears the queue but may not be saving the "interrupted" fragment correctly to memory. If the delivered_llm_response is empty because you interrupted it before a single sentence finished, the LLM might "forget" it tried to speak and start the same sentence again.
    Small Model Constraints: If you are using a small Llama 2 variant (such as the 7B parameter model, the smallest size), it is more prone to repetition, the failure mode that a "repetition penalty" setting exists to counteract, where it gets stuck in a loop of the same three or four words (e.g., "I think that... I think that...").

4. How the "Interruption" affects repetition
This code has a very specific way of handling "half-finished" thoughts:
```python
# Set the LLM's agent utterance to only the part that was actually delivered.
self.llm.set_agent_utterance(delivered_llm_response)
```

This is a sophisticated touch. If the AI was going to say a 100-word paragraph but you interrupted it after 5 words, it only saves those 5 words to its memory. This prevents "hallucinated history" where the AI thinks it said something that you never actually heard.
Would you like to see how to add a "repetition penalty" to the llm.get call to ensure the text stays varied?

User: what happens to chat log after interruption

Gemini: When an interruption occurs, the chat log is designed to reflect exactly what the user actually heard, rather than what the LLM originally planned to say.

This is handled by the interaction between the try...finally block and the delivered_llm_response variable. Here is the step-by-step breakdown:

1. Tracking the "Delivered" Text
As the LLM streams text, the code maintains two separate variables:
    full_llm_response: Everything the LLM has generated so far.
    delivered_llm_response: Only the text segments that were successfully sent to the AudioPlayer.
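
As a concrete (invented) example of the two buffers at the moment of an interruption:

```python
# Invented values illustrating the two buffers right after an interruption.
full_llm_response = "はい、できます。今から始めますか?"       # everything the LLM generated
delivered_llm_response = "はい、できます。"                   # only this part reached the AudioPlayer
# The finally block logs and remembers delivered_llm_response;
# "今から始めますか?" was never heard, so it is dropped.
```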

2. The Interruption Trigger
If you start speaking while the AI is talking, the audio_player.interruption_requested flag becomes True. The code immediately hits a return statement:
```python
if self.audio_player.interruption_requested:
    return  # Exit early, and the finally block will handle setting the delivered part.
```

3. The finally Block (The Log Writer)
Because the processing is wrapped in a finally block, that code runs even if the function returns early due to an interruption.
The logic inside that block performs the following "cleanup" for your chat log:
    Truncation: It ignores the rest of the full_llm_response that was never turned into sound.
    Writing to Disk: It calls self._write_to_chat_log(f"Agent: {delivered_llm_response}").
    Memory Sync: It tells the LLM class to only remember the delivered part so the next response makes sense contextually.

Summary Table: Log Behavior
| Scenario | What goes in the chat log? |
| -------- | -------------------------- |
| No interruption | The full text of the AI's response. |
| Interruption (mid-sentence) | Only the complete sentences that were finished and played before you spoke. |
| Interruption (immediate) | If you interrupted before the first sentence finished, the Agent entry in the log will be blank or very short. |

Why this matters for the AI's "Brain"
If the log (and the LLM memory) saved the entire intended response instead of just the delivered part, the AI would get confused.
For example, if the AI tried to say "I can help with that. Would you like to start now?" but you interrupted after "I can help with that," the log ensures the AI doesn't wrongly assume it already asked the follow-up question.
Would you like me to show you how to modify the code so that even "partial" sentences (the words caught mid-stream) get saved to the log during an interruption?
