|
#!/usr/bin/env python3 |
|
|
|
import argparse |
|
import json |
|
import tarfile |
|
import os |
|
import shutil |
|
import re |
|
import sys |
|
import tempfile |
|
import time |
|
import uuid |
|
import threading |
|
from typing import Dict, Literal, Optional, Sequence, Union |
|
from urllib.error import HTTPError, URLError |
|
from urllib.request import urlopen |
|
from mcp.server.fastmcp import FastMCP |
|
|
|
|
|
# Directory containing this file; anchors bundled resources and outputs.
BASE_DIR = os.path.dirname(os.path.abspath(__file__))

# Default install location for downloaded model weights (overridable via --models-dir).
MODELS_DIR = os.path.expanduser("~/.local/share/qwen3-tts-models")

# Where generated audio is written when the caller asks to keep files.
OUTPUT_DIR = os.path.join(BASE_DIR, "outputs")

# Name of the file listing model artifacts available on the mirror.
DEFAULT_MANIFEST = "qwen3-tts-models.manifest"

# Base URL of the private model mirror used by install_models.
DEFAULT_TOS_BASE_URL = "https://tosv.boei18n.byted.org/obj/bytesec-homebrew-boei18n/qwen-tts-models"

# Prefix for generated wav filenames (generation appends _000, _001, ...).
FILENAME_PREFIX = "mcp_audio"

# Keep these in sync with main.py model options.
# Each entry: display name, on-disk folder, generation mode, candidate repo ids.
# NOTE(review): entry "5" pairs an 8-bit folder name with 6/5/4-bit repo ids —
# confirm which quantization is actually mirrored.
MODELS = {
    "1": {"name": "Custom Voice", "folder": "Qwen3-TTS-12Hz-1.7B-CustomVoice-8bit", "mode": "custom", "repo_ids": ["mlx-community/Qwen3-TTS-12Hz-1.7B-CustomVoice-8bit"]},
    "2": {"name": "Voice Design", "folder": "Qwen3-TTS-12Hz-1.7B-VoiceDesign-8bit", "mode": "design", "repo_ids": ["mlx-community/Qwen3-TTS-12Hz-1.7B-VoiceDesign-8bit"]},
    "3": {"name": "Voice Cloning", "folder": "Qwen3-TTS-12Hz-1.7B-Base-8bit", "mode": "clone", "repo_ids": ["mlx-community/Qwen3-TTS-12Hz-1.7B-Base-8bit"]},
    "4": {"name": "Custom Voice", "folder": "Qwen3-TTS-12Hz-0.6B-CustomVoice-8bit", "mode": "custom", "repo_ids": ["mlx-community/Qwen3-TTS-12Hz-0.6B-CustomVoice-8bit"]},
    "5": {"name": "Voice Design", "folder": "Qwen3-TTS-12Hz-0.6B-VoiceDesign-8bit", "mode": "design", "repo_ids": ["mlx-community/Qwen3-TTS-12Hz-0.6B-VoiceDesign-6bit", "mlx-community/Qwen3-TTS-12Hz-0.6B-VoiceDesign-5bit", "mlx-community/Qwen3-TTS-12Hz-0.6B-VoiceDesign-4bit"]},
    "6": {"name": "Voice Cloning", "folder": "Qwen3-TTS-12Hz-0.6B-Base-8bit", "mode": "clone", "repo_ids": ["mlx-community/Qwen3-TTS-12Hz-0.6B-Base-8bit"]},
}

# Human-friendly aliases accepted by the tools; all resolve to the numeric
# MODELS keys above ("pro" = 1.7B, "lite" = 0.6B).
MODELS_ALIAS = {
    "1": "1",
    "2": "2",
    "3": "3",
    "4": "4",
    "5": "5",
    "6": "6",
    "q1": "1",
    "q2": "2",
    "q3": "3",
    "q4": "4",
    "q5": "5",
    "q6": "6",
    "17b-custom": "1",
    "17b-voice-design": "2",
    "17b-clone": "3",
    "06b-custom": "4",
    "06b-voice-design": "5",
    "06b-clone": "6",
    "06b-design": "5",
    "0.6b-custom": "4",
    "0.6b-design": "5",
    "0.6b-clone": "6",
    "1.7b-custom": "1",
    "1.7b-voice-design": "2",
    "1.7b-clone": "3",
    "pro-custom": "1",
    "pro-design": "2",
    "pro-clone": "3",
    "lite-custom": "4",
    "lite-design": "5",
    "lite-clone": "6",
    "custom": "4",
    "design": "5",
    "clone": "6",
}

# Built-in speaker names accepted by CustomVoice models.
DEFAULT_CUSTOM_SPEAKERS = ["Ryan", "Aiden", "Ethan", "Chelsie", "Serena", "Vivian"]

# Literal type so MCP clients get speaker-name completion/validation.
SpeakerName = Literal[
    "Ryan",
    "Aiden",
    "Ethan",
    "Chelsie",
    "Serena",
    "Vivian",
]

# MCP server instance; tools below register themselves via @mcp.tool().
mcp = FastMCP("qwen3-tts")

# Process-wide cache of loaded TTS models, guarded by _cache_lock.
_model_cache: Dict[str, object] = {}
_cache_lock = threading.Lock()
# Memoized import-failure message for the MLX runtime (None = no failure yet).
_MLX_IMPORT_ERROR: Optional[str] = None
|
|
|
|
|
def _runtime_base_dir() -> str:
    """Return the absolute base directory of the running program.

    When frozen by PyInstaller, ``sys._MEIPASS`` points at the unpacked
    bundle; otherwise this module's own directory is used.
    """
    bundle_dir = getattr(sys, "_MEIPASS", BASE_DIR)
    return os.path.abspath(bundle_dir)
|
|
|
|
|
def _default_manifest_path(manifest_path: Optional[str] = None) -> str:
    """Resolve the manifest file location.

    An explicit *manifest_path* always wins. Otherwise probe, in order:
    the QWEN_TTS_MANIFEST env var, the runtime/scripts dir, the source
    scripts dir, the CWD scripts dir, and the runtime dir itself. When
    nothing exists, fall back to the runtime scripts path anyway so the
    caller gets a deterministic answer (and a clear error downstream).
    """
    if manifest_path:
        return manifest_path

    runtime_dir = _runtime_base_dir()
    fallback = os.path.join(runtime_dir, "scripts", DEFAULT_MANIFEST)
    probe_order = (
        os.getenv("QWEN_TTS_MANIFEST"),
        fallback,
        os.path.join(BASE_DIR, "scripts", DEFAULT_MANIFEST),
        os.path.join(os.getcwd(), "scripts", DEFAULT_MANIFEST),
        os.path.join(runtime_dir, DEFAULT_MANIFEST),
    )
    for path in probe_order:
        if path and os.path.isfile(path):
            return path
    return fallback
|
|
|
|
|
def _normalize_model_selection( |
|
selected: Optional[Union[str, Sequence[str]]] = None |
|
) -> list[str]: |
|
if selected is None: |
|
return [] |
|
if isinstance(selected, str): |
|
source = [selected] |
|
else: |
|
source = list(selected) |
|
normalized: list[str] = [] |
|
for item in source: |
|
if isinstance(item, str): |
|
normalized.extend(part.strip() for part in item.split(",") if part.strip()) |
|
else: |
|
normalized.append(str(item).strip()) |
|
return [name for name in normalized if name] |
|
|
|
|
|
def _read_manifest(path: str) -> list[str]: |
|
if not os.path.exists(path): |
|
raise FileNotFoundError(f"Manifest not found at '{path}'.") |
|
entries: list[str] = [] |
|
with open(path, "r", encoding="utf-8") as handle: |
|
for line in handle: |
|
rel = line.strip() |
|
if rel: |
|
entries.append(rel) |
|
if not entries: |
|
raise RuntimeError(f"Manifest is empty: '{path}'.") |
|
return entries |
|
|
|
|
|
def _uploaded_model_folders(manifest_path: Optional[str] = None) -> list[str]:
    """Return unique top-level model folders from the manifest, in order
    of first appearance."""
    ordered: list[str] = []
    seen: set[str] = set()
    for entry in _read_manifest(_default_manifest_path(manifest_path)):
        top = entry.split("/", 1)[0]
        if top in seen:
            continue
        seen.add(top)
        ordered.append(top)
    return ordered
|
|
|
|
|
def _manifest_model_folders(manifest: list[str], include_lock: bool = False) -> list[str]: |
|
ordered: list[str] = [] |
|
seen: set[str] = set() |
|
for rel in manifest: |
|
if not include_lock and rel.endswith(".lock"): |
|
continue |
|
if rel.endswith(".tar.gz"): |
|
folder = rel[: -len(".tar.gz")] |
|
else: |
|
folder = rel.split("/", 1)[0] |
|
if not folder or folder in seen: |
|
continue |
|
ordered.append(folder) |
|
seen.add(folder) |
|
return ordered |
|
|
|
|
|
def _download_with_retries(url: str, destination: str, retries: int) -> bool: |
|
retries = max(1, retries) |
|
last_error: Optional[Exception] = None |
|
|
|
for attempt in range(1, retries + 1): |
|
try: |
|
with urlopen(url, timeout=60) as response: |
|
status = getattr(response, "status", 200) |
|
if status != 200: |
|
raise HTTPError(url, status, "Non-200 response", response.headers, None) |
|
os.makedirs(os.path.dirname(destination), exist_ok=True) |
|
with open(destination, "wb") as f: |
|
shutil.copyfileobj(response, f) |
|
return True |
|
except (HTTPError, URLError, OSError) as exc: |
|
last_error = exc |
|
if attempt >= retries: |
|
break |
|
time.sleep(2**(attempt - 1)) |
|
|
|
if last_error: |
|
raise RuntimeError(f"Download failed for '{url}': {last_error}") |
|
raise RuntimeError(f"Download failed for '{url}'.") |
|
|
|
|
|
def _install_models(
    models_dir: str,
    selected_models: Optional[Sequence[str]] = None,
    base_url: str = DEFAULT_TOS_BASE_URL,
    manifest_path: Optional[str] = None,
    include_lock: bool = False,
    force: bool = False,
    dry_run: bool = False,
    retries: int = 3,
) -> dict:
    """Download and unpack manifest-listed model tarballs into *models_dir*.

    Args:
        models_dir: destination directory for extracted model folders.
        selected_models: optional name filter (string or sequence; see
            _normalize_model_selection). Empty selection means "all".
        base_url: mirror base URL; "<base>/<folder>.tar.gz" is fetched.
        manifest_path: optional explicit manifest location.
        include_lock: also treat .lock manifest entries as models.
        force: re-download and replace already-installed folders.
        dry_run: only record what would be downloaded.
        retries: per-file download attempts.

    Returns a summary dict; "status" is "error" when any download failed.
    """
    selected = _normalize_model_selection(selected_models)
    manifest = _read_manifest(_default_manifest_path(manifest_path))
    model_folders = _manifest_model_folders(manifest, include_lock=include_lock)

    # Preserve manifest order while honoring the optional name filter.
    if selected:
        to_process = [name for name in model_folders if name in selected]
    else:
        to_process = model_folders

    summary = {
        "status": "ok",
        "base_url": base_url.rstrip("/"),
        "destination": models_dir,
        "manifest": _default_manifest_path(manifest_path),
        "model_filter": selected,
        "include_lock": include_lock,
        "force": force,
        "dry_run": dry_run,
        "retries": max(1, retries),
        "requested": len(model_folders),
        "selected": len(to_process),
        "planned": [],
        "downloaded": [],
        "skipped": [],
        "failed": [],
    }

    for model_name in to_process:
        destination = os.path.join(models_dir, model_name)
        if os.path.exists(destination) and not force:
            summary["skipped"].append(model_name)
            continue
        if dry_run:
            summary["planned"].append(model_name)
            continue
        try:
            if force and os.path.isdir(destination):
                shutil.rmtree(destination)
            # Reserve a unique temp path for the tarball; the handle is
            # closed immediately so the download can write to it.
            with tempfile.NamedTemporaryFile(suffix=".tar.gz", delete=False) as handle:
                tarball_path = handle.name
            try:
                _download_with_retries(
                    f"{base_url.rstrip('/')}/{model_name}.tar.gz",
                    tarball_path,
                    retries,
                )
                with tarfile.open(tarball_path, "r:gz") as tar:
                    # SECURITY: refuse path-traversal / absolute-path members
                    # where supported. The "data" filter was added in Python
                    # 3.12 and backported to 3.9.17+/3.10.12+/3.11.4+; fall
                    # back to unfiltered extraction on older interpreters.
                    try:
                        tar.extractall(models_dir, filter="data")
                    except TypeError:
                        tar.extractall(models_dir)
                summary["downloaded"].append(model_name)
            finally:
                if os.path.exists(tarball_path):
                    os.remove(tarball_path)
        except Exception as exc:  # pragma: no cover - external network path
            summary["failed"].append({"file": f"{model_name}.tar.gz", "reason": str(exc)})

    summary["planned_count"] = len(summary["planned"])
    summary["downloaded_count"] = len(summary["downloaded"])
    summary["skipped_count"] = len(summary["skipped"])
    summary["failed_count"] = len(summary["failed"])
    if summary["failed_count"] > 0:
        summary["status"] = "error"
    return summary
|
|
|
|
|
def _build_model_list_result(models_dir: Optional[str] = None, manifest_path: Optional[str] = None) -> dict:
    """Report every manifest model folder and whether it exists locally."""
    active_dir = models_dir or MODELS_DIR
    rows = [
        {
            "folder": folder,
            "installed": os.path.isdir(os.path.join(active_dir, folder)),
            "path": os.path.join(active_dir, folder),
        }
        for folder in _uploaded_model_folders(manifest_path)
    ]
    return {
        "status": "ok",
        "count": len(rows),
        "models_dir": active_dir,
        "models": rows,
    }
|
|
|
|
|
def _uninstall_mcp( |
|
uninstall_binary: bool = True, |
|
uninstall_models: bool = True, |
|
binary_path: Optional[str] = None, |
|
models_dir: Optional[str] = None, |
|
) -> dict: |
|
removed_binary = False |
|
removed_models = False |
|
result = { |
|
"status": "ok", |
|
"binary": None, |
|
"models_dir": None, |
|
"binary_removed": False, |
|
"models_dir_removed": False, |
|
} |
|
|
|
if uninstall_binary: |
|
resolved_binary = binary_path or None |
|
if resolved_binary: |
|
binary_to_remove = resolved_binary |
|
else: |
|
binary_to_remove = os.path.realpath(sys.argv[0]) |
|
|
|
result["binary"] = binary_to_remove |
|
if os.path.exists(binary_to_remove) or os.path.islink(binary_to_remove): |
|
try: |
|
os.remove(binary_to_remove) |
|
removed_binary = True |
|
except OSError as exc: |
|
result["status"] = "error" |
|
result["reason"] = f"failed to remove binary '{binary_to_remove}': {exc}" |
|
raise RuntimeError(result["reason"]) |
|
|
|
if uninstall_models: |
|
effective_models_dir = models_dir or MODELS_DIR |
|
result["models_dir"] = effective_models_dir |
|
if os.path.isdir(effective_models_dir) or os.path.islink(effective_models_dir): |
|
try: |
|
shutil.rmtree(effective_models_dir) |
|
removed_models = True |
|
except OSError as exc: |
|
result["status"] = "error" |
|
result["reason"] = f"failed to remove models dir '{effective_models_dir}': {exc}" |
|
raise RuntimeError(result["reason"]) |
|
|
|
result["binary_removed"] = removed_binary |
|
result["models_dir_removed"] = removed_models |
|
return result |
|
|
|
|
|
def _clean_text(value: str) -> str: |
|
return re.sub(r"\s+", " ", value).strip() |
|
|
|
|
|
def _clean_path(raw_path: str) -> str: |
|
path = raw_path.strip() |
|
if len(path) > 1 and path[0] in ['"', "'"] and path[-1] == path[0]: |
|
path = path[1:-1] |
|
return path.replace("\\ ", " ") |
|
|
|
|
|
def _get_model_path(model_key: str) -> str:
    """Resolve a model key or alias to its on-disk directory.

    Prefers the first visible HuggingFace-style ``snapshots`` subfolder
    when one exists; otherwise returns the model folder itself.

    Raises ValueError for unknown keys and FileNotFoundError when the
    model has not been installed.
    """
    key = model_key.lower().strip()
    mapped = MODELS_ALIAS.get(key, key)
    if mapped not in MODELS:
        raise ValueError(f"Unknown model '{model_key}'. Use one of: 1-6.")

    folder = MODELS[mapped]["folder"]
    model_path = os.path.join(MODELS_DIR, folder)
    if not os.path.exists(model_path):
        raise FileNotFoundError(
            "Model '{}' is not installed. Install first using install_models(models=['{}'], destination='{}').".format(
                key,
                folder,
                MODELS_DIR,
            )
        )

    snapshots_dir = os.path.join(model_path, "snapshots")
    if os.path.exists(snapshots_dir):
        snapshots = sorted(entry for entry in os.listdir(snapshots_dir) if not entry.startswith("."))
        if snapshots:
            return os.path.join(snapshots_dir, snapshots[0])
    return model_path
|
|
|
|
|
def _resolve_model(model_key: str):
    """Return the cached TTS model for *model_key*, loading it on first use.

    The MLX import failure is memoized in _MLX_IMPORT_ERROR so repeated
    calls fail fast with the same message.

    Raises RuntimeError when the MLX runtime cannot be imported.
    """
    global _MLX_IMPORT_ERROR
    if _MLX_IMPORT_ERROR:
        raise RuntimeError(_MLX_IMPORT_ERROR)

    try:
        from mlx_audio.tts.utils import load_model
    except Exception as exc:
        _MLX_IMPORT_ERROR = f"MLX runtime unavailable: {exc}"
        raise RuntimeError(_MLX_IMPORT_ERROR) from exc

    normalized = model_key.lower().strip()
    key = MODELS_ALIAS.get(normalized, normalized)
    with _cache_lock:
        model = _model_cache.get(key)
        if model is None:
            model = load_model(_get_model_path(key))
            _model_cache[key] = model
        return model
|
|
|
|
|
def _expected_output_file(output_dir: str, prefix: str) -> str: |
|
return os.path.join(output_dir, f"{prefix}_000.wav") |
|
|
|
|
|
def _build_tool_response(text: str, model: str, saved_file: Optional[str], played: bool) -> dict: |
|
response = { |
|
"status": "ok", |
|
"model": model, |
|
"text_preview": text[:80], |
|
"played": played, |
|
} |
|
if saved_file: |
|
response["saved_file"] = saved_file |
|
return response |
|
|
|
|
|
@mcp.tool()
def speak_text(
    text: str,
    model: str = "1",
    speaker: SpeakerName = "Vivian",
    instruct: Optional[str] = None,
    speed: float = 1.0,
    lang_code: str = "auto",
    play: bool = True,
    keep_file: bool = False,
    output_dir: Optional[str] = None,
    ref_audio: Optional[str] = None,
    ref_text: Optional[str] = None,
) -> dict:
    """
    Generate speech and optionally play it.

    model accepts:
    - numeric keys: 1-6
    - aliases: "pro-custom", "lite-design", "1.7b-clone", etc.
    - default "1" = Qwen3-TTS-12Hz-1.7B-CustomVoice (best quality, supports instruct)

    speaker accepts (use native language for best quality):
    - Vivian (Chinese native - bright, slightly edgy young female)
    - Serena (Chinese native - warm, gentle young female)
    - Ryan (English native - dynamic male with strong rhythmic drive)
    - Aiden (English native - sunny American male with clear midrange)
    - Ethan (English)
    - Chelsie (English)

    instruct: natural language style control, e.g. "speak in an angry tone", "用特别愉快的语气说"
    - supported by CustomVoice (models 1, 4) and VoiceDesign (models 2, 5)

    lang_code accepts: auto, chinese, english, japanese, korean, german, french, russian, portuguese, spanish, italian
    """
    # Normalize whitespace first; an empty prompt is a caller error.
    text = _clean_text(text)
    if not text:
        return {"status": "error", "reason": "Text is empty."}

    # Resolve alias -> canonical key, then load (or reuse cached) model.
    try:
        resolved = MODELS_ALIAS.get(model.lower().strip(), model.lower().strip())
        info = MODELS[resolved]
        tts_model = _resolve_model(resolved)
        model_mode = info["mode"]
    except KeyError:
        # MODELS[resolved] lookup failed: key is not one of "1".."6".
        return {"status": "error", "reason": f"Unknown model '{model}'. Use 1-6 or aliases."}
    except FileNotFoundError as exc:
        return {
            "status": "error",
            "reason": "Model not installed. Install the model before speaking: " + str(exc),
        }
    except Exception as exc:  # pragma: no cover - passthrough error path
        return {"status": "error", "reason": str(exc)}

    # Build per-mode keyword arguments for generate_audio.
    if model_mode == "custom":
        # CustomVoice: speaker must be one of the built-in voices.
        if speaker not in DEFAULT_CUSTOM_SPEAKERS:
            return {
                "status": "error",
                "reason": f"speaker '{speaker}' not in available custom voices.",
            }
        voice = speaker
        tone = instruct or "normal tone"
        kwargs = {"instruct": tone}
    elif model_mode == "design":
        # VoiceDesign: any speaker value passes through; instruct shapes it.
        voice = speaker
        tone = instruct or "a clear neutral voice"
        kwargs = {"instruct": tone}
    else:
        # voice cloning
        # Unknown speakers fall back to Vivian instead of erroring out.
        if speaker not in DEFAULT_CUSTOM_SPEAKERS:
            voice = "Vivian"
        else:
            voice = speaker
        if not ref_audio:
            return {
                "status": "error",
                "reason": "Clone mode requires ref_audio path.",
            }
        cleaned_ref_audio = _clean_path(ref_audio)
        if not os.path.exists(cleaned_ref_audio):
            return {
                "status": "error",
                "reason": f"Reference audio not found: '{cleaned_ref_audio}'.",
            }
        if not ref_text:
            return {
                "status": "error",
                "reason": "Clone mode requires ref_text transcript.",
            }
        kwargs = {"ref_audio": cleaned_ref_audio, "ref_text": ref_text}

    # Saving and streaming are mutually exclusive here: a concrete output
    # dir means "write a wav to disk", otherwise stream chunks to playback.
    final_output_dir = output_dir or (OUTPUT_DIR if keep_file else None)
    saved_file: Optional[str] = None
    should_stream = final_output_dir is None

    if final_output_dir:
        os.makedirs(final_output_dir, exist_ok=True)
        # Random suffix avoids filename collisions between requests.
        file_prefix = f"{FILENAME_PREFIX}_{uuid.uuid4().hex[:8]}"
        generated_candidate = _expected_output_file(final_output_dir, file_prefix)
    else:
        generated_candidate = None
        file_prefix = FILENAME_PREFIX

    try:
        from mlx_audio.tts.generate import generate_audio
        from mlx_audio.tts.audio_player import AudioPlayer
        # Larger playback buffer before auto-start, to avoid underruns.
        AudioPlayer.min_buffer_seconds = 4.0

        # Monkey-patch: ensure audio buffered below the auto-start
        # threshold still gets played before the drain completes.
        # NOTE(review): this rebinds the class method on every call, so
        # wrappers accumulate across calls — confirm that is acceptable.
        _original_wait_for_drain = AudioPlayer.wait_for_drain
        def _patched_wait_for_drain(self):
            if not self.playing and self.buffered_samples() > 0:
                self.start_stream()
            return _original_wait_for_drain(self)
        AudioPlayer.wait_for_drain = _patched_wait_for_drain
        generate_audio(
            model=tts_model,
            text=text,
            voice=voice,
            speed=speed,
            lang_code=lang_code,
            verbose=False,
            play=bool(play),
            stream=should_stream,
            streaming_interval=5.0,
            output_path=final_output_dir,
            file_prefix=file_prefix or FILENAME_PREFIX,
            **kwargs,
        )
    except Exception as exc:
        return {"status": "error", "reason": str(exc)}

    # Locate the file generate_audio wrote, if any.
    if final_output_dir and generated_candidate and os.path.exists(generated_candidate):
        saved_file = generated_candidate
    elif final_output_dir:
        # Fallback: pick newest matching file if numbering differs.
        matching = sorted(
            [
                os.path.join(final_output_dir, p)
                for p in os.listdir(final_output_dir)
                if p.startswith(file_prefix) and p.endswith(".wav")
            ]
        )
        if matching:
            saved_file = matching[-1]

    return _build_tool_response(text, model=resolved, saved_file=saved_file, played=play)
|
|
|
|
|
@mcp.tool()
def speak_texts(
    texts: list[str],
    model: str = "1",
    speakers: Optional[list[str]] = None,
    instructs: Optional[list[str]] = None,
    lang_code: str = "auto",
    play: bool = True,
    keep_file: bool = False,
    output_dir: Optional[str] = None,
) -> dict:
    """
    Generate and play speech for an array of sentences, one by one.
    Each sentence is synthesized and played sequentially (streaming sentence-by-sentence).

    texts: list of sentences to speak.
    speakers: optional list of speaker names per sentence (cycles last value if shorter than texts).
    instructs: optional list of instructions per sentence (cycles last value if shorter).
    lang_code: language hint applied to all sentences (default: auto).

    Returns status "error" when any sentence fails; synthesis stops at
    the first failure and the partial results are included.
    """
    if not texts:
        return {"status": "error", "reason": "texts list is empty."}

    overall_status = "ok"
    results = []
    for i, text in enumerate(texts):
        # Reuse the last provided speaker/instruct when the lists are
        # shorter than texts (the documented "cycles last value" rule).
        speaker = (speakers[i] if speakers and i < len(speakers) else
                   (speakers[-1] if speakers else "Vivian"))
        instruct = (instructs[i] if instructs and i < len(instructs) else
                    (instructs[-1] if instructs else None))
        result = speak_text(
            text=text,
            model=model,
            speaker=speaker,
            instruct=instruct,
            lang_code=lang_code,
            play=play,
            keep_file=keep_file,
            output_dir=output_dir,
        )
        results.append({"index": i, "text_preview": text[:40], **result})
        if result.get("status") != "ok":
            # BUG FIX: the batch previously reported "ok" even when a
            # sentence failed; surface the failure to the caller.
            overall_status = "error"
            break

    return {"status": overall_status, "count": len(results), "results": results}
|
|
|
|
|
@mcp.tool()
def install_models(
    models: Optional[Union[str, Sequence[str]]] = None,
    base_url: str = DEFAULT_TOS_BASE_URL,
    manifest_path: Optional[str] = None,
    destination: Optional[str] = None,
    use_temp_dir: bool = False,
    include_lock: bool = False,
    force: bool = False,
    dry_run: bool = False,
    retries: int = 3,
) -> dict:
    """
    Download model artifacts from the private model mirror.

    models accepts one or more model folder names, e.g.
    ["Qwen3-TTS-12Hz-0.6B-Base-8bit"] or "Qwen3...".
    """
    try:
        # Explicit destination wins; otherwise optionally mint a temp dir,
        # and finally fall back to the default models directory.
        target_dir = destination
        if not target_dir and use_temp_dir:
            target_dir = tempfile.mkdtemp(prefix="qwen3-tts-models-")
        return _install_models(
            models_dir=target_dir or MODELS_DIR,
            selected_models=models,
            base_url=base_url,
            manifest_path=manifest_path,
            include_lock=include_lock,
            force=force,
            dry_run=dry_run,
            retries=retries,
        )
    except Exception as exc:
        return {"status": "error", "reason": str(exc)}
|
|
|
|
|
@mcp.tool()
def list_uploaded_models(
    models_dir: Optional[str] = None,
    manifest_path: Optional[str] = None,
) -> dict:
    """
    List model folders from embedded manifest and report whether each one is installed locally.
    """
    try:
        result = _build_model_list_result(models_dir or MODELS_DIR, manifest_path)
    except Exception as exc:
        return {"status": "error", "reason": str(exc)}
    return result
|
|
|
|
|
def _build_cli_parser():
    """Construct the argparse CLI for the MCP server and its utilities."""
    parser = argparse.ArgumentParser(description="Run Qwen3-TTS MCP server.")

    # --- model directory & installation -------------------------------
    parser.add_argument("--models-dir", default=MODELS_DIR,
                        help="Path to local model directory (default: ~/.local/share/qwen3-tts-models).")
    parser.add_argument("--install-destination", default=None,
                        help="Optional destination for --install-models or --install-only.")
    parser.add_argument("--install-temp-dir", action="store_true",
                        help="Download requested models into a new temporary directory.")
    parser.add_argument("--manifest", default=None,
                        help="Optional path to qwen3-tts-models manifest.")
    parser.add_argument("--install-models", action="store_true",
                        help="Download models from model mirror before starting server.")
    parser.add_argument("--install-only", action="store_true",
                        help="Install models and exit without starting MCP server.")
    parser.add_argument("--install-model", action="append", default=[],
                        help="Optional model folder name to install. Can be repeated.")
    parser.add_argument("--base-url", default=DEFAULT_TOS_BASE_URL,
                        help="Model mirror base URL.")
    parser.add_argument("--include-lock", action="store_true",
                        help="Also download .lock metadata files.")
    parser.add_argument("--force", action="store_true",
                        help="Redownload existing files even if they already exist.")
    parser.add_argument("--dry-run", action="store_true",
                        help="Print intended download actions without writing files.")
    parser.add_argument("--retries", type=int, default=3,
                        help="Download retry attempts for each file.")

    # --- one-shot speech synthesis ------------------------------------
    parser.add_argument("--speak-text", default=None,
                        help="Generate speech and exit. Uses --voice and --speak-model.")
    parser.add_argument("--voice", default="Vivian",
                        help="Voice name for --speak-text (default: Vivian). "
                             "Ryan/Aiden/Ethan/Chelsie=English, Serena/Vivian=Chinese (native).")
    parser.add_argument("--speak-model", default="1",
                        help="Model key for --speak-text (default: 1 = 1.7B-CustomVoice).")
    parser.add_argument("--instruct", default=None,
                        help="Style instruction for --speak-text, e.g. 'speak in an excited tone'.")
    parser.add_argument("--speak-speed", type=float, default=1.0,
                        help="Speech speed for --speak-text (default: 1.0).")
    parser.add_argument("--lang-code", default="auto",
                        help="Language code for --speak-text (default: auto).")
    parser.add_argument("--speak-output-dir", default=None,
                        help="Output directory for --speak-text when keep_file is true.")
    parser.add_argument("--speak-keep-file", action="store_true",
                        help="Keep generated audio file on disk for --speak-text.")
    parser.add_argument("--speak-no-play", action="store_true",
                        help="Disable playback for --speak-text.")

    # --- maintenance & uninstall --------------------------------------
    parser.add_argument("--list-uploaded-models", action="store_true",
                        help="List uploaded model folders from manifest and exit.")
    parser.add_argument("--uninstall", action="store_true",
                        help="Uninstall MCP binary and model directory, then exit.")
    parser.add_argument("--uninstall-binary-path", default=None,
                        help="Path to binary to remove during uninstall.")
    parser.add_argument("--uninstall-bin-dir", default=None,
                        help="Directory containing the MCP binary during uninstall.")
    parser.add_argument("--uninstall-binary-name", default="qwen3-tts-mcp",
                        help="Binary name to remove when --uninstall-bin-dir is used.")
    parser.add_argument("--uninstall-models-dir", default=None,
                        help="Models directory to remove during uninstall.")
    parser.add_argument("--uninstall-binary-only", action="store_true",
                        help="Only remove MCP binary during uninstall.")
    parser.add_argument("--uninstall-models-only", action="store_true",
                        help="Only remove model directory during uninstall.")
    return parser
|
|
|
|
|
def main():
    """CLI entry point: run one-shot commands, else start the MCP server."""
    args = _build_cli_parser().parse_args()
    # Replace the module-level default so every tool uses the CLI value.
    global MODELS_DIR
    MODELS_DIR = args.models_dir
    if args.uninstall:
        # Resolve which binary to delete: explicit path wins over dir+name.
        binary_path = None
        if args.uninstall_binary_path:
            binary_path = args.uninstall_binary_path
        elif args.uninstall_bin_dir:
            binary_path = os.path.join(args.uninstall_bin_dir, args.uninstall_binary_name)

        # The two "-only" flags each disable the other removal step.
        uninstall_binary = not args.uninstall_models_only
        uninstall_models = not args.uninstall_binary_only
        try:
            print(
                json.dumps(
                    _uninstall_mcp(
                        uninstall_binary=uninstall_binary,
                        uninstall_models=uninstall_models,
                        binary_path=binary_path,
                        models_dir=args.uninstall_models_dir or MODELS_DIR,
                    ),
                    indent=2,
                )
            )
            return
        except Exception as exc:
            # Report the failure as JSON and exit non-zero.
            print(
                json.dumps(
                    {
                        "status": "error",
                        "reason": str(exc),
                    },
                    indent=2,
                )
            )
            raise SystemExit(1)
        # NOTE(review): unreachable — both branches above already exit.
        return
    if args.list_uploaded_models:
        print(json.dumps(_build_model_list_result(MODELS_DIR, args.manifest), indent=2))
        return
    if args.install_models or args.install_only:
        install_result = install_models(
            models=args.install_model,
            base_url=args.base_url,
            manifest_path=args.manifest,
            # Parses as `(dest or models_dir) if not temp else None`:
            # --install-temp-dir forces destination=None so install_models
            # mints the temporary directory itself.
            destination=args.install_destination or args.models_dir if not args.install_temp_dir else None,
            use_temp_dir=args.install_temp_dir,
            include_lock=args.include_lock,
            force=args.force,
            dry_run=args.dry_run,
            retries=args.retries,
        )
        print(json.dumps(install_result, indent=2))
        if install_result.get("status") == "error":
            raise SystemExit(1)
        if install_result.get("destination"):
            # Point subsequent tools at wherever the models actually landed.
            MODELS_DIR = install_result["destination"]
        if args.install_only:
            return
    if args.speak_text is not None:
        try:
            result = speak_text(
                text=args.speak_text,
                model=args.speak_model,
                speaker=args.voice,
                instruct=args.instruct,
                speed=args.speak_speed,
                lang_code=args.lang_code,
                play=not args.speak_no_play,
                keep_file=args.speak_keep_file,
                output_dir=args.speak_output_dir,
            )
            print(json.dumps(result, indent=2))
            # SystemExit is a BaseException, so the except below won't
            # swallow this deliberate non-zero exit.
            if result.get("status") != "ok":
                raise SystemExit(1)
            return
        except Exception as exc:
            print(
                json.dumps(
                    {
                        "status": "error",
                        "reason": str(exc),
                    },
                    indent=2,
                )
            )
            raise SystemExit(1)
    # No one-shot command requested: serve MCP tools until terminated.
    mcp.run()
|
|
|
|
|
if __name__ == "__main__":
    # freeze_support is needed for frozen executables (e.g. PyInstaller) so
    # multiprocessing child processes don't re-run the CLI entry point.
    import multiprocessing
    multiprocessing.freeze_support()
    main()