Source code for platypush.plugins.slack

import json
from typing import Optional, Iterable

import multiprocessing
import requests
from websocket import WebSocketApp

from platypush.context import get_bus
from platypush.message.event.chat.slack import (
    SlackMessageReceivedEvent,
    SlackMessageDeletedEvent,
    SlackMessageEditedEvent,
    SlackAppMentionReceivedEvent,
)
from platypush.plugins import RunnablePlugin, action
from platypush.plugins.chat import ChatPlugin


[docs] class SlackPlugin(ChatPlugin, RunnablePlugin): """ Plugin used to interact with Slack instances. You'll have to generate your own Slack app and tokens in order to use this plugin. Steps: - Create a new Slack app `here <https://api.slack.com/apps>` and associate a Slack workspace to it. - In the configuration panel of your app scroll to Socket Mode and select Enable Socket Mode. - Scroll to Event Subscriptions and select Enable Events. - Choose the type of events that you want to subscribe to. You can select bot events (i.e. when somebody in the channel mentions the name of your app) or any of the workspace events (e.g. creation of messages, user events etc.). - Scroll to App-Level Tokens and generate a new token with ``connections:write`` scope. This token will be used to receive Slack events over websocket. - Scroll to OAuth & Permissions and select the scopes that you want to enable. You may usually want to enable Bot Token Scopes -> ``app_mentions:read``, so the script can react when somebody mentions its name. You may also want to select the user scopes relevant to your application - e.g. read/write messages, manage users etc. - If you changed scopes and settings, you may have to reinstall the app in the workspace through the Install App menu. - Navigate to the Install App menu. If the app has been correctly installed in your workspace then you should see a Bot User OAuth Token, used to authenticate API calls performed as the app/bot. If you also granted user permissions to the app then you should also see a User OAuth Token on the page. """ _api_base_url = 'https://slack.com/api'
[docs] def __init__( self, app_token: str, bot_token: str, user_token: Optional[str] = None, **kwargs ): """ :param app_token: Your Slack app token. :param bot_token: Bot OAuth token reported on the *Install App* menu. :param user_token: User OAuth token reported on the *Install App* menu. """ super().__init__(**kwargs) self._app_token = app_token self._bot_token = bot_token self._user_token = user_token self._ws_url: Optional[str] = None self._ws_app: Optional[WebSocketApp] = None self._ws_listener: Optional[multiprocessing.Process] = None self._connected_event = multiprocessing.Event() self._disconnected_event = multiprocessing.Event() self._state_lock = multiprocessing.RLock()
@classmethod def _url_for(cls, method: str) -> str: return f'{cls._api_base_url}/{method}'
[docs] @action def send_message( self, channel: str, as_user: bool = False, text: Optional[str] = None, blocks: Optional[Iterable[str]] = None, **kwargs, ): """ Send a message to a channel. It requires a token with ``chat:write`` bot/user scope. :param channel: Channel ID or name. :param as_user: If true then the message will be sent as the authorized user, otherwise as the application bot (default: false). :param text: Text to be sent. :param blocks: Extra blocks to be added to the message (e.g. images, links, markdown). See `Slack documentation for blocks <https://api.slack.com/reference/block-kit/blocks>`_. """ rs = requests.post( self._url_for('chat.postMessage'), headers={ 'Authorization': f'Bearer {self._user_token if as_user else self._bot_token}' }, json={ 'channel': channel, 'text': text, 'blocks': blocks or [], }, ) try: rs.raise_for_status() rs = rs.json() assert rs.get('ok'), rs.get('error', rs.get('warning')) except Exception as e: raise AssertionError(e)
[docs] def main(self): self._connect() stop_events = [] while not any(stop_events): stop_events = self._should_stop.wait( timeout=1 ), self._disconnected_event.wait(timeout=1)
[docs] def stop(self): if self._ws_app: self._ws_app.close() self._ws_app = None if self._ws_listener and self._ws_listener.is_alive(): self.logger.info('Terminating websocket process') self._ws_listener.terminate() self._ws_listener.join(5) if self._ws_listener and self._ws_listener.is_alive(): self.logger.warning( 'Terminating the websocket process failed, killing the process' ) self._ws_listener.kill() if self._ws_listener: self._ws_listener.join() self._ws_listener = None super().stop()
def _connect(self): with self._state_lock: if self.should_stop() or self._connected_event.is_set(): return self._ws_url = None rs = requests.post( 'https://slack.com/api/apps.connections.open', headers={ 'Authorization': f'Bearer {self._app_token}', }, ) try: rs.raise_for_status() except Exception: if rs.status_code == 401 or rs.status_code == 403: self.logger.error( 'Unauthorized/Forbidden Slack API request, stopping the service' ) self.stop() return raise rs = rs.json() assert rs.get('ok') self._ws_url = rs.get('url') self._ws_app = WebSocketApp( self._ws_url, on_open=self._on_open(), on_message=self._on_msg(), on_error=self._on_error(), on_close=self._on_close(), ) def server(): self._ws_app.run_forever() self._ws_listener = multiprocessing.Process(target=server) self._ws_listener.start() def _on_open(self): def hndl(*_): with self._state_lock: self._disconnected_event.clear() self._connected_event.set() self.logger.info('Connected to the Slack websocket') return hndl @staticmethod def _send_ack(ws: WebSocketApp, msg): envelope_id = msg.get('envelope_id') if envelope_id: # Send ACK ws.send( json.dumps( { 'envelope_id': envelope_id, } ) ) def _on_msg(self): def hndl(*args): ws = args[0] if len(args) > 1 else self._ws_app data = json.loads(args[1] if len(args) > 1 else args[0]) output_event = None self._send_ack(ws, data) if data['type'] == 'events_api': event = data.get('payload', {}).get('event', {}) event_args = {} if event['type'] == 'app_mention': output_event = SlackAppMentionReceivedEvent( text=event['text'], user=event['user'], channel=event['channel'], team=event['team'], timestamp=event['event_ts'], icons=event.get('icons'), blocks=event.get('blocks'), ) elif event['type'] == 'message': msg = event.copy() prev_msg = event.get('previous_message') event_type = SlackMessageReceivedEvent if event.get('subtype') == 'message_deleted': msg = prev_msg event_type = SlackMessageDeletedEvent event_args['timestamp'] = event['deleted_ts'] else: event_args['timestamp'] = msg.get('ts') if event.get('subtype') == 'message_changed': msg = msg.get('message', msg) event_args['previous_message'] = prev_msg event_type = SlackMessageEditedEvent event_args.update( { 'text': msg.get('text'), 'user': msg.get('user'), 'channel': msg.get('channel', event.get('channel')), 'team': msg.get('team'), 'icons': msg.get('icons'), 'blocks': msg.get('blocks'), } ) output_event = event_type(**event_args) if output_event: get_bus().post(output_event) return hndl def _on_error(self): def hndl(*args): error = args[1] if len(args) > 1 else args[0] ws = args[0] if len(args) > 1 else None self.logger.warning('Slack websocket error: {}'.format(error)) if ws: ws.close() return hndl def _on_close(self): def hndl(*_): with self._state_lock: self._disconnected_event.set() self._connected_event.clear() self.logger.warning('Slack websocket connection closed') return hndl