#!/usr/bin/env -S bash -c 'exec "$(dirname "$0")/venv/bin/python3" "$0" "$@"' """ STT server: records audio, runs Silero VAD, transcribes with faster-whisper. Broadcasts JSON events to all connected WebSocket clients and to stdout. Events: {"event": "ready"} {"event": "vad_start"} {"event": "vad_end", "duration": 1.23} {"event": "transcript", "text": "...", "words": [...], "duration": 1.23} {"event": "error", "message": "..."} word format: {"word": "hello", "start": 0.12, "end": 0.45, "probability": 0.99} Every WebSocket connection receives the full event stream from the moment it connects — no subscription handshake required. Machine-readable events are sent over WebSocket only. Pass --verbose to enable logging to stderr (startup, VAD events, transcripts). Errors always go to stderr regardless of verbosity. Environment: STT_PORT WebSocket port (default: 11501) Usage: ./stt-server.py ./stt-server.py --model large-v3 --device cuda --compute-type int8_float16 --verbose """ import os import sys import json import signal import argparse import threading import queue import subprocess import traceback import asyncio import websockets import numpy as np import torch SAMPLE_RATE = 16000 VAD_WINDOW = 512 # samples per VAD chunk (32ms at 16kHz) PRE_ROLL_SAMPLES = 3200 # 0.2s prepended to each segment for context HISTORY_SAMPLES = 960000 # 60s ring buffer for pre-roll PORT = int(__import__('os').environ.get('STT_PORT', 11501)) def log(msg, error=False): if error or verbose: sys.stderr.write(f'[stt] {msg}\n') sys.stderr.flush() # --- WebSocket broadcast --- _ws_loop = None _ws_clients = set() # set of asyncio.Queue, one per connection def emit(event): line = json.dumps(event) if _ws_loop is not None: for q in list(_ws_clients): _ws_loop.call_soon_threadsafe(q.put_nowait, line) async def ws_handler(websocket): q = asyncio.Queue() _ws_clients.add(q) log(f'client connected ({len(_ws_clients)} total)') try: while True: msg = await q.get() await websocket.send(msg) except websockets.ConnectionClosed: pass finally: _ws_clients.discard(q) log(f'client disconnected ({len(_ws_clients)} remaining)') async def ws_main(): global _ws_loop _ws_loop = asyncio.get_running_loop() async with websockets.serve(ws_handler, '', PORT): log(f'WebSocket listening on port {PORT}') await asyncio.Future() # run forever def start_ws_server(): asyncio.run(ws_main()) # --- Mic --- def find_mic(): candidates = [ ['parec', ['--format=s16le', '--rate=16000', '--channels=1', '--latency-msec=50']], ['arecord', ['-f', 'S16_LE', '-r', '16000', '-c', '1', '-t', 'raw', '-q']], ] for cmd, args in candidates: try: subprocess.run(['which', cmd], check=True, capture_output=True) return cmd, args except subprocess.CalledProcessError: pass raise RuntimeError('no mic capture command found — need parec or arecord') def s16le_to_f32(data): return np.frombuffer(data, dtype=np.int16).astype(np.float32) / 32768.0 # --- Args + model loading --- parser = argparse.ArgumentParser() parser.add_argument('--model', default='base.en') parser.add_argument('--device', default='cuda') parser.add_argument('--compute-type', default='int8_float16') parser.add_argument('--language', default=None, help='language code (e.g. en, sv) or None for auto-detect') parser.add_argument('--task', default='transcribe', choices=['transcribe', 'translate'], help='transcribe keeps the source language; translate converts to English') parser.add_argument('--verbose', '-v', action='store_true') args = parser.parse_args() verbose = args.verbose token_file = os.environ.get('HF_TOKEN_FILE', os.path.expanduser('~/.secrets/hugging-face.token')) try: with open(token_file) as f: os.environ['HF_TOKEN'] = f.read().strip() except FileNotFoundError: pass from faster_whisper import WhisperModel from huggingface_hub import snapshot_download try: snapshot_download(f'Systran/faster-whisper-{args.model}', local_files_only=True) except Exception: log(f'downloading model {args.model}...') log(f'loading faster-whisper {args.model} ({args.device}, {args.compute_type})...') try: model = WhisperModel(args.model, device=args.device, compute_type=args.compute_type) log(f'model ready on {args.device}') except Exception as e: log(f'{args.device} failed ({e}), falling back to cpu', error=True) model = WhisperModel(args.model, device='cpu', compute_type='int8') log('model ready on cpu') log('loading silero VAD...') from silero_vad import load_silero_vad, VADIterator vad_model = load_silero_vad() vad = VADIterator(vad_model, sampling_rate=SAMPLE_RATE, threshold=0.5, min_silence_duration_ms=500) log('VAD ready') # --- Pre-roll ring buffer --- history = np.zeros(HISTORY_SAMPLES, dtype=np.float32) history_pos = 0 def push_history(samples): global history_pos n = len(samples) base = history_pos % HISTORY_SAMPLES space = HISTORY_SAMPLES - base if n <= space: history[base:base + n] = samples else: history[base:] = samples[:space] history[:n - space] = samples[space:] history_pos += n def get_preroll(): start = max(0, history_pos - PRE_ROLL_SAMPLES) count = history_pos - start out = np.empty(count, dtype=np.float32) for i in range(count): out[i] = history[(start + i) % HISTORY_SAMPLES] return out # --- Transcription thread --- transcription_queue = queue.Queue() def transcription_worker(): while True: item = transcription_queue.get() if item is None: break samples, duration = item try: segments, _ = model.transcribe( samples, language=args.language, task=args.task, word_timestamps=True, vad_filter=False, ) text = '' words = [] for seg in segments: text += seg.text for w in (seg.words or []): words.append({ 'word': w.word, 'start': round(float(w.start), 4), 'end': round(float(w.end), 4), 'probability': round(float(w.probability), 4), }) log(f'transcript: {json.dumps(text.strip())} ({len(words)} words)') if text.strip(): emit({'event': 'transcript', 'text': text.strip(), 'words': words, 'duration': round(duration, 3)}) except Exception: msg = traceback.format_exc() log(f'transcription error:\n{msg}', error=True) emit({'event': 'error', 'message': msg}) finally: transcription_queue.task_done() threading.Thread(target=transcription_worker, daemon=True).start() threading.Thread(target=start_ws_server, daemon=True).start() # --- Main recording + VAD loop --- cmd, cmd_args = find_mic() log(f'mic: {cmd} {" ".join(cmd_args)}') mic = subprocess.Popen([cmd] + cmd_args, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL) def shutdown(sig=None, frame=None): mic.terminate() transcription_queue.put(None) sys.exit(0) signal.signal(signal.SIGTERM, shutdown) signal.signal(signal.SIGINT, shutdown) emit({'event': 'ready'}) speech_samples = [] speech_start = None pending = b'' for chunk in mic.stdout: pending += chunk while len(pending) >= VAD_WINDOW * 2: raw = pending[:VAD_WINDOW * 2] pending = pending[VAD_WINDOW * 2:] f32 = s16le_to_f32(raw) push_history(f32) result = vad(torch.from_numpy(f32), return_seconds=True) if result is not None: if 'start' in result: speech_start = result['start'] speech_samples = [get_preroll()] log(f'VAD start at {speech_start:.2f}s') emit({'event': 'vad_start'}) elif 'end' in result and speech_start is not None: duration = result['end'] - speech_start log(f'VAD end at {result["end"]:.2f}s (duration {duration:.2f}s)') emit({'event': 'vad_end', 'duration': round(duration, 3)}) segment = np.concatenate(speech_samples) transcription_queue.put((segment, duration)) speech_samples = [] speech_start = None vad.reset_states() if speech_start is not None: speech_samples.append(f32)