Source code for normlite.txmodel.operations

# normlite/txmodel/operations.py
# Copyright (C) 2025 Gianmarco Antonini
#
# This module is part of normlite and is released under the GNU Affero General Public License.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program.  If not, see <https://www.gnu.org/licenses/>.
"""Provide a protocol for commit operation and all proxy server operations.

.. versionadded:: 0.6.0

"""
from typing import Protocol

from normlite.notion_sdk.client import AbstractNotionClient


class Operation(Protocol):
    """Contract for change requests that process data inside a transaction.

    A change request modifies data consistently by going through the same
    well-defined phases. Every operation must provide:

    * :meth:`stage`: the pre-commit activities that validate the data before
      it is committed.
    * :meth:`do_commit`: all the activities that commit the data to the
      database.
    * :meth:`do_rollback`: all the activities that revert the changes that
      were committed before a later change failed.

    :meth:`get_result` exposes the outcome of the last executed commit or
    rollback.
    """

    def stage(self) -> None:
        """Validate and stage the data that is about to be committed."""
        ...

    def do_commit(self) -> None:
        """Carry out the commit activities belonging to this operation."""
        ...

    def do_rollback(self) -> None:
        """Carry out the rollback activities belonging to this operation."""
        ...

    def get_result(self) -> dict:
        """Return the result of the last executed :meth:`do_commit` or :meth:`do_rollback`."""
        ...
class StagedInsert(Operation):
    """Operation to create a new page in a Notion database.

    :param notion: Client object called to execute the Notion requests.
    :param page_payload: Payload handed to the ``pages``/``create`` request.
        It must contain the ``parent`` and ``properties`` keys.
    :param tx_id: Identifier of the owning transaction. It is only stored on
        the instance; this class does not read it.
    """

    def __init__(self, notion: AbstractNotionClient, page_payload: dict, tx_id: str):
        self.notion = notion
        self.page_payload = page_payload
        self.tx_id = tx_id
        self.page_id = None     # Will be set on commit
        self._result = None     # Will be set on commit or on rollback

    def stage(self) -> None:
        """Check that the payload carries the keys required to create a page.

        :raises ValueError: If ``parent`` or ``properties`` is missing from
            the payload. The original :exc:`KeyError` is chained as the cause.
        """
        # Keys are probed in a fixed order so that the first missing one is
        # always the one reported.
        for required_key in ('parent', 'properties'):
            try:
                self.page_payload[required_key]
            except KeyError as ke:
                raise ValueError(
                    f'Invalid Notion page pages.create payload: '
                    f'Missing "{required_key}".'
                ) from ke

    def do_commit(self) -> None:
        """Create the page and remember its id for a possible rollback."""
        self._result = self.notion('pages', 'create', self.page_payload)
        self.page_id = self._result.get("id")

    def do_rollback(self) -> None:
        """Archive the page created by :meth:`do_commit`, if there is one."""
        # Only rollback if commit actually happened: page_id is not None
        if not self.page_id:
            return

        self._result = self.notion(
            'pages',
            'update',
            {'id': self.page_id, 'data': {'archived': True}},
        )

    def get_result(self) -> dict:
        """Return the response of the last commit or rollback.

        The value is ``None`` as long as neither has produced a response.
        """
        return self._result
class StagedSelect(Operation):
    """Operation that runs a query against a database.

    :param notion: Client object called to execute the query request.
    :param payload: Payload handed to the ``databases``/``query`` request.
    :param tx_id: Identifier of the owning transaction. It is only stored on
        the instance; this class does not read it.
    """

    def __init__(self, notion: AbstractNotionClient, payload: dict, tx_id: str):
        self._result = None     # Filled in by do_commit()
        self.tx_id = tx_id
        self.payload = payload
        self.notion = notion

    def stage(self) -> None:
        """Ensure the payload names both the database and the filter.

        :raises ValueError: If ``database_id`` or ``filter`` (or both) is
            absent from the payload.
        """
        query = self.payload
        if not ('database_id' in query and 'filter' in query):
            raise ValueError('Missing database id or filters or both.')

    def do_commit(self) -> None:
        """Execute the query and keep the response as the result."""
        response = self.notion('databases', 'query', self.payload)
        self._result = response

    def do_rollback(self):
        """Do nothing: a ``SELECT`` does not mutate data, so there is nothing to undo."""
        return None

    def get_result(self) -> dict:
        """Return the response of the last executed query.

        The value is ``None`` until :meth:`do_commit` has run.
        """
        return self._result
class StagedCreateTable(Operation):
    """Operation to create a new table.

    :param notion: Client object called to execute the Notion requests.
    :param payload: Payload handed to the create request. It must contain a
        ``parent`` object holding a ``page_id``, and a ``properties`` object.
    :param tx_id: Identifier of the owning transaction. It is only stored on
        the instance; this class does not read it.
    """

    def __init__(self, notion: AbstractNotionClient, payload: dict, tx_id: str):
        self.notion = notion
        self.payload = payload
        self.tx_id = tx_id
        self.database_id = None     # Will be set on commit
        self._result = None         # Will be set on commit or on rollback

    def stage(self) -> None:
        """Check that the payload carries everything needed to create a table.

        :raises ValueError: If the ``parent`` object, its ``page_id`` or the
            ``properties`` object is missing from the payload.
        """
        if 'parent' not in self.payload:
            raise ValueError(f'Missing parent object in payload: {self.payload}')

        # Binding the parent to a local keeps the f-string below free of
        # nested quotes, which are a syntax error before Python 3.12.
        parent = self.payload['parent']
        # An empty or ``None`` parent is reported as a missing page_id rather
        # than failing with a TypeError on the membership test.
        if not parent or 'page_id' not in parent:
            raise ValueError(f'Missing page_id in parent object: {parent}')

        if 'properties' not in self.payload:
            raise ValueError(f'Missing properties object in payload: {self.payload}')

    def do_commit(self) -> None:
        """Create the database and remember its id for a possible rollback."""
        # The resource name is plural, matching the 'databases'/'query' call
        # used elsewhere in this module.
        self._result = self.notion('databases', 'create', self.payload)
        self.database_id = self._result.get('id')

    def do_rollback(self) -> None:
        """Undo the table creation if a commit actually happened."""
        if self.database_id:    # Only rollback if commit actually happened
            # NOTE(review): placeholder call. The client is invoked with an
            # empty endpoint name and neither request nor payload, unlike
            # every other call in this module. The request that undoes the
            # creation of the database still has to be written.
            self.notion('')

    def get_result(self) -> dict:
        """Return the response of the last commit.

        The value is ``None`` until :meth:`do_commit` has run.
        """
        return self._result