home · contact · privacy
Slightly improve and re-organize Condition tests. master
authorChristian Heller <c.heller@plomlompom.de>
Sat, 22 Jun 2024 02:53:16 +0000 (04:53 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Sat, 22 Jun 2024 02:53:16 +0000 (04:53 +0200)
18 files changed:
.pylintrc
plomtask/conditions.py
plomtask/days.py
plomtask/db.py
plomtask/http.py
plomtask/processes.py
plomtask/todos.py
plomtask/versioned_attributes.py
scripts/pre-commit
templates/day.html
templates/process.html
templates/todo.html
tests/conditions.py
tests/days.py
tests/misc.py
tests/processes.py
tests/todos.py
tests/utils.py

index b4814d17d477dee4ee5ba2f67f52a04c41ba7b5d..50133a08eb3882a3fd34b04e0a208fb255ef4665 100644 (file)
--- a/.pylintrc
+++ b/.pylintrc
@@ -1,3 +1,3 @@
 [BASIC]
 init-hook='import sys; sys.path.append(".")'
-good-names-rgxs=.*_?do_(GET|POST)(_[a-z]+)?,test_[A-Z]+
+good-names-rgxs=(.*_)?(GET|POST)(_.+)?,,test_[A-Z]+
index d2559272cd876c5b071b59437998e3356409cd07..15dcb9df623c60378485632ce3bebc4c30f03d47 100644 (file)
@@ -1,7 +1,5 @@
 """Non-doable elements of ProcessStep/Todo chains."""
 from __future__ import annotations
-from typing import Any
-from sqlite3 import Row
 from plomtask.db import DatabaseConnection, BaseModel
 from plomtask.versioned_attributes import VersionedAttribute
 from plomtask.exceptions import HandledException
@@ -13,6 +11,9 @@ class Condition(BaseModel[int]):
     to_save = ['is_active']
     to_save_versioned = ['title', 'description']
     to_search = ['title.newest', 'description.newest']
+    can_create_by_id = True
+    sorters = {'is_active': lambda c: c.is_active,
+               'title': lambda c: c.title.newest}
 
     def __init__(self, id_: int | None, is_active: bool = False) -> None:
         super().__init__(id_)
@@ -21,30 +22,20 @@ class Condition(BaseModel[int]):
         self.description = VersionedAttribute(self, 'condition_descriptions',
                                               '')
 
-    @classmethod
-    def from_table_row(cls, db_conn: DatabaseConnection,
-                       row: Row | list[Any]) -> Condition:
-        """Build condition from row, including VersionedAttributes."""
-        condition = super().from_table_row(db_conn, row)
-        for name in ('title', 'description'):
-            table_name = f'condition_{name}s'
-            for row_ in db_conn.row_where(table_name, 'parent', row[0]):
-                getattr(condition, name).history_from_row(row_)
-        return condition
-
     def remove(self, db_conn: DatabaseConnection) -> None:
         """Remove from DB, with VersionedAttributes.
 
         Checks for Todos and Processes that depend on Condition, prohibits
         deletion if found.
         """
-        if self.id_ is None:
-            raise HandledException('cannot remove unsaved item')
-        for item in ('process', 'todo'):
-            for attr in ('conditions', 'blockers', 'enables', 'disables'):
-                table_name = f'{item}_{attr}'
-                for _ in db_conn.row_where(table_name, 'condition', self.id_):
-                    raise HandledException('cannot remove Condition in use')
+        if self.id_ is not None:
+            for item in ('process', 'todo'):
+                for attr in ('conditions', 'blockers', 'enables', 'disables'):
+                    table_name = f'{item}_{attr}'
+                    for _ in db_conn.row_where(table_name, 'condition',
+                                               self.id_):
+                        msg = 'cannot remove Condition in use'
+                        raise HandledException(msg)
         super().remove(db_conn)
 
 
index afe4a01be6f509a8b624da7c45650500a96805e2..23201301bbe792042a361d3f970415c622d80627 100644 (file)
@@ -12,6 +12,8 @@ class Day(BaseModel[str]):
     """Individual days defined by their dates."""
     table_name = 'days'
     to_save = ['comment']
+    add_to_dict = ['todos']
+    can_create_by_id = True
 
     def __init__(self, date: str, comment: str = '') -> None:
         id_ = valid_date(date)
@@ -33,13 +35,16 @@ class Day(BaseModel[str]):
         return day
 
     @classmethod
-    def by_id(cls,
-              db_conn: DatabaseConnection, id_: str | None,
-              create: bool = False,
-              ) -> Day:
-        """Extend BaseModel.by_id checking for new/lost .todos."""
-        day = super().by_id(db_conn, id_, create)
-        assert day.id_ is not None
+    def by_id(cls, db_conn: DatabaseConnection, id_: str) -> Day:
+        """Extend BaseModel.by_id
+
+        Checks Todo.days_to_update if we need to a retrieved Day's .todos,
+        and also ensures we're looking for proper dates and not strings like
+        "yesterday" by enforcing the valid_date translation.
+        """
+        assert isinstance(id_, str)
+        possibly_translated_date = valid_date(id_)
+        day = super().by_id(db_conn, possibly_translated_date)
         if day.id_ in Todo.days_to_update:
             Todo.days_to_update.remove(day.id_)
             day.todos = Todo.by_date(db_conn, day.id_)
index 99998a6ab29f760ba0d62f90395739dad4b521ff..13cdaef5b9c7d3e992f8c92730a9979b9eee2d73 100644 (file)
@@ -4,7 +4,7 @@ from os import listdir
 from os.path import isfile
 from difflib import Differ
 from sqlite3 import connect as sql_connect, Cursor, Row
-from typing import Any, Self, TypeVar, Generic
+from typing import Any, Self, TypeVar, Generic, Callable
 from plomtask.exceptions import HandledException, NotFoundException
 from plomtask.dating import valid_date
 
@@ -235,10 +235,13 @@ class BaseModel(Generic[BaseModelId]):
     to_save: list[str] = []
     to_save_versioned: list[str] = []
     to_save_relations: list[tuple[str, str, str, int]] = []
+    add_to_dict: list[str] = []
     id_: None | BaseModelId
     cache_: dict[BaseModelId, Self]
     to_search: list[str] = []
+    can_create_by_id = False
     _exists = True
+    sorters: dict[str, Callable[..., Any]] = {}
 
     def __init__(self, id_: BaseModelId | None) -> None:
         if isinstance(id_, int) and id_ < 1:
@@ -271,6 +274,84 @@ class BaseModel(Generic[BaseModelId]):
         assert isinstance(other.id_, int)
         return self.id_ < other.id_
 
+    @property
+    def as_dict(self) -> dict[str, object]:
+        """Return self as (json.dumps-compatible) dict."""
+        library: dict[str, dict[str | int, object]] = {}
+        d: dict[str, object] = {'id': self.id_, '_library': library}
+        for to_save in self.to_save:
+            attr = getattr(self, to_save)
+            if hasattr(attr, 'as_dict_into_reference'):
+                d[to_save] = attr.as_dict_into_reference(library)
+            else:
+                d[to_save] = attr
+        if len(self.to_save_versioned) > 0:
+            d['_versioned'] = {}
+        for k in self.to_save_versioned:
+            attr = getattr(self, k)
+            assert isinstance(d['_versioned'], dict)
+            d['_versioned'][k] = attr.history
+        for r in self.to_save_relations:
+            attr_name = r[2]
+            l: list[int | str] = []
+            for rel in getattr(self, attr_name):
+                l += [rel.as_dict_into_reference(library)]
+            d[attr_name] = l
+        for k in self.add_to_dict:
+            d[k] = [x.as_dict_into_reference(library)
+                    for x in getattr(self, k)]
+        return d
+
+    def as_dict_into_reference(self,
+                               library: dict[str, dict[str | int, object]]
+                               ) -> int | str:
+        """Return self.id_ while writing .as_dict into library."""
+        def into_library(library: dict[str, dict[str | int, object]],
+                         cls_name: str,
+                         id_: str | int,
+                         d: dict[str, object]
+                         ) -> None:
+            if cls_name not in library:
+                library[cls_name] = {}
+            if id_ in library[cls_name]:
+                if library[cls_name][id_] != d:
+                    msg = 'Unexpected inequality of entries for ' +\
+                            f'_library at: {cls_name}/{id_}'
+                    raise HandledException(msg)
+            else:
+                library[cls_name][id_] = d
+        as_dict = self.as_dict
+        assert isinstance(as_dict['_library'], dict)
+        for cls_name, dict_of_objs in as_dict['_library'].items():
+            for id_, obj in dict_of_objs.items():
+                into_library(library, cls_name, id_, obj)
+        del as_dict['_library']
+        assert self.id_ is not None
+        into_library(library, self.__class__.__name__, self.id_, as_dict)
+        assert isinstance(as_dict['id'], (int, str))
+        return as_dict['id']
+
+    @classmethod
+    def name_lowercase(cls) -> str:
+        """Convenience method to return cls' name in lowercase."""
+        return cls.__name__.lower()
+
+    @classmethod
+    def sort_by(cls, seq: list[Any], sort_key: str, default: str = 'title'
+                ) -> str:
+        """Sort cls list by cls.sorters[sort_key] (reverse if '-'-prefixed)."""
+        reverse = False
+        if len(sort_key) > 1 and '-' == sort_key[0]:
+            sort_key = sort_key[1:]
+            reverse = True
+        if sort_key not in cls.sorters:
+            sort_key = default
+        sorter: Callable[..., Any] = cls.sorters[sort_key]
+        seq.sort(key=sorter, reverse=reverse)
+        if reverse:
+            sort_key = f'-{sort_key}'
+        return sort_key
+
     # cache management
     # (we primarily use the cache to ensure we work on the same object in
     # memory no matter where and how we retrieve it, e.g. we don't want
@@ -295,7 +376,13 @@ class BaseModel(Generic[BaseModelId]):
 
     @classmethod
     def empty_cache(cls) -> None:
-        """Empty class's cache."""
+        """Empty class's cache, and disappear all former inhabitants."""
+        # pylint: disable=protected-access
+        # (cause we remain within the class)
+        if hasattr(cls, 'cache_'):
+            to_disappear = list(cls.cache_.values())
+            for item in to_disappear:
+                item._disappear()
         cls.cache_ = {}
 
     @classmethod
@@ -310,15 +397,14 @@ class BaseModel(Generic[BaseModelId]):
     def _get_cached(cls: type[BaseModelInstance],
                     id_: BaseModelId) -> BaseModelInstance | None:
         """Get object of id_ from class's cache, or None if not found."""
-        # pylint: disable=consider-iterating-dictionary
         cache = cls.get_cache()
-        if id_ in cache.keys():
+        if id_ in cache:
             obj = cache[id_]
             assert isinstance(obj, cls)
             return obj
         return None
 
-    def _cache(self) -> None:
+    def cache(self) -> None:
         """Update object in class's cache.
 
         Also calls ._disappear if cache holds older reference to object of same
@@ -349,22 +435,23 @@ class BaseModel(Generic[BaseModelId]):
                        # pylint: disable=unused-argument
                        db_conn: DatabaseConnection,
                        row: Row | list[Any]) -> BaseModelInstance:
-        """Make from DB row, update DB cache with it."""
+        """Make from DB row (sans relations), update DB cache with it."""
         obj = cls(*row)
-        obj._cache()
+        assert obj.id_ is not None
+        for attr_name in cls.to_save_versioned:
+            attr = getattr(obj, attr_name)
+            table_name = attr.table_name
+            for row_ in db_conn.row_where(table_name, 'parent', obj.id_):
+                attr.history_from_row(row_)
+        obj.cache()
         return obj
 
     @classmethod
-    def by_id(cls, db_conn: DatabaseConnection,
-              id_: BaseModelId | None,
-              # pylint: disable=unused-argument
-              create: bool = False) -> Self:
+    def by_id(cls, db_conn: DatabaseConnection, id_: BaseModelId) -> Self:
         """Retrieve by id_, on failure throw NotFoundException.
 
         First try to get from cls.cache_, only then check DB; if found,
         put into cache.
-
-        If create=True, make anew (but do not cache yet).
         """
         obj = None
         if id_ is not None:
@@ -375,11 +462,22 @@ class BaseModel(Generic[BaseModelId]):
                     break
         if obj:
             return obj
-        if create:
-            obj = cls(id_)
-            return obj
         raise NotFoundException(f'found no object of ID {id_}')
 
+    @classmethod
+    def by_id_or_create(cls, db_conn: DatabaseConnection,
+                        id_: BaseModelId | None
+                        ) -> Self:
+        """Wrapper around .by_id, creating (not caching/saving) if not find."""
+        if not cls.can_create_by_id:
+            raise HandledException('Class cannot .by_id_or_create.')
+        if id_ is None:
+            return cls(None)
+        try:
+            return cls.by_id(db_conn, id_)
+        except NotFoundException:
+            return cls(id_)
+
     @classmethod
     def all(cls: type[BaseModelInstance],
             db_conn: DatabaseConnection) -> list[BaseModelInstance]:
@@ -465,7 +563,7 @@ class BaseModel(Generic[BaseModelId]):
                                       values)
         if not isinstance(self.id_, str):
             self.id_ = cursor.lastrowid  # type: ignore[assignment]
-        self._cache()
+        self.cache()
         for attr_name in self.to_save_versioned:
             getattr(self, attr_name).save(db_conn)
         for table, column, attr_name, key_index in self.to_save_relations:
index 26c8b719fec481e4106d66073a85844a1d16491a..b7040f76fa9c3c1d58b0ceedc58fabf02752f616 100644 (file)
@@ -1,17 +1,19 @@
 """Web server stuff."""
 from __future__ import annotations
 from dataclasses import dataclass
-from typing import Any, Callable, Mapping
+from typing import Any, Callable
 from base64 import b64encode, b64decode
+from binascii import Error as binascii_Exception
 from http.server import BaseHTTPRequestHandler
 from http.server import HTTPServer
 from urllib.parse import urlparse, parse_qs
+from json import dumps as json_dumps
 from os.path import split as path_split
 from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
 from plomtask.dating import date_in_n_days
 from plomtask.days import Day
-from plomtask.exceptions import HandledException, BadFormatException, \
-        NotFoundException
+from plomtask.exceptions import (HandledException, BadFormatException,
+                                 NotFoundException)
 from plomtask.db import DatabaseConnection, DatabaseFile
 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
 from plomtask.conditions import Condition
@@ -27,7 +29,47 @@ class TaskServer(HTTPServer):
                  *args: Any, **kwargs: Any) -> None:
         super().__init__(*args, **kwargs)
         self.db = db_file
-        self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
+        self.headers: list[tuple[str, str]] = []
+        self._render_mode = 'html'
+        self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
+
+    def set_json_mode(self) -> None:
+        """Make server send JSON instead of HTML responses."""
+        self._render_mode = 'json'
+        self.headers += [('Content-Type', 'application/json')]
+
+    @staticmethod
+    def ctx_to_json(ctx: dict[str, object]) -> str:
+        """Render ctx into JSON string."""
+        def walk_ctx(node: object) -> Any:
+            if hasattr(node, 'as_dict_into_reference'):
+                if hasattr(node, 'id_') and node.id_ is not None:
+                    return node.as_dict_into_reference(library)
+            if hasattr(node, 'as_dict'):
+                return node.as_dict
+            if isinstance(node, (list, tuple)):
+                return [walk_ctx(x) for x in node]
+            if isinstance(node, dict):
+                d = {}
+                for k, v in node.items():
+                    d[k] = walk_ctx(v)
+                return d
+            if isinstance(node, HandledException):
+                return str(node)
+            return node
+        library: dict[str, dict[str | int, object]] = {}
+        for k, v in ctx.items():
+            ctx[k] = walk_ctx(v)
+        ctx['_library'] = library
+        return json_dumps(ctx)
+
+    def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
+        """Render ctx according to self._render_mode.."""
+        tmpl_name = f'{tmpl_name}.{self._render_mode}'
+        if 'html' == self._render_mode:
+            template = self._jinja.get_template(tmpl_name)
+            return template.render(ctx)
+        return self.__class__.ctx_to_json(ctx)
 
 
 class InputsParser:
@@ -96,6 +138,20 @@ class InputsParser:
             msg = f'cannot int a form field value for key {key} in: {all_str}'
             raise BadFormatException(msg) from e
 
+    def get_all_floats_or_nones(self, key: str) -> list[float | None]:
+        """Retrieve list of float value at key, None if empty strings."""
+        ret: list[float | None] = []
+        for val in self.get_all_str(key):
+            if '' == val:
+                ret += [None]
+            else:
+                try:
+                    ret += [float(val)]
+                except ValueError as e:
+                    msg = f'cannot float form field value for key {key}: {val}'
+                    raise BadFormatException(msg) from e
+        return ret
+
 
 class TaskHandler(BaseHTTPRequestHandler):
     """Handles single HTTP request."""
@@ -106,20 +162,54 @@ class TaskHandler(BaseHTTPRequestHandler):
     _form_data: InputsParser
     _params: InputsParser
 
-    def _send_html(self,
+    def _send_page(self,
+                   ctx: dict[str, Any],
                    tmpl_name: str,
-                   ctx: Mapping[str, object],
-                   code: int = 200) -> None:
-        """Send HTML as proper HTTP response."""
-        tmpl = self.server.jinja.get_template(tmpl_name)
-        html = tmpl.render(ctx)
+                   code: int = 200
+                   ) -> None:
+        """Send ctx as proper HTTP response."""
+        body = self.server.render(ctx, tmpl_name)
         self.send_response(code)
+        for header_tuple in self.server.headers:
+            self.send_header(*header_tuple)
         self.end_headers()
-        self.wfile.write(bytes(html, 'utf-8'))
+        self.wfile.write(bytes(body, 'utf-8'))
 
     @staticmethod
     def _request_wrapper(http_method: str, not_found_msg: str
                          ) -> Callable[..., Callable[[TaskHandler], None]]:
+        """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
+
+        Among other things, conditionally cleans all caches, but only on POST
+        requests, as only those are expected to change the states of objects
+        that may be cached, and certainly only those are expected to write any
+        changes to the database. We want to call them as early though as
+        possible here, either exactly after the specific request handler
+        returns successfully, or right after any exception is triggered –
+        otherwise, race conditions become plausible.
+
+        Note that otherwise any POST attempt, even a failed one, may end in
+        problematic inconsistencies:
+
+        - if the POST handler experiences an Exception, changes to objects
+          won't get written to the DB, but the changed objects may remain in
+          the cache and affect other objects despite their possibly illegal
+          state
+
+        - even if an object was just saved to the DB, we cannot be sure its
+          current state is completely identical to what we'd get if loading it
+          fresh from the DB (e.g. currently Process.n_owners is only updated
+          when loaded anew via .from_table_row, nor is its state written to
+          the DB by .save; a questionable design choice, but proof that we
+          have no guarantee that objects' .save stores all their states we'd
+          prefer at their most up-to-date.
+        """
+
+        def clear_caches() -> None:
+            for cls in (Day, Todo, Condition, Process, ProcessStep):
+                assert hasattr(cls, 'empty_cache')
+                cls.empty_cache()
+
         def decorator(f: Callable[..., str | None]
                       ) -> Callable[[TaskHandler], None]:
             def wrapper(self: TaskHandler) -> None:
@@ -136,6 +226,8 @@ class TaskHandler(BaseHTTPRequestHandler):
                     if hasattr(self, handler_name):
                         handler = getattr(self, handler_name)
                         redir_target = f(self, handler)
+                        if 'POST' == http_method:
+                            clear_caches()
                         if redir_target:
                             self.send_response(302)
                             self.send_header('Location', redir_target)
@@ -144,11 +236,10 @@ class TaskHandler(BaseHTTPRequestHandler):
                         msg = f'{not_found_msg}: {self._site}'
                         raise NotFoundException(msg)
                 except HandledException as error:
-                    for cls in (Day, Todo, Condition, Process, ProcessStep):
-                        assert hasattr(cls, 'empty_cache')
-                        cls.empty_cache()
+                    if 'POST' == http_method:
+                        clear_caches()
                     ctx = {'msg': error}
-                    self._send_html('msg.html', ctx, error.http_code)
+                    self._send_page(ctx, 'msg', error.http_code)
                 finally:
                     self.conn.close()
             return wrapper
@@ -158,11 +249,11 @@ class TaskHandler(BaseHTTPRequestHandler):
     def do_GET(self, handler: Callable[[], str | dict[str, object]]
                ) -> str | None:
         """Render page with result of handler, or redirect if result is str."""
-        tmpl_name = f'{self._site}.html'
-        ctx_or_redir = handler()
-        if isinstance(ctx_or_redir, str):
-            return ctx_or_redir
-        self._send_html(tmpl_name, ctx_or_redir)
+        tmpl_name = f'{self._site}'
+        ctx_or_redir_target = handler()
+        if isinstance(ctx_or_redir_target, str):
+            return ctx_or_redir_target
+        self._send_page(ctx_or_redir_target, tmpl_name)
         return None
 
     @_request_wrapper('POST', 'Unknown POST target')
@@ -178,6 +269,25 @@ class TaskHandler(BaseHTTPRequestHandler):
 
     # GET handlers
 
+    @staticmethod
+    def _get_item(target_class: Any
+                  ) -> Callable[..., Callable[[TaskHandler],
+                                              dict[str, object]]]:
+        def decorator(f: Callable[..., dict[str, object]]
+                      ) -> Callable[[TaskHandler], dict[str, object]]:
+            def wrapper(self: TaskHandler) -> dict[str, object]:
+                # pylint: disable=protected-access
+                # (because pylint here fails to detect the use of wrapper as a
+                # method to self with respective access privileges)
+                id_ = self._params.get_int_or_none('id')
+                if target_class.can_create_by_id:
+                    item = target_class.by_id_or_create(self.conn, id_)
+                else:
+                    item = target_class.by_id(self.conn, id_)
+                return f(self, item)
+            return wrapper
+        return decorator
+
     def do_GET_(self) -> str:
         """Return redirect target on GET /."""
         return '/day'
@@ -210,7 +320,7 @@ class TaskHandler(BaseHTTPRequestHandler):
     def do_GET_day(self) -> dict[str, object]:
         """Show single Day of ?date=."""
         date = self._params.get_str('date', date_in_n_days(0))
-        day = Day.by_id(self.conn, date, create=True)
+        day = Day.by_id_or_create(self.conn, date)
         make_type = self._params.get_str('make_type')
         conditions_present = []
         enablers_for = {}
@@ -236,7 +346,8 @@ class TaskHandler(BaseHTTPRequestHandler):
                 'conditions_present': conditions_present,
                 'processes': Process.all(self.conn)}
 
-    def do_GET_todo(self) -> dict[str, object]:
+    @_get_item(Todo)
+    def do_GET_todo(self, todo: Todo) -> dict[str, object]:
         """Show single Todo of ?id=."""
 
         @dataclass
@@ -287,8 +398,6 @@ class TaskHandler(BaseHTTPRequestHandler):
                 ids = ids | collect_adoptables_keys(node.children)
             return ids
 
-        id_ = self._params.get_int('id')
-        todo = Todo.by_id(self.conn, id_)
         todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
         process_tree = todo.process.get_steps(self.conn, None)
         steps_todo_to_process: list[TodoStepsNode] = []
@@ -301,7 +410,8 @@ class TaskHandler(BaseHTTPRequestHandler):
         adoptables: dict[int, list[Todo]] = {}
         any_adoptables = [Todo.by_id(self.conn, t.id_)
                           for t in Todo.by_date(self.conn, todo.date)
-                          if t != todo]
+                          if t.id_ is not None
+                          and t != todo]
         for id_ in collect_adoptables_keys(steps_todo_to_process):
             adoptables[id_] = [t for t in any_adoptables
                                if t.process.id_ == id_]
@@ -324,22 +434,7 @@ class TaskHandler(BaseHTTPRequestHandler):
         todos = [t for t in todos_by_date_range
                  if comment_pattern in t.comment
                  and ((not process_id) or t.process.id_ == process_id)]
-        if sort_by == 'doneness':
-            todos.sort(key=lambda t: t.is_done)
-        elif sort_by == '-doneness':
-            todos.sort(key=lambda t: t.is_done, reverse=True)
-        elif sort_by == 'title':
-            todos.sort(key=lambda t: t.title_then)
-        elif sort_by == '-title':
-            todos.sort(key=lambda t: t.title_then, reverse=True)
-        elif sort_by == 'comment':
-            todos.sort(key=lambda t: t.comment)
-        elif sort_by == '-comment':
-            todos.sort(key=lambda t: t.comment, reverse=True)
-        elif sort_by == '-date':
-            todos.sort(key=lambda t: t.date, reverse=True)
-        else:
-            todos.sort(key=lambda t: t.date)
+        sort_by = Todo.sort_by(todos, sort_by)
         return {'start': start, 'end': end, 'process_id': process_id,
                 'comment_pattern': comment_pattern, 'todos': todos,
                 'all_processes': Process.all(self.conn), 'sort_by': sort_by}
@@ -347,24 +442,16 @@ class TaskHandler(BaseHTTPRequestHandler):
     def do_GET_conditions(self) -> dict[str, object]:
         """Show all Conditions."""
         pattern = self._params.get_str('pattern')
-        conditions = Condition.matching(self.conn, pattern)
         sort_by = self._params.get_str('sort_by')
-        if sort_by == 'is_active':
-            conditions.sort(key=lambda c: c.is_active)
-        elif sort_by == '-is_active':
-            conditions.sort(key=lambda c: c.is_active, reverse=True)
-        elif sort_by == '-title':
-            conditions.sort(key=lambda c: c.title.newest, reverse=True)
-        else:
-            conditions.sort(key=lambda c: c.title.newest)
+        conditions = Condition.matching(self.conn, pattern)
+        sort_by = Condition.sort_by(conditions, sort_by)
         return {'conditions': conditions,
                 'sort_by': sort_by,
                 'pattern': pattern}
 
-    def do_GET_condition(self) -> dict[str, object]:
+    @_get_item(Condition)
+    def do_GET_condition(self, c: Condition) -> dict[str, object]:
         """Show Condition of ?id=."""
-        id_ = self._params.get_int_or_none('id')
-        c = Condition.by_id(self.conn, id_, create=True)
         ps = Process.all(self.conn)
         return {'condition': c, 'is_new': c.id_ is None,
                 'enabled_processes': [p for p in ps if c in p.conditions],
@@ -372,31 +459,35 @@ class TaskHandler(BaseHTTPRequestHandler):
                 'enabling_processes': [p for p in ps if c in p.enables],
                 'disabling_processes': [p for p in ps if c in p.disables]}
 
-    def do_GET_condition_titles(self) -> dict[str, object]:
+    @_get_item(Condition)
+    def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
         """Show title history of Condition of ?id=."""
-        id_ = self._params.get_int_or_none('id')
-        condition = Condition.by_id(self.conn, id_)
-        return {'condition': condition}
+        return {'condition': c}
 
-    def do_GET_condition_descriptions(self) -> dict[str, object]:
+    @_get_item(Condition)
+    def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
         """Show description historys of Condition of ?id=."""
-        id_ = self._params.get_int_or_none('id')
-        condition = Condition.by_id(self.conn, id_)
-        return {'condition': condition}
+        return {'condition': c}
 
-    def do_GET_process(self) -> dict[str, object]:
+    @_get_item(Process)
+    def do_GET_process(self, process: Process) -> dict[str, object]:
         """Show Process of ?id=."""
-        id_ = self._params.get_int_or_none('id')
-        process = Process.by_id(self.conn, id_, create=True)
+        owner_ids = self._params.get_all_int('step_to')
+        owned_ids = self._params.get_all_int('has_step')
         title_64 = self._params.get_str('title_b64')
         if title_64:
-            title = b64decode(title_64.encode()).decode()
+            try:
+                title = b64decode(title_64.encode()).decode()
+            except binascii_Exception as exc:
+                msg = 'invalid base64 for ?title_b64='
+                raise BadFormatException(msg) from exc
             process.title.set(title)
+        preset_top_step = None
         owners = process.used_as_step_by(self.conn)
-        for step_id in self._params.get_all_int('step_to'):
+        for step_id in owner_ids:
             owners += [Process.by_id(self.conn, step_id)]
-        preset_top_step = None
-        for process_id in self._params.get_all_int('has_step'):
+        for process_id in owned_ids:
+            Process.by_id(self.conn, process_id)  # to ensure ID exists
             preset_top_step = process_id
         return {'process': process, 'is_new': process.id_ is None,
                 'preset_top_step': preset_top_step,
@@ -405,49 +496,57 @@ class TaskHandler(BaseHTTPRequestHandler):
                 'process_candidates': Process.all(self.conn),
                 'condition_candidates': Condition.all(self.conn)}
 
-    def do_GET_process_titles(self) -> dict[str, object]:
+    @_get_item(Process)
+    def do_GET_process_titles(self, p: Process) -> dict[str, object]:
         """Show title history of Process of ?id=."""
-        id_ = self._params.get_int_or_none('id')
-        process = Process.by_id(self.conn, id_)
-        return {'process': process}
+        return {'process': p}
 
-    def do_GET_process_descriptions(self) -> dict[str, object]:
+    @_get_item(Process)
+    def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
         """Show description historys of Process of ?id=."""
-        id_ = self._params.get_int_or_none('id')
-        process = Process.by_id(self.conn, id_)
-        return {'process': process}
+        return {'process': p}
 
-    def do_GET_process_efforts(self) -> dict[str, object]:
+    @_get_item(Process)
+    def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
         """Show default effort history of Process of ?id=."""
-        id_ = self._params.get_int_or_none('id')
-        process = Process.by_id(self.conn, id_)
-        return {'process': process}
+        return {'process': p}
 
     def do_GET_processes(self) -> dict[str, object]:
         """Show all Processes."""
         pattern = self._params.get_str('pattern')
-        processes = Process.matching(self.conn, pattern)
         sort_by = self._params.get_str('sort_by')
-        if sort_by == 'steps':
-            processes.sort(key=lambda p: len(p.explicit_steps))
-        elif sort_by == '-steps':
-            processes.sort(key=lambda p: len(p.explicit_steps), reverse=True)
-        elif sort_by == 'owners':
-            processes.sort(key=lambda p: p.n_owners or 0)
-        elif sort_by == '-owners':
-            processes.sort(key=lambda p: p.n_owners or 0, reverse=True)
-        elif sort_by == 'effort':
-            processes.sort(key=lambda p: p.effort.newest)
-        elif sort_by == '-effort':
-            processes.sort(key=lambda p: p.effort.newest, reverse=True)
-        elif sort_by == '-title':
-            processes.sort(key=lambda p: p.title.newest, reverse=True)
-        else:
-            processes.sort(key=lambda p: p.title.newest)
+        processes = Process.matching(self.conn, pattern)
+        sort_by = Process.sort_by(processes, sort_by)
         return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
 
     # POST handlers
 
+    @staticmethod
+    def _delete_or_post(target_class: Any, redir_target: str = '/'
+                        ) -> Callable[..., Callable[[TaskHandler], str]]:
+        def decorator(f: Callable[..., str]
+                      ) -> Callable[[TaskHandler], str]:
+            def wrapper(self: TaskHandler) -> str:
+                # pylint: disable=protected-access
+                # (because pylint here fails to detect the use of wrapper as a
+                # method to self with respective access privileges)
+                id_ = self._params.get_int_or_none('id')
+                for _ in self._form_data.get_all_str('delete'):
+                    if id_ is None:
+                        msg = 'trying to delete non-saved ' +\
+                                f'{target_class.__name__}'
+                        raise NotFoundException(msg)
+                    item = target_class.by_id(self.conn, id_)
+                    item.remove(self.conn)
+                    return redir_target
+                if target_class.can_create_by_id:
+                    item = target_class.by_id_or_create(self.conn, id_)
+                else:
+                    item = target_class.by_id(self.conn, id_)
+                return f(self, item)
+            return wrapper
+        return decorator
+
     def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
         """Update history timestamps for VersionedAttribute."""
         id_ = self._params.get_int_or_none('id')
@@ -458,50 +557,61 @@ class TaskHandler(BaseHTTPRequestHandler):
             if old[19:] != v:
                 attr.reset_timestamp(old, f'{v}.0')
         attr.save(self.conn)
-        cls_name = cls.__name__.lower()
-        return f'/{cls_name}_{attr_name}s?id={item.id_}'
+        return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
 
     def do_POST_day(self) -> str:
         """Update or insert Day of date and Todos mapped to it."""
+        # pylint: disable=too-many-locals
         date = self._params.get_str('date')
-        day = Day.by_id(self.conn, date, create=True)
-        day.comment = self._form_data.get_str('day_comment')
-        day.save(self.conn)
+        day_comment = self._form_data.get_str('day_comment')
         make_type = self._form_data.get_str('make_type')
-        for process_id in sorted(self._form_data.get_all_int('new_todo')):
+        old_todos = self._form_data.get_all_int('todo_id')
+        new_todos = self._form_data.get_all_int('new_todo')
+        comments = self._form_data.get_all_str('comment')
+        efforts = self._form_data.get_all_floats_or_nones('effort')
+        done_todos = self._form_data.get_all_int('done')
+        for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
+            raise BadFormatException('"done" field refers to unknown Todo')
+        is_done = [t_id in done_todos for t_id in old_todos]
+        if not (len(old_todos) == len(is_done) == len(comments)
+                == len(efforts)):
+            msg = 'not equal number each of number of todo_id, comments, ' +\
+                    'and efforts inputs'
+            raise BadFormatException(msg)
+        day = Day.by_id_or_create(self.conn, date)
+        day.comment = day_comment
+        day.save(self.conn)
+        for process_id in sorted(new_todos):
             if 'empty' == make_type:
                 process = Process.by_id(self.conn, process_id)
                 todo = Todo(None, process, False, date)
                 todo.save(self.conn)
             else:
                 Todo.create_with_children(self.conn, process_id, date)
-        done_ids = self._form_data.get_all_int('done')
-        comments = self._form_data.get_all_str('comment')
-        efforts = self._form_data.get_all_str('effort')
-        for i, todo_id in enumerate(self._form_data.get_all_int('todo_id')):
+        for i, todo_id in enumerate(old_todos):
             todo = Todo.by_id(self.conn, todo_id)
-            todo.is_done = todo_id in done_ids
-            if len(comments) > 0:
-                todo.comment = comments[i]
-            if len(efforts) > 0:
-                todo.effort = float(efforts[i]) if efforts[i] else None
+            todo.is_done = is_done[i]
+            todo.comment = comments[i]
+            todo.effort = efforts[i]
             todo.save(self.conn)
         return f'/day?date={date}&make_type={make_type}'
 
-    def do_POST_todo(self) -> str:
+    @_delete_or_post(Todo, '/')
+    def do_POST_todo(self, todo: Todo) -> str:
         """Update Todo and its children."""
         # pylint: disable=too-many-locals
-        # pylint: disable=too-many-branches
-        id_ = self._params.get_int('id')
-        for _ in self._form_data.get_all_str('delete'):
-            todo = Todo .by_id(self.conn, id_)
-            todo.remove(self.conn)
-            return '/'
-        todo = Todo.by_id(self.conn, id_)
         adopted_child_ids = self._form_data.get_all_int('adopt')
         processes_to_make_full = self._form_data.get_all_int('make_full')
         processes_to_make_empty = self._form_data.get_all_int('make_empty')
         fill_fors = self._form_data.get_first_strings_starting('fill_for_')
+        effort = self._form_data.get_str('effort', ignore_strict=True)
+        conditions = self._form_data.get_all_int('conditions')
+        disables = self._form_data.get_all_int('disables')
+        blockers = self._form_data.get_all_int('blockers')
+        enables = self._form_data.get_all_int('enables')
+        is_done = len(self._form_data.get_all_str('done')) > 0
+        calendarize = len(self._form_data.get_all_str('calendarize')) > 0
+        comment = self._form_data.get_str('comment', ignore_strict=True)
         for v in fill_fors.values():
             if v.startswith('make_empty_'):
                 processes_to_make_empty += [int(v[11:])]
@@ -530,16 +640,14 @@ class TaskHandler(BaseHTTPRequestHandler):
         for process_id in processes_to_make_full:
             made = Todo.create_with_children(self.conn, process_id, todo.date)
             todo.add_child(made)
-        effort = self._form_data.get_str('effort', ignore_strict=True)
         todo.effort = float(effort) if effort else None
-        todo.set_conditions(self.conn,
-                            self._form_data.get_all_int('condition'))
-        todo.set_blockers(self.conn, self._form_data.get_all_int('blocker'))
-        todo.set_enables(self.conn, self._form_data.get_all_int('enables'))
-        todo.set_disables(self.conn, self._form_data.get_all_int('disables'))
-        todo.is_done = len(self._form_data.get_all_str('done')) > 0
-        todo.calendarize = len(self._form_data.get_all_str('calendarize')) > 0
-        todo.comment = self._form_data.get_str('comment', ignore_strict=True)
+        todo.set_conditions(self.conn, conditions)
+        todo.set_blockers(self.conn, blockers)
+        todo.set_enables(self.conn, enables)
+        todo.set_disables(self.conn, disables)
+        todo.is_done = is_done
+        todo.calendarize = calendarize
+        todo.comment = comment
         todo.save(self.conn)
         return f'/todo?id={todo.id_}'
 
@@ -555,60 +663,70 @@ class TaskHandler(BaseHTTPRequestHandler):
         """Update history timestamps for Process.title."""
         return self._change_versioned_timestamps(Process, 'title')
 
-    def do_POST_process(self) -> str:
+    @_delete_or_post(Process, '/processes')
+    def do_POST_process(self, process: Process) -> str:
         """Update or insert Process of ?id= and fields defined in postvars."""
-        # pylint: disable=too-many-branches
-        id_ = self._params.get_int_or_none('id')
-        for _ in self._form_data.get_all_str('delete'):
-            process = Process.by_id(self.conn, id_)
-            process.remove(self.conn)
-            return '/processes'
-        process = Process.by_id(self.conn, id_, create=True)
-        process.title.set(self._form_data.get_str('title'))
-        process.description.set(self._form_data.get_str('description'))
-        process.effort.set(self._form_data.get_float('effort'))
-        process.set_conditions(self.conn,
-                               self._form_data.get_all_int('condition'))
-        process.set_blockers(self.conn, self._form_data.get_all_int('blocker'))
-        process.set_enables(self.conn, self._form_data.get_all_int('enables'))
-        process.set_disables(self.conn,
-                             self._form_data.get_all_int('disables'))
-        process.calendarize = self._form_data.get_all_str('calendarize') != []
+        # pylint: disable=too-many-locals
+        # pylint: disable=too-many-statements
+        title = self._form_data.get_str('title')
+        description = self._form_data.get_str('description')
+        effort = self._form_data.get_float('effort')
+        conditions = self._form_data.get_all_int('conditions')
+        blockers = self._form_data.get_all_int('blockers')
+        enables = self._form_data.get_all_int('enables')
+        disables = self._form_data.get_all_int('disables')
+        calendarize = self._form_data.get_all_str('calendarize') != []
+        suppresses = self._form_data.get_all_int('suppresses')
+        step_of = self._form_data.get_all_str('step_of')
+        keep_steps = self._form_data.get_all_int('keep_step')
+        step_ids = self._form_data.get_all_int('steps')
+        new_top_steps = self._form_data.get_all_str('new_top_step')
+        step_process_id_to = {}
+        step_parent_id_to = {}
+        new_steps_to = {}
+        for step_id in step_ids:
+            name = f'new_step_to_{step_id}'
+            new_steps_to[step_id] = self._form_data.get_all_int(name)
+        for step_id in keep_steps:
+            name = f'step_{step_id}_process_id'
+            step_process_id_to[step_id] = self._form_data.get_int(name)
+            name = f'step_{step_id}_parent_id'
+            step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
+        process.title.set(title)
+        process.description.set(description)
+        process.effort.set(effort)
+        process.set_conditions(self.conn, conditions)
+        process.set_blockers(self.conn, blockers)
+        process.set_enables(self.conn, enables)
+        process.set_disables(self.conn, disables)
+        process.calendarize = calendarize
         process.save(self.conn)
         assert isinstance(process.id_, int)
+        new_step_title = None
         steps: list[ProcessStep] = []
-        for step_id in self._form_data.get_all_int('keep_step'):
-            if step_id not in self._form_data.get_all_int('steps'):
+        for step_id in keep_steps:
+            if step_id not in step_ids:
                 raise BadFormatException('trying to keep unknown step')
-        for step_id in self._form_data.get_all_int('steps'):
-            if step_id not in self._form_data.get_all_int('keep_step'):
-                continue
-            step_process_id = self._form_data.get_int(
-                    f'step_{step_id}_process_id')
-            parent_id = self._form_data.get_int_or_none(
-                    f'step_{step_id}_parent_id')
-            steps += [ProcessStep(step_id, process.id_, step_process_id,
-                                  parent_id)]
-        for step_id in self._form_data.get_all_int('steps'):
-            for step_process_id in self._form_data.get_all_int(
-                    f'new_step_to_{step_id}'):
-                steps += [ProcessStep(None, process.id_, step_process_id,
-                                      step_id)]
-        new_step_title = None
-        for step_identifier in self._form_data.get_all_str('new_top_step'):
+            step = ProcessStep(step_id, process.id_,
+                               step_process_id_to[step_id],
+                               step_parent_id_to[step_id])
+            steps += [step]
+        for step_id in step_ids:
+            new = [ProcessStep(None, process.id_, step_process_id, step_id)
+                   for step_process_id in new_steps_to[step_id]]
+            steps += new
+        for step_identifier in new_top_steps:
             try:
                 step_process_id = int(step_identifier)
-                steps += [ProcessStep(None, process.id_, step_process_id,
-                                      None)]
+                step = ProcessStep(None, process.id_, step_process_id, None)
+                steps += [step]
             except ValueError:
                 new_step_title = step_identifier
         process.set_steps(self.conn, steps)
-        process.set_step_suppressions(self.conn,
-                                      self._form_data.
-                                      get_all_int('suppresses'))
+        process.set_step_suppressions(self.conn, suppresses)
         owners_to_set = []
         new_owner_title = None
-        for owner_identifier in self._form_data.get_all_str('step_of'):
+        for owner_identifier in step_of:
             try:
                 owners_to_set += [int(owner_identifier)]
             except ValueError:
@@ -632,16 +750,14 @@ class TaskHandler(BaseHTTPRequestHandler):
         """Update history timestamps for Condition.title."""
         return self._change_versioned_timestamps(Condition, 'title')
 
-    def do_POST_condition(self) -> str:
+    @_delete_or_post(Condition, '/conditions')
+    def do_POST_condition(self, condition: Condition) -> str:
         """Update/insert Condition of ?id= and fields defined in postvars."""
-        id_ = self._params.get_int_or_none('id')
-        for _ in self._form_data.get_all_str('delete'):
-            condition = Condition.by_id(self.conn, id_)
-            condition.remove(self.conn)
-            return '/conditions'
-        condition = Condition.by_id(self.conn, id_, create=True)
-        condition.is_active = self._form_data.get_all_str('is_active') != []
-        condition.title.set(self._form_data.get_str('title'))
-        condition.description.set(self._form_data.get_str('description'))
+        is_active = self._form_data.get_str('is_active') == 'True'
+        title = self._form_data.get_str('title')
+        description = self._form_data.get_str('description')
+        condition.is_active = is_active
+        condition.title.set(title)
+        condition.description.set(description)
         condition.save(self.conn)
         return f'/condition?id={condition.id_}'
index 06ee4ba9b9c2a3011019b03b1b0e21633fce780c..bb1de3a4a3356415473bc652d650e202886eb01b 100644 (file)
@@ -33,7 +33,13 @@ class Process(BaseModel[int], ConditionsRelations):
                          ('process_disables', 'process', 'disables', 0),
                          ('process_step_suppressions', 'process',
                           'suppressed_steps', 0)]
+    add_to_dict = ['explicit_steps']
     to_search = ['title.newest', 'description.newest']
+    can_create_by_id = True
+    sorters = {'steps': lambda p: len(p.explicit_steps),
+               'owners': lambda p: p.n_owners,
+               'effort': lambda p: p.effort.newest,
+               'title': lambda p: p.title.newest}
 
     def __init__(self, id_: int | None, calendarize: bool = False) -> None:
         BaseModel.__init__(self, id_)
@@ -51,11 +57,7 @@ class Process(BaseModel[int], ConditionsRelations):
                        row: Row | list[Any]) -> Process:
         """Make from DB row, with dependencies."""
         process = super().from_table_row(db_conn, row)
-        assert isinstance(process.id_, int)
-        for name in ('title', 'description', 'effort'):
-            table = f'process_{name}s'
-            for row_ in db_conn.row_where(table, 'parent', process.id_):
-                getattr(process, name).history_from_row(row_)
+        assert process.id_ is not None
         for name in ('conditions', 'blockers', 'enables', 'disables'):
             table = f'process_{name}'
             assert isinstance(process.id_, int)
@@ -218,7 +220,7 @@ class ProcessStep(BaseModel[int]):
         self.parent_step_id = parent_step_id
 
     def save(self, db_conn: DatabaseConnection) -> None:
-        """Remove from DB, and owner's .explicit_steps."""
+        """Update into DB/cache, and owner's .explicit_steps."""
         super().save(db_conn)
         owner = Process.by_id(db_conn, self.owner_id)
         if self not in owner.explicit_steps:
index 705bd725e2ff662ab4f9f2e370e61169a413ff03..f5388b58f25ec1237b65b751c8fd5fa352160ddf 100644 (file)
@@ -1,6 +1,5 @@
 """Actionables."""
 from __future__ import annotations
-from dataclasses import dataclass
 from typing import Any, Set
 from sqlite3 import Row
 from plomtask.db import DatabaseConnection, BaseModel
@@ -12,13 +11,28 @@ from plomtask.exceptions import (NotFoundException, BadFormatException,
 from plomtask.dating import valid_date
 
 
-@dataclass
 class TodoNode:
     """Collects what's useful to know for Todo/Condition tree display."""
+    # pylint: disable=too-few-public-methods
     todo: Todo
     seen: bool
     children: list[TodoNode]
 
+    def __init__(self,
+                 todo: Todo,
+                 seen: bool,
+                 children: list[TodoNode]) -> None:
+        self.todo = todo
+        self.seen = seen
+        self.children = children
+
+    @property
+    def as_dict(self) -> dict[str, object]:
+        """Return self as (json.dumps-coompatible) dict."""
+        return {'todo': self.todo.id_,
+                'seen': self.seen,
+                'children': [c.as_dict for c in self.children]}
+
 
 class Todo(BaseModel[int], ConditionsRelations):
     """Individual actionable."""
@@ -37,6 +51,10 @@ class Todo(BaseModel[int], ConditionsRelations):
     days_to_update: Set[str] = set()
     children: list[Todo]
     parents: list[Todo]
+    sorters = {'doneness': lambda t: t.is_done,
+               'title': lambda t: t.title_then,
+               'comment': lambda t: t.comment,
+               'date': lambda t: t.date}
 
     # pylint: disable=too-many-arguments
     def __init__(self, id_: int | None,
index cbd1c8e348a9230b10176d55b4b6a490fe11ff33..8861c9834ff3924d6459ced5cb9c69629424bb45 100644 (file)
@@ -4,7 +4,8 @@ from typing import Any
 from sqlite3 import Row
 from time import sleep
 from plomtask.db import DatabaseConnection
-from plomtask.exceptions import HandledException, BadFormatException
+from plomtask.exceptions import (HandledException, BadFormatException,
+                                 NotFoundException)
 
 TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
 
@@ -98,6 +99,8 @@ class VersionedAttribute:
 
     def save(self, db_conn: DatabaseConnection) -> None:
         """Save as self.history entries, but first wipe old ones."""
+        if self.parent.id_ is None:
+            raise NotFoundException('cannot save attribute to parent if no ID')
         db_conn.rewrite_relations(self.table_name, 'parent', self.parent.id_,
                                   [[item[0], item[1]]
                                    for item in self.history.items()])
index c92a5eb0f8462356d31c4bc64c434bfd4a46a17e..e4480354be0edab044bc69d5fada5ab840046dfa 100755 (executable)
@@ -1,5 +1,6 @@
 #!/bin/sh
 set -e
+# for dir in $(echo 'tests'); do
 for dir in $(echo '.' 'plomtask' 'tests'); do
     echo "Running mypy on ${dir}/ …."
     python3 -m mypy --strict ${dir}/*.py
@@ -11,6 +12,6 @@ done
 echo "Running unittest-parallel on tests/."
 unittest-parallel -t . -s tests/ -p '*.py'
 set +e
-rm test_db:*.*
+rm test_db:*
 set -e
 exit 0
index f980cd1f5ad93d271e8103c981c8cdf49523e8fe..59cf55b46029da9850a4e95eb0279fb3fce7e756 100644 (file)
@@ -7,20 +7,20 @@
 th {
   border: 1px solid black;
 }
-td.cond_line_0, td.cond_line_1, td.cond_line_2 {
+td.cond_line {
   padding: 0;
   border-top: 1px solid white;
 }
-td.cond_line_0 {
+td.cond_0 {
   background-color: #bfbfbf;
 }
-td.cond_line_1 {
+td.cond_1 {
   background-color: #dfdfdf;
 }
-td.cond_line_2 {
-  background-color: #fffff;
+td.cond_2 {
+  background-color: fffff;
 }
-td.cond_line_corner {
+td.cond_shrink {
   max-width: 0px;
   white-space: nowrap;
   overflow: hidden;
@@ -58,11 +58,15 @@ input.ablers {
 {% endif %}
 
 {% for condition in conditions_present %}
-<td class="cond_line_{{loop.index0 % 3}}">
 {% if condition in node.todo.conditions and not condition.is_active %}
-O&nbsp;
+<td class="cond_line cond_{{loop.index0 % 3}}">
++&gt;
 {% elif condition in node.todo.blockers and condition.is_active %}
-!&nbsp;
+<td class="cond_line cond_{{loop.index0 % 3}}">
+-&gt;
+{% else %}
+<td class="cond_line cond_{{loop.index0 % 3}} cond_shrink">
+|
 {% endif %}
 </td>
 {% endfor %}
@@ -80,7 +84,17 @@ O&nbsp;
 </td>
 
 {% for condition in conditions_present|reverse %}
-<td class="cond_line_{{(conditions_present|length - loop.index) % 3}}">{% if condition in node.todo.enables %}&nbsp;+{% elif condition in node.todo.disables %}&nbsp;!{% endif %}</td>
+{% if condition in node.todo.enables %}
+<td class="cond_line cond_{{(conditions_present|length - loop.index) % 3}}">
++&gt;
+{% elif condition in node.todo.disables %}
+<td class="cond_line cond_{{(conditions_present|length - loop.index) % 3}}">
+-&gt;
+{% else %}
+<td class="cond_line cond_{{(conditions_present|length - loop.index) % 3}} cond_shrink">
+&nbsp;|
+{% endif %}
+</td>
 {% endfor %}
 
 <td colspan=2>
@@ -142,10 +156,12 @@ comment:
 add: <input type="text" name="new_todo" list="processes">
 </p>
 <p>
+make new todos
 <select name="make_type">
-<option value="full">make new todos with children</option>
-<option value="empty"{% if make_type == "empty" %}selected {% endif %}>make new todos without children</option>
+<option value="full">with</option>
+<option value="empty"{% if make_type == "empty" %}selected {% endif %}>without</option>
 </select>
+descendants (i.e. adopt where possible, otherwise create anew)
 </p>
 
 <table>
@@ -162,25 +178,25 @@ add: <input type="text" name="new_todo" list="processes">
 
 {% for _ in conditions_present %}
 {% if outer_loop.index > loop.index %}
-<td class="cond_line_{{loop.index0 % 3}}">
+<td class="cond_line cond_{{loop.index0 % 3}} cond_shrink">|
 {% elif outer_loop.index < loop.index %}
-<td class="cond_line_{{outer_loop.index0 % 3}}">
+<td class="cond_line cond_{{outer_loop.index0 % 3}}">
 {% else %}
-<td class="cond_line_{{outer_loop.index0 % 3}} cond_line_corner">×
+<td class="cond_line cond_{{outer_loop.index0 % 3}} cond_shrink">/
 {% endif %}
 </td>
 {% endfor %}
 
-<td class="cond_line_{{loop.index0 % 3}}"><input type="checkbox" disabled{% if condition.is_active %} checked{% endif %}></td>
-<td colspan=2 class="cond_line_{{loop.index0 % 3}}"><a href="condition?id={{condition.id_}}">{{condition.title.at(day.date)|e}}</a></td>
+<td class="cond_line cond_{{loop.index0 % 3}}"><input type="checkbox" disabled{% if condition.is_active %} checked{% endif %}></td>
+<td colspan=2 class="cond_line cond_{{loop.index0 % 3}}"><a href="condition?id={{condition.id_}}">{{condition.title.at(day.date)|e}}</a></td>
 
 {% for _ in conditions_present %}
 {% if outer_loop.index0 + loop.index < conditions_present|length %}
-<td class="cond_line_{{outer_loop.index0 % 3}}">
+<td class="cond_line cond_{{outer_loop.index0 % 3}}">
 {% elif outer_loop.index0 + loop.index > conditions_present|length %}
-<td class="cond_line_{{(conditions_present|length - loop.index) % 3}}">
+<td class="cond_line cond_{{(conditions_present|length - loop.index) % 3}} cond_shrink">&nbsp;|
 {% else %}
-<td class="cond_line_{{outer_loop.index0 % 3}} cond_line_corner">&nbsp;×
+<td class="cond_line cond_{{outer_loop.index0 % 3}} cond_shrink">&nbsp;\
 {% endif %}
 {% endfor %}
 
@@ -197,11 +213,11 @@ add: <input type="text" name="new_todo" list="processes">
 
 <tr>
 {% for condition in conditions_present %}
-<td class="cond_line_{{loop.index0 % 3}}"></td>
+<td class="cond_line cond_{{loop.index0 % 3}} cond_shrink">|</td>
 {% endfor %}
 <th colspan=3>doables</th>
 {% for condition in conditions_present %}
-<td class="cond_line_{{(conditions_present|length - loop.index) % 3}}"></td>
+<td class="cond_line cond_{{(conditions_present|length - loop.index) % 3}} cond_shrink">&nbsp;|</td>
 {% endfor %}
 <th colspan=2>comments</th>
 </tr>
index 60687d8b96ef3ed9245ccb46671bb346494a6cb9..7bb503eeafd2a68e21987021a7db0a110c4e548b 100644 (file)
@@ -97,11 +97,11 @@ edit process of ID {{process.id_}}
 </tr>
 <tr>
 <th>conditions</th>
-<td>{{ macros.simple_checkbox_table("condition", process.conditions, "condition", "condition_candidates") }}</td>
+<td>{{ macros.simple_checkbox_table("conditions", process.conditions, "condition", "condition_candidates") }}</td>
 </tr>
 <tr>
 <th>blockers</th>
-<td>{{ macros.simple_checkbox_table("blocker", process.blockers, "condition", "condition_candidates") }}</td>
+<td>{{ macros.simple_checkbox_table("blockers", process.blockers, "condition", "condition_candidates") }}</td>
 </tr>
 <tr>
 <th>enables</th>
index c2fb01d710d8d9d0081d70e124f3a38d241ec7e3..fea931ab83ddf57536ab375ce3773a3f656204ce 100644 (file)
@@ -75,11 +75,11 @@ select{ font-size: 0.5em; margin: 0; padding: 0; }
 </tr>
 <tr>
 <th>conditions</th>
-<td>{{ macros.simple_checkbox_table("condition", todo.conditions, "condition", "condition_candidates") }}</td>
+<td>{{ macros.simple_checkbox_table("conditions", todo.conditions, "condition", "condition_candidates") }}</td>
 </tr>
 <tr>
 <th>blockers</th>
-<td>{{ macros.simple_checkbox_table("blocker", todo.blockers, "condition", "condition_candidates") }}</td>
+<td>{{ macros.simple_checkbox_table("blockers", todo.blockers, "condition", "condition_candidates") }}</td>
 </tr>
 <tr>
 <th>enables</th>
index 3b05bd098da61ecbeec25588bd5e76572bde5ef2..9b3a40383ad21d959ce8884fd5c180039d317919 100644 (file)
@@ -9,7 +9,6 @@ from plomtask.exceptions import HandledException
 class TestsSansDB(TestCaseSansDB):
     """Tests requiring no DB setup."""
     checked_class = Condition
-    do_id_test = True
     versioned_defaults_to_test = {'title': 'UNNAMED', 'description': ''}
 
 
@@ -19,31 +18,9 @@ class TestsWithDB(TestCaseWithDB):
     default_init_kwargs = {'is_active': False}
     test_versioneds = {'title': str, 'description': str}
 
-    def test_Condition_from_table_row(self) -> None:
-        """Test .from_table_row() properly reads in class from DB"""
-        self.check_from_table_row()
-        self.check_versioned_from_table_row('title', str)
-        self.check_versioned_from_table_row('description', str)
-
-    def test_Condition_by_id(self) -> None:
-        """Test .by_id(), including creation."""
-        self.check_by_id()
-
-    def test_Condition_all(self) -> None:
-        """Test .all()."""
-        self.check_all()
-
-    def test_Condition_singularity(self) -> None:
-        """Test pointers made for single object keep pointing to it."""
-        self.check_singularity('is_active', True)
-
-    def test_Condition_versioned_attributes_singularity(self) -> None:
-        """Test behavior of VersionedAttributes on saving (with .title)."""
-        self.check_versioned_singularity()
-
-    def test_Condition_remove(self) -> None:
+    def test_remove(self) -> None:
         """Test .remove() effects on DB and cache."""
-        self.check_remove()
+        super().test_remove()
         proc = Process(None)
         proc.save(self.db_conn)
         todo = Todo(None, proc, False, '2024-01-01')
@@ -65,20 +42,147 @@ class TestsWithDB(TestCaseWithDB):
 class TestsWithServer(TestCaseWithServer):
     """Module tests against our HTTP server/handler (and database)."""
 
-    def test_do_POST_condition(self) -> None:
-        """Test POST /condition and its effect on the database."""
-        form_data = {'title': 'foo', 'description': 'foo'}
-        self.check_post(form_data, '/condition', 302, '/condition?id=1')
-        self.assertEqual(1, len(Condition.all(self.db_conn)))
-        form_data['delete'] = ''
-        self.check_post(form_data, '/condition?id=', 404)
-        self.check_post(form_data, '/condition?id=2', 404)
-        self.check_post(form_data, '/condition?id=1', 302, '/conditions')
-        self.assertEqual(0, len(Condition.all(self.db_conn)))
+    @classmethod
+    def GET_condition_dict(cls, cond: dict[str, object]) -> dict[str, object]:
+        """Return JSON of GET /condition to expect."""
+        return {'is_new': False,
+                'enabled_processes': [],
+                'disabled_processes': [],
+                'enabling_processes': [],
+                'disabling_processes': [],
+                'condition': cond['id'],
+                '_library': {'Condition': cls.as_refs([cond])}}
 
-    def test_do_GET(self) -> None:
-        """Test /condition and /conditions response codes."""
-        form_data = {'title': 'foo', 'description': 'foo'}
-        self.check_post(form_data, '/condition', 302, '/condition?id=1')
+    @classmethod
+    def GET_conditions_dict(cls, conds: list[dict[str, object]]
+                            ) -> dict[str, object]:
+        """Return JSON of GET /conditions to expect."""
+        library = {'Condition': cls.as_refs(conds)} if conds else {}
+        d: dict[str, object] = {'conditions': cls.as_id_list(conds),
+                                'sort_by': 'title',
+                                'pattern': '',
+                                '_library': library}
+        return d
+
+    def test_fail_POST_condition(self) -> None:
+        """Test malformed/illegal POST /condition requests."""
+        # check invalid POST payloads
+        url = '/condition'
+        self.check_post({}, url, 400)
+        self.check_post({'title': ''}, url, 400)
+        self.check_post({'title': '', 'description': ''}, url, 400)
+        self.check_post({'title': '', 'is_active': False}, url, 400)
+        self.check_post({'description': '', 'is_active': False}, url, 400)
+        # check valid POST payload on bad paths
+        valid_payload = {'title': '', 'description': '', 'is_active': False}
+        self.check_post(valid_payload, '/condition?id=foo', 400)
+
+    def test_POST_condition(self) -> None:
+        """Test (valid) POST /condition and its effect on GET /condition[s]."""
+        # test valid POST's effect on …
+        post = {'title': 'foo', 'description': 'oof', 'is_active': False}
+        self.check_post(post, '/condition', 302, '/condition?id=1')
+        # … single /condition
+        cond = self.cond_as_dict(titles=['foo'], descriptions=['oof'])
+        assert isinstance(cond['_versioned'], dict)
+        expected_single = self.GET_condition_dict(cond)
+        self.check_json_get('/condition?id=1', expected_single)
+        # … full /conditions
+        expected_all = self.GET_conditions_dict([cond])
+        self.check_json_get('/conditions', expected_all)
+        # test (no) effect of invalid POST to existing Condition on /condition
+        self.check_post({}, '/condition?id=1', 400)
+        self.check_json_get('/condition?id=1', expected_single)
+        # test effect of POST changing title and activeness
+        post = {'title': 'bar', 'description': 'oof', 'is_active': True}
+        self.check_post(post, '/condition?id=1', 302)
+        cond['_versioned']['title'][1] = 'bar'
+        cond['is_active'] = True
+        self.check_json_get('/condition?id=1', expected_single)
+        # test deletion POST's effect on …
+        self.check_post({'delete': ''}, '/condition?id=1', 302, '/conditions')
+        cond = self.cond_as_dict()
+        assert isinstance(expected_single['_library'], dict)
+        expected_single['_library']['Condition'] = self.as_refs([cond])
+        self.check_json_get('/condition?id=1', expected_single)
+        # … full /conditions
+        expected_all['conditions'] = []
+        expected_all['_library'] = {}
+        self.check_json_get('/conditions', expected_all)
+
+    def test_GET_condition(self) -> None:
+        """More GET /condition testing, especially for Process relations."""
+        # check expected default status codes
         self.check_get_defaults('/condition')
-        self.check_get('/conditions', 200)
+        # make Condition and two Processes that among them establish all
+        # possible ConditionsRelations to it, …
+        cond_post = {'title': 'foo', 'description': 'oof', 'is_active': False}
+        self.check_post(cond_post, '/condition', 302, '/condition?id=1')
+        proc1_post = {'title': 'A', 'description': '', 'effort': 1.0,
+                      'conditions': [1], 'disables': [1]}
+        proc2_post = {'title': 'B', 'description': '', 'effort': 1.0,
+                      'enables': [1], 'blockers': [1]}
+        self.post_process(1, proc1_post)
+        self.post_process(2, proc2_post)
+        # … then check /condition displays all these properly.
+        cond = self.cond_as_dict(titles=['foo'], descriptions=['oof'])
+        assert isinstance(cond['id'], int)
+        proc1 = self.proc_as_dict(conditions=[cond['id']],
+                                  disables=[cond['id']])
+        proc2 = self.proc_as_dict(2, 'B',
+                                  blockers=[cond['id']],
+                                  enables=[cond['id']])
+        expected = self.GET_condition_dict(cond)
+        assert isinstance(expected['_library'], dict)
+        expected['enabled_processes'] = self.as_id_list([proc1])
+        expected['disabled_processes'] = self.as_id_list([proc2])
+        expected['enabling_processes'] = self.as_id_list([proc2])
+        expected['disabling_processes'] = self.as_id_list([proc1])
+        expected['_library']['Process'] = self.as_refs([proc1, proc2])
+        self.check_json_get('/condition?id=1', expected)
+
+    def test_GET_conditions(self) -> None:
+        """Test GET /conditions."""
+        # test empty result on empty DB, default-settings on empty params
+        expected = self.GET_conditions_dict([])
+        self.check_json_get('/conditions', expected)
+        # test on meaningless non-empty params (incl. entirely un-used key),
+        # that 'sort_by' default to 'title' (even if set to something else, as
+        # long as without handler) and 'pattern' get preserved
+        expected['pattern'] = 'bar'  # preserved despite zero effect!
+        url = '/conditions?sort_by=foo&pattern=bar&foo=x'
+        self.check_json_get(url, expected)
+        # test non-empty result, automatic (positive) sorting by title
+        post1 = {'is_active': False, 'title': 'foo', 'description': 'oof'}
+        post2 = {'is_active': False, 'title': 'bar', 'description': 'rab'}
+        post3 = {'is_active': True, 'title': 'baz', 'description': 'zab'}
+        self.check_post(post1, '/condition', 302, '/condition?id=1')
+        self.check_post(post2, '/condition', 302, '/condition?id=2')
+        self.check_post(post3, '/condition', 302, '/condition?id=3')
+        cond1 = self.cond_as_dict(1, False, ['foo'], ['oof'])
+        cond2 = self.cond_as_dict(2, False, ['bar'], ['rab'])
+        cond3 = self.cond_as_dict(3, True, ['baz'], ['zab'])
+        expected = self.GET_conditions_dict([cond2, cond3, cond1])
+        self.check_json_get('/conditions', expected)
+        # test other sortings
+        # (NB: by .is_active has two items of =False, their order currently
+        # is not explicitly made predictable, so mail fail until we do)
+        expected['sort_by'] = '-title'
+        expected['conditions'] = self.as_id_list([cond1, cond3, cond2])
+        self.check_json_get('/conditions?sort_by=-title', expected)
+        expected['sort_by'] = 'is_active'
+        expected['conditions'] = self.as_id_list([cond1, cond2, cond3])
+        self.check_json_get('/conditions?sort_by=is_active', expected)
+        expected['sort_by'] = '-is_active'
+        expected['conditions'] = self.as_id_list([cond3, cond1, cond2])
+        self.check_json_get('/conditions?sort_by=-is_active', expected)
+        # test pattern matching on title
+        expected = self.GET_conditions_dict([cond2, cond3])
+        expected['pattern'] = 'ba'
+        self.check_json_get('/conditions?pattern=ba', expected)
+        # test pattern matching on description
+        assert isinstance(expected['_library'], dict)
+        expected['conditions'] = self.as_id_list([cond1])
+        expected['_library']['Condition'] = self.as_refs([cond1])
+        expected['pattern'] = 'of'
+        self.check_json_get('/conditions?pattern=of', expected)
index 286f75815ef51e74ade9e15e96bdb85dc4218a4b..8e3768c660937b5ba32078ee13d03cf275aa57fa 100644 (file)
@@ -1,24 +1,24 @@
 """Test Days module."""
 from unittest import TestCase
 from datetime import datetime
+from typing import Callable
 from tests.utils import TestCaseWithDB, TestCaseWithServer
 from plomtask.dating import date_in_n_days
 from plomtask.days import Day
-from plomtask.exceptions import BadFormatException
 
 
 class TestsSansDB(TestCase):
     """Days module tests not requiring DB setup."""
+    legal_ids = ['2024-01-01']
+    illegal_ids = ['foo', '2024-02-30', '2024-02-01 23:00:00']
 
-    def test_Day_valid_date(self) -> None:
-        """Test Day's date format validation and parsing."""
-        with self.assertRaises(BadFormatException):
-            Day('foo')
-        with self.assertRaises(BadFormatException):
-            Day('2024-02-30')
-        with self.assertRaises(BadFormatException):
-            Day('2024-02-01 23:00:00')
-        self.assertEqual(datetime(2024, 1, 1), Day('2024-01-01').datetime)
+    def test_Day_datetime_weekday_neighbor_dates(self) -> None:
+        """Test Day's date parsing."""
+        self.assertEqual(datetime(2024, 5, 1), Day('2024-05-01').datetime)
+        self.assertEqual('Sunday', Day('2024-03-17').weekday)
+        self.assertEqual('March', Day('2024-03-17').month_name)
+        self.assertEqual('2023-12-31', Day('2024-01-01').prev_date)
+        self.assertEqual('2023-03-01', Day('2023-02-28').next_date)
 
     def test_Day_sorting(self) -> None:
         """Test sorting by .__lt__ and Day.__eq__."""
@@ -28,43 +28,21 @@ class TestsSansDB(TestCase):
         days = [day3, day1, day2]
         self.assertEqual(sorted(days), [day1, day2, day3])
 
-    def test_Day_weekday(self) -> None:
-        """Test Day.weekday."""
-        self.assertEqual(Day('2024-03-17').weekday, 'Sunday')
-
-    def test_Day_neighbor_dates(self) -> None:
-        """Test Day.prev_date and Day.next_date."""
-        self.assertEqual(Day('2024-01-01').prev_date, '2023-12-31')
-        self.assertEqual(Day('2023-02-28').next_date, '2023-03-01')
-
 
 class TestsWithDB(TestCaseWithDB):
     """Tests requiring DB, but not server setup."""
     checked_class = Day
     default_ids = ('2024-01-01', '2024-01-02', '2024-01-03')
 
-    def test_saving_and_caching(self) -> None:
-        """Test storage of instances.
-
-        We don't use the parent class's method here because the checked class
-        has too different a handling of IDs.
-        """
-        kwargs = {'date': self.default_ids[0], 'comment': 'foo'}
-        self.check_saving_and_caching(**kwargs)
-
-    def test_Day_from_table_row(self) -> None:
-        """Test .from_table_row() properly reads in class from DB"""
-        self.check_from_table_row()
-
-    def test_Day_by_id(self) -> None:
-        """Test .by_id()."""
-        self.check_by_id()
-
     def test_Day_by_date_range_filled(self) -> None:
         """Test Day.by_date_range_filled."""
         date1, date2, date3 = self.default_ids
-        day1, day2, day3 = self.check_all()
-        # check date range is a closed interval
+        day1 = Day(date1)
+        day2 = Day(date2)
+        day3 = Day(date3)
+        for day in [day1, day2, day3]:
+            day.save(self.db_conn)
+        # check date range includes limiter days
         self.assertEqual(Day.by_date_range_filled(self.db_conn, date1, date3),
                          [day1, day2, day3])
         # check first date range value excludes what's earlier
@@ -85,41 +63,313 @@ class TestsWithDB(TestCaseWithDB):
         self.assertEqual(Day.by_date_range_filled(self.db_conn,
                                                   day5.date, day7.date),
                          [day5, day6, day7])
-        self.check_storage([day1, day2, day3, day6])
+        self.check_identity_with_cache_and_db([day1, day2, day3, day6])
         # check 'today' is interpreted as today's date
         today = Day(date_in_n_days(0))
-        today.save(self.db_conn)
         self.assertEqual(Day.by_date_range_filled(self.db_conn,
                                                   'today', 'today'),
                          [today])
-
-    def test_Day_remove(self) -> None:
-        """Test .remove() effects on DB and cache."""
-        self.check_remove()
-
-    def test_Day_singularity(self) -> None:
-        """Test pointers made for single object keep pointing to it."""
-        self.check_singularity('day_comment', 'boo')
+        prev_day = Day(date_in_n_days(-1))
+        next_day = Day(date_in_n_days(1))
+        self.assertEqual(Day.by_date_range_filled(self.db_conn,
+                                                  'yesterday', 'tomorrow'),
+                         [prev_day, today, next_day])
 
 
 class TestsWithServer(TestCaseWithServer):
     """Tests against our HTTP server/handler (and database)."""
 
-    def test_do_GET(self) -> None:
-        """Test /day and /calendar response codes, and / redirect."""
-        self.check_get('/day', 200)
-        self.check_get('/day?date=3000-01-01', 200)
-        self.check_get('/day?date=FOO', 400)
-        self.check_get('/calendar', 200)
-        self.check_get('/calendar?start=&end=', 200)
-        self.check_get('/calendar?start=today&end=today', 200)
-        self.check_get('/calendar?start=2024-01-01&end=2025-01-01', 200)
-        self.check_get('/calendar?start=foo', 400)
+    @classmethod
+    def GET_day_dict(cls, date: str) -> dict[str, object]:
+        """Return JSON of GET /day to expect."""
+        # day: dict[str, object] = {'id': date, 'comment': '', 'todos': []}
+        day = cls._day_as_dict(date)
+        d: dict[str, object] = {'day': date,
+                                'top_nodes': [],
+                                'make_type': '',
+                                'enablers_for': {},
+                                'disablers_for': {},
+                                'conditions_present': [],
+                                'processes': [],
+                                '_library': {'Day': cls.as_refs([day])}}
+        return d
+
+    @classmethod
+    def GET_calendar_dict(cls, start: int, end: int) -> dict[str, object]:
+        """Return JSON of GET /calendar to expect."""
+        today_date = date_in_n_days(0)
+        start_date = date_in_n_days(start)
+        end_date = date_in_n_days(end)
+        dates = [date_in_n_days(i) for i in range(start, end+1)]
+        days = [cls._day_as_dict(d) for d in dates]
+        library = {'Day': cls.as_refs(days)} if len(days) > 0 else {}
+        return {'today': today_date, 'start': start_date, 'end': end_date,
+                'days': dates, '_library': library}
+
+    @staticmethod
+    def _todo_as_dict(id_: int = 1,
+                      process_id: int = 1,
+                      date: str = '2024-01-01',
+                      conditions: None | list[int] = None,
+                      disables: None | list[int] = None,
+                      blockers: None | list[int] = None,
+                      enables: None | list[int] = None
+                      ) -> dict[str, object]:
+        """Return JSON of Todo to expect."""
+        # pylint: disable=too-many-arguments
+        d = {'id': id_,
+             'date': date,
+             'process_id': process_id,
+             'is_done': False,
+             'calendarize': False,
+             'comment': '',
+             'children': [],
+             'parents': [],
+             'effort': None,
+             'conditions': conditions if conditions else [],
+             'disables': disables if disables else [],
+             'blockers': blockers if blockers else [],
+             'enables': enables if enables else []}
+        return d
 
-    def test_do_POST_day(self) -> None:
-        """Test POST /day."""
-        form_data = {'day_comment': '', 'make_type': 'full'}
-        self.check_post(form_data, '/day', 400)
-        self.check_post(form_data, '/day?date=foo', 400)
-        self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
-        self.check_post({'foo': ''}, '/day?date=2024-01-01', 400)
+    @staticmethod
+    def _todo_node_as_dict(todo_id: int) -> dict[str, object]:
+        """Return JSON of TodoNode to expect."""
+        return {'children': [], 'seen': False, 'todo': todo_id}
+
+    @staticmethod
+    def _day_as_dict(date: str) -> dict[str, object]:
+        return {'id': date, 'comment': '', 'todos': []}
+
+    @staticmethod
+    def _post_batch(list_of_args: list[list[object]],
+                    names_of_simples: list[str],
+                    names_of_versioneds: list[str],
+                    f_as_dict: Callable[..., dict[str, object]],
+                    f_to_post: Callable[..., None | dict[str, object]]
+                    ) -> list[dict[str, object]]:
+        """Post expected=f_as_dict(*args) as input to f_to_post, for many."""
+        expecteds = []
+        for args in list_of_args:
+            expecteds += [f_as_dict(*args)]
+        for expected in expecteds:
+            assert isinstance(expected['_versioned'], dict)
+            post = {}
+            for name in names_of_simples:
+                post[name] = expected[name]
+            for name in names_of_versioneds:
+                post[name] = expected['_versioned'][name][0]
+            f_to_post(expected['id'], post)
+        return expecteds
+
+    def _post_day(self, params: str = '',
+                  form_data: None | dict[str, object] = None,
+                  redir_to: str = '',
+                  status: int = 302,
+                  ) -> None:
+        """POST /day?{params} with form_data."""
+        if not form_data:
+            form_data = {'day_comment': '', 'make_type': ''}
+        target = f'/day?{params}'
+        if not redir_to:
+            redir_to = f'{target}&make_type={form_data["make_type"]}'
+        self.check_post(form_data, target, status, redir_to)
+
+    def test_basic_GET_day(self) -> None:
+        """Test basic (no Processes/Conditions/Todos) GET /day basics."""
+        # check illegal date parameters
+        self.check_get('/day?date=foo', 400)
+        self.check_get('/day?date=2024-02-30', 400)
+        # check undefined day
+        date = date_in_n_days(0)
+        expected = self.GET_day_dict(date)
+        self.check_json_get('/day', expected)
+        # NB: GET ?date="today"/"yesterday"/"tomorrow" in test_basic_POST_day
+        # check 'make_type' GET parameter affects immediate reply, but …
+        date = '2024-01-01'
+        expected = self.GET_day_dict(date)
+        expected['make_type'] = 'bar'
+        self.check_json_get(f'/day?date={date}&make_type=bar', expected)
+        # … not any following, …
+        expected['make_type'] = ''
+        self.check_json_get(f'/day?date={date}', expected)
+        # … not even when part of a POST request
+        post: dict[str, object] = {'day_comment': '', 'make_type': 'foo'}
+        self._post_day(f'date={date}', post)
+        self.check_json_get(f'/day?date={date}', expected)
+
+    def test_fail_POST_day(self) -> None:
+        """Test malformed/illegal POST /day requests."""
+        # check payloads lacking minimum expecteds
+        url = '/day?date=2024-01-01'
+        self.check_post({}, url, 400)
+        self.check_post({'day_comment': ''}, url, 400)
+        self.check_post({'make_type': ''}, url, 400)
+        # to next check illegal new_todo values, we need an actual Process
+        self.post_process(1)
+        # check illegal new_todo values
+        post: dict[str, object]
+        post = {'make_type': '', 'day_comment': '', 'new_todo': ['foo']}
+        self.check_post(post, url, 400)
+        post['new_todo'] = [1, 2]  # no Process of .id_=2 exists
+        # to next check illegal old_todo inputs, we need to first post Todo
+        post['new_todo'] = [1]
+        self.check_post(post, url, 302, '/day?date=2024-01-01&make_type=')
+        # check illegal old_todo inputs (equal list lengths though)
+        post = {'make_type': '', 'day_comment': '', 'comment': ['foo'],
+                'effort': [3.3], 'done': [], 'todo_id': [1]}
+        self.check_post(post, url, 302, '/day?date=2024-01-01&make_type=')
+        post['todo_id'] = [2]  # reference to non-existant Process
+        self.check_post(post, url, 404)
+        post['todo_id'] = ['a']
+        self.check_post(post, url, 400)
+        post['todo_id'] = [1]
+        post['done'] = ['foo']
+        self.check_post(post, url, 400)
+        post['done'] = [2]  # reference to non-posted todo_id
+        self.check_post(post, url, 400)
+        post['done'] = []
+        post['effort'] = ['foo']
+        self.check_post(post, url, 400)
+        post['effort'] = [None]
+        self.check_post(post, url, 400)
+        post['effort'] = [3.3]
+        # check illegal old_todo inputs: unequal list lengths
+        post['comment'] = []
+        self.check_post(post, url, 400)
+        post['comment'] = ['foo', 'foo']
+        self.check_post(post, url, 400)
+        post['comment'] = ['foo']
+        post['effort'] = []
+        self.check_post(post, url, 400)
+        post['effort'] = [3.3, 3.3]
+        self.check_post(post, url, 400)
+        post['effort'] = [3.3]
+        post['todo_id'] = [1, 1]
+        self.check_post(post, url, 400)
+        post['todo_id'] = [1]
+        # # check valid POST payload on bad paths
+        self.check_post(post, '/day', 400)
+        self.check_post(post, '/day?date=', 400)
+        self.check_post(post, '/day?date=foo', 400)
+
+    def test_basic_POST_day(self) -> None:
+        """Test basic (no Todos) POST /day.
+
+        Check POST (& GET!) requests properly parse 'today', 'tomorrow',
+        'yesterday', and actual date strings;
+        preserve 'make_type' setting in redirect even if nonsensical;
+        and store 'day_comment'
+        """
+        for name, dist, test_str in [('2024-01-01', None, 'a'),
+                                     ('today', 0, 'b'),
+                                     ('yesterday', -1, 'c'),
+                                     ('tomorrow', +1, 'd')]:
+            date = name if dist is None else date_in_n_days(dist)
+            post = {'day_comment': test_str, 'make_type': f'x:{test_str}'}
+            post_url = f'/day?date={name}'
+            redir_url = f'{post_url}&make_type={post["make_type"]}'
+            self.check_post(post, post_url, 302, redir_url)
+            expected = self.GET_day_dict(date)
+            assert isinstance(expected['_library'], dict)
+            expected['_library']['Day'][date]['comment'] = test_str
+            self.check_json_get(post_url, expected)
+
+    def test_GET_day_with_processes_and_todos(self) -> None:
+        """Test GET /day displaying Processes and Todos (no trees)."""
+        date = '2024-01-01'
+        # check Processes get displayed in ['processes'] and ['_library']
+        procs_data = [[1, 'foo', 'oof', 1.1], [2, 'bar', 'rab', 0.9]]
+        procs_expected = self._post_batch(procs_data, [],
+                                          ['title', 'description', 'effort'],
+                                          self.proc_as_dict, self.post_process)
+        expected = self.GET_day_dict(date)
+        assert isinstance(expected['_library'], dict)
+        expected['processes'] = self.as_id_list(procs_expected)
+        expected['_library']['Process'] = self.as_refs(procs_expected)
+        self._post_day(f'date={date}')
+        self.check_json_get(f'/day?date={date}', expected)
+        # post Todos of either process and check their display
+        post_day: dict[str, object]
+        post_day = {'day_comment': '', 'make_type': '', 'new_todo': [1, 2]}
+        todos = [self._todo_as_dict(1, 1, date),
+                 self._todo_as_dict(2, 2, date)]
+        expected['_library']['Todo'] = self.as_refs(todos)
+        expected['_library']['Day'][date]['todos'] = self.as_id_list(todos)
+        nodes = [self._todo_node_as_dict(1), self._todo_node_as_dict(2)]
+        expected['top_nodes'] = nodes
+        self._post_day(f'date={date}', post_day)
+        self.check_json_get(f'/day?date={date}', expected)
+        # add a comment to one Todo and set the other's doneness and effort
+        post_day = {'day_comment': '', 'make_type': '', 'new_todo': [],
+                    'todo_id': [1, 2], 'done': [2], 'comment': ['FOO', ''],
+                    'effort': [2.3, '']}
+        expected['_library']['Todo']['1']['comment'] = 'FOO'
+        expected['_library']['Todo']['1']['effort'] = 2.3
+        expected['_library']['Todo']['2']['is_done'] = True
+        self._post_day(f'date={date}', post_day)
+        self.check_json_get(f'/day?date={date}', expected)
+
+    def test_GET_day_with_conditions(self) -> None:
+        """Test GET /day displaying Conditions and their relations."""
+        date = '2024-01-01'
+        # add Process with Conditions and their Todos, check display
+        conds_data = [[1, False, ['A'], ['a']], [2, True, ['B'], ['b']]]
+        conds_expected = self._post_batch(
+                conds_data, ['is_active'], ['title', 'description'],
+                self.cond_as_dict,
+                lambda x, y: self.check_post(y, f'/condition?id={x}', 302))
+        cond_names = ['conditions', 'disables', 'blockers', 'enables']
+        procs_data = [[1, 'foo', 'oof', 1.1, [1], [1], [2], [2]],
+                      [2, 'bar', 'rab', 0.9, [2], [2], [1], [1]]]
+        procs_expected = self._post_batch(procs_data, cond_names,
+                                          ['title', 'description', 'effort'],
+                                          self.proc_as_dict, self.post_process)
+        expected = self.GET_day_dict(date)
+        assert isinstance(expected['_library'], dict)
+        expected['processes'] = self.as_id_list(procs_expected)
+        expected['_library']['Process'] = self.as_refs(procs_expected)
+        expected['_library']['Condition'] = self.as_refs(conds_expected)
+        self._post_day(f'date={date}')
+        self.check_json_get(f'/day?date={date}', expected)
+        # add Todos in relation to Conditions, check consequences
+        post_day: dict[str, object]
+        post_day = {'day_comment': '', 'make_type': '', 'new_todo': [1, 2]}
+        todos = [self._todo_as_dict(1, 1, date, [1], [1], [2], [2]),
+                 self._todo_as_dict(2, 2, date, [2], [2], [1], [1])]
+        expected['_library']['Todo'] = self.as_refs(todos)
+        expected['_library']['Day'][date]['todos'] = self.as_id_list(todos)
+        nodes = [self._todo_node_as_dict(1), self._todo_node_as_dict(2)]
+        expected['top_nodes'] = nodes
+        expected['disablers_for'] = {'1': [1], '2': [2]}
+        expected['enablers_for'] = {'1': [2], '2': [1]}
+        expected['conditions_present'] = self.as_id_list(conds_expected)
+        self._post_day(f'date={date}', post_day)
+        self.check_json_get(f'/day?date={date}', expected)
+
+    def test_GET_calendar(self) -> None:
+        """Test GET /calendar responses based on various inputs, DB states."""
+        # check illegal date range delimiters
+        self.check_get('/calendar?start=foo', 400)
+        self.check_get('/calendar?end=foo', 400)
+        # check default range without saved days
+        expected = self.GET_calendar_dict(-1, 366)
+        self.check_json_get('/calendar', expected)
+        self.check_json_get('/calendar?start=&end=', expected)
+        # check named days as delimiters
+        expected = self.GET_calendar_dict(-1, +1)
+        self.check_json_get('/calendar?start=yesterday&end=tomorrow', expected)
+        # check zero-element range
+        expected = self.GET_calendar_dict(+1, 0)
+        self.check_json_get('/calendar?start=tomorrow&end=today', expected)
+        # check saved day shows up in results with proven by its comment
+        post_day: dict[str, object] = {'day_comment': 'foo', 'make_type': ''}
+        date1 = date_in_n_days(-2)
+        self._post_day(f'date={date1}', post_day)
+        start_date = date_in_n_days(-5)
+        end_date = date_in_n_days(+5)
+        url = f'/calendar?start={start_date}&end={end_date}'
+        expected = self.GET_calendar_dict(-5, +5)
+        assert isinstance(expected['_library'], dict)
+        expected['_library']['Day'][date1]['comment'] = post_day['day_comment']
+        self.check_json_get(url, expected)
index b0fb872bbdc2f34b65f2c3f843762c451c34baa0..a27f0d0a1f8c0a3330be0e6c6906e3a7d6d53fd2 100644 (file)
@@ -151,7 +151,7 @@ class TestsWithServer(TestCaseWithServer):
     """Tests against our HTTP server/handler (and database)."""
 
     def test_do_GET(self) -> None:
-        """Test / redirect, and unknown targets failing."""
+        """Test GET / redirect, and unknown targets failing."""
         self.conn.request('GET', '/')
         self.check_redirect('/day')
         self.check_get('/foo', 404)
index 34f6427e4d06152a17f035672061757cce203cac..1b20e217d077d826765f5a83c9a2b3250de38ba2 100644 (file)
@@ -1,4 +1,5 @@
 """Test Processes module."""
+from typing import Any
 from tests.utils import TestCaseWithDB, TestCaseWithServer, TestCaseSansDB
 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
 from plomtask.conditions import Condition
@@ -9,7 +10,6 @@ from plomtask.todos import Todo
 class TestsSansDB(TestCaseSansDB):
     """Module tests not requiring DB setup."""
     checked_class = Process
-    do_id_test = True
     versioned_defaults_to_test = {'title': 'UNNAMED', 'description': '',
                                   'effort': 1.0}
 
@@ -17,7 +17,6 @@ class TestsSansDB(TestCaseSansDB):
 class TestsSansDBProcessStep(TestCaseSansDB):
     """Module tests not requiring DB setup."""
     checked_class = ProcessStep
-    do_id_test = True
     default_init_args = [2, 3, 4]
 
 
@@ -58,17 +57,15 @@ class TestsWithDB(TestCaseWithDB):
     def test_Process_conditions_saving(self) -> None:
         """Test .save/.save_core."""
         p, set1, set2, set3 = self.p_of_conditions()
+        assert p.id_ is not None
         r = Process.by_id(self.db_conn, p.id_)
         self.assertEqual(sorted(r.conditions), sorted(set1))
         self.assertEqual(sorted(r.enables), sorted(set2))
         self.assertEqual(sorted(r.disables), sorted(set3))
 
-    def test_Process_from_table_row(self) -> None:
-        """Test .from_table_row() properly reads in class from DB"""
-        self.check_from_table_row()
-        self.check_versioned_from_table_row('title', str)
-        self.check_versioned_from_table_row('description', str)
-        self.check_versioned_from_table_row('effort', float)
+    def test_from_table_row(self) -> None:
+        """Test .from_table_row() properly reads in class from DB."""
+        super().test_from_table_row()
         p, set1, set2, set3 = self.p_of_conditions()
         p.save(self.db_conn)
         assert isinstance(p.id_, int)
@@ -182,25 +179,9 @@ class TestsWithDB(TestCaseWithDB):
             method(self.db_conn, [c1.id_, c2.id_])
             self.assertEqual(getattr(p, target), [c1, c2])
 
-    def test_Process_by_id(self) -> None:
-        """Test .by_id(), including creation"""
-        self.check_by_id()
-
-    def test_Process_all(self) -> None:
-        """Test .all()."""
-        self.check_all()
-
-    def test_Process_singularity(self) -> None:
-        """Test pointers made for single object keep pointing to it."""
-        self.check_singularity('conditions', [Condition(None)])
-
-    def test_Process_versioned_attributes_singularity(self) -> None:
-        """Test behavior of VersionedAttributes on saving (with .title)."""
-        self.check_versioned_singularity()
-
-    def test_Process_removal(self) -> None:
+    def test_remove(self) -> None:
         """Test removal of Processes and ProcessSteps."""
-        self.check_remove()
+        super().test_remove()
         p1, p2, p3 = self.three_processes()
         assert isinstance(p1.id_, int)
         assert isinstance(p2.id_, int)
@@ -212,13 +193,15 @@ class TestsWithDB(TestCaseWithDB):
             p1.remove(self.db_conn)
         p2.set_steps(self.db_conn, [])
         with self.assertRaises(NotFoundException):
+            assert step_id is not None
             ProcessStep.by_id(self.db_conn, step_id)
         p1.remove(self.db_conn)
         step = ProcessStep(None, p2.id_, p3.id_, None)
-        step_id = step.id_
         p2.set_steps(self.db_conn, [step])
+        step_id = step.id_
         p2.remove(self.db_conn)
         with self.assertRaises(NotFoundException):
+            assert step_id is not None
             ProcessStep.by_id(self.db_conn, step_id)
         todo = Todo(None, p3, False, '2024-01-01')
         todo.save(self.db_conn)
@@ -231,33 +214,25 @@ class TestsWithDB(TestCaseWithDB):
 class TestsWithDBForProcessStep(TestCaseWithDB):
     """Module tests requiring DB setup."""
     checked_class = ProcessStep
-    default_init_kwargs = {'owner_id': 2, 'step_process_id': 3,
-                           'parent_step_id': 4}
+    default_init_kwargs = {'owner_id': 1, 'step_process_id': 2,
+                           'parent_step_id': 3}
 
     def setUp(self) -> None:
         super().setUp()
-        p = Process(1)
-        p.save(self.db_conn)
-        p = Process(2)
-        p.save(self.db_conn)
+        self.p1 = Process(1)
+        self.p1.save(self.db_conn)
 
-    def test_saving_and_caching(self) -> None:
-        """Test storage and initialization of instances and attributes."""
-        self.check_saving_and_caching(id_=1, **self.default_init_kwargs)
-
-    def test_ProcessStep_remove(self) -> None:
+    def test_remove(self) -> None:
         """Test .remove and unsetting of owner's .explicit_steps entry."""
-        p1 = Process(None)
-        p2 = Process(None)
-        p1.save(self.db_conn)
+        p2 = Process(2)
         p2.save(self.db_conn)
-        assert isinstance(p1.id_, int)
+        assert isinstance(self.p1.id_, int)
         assert isinstance(p2.id_, int)
-        step = ProcessStep(None, p1.id_, p2.id_, None)
-        p1.set_steps(self.db_conn, [step])
+        step = ProcessStep(None, self.p1.id_, p2.id_, None)
+        self.p1.set_steps(self.db_conn, [step])
         step.remove(self.db_conn)
-        self.assertEqual(p1.explicit_steps, [])
-        self.check_storage([])
+        self.assertEqual(self.p1.explicit_steps, [])
+        self.check_identity_with_cache_and_db([])
 
 
 class TestsWithServer(TestCaseWithServer):
@@ -277,11 +252,12 @@ class TestsWithServer(TestCaseWithServer):
                         '/process?id=', 400)
         self.assertEqual(1, len(Process.all(self.db_conn)))
         form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.0}
-        self.post_process(2, form_data | {'condition': []})
-        self.check_post(form_data | {'condition': [1]}, '/process?id=', 404)
-        self.check_post({'title': 'foo', 'description': 'foo'},
+        self.post_process(2, form_data | {'conditions': []})
+        self.check_post(form_data | {'conditions': [1]}, '/process?id=', 404)
+        self.check_post({'title': 'foo', 'description': 'foo',
+                         'is_active': False},
                         '/condition', 302, '/condition?id=1')
-        self.post_process(3, form_data | {'condition': [1]})
+        self.post_process(3, form_data | {'conditions': [1]})
         self.post_process(4, form_data | {'disables': [1]})
         self.post_process(5, form_data | {'enables': [1]})
         form_data['delete'] = ''
@@ -311,6 +287,7 @@ class TestsWithServer(TestCaseWithServer):
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
         self.assertEqual(retrieved_process.explicit_steps, [])
+        assert retrieved_step_id is not None
         with self.assertRaises(NotFoundException):
             ProcessStep.by_id(self.db_conn, retrieved_step_id)
         # post new first (top_level) step of process 3 to process 1
@@ -361,11 +338,11 @@ class TestsWithServer(TestCaseWithServer):
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
         self.assertEqual(len(retrieved_process.explicit_steps), 2)
-        retrieved_step_0 = retrieved_process.explicit_steps[0]
+        retrieved_step_0 = retrieved_process.explicit_steps[1]
         self.assertEqual(retrieved_step_0.step_process_id, 3)
         self.assertEqual(retrieved_step_0.owner_id, 1)
         self.assertEqual(retrieved_step_0.parent_step_id, None)
-        retrieved_step_1 = retrieved_process.explicit_steps[1]
+        retrieved_step_1 = retrieved_process.explicit_steps[0]
         self.assertEqual(retrieved_step_1.step_process_id, 2)
         self.assertEqual(retrieved_step_1.owner_id, 1)
         self.assertEqual(retrieved_step_1.parent_step_id, None)
@@ -393,11 +370,11 @@ class TestsWithServer(TestCaseWithServer):
         self.post_process(1, form_data_1)
         retrieved_process = Process.by_id(self.db_conn, 1)
         self.assertEqual(len(retrieved_process.explicit_steps), 3)
-        retrieved_step_0 = retrieved_process.explicit_steps[0]
+        retrieved_step_0 = retrieved_process.explicit_steps[1]
         self.assertEqual(retrieved_step_0.step_process_id, 2)
         self.assertEqual(retrieved_step_0.owner_id, 1)
         self.assertEqual(retrieved_step_0.parent_step_id, None)
-        retrieved_step_1 = retrieved_process.explicit_steps[1]
+        retrieved_step_1 = retrieved_process.explicit_steps[0]
         self.assertEqual(retrieved_step_1.step_process_id, 3)
         self.assertEqual(retrieved_step_1.owner_id, 1)
         self.assertEqual(retrieved_step_1.parent_step_id, None)
@@ -408,5 +385,126 @@ class TestsWithServer(TestCaseWithServer):
 
     def test_do_GET(self) -> None:
         """Test /process and /processes response codes."""
+        self.check_get('/process', 200)
+        self.check_get('/process?id=', 200)
+        self.check_get('/process?id=1', 200)
         self.check_get_defaults('/process')
         self.check_get('/processes', 200)
+
+    def test_fail_GET_process(self) -> None:
+        """Test invalid GET /process params."""
+        # check for invalid IDs
+        self.check_get('/process?id=foo', 400)
+        self.check_get('/process?id=0', 500)
+        # check we catch invalid base64
+        self.check_get('/process?title_b64=foo', 400)
+        # check failure on references to unknown processes; we create Process
+        # of ID=1 here so we know the 404 comes from step_to=2 etc. (that tie
+        # the Process displayed by /process to others), not from not finding
+        # the main Process itself
+        self.post_process(1)
+        self.check_get('/process?id=1&step_to=2', 404)
+        self.check_get('/process?id=1&has_step=2', 404)
+
+    @classmethod
+    def GET_processes_dict(cls, procs: list[dict[str, object]]
+                           ) -> dict[str, object]:
+        """Return JSON of GET /processes to expect."""
+        library = {'Process': cls.as_refs(procs)} if procs else {}
+        d: dict[str, object] = {'processes': cls.as_id_list(procs),
+                                'sort_by': 'title',
+                                'pattern': '',
+                                '_library': library}
+        return d
+
+    @staticmethod
+    def procstep_as_dict(id_: int,
+                         owner_id: int,
+                         step_process_id: int,
+                         parent_step_id: int | None = None
+                         ) -> dict[str, object]:
+        """Return JSON of Process to expect."""
+        return {'id': id_,
+                'owner_id': owner_id,
+                'step_process_id': step_process_id,
+                'parent_step_id': parent_step_id}
+
+    def test_GET_processes(self) -> None:
+        """Test GET /processes."""
+        # pylint: disable=too-many-statements
+        # test empty result on empty DB, default-settings on empty params
+        expected = self.GET_processes_dict([])
+        self.check_json_get('/processes', expected)
+        # test on meaningless non-empty params (incl. entirely un-used key),
+        # that 'sort_by' default to 'title' (even if set to something else, as
+        # long as without handler) and 'pattern' get preserved
+        expected['pattern'] = 'bar'  # preserved despite zero effect!
+        url = '/processes?sort_by=foo&pattern=bar&foo=x'
+        self.check_json_get(url, expected)
+        # test non-empty result, automatic (positive) sorting by title
+        post1: dict[str, Any]
+        post2: dict[str, Any]
+        post3: dict[str, Any]
+        post1 = {'title': 'foo', 'description': 'oof', 'effort': 1.0}
+        post2 = {'title': 'bar', 'description': 'rab', 'effort': 1.1}
+        post2['new_top_step'] = 1
+        post3 = {'title': 'baz', 'description': 'zab', 'effort': 0.9}
+        post3['new_top_step'] = 1
+        self.post_process(1, post1)
+        self.post_process(2, post2)
+        self.post_process(3, post3)
+        post3['new_top_step'] = 2
+        post3['keep_step'] = 2
+        post3['steps'] = [2]
+        post3['step_2_process_id'] = 1
+        self.post_process(3, post3)
+        proc1 = self.proc_as_dict(1, post1['title'],
+                                  post1['description'], post1['effort'])
+        proc2 = self.proc_as_dict(2, post2['title'],
+                                  post2['description'], post2['effort'])
+        proc3 = self.proc_as_dict(3, post3['title'],
+                                  post3['description'], post3['effort'])
+        proc2['explicit_steps'] = [1]
+        proc3['explicit_steps'] = [2, 3]
+        step1 = self.procstep_as_dict(1, 2, 1)
+        step2 = self.procstep_as_dict(2, 3, 1)
+        step3 = self.procstep_as_dict(3, 3, 2)
+        expected = self.GET_processes_dict([proc2, proc3, proc1])
+        assert isinstance(expected['_library'], dict)
+        expected['_library']['ProcessStep'] = self.as_refs([step1, step2,
+                                                            step3])
+        self.check_json_get('/processes', expected)
+        # test other sortings
+        expected['sort_by'] = '-title'
+        expected['processes'] = self.as_id_list([proc1, proc3, proc2])
+        self.check_json_get('/processes?sort_by=-title', expected)
+        expected['sort_by'] = 'effort'
+        expected['processes'] = self.as_id_list([proc3, proc1, proc2])
+        self.check_json_get('/processes?sort_by=effort', expected)
+        expected['sort_by'] = '-effort'
+        expected['processes'] = self.as_id_list([proc2, proc1, proc3])
+        self.check_json_get('/processes?sort_by=-effort', expected)
+        expected['sort_by'] = 'steps'
+        expected['processes'] = self.as_id_list([proc1, proc2, proc3])
+        self.check_json_get('/processes?sort_by=steps', expected)
+        expected['sort_by'] = '-steps'
+        expected['processes'] = self.as_id_list([proc3, proc2, proc1])
+        self.check_json_get('/processes?sort_by=-steps', expected)
+        expected['sort_by'] = 'owners'
+        expected['processes'] = self.as_id_list([proc3, proc2, proc1])
+        self.check_json_get('/processes?sort_by=owners', expected)
+        expected['sort_by'] = '-owners'
+        expected['processes'] = self.as_id_list([proc1, proc2, proc3])
+        self.check_json_get('/processes?sort_by=-owners', expected)
+        # test pattern matching on title
+        expected = self.GET_processes_dict([proc2, proc3])
+        assert isinstance(expected['_library'], dict)
+        expected['pattern'] = 'ba'
+        expected['_library']['ProcessStep'] = self.as_refs([step1, step2,
+                                                            step3])
+        self.check_json_get('/processes?pattern=ba', expected)
+        # test pattern matching on description
+        expected['processes'] = self.as_id_list([proc1])
+        expected['_library'] = {'Process': self.as_refs([proc1])}
+        expected['pattern'] = 'of'
+        self.check_json_get('/processes?pattern=of', expected)
index 9317c398b255c51b14046f888b3de4a24b70238d..dd57ee4c0c28cfc73d3d9c08dd6c18ab2dd7cd7b 100644 (file)
@@ -1,5 +1,5 @@
 """Test Todos module."""
-from tests.utils import TestCaseWithDB, TestCaseWithServer
+from tests.utils import TestCaseSansDB, TestCaseWithDB, TestCaseWithServer
 from plomtask.todos import Todo, TodoNode
 from plomtask.processes import Process, ProcessStep
 from plomtask.conditions import Condition
@@ -7,11 +7,18 @@ from plomtask.exceptions import (NotFoundException, BadFormatException,
                                  HandledException)
 
 
-class TestsWithDB(TestCaseWithDB):
-    """Tests requiring DB, but not server setup."""
+class TestsWithDB(TestCaseWithDB, TestCaseSansDB):
+    """Tests requiring DB, but not server setup.
+
+    NB: We subclass TestCaseSansDB too, to pull in its .test_id_validation,
+    which for Todo wouldn't run without a DB being set up due to the need for
+    Processes with set IDs.
+    """
     checked_class = Todo
     default_init_kwargs = {'process': None, 'is_done': False,
                            'date': '2024-01-01'}
+    # solely used for TestCaseSansDB.test_id_setting
+    default_init_args = [None, False, '2024-01-01']
 
     def setUp(self) -> None:
         super().setUp()
@@ -24,6 +31,7 @@ class TestsWithDB(TestCaseWithDB):
         self.cond2 = Condition(None)
         self.cond2.save(self.db_conn)
         self.default_init_kwargs['process'] = self.proc
+        self.default_init_args[0] = self.proc
 
     def test_Todo_init(self) -> None:
         """Test creation of Todo and what they default to."""
@@ -45,16 +53,6 @@ class TestsWithDB(TestCaseWithDB):
         self.assertEqual(todo_yes_id.enables, [])
         self.assertEqual(todo_yes_id.disables, [])
 
-    def test_Todo_by_id(self) -> None:
-        """Test findability of Todos."""
-        todo = Todo(1, self.proc, False, self.date1)
-        todo.save(self.db_conn)
-        self.assertEqual(Todo.by_id(self.db_conn, 1), todo)
-        with self.assertRaises(NotFoundException):
-            Todo.by_id(self.db_conn, 0)
-        with self.assertRaises(NotFoundException):
-            Todo.by_id(self.db_conn, 2)
-
     def test_Todo_by_date(self) -> None:
         """Test findability of Todos by date."""
         t1 = Todo(None, self.proc, False, self.date1)
@@ -127,10 +125,11 @@ class TestsWithDB(TestCaseWithDB):
         assert isinstance(todo_1.id_, int)
         # test minimum
         node_0 = TodoNode(todo_1, False, [])
-        self.assertEqual(todo_1.get_step_tree(set()), node_0)
+        self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict)
         # test non_emtpy seen_todo does something
         node_0.seen = True
-        self.assertEqual(todo_1.get_step_tree({todo_1.id_}), node_0)
+        self.assertEqual(todo_1.get_step_tree({todo_1.id_}).as_dict,
+                         node_0.as_dict)
         # test child shows up
         todo_2 = Todo(None, self.proc, False, self.date1)
         todo_2.save(self.db_conn)
@@ -139,7 +138,7 @@ class TestsWithDB(TestCaseWithDB):
         node_2 = TodoNode(todo_2, False, [])
         node_0.children = [node_2]
         node_0.seen = False
-        self.assertEqual(todo_1.get_step_tree(set()), node_0)
+        self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict)
         # test child shows up with child
         todo_3 = Todo(None, self.proc, False, self.date1)
         todo_3.save(self.db_conn)
@@ -147,12 +146,12 @@ class TestsWithDB(TestCaseWithDB):
         todo_2.add_child(todo_3)
         node_3 = TodoNode(todo_3, False, [])
         node_2.children = [node_3]
-        self.assertEqual(todo_1.get_step_tree(set()), node_0)
+        self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict)
         # test same todo can be child-ed multiple times at different locations
         todo_1.add_child(todo_3)
         node_4 = TodoNode(todo_3, True, [])
         node_0.children += [node_4]
-        self.assertEqual(todo_1.get_step_tree(set()), node_0)
+        self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict)
 
     def test_Todo_create_with_children(self) -> None:
         """Test parenthood guaranteeds of Todo.create_with_children."""
@@ -193,14 +192,11 @@ class TestsWithDB(TestCaseWithDB):
         self.assertEqual(len(todo_3.children), 1)
         self.assertEqual(todo_3.children[0].process, proc4)
 
-    def test_Todo_singularity(self) -> None:
-        """Test pointers made for single object keep pointing to it."""
-        self.check_singularity('is_done', True, self.proc, False, self.date1)
-
     def test_Todo_remove(self) -> None:
         """Test removal."""
         todo_1 = Todo(None, self.proc, False, self.date1)
         todo_1.save(self.db_conn)
+        assert todo_1.id_ is not None
         todo_0 = Todo(None, self.proc, False, self.date1)
         todo_0.save(self.db_conn)
         todo_0.add_child(todo_1)
@@ -228,6 +224,7 @@ class TestsWithDB(TestCaseWithDB):
         todo_1.comment = 'foo'
         todo_1.effort = -0.1
         todo_1.save(self.db_conn)
+        assert todo_1.id_ is not None
         Todo.by_id(self.db_conn, todo_1.id_)
         todo_1.comment = ''
         todo_1_id = todo_1.id_
@@ -248,19 +245,25 @@ class TestsWithServer(TestCaseWithServer):
         form_data = {'day_comment': '', 'make_type': 'full'}
         self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
         self.assertEqual(Todo.by_date(self.db_conn, '2024-01-01'), [])
+        proc = Process.by_id(self.db_conn, 1)
         form_data['new_todo'] = str(proc.id_)
         self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
         todos = Todo.by_date(self.db_conn, '2024-01-01')
         self.assertEqual(1, len(todos))
         todo1 = todos[0]
         self.assertEqual(todo1.id_, 1)
+        proc = Process.by_id(self.db_conn, 1)
         self.assertEqual(todo1.process.id_, proc.id_)
         self.assertEqual(todo1.is_done, False)
+        proc2 = Process.by_id(self.db_conn, 2)
         form_data['new_todo'] = str(proc2.id_)
         self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
         todos = Todo.by_date(self.db_conn, '2024-01-01')
         todo1 = todos[1]
         self.assertEqual(todo1.id_, 2)
+        proc2 = Process.by_id(self.db_conn, 1)
+        todo1 = Todo.by_date(self.db_conn, '2024-01-01')[0]
+        self.assertEqual(todo1.id_, 1)
         self.assertEqual(todo1.process.id_, proc2.id_)
         self.assertEqual(todo1.is_done, False)
 
@@ -277,7 +280,7 @@ class TestsWithServer(TestCaseWithServer):
                         '/day?date=2024-01-01&make_type=full', 302)
         # test posting to bad URLs
         self.check_post({}, '/todo=', 404)
-        self.check_post({}, '/todo?id=', 400)
+        self.check_post({}, '/todo?id=', 404)
         self.check_post({}, '/todo?id=FOO', 400)
         self.check_post({}, '/todo?id=0', 404)
         # test posting naked entity
@@ -403,12 +406,15 @@ class TestsWithServer(TestCaseWithServer):
         form_data = {'day_comment': '', 'new_todo': [1], 'make_type': 'full'}
         self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
         todo = Todo.by_date(self.db_conn, '2024-01-01')[0]
-        form_data = {'day_comment': '', 'todo_id': [1], 'make_type': 'full'}
+        form_data = {'day_comment': '', 'todo_id': [1], 'make_type': 'full',
+                     'comment': [''], 'done': [], 'effort': ['']}
         self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
+        todo = Todo.by_date(self.db_conn, '2024-01-01')[0]
         self.assertEqual(todo.is_done, False)
         form_data = {'day_comment': '', 'todo_id': [1], 'done': [1],
-                     'make_type': 'full'}
+                     'make_type': 'full', 'comment': [''], 'effort': ['']}
         self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
+        todo = Todo.by_date(self.db_conn, '2024-01-01')[0]
         self.assertEqual(todo.is_done, True)
 
     def test_do_GET_todo(self) -> None:
@@ -416,8 +422,8 @@ class TestsWithServer(TestCaseWithServer):
         self.post_process()
         form_data = {'day_comment': '', 'new_todo': 1, 'make_type': 'full'}
         self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
-        self.check_get('/todo', 400)
-        self.check_get('/todo?id=', 400)
+        self.check_get('/todo', 404)
+        self.check_get('/todo?id=', 404)
         self.check_get('/todo?id=foo', 400)
         self.check_get('/todo?id=0', 404)
         self.check_get('/todo?id=1', 200)
index a9a4e80418a54f288281999b964ce8364474c177..665436873c27af704a13827715d3c795e04e1fe1 100644 (file)
@@ -1,11 +1,13 @@
 """Shared test utilities."""
+from __future__ import annotations
 from unittest import TestCase
+from typing import Mapping, Any, Callable
 from threading import Thread
 from http.client import HTTPConnection
+from json import loads as json_loads
 from urllib.parse import urlencode
 from uuid import uuid4
 from os import remove as remove_file
-from typing import Mapping, Any
 from plomtask.db import DatabaseFile, DatabaseConnection
 from plomtask.http import TaskHandler, TaskServer
 from plomtask.processes import Process, ProcessStep
@@ -15,27 +17,36 @@ from plomtask.todos import Todo
 from plomtask.exceptions import NotFoundException, HandledException
 
 
+def _within_checked_class(f: Callable[..., None]) -> Callable[..., None]:
+    def wrapper(self: TestCase) -> None:
+        if hasattr(self, 'checked_class'):
+            f(self)
+    return wrapper
+
+
 class TestCaseSansDB(TestCase):
     """Tests requiring no DB setup."""
     checked_class: Any
-    do_id_test: bool = False
     default_init_args: list[Any] = []
     versioned_defaults_to_test: dict[str, str | float] = {}
+    legal_ids = [1, 5]
+    illegal_ids = [0]
 
-    def test_id_setting(self) -> None:
-        """Test .id_ being set and its legal range being enforced."""
-        if not self.do_id_test:
-            return
-        with self.assertRaises(HandledException):
-            self.checked_class(0, *self.default_init_args)
-        obj = self.checked_class(5, *self.default_init_args)
-        self.assertEqual(obj.id_, 5)
+    @_within_checked_class
+    def test_id_validation(self) -> None:
+        """Test .id_ validation/setting."""
+        for id_ in self.illegal_ids:
+            with self.assertRaises(HandledException):
+                self.checked_class(id_, *self.default_init_args)
+        for id_ in self.legal_ids:
+            obj = self.checked_class(id_, *self.default_init_args)
+            self.assertEqual(obj.id_, id_)
 
+    @_within_checked_class
     def test_versioned_defaults(self) -> None:
         """Test defaults of VersionedAttributes."""
-        if len(self.versioned_defaults_to_test) == 0:
-            return
-        obj = self.checked_class(1, *self.default_init_args)
+        id_ = self.legal_ids[0]
+        obj = self.checked_class(id_, *self.default_init_args)
         for k, v in self.versioned_defaults_to_test.items():
             self.assertEqual(getattr(obj, k).newest, v)
 
@@ -60,19 +71,29 @@ class TestCaseWithDB(TestCase):
         self.db_conn.close()
         remove_file(self.db_file.path)
 
-    def test_saving_and_caching(self) -> None:
-        """Test storage and initialization of instances and attributes."""
-        if not hasattr(self, 'checked_class'):
-            return
-        self.check_saving_and_caching(id_=1, **self.default_init_kwargs)
-        obj = self.checked_class(None, **self.default_init_kwargs)
-        obj.save(self.db_conn)
-        self.assertEqual(obj.id_, 2)
-        for k, v in self.test_versioneds.items():
-            self.check_saving_of_versioned(k, v)
+    def _load_from_db(self, id_: int | str) -> list[object]:
+        db_found: list[object] = []
+        for row in self.db_conn.row_where(self.checked_class.table_name,
+                                          'id', id_):
+            db_found += [self.checked_class.from_table_row(self.db_conn,
+                                                           row)]
+        return db_found
 
-    def check_storage(self, content: list[Any]) -> None:
-        """Test cache and DB equal content."""
+    def _change_obj(self, obj: object) -> str:
+        attr_name: str = self.checked_class.to_save[-1]
+        attr = getattr(obj, attr_name)
+        new_attr: str | int | float | bool
+        if isinstance(attr, (int, float)):
+            new_attr = attr + 1
+        elif isinstance(attr, str):
+            new_attr = attr + '_'
+        elif isinstance(attr, bool):
+            new_attr = not attr
+        setattr(obj, attr_name, new_attr)
+        return attr_name
+
+    def check_identity_with_cache_and_db(self, content: list[Any]) -> None:
+        """Test both cache and DB equal content."""
         expected_cache = {}
         for item in content:
             expected_cache[item.id_] = item
@@ -81,129 +102,210 @@ class TestCaseWithDB(TestCase):
         db_found: list[Any] = []
         for item in content:
             assert isinstance(item.id_, type(self.default_ids[0]))
-            for row in self.db_conn.row_where(self.checked_class.table_name,
-                                              'id', item.id_):
-                db_found += [self.checked_class.from_table_row(self.db_conn,
-                                                               row)]
+            db_found += self._load_from_db(item.id_)
         hashes_db_found = [hash(x) for x in db_found]
         self.assertEqual(sorted(hashes_content), sorted(hashes_db_found))
 
-    def check_saving_and_caching(self, **kwargs: Any) -> None:
-        """Test instance.save in its core without relations."""
-        obj = self.checked_class(**kwargs)  # pylint: disable=not-callable
-        # check object init itself doesn't store anything yet
-        self.check_storage([])
-        # check saving sets core attributes properly
-        obj.save(self.db_conn)
-        for key, value in kwargs.items():
-            self.assertEqual(getattr(obj, key), value)
-        # check saving stored properly in cache and DB
-        self.check_storage([obj])
-
-    def check_saving_of_versioned(self, attr_name: str, type_: type) -> None:
-        """Test owner's versioned attributes."""
-        owner = self.checked_class(None)
-        vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1]
-        attr = getattr(owner, attr_name)
-        attr.set(vals[0])
-        attr.set(vals[1])
-        owner.save(self.db_conn)
-        retrieved = owner.__class__.by_id(self.db_conn, owner.id_)
-        attr = getattr(retrieved, attr_name)
-        self.assertEqual(sorted(attr.history.values()), vals)
-
-    def check_by_id(self) -> None:
-        """Test .by_id(), including creation."""
+    @_within_checked_class
+    def test_saving_versioned(self) -> None:
+        """Test storage and initialization of versioned attributes."""
+        def retrieve_attr_vals() -> list[object]:
+            attr_vals_saved: list[object] = []
+            assert hasattr(retrieved, 'id_')
+            for row in self.db_conn.row_where(attr.table_name, 'parent',
+                                              retrieved.id_):
+                attr_vals_saved += [row[2]]
+            return attr_vals_saved
+        for attr_name, type_ in self.test_versioneds.items():
+            # fail saving attributes on non-saved owner
+            owner = self.checked_class(None, **self.default_init_kwargs)
+            vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1]
+            attr = getattr(owner, attr_name)
+            attr.set(vals[0])
+            attr.set(vals[1])
+            with self.assertRaises(NotFoundException):
+                attr.save(self.db_conn)
+            owner.save(self.db_conn)
+            # check stored attribute is as expected
+            retrieved = self._load_from_db(owner.id_)[0]
+            attr = getattr(retrieved, attr_name)
+            self.assertEqual(sorted(attr.history.values()), vals)
+            # check owner.save() created entries in attr table
+            attr_vals_saved = retrieve_attr_vals()
+            self.assertEqual(vals, attr_vals_saved)
+            # check setting new val to attr inconsequential to DB without save
+            attr.set(vals[0])
+            attr_vals_saved = retrieve_attr_vals()
+            self.assertEqual(vals, attr_vals_saved)
+            # check save finally adds new val
+            attr.save(self.db_conn)
+            attr_vals_saved = retrieve_attr_vals()
+            self.assertEqual(vals + [vals[0]], attr_vals_saved)
+
+    @_within_checked_class
+    def test_saving_and_caching(self) -> None:
+        """Test effects of .cache() and .save()."""
+        id1 = self.default_ids[0]
+        # check failure to cache without ID (if None-ID input possible)
+        if isinstance(id1, int):
+            obj0 = self.checked_class(None, **self.default_init_kwargs)
+            with self.assertRaises(HandledException):
+                obj0.cache()
+        # check mere object init itself doesn't even store in cache
+        obj1 = self.checked_class(id1, **self.default_init_kwargs)
+        self.assertEqual(self.checked_class.get_cache(), {})
+        # check .cache() fills cache, but not DB
+        obj1.cache()
+        self.assertEqual(self.checked_class.get_cache(), {id1: obj1})
+        db_found = self._load_from_db(id1)
+        self.assertEqual(db_found, [])
+        # check .save() sets ID (for int IDs), updates cache, and fills DB
+        # (expect ID to be set to id1, despite obj1 already having that as ID:
+        # it's generated by cursor.lastrowid on the DB table, and with obj1
+        # not written there, obj2 should get it first!)
+        id_input = None if isinstance(id1, int) else id1
+        obj2 = self.checked_class(id_input, **self.default_init_kwargs)
+        obj2.save(self.db_conn)
+        obj2_hash = hash(obj2)
+        self.assertEqual(self.checked_class.get_cache(), {id1: obj2})
+        db_found += self._load_from_db(id1)
+        self.assertEqual([hash(o) for o in db_found], [obj2_hash])
+        # check we cannot overwrite obj2 with obj1 despite its same ID,
+        # since it has disappeared now
+        with self.assertRaises(HandledException):
+            obj1.save(self.db_conn)
+
+    @_within_checked_class
+    def test_by_id(self) -> None:
+        """Test .by_id()."""
+        id1, id2, _ = self.default_ids
         # check failure if not yet saved
-        id1, id2 = self.default_ids[0], self.default_ids[1]
-        obj = self.checked_class(id1)  # pylint: disable=not-callable
+        obj1 = self.checked_class(id1, **self.default_init_kwargs)
         with self.assertRaises(NotFoundException):
             self.checked_class.by_id(self.db_conn, id1)
+        # check identity of cached and retrieved
+        obj1.cache()
+        self.assertEqual(obj1, self.checked_class.by_id(self.db_conn, id1))
         # check identity of saved and retrieved
-        obj.save(self.db_conn)
-        self.assertEqual(obj, self.checked_class.by_id(self.db_conn, id1))
-        # check create=True acts like normal instantiation (sans saving)
-        by_id_created = self.checked_class.by_id(self.db_conn, id2,
-                                                 create=True)
-        # pylint: disable=not-callable
-        self.assertEqual(self.checked_class(id2), by_id_created)
-        self.check_storage([obj])
-
-    def check_from_table_row(self, *args: Any) -> None:
-        """Test .from_table_row() properly reads in class from DB"""
+        obj2 = self.checked_class(id2, **self.default_init_kwargs)
+        obj2.save(self.db_conn)
+        self.assertEqual(obj2, self.checked_class.by_id(self.db_conn, id2))
+
+    @_within_checked_class
+    def test_by_id_or_create(self) -> None:
+        """Test .by_id_or_create."""
+        # check .by_id_or_create fails if wrong class
+        if not self.checked_class.can_create_by_id:
+            with self.assertRaises(HandledException):
+                self.checked_class.by_id_or_create(self.db_conn, None)
+            return
+        # check ID input of None creates, on saving, ID=1,2,… for int IDs
+        if isinstance(self.default_ids[0], int):
+            for n in range(2):
+                item = self.checked_class.by_id_or_create(self.db_conn, None)
+                self.assertEqual(item.id_, None)
+                item.save(self.db_conn)
+                self.assertEqual(item.id_, n+1)
+        # check .by_id_or_create acts like normal instantiation (sans saving)
+        id_ = self.default_ids[2]
+        item = self.checked_class.by_id_or_create(self.db_conn, id_)
+        self.assertEqual(item.id_, id_)
+        with self.assertRaises(NotFoundException):
+            self.checked_class.by_id(self.db_conn, item.id_)
+        self.assertEqual(self.checked_class(item.id_), item)
+
+    @_within_checked_class
+    def test_from_table_row(self) -> None:
+        """Test .from_table_row() properly reads in class directly from DB."""
         id_ = self.default_ids[0]
-        obj = self.checked_class(id_, *args)  # pylint: disable=not-callable
+        obj = self.checked_class(id_, **self.default_init_kwargs)
         obj.save(self.db_conn)
-        assert isinstance(obj.id_, type(self.default_ids[0]))
+        assert isinstance(obj.id_, type(id_))
         for row in self.db_conn.row_where(self.checked_class.table_name,
                                           'id', obj.id_):
+            # check .from_table_row reproduces state saved, no matter if obj
+            # later changed (with caching even)
             hash_original = hash(obj)
+            attr_name = self._change_obj(obj)
+            obj.cache()
+            to_cmp = getattr(obj, attr_name)
             retrieved = self.checked_class.from_table_row(self.db_conn, row)
+            self.assertNotEqual(to_cmp, getattr(retrieved, attr_name))
             self.assertEqual(hash_original, hash(retrieved))
+            # check cache contains what .from_table_row just produced
             self.assertEqual({retrieved.id_: retrieved},
                              self.checked_class.get_cache())
+        # check .from_table_row also reads versioned attributes from DB
+        for attr_name, type_ in self.test_versioneds.items():
+            owner = self.checked_class(None)
+            vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1]
+            attr = getattr(owner, attr_name)
+            attr.set(vals[0])
+            attr.set(vals[1])
+            owner.save(self.db_conn)
+            for row in self.db_conn.row_where(owner.table_name, 'id',
+                                              owner.id_):
+                retrieved = owner.__class__.from_table_row(self.db_conn, row)
+                attr = getattr(retrieved, attr_name)
+                self.assertEqual(sorted(attr.history.values()), vals)
 
-    def check_versioned_from_table_row(self, attr_name: str,
-                                       type_: type) -> None:
-        """Test .from_table_row() reads versioned attributes from DB."""
-        owner = self.checked_class(None)
-        vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1]
-        attr = getattr(owner, attr_name)
-        attr.set(vals[0])
-        attr.set(vals[1])
-        owner.save(self.db_conn)
-        for row in self.db_conn.row_where(owner.table_name, 'id', owner.id_):
-            retrieved = owner.__class__.from_table_row(self.db_conn, row)
-            attr = getattr(retrieved, attr_name)
-            self.assertEqual(sorted(attr.history.values()), vals)
-
-    def check_all(self) -> tuple[Any, Any, Any]:
-        """Test .all()."""
-        # pylint: disable=not-callable
-        item1 = self.checked_class(self.default_ids[0])
-        item2 = self.checked_class(self.default_ids[1])
-        item3 = self.checked_class(self.default_ids[2])
-        # check pre-save .all() returns empty list
+    @_within_checked_class
+    def test_all(self) -> None:
+        """Test .all() and its relation to cache and savings."""
+        id_1, id_2, id_3 = self.default_ids
+        item1 = self.checked_class(id_1, **self.default_init_kwargs)
+        item2 = self.checked_class(id_2, **self.default_init_kwargs)
+        item3 = self.checked_class(id_3, **self.default_init_kwargs)
+        # check .all() returns empty list on un-cached items
         self.assertEqual(self.checked_class.all(self.db_conn), [])
-        # check that all() shows all saved, but no unsaved items
-        item1.save(self.db_conn)
+        # check that all() shows only cached/saved items
+        item1.cache()
         item3.save(self.db_conn)
         self.assertEqual(sorted(self.checked_class.all(self.db_conn)),
                          sorted([item1, item3]))
         item2.save(self.db_conn)
         self.assertEqual(sorted(self.checked_class.all(self.db_conn)),
                          sorted([item1, item2, item3]))
-        return item1, item2, item3
 
-    def check_singularity(self, defaulting_field: str,
-                          non_default_value: Any, *args: Any) -> None:
+    @_within_checked_class
+    def test_singularity(self) -> None:
         """Test pointers made for single object keep pointing to it."""
         id1 = self.default_ids[0]
-        obj = self.checked_class(id1, *args)  # pylint: disable=not-callable
+        obj = self.checked_class(id1, **self.default_init_kwargs)
         obj.save(self.db_conn)
-        setattr(obj, defaulting_field, non_default_value)
+        # change object, expect retrieved through .by_id to carry change
+        attr_name = self._change_obj(obj)
+        new_attr = getattr(obj, attr_name)
         retrieved = self.checked_class.by_id(self.db_conn, id1)
-        self.assertEqual(non_default_value,
-                         getattr(retrieved, defaulting_field))
+        self.assertEqual(new_attr, getattr(retrieved, attr_name))
 
-    def check_versioned_singularity(self) -> None:
+    @_within_checked_class
+    def test_versioned_singularity_title(self) -> None:
         """Test singularity of VersionedAttributes on saving (with .title)."""
-        obj = self.checked_class(None)  # pylint: disable=not-callable
-        obj.save(self.db_conn)
-        assert isinstance(obj.id_, int)
-        obj.title.set('named')
-        retrieved = self.checked_class.by_id(self.db_conn, obj.id_)
-        self.assertEqual(obj.title.history, retrieved.title.history)
+        if 'title' in self.test_versioneds:
+            obj = self.checked_class(None)
+            obj.save(self.db_conn)
+            assert isinstance(obj.id_, int)
+            # change obj, expect retrieved through .by_id to carry change
+            obj.title.set('named')
+            retrieved = self.checked_class.by_id(self.db_conn, obj.id_)
+            self.assertEqual(obj.title.history, retrieved.title.history)
 
-    def check_remove(self, *args: Any) -> None:
+    @_within_checked_class
+    def test_remove(self) -> None:
         """Test .remove() effects on DB and cache."""
         id_ = self.default_ids[0]
-        obj = self.checked_class(id_, *args)  # pylint: disable=not-callable
+        obj = self.checked_class(id_, **self.default_init_kwargs)
+        # check removal only works after saving
         with self.assertRaises(HandledException):
             obj.remove(self.db_conn)
         obj.save(self.db_conn)
         obj.remove(self.db_conn)
-        self.check_storage([])
+        # check access to obj fails after removal
+        with self.assertRaises(HandledException):
+            print(obj.id_)
+        # check DB and cache now empty
+        self.check_identity_with_cache_and_db([])
 
 
 class TestCaseWithServer(TestCaseWithDB):
@@ -217,6 +319,7 @@ class TestCaseWithServer(TestCaseWithDB):
         self.server_thread.start()
         self.conn = HTTPConnection(str(self.httpd.server_address[0]),
                                    self.httpd.server_address[1])
+        self.httpd.set_json_mode()
 
     def tearDown(self) -> None:
         self.httpd.shutdown()
@@ -224,6 +327,71 @@ class TestCaseWithServer(TestCaseWithDB):
         self.server_thread.join()
         super().tearDown()
 
+    @staticmethod
+    def as_id_list(items: list[dict[str, object]]) -> list[int | str]:
+        """Return list of only 'id' fields of items."""
+        id_list = []
+        for item in items:
+            assert isinstance(item['id'], (int, str))
+            id_list += [item['id']]
+        return id_list
+
+    @staticmethod
+    def as_refs(items: list[dict[str, object]]
+                ) -> dict[str, dict[str, object]]:
+        """Return dictionary of items by their 'id' fields."""
+        refs = {}
+        for item in items:
+            refs[str(item['id'])] = item
+        return refs
+
+    @staticmethod
+    def cond_as_dict(id_: int = 1,
+                     is_active: bool = False,
+                     titles: None | list[str] = None,
+                     descriptions: None | list[str] = None
+                     ) -> dict[str, object]:
+        """Return JSON of Condition to expect."""
+        d = {'id': id_,
+             'is_active': is_active,
+             '_versioned': {
+                 'title': {},
+                 'description': {}}}
+        titles = titles if titles else []
+        descriptions = descriptions if descriptions else []
+        assert isinstance(d['_versioned'], dict)
+        for i, title in enumerate(titles):
+            d['_versioned']['title'][i] = title
+        for i, description in enumerate(descriptions):
+            d['_versioned']['description'][i] = description
+        return d
+
+    @staticmethod
+    def proc_as_dict(id_: int = 1,
+                     title: str = 'A',
+                     description: str = '',
+                     effort: float = 1.0,
+                     conditions: None | list[int] = None,
+                     disables: None | list[int] = None,
+                     blockers: None | list[int] = None,
+                     enables: None | list[int] = None
+                     ) -> dict[str, object]:
+        """Return JSON of Process to expect."""
+        # pylint: disable=too-many-arguments
+        d = {'id': id_,
+             'calendarize': False,
+             'suppressed_steps': [],
+             'explicit_steps': [],
+             '_versioned': {
+                 'title': {0: title},
+                 'description': {0: description},
+                 'effort': {0: effort}},
+             'conditions': conditions if conditions else [],
+             'disables': disables if disables else [],
+             'enables': enables if enables else [],
+             'blockers': blockers if blockers else []}
+        return d
+
     def check_redirect(self, target: str) -> None:
         """Check that self.conn answers with a 302 redirect to target."""
         response = self.conn.getresponse()
@@ -267,3 +435,31 @@ class TestCaseWithServer(TestCaseWithDB):
         self.check_post(form_data, f'/process?id={id_}', 302,
                         f'/process?id={id_}')
         return form_data
+
+    def check_json_get(self, path: str, expected: dict[str, object]) -> None:
+        """Compare JSON on GET path with expected.
+
+        To simplify comparison of VersionedAttribute histories, transforms
+        timestamp keys of VersionedAttribute history keys into integers
+        counting chronologically forward from 0.
+        """
+        def rewrite_history_keys_in(item: Any) -> Any:
+            if isinstance(item, dict):
+                if '_versioned' in item.keys():
+                    for k in item['_versioned']:
+                        vals = item['_versioned'][k].values()
+                        history = {}
+                        for i, val in enumerate(vals):
+                            history[i] = val
+                        item['_versioned'][k] = history
+                for k in list(item.keys()):
+                    rewrite_history_keys_in(item[k])
+            elif isinstance(item, list):
+                item[:] = [rewrite_history_keys_in(i) for i in item]
+            return item
+        self.conn.request('GET', path)
+        response = self.conn.getresponse()
+        self.assertEqual(response.status, 200)
+        retrieved = json_loads(response.read().decode())
+        rewrite_history_keys_in(retrieved)
+        self.assertEqual(expected, retrieved)