import hashlib
import json
import os
import pathlib
import time
import urllib.request
import zipfile
from threading import Event
from typing import Optional
from platypush.common.assistant import AudioRecorder
from platypush.config import Config
from platypush.plugins import RunnablePlugin, action
from platypush.plugins.assistant import AssistantPlugin
_VOSK_MODEL_LIST_URL = 'https://alphacephei.com/vosk/models/model-list.json'
# pylint: disable=too-many-ancestors
[docs]
class AssistantVoskPlugin(AssistantPlugin, RunnablePlugin):
    """
    A voice assistant based on the `Vosk <https://alphacephei.com/vosk/>`_
    offline speech recognition engine.

    Vosk is a lightweight, offline speech recognition toolkit that supports
    multiple languages and runs on various platforms including Raspberry Pi.

    Setup
    -----

    1. Install the plugin dependencies (``pip install vosk sounddevice``).

    2. Either set the ``lang`` parameter (e.g. ``en``, ``en-us``, ``it``,
       ``de``) and the plugin will automatically download the best matching
       small model, or manually download a Vosk model from the `Vosk models
       page <https://alphacephei.com/vosk/models>`_ and provide its path via
       ``model_path``.

    Models are stored by default under
    ``<PLATYPUSH_WORKDIR>/assistant.vosk/models``.

    Hotword detection
    -----------------

    This plugin does not include built-in hotword detection. You can pair it
    with a hotword detection plugin such as
    :class:`platypush.plugins.assistant.picovoice.AssistantPicovoicePlugin`
    (with ``stt_enabled: false``) or
    :class:`platypush.plugins.assistant.openwakeword.AssistantOpenwakewordPlugin`.

    Example configuration with OpenWakeWord for hotword detection:

    .. code-block:: yaml

        assistant.openwakeword:
          models:
            - hey_jarvis

        assistant.vosk:
          lang: en  # auto-downloads a small English model (any en-* variant)
          # or: model_path: /path/to/vosk-model-en-us-0.22

    Then trigger the conversation on hotword detection:

    .. code-block:: python

        from platypush import run, when
        from platypush.message.event.assistant import HotwordDetectedEvent

        @when(HotwordDetectedEvent)
        def on_hotword_detected():
            run("assistant.vosk.start_conversation")

    Speech recognition
    ------------------

    When a conversation is started (either programmatically via
    :meth:`.start_conversation` or after a hotword is detected), the plugin
    records audio from the microphone and processes it through Vosk in
    real-time. When speech is recognized, a
    :class:`platypush.message.event.assistant.SpeechRecognizedEvent` is fired.

    You can hook into recognized speech:

    .. code-block:: python

        from platypush import when, run
        from platypush.message.event.assistant import SpeechRecognizedEvent

        @when(SpeechRecognizedEvent, phrase='turn on (the)? lights?')
        def on_turn_on_lights(event: SpeechRecognizedEvent, **context):
            run("light.hue.on")

    """
[docs]
def __init__(
    self,
    model_path: Optional[str] = None,
    *,
    lang: Optional[str] = None,
    models_directory: Optional[str] = None,
    sample_rate: int = 16000,
    frame_size: int = 4000,
    channels: int = 1,
    conversation_start_timeout: float = 5.0,
    conversation_end_timeout: float = 1.5,
    conversation_max_duration: float = 15.0,
    words: bool = False,
    **kwargs,
):
    """
    :param model_path: Path to the Vosk model directory. You can download
        models from `<https://alphacephei.com/vosk/models>`_. Either
        ``model_path`` or ``lang`` must be specified. A leading ``~`` is
        expanded to the user's home directory.
    :param lang: Language code (e.g. ``en``, ``en-us``, ``it``, ``de``,
        ``fr``). If specified and ``model_path`` is not set, the plugin
        will look for an already downloaded model for that language and
        otherwise download one from the Vosk model repository. Generic
        codes like ``en`` match any regional variant (``en-us``,
        ``en-in``, ...); among the matching models, non-obsolete small
        models are preferred and the smallest download is picked.
    :param models_directory: Directory where downloaded models are stored.
        Default: ``<PLATYPUSH_WORKDIR>/assistant.vosk/models``.
    :param sample_rate: Audio sample rate in Hz (default: 16000). Most
        Vosk models expect 16 kHz audio.
    :param frame_size: Number of samples per audio frame (default: 4000).
        With the default sample rate of 16000, this corresponds to 250 ms
        per frame.
    :param channels: Number of audio channels (default: 1). Vosk requires
        mono audio.
    :param conversation_start_timeout: Seconds to wait for speech after
        starting a conversation before timing out (default: 5.0).
    :param conversation_end_timeout: Seconds of silence after the last
        detected speech before ending the conversation (default: 1.5).
    :param conversation_max_duration: Maximum conversation duration in
        seconds (default: 15.0).
    :param words: If True, include per-word timing and confidence
        information in the recognition results (default: False).
    :raises AssertionError: If neither ``model_path`` nor ``lang`` is
        provided.
    """
    super().__init__(**kwargs)

    # Raised explicitly instead of through an ``assert`` statement so that
    # the check is not stripped when Python runs with ``-O``. The exception
    # type is kept for backward compatibility.
    if not (model_path or lang):
        raise AssertionError("Either 'model_path' or 'lang' must be specified")

    self._model_path = os.path.expanduser(model_path) if model_path else None
    self._lang = lang
    self._models_directory = os.path.expanduser(
        models_directory
        or os.path.join(Config.get_workdir(), 'assistant.vosk', 'models')
    )
    self._sample_rate = sample_rate
    self._frame_size = frame_size
    self._channels = channels
    self._conversation_start_timeout = conversation_start_timeout
    self._conversation_end_timeout = conversation_end_timeout
    self._conversation_max_duration = conversation_max_duration
    self._words = words

    # The model is loaded lazily by ``_load_model``.
    self._model = None
    # Set by ``_start_conversation`` to wake up the audio loop.
    self._start_recording_event = Event()
    # Recorder of the conversation currently in progress, if any.
    self._recorder: Optional[AudioRecorder] = None
def _resolve_model_path(self) -> str:
"""
Resolve the model path: if ``model_path`` is set, use it directly;
otherwise fetch the Vosk model list and download the best matching
small model for the configured language.
"""
if self._model_path:
return self._model_path
assert self._lang, "Either 'model_path' or 'lang' must be specified"
lang = self._lang.lower().strip()
# Check if a model for this language is already downloaded
existing = self._find_local_model(lang)
if existing:
self.logger.info(
'Using existing Vosk model: %s', os.path.basename(existing)
)
return existing
self.logger.info(
'Fetching Vosk model list to find a model for language: %s', lang
)
model_list = self._fetch_model_list()
best = self._pick_best_model(lang, model_list)
assert best, (
f"No Vosk model found for language '{lang}'. "
f"Check available languages at {_VOSK_MODEL_LIST_URL}"
)
return self._download_model(best)
def _fetch_model_list(self) -> list:
    """
    Download the model catalogue published at ``_VOSK_MODEL_LIST_URL``.

    :return: The decoded JSON document.
    """
    with urllib.request.urlopen(_VOSK_MODEL_LIST_URL, timeout=30) as response:
        payload = response.read()
        document = payload.decode('utf-8')
        return json.loads(document)
def _pick_best_model(self, lang: str, model_list: list) -> Optional[dict]:
"""
Find the best matching model for the given language code.
Matching rules (in priority order):
1. Exact match on the ``lang`` field.
2. Prefix match: e.g. ``en`` matches ``en-us``, ``en-in``, ``en-gb``.
Among matches, non-obsolete models are preferred. Then "small" models
are preferred over "big" ones, and among models of the same type the
smallest by download size is chosen.
"""
exact = [m for m in model_list if m.get('lang', '').lower() == lang]
prefix = [
m for m in model_list if m.get('lang', '').lower().startswith(lang + '-')
]
candidates = exact or prefix
if not candidates:
return None
# Filter out obsolete models if there are non-obsolete alternatives
non_obsolete = [m for m in candidates if m.get('obsolete') != 'true']
if non_obsolete:
candidates = non_obsolete
# Prefer small models
small = [m for m in candidates if m.get('type') == 'small']
if small:
candidates = small
# Pick the smallest by size
candidates.sort(key=lambda m: m.get('size', float('inf')))
return candidates[0]
def _find_local_model(self, lang: str) -> Optional[str]:
"""
Look for an already-downloaded model matching the language in the
models directory.
"""
if not os.path.isdir(self._models_directory):
return None
# Prefer small models; sort alphabetically so the latest version wins
matches = []
for entry in sorted(os.listdir(self._models_directory)):
entry_lower = entry.lower()
if not entry_lower.startswith('vosk-model'):
continue
# Check language match: exact (e.g. vosk-model-small-fr-0.22)
# or prefix (e.g. vosk-model-small-en-us-0.15 for lang="en")
# Strip the "vosk-model-" or "vosk-model-small-" prefix to get
# the lang+version portion.
rest = entry_lower.removeprefix('vosk-model-')
is_small = rest.startswith('small-')
if is_small:
rest = rest.removeprefix('small-')
if rest == lang or rest.startswith(lang + '-'):
full_path = os.path.join(self._models_directory, entry)
if os.path.isdir(full_path):
matches.append((is_small, entry, full_path))
if not matches:
return None
# Prefer small models, then latest by name
matches.sort(key=lambda m: (m[0], m[1]), reverse=True)
return matches[0][2]
def _download_model(self, model_info: dict) -> str:
"""
Download and extract a Vosk model zip into the models directory.
Returns the path to the extracted model directory.
"""
name = model_info['name']
url = model_info['url']
expected_md5 = model_info.get('md5')
dest_dir = os.path.join(self._models_directory, name)
if os.path.isdir(dest_dir):
self.logger.info('Model %s already exists at %s', name, dest_dir)
return dest_dir
pathlib.Path(self._models_directory).mkdir(parents=True, exist_ok=True)
zip_path = os.path.join(self._models_directory, f'{name}.zip')
self.logger.info(
'Downloading Vosk model %s (%s) ...',
name,
model_info.get('size_text', 'unknown size'),
)
try:
urllib.request.urlretrieve(url, zip_path)
if expected_md5:
actual_md5 = self._md5(zip_path)
assert actual_md5 == expected_md5, (
f'MD5 mismatch for {name}: '
f'expected {expected_md5}, got {actual_md5}'
)
self.logger.info('Extracting %s ...', name)
with zipfile.ZipFile(zip_path, 'r') as zf:
zf.extractall(self._models_directory)
assert os.path.isdir(
dest_dir
), f'Expected model directory {dest_dir} not found after extraction'
finally:
if os.path.isfile(zip_path):
os.remove(zip_path)
self.logger.info('Vosk model %s ready at %s', name, dest_dir)
return dest_dir
@staticmethod
def _md5(path: str) -> str:
"""Compute MD5 hex digest of a file."""
h = hashlib.md5()
with open(path, 'rb') as f:
for chunk in iter(lambda: f.read(1 << 20), b''):
h.update(chunk)
return h.hexdigest()
def _load_model(self):
    """
    Load the Vosk model, unless one is already loaded.

    The Vosk log level is set to ``-1`` on every call, also when the model
    is already available.
    """
    from vosk import Model, SetLogLevel

    SetLogLevel(-1)
    if self._model is None:
        path = self._resolve_model_path()
        self.logger.info('Loading Vosk model from: %s', path)
        self._model = Model(model_path=path)
        self.logger.info('Vosk model loaded')
def _create_recognizer(self):
    """
    Build a new ``KaldiRecognizer`` for the loaded model and the configured
    sample rate, with word-level output enabled if ``words`` is set.
    """
    from vosk import KaldiRecognizer

    recognizer = KaldiRecognizer(self._model, self._sample_rate)
    if not self._words:
        return recognizer

    recognizer.SetWords(True)
    return recognizer
def _wait_recording_start(self):
while not self.should_stop():
if self._start_recording_event.wait(timeout=1.0):
self._start_recording_event.clear()
return True
return False
def _capture_and_recognize(self):
    """
    Record audio and perform streaming recognition using Vosk.

    Audio frames are read from an :class:`AudioRecorder` and fed to a new
    recognizer until one of the following happens:

    - the plugin or the recorder is asked to stop;
    - ``conversation_max_duration`` seconds have elapsed;
    - no speech has been detected within ``conversation_start_timeout``
      seconds;
    - ``conversation_end_timeout`` seconds have passed since speech was
      last detected.

    The recognizer may finalize several utterances during one conversation
    (each time ``AcceptWaveform`` returns a truthy value). Every non-empty
    utterance is kept and joined, in order, with the final result, so that
    text finalized before a short pause is not lost and an empty
    intermediate result cannot hide an earlier one.

    :return: The recognized text, or ``None`` if no speech was detected or
        the transcript is empty.
    """
    rec = self._create_recognizer()
    conversation_start = time.time()
    last_speech_time = None
    speech_detected = False
    # Non-empty utterances finalized so far, in chronological order
    segments = []

    try:
        with AudioRecorder(
            stop_event=self._should_stop,
            sample_rate=self._sample_rate,
            frame_size=self._frame_size,
            channels=self._channels,
        ) as recorder:
            # Exposed so that _stop_conversation can interrupt the recording
            self._recorder = recorder

            while not self.should_stop() and not recorder.should_stop():
                elapsed = time.time() - conversation_start

                # Check max duration
                if elapsed >= self._conversation_max_duration:
                    self.logger.debug('Conversation max duration reached')
                    break

                # Check start timeout (no speech detected yet)
                if (
                    not speech_detected
                    and elapsed >= self._conversation_start_timeout
                ):
                    self.logger.debug('Conversation start timeout reached')
                    break

                # Check end timeout (silence after speech)
                if (
                    speech_detected
                    and last_speech_time
                    and (time.time() - last_speech_time)
                    >= self._conversation_end_timeout
                ):
                    self.logger.debug('Conversation end timeout (silence) reached')
                    break

                audio_data = recorder.read(timeout=0.5)
                if not audio_data or not len(  # pylint: disable=C1802
                    audio_data.data
                ):
                    continue

                # Vosk expects bytes (int16 PCM)
                data = audio_data.data.tobytes()

                if rec.AcceptWaveform(data):
                    # An utterance has been finalized
                    text = json.loads(rec.Result()).get('text', '').strip()
                    if text:
                        segments.append(text)
                        speech_detected = True
                        last_speech_time = time.time()
                else:
                    # Utterance still in progress: a non-empty partial
                    # result counts as speech activity
                    partial = json.loads(rec.PartialResult())
                    partial_text = partial.get('partial', '').strip()
                    if partial_text:
                        speech_detected = True
                        last_speech_time = time.time()
    finally:
        self._recorder = None

    if not speech_detected:
        return None

    # Flush whatever the recognizer still holds for the last utterance
    final_text = json.loads(rec.FinalResult()).get('text', '').strip()
    if final_text:
        segments.append(final_text)

    return ' '.join(segments) or None
def _audio_loop(self):
while not self.should_stop():
if not self._wait_recording_start():
break
self._on_conversation_start()
try:
text = self._capture_and_recognize()
except Exception as e:
self.logger.error(
'Error during speech recognition: %s', e, exc_info=True
)
self._on_conversation_end()
continue
if text:
self._on_speech_recognized(text)
else:
self._on_conversation_timeout()
def _start_conversation(self, *_, **__):
    """
    Request a new conversation by setting the start-recording event that
    ``_wait_recording_start`` waits on. Any argument is ignored.
    """
    self._start_recording_event.set()
def _stop_conversation(self, *_, **__):
    """
    Stop the conversation in progress, if any.

    Besides running the parent implementation, the active recorder (if
    there is one) is stopped and the conversation end is notified. Any
    argument is ignored.
    """
    super()._stop_conversation()

    # Read the attribute only once: ``_capture_and_recognize`` resets it to
    # None when the recording ends, which could otherwise happen between
    # the check and the call.
    recorder = self._recorder
    if recorder:
        recorder.stop()

    self._on_conversation_end()
[docs]
@action
def start_conversation(self, *_, **__):
    """
    Start a conversation with the assistant.

    The conversation will be automatically stopped after
    ``conversation_max_duration`` seconds, or after
    ``conversation_start_timeout`` seconds of silence with no speech
    detected, or after ``conversation_end_timeout`` seconds of silence
    after the last speech, or when :meth:`.stop_conversation` is called.

    Any argument passed to the action is ignored.
    """
    self._start_conversation()
[docs]
@action
def mute(self, *_, **__):
    """
    Not supported by this plugin: the call only logs a warning.

    .. note:: This plugin has no continuous hotword detection. Speech
        processing is on-demand via :meth:`.start_conversation` and
        :meth:`.stop_conversation`. Mute/unmute are no-ops.

    """
    self.logger.warning(
        "assistant.vosk.mute is not implemented because this plugin "
        "has no hotword detection"
    )
[docs]
@action
def unmute(self, *_, **__):
    """
    Not supported by this plugin: the call only logs a warning.

    .. note:: This plugin has no continuous hotword detection. Speech
        processing is on-demand via :meth:`.start_conversation` and
        :meth:`.stop_conversation`. Mute/unmute are no-ops.

    """
    self.logger.warning(
        "assistant.vosk.unmute is not implemented because this plugin "
        "has no hotword detection"
    )
[docs]
@action
def send_text_query(self, *_, query: str, **__):
    """
    Send a text query to the assistant (emulates speech recognition).

    The query is handled as if it had been recognized from speech: no
    audio is recorded. Any other argument is ignored.

    :param query: The text query to process.
    """
    self._on_speech_recognized(query)
def main(self):
    """
    Plugin entry point: load the model, then run the audio loop until the
    plugin is stopped.

    If the audio loop fails, the error is logged and the loop is started
    again after ``wait_stop(5)``. Errors raised while loading the model are
    not handled here.
    """
    self._load_model()

    while True:
        if self.should_stop():
            break

        try:
            self._audio_loop()
        except Exception as e:
            self.logger.error('Audio loop error: %s', e, exc_info=True)
            self.wait_stop(5)
def stop(self):
    """
    Stop the plugin: the conversation in progress (if any) is stopped
    first, then the parent ``stop`` is run.
    """
    self._stop_conversation()
    super().stop()
# vim:sw=4:ts=4:et: