Source code for platypush.plugins.user

from typing import List, Dict, Any, Optional, Tuple, Union

from platypush.plugins import Plugin, action
from platypush.user import UserManager


[docs] class UserPlugin(Plugin): """ Plugin to programmatically create and manage users and user sessions """
[docs] def __init__(self, **kwargs): super().__init__(**kwargs) self.user_manager = UserManager()
[docs] @action def create_user( self, username, password, executing_user=None, executing_user_password=None, session_token=None, **kwargs, ): """ Create a user. This action needs to be executed by an already existing user, who needs to authenticate with their own credentials, unless this is the first user created on the system. :return: .. code-block:: json { "user_id": 1, "username": "test-user", "created_at": "2020-11-26T22:41:40.550574" } """ if ( self.user_manager.get_user_count() > 0 and not executing_user and not session_token ): return None, "You need to authenticate in order to create another user" if not self.user_manager.authenticate_user( executing_user, executing_user_password ): user, _ = self.user_manager.authenticate_user_session(session_token)[:2] if not user: return None, "Invalid credentials and/or session_token" try: user = self.user_manager.create_user(username, password, **kwargs) except (NameError, ValueError) as e: return None, str(e) return { 'user_id': user.user_id, 'username': user.username, 'created_at': user.created_at.isoformat(), }
[docs] @action def authenticate_user( self, username: str, password: str, code: Optional[str] = None, return_details: bool = False, ) -> Union[bool, Tuple[bool, str]]: """ Authenticate a user. :param username: Username. :param password: Password. :param code: Optional 2FA code, if 2FA is enabled for the user. :param return_details: If True then return the error details in case of authentication failure. :return: If ``return_details`` is False (default), the action returns True if the provided credentials are valid, False otherwise. If ``return_details`` is True then the action returns a tuple (authenticated, error_details) where ``authenticated`` is True if the provided credentials are valid, False otherwise, and ``error_details`` is a string containing the error details in case of authentication failure. Supported error details are: - ``invalid_credentials``: Invalid username or password. - ``invalid_otp_code``: Invalid 2FA code. - ``missing_otp_code``: Username/password are correct, but a 2FA code is required for the user. """ response = self.user_manager.authenticate_user( username, password, code=code, with_status=return_details ) if return_details: assert ( isinstance(response, tuple) and len(response) == 2 ), 'Invalid response from authenticate_user' return response[0], response[1].value return response
[docs] @action def update_password(self, username, old_password, new_password): """ Update the password of a user. :return: True if the password was successfully updated, false otherwise """ return self.user_manager.update_password(username, old_password, new_password)
[docs] @action def delete_user( self, username, executing_user=None, executing_user_password=None, session_token=None, ): """ Delete a user. """ if not self.user_manager.authenticate_user( executing_user, executing_user_password ): user, _ = self.user_manager.authenticate_user_session(session_token)[:2] if not user: return None, "Invalid credentials and/or session_token" try: return self.user_manager.delete_user(username) except NameError: return None, f"No such user: {username}"
[docs] @action def create_session(self, username, password, code=None, expires_at=None): """ Create a user session. :return: .. code-block:: json { "session_token": "secret", "user_id": 1, "username": "test-user", "created_at": "2020-11-26T22:41:40.550574", "expires_at": "2020-11-26T22:41:40.550574" } """ session = self.user_manager.create_user_session( username=username, password=password, code=code, expires_at=expires_at ) if isinstance(session, tuple): session = session[0] if not session: return None, "Invalid credentials" return { 'session_token': session.session_token, 'user_id': session.user_id, 'created_at': session.created_at.isoformat(), 'expires_at': ( session.expires_at.isoformat() if session.expires_at else None # type: ignore ), }
[docs] @action def authenticate_session(self, session_token): """ Authenticate a session by token and return the associated user. :return: .. code-block:: json { "user_id": 1, "username": "test-user", "created_at": "2020-11-26T22:41:40.550574" } """ user, _ = self.user_manager.authenticate_user_session(session_token)[:2] if not user: return None, 'Invalid session token' return { 'user_id': user.user_id, 'username': user.username, 'created_at': user.created_at.isoformat(), }
[docs] @action def delete_session(self, session_token): """ Delete a user session. """ return self.user_manager.delete_user_session(session_token)
[docs] @action def get_users(self) -> List[Dict[str, Any]]: """ Get the list of registered users. :return: .. code-block:: json [ { "user_id": 1, "username": "user1", "created_at": "2020-11-26T22:41:40.550574" }, { "user_id": 2, "username": "user2", "created_at": "2020-11-28T21:10:23.224813" } ] """ users = self.user_manager.get_users() return [ { 'user_id': user.user_id, 'username': user.username, 'created_at': user.created_at.isoformat(), } for user in users ]
[docs] @action def get_user_by_session(self, session_token: str) -> dict: """ Get the user record associated to a session token. :param session_token: Session token. :return: .. code-block:: json [ { "user_id": 1, "username": "user1", "created_at": "2020-11-26T22:41:40.550574" } ] """ user = self.user_manager.get_user_by_session(session_token) assert user, 'No user associated with the specified session token' return { 'user_id': user.user_id, 'username': user.username, 'created_at': user.created_at.isoformat(), }
# vim:sw=4:ts=4:et: