Source code for platypush.plugins.todoist

import time
from typing import Optional

import todoist
import todoist.managers.items  # type: ignore
import websocket

from platypush.plugins import Plugin, action
from platypush.schemas.todoist import (
    TodoistCollaboratorSchema,
    TodoistFilterSchema,
    TodoistItemSchema,
    TodoistLiveNotificationSchema,
    TodoistNoteSchema,
    TodoistProjectNoteSchema,
    TodoistProjectSchema,
    TodoistUserSchema,
)


[docs] class TodoistPlugin(Plugin): """ Todoist integration. You'll also need a Todoist token. You can get it `here <https://todoist.com/prefs/integrations>`_. """ _sync_timeout = 60.0
[docs] def __init__(self, api_token: str, **kwargs): """ :param api_token: Todoist API token. You can get it `here <https://todoist.com/prefs/integrations>`_. """ super().__init__(**kwargs) self.api_token = api_token self._api = None self._last_sync_time = None self._ws_url: Optional[str] = None self._ws: Optional[websocket.WebSocketApp] = None self._connected = False self._todoist_initialized = False self._items = {} self._event_handled = False
def _get_api(self) -> todoist.TodoistAPI: # type: ignore if not self._api: self._api = todoist.TodoistAPI(self.api_token) # type: ignore if ( not self._last_sync_time or time.time() - self._last_sync_time > self._sync_timeout ): self._api.sync() return self._api
[docs] @action def get_user(self): """ Get logged user info. :return: .. schema:: todoist.TodoistUserSchema """ return TodoistUserSchema().dump(self._get_api().state['user'])
[docs] @action def get_projects(self): """ Get list of Todoist projects. :return: .. schema:: todoist.TodoistProjectSchema(many=True) """ return TodoistProjectSchema().dump(self._get_api().state['projects'], many=True)
[docs] @action def get_items(self): """ Get list of Todoist projects. :return .. schema:: todoist.TodoistItemSchema(many=True) """ return TodoistItemSchema().dump(self._get_api().state['items'], many=True)
[docs] @action def get_filters(self): """ Get list of Todoist filters. :return: .. schema:: todoist.TodoistFilterSchema(many=True) """ return TodoistFilterSchema().dump(self._get_api().state['filters'], many=True)
[docs] @action def get_live_notifications(self): """ Get list of Todoist live notifications. :return: .. schema:: todoist.TodoistLiveNotificationSchema(many=True) """ return TodoistLiveNotificationSchema().dump( self._get_api().state['live_notifications'], many=True )
[docs] @action def get_collaborators(self): """ Get list of collaborators. :return: .. schema:: todoist.TodoistCollaboratorSchema(many=True) """ return TodoistCollaboratorSchema().dump( self._get_api().state['collaborators'], many=True )
[docs] @action def get_notes(self): """ Get list of Todoist notes. :return: .. schema:: todoist.TodoistNoteSchema(many=True) """ return TodoistNoteSchema().dump(self._get_api().state['notes'], many=True)
[docs] @action def get_project_notes(self): """ Get list of Todoist project notes. :return: .. schema:: todoist.TodoistProjectNoteSchema(many=True) """ return TodoistProjectNoteSchema().dump( self._get_api().state['project_notes'], many=True )
[docs] @action def add_item(self, content: str, project_id: Optional[int] = None, **kwargs): """ Add a new item. """ api = self._get_api() manager = todoist.managers.items.ItemsManager(api=api) item = manager.add(content, project_id=project_id, **kwargs) api.commit() return item.data
[docs] @action def delete_item(self, item_id: int): """ Delete an item by id. """ api = self._get_api() manager = todoist.managers.items.ItemsManager(api=api) manager.delete(item_id) api.commit()
[docs] @action def update_item(self, item_id: int, **kwargs): """ Update an item by id. """ api = self._get_api() manager = todoist.managers.items.ItemsManager(api=api) manager.update(item_id, **kwargs) api.commit()
[docs] @action def complete_item(self, item_id: int): """ Mark an item as done. """ api = self._get_api() manager = todoist.managers.items.ItemsManager(api=api) manager.complete(item_id) api.commit()
[docs] @action def uncomplete_item(self, item_id: int): """ Mark an item as not done. """ api = self._get_api() manager = todoist.managers.items.ItemsManager(api=api) manager.uncomplete(item_id) api.commit()
[docs] @action def archive(self, item_id: int): """ Archive an item by id. """ api = self._get_api() manager = todoist.managers.items.ItemsManager(api=api) manager.archive(item_id) api.commit()
[docs] @action def unarchive(self, item_id: int): """ Un-archive an item by id. """ api = self._get_api() manager = todoist.managers.items.ItemsManager(api=api) manager.unarchive(item_id) api.commit()
[docs] @action def sync(self): """ Sync/update info with the remote server. """ api = self._get_api() api.sync()
# vim:sw=4:ts=4:et: