Source code for platypush.plugins.assistant.openai

from io import BytesIO
from threading import Event
from typing import Optional

import numpy as np
from pydub import AudioSegment

from platypush.common.assistant import AudioRecorder
from platypush.context import get_plugin
from platypush.plugins import RunnablePlugin, action
from platypush.plugins.assistant import AssistantPlugin
from platypush.plugins.openai import OpenaiPlugin

from ._state import RecordingState


# pylint: disable=too-many-ancestors
class AssistantOpenaiPlugin(AssistantPlugin, RunnablePlugin):
    """
    A voice assistant based on the OpenAI API. It requires the
    :class:`platypush.plugins.openai.OpenaiPlugin` plugin to be configured
    with an OpenAI API key.

    Hotword detection
    -----------------

    This plugin doesn't have hotword detection, as OpenAI doesn't provide an
    API for that. Instead, the assistant can be started and stopped
    programmatically through the :meth:`.start_conversation` action.

    If you want to implement hotword detection, you can use a separate plugin
    such as
    :class:`platypush.plugins.assistant.picovoice.AssistantPicovoicePlugin`.

    The configuration in this case would be like:

    .. code-block:: yaml

        assistant.picovoice:
            access_key: YOUR_PICOVOICE_ACCESS_KEY

            # List of hotwords to listen for
            keywords:
                - alexa
                - computer
                - ok google

            # Disable speech-to-text and intent recognition, only use hotword
            # detection
            stt_enabled: false
            hotword_enabled: true

            conversation_start_sound: /sound/to/play/when/the/conversation/starts.mp3
            # speech_model_path: /mnt/hd/models/picovoice/cheetah/custom-en.pv
            # intent_model_path: /mnt/hd/models/picovoice/rhino/custom-en-x86.rhn

        openai:
            api_key: YOUR_OPENAI_API_KEY

            # Customize your assistant's context and knowledge base to your
            # liking
            context:
                - role: system
                  content: >
                      You are a 16th century noble lady who talks in
                      Shakespearean English to her peers.

        # Enable the assistant plugin
        assistant.openai:

        # Enable the text-to-speech plugin
        tts.openai:
            # Customize the voice model
            voice: nova

    Then you can call :meth:`.start_conversation` when the hotword is detected
    and a :class:`platypush.message.event.assistant.HotwordDetectedEvent` is
    triggered:

    .. code-block:: python

        from platypush import run, when
        from platypush.message.event.assistant import HotwordDetectedEvent

        @when(HotwordDetectedEvent)
        # You can also customize it by running a different assistant logic
        # depending on the hotword
        # @when(HotwordDetectedEvent, hotword='computer')
        def on_hotword_detected():
            run("assistant.openai.start_conversation")

    This configuration will:

        1. Start the hotword detection when the application starts.
        2. Start the OpenAI assistant when the hotword is detected.

    AI responses
    ------------

    By default (unless you set ``stop_conversation_on_speech_match`` to
    ``False``), the plugin will:

        1. Process the speech through the OpenAI API (the GPT model to be
           used is configurable through the OpenAI plugin's ``model``
           setting).

        2. Render the response through the configured ``tts_plugin``
           (default: ``tts.openai``). If ``tts_plugin`` is not set, then the
           response will be returned as a string.

    Custom speech processing
    ------------------------

    You can create custom hooks on
    :class:`platypush.message.event.assistant.SpeechRecognizedEvent` with
    custom ``phrase`` strings or (regex) patterns. For example:

    .. code-block:: python

        from platypush import run, when
        from platypush.message.event.assistant import SpeechRecognizedEvent

        # Matches any phrase that contains either "play music" or "play the
        # music"
        @when(SpeechRecognizedEvent, phrase='play (the)? music')
        def play_music():
            run('music.mpd.play')

    If at least one custom hook with a non-empty ``phrase`` string is matched,
    then the default response will be disabled. If you still want the
    assistant to say something when the event is handled, you can call
    ``event.assistant.render_response`` on the hook:

    .. code-block:: python

        from datetime import datetime
        from textwrap import dedent
        from time import time

        from platypush import run, when
        from platypush.message.event.assistant import SpeechRecognizedEvent

        @when(SpeechRecognizedEvent, phrase='weather today')
        def weather_forecast(event: SpeechRecognizedEvent):
            limit = time() + 24 * 60 * 60  # 24 hours from now
            forecast = [
                weather
                for weather in run("weather.openweathermap.get_forecast")
                if datetime.fromisoformat(weather["time"]).timestamp() < limit
            ]

            min_temp = round(
                min(weather["temperature"] for weather in forecast)
            )
            max_temp = round(
                max(weather["temperature"] for weather in forecast)
            )
            max_wind_gust = round(
                (max(weather["wind_gust"] for weather in forecast)) * 3.6
            )
            summaries = [weather["summary"] for weather in forecast]
            most_common_summary = max(summaries, key=summaries.count)
            avg_cloud_cover = round(
                sum(weather["cloud_cover"] for weather in forecast) / len(forecast)
            )

            event.assistant.render_response(
                dedent(
                    f\"\"\"
                    The forecast for today is: {most_common_summary}, with
                    a minimum of {min_temp} and a maximum of {max_temp}
                    degrees, wind gust of {max_wind_gust} km/h, and an
                    average cloud cover of {avg_cloud_cover}%.
                    \"\"\"
                )
            )

    Conversation follow-up
    ----------------------

    A conversation will have a follow-up (i.e. the assistant will listen for a
    phrase after rendering a response) if the response is not empty and ends
    with a question mark. If you want to force a follow-up even if the
    response doesn't end with a question mark, you can call
    :meth:`.start_conversation` programmatically from your hooks.
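
    For example (a hypothetical hook: the ``phrase`` and the rendered text
    below are placeholders, adapt them to your use case):

    .. code-block:: python

        from platypush import run, when
        from platypush.message.event.assistant import SpeechRecognizedEvent

        @when(SpeechRecognizedEvent, phrase='take a note')
        def take_note(event: SpeechRecognizedEvent):
            event.assistant.render_response("Ready when you are.")
            # The response doesn't end with a question mark, so force a
            # follow-up to capture the next phrase
            run("assistant.openai.start_conversation")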

    """

    def __init__(
        self,
        model: str = "whisper-1",
        tts_plugin: Optional[str] = "tts.openai",
        min_silence_secs: float = 1.0,
        silence_threshold: int = -22,
        sample_rate: int = 16000,
        frame_size: int = 16384,
        channels: int = 1,
        conversation_start_timeout: float = 5.0,
        conversation_end_timeout: float = 1.0,
        conversation_max_duration: float = 15.0,
        **kwargs,
    ):
        """
        :param model: OpenAI model to use for audio transcription (default:
            ``whisper-1``).
        :param tts_plugin: Name of the TTS plugin to use for rendering the
            responses (default: ``tts.openai``).
        :param min_silence_secs: Minimum silence duration in seconds to detect
            the end of a conversation (default: 1.0 seconds).
        :param silence_threshold: Silence threshold in dBFS (default: -22).
            A value of 0 is the maximum amplitude, while -120 corresponds to
            silent or nearly silent audio; the higher the value, the more
            sensitive the silence detection.
        :param sample_rate: Recording sample rate in Hz (default: 16000).
        :param frame_size: Recording frame size in samples (default: 16384).
            Note that it's important to make sure that ``frame_size`` /
            ``sample_rate`` isn't smaller than the minimum silence duration,
            otherwise the silence detection won't work properly.
        :param channels: Number of recording channels (default: 1).
        :param conversation_start_timeout: How long to wait for the
            conversation to start (i.e. the first non-silent audio frame to be
            detected) before giving up and stopping the recording (default:
            5.0 seconds).
        :param conversation_end_timeout: How many seconds of silence to wait
            after the last non-silent audio frame before stopping the
            recording (default: 1.0 seconds).
        :param conversation_max_duration: Maximum conversation duration in
            seconds (default: 15.0 seconds).
        """
        kwargs["tts_plugin"] = tts_plugin
        super().__init__(**kwargs)
        self._model = model
        self._min_silence_secs = min_silence_secs
        self._silence_threshold = silence_threshold
        self._sample_rate = sample_rate
        self._frame_size = frame_size
        self._channels = channels
        self._conversation_start_timeout = conversation_start_timeout
        self._conversation_end_timeout = conversation_end_timeout
        self._conversation_max_duration = conversation_max_duration
        self._start_recording_event = Event()
        self._disable_default_response = False
        self._recording_state = RecordingState(
            sample_rate=sample_rate,
            channels=channels,
            min_silence_secs=min_silence_secs,
            silence_threshold=silence_threshold,
        )
        self._recorder: Optional[AudioRecorder] = None
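
    # A minimal configuration sketch for reference. All values other than
    # ``conversation_max_duration`` are the defaults documented above; adjust
    # them to your setup:
    #
    #   assistant.openai:
    #       model: whisper-1
    #       tts_plugin: tts.openai
    #       sample_rate: 16000
    #       conversation_max_duration: 30.0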

    def _to_audio_segment(self, data: np.ndarray) -> AudioSegment:
        return AudioSegment(
            data.tobytes(),
            frame_rate=self._sample_rate,
            sample_width=data.dtype.itemsize,
            channels=self._channels,
        )

    def _is_conversation_ended(self):
        # End if the recording has been stopped
        if not self._recorder or self._recorder.should_stop():
            return True

        # End if we reached the max conversation duration
        if self._recording_state.duration >= self._conversation_max_duration:
            return True

        # End if the conversation hasn't started yet and we reached the
        # conversation start timeout
        if (
            not self._recording_state.conversation_started
            and self._recording_state.duration >= self._conversation_start_timeout
        ):
            return True

        # End if the conversation has started and the user has been silent for
        # more than the conversation end timeout
        if (
            self._recording_state.conversation_started
            and self._recording_state.silence_duration >= self._conversation_end_timeout
        ):
            return True

        return False

    @property
    def _openai(self) -> OpenaiPlugin:
        openai: Optional[OpenaiPlugin] = get_plugin("openai")
        assert openai, (
            "OpenAI plugin not found. "
            "Please configure the `openai` plugin to use `assistant.openai`"
        )
        return openai

    def _get_prediction(self, audio: BytesIO) -> str:
        return self._openai.transcribe_raw(
            audio.getvalue(), extension='mp3', model=self._model
        )

    def _capture_audio(self, recorder: AudioRecorder):
        while not self.should_stop() and not self._is_conversation_ended():
            audio_data = recorder.read()
            if not audio_data:
                continue

            self._recording_state.add_audio(audio_data)

    def _audio_loop(self):
        while not self.should_stop():
            self._wait_recording_start()
            self._recording_state.reset()
            self._on_conversation_start()

            try:
                with AudioRecorder(
                    stop_event=self._should_stop,
                    sample_rate=self._sample_rate,
                    frame_size=self._frame_size,
                    channels=self._channels,
                ) as self._recorder:
                    self._capture_audio(self._recorder)
            finally:
                if self._recorder:
                    try:
                        self._recorder.stream.close()
                    except Exception as e:
                        self.logger.warning("Error closing the audio stream: %s", e)

                    self._recorder = None

            if self._recording_state.is_silent():
                self._on_conversation_timeout()
            else:
                audio = self._recording_state.export_audio()
                text = self._get_prediction(audio)
                self._on_speech_recognized(text)

    def _wait_recording_start(self):
        self._start_recording_event.wait()
        self._start_recording_event.clear()

    def _start_conversation(self, *_, **__):
        self._disable_default_response = False
        self._recording_state.reset()
        self._start_recording_event.set()

    def _stop_conversation(self, *_, **__):
        self._disable_default_response = True
        super()._stop_conversation()
        self._recording_state.reset()

        if self._recorder:
            self._recorder.stop()

        self._on_conversation_end()

    def _on_speech_recognized(self, phrase: Optional[str]):
        super()._on_speech_recognized(phrase)

        # Dirty hack: wait a bit before stopping the conversation to make sure
        # that there aren't event hooks triggered in other threads that are
        # still supposed to handle the recognized phrase.
        if self.stop_conversation_on_speech_match:
            self.wait_stop(0.5)
            if self.should_stop():
                return

        if self._disable_default_response:
            self.logger.debug("Default response disabled, skipping response")
            return

        response = self._openai.get_response(phrase).output
        if response:
            self.render_response(response)
        else:
            self._on_no_response()

    @action
    def start_conversation(self, *_, **__):
        """
        Start a conversation with the assistant.

        The conversation will be automatically stopped after
        ``conversation_max_duration`` seconds of audio, after
        ``conversation_start_timeout`` seconds of silence with no audio
        detected, ``conversation_end_timeout`` seconds after the last
        non-silent audio frame has been detected, or when the
        :meth:`.stop_conversation` method is called.
        """
        self._start_conversation()

    @action
    def mute(self, *_, **__):
        """
        .. note:: This plugin has no hotword detection, thus no continuous
            audio detection. Speech processing is done on-demand through the
            :meth:`.start_conversation` and :meth:`.stop_conversation`
            methods. Therefore, the :meth:`.mute` and :meth:`.unmute` methods
            are not implemented.
        """
        self.logger.warning(
            "assistant.openai.mute is not implemented because this plugin "
            "has no hotword detection, and the only way to stop a conversation "
            "is by calling stop_conversation()"
        )

    @action
    def unmute(self, *_, **__):
        """
        .. note:: This plugin has no hotword detection, thus no continuous
            audio detection. Speech processing is done on-demand through the
            :meth:`.start_conversation` and :meth:`.stop_conversation`
            methods. Therefore, the :meth:`.mute` and :meth:`.unmute` methods
            are not implemented.
        """
        self.logger.warning(
            "assistant.openai.unmute is not implemented because this plugin "
            "has no hotword detection, and the only way to start a conversation "
            "is by calling start_conversation()"
        )

    @action
    def send_text_query(self, text: str, *_, **__):
        """
        If the ``tts_plugin`` configuration is set, then the assistant will
        process the given text query through
        :meth:`platypush.plugins.openai.OpenaiPlugin.get_response` and render
        the response through the specified TTS plugin.

        :return: The response received from
            :meth:`platypush.plugins.openai.OpenaiPlugin.get_response`.
        """
        response = self._openai.get_response(text).output
        self.render_response(response)
        return response
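
    # Example usage from a user script or event hook (a hypothetical call,
    # assuming the plugin is configured as ``assistant.openai``):
    #
    #   from platypush import run
    #
    #   response = run("assistant.openai.send_text_query",
    #                  text="What's the capital of France?")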

    def main(self):
        while not self.should_stop():
            try:
                self._audio_loop()
            except Exception as e:
                self.logger.error("Audio loop error: %s", e, exc_info=True)
                self.wait_stop(5)
            finally:
                self.stop_conversation()

    def stop(self):
        self._stop_conversation()
        super().stop()