Files
stt-server/stt-server.py
mikael-lovqvists-claude-agent 9030b1315d Load HF_TOKEN from token file at startup (consistent with tts-server)
Reads ~/.secrets/hugging-face.token by default, overridable via HF_TOKEN_FILE.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
2026-06-07 09:10:54 +00:00

278 lines
7.5 KiB
Python
Executable File

#!/usr/bin/env -S bash -c 'exec "$(dirname "$0")/venv/bin/python3" "$0" "$@"'
"""
STT server: records audio, runs Silero VAD, transcribes with faster-whisper.
Broadcasts JSON events to all connected WebSocket clients and to stdout.
Events:
{"event": "ready"}
{"event": "vad_start"}
{"event": "vad_end", "duration": 1.23}
{"event": "transcript", "text": "...", "words": [...], "duration": 1.23}
{"event": "error", "message": "..."}
word format: {"word": "hello", "start": 0.12, "end": 0.45, "probability": 0.99}
Every WebSocket connection receives the full event stream from the moment it
connects — no subscription handshake required.
Machine-readable events are sent over WebSocket only.
Pass --verbose to enable logging to stderr (startup, VAD events, transcripts).
Errors always go to stderr regardless of verbosity.
Environment:
STT_PORT WebSocket port (default: 11501)
Usage:
./stt-server.py
./stt-server.py --model large-v3 --device cuda --compute-type int8_float16 --verbose
"""
import os
import sys
import json
import signal
import argparse
import threading
import queue
import subprocess
import traceback
import asyncio
import websockets
import numpy as np
import torch
SAMPLE_RATE = 16000
VAD_WINDOW = 512 # samples per VAD chunk (32ms at 16kHz)
PRE_ROLL_SAMPLES = 3200 # 0.2s prepended to each segment for context
HISTORY_SAMPLES = 960000 # 60s ring buffer for pre-roll
PORT = int(__import__('os').environ.get('STT_PORT', 11501))
def log(msg, error=False):
if error or verbose:
sys.stderr.write(f'[stt] {msg}\n')
sys.stderr.flush()
# --- WebSocket broadcast ---
_ws_loop = None
_ws_clients = set() # set of asyncio.Queue, one per connection
def emit(event):
line = json.dumps(event)
if _ws_loop is not None:
for q in list(_ws_clients):
_ws_loop.call_soon_threadsafe(q.put_nowait, line)
async def ws_handler(websocket):
q = asyncio.Queue()
_ws_clients.add(q)
log(f'client connected ({len(_ws_clients)} total)')
try:
while True:
msg = await q.get()
await websocket.send(msg)
except websockets.ConnectionClosed:
pass
finally:
_ws_clients.discard(q)
log(f'client disconnected ({len(_ws_clients)} remaining)')
async def ws_main():
global _ws_loop
_ws_loop = asyncio.get_running_loop()
async with websockets.serve(ws_handler, '', PORT):
log(f'WebSocket listening on port {PORT}')
await asyncio.Future() # run forever
def start_ws_server():
asyncio.run(ws_main())
# --- Mic ---
def find_mic():
candidates = [
['parec', ['--format=s16le', '--rate=16000', '--channels=1', '--latency-msec=50']],
['arecord', ['-f', 'S16_LE', '-r', '16000', '-c', '1', '-t', 'raw', '-q']],
]
for cmd, args in candidates:
try:
subprocess.run(['which', cmd], check=True, capture_output=True)
return cmd, args
except subprocess.CalledProcessError:
pass
raise RuntimeError('no mic capture command found — need parec or arecord')
def s16le_to_f32(data):
return np.frombuffer(data, dtype=np.int16).astype(np.float32) / 32768.0
# --- Args + model loading ---
parser = argparse.ArgumentParser()
parser.add_argument('--model', default='base.en')
parser.add_argument('--device', default='cuda')
parser.add_argument('--compute-type', default='int8_float16')
parser.add_argument('--verbose', '-v', action='store_true')
args = parser.parse_args()
verbose = args.verbose
token_file = os.environ.get('HF_TOKEN_FILE', os.path.expanduser('~/.secrets/hugging-face.token'))
try:
with open(token_file) as f:
os.environ['HF_TOKEN'] = f.read().strip()
except FileNotFoundError:
pass
from faster_whisper import WhisperModel
from huggingface_hub import snapshot_download
try:
snapshot_download(f'Systran/faster-whisper-{args.model}', local_files_only=True)
except Exception:
log(f'downloading model {args.model}...')
log(f'loading faster-whisper {args.model} ({args.device}, {args.compute_type})...')
try:
model = WhisperModel(args.model, device=args.device, compute_type=args.compute_type)
log(f'model ready on {args.device}')
except Exception as e:
log(f'{args.device} failed ({e}), falling back to cpu', error=True)
model = WhisperModel(args.model, device='cpu', compute_type='int8')
log('model ready on cpu')
log('loading silero VAD...')
from silero_vad import load_silero_vad, VADIterator
vad_model = load_silero_vad()
vad = VADIterator(vad_model, sampling_rate=SAMPLE_RATE,
threshold=0.5, min_silence_duration_ms=500)
log('VAD ready')
# --- Pre-roll ring buffer ---
history = np.zeros(HISTORY_SAMPLES, dtype=np.float32)
history_pos = 0
def push_history(samples):
global history_pos
n = len(samples)
base = history_pos % HISTORY_SAMPLES
space = HISTORY_SAMPLES - base
if n <= space:
history[base:base + n] = samples
else:
history[base:] = samples[:space]
history[:n - space] = samples[space:]
history_pos += n
def get_preroll():
start = max(0, history_pos - PRE_ROLL_SAMPLES)
count = history_pos - start
out = np.empty(count, dtype=np.float32)
for i in range(count):
out[i] = history[(start + i) % HISTORY_SAMPLES]
return out
# --- Transcription thread ---
transcription_queue = queue.Queue()
def transcription_worker():
while True:
item = transcription_queue.get()
if item is None:
break
samples, duration = item
try:
segments, _ = model.transcribe(
samples,
language='en',
word_timestamps=True,
vad_filter=False,
)
text = ''
words = []
for seg in segments:
text += seg.text
for w in (seg.words or []):
words.append({
'word': w.word,
'start': round(float(w.start), 4),
'end': round(float(w.end), 4),
'probability': round(float(w.probability), 4),
})
log(f'transcript: {json.dumps(text.strip())} ({len(words)} words)')
if text.strip():
emit({'event': 'transcript', 'text': text.strip(), 'words': words, 'duration': round(duration, 3)})
except Exception:
msg = traceback.format_exc()
log(f'transcription error:\n{msg}', error=True)
emit({'event': 'error', 'message': msg})
finally:
transcription_queue.task_done()
threading.Thread(target=transcription_worker, daemon=True).start()
threading.Thread(target=start_ws_server, daemon=True).start()
# --- Main recording + VAD loop ---
cmd, cmd_args = find_mic()
log(f'mic: {cmd} {" ".join(cmd_args)}')
mic = subprocess.Popen([cmd] + cmd_args, stdout=subprocess.PIPE, stderr=subprocess.DEVNULL)
def shutdown(sig=None, frame=None):
mic.terminate()
transcription_queue.put(None)
sys.exit(0)
signal.signal(signal.SIGTERM, shutdown)
signal.signal(signal.SIGINT, shutdown)
emit({'event': 'ready'})
speech_samples = []
speech_start = None
pending = b''
for chunk in mic.stdout:
pending += chunk
while len(pending) >= VAD_WINDOW * 2:
raw = pending[:VAD_WINDOW * 2]
pending = pending[VAD_WINDOW * 2:]
f32 = s16le_to_f32(raw)
push_history(f32)
result = vad(torch.from_numpy(f32), return_seconds=True)
if result is not None:
if 'start' in result:
speech_start = result['start']
speech_samples = [get_preroll()]
log(f'VAD start at {speech_start:.2f}s')
emit({'event': 'vad_start'})
elif 'end' in result and speech_start is not None:
duration = result['end'] - speech_start
log(f'VAD end at {result["end"]:.2f}s (duration {duration:.2f}s)')
emit({'event': 'vad_end', 'duration': round(duration, 3)})
segment = np.concatenate(speech_samples)
transcription_queue.put((segment, duration))
speech_samples = []
speech_start = None
vad.reset_states()
if speech_start is not None:
speech_samples.append(f32)