import re
from abc import ABC, abstractmethod
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime
from threading import RLock
from time import time
from typing import Any, Dict, Iterable, Optional, Type
from platypush.common.notes import Note, NoteCollection, NoteSource
from platypush.context import Variable
from platypush.entities import get_entities_engine
from platypush.message.event.notes import (
BaseNoteEvent,
NoteCreatedEvent,
NoteUpdatedEvent,
NoteDeletedEvent,
CollectionCreatedEvent,
CollectionUpdatedEvent,
CollectionDeletedEvent,
)
from platypush.plugins import RunnablePlugin, action
from platypush.utils import get_plugin_name_by_class, to_datetime
from .mixins import DbMixin
from ._model import (
CollectionsDelta,
ItemType,
NotesDelta,
Results,
ResultsType,
StateDelta,
)
[docs]
class BaseNotePlugin( # pylint: disable=too-many-ancestors
RunnablePlugin, DbMixin, ABC
):
"""
Base class for note-taking plugins.
"""
[docs]
def __init__(
self,
*args,
poll_interval: float = 300,
timeout: Optional[int] = 60,
max_tokens_length: int = 4,
**kwargs,
):
"""
:param poll_interval: Poll interval in seconds to check for updates (default: 300).
If set to zero or null, the plugin will not poll for updates,
and events will be generated only when you manually call :meth:`.sync`.
:param timeout: Timeout in seconds for the plugin operations (default: 60).
:param max_tokens_length: If the API used by the plugin doesn't support
free-text search (that's currently the case for
:class:`platypush.plugins.nextcloud.notes.NextcloudNotesPlugin` and
for any notes plugins that use the local file system as a backend),
then the plugin will use a search index to perform searches. This
parameter specifies the maximum length of the search tokens that will
be indexed, where each token is composed of a sequence of
alphanumeric characters (including underscores). The longer the number,
the more tokens will be indexed and longer exact phrases will be stored,
but more disk space will be used for the search index (default: 4).
"""
RunnablePlugin.__init__(self, *args, poll_interval=poll_interval, **kwargs)
DbMixin.__init__(self, *args, max_tokens_length=max_tokens_length, **kwargs)
self._sync_lock = RLock()
self._timeout = timeout
self.__last_sync_time: Optional[datetime] = None
@property
def _plugin_name(self) -> str:
return get_plugin_name_by_class(self.__class__)
@property
def _last_sync_time_var(self) -> Variable:
"""
Variable name for the last sync time.
"""
return Variable(f'_LAST_ITEMS_SYNC_TIME[{self._plugin_name}]')
@property
def _last_sync_time(self) -> Optional[datetime]:
"""
Get the last sync time from the variable.
"""
if not self.__last_sync_time:
t = self._last_sync_time_var.get()
self.__last_sync_time = datetime.fromtimestamp(float(t)) if t else None
return self.__last_sync_time
@_last_sync_time.setter
def _last_sync_time(self, value: Optional[datetime]):
"""
Set the last sync time to the variable.
"""
with self._sync_lock:
self.__last_sync_time = value
if value is None:
self._last_sync_time_var.set(None)
else:
self._last_sync_time_var.set(value.timestamp())
@abstractmethod
def _fetch_note(self, note_id: Any, *args, **kwargs) -> Optional[Note]:
"""
Don't call this directly if possible.
Instead, use :meth:`.get_note` method to retrieve a note and update the cache
in a thread-safe manner.
:param note_id: The ID of the note to fetch.
:return: The latest note from the backend.
"""
@abstractmethod
def _fetch_notes(
self,
*args,
filter: Optional[Dict[str, Any]] = None, # pylint: disable=redefined-builtin
sort: Optional[Dict[str, bool]] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
**kwargs,
) -> Iterable[Note]:
"""
Don't call this directly if possible.
Instead, use :meth:`.get_notes` method to retrieve notes and update the cache
in a thread-safe manner.
:return: The latest list of notes from the backend.
"""
@abstractmethod
def _create_note(
self,
title: str,
content: str,
*args,
description: Optional[str] = None,
collection: Optional[Any] = None,
geo: Optional[Dict[str, Optional[float]]] = None,
author: Optional[str] = None,
source: Optional[NoteSource] = None,
**kwargs,
) -> Note:
"""
Create a new note with the specified title and content.
"""
@abstractmethod
def _edit_note(
self,
note_id: Any,
*args,
title: Optional[str] = None,
content: Optional[str] = None,
collection: Optional[Any] = None,
geo: Optional[Dict[str, Optional[float]]] = None,
author: Optional[str] = None,
source: Optional[NoteSource] = None,
**kwargs,
) -> Note:
"""
Edit an existing note by ID.
"""
@abstractmethod
def _delete_note(self, note_id: Any, *args, **kwargs):
"""
Delete a note by ID.
"""
@abstractmethod
def _fetch_collection(
self, collection_id: Any, *args, **kwargs
) -> Optional[NoteCollection]:
"""
Don't call this directly if possible.
Instead, use :meth:`.get_collection` to retrieve a collection and update the cache
in a thread-safe manner.
:param collection_id: The ID of the collection to fetch.
:return: The latest collection from the backend.
"""
@abstractmethod
def _fetch_collections(
self,
*args,
filter: Optional[Dict[str, Any]] = None, # pylint: disable=redefined-builtin
sort: Optional[Dict[str, bool]] = None,
limit: Optional[int] = None,
offset: Optional[int] = None,
**kwargs,
) -> Iterable[NoteCollection]:
"""
Don't call this directly if possible.
Instead, use :meth:`.get_collections` to retrieve collections and update the cache
in a thread-safe manner.
:return: The latest list of note collections from the backend.
"""
@abstractmethod
def _create_collection(
self,
title: str,
*args,
description: Optional[str] = None,
parent: Optional[Any] = None,
**kwargs,
) -> NoteCollection:
"""
Create a new note collection with the specified title and description.
"""
@abstractmethod
def _edit_collection(
self,
collection_id: Any,
*args,
title: Optional[str] = None,
description: Optional[str] = None,
parent: Optional[Any] = None,
**kwargs,
) -> NoteCollection:
"""
Edit an existing note collection by ID.
"""
@abstractmethod
def _delete_collection(self, collection_id: Any, *args, **kwargs):
"""
Delete a note collection by ID.
This method should be implemented by subclasses.
"""
def _process_results( # pylint: disable=too-many-positional-arguments
self,
items: Iterable[Any],
results_type: ResultsType,
limit: Optional[int] = None,
offset: Optional[int] = None,
sort: Optional[Dict[str, bool]] = None,
filter: Optional[Dict[str, Any]] = None, # pylint: disable=redefined-builtin
) -> Iterable[Any]:
if not sort:
sort = {'created_at': True}
if filter:
items = [
item
for item in items
if all(
re.search(v, str(getattr(item, k, '')), re.IGNORECASE)
for k, v in filter.items()
)
]
items = sorted(
items,
key=lambda item: tuple(getattr(item, field) for field in sort.keys()),
reverse=any(not ascending for ascending in sort.values()),
)
supports_limit = False
supports_offset = False
if results_type == ResultsType.NOTES:
supports_limit = self._api_settings.supports_notes_limit
supports_offset = self._api_settings.supports_notes_offset
elif results_type == ResultsType.COLLECTIONS:
supports_limit = self._api_settings.supports_collections_limit
supports_offset = self._api_settings.supports_collections_offset
elif results_type == ResultsType.SEARCH:
supports_limit = self._api_settings.supports_search_limit
supports_offset = self._api_settings.supports_search_offset
if offset is not None and not supports_offset:
items = items[offset:]
if limit is not None and not supports_limit:
items = items[:limit]
return items
def _dispatch_events(self, *events):
"""
Dispatch the given events to the event bus.
"""
if not self.bus:
self.logger.warning(
'Event bus not available. Events will not be dispatched.'
)
return
for event in events:
self.bus.post(event)
def _process_events(self, state_delta: StateDelta):
with self._sync_lock:
self._process_collections_events(state_delta.collections)
self._process_notes_events(state_delta.notes)
def _make_event(
self, evt_type: Type[BaseNoteEvent], *args, **kwargs
) -> BaseNoteEvent:
"""
Create a note event of the specified type.
"""
return evt_type(*args, plugin=self._plugin_name, **kwargs)
def _process_notes_events(self, notes_delta: NotesDelta):
removed_note_events = [
self._make_event(NoteDeletedEvent, note=note)
for note in notes_delta.deleted.values()
]
created_note_events = [
self._make_event(NoteCreatedEvent, note=note)
for note in notes_delta.added.values()
]
updated_note_events = [
self._make_event(NoteUpdatedEvent, note=note)
for note in notes_delta.updated.values()
]
self._dispatch_events(
*removed_note_events,
*created_note_events,
*updated_note_events,
)
def _process_collections_events(self, collections_delta: CollectionsDelta):
removed_collection_events = [
self._make_event(CollectionDeletedEvent, collection=collection)
for collection in collections_delta.deleted.values()
]
created_collection_events = [
self._make_event(CollectionCreatedEvent, collection=collection)
for collection in collections_delta.added.values()
]
updated_collection_events = [
self._make_event(CollectionUpdatedEvent, collection=collection)
for collection in collections_delta.updated.values()
]
self._dispatch_events(
*removed_collection_events,
*created_collection_events,
*updated_collection_events,
)
def _get_note(self, note_id: Any, *args, **kwargs) -> Note:
note = self._fetch_note(note_id, *args, **kwargs)
assert note, f'Note with ID {note_id} not found'
with self._sync_lock:
# Always overwrite the note in the cache,
# as this is the most up-to-date complete version
self._notes[note.id] = note
if note.parent and note.parent.id in self._collections:
self._collections[ # pylint: disable=protected-access
note.parent.id
]._notes[ # pylint: disable=protected-access
note.id
] = note
return self._notes[note.id]
def _merge_note(self, note: Note, skip_content: bool = True) -> Note:
"""
Merge the state of an incoming note with the existing one in the cache.
"""
existing_note = self._notes.get(note.id)
if not existing_note:
# If the note doesn't exist, just return the new one
return note
for field in note.__dataclass_fields__:
existing_value = getattr(existing_note, field)
value = getattr(note, field)
# Don't overwrite content, digest and tags here unless they have been re-fetched
if (
skip_content
and field in ('content', 'digest', 'tags')
and existing_value
and not value
):
continue
setattr(existing_note, field, value)
return existing_note
def _get_notes(
self,
*args,
limit: Optional[int] = None,
offset: Optional[int] = None,
sort: Optional[Dict[str, bool]] = None,
filter: Optional[Dict[str, Any]] = None, # pylint: disable=redefined-builtin
fetch: bool = False,
**kwargs,
) -> Iterable[Note]:
# Always fetch if background polling is disabled
fetch = fetch or not self.poll_interval
if fetch:
with self._sync_lock:
self._notes = {
note.id: self._merge_note(note)
for note in self._fetch_notes(
*args,
limit=limit,
offset=offset,
sort=sort,
filter=filter,
**kwargs,
)
}
self._refresh_notes_cache()
return self._process_results(
self._notes.values(),
limit=limit,
offset=offset,
sort=sort,
filter=filter,
results_type=ResultsType.NOTES,
)
def _get_collection(self, collection_id: Any, *args, **kwargs) -> NoteCollection:
collection = self._fetch_collection(collection_id, *args, **kwargs)
assert collection, f'Collection with ID {collection_id} not found'
with self._sync_lock:
# Always overwrite the collection in the cache,
# as this is the most up-to-date complete version
self._collections[collection.id] = collection
self._refresh_collections_cache()
return self._collections[collection.id]
def _get_collections(
self,
*args,
limit: Optional[int] = None,
offset: Optional[int] = None,
sort: Optional[Dict[str, bool]] = None,
filter: Optional[Dict[str, Any]] = None, # pylint: disable=redefined-builtin
fetch: bool = False,
**kwargs,
) -> Iterable[NoteCollection]:
"""
Get the note collections from the cache or fetch them from the backend if needed.
:return: An iterable of NoteCollection objects.
"""
# Always fetch if background polling is disabled
fetch = fetch or not self.poll_interval
if fetch:
with self._sync_lock:
self._collections = {
collection.id: collection
for collection in self._fetch_collections(
*args,
limit=limit,
offset=offset,
sort=sort,
filter=filter,
**kwargs,
)
}
self._refresh_collections_cache()
return self._process_results(
self._collections.values(),
limit=limit,
offset=offset,
sort=sort,
filter=filter,
results_type=ResultsType.COLLECTIONS,
)
def _refresh_notes_cache(self):
for note in list(self._notes.values()):
if note.parent and note.parent.id in self._collections:
note.parent = self._collections[note.parent.id]
self._collections[ # pylint: disable=protected-access
note.parent.id
]._notes[ # pylint: disable=protected-access
note.id
] = note
def _refresh_collections_cache(self):
for collection in list(self._collections.values()):
# Link the notes to their parent collections
for note in list(collection.notes):
if note.id in self._notes:
collection._notes[note.id] = self._notes[
note.id
] # pylint: disable=protected-access
# Link the child collections to their parent collections
tmp_collections = list(collection.collections)
for collection in tmp_collections:
if collection.id not in self._collections:
collection._collections[collection.id] = (
self._collections[ # pylint: disable=protected-access
collection.id
]
)
# Link the parent collections to their child collections
tmp_collections = list(collection.collections)
for collection in tmp_collections:
if collection.parent and collection.parent.id in self._collections:
collection.parent = self._collections[collection.parent.id]
@staticmethod
def _parse_geo(data: dict) -> Dict[str, Optional[float]]:
return {
key: value or None
for key, value in {
key: float(data.get(key, 0))
for key in ('latitude', 'longitude', 'altitude')
}.items()
}
def _get_state_delta(
self,
previous_notes: Dict[Any, Note],
previous_collections: Dict[Any, NoteCollection],
notes: Dict[Any, Note],
collections: Dict[Any, NoteCollection],
filter_by_latest_updated_at: bool = True,
) -> StateDelta:
"""
Get the state delta between the previous and current state of notes and collections.
:param previous_notes: Previous notes state.
:param previous_collections: Previous collections state.
:param notes: Current notes state.
:param collections: Current collections state.
:param filter_by_latest_updated_at: If True, select only the changes
more recent than the latest updated_at timestamp.
:return: A StateDelta object containing the changes.
"""
state_delta = StateDelta()
latest_updated_at = new_latest_updated_at = (
self._last_sync_time.timestamp() if self._last_sync_time else 0
)
# Get new and updated notes
for note in notes.values():
updated_at = note.updated_at.timestamp() if note.updated_at else 0
if not filter_by_latest_updated_at or updated_at > latest_updated_at:
if note.id not in previous_notes:
state_delta.notes.added[note.id] = note
else:
state_delta.notes.updated[note.id] = note
new_latest_updated_at = max(new_latest_updated_at, updated_at)
# Get deleted notes
for note_id in previous_notes:
if note_id not in notes:
state_delta.notes.deleted[note_id] = previous_notes[note_id]
# Get new and updated collections
for collection in collections.values():
updated_at = (
collection.updated_at.timestamp() if collection.updated_at else 0
)
if not filter_by_latest_updated_at or updated_at > latest_updated_at:
if collection.id not in previous_collections:
state_delta.collections.added[collection.id] = collection
else:
state_delta.collections.updated[collection.id] = collection
new_latest_updated_at = max(new_latest_updated_at, updated_at)
# Get deleted collections
for collection_id in previous_collections:
if collection_id not in collections:
state_delta.collections.deleted[collection_id] = previous_collections[
collection_id
]
state_delta.latest_updated_at = new_latest_updated_at
return state_delta
def _refresh_notes(self, notes: Dict[Any, Note]):
"""
Fetch the given notes from the backend and update the cache.
"""
if not notes:
return
self.logger.info(
'Refreshing the state for %d notes from the backend', len(notes)
)
with ThreadPoolExecutor(max_workers=5) as pool:
# Fetch notes in parallel
futures = [
pool.submit(self._fetch_note, note.id) for note in notes.values()
]
# Wait for all futures to complete and collect the results
results = pool.map(lambda f: f.result(), futures)
with self._sync_lock:
self._notes.update(
{note.id: self._merge_note(note) for note in results if note}
)
self._refresh_notes_cache()
@abstractmethod
def _search(
self,
query: str,
*args,
item_type: ItemType,
include_terms: Optional[Dict[str, Any]] = None,
exclude_terms: Optional[Dict[str, Any]] = None,
created_before: Optional[datetime] = None,
created_after: Optional[datetime] = None,
updated_before: Optional[datetime] = None,
updated_after: Optional[datetime] = None,
limit: Optional[int] = None,
offset: Optional[int] = 0,
**kwargs,
) -> Results:
"""
Search for notes or collections based on the provided query and filters.
"""
[docs]
@action
def search(
self,
*args,
query: str,
type: str = ItemType.NOTE.value, # pylint: disable=redefined-builtin
include_terms: Optional[Dict[str, Any]] = None,
exclude_terms: Optional[Dict[str, Any]] = None,
created_before: Optional[datetime] = None,
created_after: Optional[datetime] = None,
updated_before: Optional[datetime] = None,
updated_after: Optional[datetime] = None,
limit: Optional[int] = None,
offset: Optional[int] = 0,
**kwargs,
):
"""
Search for notes or collections based on the provided query and filters.
In most of the cases (but it depends on the backend) double-quoted
search terms will match exact phrases, while unquoted queries will
match any of the words in the query.
Wildcards (again, depending on the backend) in the search terms are
also supported.
:param query: The search query string (it will be searched in all the
fields).
:param type: The type of items to search for - ``note``,
``collection``, or ``tag`` (default: ``note``).
:param include_terms: Optional dictionary of terms to include in the search.
The keys are field names and the values are strings to match against.
:param exclude_terms: Optional dictionary of terms to exclude from the search.
The keys are field names and the values are strings to exclude from the results.
:param created_before: Optional datetime ISO string or UNIX timestamp
to filter items created before this date.
:param created_after: Optional datetime ISO string or UNIX timestamp
to filter items created after this date.
:param updated_before: Optional datetime ISO string or UNIX timestamp
to filter items updated before this date.
:param updated_after: Optional datetime ISO string or UNIX timestamp
to filter items updated after this date.
:param limit: Maximum number of items to retrieve (default: None,
meaning no limit, or depending on the default limit of the backend).
:param offset: Offset to start retrieving items from (default: 0).
:return: An iterable of matching items, format:
.. code-block:: javascript
{
"has_more": false
"results" [
{
"type": "note",
"item": {
"id": "note-id",
"title": "Note Title",
"content": "Note content...",
"created_at": "2023-10-01T12:00:00Z",
"updated_at": "2023-10-01T12:00:00Z",
// ...
}
}
]
}
"""
method = self._search if self._api_settings.supports_search else self._db_search
return method(
query,
*args,
item_type=ItemType(type),
include_terms=include_terms,
exclude_terms=exclude_terms,
created_before=to_datetime(created_before) if created_before else None,
created_after=to_datetime(created_after) if created_after else None,
updated_before=to_datetime(updated_before) if updated_before else None,
updated_after=to_datetime(updated_after) if updated_after else None,
limit=limit,
offset=offset,
**kwargs,
).to_dict()
[docs]
@action
def get_note(self, note_id: Any, *args, **kwargs) -> dict:
"""
Get a single note and its full content by ID.
:param note_id: The ID of the note to retrieve.
:return: The note as a dictionary, format:
.. schema:: notes.NoteItemSchema
"""
return self._get_note(note_id, *args, **kwargs).to_dict()
[docs]
@action
def get_notes(
self,
*args,
limit: Optional[int] = None,
offset: Optional[int] = 0,
sort: Optional[Dict[str, bool]] = None,
filter: Optional[Dict[str, Any]] = None, # pylint: disable=redefined-builtin
fetch: bool = False,
**kwargs,
) -> Iterable[dict]:
"""
Get notes from the cache or fetch them from the backend.
:param limit: Maximum number of collections to retrieve (default: None, meaning no limit).
:param offset: Offset to start retrieving collections from (default: 0).
:param sort: A dictionary specifying the fields to sort by and their order.
Example: {'created_at': True} sorts by creation date in ascending
order, while {'created_at': False} sorts in descending order.
:param filter: A dictionary specifying filters to apply to the notes, in the form
of a dictionary where the keys are field names and the values are regular expressions
to match against the field values.
:param fetch: If True, always fetch the latest collections from the backend,
regardless of the cache state (default: False).
:param kwargs: Additional keyword arguments to pass to the fetch method.
:return: An iterable of notes, format:
.. schema:: notes.NoteItemSchema(many=True)
"""
return [
note.to_dict()
for note in self._get_notes(
*args,
limit=limit,
offset=offset,
sort=sort,
filter=filter,
fetch=fetch,
**kwargs,
)
]
[docs]
@action
def create_note(
self,
title: str,
content: str,
*args,
description: Optional[str] = None,
collection: Optional[Any] = None,
geo: Optional[Dict[str, Optional[float]]] = None,
source: Optional[dict] = None,
author: Optional[str] = None,
**kwargs,
) -> dict:
"""
Create a new note with the specified title and content.
:param title: The title of the new note.
:param content: The content of the new note.
:param description: Optional description for the note.
:param collection: Optional collection ID to add the note to.
:param geo: Optional geographical coordinates as a dictionary with the
fields ``latitude``, ``longitude``, and ``altitude``.
:param source: Optional source information for the note, with at least
one of the fields ``url``, ``name`` or ``app``. By default, the
source is ``platypush`` and the app is ``tech.platypush``.
:param author: Optional author of the note.
:return: The created note as a dictionary, format:
.. schema:: notes.NoteItemSchema
"""
src = NoteSource(
**(
source
or {
'name': 'platypush',
'app': 'tech.platypush',
}
)
)
note = self._create_note(
title,
content,
*args,
description=description,
collection=collection,
geo=self._parse_geo(geo) if geo else None,
author=author,
source=src,
**kwargs,
)
with self._sync_lock:
# Add the new note to the cache
self._notes[note.id] = note
# If it has a parent collection, add it to the collection's notes
if note.parent and note.parent.id in self._collections:
self._collections[ # pylint: disable=protected-access
note.parent.id
]._notes[note.id] = note
# Trigger the note created event
self._dispatch_events(self._make_event(NoteCreatedEvent, note=note))
return note.to_dict()
[docs]
@action
def edit_note(
self,
note_id: Any,
*args,
title: Optional[str] = None,
content: Optional[str] = None,
description: Optional[str] = None,
collection: Optional[Any] = None,
geo: Optional[Dict[str, Optional[float]]] = None,
source: Optional[dict] = None,
author: Optional[str] = None,
**kwargs,
) -> dict:
"""
Edit an existing note by ID.
:param note_id: The ID of the note to edit.
:param title: New title for the note.
:param content: New content for the note.
:param description: Optional new description for the note.
:param collection: New collection ID to move the note under.
:param geo: Optional geographical coordinates as a dictionary with the
fields ``latitude``, ``longitude``, and ``altitude``.
:param source: Optional source information for the note, with at least
one of the fields ``url``, ``name`` or ``app``.
:param author: Optional author of the note.
:return: The updated note as a dictionary, format:
.. schema:: notes.NoteItemSchema
"""
note = self._edit_note(
note_id,
*args,
title=title,
content=content,
description=description,
collection=collection,
geo=self._parse_geo(geo) if geo else None,
author=author,
source=NoteSource(**source) if source else None,
**kwargs,
)
with self._sync_lock:
# Update the cache with the edited note
self._notes[note.id] = note
# If it has a parent collection, update it in the collection's notes
if note.parent and note.parent.id in self._collections:
self._collections[ # pylint: disable=protected-access
note.parent.id
]._notes[ # pylint: disable=protected-access
note.id
] = note
# Trigger the note updated event
self._dispatch_events(self._make_event(NoteUpdatedEvent, note=note))
return note.to_dict()
[docs]
@action
def delete_note(self, note_id: Any, *args, **kwargs):
"""
Delete a note by ID.
:param note_id: The ID of the note to delete.
"""
self._delete_note(note_id, *args, **kwargs)
with self._sync_lock:
note = self._notes.pop(note_id, None)
if not note:
note = Note(id=note_id, plugin=self._plugin_name, title='')
# Remove the note from its parent collection if it has one
if note.parent and note.parent.id in self._collections:
parent_collection = self._collections[note.parent.id]
parent_collection.notes.remove(note)
# Dispatch the deletion event
self._dispatch_events(self._make_event(NoteDeletedEvent, note=note))
[docs]
@action
def get_collection(self, collection_id: Any, *args, **kwargs) -> dict:
"""
Get a single note collection by ID.
:param collection_id: The ID of the collection to retrieve.
:return: The collection as a dictionary, format:
.. schema:: notes.NoteCollectionSchema
"""
return self._get_collection(collection_id, *args, **kwargs).to_dict()
[docs]
@action
def get_collections(
self,
*args,
limit: Optional[int] = None,
offset: Optional[int] = 0,
sort: Optional[Dict[str, bool]] = None,
filter: Optional[Dict[str, Any]] = None, # pylint: disable=redefined-builtin
fetch: bool = False,
**kwargs,
) -> Iterable[dict]:
"""
Get note collections from the cache or fetch them from the backend.
:param limit: Maximum number of collections to retrieve (default: None, meaning no limit).
:param offset: Offset to start retrieving collections from (default: 0).
:param sort: A dictionary specifying the fields to sort by and their order.
Example: {'created_at': True} sorts by creation date in ascending
order, while {'created_at': False} sorts in descending order.
:param filter: A dictionary specifying filters to apply to the collections, in the form
of a dictionary where the keys are field names and the values are regular expressions
to match against the field values.
:param fetch: If True, always fetch the latest collections from the backend,
regardless of the cache state (default: False).
:param kwargs: Additional keyword arguments to pass to the fetch method.
:return: An iterable of note collections, format:
.. schema:: notes.NoteCollectionSchema(many=True)
"""
return [
collection.to_dict()
for collection in self._get_collections(
*args,
limit=limit,
offset=offset,
sort=sort,
filter=filter,
fetch=fetch,
**kwargs,
)
]
[docs]
@action
def create_collection(
self,
title: str,
*args,
description: Optional[str] = None,
parent: Optional[Any] = None,
**kwargs,
) -> dict:
"""
Create a new note collection with the specified title and description.
:param title: The title of the new collection.
:param description: Optional description for the new collection.
:param parent: Optional parent collection ID to create a sub-collection.
:return: The created collection as a dictionary, format:
.. schema:: notes.NoteCollectionSchema
"""
collection = self._create_collection(
title, *args, description=description, parent=parent, **kwargs
)
with self._sync_lock:
# Add the new collection to the cache
self._collections[collection.id] = collection
# If it has a parent, add it to the parent's collections
if collection.parent and collection.parent.id in self._collections:
parent_collection = self._collections[collection.parent.id]
parent_collection.collections.append(collection)
# Trigger the collection created event
self._dispatch_events(
self._make_event(CollectionCreatedEvent, collection=collection)
)
return collection.to_dict()
[docs]
@action
def edit_collection(
self,
collection_id: Any,
*args,
title: Optional[str] = None,
description: Optional[str] = None,
parent: Optional[Any] = None,
**kwargs,
) -> dict:
"""
Edit an existing note collection by ID.
:param collection_id: The ID of the collection to edit.
:param title: New title for the collection.
:param description: New description for the collection.
:param parent: New parent collection ID to move the collection under.
:return: The updated collection as a dictionary, format:
.. schema:: notes.NoteCollectionSchema
"""
collection = self._edit_collection(
collection_id,
*args,
title=title,
description=description,
parent=parent,
**kwargs,
)
with self._sync_lock:
# Update the cache with the edited collection
old_collection = self._collections.get(collection.id)
self._collections[collection.id] = collection
# If the parent has changed, remove it from the old parent's collections
if (
old_collection
and old_collection.parent
and old_collection.parent != collection.parent
and old_collection.parent.id in self._collections
):
parent_collection = self._collections.get(old_collection.parent.id)
if (
parent_collection
and old_collection in parent_collection.collections
):
parent_collection.collections.remove(old_collection)
# If it has a parent, update it in the parent's collections
if collection.parent and collection.parent.id in self._collections:
parent_collection = self._collections.get(collection.parent.id)
if (
parent_collection
and collection not in parent_collection.collections
):
parent_collection.collections.append(collection)
# Trigger the collection updated event
self._dispatch_events(
self._make_event(CollectionUpdatedEvent, collection=collection)
)
return collection.to_dict()
[docs]
@action
def delete_collection(self, collection_id: Any, *args, **kwargs):
"""
Delete a note collection by ID.
:param collection_id: The ID of the collection to delete.
"""
self._delete_collection(collection_id, *args, **kwargs)
with self._sync_lock:
collection = self._collections.pop(collection_id, None)
if not collection:
collection = NoteCollection(
id=collection_id, plugin=self._plugin_name, title=''
)
# Remove the collection from its parent if it has one
if collection.parent and collection.parent.id in self._collections:
parent_collection = self._collections[collection.parent.id]
parent_collection.collections.remove(collection)
# Dispatch the deletion event
self._dispatch_events(
self._make_event(CollectionDeletedEvent, collection=collection)
)
[docs]
@action
def sync(self, *args, **kwargs):
"""
Synchronize the notes and collections with the backend.
This method is called periodically based on the ``poll_interval`` setting.
If ``poll_interval`` is zero or null, you can manually call this method
to synchronize the notes and collections.
"""
# Wait for the entities engine to start
get_entities_engine().wait_start()
t_start = time()
self.logger.info('Synchronizing notes and collections...')
with self._sync_lock:
# Initialize the latest state from the database if not already done
self._db_init()
prev_notes = self._notes.copy()
prev_collections = self._collections.copy()
# Fetch the latest version of the notes from the backend
notes = {
note.id: note
for note in self._get_notes(
*args, fetch=True, sort={'updated_at': False}, **kwargs
)
}
# Fetch the latest version of the collections from the backend
collections = {
collection.id: collection
for collection in self._get_collections(
*args, fetch=True, sort={'updated_at': False}, **kwargs
)
}
# Get the state delta between the previous and current state
state_delta = self._get_state_delta(
previous_notes=prev_notes,
previous_collections=prev_collections,
notes=notes,
collections=collections,
)
# Re-fetch any notes that have been updated since the last sync
self._refresh_notes(
{**state_delta.notes.added, **state_delta.notes.updated}
)
# Update the local cache with the latest notes and collections
if not state_delta.is_empty():
self.logger.info('Synchronizing changes: %s', state_delta)
self._db_sync(state_delta)
self._last_sync_time = datetime.fromtimestamp(state_delta.latest_updated_at)
self._process_events(state_delta)
self.logger.info(
'Synchronization completed in %.2f seconds',
time() - t_start,
)
[docs]
@action
def reset_sync(self):
"""
Reset the sync state.
1. Reset the last sync time to None, forcing a full resync on the
next call to :meth:`.sync`, which in turn will re-trigger all
notes and collections events.
2. Clear the local notes and collections cache.
"""
self.logger.info('Resetting last sync time')
with self._sync_lock:
self._db_clear()
self._last_sync_time = None
self._notes.clear()
self._collections.clear()
def main(self):
if not self.poll_interval:
# If no poll interval is set then we won't poll for new check-ins
self.wait_stop()
return
while not self.should_stop():
try:
self.sync()
except Exception as e:
self.logger.error('Error during sync: %s', e)
finally:
self.wait_stop(self.poll_interval)