@telnet2
Last active February 20, 2026 07:57
Qwen3-TTS on Apple Silicon - session files (mcp_server, patches, howto)
```json
{
  "mcpServers": {
    "qwen3-tts-mcp": {
      "type": "stdio",
      "command": "/Users/joohwi.lee/.local/bin/qwen3-tts-mcp",
      "args": [
        "--models-dir",
        "/Users/joohwi.lee/.local/share/qwen3-tts-models"
      ],
      "env": {}
    }
  }
}
```

Qwen3-TTS How-To: Lessons Learned

Models

Three models are installed locally at ~/.local/share/qwen3-tts-models/:

| Key | Model | Notes |
|-----|-------|-------|
| 1 | Qwen3-TTS-12Hz-1.7B-CustomVoice-8bit | Default. Best quality. Supports instruct. |
| 4 | Qwen3-TTS-12Hz-0.6B-CustomVoice-8bit | Faster, less RAM. No instruction control. |
| 6 | Qwen3-TTS-12Hz-0.6B-Base-8bit | Voice cloning from reference audio. |

Use aliases: pro-custom (1), lite-custom (4), lite-clone (6).
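Alias resolution in mcp_server.py is a plain dict lookup from alias to numeric key to model folder. A condensed sketch, covering only the three installed models:

```python
# Condensed from the MODELS / MODELS_ALIAS tables in mcp_server.py.
MODELS_ALIAS = {"pro-custom": "1", "lite-custom": "4", "lite-clone": "6"}
MODELS = {
    "1": "Qwen3-TTS-12Hz-1.7B-CustomVoice-8bit",
    "4": "Qwen3-TTS-12Hz-0.6B-CustomVoice-8bit",
    "6": "Qwen3-TTS-12Hz-0.6B-Base-8bit",
}

def resolve_model(key: str) -> str:
    """Map a numeric key or alias to its model folder name."""
    mapped = MODELS_ALIAS.get(key.lower().strip(), key.lower().strip())
    if mapped not in MODELS:
        raise ValueError(f"Unknown model '{key}'")
    return MODELS[mapped]
```

Numeric keys pass through unchanged, so `resolve_model("1")` and `resolve_model("pro-custom")` name the same folder.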


Speakers

For best quality, use the speaker whose native language matches the text.

| Speaker | Native Language | Description |
|---------|-----------------|-------------|
| Vivian | Chinese | Bright, slightly edgy young female |
| Serena | Chinese | Warm, gentle young female |
| Ryan | English | Dynamic male with strong rhythmic drive |
| Aiden | English | Sunny American male with clear midrange |
| Ethan | English | |
| Chelsie | English | |

The full 1.7B model supports 9 speakers (including Ono_Anna for Japanese and Sohee for Korean) but the installed mlx-community quantized version ships with 6.
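The "match speaker to text language" rule is easy to automate. A small hypothetical helper (not part of mcp_server.py) encoding the table above:

```python
# Speaker -> native language, from the table above.
SPEAKERS = {
    "Vivian": "chinese", "Serena": "chinese",
    "Ryan": "english", "Aiden": "english",
    "Ethan": "english", "Chelsie": "english",
}

def best_speaker(lang_code: str, preferred: str = "Vivian") -> str:
    """Keep `preferred` if its native language matches the text,
    otherwise fall back to the first speaker native to `lang_code`."""
    if SPEAKERS.get(preferred) == lang_code:
        return preferred
    return next((s for s, lang in SPEAKERS.items() if lang == lang_code), preferred)
```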


Language Codes

Always pass --lang-code for best results. Auto-detection works but can misidentify.

auto, chinese, english, japanese, korean, german, french, russian, portuguese, spanish, italian
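A minimal validation sketch (a hypothetical helper, not in mcp_server.py) for the `--lang-code` values listed above:

```python
# Accepted --lang-code values, from the list above.
LANG_CODES = {
    "auto", "chinese", "english", "japanese", "korean",
    "german", "french", "russian", "portuguese", "spanish", "italian",
}

def normalize_lang(code: str) -> str:
    """Lower-case and validate a --lang-code value; empty means 'auto'."""
    code = (code or "auto").lower().strip()
    if code not in LANG_CODES:
        raise ValueError(f"Unsupported lang_code '{code}'")
    return code
```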

Instruction Control (instruct)

The 1.7B CustomVoice model supports natural language style control via instruct.

```sh
# Excited tone
python mcp_server.py --speak-text "Hello!" --voice Ryan --instruct "speak in an excited and energetic tone"

# Angry tone
python mcp_server.py --speak-text "I told you so." --voice Ryan --instruct "speak in an angry tone"

# Chinese emotional style
python mcp_server.py --speak-text "你好!" --voice Vivian --lang-code chinese --instruct "用特别愉快的语气说"
```

Default instruction is "normal tone" when omitted.


MCP Setup

Run via Python (recommended)

Point .mcp.json directly at the venv Python — avoids PyInstaller startup overhead:

```json
{
  "mcpServers": {
    "qwen3-tts-mcp": {
      "type": "stdio",
      "command": "/path/to/tts/.venv/bin/python",
      "args": [
        "/path/to/tts/mcp_server.py",
        "--models-dir", "/Users/<you>/.local/share/qwen3-tts-models"
      ]
    }
  }
}
```

Run via compiled binary

The binary is slower on cold start (~15s) due to PyInstaller bootstrap overhead. Once warm it is fine. Build with:

```sh
.venv/bin/pyinstaller qwen3-tts-mcp.spec --noconfirm
rm -f ~/.local/bin/qwen3-tts-mcp
cp dist/qwen3-tts-mcp ~/.local/bin/qwen3-tts-mcp
```

Important: The binary bundles all Python dependencies. Patches to .venv packages (e.g. mlx_audio) are NOT reflected in the binary until you rebuild.


Audio Buffering

The problem

mlx_audio's AudioPlayer defaults to min_buffer_seconds = 1.5. Combined with a streaming_interval of 2.0 seconds, audio chunks arrive too slowly and gaps appear between chunks during playback.
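To see why gaps appear, note that each chunk buys `streaming_interval` seconds of playback but costs `interval × rtf` seconds to generate, where rtf is the generation real-time factor. A toy model (the rtf of 1.3 is an assumed number for illustration, not a measurement) shows that once generation is slower than real time, every underrun is audible, and fewer, larger chunks produce fewer gaps:

```python
def playback_gaps(n_chunks: int, interval: float, rtf: float, min_buffer: float) -> int:
    """Count buffer underruns in a toy streaming-playback model.

    Chunk i (covering `interval` seconds of audio) finishes generating
    at wall time (i + 1) * interval * rtf. Playback starts once
    `min_buffer` seconds are buffered, then consumes audio in real time.
    """
    gaps = 0
    play_end = None   # wall-clock time at which the player runs dry
    buffered = 0.0    # seconds of audio buffered before playback starts
    for i in range(n_chunks):
        ready = (i + 1) * interval * rtf   # chunk i is ready at this time
        if play_end is None:
            buffered += interval
            if buffered >= min_buffer:
                play_end = ready + buffered   # playback starts, buffer drains
        elif ready > play_end:
            gaps += 1                         # player ran dry: audible gap
            play_end = ready + interval
        else:
            play_end += interval
    return gaps
```

For a ~10-second utterance at an assumed rtf of 1.3, the default settings (interval 2.0s, buffer 1.5s, five chunks) gap after every chunk, while larger chunks and a larger buffer (interval 5.0s, buffer 4.0s, two chunks) reduce the count to one.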

The fix (applied in mcp_server.py)

```python
from mlx_audio.tts.audio_player import AudioPlayer

AudioPlayer.min_buffer_seconds = 4.0  # wait for a larger buffer before starting playback
```

And in generate_audio():

```python
streaming_interval=5.0  # generate larger chunks before yielding
```

Monkey-patching AudioPlayer directly in mcp_server.py (rather than editing the library file) survives pip upgrades.
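The patch sticks because `min_buffer_seconds` is a class attribute: assigning it once on the imported class changes the value seen by every instance, without editing the installed package. A minimal stand-in (`Player` is a dummy class, not the real `AudioPlayer`):

```python
class Player:
    # Stand-in for mlx_audio's AudioPlayer: the real class also reads
    # this threshold as a class attribute.
    min_buffer_seconds = 1.5

    def ready(self, buffered: float) -> bool:
        """Start playback once the buffer crosses the threshold."""
        return buffered >= self.min_buffer_seconds

# One class-level assignment at import time patches all instances.
Player.min_buffer_seconds = 4.0
```

Because the assignment happens in mcp_server.py at import time, upgrading the library with pip replaces the installed file but never undoes the patch.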

The drain bug

With min_buffer_seconds = 4.0, short phrases (< 4 seconds) never accumulate enough audio to trigger playback. wait_for_drain() blocks forever — the process hangs.

Fix: patch wait_for_drain to force-start playback if audio is buffered but not yet playing:

```python
_original_wait_for_drain = AudioPlayer.wait_for_drain

def _patched_wait_for_drain(self):
    if not self.playing and self.buffered_samples() > 0:
        self.start_stream()
    return _original_wait_for_drain(self)

AudioPlayer.wait_for_drain = _patched_wait_for_drain
```

CLI Reference

```sh
python mcp_server.py \
  --speak-text "Hello world" \
  --voice Ryan \
  --speak-model 1 \
  --lang-code english \
  --instruct "speak in an excited tone" \
  --speak-keep-file \
  --speak-output-dir ./outputs
```

| Flag | Default | Description |
|------|---------|-------------|
| --speak-text | | Text to synthesize |
| --voice | Vivian | Speaker name |
| --speak-model | 1 | Model key (1-6 or alias) |
| --lang-code | auto | Language hint |
| --instruct | None | Style instruction |
| --speak-speed | 1.0 | Speed (note: not yet implemented in mlx_audio) |
| --speak-keep-file | off | Save WAV to disk |
| --speak-output-dir | outputs/ | Output directory |
| --speak-no-play | off | Disable audio playback |

Known Issues

  • speed parameter has no effect: mlx_audio accepts it but notes "not directly supported yet".
  • Tokenizer warning: transformers 5.0.0rc3 warns about an incorrect regex pattern in the Qwen3 tokenizer. The warning is cosmetic; audio quality is not affected. fix_mistral_regex=True was tested and confirmed to make no audible difference.
  • Binary vs Python path: the warning exists in both; it is just hidden in MCP tool output (stderr is not forwarded).


mcp_server.py
#!/usr/bin/env python3
import argparse
import json
import tarfile
import os
import shutil
import re
import sys
import tempfile
import time
import uuid
import threading
from typing import Dict, Literal, Optional, Sequence, Union
from urllib.error import HTTPError, URLError
from urllib.request import urlopen
from mcp.server.fastmcp import FastMCP
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
MODELS_DIR = os.path.expanduser("~/.local/share/qwen3-tts-models")
OUTPUT_DIR = os.path.join(BASE_DIR, "outputs")
DEFAULT_MANIFEST = "qwen3-tts-models.manifest"
DEFAULT_TOS_BASE_URL = "https://tosv.boei18n.byted.org/obj/bytesec-homebrew-boei18n/qwen-tts-models"
FILENAME_PREFIX = "mcp_audio"
# Keep these in sync with main.py model options.
MODELS = {
    "1": {"name": "Custom Voice", "folder": "Qwen3-TTS-12Hz-1.7B-CustomVoice-8bit", "mode": "custom", "repo_ids": ["mlx-community/Qwen3-TTS-12Hz-1.7B-CustomVoice-8bit"]},
    "2": {"name": "Voice Design", "folder": "Qwen3-TTS-12Hz-1.7B-VoiceDesign-8bit", "mode": "design", "repo_ids": ["mlx-community/Qwen3-TTS-12Hz-1.7B-VoiceDesign-8bit"]},
    "3": {"name": "Voice Cloning", "folder": "Qwen3-TTS-12Hz-1.7B-Base-8bit", "mode": "clone", "repo_ids": ["mlx-community/Qwen3-TTS-12Hz-1.7B-Base-8bit"]},
    "4": {"name": "Custom Voice", "folder": "Qwen3-TTS-12Hz-0.6B-CustomVoice-8bit", "mode": "custom", "repo_ids": ["mlx-community/Qwen3-TTS-12Hz-0.6B-CustomVoice-8bit"]},
    "5": {"name": "Voice Design", "folder": "Qwen3-TTS-12Hz-0.6B-VoiceDesign-8bit", "mode": "design", "repo_ids": ["mlx-community/Qwen3-TTS-12Hz-0.6B-VoiceDesign-6bit", "mlx-community/Qwen3-TTS-12Hz-0.6B-VoiceDesign-5bit", "mlx-community/Qwen3-TTS-12Hz-0.6B-VoiceDesign-4bit"]},
    "6": {"name": "Voice Cloning", "folder": "Qwen3-TTS-12Hz-0.6B-Base-8bit", "mode": "clone", "repo_ids": ["mlx-community/Qwen3-TTS-12Hz-0.6B-Base-8bit"]},
}
MODELS_ALIAS = {
    "1": "1",
    "2": "2",
    "3": "3",
    "4": "4",
    "5": "5",
    "6": "6",
    "q1": "1",
    "q2": "2",
    "q3": "3",
    "q4": "4",
    "q5": "5",
    "q6": "6",
    "17b-custom": "1",
    "17b-voice-design": "2",
    "17b-clone": "3",
    "06b-custom": "4",
    "06b-voice-design": "5",
    "06b-clone": "6",
    "06b-design": "5",
    "0.6b-custom": "4",
    "0.6b-design": "5",
    "0.6b-clone": "6",
    "1.7b-custom": "1",
    "1.7b-voice-design": "2",
    "1.7b-clone": "3",
    "pro-custom": "1",
    "pro-design": "2",
    "pro-clone": "3",
    "lite-custom": "4",
    "lite-design": "5",
    "lite-clone": "6",
    "custom": "4",
    "design": "5",
    "clone": "6",
}
DEFAULT_CUSTOM_SPEAKERS = ["Ryan", "Aiden", "Ethan", "Chelsie", "Serena", "Vivian"]
SpeakerName = Literal[
    "Ryan",
    "Aiden",
    "Ethan",
    "Chelsie",
    "Serena",
    "Vivian",
]
mcp = FastMCP("qwen3-tts")
_model_cache: Dict[str, object] = {}
_cache_lock = threading.Lock()
_MLX_IMPORT_ERROR: Optional[str] = None
def _runtime_base_dir() -> str:
    return os.path.abspath(getattr(sys, "_MEIPASS", BASE_DIR))


def _default_manifest_path(manifest_path: Optional[str] = None) -> str:
    if manifest_path:
        return manifest_path
    candidates = [
        os.getenv("QWEN_TTS_MANIFEST"),
        os.path.join(_runtime_base_dir(), "scripts", DEFAULT_MANIFEST),
        os.path.join(BASE_DIR, "scripts", DEFAULT_MANIFEST),
        os.path.join(os.getcwd(), "scripts", DEFAULT_MANIFEST),
        os.path.join(_runtime_base_dir(), DEFAULT_MANIFEST),
    ]
    for candidate in candidates:
        if candidate and os.path.isfile(candidate):
            return candidate
    return os.path.join(_runtime_base_dir(), "scripts", DEFAULT_MANIFEST)
def _normalize_model_selection(
    selected: Optional[Union[str, Sequence[str]]] = None
) -> list[str]:
    if selected is None:
        return []
    if isinstance(selected, str):
        source = [selected]
    else:
        source = list(selected)
    normalized: list[str] = []
    for item in source:
        if isinstance(item, str):
            normalized.extend(part.strip() for part in item.split(",") if part.strip())
        else:
            normalized.append(str(item).strip())
    return [name for name in normalized if name]
def _read_manifest(path: str) -> list[str]:
    if not os.path.exists(path):
        raise FileNotFoundError(f"Manifest not found at '{path}'.")
    entries: list[str] = []
    with open(path, "r", encoding="utf-8") as handle:
        for line in handle:
            rel = line.strip()
            if rel:
                entries.append(rel)
    if not entries:
        raise RuntimeError(f"Manifest is empty: '{path}'.")
    return entries


def _uploaded_model_folders(manifest_path: Optional[str] = None) -> list[str]:
    manifest = _read_manifest(_default_manifest_path(manifest_path))
    seen: set[str] = set()
    ordered: list[str] = []
    for rel in manifest:
        folder = rel.split("/", 1)[0]
        if folder not in seen:
            ordered.append(folder)
            seen.add(folder)
    return ordered
def _manifest_model_folders(manifest: list[str], include_lock: bool = False) -> list[str]:
    ordered: list[str] = []
    seen: set[str] = set()
    for rel in manifest:
        if not include_lock and rel.endswith(".lock"):
            continue
        if rel.endswith(".tar.gz"):
            folder = rel[: -len(".tar.gz")]
        else:
            folder = rel.split("/", 1)[0]
        if not folder or folder in seen:
            continue
        ordered.append(folder)
        seen.add(folder)
    return ordered
def _download_with_retries(url: str, destination: str, retries: int) -> bool:
    retries = max(1, retries)
    last_error: Optional[Exception] = None
    for attempt in range(1, retries + 1):
        try:
            with urlopen(url, timeout=60) as response:
                status = getattr(response, "status", 200)
                if status != 200:
                    raise HTTPError(url, status, "Non-200 response", response.headers, None)
                os.makedirs(os.path.dirname(destination), exist_ok=True)
                with open(destination, "wb") as f:
                    shutil.copyfileobj(response, f)
            return True
        except (HTTPError, URLError, OSError) as exc:
            last_error = exc
            if attempt >= retries:
                break
            time.sleep(2 ** (attempt - 1))
    if last_error:
        raise RuntimeError(f"Download failed for '{url}': {last_error}")
    raise RuntimeError(f"Download failed for '{url}'.")
def _install_models(
    models_dir: str,
    selected_models: Optional[Sequence[str]] = None,
    base_url: str = DEFAULT_TOS_BASE_URL,
    manifest_path: Optional[str] = None,
    include_lock: bool = False,
    force: bool = False,
    dry_run: bool = False,
    retries: int = 3,
) -> dict:
    selected = _normalize_model_selection(selected_models)
    manifest = _read_manifest(_default_manifest_path(manifest_path))
    model_folders = _manifest_model_folders(manifest, include_lock=include_lock)
    to_process: list[str] = []
    if not selected:
        to_process = model_folders
    else:
        for model_name in model_folders:
            if model_name in selected:
                to_process.append(model_name)
    summary = {
        "status": "ok",
        "base_url": base_url.rstrip("/"),
        "destination": models_dir,
        "manifest": _default_manifest_path(manifest_path),
        "model_filter": selected,
        "include_lock": include_lock,
        "force": force,
        "dry_run": dry_run,
        "retries": max(1, retries),
        "requested": len(model_folders),
        "selected": len(to_process),
        "planned": [],
        "downloaded": [],
        "skipped": [],
        "failed": [],
    }
    for model_name in to_process:
        destination = os.path.join(models_dir, model_name)
        if os.path.exists(destination) and not force:
            summary["skipped"].append(model_name)
            continue
        if dry_run:
            summary["planned"].append(model_name)
            continue
        try:
            if force and os.path.isdir(destination):
                shutil.rmtree(destination)
            with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as handle:
                tarball_path = handle.name
            try:
                _download_with_retries(
                    f"{base_url.rstrip('/')}/{model_name}.tar.gz",
                    tarball_path,
                    retries,
                )
                with tarfile.open(tarball_path, "r:gz") as tar:
                    tar.extractall(models_dir)
                summary["downloaded"].append(model_name)
            finally:
                if os.path.exists(tarball_path):
                    os.remove(tarball_path)
        except Exception as exc:  # pragma: no cover - external network path
            summary["failed"].append({"file": f"{model_name}.tar.gz", "reason": str(exc)})
    summary["planned_count"] = len(summary["planned"])
    summary["downloaded_count"] = len(summary["downloaded"])
    summary["skipped_count"] = len(summary["skipped"])
    summary["failed_count"] = len(summary["failed"])
    if summary["failed_count"] > 0:
        summary["status"] = "error"
    return summary
def _build_model_list_result(models_dir: Optional[str] = None, manifest_path: Optional[str] = None) -> dict:
    active_dir = models_dir or MODELS_DIR
    rows = []
    for folder in _uploaded_model_folders(manifest_path):
        folder_path = os.path.join(active_dir, folder)
        rows.append(
            {
                "folder": folder,
                "installed": os.path.isdir(folder_path),
                "path": folder_path,
            }
        )
    return {
        "status": "ok",
        "count": len(rows),
        "models_dir": active_dir,
        "models": rows,
    }
def _uninstall_mcp(
    uninstall_binary: bool = True,
    uninstall_models: bool = True,
    binary_path: Optional[str] = None,
    models_dir: Optional[str] = None,
) -> dict:
    removed_binary = False
    removed_models = False
    result = {
        "status": "ok",
        "binary": None,
        "models_dir": None,
        "binary_removed": False,
        "models_dir_removed": False,
    }
    if uninstall_binary:
        resolved_binary = binary_path or None
        if resolved_binary:
            binary_to_remove = resolved_binary
        else:
            binary_to_remove = os.path.realpath(sys.argv[0])
        result["binary"] = binary_to_remove
        if os.path.exists(binary_to_remove) or os.path.islink(binary_to_remove):
            try:
                os.remove(binary_to_remove)
                removed_binary = True
            except OSError as exc:
                result["status"] = "error"
                result["reason"] = f"failed to remove binary '{binary_to_remove}': {exc}"
                raise RuntimeError(result["reason"])
    if uninstall_models:
        effective_models_dir = models_dir or MODELS_DIR
        result["models_dir"] = effective_models_dir
        if os.path.isdir(effective_models_dir) or os.path.islink(effective_models_dir):
            try:
                shutil.rmtree(effective_models_dir)
                removed_models = True
            except OSError as exc:
                result["status"] = "error"
                result["reason"] = f"failed to remove models dir '{effective_models_dir}': {exc}"
                raise RuntimeError(result["reason"])
    result["binary_removed"] = removed_binary
    result["models_dir_removed"] = removed_models
    return result
def _clean_text(value: str) -> str:
    return re.sub(r"\s+", " ", value).strip()


def _clean_path(raw_path: str) -> str:
    path = raw_path.strip()
    if len(path) > 1 and path[0] in ['"', "'"] and path[-1] == path[0]:
        path = path[1:-1]
    return path.replace("\\ ", " ")


def _get_model_path(model_key: str) -> str:
    key = model_key.lower().strip()
    mapped = MODELS_ALIAS.get(key, key)
    if mapped not in MODELS:
        raise ValueError(f"Unknown model '{model_key}'. Use one of: 1-6.")
    folder = MODELS[mapped]["folder"]
    model_path = os.path.join(MODELS_DIR, folder)
    if not os.path.exists(model_path):
        raise FileNotFoundError(
            "Model '{}' is not installed. Install first using install_models(models=['{}'], destination='{}').".format(
                key,
                folder,
                MODELS_DIR,
            )
        )
    snapshots_dir = os.path.join(model_path, "snapshots")
    if os.path.exists(snapshots_dir):
        candidates = sorted([x for x in os.listdir(snapshots_dir) if not x.startswith(".")])
        if candidates:
            return os.path.join(snapshots_dir, candidates[0])
    return model_path
def _resolve_model(model_key: str):
    global _MLX_IMPORT_ERROR
    if _MLX_IMPORT_ERROR:
        raise RuntimeError(_MLX_IMPORT_ERROR)
    try:
        from mlx_audio.tts.utils import load_model
    except Exception as exc:
        _MLX_IMPORT_ERROR = f"MLX runtime unavailable: {exc}"
        raise RuntimeError(_MLX_IMPORT_ERROR) from exc
    key = MODELS_ALIAS.get(model_key.lower().strip(), model_key.lower().strip())
    with _cache_lock:
        cached = _model_cache.get(key)
        if cached is not None:
            return cached
        model_path = _get_model_path(key)
        model = load_model(model_path)
        _model_cache[key] = model
        return model


def _expected_output_file(output_dir: str, prefix: str) -> str:
    return os.path.join(output_dir, f"{prefix}_000.wav")


def _build_tool_response(text: str, model: str, saved_file: Optional[str], played: bool) -> dict:
    response = {
        "status": "ok",
        "model": model,
        "text_preview": text[:80],
        "played": played,
    }
    if saved_file:
        response["saved_file"] = saved_file
    return response
@mcp.tool()
def speak_text(
    text: str,
    model: str = "1",
    speaker: SpeakerName = "Vivian",
    instruct: Optional[str] = None,
    speed: float = 1.0,
    lang_code: str = "auto",
    play: bool = True,
    keep_file: bool = False,
    output_dir: Optional[str] = None,
    ref_audio: Optional[str] = None,
    ref_text: Optional[str] = None,
) -> dict:
    """
    Generate speech and optionally play it.
    model accepts:
    - numeric keys: 1-6
    - aliases: "pro-custom", "lite-design", "1.7b-clone", etc.
    - default "1" = Qwen3-TTS-12Hz-1.7B-CustomVoice (best quality, supports instruct)
    speaker accepts (use native language for best quality):
    - Vivian (Chinese native - bright, slightly edgy young female)
    - Serena (Chinese native - warm, gentle young female)
    - Ryan (English native - dynamic male with strong rhythmic drive)
    - Aiden (English native - sunny American male with clear midrange)
    - Ethan (English)
    - Chelsie (English)
    instruct: natural language style control, e.g. "speak in an angry tone", "用特别愉快的语气说"
    - supported by CustomVoice (models 1, 4) and VoiceDesign (models 2, 5)
    lang_code accepts: auto, chinese, english, japanese, korean, german, french, russian, portuguese, spanish, italian
    """
    text = _clean_text(text)
    if not text:
        return {"status": "error", "reason": "Text is empty."}
    try:
        resolved = MODELS_ALIAS.get(model.lower().strip(), model.lower().strip())
        info = MODELS[resolved]
        tts_model = _resolve_model(resolved)
        model_mode = info["mode"]
    except KeyError:
        return {"status": "error", "reason": f"Unknown model '{model}'. Use 1-6 or aliases."}
    except FileNotFoundError as exc:
        return {
            "status": "error",
            "reason": "Model not installed. Install the model before speaking: " + str(exc),
        }
    except Exception as exc:  # pragma: no cover - passthrough error path
        return {"status": "error", "reason": str(exc)}
    if model_mode == "custom":
        if speaker not in DEFAULT_CUSTOM_SPEAKERS:
            return {
                "status": "error",
                "reason": f"speaker '{speaker}' not in available custom voices.",
            }
        voice = speaker
        tone = instruct or "normal tone"
        kwargs = {"instruct": tone}
    elif model_mode == "design":
        voice = speaker
        tone = instruct or "a clear neutral voice"
        kwargs = {"instruct": tone}
    else:
        # voice cloning
        if speaker not in DEFAULT_CUSTOM_SPEAKERS:
            voice = "Vivian"
        else:
            voice = speaker
        if not ref_audio:
            return {
                "status": "error",
                "reason": "Clone mode requires ref_audio path.",
            }
        cleaned_ref_audio = _clean_path(ref_audio)
        if not os.path.exists(cleaned_ref_audio):
            return {
                "status": "error",
                "reason": f"Reference audio not found: '{cleaned_ref_audio}'.",
            }
        if not ref_text:
            return {
                "status": "error",
                "reason": "Clone mode requires ref_text transcript.",
            }
        kwargs = {"ref_audio": cleaned_ref_audio, "ref_text": ref_text}
    final_output_dir = output_dir or (OUTPUT_DIR if keep_file else None)
    saved_file: Optional[str] = None
    should_stream = final_output_dir is None
    if final_output_dir:
        os.makedirs(final_output_dir, exist_ok=True)
        file_prefix = f"{FILENAME_PREFIX}_{uuid.uuid4().hex[:8]}"
        generated_candidate = _expected_output_file(final_output_dir, file_prefix)
    else:
        generated_candidate = None
        file_prefix = FILENAME_PREFIX
    try:
        from mlx_audio.tts.generate import generate_audio
        from mlx_audio.tts.audio_player import AudioPlayer

        AudioPlayer.min_buffer_seconds = 4.0
        _original_wait_for_drain = AudioPlayer.wait_for_drain

        def _patched_wait_for_drain(self):
            if not self.playing and self.buffered_samples() > 0:
                self.start_stream()
            return _original_wait_for_drain(self)

        AudioPlayer.wait_for_drain = _patched_wait_for_drain
        generate_audio(
            model=tts_model,
            text=text,
            voice=voice,
            speed=speed,
            lang_code=lang_code,
            verbose=False,
            play=bool(play),
            stream=should_stream,
            streaming_interval=5.0,
            output_path=final_output_dir,
            file_prefix=file_prefix or FILENAME_PREFIX,
            **kwargs,
        )
    except Exception as exc:
        return {"status": "error", "reason": str(exc)}
    if final_output_dir and generated_candidate and os.path.exists(generated_candidate):
        saved_file = generated_candidate
    elif final_output_dir:
        # Fallback: pick newest matching file if numbering differs.
        matching = sorted(
            [
                os.path.join(final_output_dir, p)
                for p in os.listdir(final_output_dir)
                if p.startswith(file_prefix) and p.endswith(".wav")
            ]
        )
        if matching:
            saved_file = matching[-1]
    return _build_tool_response(text, model=resolved, saved_file=saved_file, played=play)
@mcp.tool()
def speak_texts(
    texts: list[str],
    model: str = "1",
    speakers: Optional[list[str]] = None,
    instructs: Optional[list[str]] = None,
    lang_code: str = "auto",
    play: bool = True,
    keep_file: bool = False,
    output_dir: Optional[str] = None,
) -> dict:
    """
    Generate and play speech for an array of sentences, one by one.
    Each sentence is synthesized and played sequentially (streaming sentence-by-sentence).
    texts: list of sentences to speak.
    speakers: optional list of speaker names per sentence (reuses the last value if shorter than texts).
    instructs: optional list of instructions per sentence (reuses the last value if shorter).
    lang_code: language hint applied to all sentences (default: auto).
    """
    if not texts:
        return {"status": "error", "reason": "texts list is empty."}
    results = []
    for i, text in enumerate(texts):
        speaker = (speakers[i] if speakers and i < len(speakers) else
                   (speakers[-1] if speakers else "Vivian"))
        instruct = (instructs[i] if instructs and i < len(instructs) else
                    (instructs[-1] if instructs else None))
        result = speak_text(
            text=text,
            model=model,
            speaker=speaker,
            instruct=instruct,
            lang_code=lang_code,
            play=play,
            keep_file=keep_file,
            output_dir=output_dir,
        )
        results.append({"index": i, "text_preview": text[:40], **result})
        if result.get("status") != "ok":
            break
    return {"status": "ok", "count": len(results), "results": results}
@mcp.tool()
def install_models(
    models: Optional[Union[str, Sequence[str]]] = None,
    base_url: str = DEFAULT_TOS_BASE_URL,
    manifest_path: Optional[str] = None,
    destination: Optional[str] = None,
    use_temp_dir: bool = False,
    include_lock: bool = False,
    force: bool = False,
    dry_run: bool = False,
    retries: int = 3,
) -> dict:
    """
    Download model artifacts from the private model mirror.
    models accepts one or more model folder names, e.g.
    ["Qwen3-TTS-12Hz-0.6B-Base-8bit"] or "Qwen3...".
    """
    try:
        effective_destination = destination
        if not effective_destination and use_temp_dir:
            effective_destination = tempfile.mkdtemp(prefix="qwen3-tts-models-")
        return _install_models(
            models_dir=effective_destination or MODELS_DIR,
            selected_models=models,
            base_url=base_url,
            manifest_path=manifest_path,
            include_lock=include_lock,
            force=force,
            dry_run=dry_run,
            retries=retries,
        )
    except Exception as exc:
        return {"status": "error", "reason": str(exc)}


@mcp.tool()
def list_uploaded_models(
    models_dir: Optional[str] = None,
    manifest_path: Optional[str] = None,
) -> dict:
    """
    List model folders from the embedded manifest and report whether each one is installed locally.
    """
    try:
        return _build_model_list_result(models_dir or MODELS_DIR, manifest_path)
    except Exception as exc:
        return {"status": "error", "reason": str(exc)}
def _build_cli_parser():
    parser = argparse.ArgumentParser(description="Run Qwen3-TTS MCP server.")
    parser.add_argument(
        "--models-dir",
        default=MODELS_DIR,
        help="Path to local model directory (default: ~/.local/share/qwen3-tts-models).",
    )
    parser.add_argument(
        "--install-destination",
        default=None,
        help="Optional destination for --install-models or --install-only.",
    )
    parser.add_argument(
        "--install-temp-dir",
        action="store_true",
        help="Download requested models into a new temporary directory.",
    )
    parser.add_argument(
        "--manifest",
        default=None,
        help="Optional path to qwen3-tts-models manifest.",
    )
    parser.add_argument(
        "--install-models",
        action="store_true",
        help="Download models from model mirror before starting server.",
    )
    parser.add_argument(
        "--install-only",
        action="store_true",
        help="Install models and exit without starting MCP server.",
    )
    parser.add_argument(
        "--install-model",
        action="append",
        default=[],
        help="Optional model folder name to install. Can be repeated.",
    )
    parser.add_argument(
        "--base-url",
        default=DEFAULT_TOS_BASE_URL,
        help="Model mirror base URL.",
    )
    parser.add_argument(
        "--include-lock",
        action="store_true",
        help="Also download .lock metadata files.",
    )
    parser.add_argument(
        "--force",
        action="store_true",
        help="Redownload existing files even if they already exist.",
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Print intended download actions without writing files.",
    )
    parser.add_argument(
        "--retries",
        type=int,
        default=3,
        help="Download retry attempts for each file.",
    )
    parser.add_argument(
        "--speak-text",
        default=None,
        help="Generate speech and exit. Uses --voice and --speak-model.",
    )
    parser.add_argument(
        "--voice",
        default="Vivian",
        help="Voice name for --speak-text (default: Vivian). "
        "Ryan/Aiden/Ethan/Chelsie=English, Serena/Vivian=Chinese (native).",
    )
    parser.add_argument(
        "--speak-model",
        default="1",
        help="Model key for --speak-text (default: 1 = 1.7B-CustomVoice).",
    )
    parser.add_argument(
        "--instruct",
        default=None,
        help="Style instruction for --speak-text, e.g. 'speak in an excited tone'.",
    )
    parser.add_argument(
        "--speak-speed",
        type=float,
        default=1.0,
        help="Speech speed for --speak-text (default: 1.0).",
    )
    parser.add_argument(
        "--lang-code",
        default="auto",
        help="Language code for --speak-text (default: auto).",
    )
    parser.add_argument(
        "--speak-output-dir",
        default=None,
        help="Output directory for --speak-text when keep_file is true.",
    )
    parser.add_argument(
        "--speak-keep-file",
        action="store_true",
        help="Keep generated audio file on disk for --speak-text.",
    )
    parser.add_argument(
        "--speak-no-play",
        action="store_true",
        help="Disable playback for --speak-text.",
    )
    parser.add_argument(
        "--list-uploaded-models",
        action="store_true",
        help="List uploaded model folders from manifest and exit.",
    )
    parser.add_argument(
        "--uninstall",
        action="store_true",
        help="Uninstall MCP binary and model directory, then exit.",
    )
    parser.add_argument(
        "--uninstall-binary-path",
        default=None,
        help="Path to binary to remove during uninstall.",
    )
    parser.add_argument(
        "--uninstall-bin-dir",
        default=None,
        help="Directory containing the MCP binary during uninstall.",
    )
    parser.add_argument(
        "--uninstall-binary-name",
        default="qwen3-tts-mcp",
        help="Binary name to remove when --uninstall-bin-dir is used.",
    )
    parser.add_argument(
        "--uninstall-models-dir",
        default=None,
        help="Models directory to remove during uninstall.",
    )
    parser.add_argument(
        "--uninstall-binary-only",
        action="store_true",
        help="Only remove MCP binary during uninstall.",
    )
    parser.add_argument(
        "--uninstall-models-only",
        action="store_true",
        help="Only remove model directory during uninstall.",
    )
    return parser
def main():
    args = _build_cli_parser().parse_args()
    global MODELS_DIR
    MODELS_DIR = args.models_dir
    if args.uninstall:
        binary_path = None
        if args.uninstall_binary_path:
            binary_path = args.uninstall_binary_path
        elif args.uninstall_bin_dir:
            binary_path = os.path.join(args.uninstall_bin_dir, args.uninstall_binary_name)
        uninstall_binary = not args.uninstall_models_only
        uninstall_models = not args.uninstall_binary_only
        try:
            print(
                json.dumps(
                    _uninstall_mcp(
                        uninstall_binary=uninstall_binary,
                        uninstall_models=uninstall_models,
                        binary_path=binary_path,
                        models_dir=args.uninstall_models_dir or MODELS_DIR,
                    ),
                    indent=2,
                )
            )
            return
        except Exception as exc:
            print(json.dumps({"status": "error", "reason": str(exc)}, indent=2))
            raise SystemExit(1)
    if args.list_uploaded_models:
        print(json.dumps(_build_model_list_result(MODELS_DIR, args.manifest), indent=2))
        return
    if args.install_models or args.install_only:
        install_result = install_models(
            models=args.install_model,
            base_url=args.base_url,
            manifest_path=args.manifest,
            destination=(args.install_destination or args.models_dir) if not args.install_temp_dir else None,
            use_temp_dir=args.install_temp_dir,
            include_lock=args.include_lock,
            force=args.force,
            dry_run=args.dry_run,
            retries=args.retries,
        )
        print(json.dumps(install_result, indent=2))
        if install_result.get("status") == "error":
            raise SystemExit(1)
        if install_result.get("destination"):
            MODELS_DIR = install_result["destination"]
        if args.install_only:
            return
    if args.speak_text is not None:
        try:
            result = speak_text(
                text=args.speak_text,
                model=args.speak_model,
                speaker=args.voice,
                instruct=args.instruct,
                speed=args.speak_speed,
                lang_code=args.lang_code,
                play=not args.speak_no_play,
                keep_file=args.speak_keep_file,
                output_dir=args.speak_output_dir,
            )
            print(json.dumps(result, indent=2))
            if result.get("status") != "ok":
                raise SystemExit(1)
            return
        except Exception as exc:
            print(json.dumps({"status": "error", "reason": str(exc)}, indent=2))
            raise SystemExit(1)
    mcp.run()


if __name__ == "__main__":
    import multiprocessing

    multiprocessing.freeze_support()
    main()