Source code for platypush.plugins.notes._plugin

import os
import shutil
from datetime import datetime
from threading import Timer
from typing import Any, Dict, List, Optional

from watchdog.observers import Observer

from platypush.bus import Bus
from platypush.common.notes import Note, NoteCollection
from platypush.message.event.file import (
    FileSystemEvent,
    FileSystemDeleteEvent,
    FileSystemMovedEvent,
)
from platypush.plugins.file.monitor import EventHandler
from platypush.plugins.file.monitor.entities.resources import MonitoredResource
from platypush.plugins.notes import ApiSettings, BaseNotePlugin, Results
from platypush.plugins.notes._model import StateDelta


# pylint: disable=too-many-ancestors
[docs] class NotesPlugin(BaseNotePlugin): """ This plugin provides the ability to manage local notes on the filesystem. Specify the directory where the notes are stored in the ``path`` parameter. It can be synchronized with external notes plugins through the ``sync_to_plugins`` configuration parameter, which allows you to synchronize notes with other plugins that support the same API. """ _api_settings = ApiSettings( supports_notes_limit=True, supports_notes_offset=True, supports_collections_limit=True, supports_collections_offset=True, supports_search_limit=False, supports_search_offset=False, supports_search=False, ) _supported_notes_exts = ['md', 'txt', 'rst', 'html']
[docs] def __init__( self, path: str, *args, file_extension: str = 'md', poll_interval: float = 10.0, **kwargs, ): """ :param path: The directory where the notes are stored. :param file_extension: The file extension for the notes. Defaults to ``md``. """ super().__init__(*args, poll_interval=poll_interval, **kwargs) self.path = os.path.abspath(os.path.expanduser(path)) self.file_extension = file_extension self._pending_events: List[FileSystemEvent] = [] self._event_processor: Optional[Timer] = None self._watchdog_bus = Bus() self._observer = Observer() evt_hndl = EventHandler( resource=MonitoredResource(path=self.path, recursive=True), bus=self._watchdog_bus, ) self._observer.schedule(evt_hndl, self.path, recursive=True) self.logger.info('Notes plugin initialized with notes directory: %s', self.path)
@classmethod def _is_note(cls, path: str) -> bool: return any(path.endswith(f'.{ext}') for ext in cls._supported_notes_exts) def _id_to_path(self, item_id: Any) -> str: """ Convert an item ID to a path within the notes directory. """ item_id = os.path.expanduser(str(item_id).strip()) candidate_path = os.path.abspath( item_id if os.path.isabs(item_id) else os.path.join(self.path, item_id) ) # Security check: must be inside path (abspath, not realpath, so symlinks OK) notes_dir_abs = os.path.abspath(self.path) assert ( os.path.commonpath([candidate_path, notes_dir_abs]) == notes_dir_abs ), f'Item ID "{item_id}" is outside the notes directory "{self.path}"' return candidate_path def _path_to_id(self, path: str) -> Any: """ Convert an item path to an ID relative to the notes directory. """ path = os.path.expanduser(path) path = os.path.abspath( path if os.path.isabs(path) else os.path.join(self.path, path) ) # Security check: must be inside notes_dir (abspath, not realpath, so symlinks OK) notes_dir_abs = os.path.abspath(self.path) assert ( os.path.commonpath([path, notes_dir_abs]) == notes_dir_abs ), f'Item path "{path}" is outside the notes directory "{self.path}"' return os.path.relpath(path, notes_dir_abs) def _note_id_to_path(self, note_id: Any) -> str: """ Convert a note ID to a file path within the notes directory. """ note_id = str(note_id).strip() if not self._is_note(note_id): note_id += f'.{self.file_extension}' return self._id_to_path(note_id) def _note_path_to_id(self, path: str) -> Any: """ Convert a file path to a note ID relative to the notes directory. """ path = os.path.expanduser(path) if not self._is_note(path): path += f'.{self.file_extension}' return self._path_to_id(path) def _to_note(self, path: str, with_content: bool = False) -> Note: """ Convert a file path to a Note object. """ note_id = self._note_path_to_id(path) content = None assert os.path.exists(path), f'Note path "{path}" does not exist' if with_content: with open(path, 'r') as f: content = f.read() return Note( id=note_id, plugin=self._plugin_name, title=os.path.splitext(os.path.basename(path))[0], content=content, parent=self._fetch_collection( os.path.dirname(note_id) if os.path.dirname(note_id) else None ), created_at=datetime.fromtimestamp(os.path.getctime(path)), updated_at=datetime.fromtimestamp(os.path.getmtime(path)), ) def _to_collection(self, path: str) -> NoteCollection: """ Convert a file path to a NoteCollection object. """ collection_id = self._path_to_id(path) assert os.path.exists(path), f'Collection path "{path}" does not exist' return NoteCollection( id=collection_id, plugin=self._plugin_name, title=os.path.basename(path), parent=self._fetch_collection( os.path.dirname(collection_id) if os.path.dirname(collection_id) else None ), created_at=datetime.fromtimestamp(os.path.getctime(path)), updated_at=datetime.fromtimestamp(os.path.getmtime(path)), ) def _fetch_note(self, note_id: Any, *_, **__) -> Optional[Note]: try: path = self._note_id_to_path(note_id) return self._to_note(path, with_content=True) except AssertionError: return None def _fetch_notes( self, *_, limit: Optional[int] = None, offset: Optional[int] = None, **__ ) -> List[Note]: notes = [] for root, _, files in os.walk(self.path): for file in files: if self._is_note(file): path = os.path.join(root, file) try: note = self._to_note(path, with_content=False) notes.append(note) except AssertionError: continue notes.sort(key=lambda n: n.updated_at, reverse=True) if limit is not None: notes = ( notes[offset : offset + limit] if offset is not None else notes[:limit] ) elif offset is not None: notes = notes[offset:] return notes def _create_note( self, title: str, content: str, *_, parent: Optional[Any] = None, **__, ) -> Note: """ Create a new note with the given title and content. """ title = title.strip() filename = title if self._is_note(title) else f'{title}.{self.file_extension}' note_id = os.path.join(parent or '', filename) path = self._note_id_to_path(note_id) os.makedirs(os.path.dirname(path), exist_ok=True) with open(path, 'w') as f: f.write(content) return self._to_note(path, with_content=True) def _edit_note( self, note_id: Any, *_, title: Optional[str] = None, content: Optional[str] = None, parent: Optional[Any] = None, **__, ) -> Note: """ Edit an existing note with the given ID. """ note = self._fetch_note(note_id) assert note is not None, f'Note with ID "{note_id}" not found' if title is not None or parent is not None: old_path = self._note_id_to_path(note.id) new_parent = parent or (note.parent.id if note.parent else '') new_title = title.strip() if title else note.title filename = ( new_title if self._is_note(new_title) else f'{new_title}.{self.file_extension}' ) note_id = os.path.join(new_parent, filename) new_path = self._note_id_to_path(note_id) if new_path != old_path: os.makedirs(os.path.dirname(new_path), exist_ok=True) os.rename(old_path, new_path) note.id = note_id note.title = new_title if content is not None: path = self._note_id_to_path(note.id) with open(path, 'w') as f: f.write(content) note.updated_at = datetime.now() note.content = content if content is not None else note.content return note def _delete_note(self, note_id: Any, *_, **__): """ Delete a note with the given ID. """ note = self._fetch_note(note_id) assert note is not None, f'Note with ID "{note_id}" not found' path = self._note_id_to_path(note.id) if not os.path.exists(path): raise FileNotFoundError(f'Note file "{path}" does not exist') os.remove(path) def _fetch_collection( self, collection_id: Any, *_, **__ ) -> Optional[NoteCollection]: """ Fetch a collection by its ID. """ try: path = self._id_to_path(collection_id) if not os.path.exists(path): return None return self._to_collection(path) except AssertionError: return None def _fetch_collections( self, *_, limit: Optional[int] = None, offset: Optional[int] = None, **__ ) -> List[NoteCollection]: collections = {} for root, _, files in os.walk(self.path): for file in files: if self._is_note(file): path = os.path.dirname(os.path.join(root, file)) if path == self.path: continue try: collection = self._to_collection(path) collections[collection.id] = collection except AssertionError: continue collections = list(collections.values()) collections.sort(key=lambda c: c.updated_at, reverse=True) if limit is not None: collections = ( collections[offset : offset + limit] if offset is not None else collections[:limit] ) elif offset is not None: collections = collections[offset:] return collections def _create_collection( self, title: str, *_, parent: Optional[Any] = None, **__, ) -> NoteCollection: """ Create a new collection with the given title. """ collection_id = os.path.join(parent or '', title) path = self._id_to_path(collection_id) os.makedirs(path, exist_ok=True) return self._to_collection(path) def _edit_collection( self, collection_id: Any, *_, title: Optional[str] = None, parent: Optional[Any] = None, **__, ) -> NoteCollection: """ Edit an existing collection with the given ID. """ collection = self._fetch_collection(collection_id) assert collection is not None, f'Collection with ID "{collection_id}" not found' if title is not None or parent is not None: old_path = self._id_to_path(collection.id) new_parent = parent or (collection.parent.id if collection.parent else '') new_path = self._id_to_path( os.path.join(new_parent, title or collection.title) ) if new_path != old_path: os.makedirs(os.path.dirname(new_path), exist_ok=True) os.rename(old_path, new_path) collection.id = collection_id collection.title = os.path.basename(new_path) collection.updated_at = datetime.now() return collection def _delete_collection(self, collection_id: Any, *_: Any, **__: Any): """ Delete a collection with the given ID. """ collection = self._fetch_collection(collection_id) assert collection is not None, f'Collection with ID "{collection_id}" not found' path = self._id_to_path(collection.id) if not os.path.exists(path): raise FileNotFoundError(f'Collection path "{path}" does not exist') shutil.rmtree(path) def _search(self, *_, **__) -> Results: raise NotImplementedError( 'Search is not implemented in the Local Notes API. ' 'Fallback on the internal database search instead.' ) def _poll_events(self, timeout: Optional[float] = 1.0): """ Poll for file system events in the notes directory. """ if not self._observer.is_alive(): self._observer.start() while not self.should_stop(): evt: Optional[FileSystemEvent] = self._watchdog_bus.get(timeout=timeout) # type: ignore if evt is None: continue with self._sync_lock: if self._event_processor is None: self._event_processor = Timer( self.poll_interval or 1.0, self._sync_events ) self._event_processor.start() self._pending_events.append(evt) def _sync_events(self): """ Timed function to process pending file system events. """ try: with self._sync_lock: state_delta = self._to_state_delta(self._pending_events) if state_delta.is_empty(): return self._apply_state_delta(state_delta) self._db_sync(state_delta) self._last_sync_time = datetime.fromtimestamp( state_delta.latest_updated_at ) self._pending_events.clear() self._process_events(state_delta) finally: if self._event_processor: self._event_processor.cancel() self._event_processor = None def _apply_state_delta(self, state_delta: StateDelta): for collection in [ *state_delta.collections.added.values(), *state_delta.collections.updated.values(), ]: self._collections[collection.id] = collection for collection_id in state_delta.collections.deleted: self._collections.pop(collection_id, None) for note in [ *state_delta.notes.added.values(), *state_delta.notes.updated.values(), ]: self._notes[note.id] = note for note_id in state_delta.notes.deleted: self._notes.pop(note_id, None) def _to_state_delta(self, events: List[FileSystemEvent]) -> StateDelta: """ Calculates the state delta from file system events. """ notes: Dict[str, Note] = {} prev_notes: Dict[str, Note] = {} for event in events: path = event.args.get("path") if ( not path or # Skip directory events event.args.get("is_directory", False) or # Skip events for unsupported note extensions not any(path.endswith(f'.{ext}') for ext in self._supported_notes_exts) ): continue note_id = self._note_path_to_id(path) if self._notes.get(note_id): prev_notes[note_id] = self._notes[note_id] # Handle file move events if isinstance(event, FileSystemMovedEvent): new_path = event.args.get("new_path") assert new_path note_id = self._note_path_to_id(new_path) notes[note_id] = self._to_note(new_path, with_content=True) # Handle file creation or modification elif not isinstance(event, FileSystemDeleteEvent): notes[note_id] = self._to_note(path, with_content=True) collections = { note.parent.id: note.parent for note in notes.values() if note.parent } prev_collections = { collection.id: self._collections[collection.id] for collection in collections.values() if collection.id in self._collections } return self._get_state_delta( previous_notes=prev_notes, previous_collections=prev_collections, notes=notes, collections=collections, filter_by_latest_updated_at=False, ) def main(self): if not self.poll_interval: self.wait_stop() return try: self.sync() except Exception as e: raise RuntimeError( 'Failed to perform initial sync with the notes directory' ) from e try: self._poll_events() except KeyboardInterrupt: self.logger.info('KeyboardInterrupt received, stopping the notes plugin') finally: self._observer.stop() self._observer.join() self._watchdog_bus.stop() self.logger.info('Notes plugin stopped') self.wait_stop(self.poll_interval)
# vim:sw=4:ts=4:et: