[BASIC]
init-hook='import sys; sys.path.append(".")'
-good-names-rgxs=.*_?do_(GET|POST)(_[a-z]+)?,test_[A-Z]+
+good-names-rgxs=(.*_)?(GET|POST)(_.+)?,test_[A-Z]+
"""Non-doable elements of ProcessStep/Todo chains."""
from __future__ import annotations
-from typing import Any
-from sqlite3 import Row
from plomtask.db import DatabaseConnection, BaseModel
from plomtask.versioned_attributes import VersionedAttribute
from plomtask.exceptions import HandledException
to_save = ['is_active']
to_save_versioned = ['title', 'description']
to_search = ['title.newest', 'description.newest']
+ can_create_by_id = True
+ sorters = {'is_active': lambda c: c.is_active,
+ 'title': lambda c: c.title.newest}
def __init__(self, id_: int | None, is_active: bool = False) -> None:
super().__init__(id_)
self.description = VersionedAttribute(self, 'condition_descriptions',
'')
- @classmethod
- def from_table_row(cls, db_conn: DatabaseConnection,
- row: Row | list[Any]) -> Condition:
- """Build condition from row, including VersionedAttributes."""
- condition = super().from_table_row(db_conn, row)
- for name in ('title', 'description'):
- table_name = f'condition_{name}s'
- for row_ in db_conn.row_where(table_name, 'parent', row[0]):
- getattr(condition, name).history_from_row(row_)
- return condition
-
def remove(self, db_conn: DatabaseConnection) -> None:
"""Remove from DB, with VersionedAttributes.
Checks for Todos and Processes that depend on Condition, prohibits
deletion if found.
"""
- if self.id_ is None:
- raise HandledException('cannot remove unsaved item')
- for item in ('process', 'todo'):
- for attr in ('conditions', 'blockers', 'enables', 'disables'):
- table_name = f'{item}_{attr}'
- for _ in db_conn.row_where(table_name, 'condition', self.id_):
- raise HandledException('cannot remove Condition in use')
+ if self.id_ is not None:
+ for item in ('process', 'todo'):
+ for attr in ('conditions', 'blockers', 'enables', 'disables'):
+ table_name = f'{item}_{attr}'
+ for _ in db_conn.row_where(table_name, 'condition',
+ self.id_):
+ msg = 'cannot remove Condition in use'
+ raise HandledException(msg)
super().remove(db_conn)
"""Individual days defined by their dates."""
table_name = 'days'
to_save = ['comment']
+ add_to_dict = ['todos']
+ can_create_by_id = True
def __init__(self, date: str, comment: str = '') -> None:
id_ = valid_date(date)
return day
@classmethod
- def by_id(cls,
- db_conn: DatabaseConnection, id_: str | None,
- create: bool = False,
- ) -> Day:
- """Extend BaseModel.by_id checking for new/lost .todos."""
- day = super().by_id(db_conn, id_, create)
- assert day.id_ is not None
+ def by_id(cls, db_conn: DatabaseConnection, id_: str) -> Day:
+ """Extend BaseModel.by_id
+
+ Checks Todo.days_to_update to see if we need to update a retrieved
+ Day's .todos, and also ensures we're looking for proper dates and not
+ strings like "yesterday" by enforcing the valid_date translation.
+ """
+ assert isinstance(id_, str)
+ possibly_translated_date = valid_date(id_)
+ day = super().by_id(db_conn, possibly_translated_date)
if day.id_ in Todo.days_to_update:
Todo.days_to_update.remove(day.id_)
day.todos = Todo.by_date(db_conn, day.id_)
from os.path import isfile
from difflib import Differ
from sqlite3 import connect as sql_connect, Cursor, Row
-from typing import Any, Self, TypeVar, Generic
+from typing import Any, Self, TypeVar, Generic, Callable
from plomtask.exceptions import HandledException, NotFoundException
from plomtask.dating import valid_date
to_save: list[str] = []
to_save_versioned: list[str] = []
to_save_relations: list[tuple[str, str, str, int]] = []
+ add_to_dict: list[str] = []
id_: None | BaseModelId
cache_: dict[BaseModelId, Self]
to_search: list[str] = []
+ can_create_by_id = False
_exists = True
+ sorters: dict[str, Callable[..., Any]] = {}
def __init__(self, id_: BaseModelId | None) -> None:
if isinstance(id_, int) and id_ < 1:
assert isinstance(other.id_, int)
return self.id_ < other.id_
+ @property
+ def as_dict(self) -> dict[str, object]:
+ """Return self as (json.dumps-compatible) dict."""
+ library: dict[str, dict[str | int, object]] = {}
+ d: dict[str, object] = {'id': self.id_, '_library': library}
+ for to_save in self.to_save:
+ attr = getattr(self, to_save)
+ if hasattr(attr, 'as_dict_into_reference'):
+ d[to_save] = attr.as_dict_into_reference(library)
+ else:
+ d[to_save] = attr
+ if len(self.to_save_versioned) > 0:
+ d['_versioned'] = {}
+ for k in self.to_save_versioned:
+ attr = getattr(self, k)
+ assert isinstance(d['_versioned'], dict)
+ d['_versioned'][k] = attr.history
+ for r in self.to_save_relations:
+ attr_name = r[2]
+ l: list[int | str] = []
+ for rel in getattr(self, attr_name):
+ l += [rel.as_dict_into_reference(library)]
+ d[attr_name] = l
+ for k in self.add_to_dict:
+ d[k] = [x.as_dict_into_reference(library)
+ for x in getattr(self, k)]
+ return d
+
+ def as_dict_into_reference(self,
+ library: dict[str, dict[str | int, object]]
+ ) -> int | str:
+ """Return self.id_ while writing .as_dict into library."""
+ def into_library(library: dict[str, dict[str | int, object]],
+ cls_name: str,
+ id_: str | int,
+ d: dict[str, object]
+ ) -> None:
+ if cls_name not in library:
+ library[cls_name] = {}
+ if id_ in library[cls_name]:
+ if library[cls_name][id_] != d:
+ msg = 'Unexpected inequality of entries for ' +\
+ f'_library at: {cls_name}/{id_}'
+ raise HandledException(msg)
+ else:
+ library[cls_name][id_] = d
+ as_dict = self.as_dict
+ assert isinstance(as_dict['_library'], dict)
+ for cls_name, dict_of_objs in as_dict['_library'].items():
+ for id_, obj in dict_of_objs.items():
+ into_library(library, cls_name, id_, obj)
+ del as_dict['_library']
+ assert self.id_ is not None
+ into_library(library, self.__class__.__name__, self.id_, as_dict)
+ assert isinstance(as_dict['id'], (int, str))
+ return as_dict['id']
+
+ @classmethod
+ def name_lowercase(cls) -> str:
+ """Convenience method to return cls' name in lowercase."""
+ return cls.__name__.lower()
+
+ @classmethod
+ def sort_by(cls, seq: list[Any], sort_key: str, default: str = 'title'
+ ) -> str:
+ """Sort cls list by cls.sorters[sort_key] (reverse if '-'-prefixed)."""
+ reverse = False
+ if len(sort_key) > 1 and '-' == sort_key[0]:
+ sort_key = sort_key[1:]
+ reverse = True
+ if sort_key not in cls.sorters:
+ sort_key = default
+ sorter: Callable[..., Any] = cls.sorters[sort_key]
+ seq.sort(key=sorter, reverse=reverse)
+ if reverse:
+ sort_key = f'-{sort_key}'
+ return sort_key
+
# cache management
# (we primarily use the cache to ensure we work on the same object in
# memory no matter where and how we retrieve it, e.g. we don't want
@classmethod
def empty_cache(cls) -> None:
- """Empty class's cache."""
+ """Empty class's cache, and disappear all former inhabitants."""
+ # pylint: disable=protected-access
+ # (cause we remain within the class)
+ if hasattr(cls, 'cache_'):
+ to_disappear = list(cls.cache_.values())
+ for item in to_disappear:
+ item._disappear()
cls.cache_ = {}
@classmethod
def _get_cached(cls: type[BaseModelInstance],
id_: BaseModelId) -> BaseModelInstance | None:
"""Get object of id_ from class's cache, or None if not found."""
- # pylint: disable=consider-iterating-dictionary
cache = cls.get_cache()
- if id_ in cache.keys():
+ if id_ in cache:
obj = cache[id_]
assert isinstance(obj, cls)
return obj
return None
- def _cache(self) -> None:
+ def cache(self) -> None:
"""Update object in class's cache.
Also calls ._disappear if cache holds older reference to object of same
# pylint: disable=unused-argument
db_conn: DatabaseConnection,
row: Row | list[Any]) -> BaseModelInstance:
- """Make from DB row, update DB cache with it."""
+ """Make from DB row (sans relations), update DB cache with it."""
obj = cls(*row)
- obj._cache()
+ assert obj.id_ is not None
+ for attr_name in cls.to_save_versioned:
+ attr = getattr(obj, attr_name)
+ table_name = attr.table_name
+ for row_ in db_conn.row_where(table_name, 'parent', obj.id_):
+ attr.history_from_row(row_)
+ obj.cache()
return obj
@classmethod
- def by_id(cls, db_conn: DatabaseConnection,
- id_: BaseModelId | None,
- # pylint: disable=unused-argument
- create: bool = False) -> Self:
+ def by_id(cls, db_conn: DatabaseConnection, id_: BaseModelId) -> Self:
"""Retrieve by id_, on failure throw NotFoundException.
First try to get from cls.cache_, only then check DB; if found,
put into cache.
-
- If create=True, make anew (but do not cache yet).
"""
obj = None
if id_ is not None:
break
if obj:
return obj
- if create:
- obj = cls(id_)
- return obj
raise NotFoundException(f'found no object of ID {id_}')
+ @classmethod
+ def by_id_or_create(cls, db_conn: DatabaseConnection,
+ id_: BaseModelId | None
+ ) -> Self:
+ """Wrapper around .by_id, creating (not caching/saving) if not find."""
+ if not cls.can_create_by_id:
+ raise HandledException('Class cannot .by_id_or_create.')
+ if id_ is None:
+ return cls(None)
+ try:
+ return cls.by_id(db_conn, id_)
+ except NotFoundException:
+ return cls(id_)
+
@classmethod
def all(cls: type[BaseModelInstance],
db_conn: DatabaseConnection) -> list[BaseModelInstance]:
values)
if not isinstance(self.id_, str):
self.id_ = cursor.lastrowid # type: ignore[assignment]
- self._cache()
+ self.cache()
for attr_name in self.to_save_versioned:
getattr(self, attr_name).save(db_conn)
for table, column, attr_name, key_index in self.to_save_relations:
"""Web server stuff."""
from __future__ import annotations
from dataclasses import dataclass
-from typing import Any, Callable, Mapping
+from typing import Any, Callable
from base64 import b64encode, b64decode
+from binascii import Error as binascii_Exception
from http.server import BaseHTTPRequestHandler
from http.server import HTTPServer
from urllib.parse import urlparse, parse_qs
+from json import dumps as json_dumps
from os.path import split as path_split
from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader
from plomtask.dating import date_in_n_days
from plomtask.days import Day
-from plomtask.exceptions import HandledException, BadFormatException, \
- NotFoundException
+from plomtask.exceptions import (HandledException, BadFormatException,
+ NotFoundException)
from plomtask.db import DatabaseConnection, DatabaseFile
from plomtask.processes import Process, ProcessStep, ProcessStepsNode
from plomtask.conditions import Condition
*args: Any, **kwargs: Any) -> None:
super().__init__(*args, **kwargs)
self.db = db_file
- self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
+ self.headers: list[tuple[str, str]] = []
+ self._render_mode = 'html'
+ self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR))
+
+ def set_json_mode(self) -> None:
+ """Make server send JSON instead of HTML responses."""
+ self._render_mode = 'json'
+ self.headers += [('Content-Type', 'application/json')]
+
+ @staticmethod
+ def ctx_to_json(ctx: dict[str, object]) -> str:
+ """Render ctx into JSON string."""
+ def walk_ctx(node: object) -> Any:
+ if hasattr(node, 'as_dict_into_reference'):
+ if hasattr(node, 'id_') and node.id_ is not None:
+ return node.as_dict_into_reference(library)
+ if hasattr(node, 'as_dict'):
+ return node.as_dict
+ if isinstance(node, (list, tuple)):
+ return [walk_ctx(x) for x in node]
+ if isinstance(node, dict):
+ d = {}
+ for k, v in node.items():
+ d[k] = walk_ctx(v)
+ return d
+ if isinstance(node, HandledException):
+ return str(node)
+ return node
+ library: dict[str, dict[str | int, object]] = {}
+ for k, v in ctx.items():
+ ctx[k] = walk_ctx(v)
+ ctx['_library'] = library
+ return json_dumps(ctx)
+
+ def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
+ """Render ctx according to self._render_mode.."""
+ tmpl_name = f'{tmpl_name}.{self._render_mode}'
+ if 'html' == self._render_mode:
+ template = self._jinja.get_template(tmpl_name)
+ return template.render(ctx)
+ return self.__class__.ctx_to_json(ctx)
class InputsParser:
msg = f'cannot int a form field value for key {key} in: {all_str}'
raise BadFormatException(msg) from e
+ def get_all_floats_or_nones(self, key: str) -> list[float | None]:
+ """Retrieve list of float value at key, None if empty strings."""
+ ret: list[float | None] = []
+ for val in self.get_all_str(key):
+ if '' == val:
+ ret += [None]
+ else:
+ try:
+ ret += [float(val)]
+ except ValueError as e:
+ msg = f'cannot float form field value for key {key}: {val}'
+ raise BadFormatException(msg) from e
+ return ret
+
class TaskHandler(BaseHTTPRequestHandler):
"""Handles single HTTP request."""
_form_data: InputsParser
_params: InputsParser
- def _send_html(self,
+ def _send_page(self,
+ ctx: dict[str, Any],
tmpl_name: str,
- ctx: Mapping[str, object],
- code: int = 200) -> None:
- """Send HTML as proper HTTP response."""
- tmpl = self.server.jinja.get_template(tmpl_name)
- html = tmpl.render(ctx)
+ code: int = 200
+ ) -> None:
+ """Send ctx as proper HTTP response."""
+ body = self.server.render(ctx, tmpl_name)
self.send_response(code)
+ for header_tuple in self.server.headers:
+ self.send_header(*header_tuple)
self.end_headers()
- self.wfile.write(bytes(html, 'utf-8'))
+ self.wfile.write(bytes(body, 'utf-8'))
@staticmethod
def _request_wrapper(http_method: str, not_found_msg: str
) -> Callable[..., Callable[[TaskHandler], None]]:
+ """Wrapper for do_GET… and do_POST… handlers, to init and clean up.
+
+ Among other things, conditionally cleans all caches, but only on POST
+ requests, as only those are expected to change the states of objects
+ that may be cached, and certainly only those are expected to write any
+ changes to the database. We want to clear them, though, as early as
+ possible here: either exactly after the specific request handler
+ returns successfully, or right after any exception is triggered –
+ otherwise, race conditions become plausible.
+
+ Note that otherwise any POST attempt, even a failed one, may end in
+ problematic inconsistencies:
+
+ - if the POST handler experiences an Exception, changes to objects
+ won't get written to the DB, but the changed objects may remain in
+ the cache and affect other objects despite their possibly illegal
+ state
+
+ - even if an object was just saved to the DB, we cannot be sure its
+ current state is completely identical to what we'd get if loading it
+ fresh from the DB (e.g. currently Process.n_owners is only updated
+ when loaded anew via .from_table_row, nor is its state written to
+ the DB by .save; a questionable design choice, but proof that we
+ have no guarantee that objects' .save stores all their states we'd
+ prefer at their most up-to-date).
+ """
+
+ def clear_caches() -> None:
+ for cls in (Day, Todo, Condition, Process, ProcessStep):
+ assert hasattr(cls, 'empty_cache')
+ cls.empty_cache()
+
def decorator(f: Callable[..., str | None]
) -> Callable[[TaskHandler], None]:
def wrapper(self: TaskHandler) -> None:
if hasattr(self, handler_name):
handler = getattr(self, handler_name)
redir_target = f(self, handler)
+ if 'POST' == http_method:
+ clear_caches()
if redir_target:
self.send_response(302)
self.send_header('Location', redir_target)
msg = f'{not_found_msg}: {self._site}'
raise NotFoundException(msg)
except HandledException as error:
- for cls in (Day, Todo, Condition, Process, ProcessStep):
- assert hasattr(cls, 'empty_cache')
- cls.empty_cache()
+ if 'POST' == http_method:
+ clear_caches()
ctx = {'msg': error}
- self._send_html('msg.html', ctx, error.http_code)
+ self._send_page(ctx, 'msg', error.http_code)
finally:
self.conn.close()
return wrapper
def do_GET(self, handler: Callable[[], str | dict[str, object]]
) -> str | None:
"""Render page with result of handler, or redirect if result is str."""
- tmpl_name = f'{self._site}.html'
- ctx_or_redir = handler()
- if isinstance(ctx_or_redir, str):
- return ctx_or_redir
- self._send_html(tmpl_name, ctx_or_redir)
+ tmpl_name = f'{self._site}'
+ ctx_or_redir_target = handler()
+ if isinstance(ctx_or_redir_target, str):
+ return ctx_or_redir_target
+ self._send_page(ctx_or_redir_target, tmpl_name)
return None
@_request_wrapper('POST', 'Unknown POST target')
# GET handlers
+ @staticmethod
+ def _get_item(target_class: Any
+ ) -> Callable[..., Callable[[TaskHandler],
+ dict[str, object]]]:
+ def decorator(f: Callable[..., dict[str, object]]
+ ) -> Callable[[TaskHandler], dict[str, object]]:
+ def wrapper(self: TaskHandler) -> dict[str, object]:
+ # pylint: disable=protected-access
+ # (because pylint here fails to detect the use of wrapper as a
+ # method to self with respective access privileges)
+ id_ = self._params.get_int_or_none('id')
+ if target_class.can_create_by_id:
+ item = target_class.by_id_or_create(self.conn, id_)
+ else:
+ item = target_class.by_id(self.conn, id_)
+ return f(self, item)
+ return wrapper
+ return decorator
+
def do_GET_(self) -> str:
"""Return redirect target on GET /."""
return '/day'
def do_GET_day(self) -> dict[str, object]:
"""Show single Day of ?date=."""
date = self._params.get_str('date', date_in_n_days(0))
- day = Day.by_id(self.conn, date, create=True)
+ day = Day.by_id_or_create(self.conn, date)
make_type = self._params.get_str('make_type')
conditions_present = []
enablers_for = {}
'conditions_present': conditions_present,
'processes': Process.all(self.conn)}
- def do_GET_todo(self) -> dict[str, object]:
+ @_get_item(Todo)
+ def do_GET_todo(self, todo: Todo) -> dict[str, object]:
"""Show single Todo of ?id=."""
@dataclass
ids = ids | collect_adoptables_keys(node.children)
return ids
- id_ = self._params.get_int('id')
- todo = Todo.by_id(self.conn, id_)
todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
process_tree = todo.process.get_steps(self.conn, None)
steps_todo_to_process: list[TodoStepsNode] = []
adoptables: dict[int, list[Todo]] = {}
any_adoptables = [Todo.by_id(self.conn, t.id_)
for t in Todo.by_date(self.conn, todo.date)
- if t != todo]
+ if t.id_ is not None
+ and t != todo]
for id_ in collect_adoptables_keys(steps_todo_to_process):
adoptables[id_] = [t for t in any_adoptables
if t.process.id_ == id_]
todos = [t for t in todos_by_date_range
if comment_pattern in t.comment
and ((not process_id) or t.process.id_ == process_id)]
- if sort_by == 'doneness':
- todos.sort(key=lambda t: t.is_done)
- elif sort_by == '-doneness':
- todos.sort(key=lambda t: t.is_done, reverse=True)
- elif sort_by == 'title':
- todos.sort(key=lambda t: t.title_then)
- elif sort_by == '-title':
- todos.sort(key=lambda t: t.title_then, reverse=True)
- elif sort_by == 'comment':
- todos.sort(key=lambda t: t.comment)
- elif sort_by == '-comment':
- todos.sort(key=lambda t: t.comment, reverse=True)
- elif sort_by == '-date':
- todos.sort(key=lambda t: t.date, reverse=True)
- else:
- todos.sort(key=lambda t: t.date)
+ sort_by = Todo.sort_by(todos, sort_by)
return {'start': start, 'end': end, 'process_id': process_id,
'comment_pattern': comment_pattern, 'todos': todos,
'all_processes': Process.all(self.conn), 'sort_by': sort_by}
def do_GET_conditions(self) -> dict[str, object]:
"""Show all Conditions."""
pattern = self._params.get_str('pattern')
- conditions = Condition.matching(self.conn, pattern)
sort_by = self._params.get_str('sort_by')
- if sort_by == 'is_active':
- conditions.sort(key=lambda c: c.is_active)
- elif sort_by == '-is_active':
- conditions.sort(key=lambda c: c.is_active, reverse=True)
- elif sort_by == '-title':
- conditions.sort(key=lambda c: c.title.newest, reverse=True)
- else:
- conditions.sort(key=lambda c: c.title.newest)
+ conditions = Condition.matching(self.conn, pattern)
+ sort_by = Condition.sort_by(conditions, sort_by)
return {'conditions': conditions,
'sort_by': sort_by,
'pattern': pattern}
- def do_GET_condition(self) -> dict[str, object]:
+ @_get_item(Condition)
+ def do_GET_condition(self, c: Condition) -> dict[str, object]:
"""Show Condition of ?id=."""
- id_ = self._params.get_int_or_none('id')
- c = Condition.by_id(self.conn, id_, create=True)
ps = Process.all(self.conn)
return {'condition': c, 'is_new': c.id_ is None,
'enabled_processes': [p for p in ps if c in p.conditions],
'enabling_processes': [p for p in ps if c in p.enables],
'disabling_processes': [p for p in ps if c in p.disables]}
- def do_GET_condition_titles(self) -> dict[str, object]:
+ @_get_item(Condition)
+ def do_GET_condition_titles(self, c: Condition) -> dict[str, object]:
"""Show title history of Condition of ?id=."""
- id_ = self._params.get_int_or_none('id')
- condition = Condition.by_id(self.conn, id_)
- return {'condition': condition}
+ return {'condition': c}
- def do_GET_condition_descriptions(self) -> dict[str, object]:
+ @_get_item(Condition)
+ def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]:
"""Show description historys of Condition of ?id=."""
- id_ = self._params.get_int_or_none('id')
- condition = Condition.by_id(self.conn, id_)
- return {'condition': condition}
+ return {'condition': c}
- def do_GET_process(self) -> dict[str, object]:
+ @_get_item(Process)
+ def do_GET_process(self, process: Process) -> dict[str, object]:
"""Show Process of ?id=."""
- id_ = self._params.get_int_or_none('id')
- process = Process.by_id(self.conn, id_, create=True)
+ owner_ids = self._params.get_all_int('step_to')
+ owned_ids = self._params.get_all_int('has_step')
title_64 = self._params.get_str('title_b64')
if title_64:
- title = b64decode(title_64.encode()).decode()
+ try:
+ title = b64decode(title_64.encode()).decode()
+ except binascii_Exception as exc:
+ msg = 'invalid base64 for ?title_b64='
+ raise BadFormatException(msg) from exc
process.title.set(title)
+ preset_top_step = None
owners = process.used_as_step_by(self.conn)
- for step_id in self._params.get_all_int('step_to'):
+ for step_id in owner_ids:
owners += [Process.by_id(self.conn, step_id)]
- preset_top_step = None
- for process_id in self._params.get_all_int('has_step'):
+ for process_id in owned_ids:
+ Process.by_id(self.conn, process_id) # to ensure ID exists
preset_top_step = process_id
return {'process': process, 'is_new': process.id_ is None,
'preset_top_step': preset_top_step,
'process_candidates': Process.all(self.conn),
'condition_candidates': Condition.all(self.conn)}
- def do_GET_process_titles(self) -> dict[str, object]:
+ @_get_item(Process)
+ def do_GET_process_titles(self, p: Process) -> dict[str, object]:
"""Show title history of Process of ?id=."""
- id_ = self._params.get_int_or_none('id')
- process = Process.by_id(self.conn, id_)
- return {'process': process}
+ return {'process': p}
- def do_GET_process_descriptions(self) -> dict[str, object]:
+ @_get_item(Process)
+ def do_GET_process_descriptions(self, p: Process) -> dict[str, object]:
"""Show description historys of Process of ?id=."""
- id_ = self._params.get_int_or_none('id')
- process = Process.by_id(self.conn, id_)
- return {'process': process}
+ return {'process': p}
- def do_GET_process_efforts(self) -> dict[str, object]:
+ @_get_item(Process)
+ def do_GET_process_efforts(self, p: Process) -> dict[str, object]:
"""Show default effort history of Process of ?id=."""
- id_ = self._params.get_int_or_none('id')
- process = Process.by_id(self.conn, id_)
- return {'process': process}
+ return {'process': p}
def do_GET_processes(self) -> dict[str, object]:
"""Show all Processes."""
pattern = self._params.get_str('pattern')
- processes = Process.matching(self.conn, pattern)
sort_by = self._params.get_str('sort_by')
- if sort_by == 'steps':
- processes.sort(key=lambda p: len(p.explicit_steps))
- elif sort_by == '-steps':
- processes.sort(key=lambda p: len(p.explicit_steps), reverse=True)
- elif sort_by == 'owners':
- processes.sort(key=lambda p: p.n_owners or 0)
- elif sort_by == '-owners':
- processes.sort(key=lambda p: p.n_owners or 0, reverse=True)
- elif sort_by == 'effort':
- processes.sort(key=lambda p: p.effort.newest)
- elif sort_by == '-effort':
- processes.sort(key=lambda p: p.effort.newest, reverse=True)
- elif sort_by == '-title':
- processes.sort(key=lambda p: p.title.newest, reverse=True)
- else:
- processes.sort(key=lambda p: p.title.newest)
+ processes = Process.matching(self.conn, pattern)
+ sort_by = Process.sort_by(processes, sort_by)
return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern}
# POST handlers
+ @staticmethod
+ def _delete_or_post(target_class: Any, redir_target: str = '/'
+ ) -> Callable[..., Callable[[TaskHandler], str]]:
+ def decorator(f: Callable[..., str]
+ ) -> Callable[[TaskHandler], str]:
+ def wrapper(self: TaskHandler) -> str:
+ # pylint: disable=protected-access
+ # (because pylint here fails to detect the use of wrapper as a
+ # method to self with respective access privileges)
+ id_ = self._params.get_int_or_none('id')
+ for _ in self._form_data.get_all_str('delete'):
+ if id_ is None:
+ msg = 'trying to delete non-saved ' +\
+ f'{target_class.__name__}'
+ raise NotFoundException(msg)
+ item = target_class.by_id(self.conn, id_)
+ item.remove(self.conn)
+ return redir_target
+ if target_class.can_create_by_id:
+ item = target_class.by_id_or_create(self.conn, id_)
+ else:
+ item = target_class.by_id(self.conn, id_)
+ return f(self, item)
+ return wrapper
+ return decorator
+
def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str:
"""Update history timestamps for VersionedAttribute."""
id_ = self._params.get_int_or_none('id')
if old[19:] != v:
attr.reset_timestamp(old, f'{v}.0')
attr.save(self.conn)
- cls_name = cls.__name__.lower()
- return f'/{cls_name}_{attr_name}s?id={item.id_}'
+ return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}'
def do_POST_day(self) -> str:
"""Update or insert Day of date and Todos mapped to it."""
+ # pylint: disable=too-many-locals
date = self._params.get_str('date')
- day = Day.by_id(self.conn, date, create=True)
- day.comment = self._form_data.get_str('day_comment')
- day.save(self.conn)
+ day_comment = self._form_data.get_str('day_comment')
make_type = self._form_data.get_str('make_type')
- for process_id in sorted(self._form_data.get_all_int('new_todo')):
+ old_todos = self._form_data.get_all_int('todo_id')
+ new_todos = self._form_data.get_all_int('new_todo')
+ comments = self._form_data.get_all_str('comment')
+ efforts = self._form_data.get_all_floats_or_nones('effort')
+ done_todos = self._form_data.get_all_int('done')
+ for _ in [id_ for id_ in done_todos if id_ not in old_todos]:
+ raise BadFormatException('"done" field refers to unknown Todo')
+ is_done = [t_id in done_todos for t_id in old_todos]
+ if not (len(old_todos) == len(is_done) == len(comments)
+ == len(efforts)):
+ msg = 'not equal number each of number of todo_id, comments, ' +\
+ 'and efforts inputs'
+ raise BadFormatException(msg)
+ day = Day.by_id_or_create(self.conn, date)
+ day.comment = day_comment
+ day.save(self.conn)
+ for process_id in sorted(new_todos):
if 'empty' == make_type:
process = Process.by_id(self.conn, process_id)
todo = Todo(None, process, False, date)
todo.save(self.conn)
else:
Todo.create_with_children(self.conn, process_id, date)
- done_ids = self._form_data.get_all_int('done')
- comments = self._form_data.get_all_str('comment')
- efforts = self._form_data.get_all_str('effort')
- for i, todo_id in enumerate(self._form_data.get_all_int('todo_id')):
+ for i, todo_id in enumerate(old_todos):
todo = Todo.by_id(self.conn, todo_id)
- todo.is_done = todo_id in done_ids
- if len(comments) > 0:
- todo.comment = comments[i]
- if len(efforts) > 0:
- todo.effort = float(efforts[i]) if efforts[i] else None
+ todo.is_done = is_done[i]
+ todo.comment = comments[i]
+ todo.effort = efforts[i]
todo.save(self.conn)
return f'/day?date={date}&make_type={make_type}'
- def do_POST_todo(self) -> str:
+ @_delete_or_post(Todo, '/')
+ def do_POST_todo(self, todo: Todo) -> str:
"""Update Todo and its children."""
# pylint: disable=too-many-locals
- # pylint: disable=too-many-branches
- id_ = self._params.get_int('id')
- for _ in self._form_data.get_all_str('delete'):
- todo = Todo .by_id(self.conn, id_)
- todo.remove(self.conn)
- return '/'
- todo = Todo.by_id(self.conn, id_)
adopted_child_ids = self._form_data.get_all_int('adopt')
processes_to_make_full = self._form_data.get_all_int('make_full')
processes_to_make_empty = self._form_data.get_all_int('make_empty')
fill_fors = self._form_data.get_first_strings_starting('fill_for_')
+ effort = self._form_data.get_str('effort', ignore_strict=True)
+ conditions = self._form_data.get_all_int('conditions')
+ disables = self._form_data.get_all_int('disables')
+ blockers = self._form_data.get_all_int('blockers')
+ enables = self._form_data.get_all_int('enables')
+ is_done = len(self._form_data.get_all_str('done')) > 0
+ calendarize = len(self._form_data.get_all_str('calendarize')) > 0
+ comment = self._form_data.get_str('comment', ignore_strict=True)
for v in fill_fors.values():
if v.startswith('make_empty_'):
processes_to_make_empty += [int(v[11:])]
for process_id in processes_to_make_full:
made = Todo.create_with_children(self.conn, process_id, todo.date)
todo.add_child(made)
- effort = self._form_data.get_str('effort', ignore_strict=True)
todo.effort = float(effort) if effort else None
- todo.set_conditions(self.conn,
- self._form_data.get_all_int('condition'))
- todo.set_blockers(self.conn, self._form_data.get_all_int('blocker'))
- todo.set_enables(self.conn, self._form_data.get_all_int('enables'))
- todo.set_disables(self.conn, self._form_data.get_all_int('disables'))
- todo.is_done = len(self._form_data.get_all_str('done')) > 0
- todo.calendarize = len(self._form_data.get_all_str('calendarize')) > 0
- todo.comment = self._form_data.get_str('comment', ignore_strict=True)
+ todo.set_conditions(self.conn, conditions)
+ todo.set_blockers(self.conn, blockers)
+ todo.set_enables(self.conn, enables)
+ todo.set_disables(self.conn, disables)
+ todo.is_done = is_done
+ todo.calendarize = calendarize
+ todo.comment = comment
todo.save(self.conn)
return f'/todo?id={todo.id_}'
"""Update history timestamps for Process.title."""
return self._change_versioned_timestamps(Process, 'title')
- def do_POST_process(self) -> str:
+ @_delete_or_post(Process, '/processes')
+ def do_POST_process(self, process: Process) -> str:
"""Update or insert Process of ?id= and fields defined in postvars."""
- # pylint: disable=too-many-branches
- id_ = self._params.get_int_or_none('id')
- for _ in self._form_data.get_all_str('delete'):
- process = Process.by_id(self.conn, id_)
- process.remove(self.conn)
- return '/processes'
- process = Process.by_id(self.conn, id_, create=True)
- process.title.set(self._form_data.get_str('title'))
- process.description.set(self._form_data.get_str('description'))
- process.effort.set(self._form_data.get_float('effort'))
- process.set_conditions(self.conn,
- self._form_data.get_all_int('condition'))
- process.set_blockers(self.conn, self._form_data.get_all_int('blocker'))
- process.set_enables(self.conn, self._form_data.get_all_int('enables'))
- process.set_disables(self.conn,
- self._form_data.get_all_int('disables'))
- process.calendarize = self._form_data.get_all_str('calendarize') != []
+ # pylint: disable=too-many-locals
+ # pylint: disable=too-many-statements
+ title = self._form_data.get_str('title')
+ description = self._form_data.get_str('description')
+ effort = self._form_data.get_float('effort')
+ conditions = self._form_data.get_all_int('conditions')
+ blockers = self._form_data.get_all_int('blockers')
+ enables = self._form_data.get_all_int('enables')
+ disables = self._form_data.get_all_int('disables')
+ calendarize = self._form_data.get_all_str('calendarize') != []
+ suppresses = self._form_data.get_all_int('suppresses')
+ step_of = self._form_data.get_all_str('step_of')
+ keep_steps = self._form_data.get_all_int('keep_step')
+ step_ids = self._form_data.get_all_int('steps')
+ new_top_steps = self._form_data.get_all_str('new_top_step')
+ step_process_id_to = {}
+ step_parent_id_to = {}
+ new_steps_to = {}
+ for step_id in step_ids:
+ name = f'new_step_to_{step_id}'
+ new_steps_to[step_id] = self._form_data.get_all_int(name)
+ for step_id in keep_steps:
+ name = f'step_{step_id}_process_id'
+ step_process_id_to[step_id] = self._form_data.get_int(name)
+ name = f'step_{step_id}_parent_id'
+ step_parent_id_to[step_id] = self._form_data.get_int_or_none(name)
+ process.title.set(title)
+ process.description.set(description)
+ process.effort.set(effort)
+ process.set_conditions(self.conn, conditions)
+ process.set_blockers(self.conn, blockers)
+ process.set_enables(self.conn, enables)
+ process.set_disables(self.conn, disables)
+ process.calendarize = calendarize
process.save(self.conn)
assert isinstance(process.id_, int)
+ new_step_title = None
steps: list[ProcessStep] = []
- for step_id in self._form_data.get_all_int('keep_step'):
- if step_id not in self._form_data.get_all_int('steps'):
+ for step_id in keep_steps:
+ if step_id not in step_ids:
raise BadFormatException('trying to keep unknown step')
- for step_id in self._form_data.get_all_int('steps'):
- if step_id not in self._form_data.get_all_int('keep_step'):
- continue
- step_process_id = self._form_data.get_int(
- f'step_{step_id}_process_id')
- parent_id = self._form_data.get_int_or_none(
- f'step_{step_id}_parent_id')
- steps += [ProcessStep(step_id, process.id_, step_process_id,
- parent_id)]
- for step_id in self._form_data.get_all_int('steps'):
- for step_process_id in self._form_data.get_all_int(
- f'new_step_to_{step_id}'):
- steps += [ProcessStep(None, process.id_, step_process_id,
- step_id)]
- new_step_title = None
- for step_identifier in self._form_data.get_all_str('new_top_step'):
+ step = ProcessStep(step_id, process.id_,
+ step_process_id_to[step_id],
+ step_parent_id_to[step_id])
+ steps += [step]
+ for step_id in step_ids:
+ new = [ProcessStep(None, process.id_, step_process_id, step_id)
+ for step_process_id in new_steps_to[step_id]]
+ steps += new
+ for step_identifier in new_top_steps:
try:
step_process_id = int(step_identifier)
- steps += [ProcessStep(None, process.id_, step_process_id,
- None)]
+ step = ProcessStep(None, process.id_, step_process_id, None)
+ steps += [step]
except ValueError:
new_step_title = step_identifier
process.set_steps(self.conn, steps)
- process.set_step_suppressions(self.conn,
- self._form_data.
- get_all_int('suppresses'))
+ process.set_step_suppressions(self.conn, suppresses)
owners_to_set = []
new_owner_title = None
- for owner_identifier in self._form_data.get_all_str('step_of'):
+ for owner_identifier in step_of:
try:
owners_to_set += [int(owner_identifier)]
except ValueError:
"""Update history timestamps for Condition.title."""
return self._change_versioned_timestamps(Condition, 'title')
- def do_POST_condition(self) -> str:
+ @_delete_or_post(Condition, '/conditions')
+ def do_POST_condition(self, condition: Condition) -> str:
"""Update/insert Condition of ?id= and fields defined in postvars."""
- id_ = self._params.get_int_or_none('id')
- for _ in self._form_data.get_all_str('delete'):
- condition = Condition.by_id(self.conn, id_)
- condition.remove(self.conn)
- return '/conditions'
- condition = Condition.by_id(self.conn, id_, create=True)
- condition.is_active = self._form_data.get_all_str('is_active') != []
- condition.title.set(self._form_data.get_str('title'))
- condition.description.set(self._form_data.get_str('description'))
+ is_active = self._form_data.get_str('is_active') == 'True'
+ title = self._form_data.get_str('title')
+ description = self._form_data.get_str('description')
+ condition.is_active = is_active
+ condition.title.set(title)
+ condition.description.set(description)
condition.save(self.conn)
return f'/condition?id={condition.id_}'
('process_disables', 'process', 'disables', 0),
('process_step_suppressions', 'process',
'suppressed_steps', 0)]
+ add_to_dict = ['explicit_steps']
to_search = ['title.newest', 'description.newest']
+ can_create_by_id = True
+ sorters = {'steps': lambda p: len(p.explicit_steps),
+ 'owners': lambda p: p.n_owners,
+ 'effort': lambda p: p.effort.newest,
+ 'title': lambda p: p.title.newest}
def __init__(self, id_: int | None, calendarize: bool = False) -> None:
BaseModel.__init__(self, id_)
row: Row | list[Any]) -> Process:
"""Make from DB row, with dependencies."""
process = super().from_table_row(db_conn, row)
- assert isinstance(process.id_, int)
- for name in ('title', 'description', 'effort'):
- table = f'process_{name}s'
- for row_ in db_conn.row_where(table, 'parent', process.id_):
- getattr(process, name).history_from_row(row_)
+ assert process.id_ is not None
for name in ('conditions', 'blockers', 'enables', 'disables'):
table = f'process_{name}'
assert isinstance(process.id_, int)
self.parent_step_id = parent_step_id
def save(self, db_conn: DatabaseConnection) -> None:
- """Remove from DB, and owner's .explicit_steps."""
+ """Update into DB/cache, and owner's .explicit_steps."""
super().save(db_conn)
owner = Process.by_id(db_conn, self.owner_id)
if self not in owner.explicit_steps:
"""Actionables."""
from __future__ import annotations
-from dataclasses import dataclass
from typing import Any, Set
from sqlite3 import Row
from plomtask.db import DatabaseConnection, BaseModel
from plomtask.dating import valid_date
-@dataclass
class TodoNode:
"""Collects what's useful to know for Todo/Condition tree display."""
+ # pylint: disable=too-few-public-methods
todo: Todo
seen: bool
children: list[TodoNode]
+ def __init__(self,
+ todo: Todo,
+ seen: bool,
+ children: list[TodoNode]) -> None:
+ self.todo = todo
+ self.seen = seen
+ self.children = children
+
+ @property
+ def as_dict(self) -> dict[str, object]:
+ """Return self as (json.dumps-compatible) dict."""
+ return {'todo': self.todo.id_,
+ 'seen': self.seen,
+ 'children': [c.as_dict for c in self.children]}
+
class Todo(BaseModel[int], ConditionsRelations):
"""Individual actionable."""
days_to_update: Set[str] = set()
children: list[Todo]
parents: list[Todo]
+ sorters = {'doneness': lambda t: t.is_done,
+ 'title': lambda t: t.title_then,
+ 'comment': lambda t: t.comment,
+ 'date': lambda t: t.date}
# pylint: disable=too-many-arguments
def __init__(self, id_: int | None,
from sqlite3 import Row
from time import sleep
from plomtask.db import DatabaseConnection
-from plomtask.exceptions import HandledException, BadFormatException
+from plomtask.exceptions import (HandledException, BadFormatException,
+ NotFoundException)
TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f'
def save(self, db_conn: DatabaseConnection) -> None:
"""Save as self.history entries, but first wipe old ones."""
+ if self.parent.id_ is None:
+ raise NotFoundException('cannot save attribute to parent if no ID')
db_conn.rewrite_relations(self.table_name, 'parent', self.parent.id_,
[[item[0], item[1]]
for item in self.history.items()])
#!/bin/sh
set -e
+# for dir in $(echo 'tests'); do
for dir in $(echo '.' 'plomtask' 'tests'); do
echo "Running mypy on ${dir}/ …."
python3 -m mypy --strict ${dir}/*.py
echo "Running unittest-parallel on tests/."
unittest-parallel -t . -s tests/ -p '*.py'
set +e
-rm test_db:*.*
+rm test_db:*
set -e
exit 0
th {
border: 1px solid black;
}
-td.cond_line_0, td.cond_line_1, td.cond_line_2 {
+td.cond_line {
padding: 0;
border-top: 1px solid white;
}
-td.cond_line_0 {
+td.cond_0 {
background-color: #bfbfbf;
}
-td.cond_line_1 {
+td.cond_1 {
background-color: #dfdfdf;
}
-td.cond_line_2 {
- background-color: #fffff;
+td.cond_2 {
+ background-color: #ffffff;
}
-td.cond_line_corner {
+td.cond_shrink {
max-width: 0px;
white-space: nowrap;
overflow: hidden;
{% endif %}
{% for condition in conditions_present %}
-<td class="cond_line_{{loop.index0 % 3}}">
{% if condition in node.todo.conditions and not condition.is_active %}
-O
+<td class="cond_line cond_{{loop.index0 % 3}}">
++>
{% elif condition in node.todo.blockers and condition.is_active %}
-!
+<td class="cond_line cond_{{loop.index0 % 3}}">
+->
+{% else %}
+<td class="cond_line cond_{{loop.index0 % 3}} cond_shrink">
+|
{% endif %}
</td>
{% endfor %}
</td>
{% for condition in conditions_present|reverse %}
-<td class="cond_line_{{(conditions_present|length - loop.index) % 3}}">{% if condition in node.todo.enables %} +{% elif condition in node.todo.disables %} !{% endif %}</td>
+{% if condition in node.todo.enables %}
+<td class="cond_line cond_{{(conditions_present|length - loop.index) % 3}}">
++>
+{% elif condition in node.todo.disables %}
+<td class="cond_line cond_{{(conditions_present|length - loop.index) % 3}}">
+->
+{% else %}
+<td class="cond_line cond_{{(conditions_present|length - loop.index) % 3}} cond_shrink">
+ |
+{% endif %}
+</td>
{% endfor %}
<td colspan=2>
add: <input type="text" name="new_todo" list="processes">
</p>
<p>
+make new todos
<select name="make_type">
-<option value="full">make new todos with children</option>
-<option value="empty"{% if make_type == "empty" %}selected {% endif %}>make new todos without children</option>
+<option value="full">with</option>
+<option value="empty"{% if make_type == "empty" %}selected {% endif %}>without</option>
</select>
+descendants (i.e. adopt where possible, otherwise create anew)
</p>
<table>
{% for _ in conditions_present %}
{% if outer_loop.index > loop.index %}
-<td class="cond_line_{{loop.index0 % 3}}">
+<td class="cond_line cond_{{loop.index0 % 3}} cond_shrink">|
{% elif outer_loop.index < loop.index %}
-<td class="cond_line_{{outer_loop.index0 % 3}}">
+<td class="cond_line cond_{{outer_loop.index0 % 3}}">
{% else %}
-<td class="cond_line_{{outer_loop.index0 % 3}} cond_line_corner">×
+<td class="cond_line cond_{{outer_loop.index0 % 3}} cond_shrink">/
{% endif %}
</td>
{% endfor %}
-<td class="cond_line_{{loop.index0 % 3}}"><input type="checkbox" disabled{% if condition.is_active %} checked{% endif %}></td>
-<td colspan=2 class="cond_line_{{loop.index0 % 3}}"><a href="condition?id={{condition.id_}}">{{condition.title.at(day.date)|e}}</a></td>
+<td class="cond_line cond_{{loop.index0 % 3}}"><input type="checkbox" disabled{% if condition.is_active %} checked{% endif %}></td>
+<td colspan=2 class="cond_line cond_{{loop.index0 % 3}}"><a href="condition?id={{condition.id_}}">{{condition.title.at(day.date)|e}}</a></td>
{% for _ in conditions_present %}
{% if outer_loop.index0 + loop.index < conditions_present|length %}
-<td class="cond_line_{{outer_loop.index0 % 3}}">
+<td class="cond_line cond_{{outer_loop.index0 % 3}}">
{% elif outer_loop.index0 + loop.index > conditions_present|length %}
-<td class="cond_line_{{(conditions_present|length - loop.index) % 3}}">
+<td class="cond_line cond_{{(conditions_present|length - loop.index) % 3}} cond_shrink"> |
{% else %}
-<td class="cond_line_{{outer_loop.index0 % 3}} cond_line_corner"> ×
+<td class="cond_line cond_{{outer_loop.index0 % 3}} cond_shrink"> \
{% endif %}
{% endfor %}
<tr>
{% for condition in conditions_present %}
-<td class="cond_line_{{loop.index0 % 3}}"></td>
+<td class="cond_line cond_{{loop.index0 % 3}} cond_shrink">|</td>
{% endfor %}
<th colspan=3>doables</th>
{% for condition in conditions_present %}
-<td class="cond_line_{{(conditions_present|length - loop.index) % 3}}"></td>
+<td class="cond_line cond_{{(conditions_present|length - loop.index) % 3}} cond_shrink"> |</td>
{% endfor %}
<th colspan=2>comments</th>
</tr>
</tr>
<tr>
<th>conditions</th>
-<td>{{ macros.simple_checkbox_table("condition", process.conditions, "condition", "condition_candidates") }}</td>
+<td>{{ macros.simple_checkbox_table("conditions", process.conditions, "condition", "condition_candidates") }}</td>
</tr>
<tr>
<th>blockers</th>
-<td>{{ macros.simple_checkbox_table("blocker", process.blockers, "condition", "condition_candidates") }}</td>
+<td>{{ macros.simple_checkbox_table("blockers", process.blockers, "condition", "condition_candidates") }}</td>
</tr>
<tr>
<th>enables</th>
</tr>
<tr>
<th>conditions</th>
-<td>{{ macros.simple_checkbox_table("condition", todo.conditions, "condition", "condition_candidates") }}</td>
+<td>{{ macros.simple_checkbox_table("conditions", todo.conditions, "condition", "condition_candidates") }}</td>
</tr>
<tr>
<th>blockers</th>
-<td>{{ macros.simple_checkbox_table("blocker", todo.blockers, "condition", "condition_candidates") }}</td>
+<td>{{ macros.simple_checkbox_table("blockers", todo.blockers, "condition", "condition_candidates") }}</td>
</tr>
<tr>
<th>enables</th>
class TestsSansDB(TestCaseSansDB):
"""Tests requiring no DB setup."""
checked_class = Condition
- do_id_test = True
versioned_defaults_to_test = {'title': 'UNNAMED', 'description': ''}
default_init_kwargs = {'is_active': False}
test_versioneds = {'title': str, 'description': str}
- def test_Condition_from_table_row(self) -> None:
- """Test .from_table_row() properly reads in class from DB"""
- self.check_from_table_row()
- self.check_versioned_from_table_row('title', str)
- self.check_versioned_from_table_row('description', str)
-
- def test_Condition_by_id(self) -> None:
- """Test .by_id(), including creation."""
- self.check_by_id()
-
- def test_Condition_all(self) -> None:
- """Test .all()."""
- self.check_all()
-
- def test_Condition_singularity(self) -> None:
- """Test pointers made for single object keep pointing to it."""
- self.check_singularity('is_active', True)
-
- def test_Condition_versioned_attributes_singularity(self) -> None:
- """Test behavior of VersionedAttributes on saving (with .title)."""
- self.check_versioned_singularity()
-
- def test_Condition_remove(self) -> None:
+ def test_remove(self) -> None:
"""Test .remove() effects on DB and cache."""
- self.check_remove()
+ super().test_remove()
proc = Process(None)
proc.save(self.db_conn)
todo = Todo(None, proc, False, '2024-01-01')
class TestsWithServer(TestCaseWithServer):
"""Module tests against our HTTP server/handler (and database)."""
- def test_do_POST_condition(self) -> None:
- """Test POST /condition and its effect on the database."""
- form_data = {'title': 'foo', 'description': 'foo'}
- self.check_post(form_data, '/condition', 302, '/condition?id=1')
- self.assertEqual(1, len(Condition.all(self.db_conn)))
- form_data['delete'] = ''
- self.check_post(form_data, '/condition?id=', 404)
- self.check_post(form_data, '/condition?id=2', 404)
- self.check_post(form_data, '/condition?id=1', 302, '/conditions')
- self.assertEqual(0, len(Condition.all(self.db_conn)))
+ @classmethod
+ def GET_condition_dict(cls, cond: dict[str, object]) -> dict[str, object]:
+ """Return JSON of GET /condition to expect."""
+ return {'is_new': False,
+ 'enabled_processes': [],
+ 'disabled_processes': [],
+ 'enabling_processes': [],
+ 'disabling_processes': [],
+ 'condition': cond['id'],
+ '_library': {'Condition': cls.as_refs([cond])}}
- def test_do_GET(self) -> None:
- """Test /condition and /conditions response codes."""
- form_data = {'title': 'foo', 'description': 'foo'}
- self.check_post(form_data, '/condition', 302, '/condition?id=1')
+ @classmethod
+ def GET_conditions_dict(cls, conds: list[dict[str, object]]
+ ) -> dict[str, object]:
+ """Return JSON of GET /conditions to expect."""
+ library = {'Condition': cls.as_refs(conds)} if conds else {}
+ d: dict[str, object] = {'conditions': cls.as_id_list(conds),
+ 'sort_by': 'title',
+ 'pattern': '',
+ '_library': library}
+ return d
+
+ def test_fail_POST_condition(self) -> None:
+ """Test malformed/illegal POST /condition requests."""
+ # check invalid POST payloads
+ url = '/condition'
+ self.check_post({}, url, 400)
+ self.check_post({'title': ''}, url, 400)
+ self.check_post({'title': '', 'description': ''}, url, 400)
+ self.check_post({'title': '', 'is_active': False}, url, 400)
+ self.check_post({'description': '', 'is_active': False}, url, 400)
+ # check valid POST payload on bad paths
+ valid_payload = {'title': '', 'description': '', 'is_active': False}
+ self.check_post(valid_payload, '/condition?id=foo', 400)
+
+ def test_POST_condition(self) -> None:
+ """Test (valid) POST /condition and its effect on GET /condition[s]."""
+ # test valid POST's effect on …
+ post = {'title': 'foo', 'description': 'oof', 'is_active': False}
+ self.check_post(post, '/condition', 302, '/condition?id=1')
+ # … single /condition
+ cond = self.cond_as_dict(titles=['foo'], descriptions=['oof'])
+ assert isinstance(cond['_versioned'], dict)
+ expected_single = self.GET_condition_dict(cond)
+ self.check_json_get('/condition?id=1', expected_single)
+ # … full /conditions
+ expected_all = self.GET_conditions_dict([cond])
+ self.check_json_get('/conditions', expected_all)
+ # test (no) effect of invalid POST to existing Condition on /condition
+ self.check_post({}, '/condition?id=1', 400)
+ self.check_json_get('/condition?id=1', expected_single)
+ # test effect of POST changing title and activeness
+ post = {'title': 'bar', 'description': 'oof', 'is_active': True}
+ self.check_post(post, '/condition?id=1', 302)
+ cond['_versioned']['title'][1] = 'bar'
+ cond['is_active'] = True
+ self.check_json_get('/condition?id=1', expected_single)
+ # test deletion POST's effect on …
+ self.check_post({'delete': ''}, '/condition?id=1', 302, '/conditions')
+ cond = self.cond_as_dict()
+ assert isinstance(expected_single['_library'], dict)
+ expected_single['_library']['Condition'] = self.as_refs([cond])
+ self.check_json_get('/condition?id=1', expected_single)
+ # … full /conditions
+ expected_all['conditions'] = []
+ expected_all['_library'] = {}
+ self.check_json_get('/conditions', expected_all)
+
+ def test_GET_condition(self) -> None:
+ """More GET /condition testing, especially for Process relations."""
+ # check expected default status codes
self.check_get_defaults('/condition')
- self.check_get('/conditions', 200)
+ # make Condition and two Processes that among them establish all
+ # possible ConditionsRelations to it, …
+ cond_post = {'title': 'foo', 'description': 'oof', 'is_active': False}
+ self.check_post(cond_post, '/condition', 302, '/condition?id=1')
+ proc1_post = {'title': 'A', 'description': '', 'effort': 1.0,
+ 'conditions': [1], 'disables': [1]}
+ proc2_post = {'title': 'B', 'description': '', 'effort': 1.0,
+ 'enables': [1], 'blockers': [1]}
+ self.post_process(1, proc1_post)
+ self.post_process(2, proc2_post)
+ # … then check /condition displays all these properly.
+ cond = self.cond_as_dict(titles=['foo'], descriptions=['oof'])
+ assert isinstance(cond['id'], int)
+ proc1 = self.proc_as_dict(conditions=[cond['id']],
+ disables=[cond['id']])
+ proc2 = self.proc_as_dict(2, 'B',
+ blockers=[cond['id']],
+ enables=[cond['id']])
+ expected = self.GET_condition_dict(cond)
+ assert isinstance(expected['_library'], dict)
+ expected['enabled_processes'] = self.as_id_list([proc1])
+ expected['disabled_processes'] = self.as_id_list([proc2])
+ expected['enabling_processes'] = self.as_id_list([proc2])
+ expected['disabling_processes'] = self.as_id_list([proc1])
+ expected['_library']['Process'] = self.as_refs([proc1, proc2])
+ self.check_json_get('/condition?id=1', expected)
+
+ def test_GET_conditions(self) -> None:
+ """Test GET /conditions."""
+ # test empty result on empty DB, default-settings on empty params
+ expected = self.GET_conditions_dict([])
+ self.check_json_get('/conditions', expected)
+ # test on meaningless non-empty params (incl. entirely un-used key),
+ # that 'sort_by' default to 'title' (even if set to something else, as
+ # long as without handler) and 'pattern' get preserved
+ expected['pattern'] = 'bar' # preserved despite zero effect!
+ url = '/conditions?sort_by=foo&pattern=bar&foo=x'
+ self.check_json_get(url, expected)
+ # test non-empty result, automatic (positive) sorting by title
+ post1 = {'is_active': False, 'title': 'foo', 'description': 'oof'}
+ post2 = {'is_active': False, 'title': 'bar', 'description': 'rab'}
+ post3 = {'is_active': True, 'title': 'baz', 'description': 'zab'}
+ self.check_post(post1, '/condition', 302, '/condition?id=1')
+ self.check_post(post2, '/condition', 302, '/condition?id=2')
+ self.check_post(post3, '/condition', 302, '/condition?id=3')
+ cond1 = self.cond_as_dict(1, False, ['foo'], ['oof'])
+ cond2 = self.cond_as_dict(2, False, ['bar'], ['rab'])
+ cond3 = self.cond_as_dict(3, True, ['baz'], ['zab'])
+ expected = self.GET_conditions_dict([cond2, cond3, cond1])
+ self.check_json_get('/conditions', expected)
+ # test other sortings
+ # (NB: by .is_active has two items of =False, their order currently
+ # is not explicitly made predictable, so may fail until we do)
+ expected['sort_by'] = '-title'
+ expected['conditions'] = self.as_id_list([cond1, cond3, cond2])
+ self.check_json_get('/conditions?sort_by=-title', expected)
+ expected['sort_by'] = 'is_active'
+ expected['conditions'] = self.as_id_list([cond1, cond2, cond3])
+ self.check_json_get('/conditions?sort_by=is_active', expected)
+ expected['sort_by'] = '-is_active'
+ expected['conditions'] = self.as_id_list([cond3, cond1, cond2])
+ self.check_json_get('/conditions?sort_by=-is_active', expected)
+ # test pattern matching on title
+ expected = self.GET_conditions_dict([cond2, cond3])
+ expected['pattern'] = 'ba'
+ self.check_json_get('/conditions?pattern=ba', expected)
+ # test pattern matching on description
+ assert isinstance(expected['_library'], dict)
+ expected['conditions'] = self.as_id_list([cond1])
+ expected['_library']['Condition'] = self.as_refs([cond1])
+ expected['pattern'] = 'of'
+ self.check_json_get('/conditions?pattern=of', expected)
"""Test Days module."""
from unittest import TestCase
from datetime import datetime
+from typing import Callable
from tests.utils import TestCaseWithDB, TestCaseWithServer
from plomtask.dating import date_in_n_days
from plomtask.days import Day
-from plomtask.exceptions import BadFormatException
class TestsSansDB(TestCase):
"""Days module tests not requiring DB setup."""
+ legal_ids = ['2024-01-01']
+ illegal_ids = ['foo', '2024-02-30', '2024-02-01 23:00:00']
- def test_Day_valid_date(self) -> None:
- """Test Day's date format validation and parsing."""
- with self.assertRaises(BadFormatException):
- Day('foo')
- with self.assertRaises(BadFormatException):
- Day('2024-02-30')
- with self.assertRaises(BadFormatException):
- Day('2024-02-01 23:00:00')
- self.assertEqual(datetime(2024, 1, 1), Day('2024-01-01').datetime)
+ def test_Day_datetime_weekday_neighbor_dates(self) -> None:
+ """Test Day's date parsing."""
+ self.assertEqual(datetime(2024, 5, 1), Day('2024-05-01').datetime)
+ self.assertEqual('Sunday', Day('2024-03-17').weekday)
+ self.assertEqual('March', Day('2024-03-17').month_name)
+ self.assertEqual('2023-12-31', Day('2024-01-01').prev_date)
+ self.assertEqual('2023-03-01', Day('2023-02-28').next_date)
def test_Day_sorting(self) -> None:
"""Test sorting by .__lt__ and Day.__eq__."""
days = [day3, day1, day2]
self.assertEqual(sorted(days), [day1, day2, day3])
- def test_Day_weekday(self) -> None:
- """Test Day.weekday."""
- self.assertEqual(Day('2024-03-17').weekday, 'Sunday')
-
- def test_Day_neighbor_dates(self) -> None:
- """Test Day.prev_date and Day.next_date."""
- self.assertEqual(Day('2024-01-01').prev_date, '2023-12-31')
- self.assertEqual(Day('2023-02-28').next_date, '2023-03-01')
-
class TestsWithDB(TestCaseWithDB):
"""Tests requiring DB, but not server setup."""
checked_class = Day
default_ids = ('2024-01-01', '2024-01-02', '2024-01-03')
- def test_saving_and_caching(self) -> None:
- """Test storage of instances.
-
- We don't use the parent class's method here because the checked class
- has too different a handling of IDs.
- """
- kwargs = {'date': self.default_ids[0], 'comment': 'foo'}
- self.check_saving_and_caching(**kwargs)
-
- def test_Day_from_table_row(self) -> None:
- """Test .from_table_row() properly reads in class from DB"""
- self.check_from_table_row()
-
- def test_Day_by_id(self) -> None:
- """Test .by_id()."""
- self.check_by_id()
-
def test_Day_by_date_range_filled(self) -> None:
"""Test Day.by_date_range_filled."""
date1, date2, date3 = self.default_ids
- day1, day2, day3 = self.check_all()
- # check date range is a closed interval
+ day1 = Day(date1)
+ day2 = Day(date2)
+ day3 = Day(date3)
+ for day in [day1, day2, day3]:
+ day.save(self.db_conn)
+ # check date range includes limiter days
self.assertEqual(Day.by_date_range_filled(self.db_conn, date1, date3),
[day1, day2, day3])
# check first date range value excludes what's earlier
self.assertEqual(Day.by_date_range_filled(self.db_conn,
day5.date, day7.date),
[day5, day6, day7])
- self.check_storage([day1, day2, day3, day6])
+ self.check_identity_with_cache_and_db([day1, day2, day3, day6])
# check 'today' is interpreted as today's date
today = Day(date_in_n_days(0))
- today.save(self.db_conn)
self.assertEqual(Day.by_date_range_filled(self.db_conn,
'today', 'today'),
[today])
-
- def test_Day_remove(self) -> None:
- """Test .remove() effects on DB and cache."""
- self.check_remove()
-
- def test_Day_singularity(self) -> None:
- """Test pointers made for single object keep pointing to it."""
- self.check_singularity('day_comment', 'boo')
+ prev_day = Day(date_in_n_days(-1))
+ next_day = Day(date_in_n_days(1))
+ self.assertEqual(Day.by_date_range_filled(self.db_conn,
+ 'yesterday', 'tomorrow'),
+ [prev_day, today, next_day])
class TestsWithServer(TestCaseWithServer):
"""Tests against our HTTP server/handler (and database)."""
- def test_do_GET(self) -> None:
- """Test /day and /calendar response codes, and / redirect."""
- self.check_get('/day', 200)
- self.check_get('/day?date=3000-01-01', 200)
- self.check_get('/day?date=FOO', 400)
- self.check_get('/calendar', 200)
- self.check_get('/calendar?start=&end=', 200)
- self.check_get('/calendar?start=today&end=today', 200)
- self.check_get('/calendar?start=2024-01-01&end=2025-01-01', 200)
- self.check_get('/calendar?start=foo', 400)
+ @classmethod
+ def GET_day_dict(cls, date: str) -> dict[str, object]:
+ """Return JSON of GET /day to expect."""
+ # day: dict[str, object] = {'id': date, 'comment': '', 'todos': []}
+ day = cls._day_as_dict(date)
+ d: dict[str, object] = {'day': date,
+ 'top_nodes': [],
+ 'make_type': '',
+ 'enablers_for': {},
+ 'disablers_for': {},
+ 'conditions_present': [],
+ 'processes': [],
+ '_library': {'Day': cls.as_refs([day])}}
+ return d
+
+ @classmethod
+ def GET_calendar_dict(cls, start: int, end: int) -> dict[str, object]:
+ """Return JSON of GET /calendar to expect."""
+ today_date = date_in_n_days(0)
+ start_date = date_in_n_days(start)
+ end_date = date_in_n_days(end)
+ dates = [date_in_n_days(i) for i in range(start, end+1)]
+ days = [cls._day_as_dict(d) for d in dates]
+ library = {'Day': cls.as_refs(days)} if len(days) > 0 else {}
+ return {'today': today_date, 'start': start_date, 'end': end_date,
+ 'days': dates, '_library': library}
+
+ @staticmethod
+ def _todo_as_dict(id_: int = 1,
+ process_id: int = 1,
+ date: str = '2024-01-01',
+ conditions: None | list[int] = None,
+ disables: None | list[int] = None,
+ blockers: None | list[int] = None,
+ enables: None | list[int] = None
+ ) -> dict[str, object]:
+ """Return JSON of Todo to expect."""
+ # pylint: disable=too-many-arguments
+ d = {'id': id_,
+ 'date': date,
+ 'process_id': process_id,
+ 'is_done': False,
+ 'calendarize': False,
+ 'comment': '',
+ 'children': [],
+ 'parents': [],
+ 'effort': None,
+ 'conditions': conditions if conditions else [],
+ 'disables': disables if disables else [],
+ 'blockers': blockers if blockers else [],
+ 'enables': enables if enables else []}
+ return d
- def test_do_POST_day(self) -> None:
- """Test POST /day."""
- form_data = {'day_comment': '', 'make_type': 'full'}
- self.check_post(form_data, '/day', 400)
- self.check_post(form_data, '/day?date=foo', 400)
- self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
- self.check_post({'foo': ''}, '/day?date=2024-01-01', 400)
+ @staticmethod
+ def _todo_node_as_dict(todo_id: int) -> dict[str, object]:
+ """Return JSON of TodoNode to expect."""
+ return {'children': [], 'seen': False, 'todo': todo_id}
+
+ @staticmethod
+ def _day_as_dict(date: str) -> dict[str, object]:
+ return {'id': date, 'comment': '', 'todos': []}
+
+ @staticmethod
+ def _post_batch(list_of_args: list[list[object]],
+ names_of_simples: list[str],
+ names_of_versioneds: list[str],
+ f_as_dict: Callable[..., dict[str, object]],
+ f_to_post: Callable[..., None | dict[str, object]]
+ ) -> list[dict[str, object]]:
+ """Post expected=f_as_dict(*args) as input to f_to_post, for many."""
+ expecteds = []
+ for args in list_of_args:
+ expecteds += [f_as_dict(*args)]
+ for expected in expecteds:
+ assert isinstance(expected['_versioned'], dict)
+ post = {}
+ for name in names_of_simples:
+ post[name] = expected[name]
+ for name in names_of_versioneds:
+ post[name] = expected['_versioned'][name][0]
+ f_to_post(expected['id'], post)
+ return expecteds
+
+ def _post_day(self, params: str = '',
+ form_data: None | dict[str, object] = None,
+ redir_to: str = '',
+ status: int = 302,
+ ) -> None:
+ """POST /day?{params} with form_data."""
+ if not form_data:
+ form_data = {'day_comment': '', 'make_type': ''}
+ target = f'/day?{params}'
+ if not redir_to:
+ redir_to = f'{target}&make_type={form_data["make_type"]}'
+ self.check_post(form_data, target, status, redir_to)
+
+ def test_basic_GET_day(self) -> None:
+ """Test basic (no Processes/Conditions/Todos) GET /day basics."""
+ # check illegal date parameters
+ self.check_get('/day?date=foo', 400)
+ self.check_get('/day?date=2024-02-30', 400)
+ # check undefined day
+ date = date_in_n_days(0)
+ expected = self.GET_day_dict(date)
+ self.check_json_get('/day', expected)
+ # NB: GET ?date="today"/"yesterday"/"tomorrow" in test_basic_POST_day
+ # check 'make_type' GET parameter affects immediate reply, but …
+ date = '2024-01-01'
+ expected = self.GET_day_dict(date)
+ expected['make_type'] = 'bar'
+ self.check_json_get(f'/day?date={date}&make_type=bar', expected)
+ # … not any following, …
+ expected['make_type'] = ''
+ self.check_json_get(f'/day?date={date}', expected)
+ # … not even when part of a POST request
+ post: dict[str, object] = {'day_comment': '', 'make_type': 'foo'}
+ self._post_day(f'date={date}', post)
+ self.check_json_get(f'/day?date={date}', expected)
+
+ def test_fail_POST_day(self) -> None:
+ """Test malformed/illegal POST /day requests."""
+ # check payloads lacking minimum expecteds
+ url = '/day?date=2024-01-01'
+ self.check_post({}, url, 400)
+ self.check_post({'day_comment': ''}, url, 400)
+ self.check_post({'make_type': ''}, url, 400)
+ # to next check illegal new_todo values, we need an actual Process
+ self.post_process(1)
+ # check illegal new_todo values
+ post: dict[str, object]
+ post = {'make_type': '', 'day_comment': '', 'new_todo': ['foo']}
+ self.check_post(post, url, 400)
+ post['new_todo'] = [1, 2] # no Process of .id_=2 exists
+ # to next check illegal old_todo inputs, we need to first post Todo
+ post['new_todo'] = [1]
+ self.check_post(post, url, 302, '/day?date=2024-01-01&make_type=')
+ # check illegal old_todo inputs (equal list lengths though)
+ post = {'make_type': '', 'day_comment': '', 'comment': ['foo'],
+ 'effort': [3.3], 'done': [], 'todo_id': [1]}
+ self.check_post(post, url, 302, '/day?date=2024-01-01&make_type=')
+ post['todo_id'] = [2] # reference to non-existent Todo
+ self.check_post(post, url, 404)
+ post['todo_id'] = ['a']
+ self.check_post(post, url, 400)
+ post['todo_id'] = [1]
+ post['done'] = ['foo']
+ self.check_post(post, url, 400)
+ post['done'] = [2] # reference to non-posted todo_id
+ self.check_post(post, url, 400)
+ post['done'] = []
+ post['effort'] = ['foo']
+ self.check_post(post, url, 400)
+ post['effort'] = [None]
+ self.check_post(post, url, 400)
+ post['effort'] = [3.3]
+ # check illegal old_todo inputs: unequal list lengths
+ post['comment'] = []
+ self.check_post(post, url, 400)
+ post['comment'] = ['foo', 'foo']
+ self.check_post(post, url, 400)
+ post['comment'] = ['foo']
+ post['effort'] = []
+ self.check_post(post, url, 400)
+ post['effort'] = [3.3, 3.3]
+ self.check_post(post, url, 400)
+ post['effort'] = [3.3]
+ post['todo_id'] = [1, 1]
+ self.check_post(post, url, 400)
+ post['todo_id'] = [1]
+ # check valid POST payload on bad paths
+ self.check_post(post, '/day', 400)
+ self.check_post(post, '/day?date=', 400)
+ self.check_post(post, '/day?date=foo', 400)
+
+ def test_basic_POST_day(self) -> None:
+ """Test basic (no Todos) POST /day.
+
+ Check POST (& GET!) requests properly parse 'today', 'tomorrow',
+ 'yesterday', and actual date strings;
+ preserve 'make_type' setting in redirect even if nonsensical;
+ and store 'day_comment'
+ """
+ for name, dist, test_str in [('2024-01-01', None, 'a'),
+ ('today', 0, 'b'),
+ ('yesterday', -1, 'c'),
+ ('tomorrow', +1, 'd')]:
+ date = name if dist is None else date_in_n_days(dist)
+ post = {'day_comment': test_str, 'make_type': f'x:{test_str}'}
+ post_url = f'/day?date={name}'
+ redir_url = f'{post_url}&make_type={post["make_type"]}'
+ self.check_post(post, post_url, 302, redir_url)
+ expected = self.GET_day_dict(date)
+ assert isinstance(expected['_library'], dict)
+ expected['_library']['Day'][date]['comment'] = test_str
+ self.check_json_get(post_url, expected)
+
+ def test_GET_day_with_processes_and_todos(self) -> None:
+ """Test GET /day displaying Processes and Todos (no trees)."""
+ date = '2024-01-01'
+ # check Processes get displayed in ['processes'] and ['_library']
+ procs_data = [[1, 'foo', 'oof', 1.1], [2, 'bar', 'rab', 0.9]]
+ procs_expected = self._post_batch(procs_data, [],
+ ['title', 'description', 'effort'],
+ self.proc_as_dict, self.post_process)
+ expected = self.GET_day_dict(date)
+ assert isinstance(expected['_library'], dict)
+ expected['processes'] = self.as_id_list(procs_expected)
+ expected['_library']['Process'] = self.as_refs(procs_expected)
+ self._post_day(f'date={date}')
+ self.check_json_get(f'/day?date={date}', expected)
+ # post Todos of either process and check their display
+ post_day: dict[str, object]
+ post_day = {'day_comment': '', 'make_type': '', 'new_todo': [1, 2]}
+ todos = [self._todo_as_dict(1, 1, date),
+ self._todo_as_dict(2, 2, date)]
+ expected['_library']['Todo'] = self.as_refs(todos)
+ expected['_library']['Day'][date]['todos'] = self.as_id_list(todos)
+ nodes = [self._todo_node_as_dict(1), self._todo_node_as_dict(2)]
+ expected['top_nodes'] = nodes
+ self._post_day(f'date={date}', post_day)
+ self.check_json_get(f'/day?date={date}', expected)
+ # add a comment to one Todo and set the other's doneness and effort
+ post_day = {'day_comment': '', 'make_type': '', 'new_todo': [],
+ 'todo_id': [1, 2], 'done': [2], 'comment': ['FOO', ''],
+ 'effort': [2.3, '']}
+ expected['_library']['Todo']['1']['comment'] = 'FOO'
+ expected['_library']['Todo']['1']['effort'] = 2.3
+ expected['_library']['Todo']['2']['is_done'] = True
+ self._post_day(f'date={date}', post_day)
+ self.check_json_get(f'/day?date={date}', expected)
+
+ def test_GET_day_with_conditions(self) -> None:
+ """Test GET /day displaying Conditions and their relations."""
+ date = '2024-01-01'
+ # add Process with Conditions and their Todos, check display
+ conds_data = [[1, False, ['A'], ['a']], [2, True, ['B'], ['b']]]
+ conds_expected = self._post_batch(
+ conds_data, ['is_active'], ['title', 'description'],
+ self.cond_as_dict,
+ lambda x, y: self.check_post(y, f'/condition?id={x}', 302))
+ cond_names = ['conditions', 'disables', 'blockers', 'enables']
+ procs_data = [[1, 'foo', 'oof', 1.1, [1], [1], [2], [2]],
+ [2, 'bar', 'rab', 0.9, [2], [2], [1], [1]]]
+ procs_expected = self._post_batch(procs_data, cond_names,
+ ['title', 'description', 'effort'],
+ self.proc_as_dict, self.post_process)
+ expected = self.GET_day_dict(date)
+ assert isinstance(expected['_library'], dict)
+ expected['processes'] = self.as_id_list(procs_expected)
+ expected['_library']['Process'] = self.as_refs(procs_expected)
+ expected['_library']['Condition'] = self.as_refs(conds_expected)
+ self._post_day(f'date={date}')
+ self.check_json_get(f'/day?date={date}', expected)
+ # add Todos in relation to Conditions, check consequences
+ post_day: dict[str, object]
+ post_day = {'day_comment': '', 'make_type': '', 'new_todo': [1, 2]}
+ todos = [self._todo_as_dict(1, 1, date, [1], [1], [2], [2]),
+ self._todo_as_dict(2, 2, date, [2], [2], [1], [1])]
+ expected['_library']['Todo'] = self.as_refs(todos)
+ expected['_library']['Day'][date]['todos'] = self.as_id_list(todos)
+ nodes = [self._todo_node_as_dict(1), self._todo_node_as_dict(2)]
+ expected['top_nodes'] = nodes
+ expected['disablers_for'] = {'1': [1], '2': [2]}
+ expected['enablers_for'] = {'1': [2], '2': [1]}
+ expected['conditions_present'] = self.as_id_list(conds_expected)
+ self._post_day(f'date={date}', post_day)
+ self.check_json_get(f'/day?date={date}', expected)
+
+ def test_GET_calendar(self) -> None:
+ """Test GET /calendar responses based on various inputs, DB states."""
+ # check illegal date range delimiters
+ self.check_get('/calendar?start=foo', 400)
+ self.check_get('/calendar?end=foo', 400)
+ # check default range without saved days
+ expected = self.GET_calendar_dict(-1, 366)
+ self.check_json_get('/calendar', expected)
+ self.check_json_get('/calendar?start=&end=', expected)
+ # check named days as delimiters
+ expected = self.GET_calendar_dict(-1, +1)
+ self.check_json_get('/calendar?start=yesterday&end=tomorrow', expected)
+ # check zero-element range
+ expected = self.GET_calendar_dict(+1, 0)
+ self.check_json_get('/calendar?start=tomorrow&end=today', expected)
+ # check saved day shows up in results, as proven by its comment
+ post_day: dict[str, object] = {'day_comment': 'foo', 'make_type': ''}
+ date1 = date_in_n_days(-2)
+ self._post_day(f'date={date1}', post_day)
+ start_date = date_in_n_days(-5)
+ end_date = date_in_n_days(+5)
+ url = f'/calendar?start={start_date}&end={end_date}'
+ expected = self.GET_calendar_dict(-5, +5)
+ assert isinstance(expected['_library'], dict)
+ expected['_library']['Day'][date1]['comment'] = post_day['day_comment']
+ self.check_json_get(url, expected)
"""Tests against our HTTP server/handler (and database)."""
def test_do_GET(self) -> None:
- """Test / redirect, and unknown targets failing."""
+ """Test GET / redirect, and unknown targets failing."""
self.conn.request('GET', '/')
self.check_redirect('/day')
self.check_get('/foo', 404)
"""Test Processes module."""
+from typing import Any
from tests.utils import TestCaseWithDB, TestCaseWithServer, TestCaseSansDB
from plomtask.processes import Process, ProcessStep, ProcessStepsNode
from plomtask.conditions import Condition
class TestsSansDB(TestCaseSansDB):
"""Module tests not requiring DB setup."""
checked_class = Process
- do_id_test = True
versioned_defaults_to_test = {'title': 'UNNAMED', 'description': '',
'effort': 1.0}
class TestsSansDBProcessStep(TestCaseSansDB):
"""Module tests not requiring DB setup."""
checked_class = ProcessStep
- do_id_test = True
default_init_args = [2, 3, 4]
def test_Process_conditions_saving(self) -> None:
"""Test .save/.save_core."""
p, set1, set2, set3 = self.p_of_conditions()
+ assert p.id_ is not None
r = Process.by_id(self.db_conn, p.id_)
self.assertEqual(sorted(r.conditions), sorted(set1))
self.assertEqual(sorted(r.enables), sorted(set2))
self.assertEqual(sorted(r.disables), sorted(set3))
- def test_Process_from_table_row(self) -> None:
- """Test .from_table_row() properly reads in class from DB"""
- self.check_from_table_row()
- self.check_versioned_from_table_row('title', str)
- self.check_versioned_from_table_row('description', str)
- self.check_versioned_from_table_row('effort', float)
+ def test_from_table_row(self) -> None:
+ """Test .from_table_row() properly reads in class from DB."""
+ super().test_from_table_row()
p, set1, set2, set3 = self.p_of_conditions()
p.save(self.db_conn)
assert isinstance(p.id_, int)
method(self.db_conn, [c1.id_, c2.id_])
self.assertEqual(getattr(p, target), [c1, c2])
- def test_Process_by_id(self) -> None:
- """Test .by_id(), including creation"""
- self.check_by_id()
-
- def test_Process_all(self) -> None:
- """Test .all()."""
- self.check_all()
-
- def test_Process_singularity(self) -> None:
- """Test pointers made for single object keep pointing to it."""
- self.check_singularity('conditions', [Condition(None)])
-
- def test_Process_versioned_attributes_singularity(self) -> None:
- """Test behavior of VersionedAttributes on saving (with .title)."""
- self.check_versioned_singularity()
-
- def test_Process_removal(self) -> None:
+ def test_remove(self) -> None:
"""Test removal of Processes and ProcessSteps."""
- self.check_remove()
+ super().test_remove()
p1, p2, p3 = self.three_processes()
assert isinstance(p1.id_, int)
assert isinstance(p2.id_, int)
p1.remove(self.db_conn)
p2.set_steps(self.db_conn, [])
with self.assertRaises(NotFoundException):
+ assert step_id is not None
ProcessStep.by_id(self.db_conn, step_id)
p1.remove(self.db_conn)
step = ProcessStep(None, p2.id_, p3.id_, None)
- step_id = step.id_
p2.set_steps(self.db_conn, [step])
+ step_id = step.id_
p2.remove(self.db_conn)
with self.assertRaises(NotFoundException):
+ assert step_id is not None
ProcessStep.by_id(self.db_conn, step_id)
todo = Todo(None, p3, False, '2024-01-01')
todo.save(self.db_conn)
class TestsWithDBForProcessStep(TestCaseWithDB):
"""Module tests requiring DB setup."""
checked_class = ProcessStep
- default_init_kwargs = {'owner_id': 2, 'step_process_id': 3,
- 'parent_step_id': 4}
+ default_init_kwargs = {'owner_id': 1, 'step_process_id': 2,
+ 'parent_step_id': 3}
def setUp(self) -> None:
super().setUp()
- p = Process(1)
- p.save(self.db_conn)
- p = Process(2)
- p.save(self.db_conn)
+ self.p1 = Process(1)
+ self.p1.save(self.db_conn)
- def test_saving_and_caching(self) -> None:
- """Test storage and initialization of instances and attributes."""
- self.check_saving_and_caching(id_=1, **self.default_init_kwargs)
-
- def test_ProcessStep_remove(self) -> None:
+ def test_remove(self) -> None:
"""Test .remove and unsetting of owner's .explicit_steps entry."""
- p1 = Process(None)
- p2 = Process(None)
- p1.save(self.db_conn)
+ p2 = Process(2)
p2.save(self.db_conn)
- assert isinstance(p1.id_, int)
+ assert isinstance(self.p1.id_, int)
assert isinstance(p2.id_, int)
- step = ProcessStep(None, p1.id_, p2.id_, None)
- p1.set_steps(self.db_conn, [step])
+ step = ProcessStep(None, self.p1.id_, p2.id_, None)
+ self.p1.set_steps(self.db_conn, [step])
step.remove(self.db_conn)
- self.assertEqual(p1.explicit_steps, [])
- self.check_storage([])
+ self.assertEqual(self.p1.explicit_steps, [])
+ self.check_identity_with_cache_and_db([])
class TestsWithServer(TestCaseWithServer):
'/process?id=', 400)
self.assertEqual(1, len(Process.all(self.db_conn)))
form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.0}
- self.post_process(2, form_data | {'condition': []})
- self.check_post(form_data | {'condition': [1]}, '/process?id=', 404)
- self.check_post({'title': 'foo', 'description': 'foo'},
+ self.post_process(2, form_data | {'conditions': []})
+ self.check_post(form_data | {'conditions': [1]}, '/process?id=', 404)
+ self.check_post({'title': 'foo', 'description': 'foo',
+ 'is_active': False},
'/condition', 302, '/condition?id=1')
- self.post_process(3, form_data | {'condition': [1]})
+ self.post_process(3, form_data | {'conditions': [1]})
self.post_process(4, form_data | {'disables': [1]})
self.post_process(5, form_data | {'enables': [1]})
form_data['delete'] = ''
self.post_process(1, form_data_1)
retrieved_process = Process.by_id(self.db_conn, 1)
self.assertEqual(retrieved_process.explicit_steps, [])
+ assert retrieved_step_id is not None
with self.assertRaises(NotFoundException):
ProcessStep.by_id(self.db_conn, retrieved_step_id)
# post new first (top_level) step of process 3 to process 1
self.post_process(1, form_data_1)
retrieved_process = Process.by_id(self.db_conn, 1)
self.assertEqual(len(retrieved_process.explicit_steps), 2)
- retrieved_step_0 = retrieved_process.explicit_steps[0]
+ retrieved_step_0 = retrieved_process.explicit_steps[1]
self.assertEqual(retrieved_step_0.step_process_id, 3)
self.assertEqual(retrieved_step_0.owner_id, 1)
self.assertEqual(retrieved_step_0.parent_step_id, None)
- retrieved_step_1 = retrieved_process.explicit_steps[1]
+ retrieved_step_1 = retrieved_process.explicit_steps[0]
self.assertEqual(retrieved_step_1.step_process_id, 2)
self.assertEqual(retrieved_step_1.owner_id, 1)
self.assertEqual(retrieved_step_1.parent_step_id, None)
self.post_process(1, form_data_1)
retrieved_process = Process.by_id(self.db_conn, 1)
self.assertEqual(len(retrieved_process.explicit_steps), 3)
- retrieved_step_0 = retrieved_process.explicit_steps[0]
+ retrieved_step_0 = retrieved_process.explicit_steps[1]
self.assertEqual(retrieved_step_0.step_process_id, 2)
self.assertEqual(retrieved_step_0.owner_id, 1)
self.assertEqual(retrieved_step_0.parent_step_id, None)
- retrieved_step_1 = retrieved_process.explicit_steps[1]
+ retrieved_step_1 = retrieved_process.explicit_steps[0]
self.assertEqual(retrieved_step_1.step_process_id, 3)
self.assertEqual(retrieved_step_1.owner_id, 1)
self.assertEqual(retrieved_step_1.parent_step_id, None)
def test_do_GET(self) -> None:
"""Test /process and /processes response codes."""
+ self.check_get('/process', 200)
+ self.check_get('/process?id=', 200)
+ self.check_get('/process?id=1', 200)
self.check_get_defaults('/process')
self.check_get('/processes', 200)
+
+ def test_fail_GET_process(self) -> None:
+ """Test invalid GET /process params."""
+ # check for invalid IDs
+ self.check_get('/process?id=foo', 400)
+ self.check_get('/process?id=0', 500)
+ # check we catch invalid base64
+ self.check_get('/process?title_b64=foo', 400)
+ # check failure on references to unknown processes; we create Process
+ # of ID=1 here so we know the 404 comes from step_to=2 etc. (params
+ # that tie the Process displayed by /process to others), not from
+ # failing to find the main Process itself
+ self.post_process(1)
+ self.check_get('/process?id=1&step_to=2', 404)
+ self.check_get('/process?id=1&has_step=2', 404)
+
+ @classmethod
+ def GET_processes_dict(cls, procs: list[dict[str, object]]
+ ) -> dict[str, object]:
+ """Return JSON of GET /processes to expect."""
+ library = {'Process': cls.as_refs(procs)} if procs else {}
+ d: dict[str, object] = {'processes': cls.as_id_list(procs),
+ 'sort_by': 'title',
+ 'pattern': '',
+ '_library': library}
+ return d
+
+ @staticmethod
+ def procstep_as_dict(id_: int,
+ owner_id: int,
+ step_process_id: int,
+ parent_step_id: int | None = None
+ ) -> dict[str, object]:
+ """Return JSON of Process to expect."""
+ return {'id': id_,
+ 'owner_id': owner_id,
+ 'step_process_id': step_process_id,
+ 'parent_step_id': parent_step_id}
+
+ def test_GET_processes(self) -> None:
+ """Test GET /processes."""
+ # pylint: disable=too-many-statements
+ # test empty result on empty DB, default-settings on empty params
+ expected = self.GET_processes_dict([])
+ self.check_json_get('/processes', expected)
+ # test on meaningless non-empty params (incl. an entirely unused key):
+ # 'sort_by' defaults to 'title' (even if set to something else, as
+ # long as that value has no handler) and 'pattern' is preserved
+ expected['pattern'] = 'bar' # preserved despite zero effect!
+ url = '/processes?sort_by=foo&pattern=bar&foo=x'
+ self.check_json_get(url, expected)
+ # test non-empty result, automatic (positive) sorting by title
+ post1: dict[str, Any]
+ post2: dict[str, Any]
+ post3: dict[str, Any]
+ post1 = {'title': 'foo', 'description': 'oof', 'effort': 1.0}
+ post2 = {'title': 'bar', 'description': 'rab', 'effort': 1.1}
+ post2['new_top_step'] = 1
+ post3 = {'title': 'baz', 'description': 'zab', 'effort': 0.9}
+ post3['new_top_step'] = 1
+ self.post_process(1, post1)
+ self.post_process(2, post2)
+ self.post_process(3, post3)
+ post3['new_top_step'] = 2
+ post3['keep_step'] = 2
+ post3['steps'] = [2]
+ post3['step_2_process_id'] = 1
+ self.post_process(3, post3)
+ proc1 = self.proc_as_dict(1, post1['title'],
+ post1['description'], post1['effort'])
+ proc2 = self.proc_as_dict(2, post2['title'],
+ post2['description'], post2['effort'])
+ proc3 = self.proc_as_dict(3, post3['title'],
+ post3['description'], post3['effort'])
+ proc2['explicit_steps'] = [1]
+ proc3['explicit_steps'] = [2, 3]
+ step1 = self.procstep_as_dict(1, 2, 1)
+ step2 = self.procstep_as_dict(2, 3, 1)
+ step3 = self.procstep_as_dict(3, 3, 2)
+ expected = self.GET_processes_dict([proc2, proc3, proc1])
+ assert isinstance(expected['_library'], dict)
+ expected['_library']['ProcessStep'] = self.as_refs([step1, step2,
+ step3])
+ self.check_json_get('/processes', expected)
+ # test other sortings
+ expected['sort_by'] = '-title'
+ expected['processes'] = self.as_id_list([proc1, proc3, proc2])
+ self.check_json_get('/processes?sort_by=-title', expected)
+ expected['sort_by'] = 'effort'
+ expected['processes'] = self.as_id_list([proc3, proc1, proc2])
+ self.check_json_get('/processes?sort_by=effort', expected)
+ expected['sort_by'] = '-effort'
+ expected['processes'] = self.as_id_list([proc2, proc1, proc3])
+ self.check_json_get('/processes?sort_by=-effort', expected)
+ expected['sort_by'] = 'steps'
+ expected['processes'] = self.as_id_list([proc1, proc2, proc3])
+ self.check_json_get('/processes?sort_by=steps', expected)
+ expected['sort_by'] = '-steps'
+ expected['processes'] = self.as_id_list([proc3, proc2, proc1])
+ self.check_json_get('/processes?sort_by=-steps', expected)
+ expected['sort_by'] = 'owners'
+ expected['processes'] = self.as_id_list([proc3, proc2, proc1])
+ self.check_json_get('/processes?sort_by=owners', expected)
+ expected['sort_by'] = '-owners'
+ expected['processes'] = self.as_id_list([proc1, proc2, proc3])
+ self.check_json_get('/processes?sort_by=-owners', expected)
+ # test pattern matching on title
+ expected = self.GET_processes_dict([proc2, proc3])
+ assert isinstance(expected['_library'], dict)
+ expected['pattern'] = 'ba'
+ expected['_library']['ProcessStep'] = self.as_refs([step1, step2,
+ step3])
+ self.check_json_get('/processes?pattern=ba', expected)
+ # test pattern matching on description
+ expected['processes'] = self.as_id_list([proc1])
+ expected['_library'] = {'Process': self.as_refs([proc1])}
+ expected['pattern'] = 'of'
+ self.check_json_get('/processes?pattern=of', expected)
"""Test Todos module."""
-from tests.utils import TestCaseWithDB, TestCaseWithServer
+from tests.utils import TestCaseSansDB, TestCaseWithDB, TestCaseWithServer
from plomtask.todos import Todo, TodoNode
from plomtask.processes import Process, ProcessStep
from plomtask.conditions import Condition
HandledException)
-class TestsWithDB(TestCaseWithDB):
- """Tests requiring DB, but not server setup."""
+class TestsWithDB(TestCaseWithDB, TestCaseSansDB):
+ """Tests requiring DB, but not server setup.
+
+ NB: We subclass TestCaseSansDB too, to pull in its .test_id_validation,
+ which for Todo wouldn't run without a DB being set up due to the need for
+ Processes with set IDs.
+ """
checked_class = Todo
default_init_kwargs = {'process': None, 'is_done': False,
'date': '2024-01-01'}
+ # solely used for TestCaseSansDB.test_id_validation
+ default_init_args = [None, False, '2024-01-01']
def setUp(self) -> None:
super().setUp()
self.cond2 = Condition(None)
self.cond2.save(self.db_conn)
self.default_init_kwargs['process'] = self.proc
+ self.default_init_args[0] = self.proc
def test_Todo_init(self) -> None:
"""Test creation of Todo and what they default to."""
self.assertEqual(todo_yes_id.enables, [])
self.assertEqual(todo_yes_id.disables, [])
- def test_Todo_by_id(self) -> None:
- """Test findability of Todos."""
- todo = Todo(1, self.proc, False, self.date1)
- todo.save(self.db_conn)
- self.assertEqual(Todo.by_id(self.db_conn, 1), todo)
- with self.assertRaises(NotFoundException):
- Todo.by_id(self.db_conn, 0)
- with self.assertRaises(NotFoundException):
- Todo.by_id(self.db_conn, 2)
-
def test_Todo_by_date(self) -> None:
"""Test findability of Todos by date."""
t1 = Todo(None, self.proc, False, self.date1)
assert isinstance(todo_1.id_, int)
# test minimum
node_0 = TodoNode(todo_1, False, [])
- self.assertEqual(todo_1.get_step_tree(set()), node_0)
+ self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict)
# test non_emtpy seen_todo does something
node_0.seen = True
- self.assertEqual(todo_1.get_step_tree({todo_1.id_}), node_0)
+ self.assertEqual(todo_1.get_step_tree({todo_1.id_}).as_dict,
+ node_0.as_dict)
# test child shows up
todo_2 = Todo(None, self.proc, False, self.date1)
todo_2.save(self.db_conn)
node_2 = TodoNode(todo_2, False, [])
node_0.children = [node_2]
node_0.seen = False
- self.assertEqual(todo_1.get_step_tree(set()), node_0)
+ self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict)
# test child shows up with child
todo_3 = Todo(None, self.proc, False, self.date1)
todo_3.save(self.db_conn)
todo_2.add_child(todo_3)
node_3 = TodoNode(todo_3, False, [])
node_2.children = [node_3]
- self.assertEqual(todo_1.get_step_tree(set()), node_0)
+ self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict)
# test same todo can be child-ed multiple times at different locations
todo_1.add_child(todo_3)
node_4 = TodoNode(todo_3, True, [])
node_0.children += [node_4]
- self.assertEqual(todo_1.get_step_tree(set()), node_0)
+ self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict)
def test_Todo_create_with_children(self) -> None:
"""Test parenthood guaranteeds of Todo.create_with_children."""
self.assertEqual(len(todo_3.children), 1)
self.assertEqual(todo_3.children[0].process, proc4)
- def test_Todo_singularity(self) -> None:
- """Test pointers made for single object keep pointing to it."""
- self.check_singularity('is_done', True, self.proc, False, self.date1)
-
def test_Todo_remove(self) -> None:
"""Test removal."""
todo_1 = Todo(None, self.proc, False, self.date1)
todo_1.save(self.db_conn)
+ assert todo_1.id_ is not None
todo_0 = Todo(None, self.proc, False, self.date1)
todo_0.save(self.db_conn)
todo_0.add_child(todo_1)
todo_1.comment = 'foo'
todo_1.effort = -0.1
todo_1.save(self.db_conn)
+ assert todo_1.id_ is not None
Todo.by_id(self.db_conn, todo_1.id_)
todo_1.comment = ''
todo_1_id = todo_1.id_
form_data = {'day_comment': '', 'make_type': 'full'}
self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
self.assertEqual(Todo.by_date(self.db_conn, '2024-01-01'), [])
+ proc = Process.by_id(self.db_conn, 1)
form_data['new_todo'] = str(proc.id_)
self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
todos = Todo.by_date(self.db_conn, '2024-01-01')
self.assertEqual(1, len(todos))
todo1 = todos[0]
self.assertEqual(todo1.id_, 1)
+ proc = Process.by_id(self.db_conn, 1)
self.assertEqual(todo1.process.id_, proc.id_)
self.assertEqual(todo1.is_done, False)
+ proc2 = Process.by_id(self.db_conn, 2)
form_data['new_todo'] = str(proc2.id_)
self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
todos = Todo.by_date(self.db_conn, '2024-01-01')
todo1 = todos[1]
self.assertEqual(todo1.id_, 2)
+ proc2 = Process.by_id(self.db_conn, 1)
+ todo1 = Todo.by_date(self.db_conn, '2024-01-01')[0]
+ self.assertEqual(todo1.id_, 1)
self.assertEqual(todo1.process.id_, proc2.id_)
self.assertEqual(todo1.is_done, False)
'/day?date=2024-01-01&make_type=full', 302)
# test posting to bad URLs
self.check_post({}, '/todo=', 404)
- self.check_post({}, '/todo?id=', 400)
+ self.check_post({}, '/todo?id=', 404)
self.check_post({}, '/todo?id=FOO', 400)
self.check_post({}, '/todo?id=0', 404)
# test posting naked entity
form_data = {'day_comment': '', 'new_todo': [1], 'make_type': 'full'}
self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
todo = Todo.by_date(self.db_conn, '2024-01-01')[0]
- form_data = {'day_comment': '', 'todo_id': [1], 'make_type': 'full'}
+ form_data = {'day_comment': '', 'todo_id': [1], 'make_type': 'full',
+ 'comment': [''], 'done': [], 'effort': ['']}
self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
+ todo = Todo.by_date(self.db_conn, '2024-01-01')[0]
self.assertEqual(todo.is_done, False)
form_data = {'day_comment': '', 'todo_id': [1], 'done': [1],
- 'make_type': 'full'}
+ 'make_type': 'full', 'comment': [''], 'effort': ['']}
self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
+ todo = Todo.by_date(self.db_conn, '2024-01-01')[0]
self.assertEqual(todo.is_done, True)
def test_do_GET_todo(self) -> None:
self.post_process()
form_data = {'day_comment': '', 'new_todo': 1, 'make_type': 'full'}
self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
- self.check_get('/todo', 400)
- self.check_get('/todo?id=', 400)
+ self.check_get('/todo', 404)
+ self.check_get('/todo?id=', 404)
self.check_get('/todo?id=foo', 400)
self.check_get('/todo?id=0', 404)
self.check_get('/todo?id=1', 200)
"""Shared test utilities."""
+from __future__ import annotations
from unittest import TestCase
+from typing import Mapping, Any, Callable
from threading import Thread
from http.client import HTTPConnection
+from json import loads as json_loads
from urllib.parse import urlencode
from uuid import uuid4
from os import remove as remove_file
-from typing import Mapping, Any
from plomtask.db import DatabaseFile, DatabaseConnection
from plomtask.http import TaskHandler, TaskServer
from plomtask.processes import Process, ProcessStep
from plomtask.exceptions import NotFoundException, HandledException
+def _within_checked_class(f: Callable[..., None]) -> Callable[..., None]:
+ def wrapper(self: TestCase) -> None:
+ if hasattr(self, 'checked_class'):
+ f(self)
+ return wrapper
+
+
class TestCaseSansDB(TestCase):
"""Tests requiring no DB setup."""
checked_class: Any
- do_id_test: bool = False
default_init_args: list[Any] = []
versioned_defaults_to_test: dict[str, str | float] = {}
+ legal_ids = [1, 5]
+ illegal_ids = [0]
- def test_id_setting(self) -> None:
- """Test .id_ being set and its legal range being enforced."""
- if not self.do_id_test:
- return
- with self.assertRaises(HandledException):
- self.checked_class(0, *self.default_init_args)
- obj = self.checked_class(5, *self.default_init_args)
- self.assertEqual(obj.id_, 5)
+ @_within_checked_class
+ def test_id_validation(self) -> None:
+ """Test .id_ validation/setting."""
+ for id_ in self.illegal_ids:
+ with self.assertRaises(HandledException):
+ self.checked_class(id_, *self.default_init_args)
+ for id_ in self.legal_ids:
+ obj = self.checked_class(id_, *self.default_init_args)
+ self.assertEqual(obj.id_, id_)
+ @_within_checked_class
def test_versioned_defaults(self) -> None:
"""Test defaults of VersionedAttributes."""
- if len(self.versioned_defaults_to_test) == 0:
- return
- obj = self.checked_class(1, *self.default_init_args)
+ id_ = self.legal_ids[0]
+ obj = self.checked_class(id_, *self.default_init_args)
for k, v in self.versioned_defaults_to_test.items():
self.assertEqual(getattr(obj, k).newest, v)
self.db_conn.close()
remove_file(self.db_file.path)
- def test_saving_and_caching(self) -> None:
- """Test storage and initialization of instances and attributes."""
- if not hasattr(self, 'checked_class'):
- return
- self.check_saving_and_caching(id_=1, **self.default_init_kwargs)
- obj = self.checked_class(None, **self.default_init_kwargs)
- obj.save(self.db_conn)
- self.assertEqual(obj.id_, 2)
- for k, v in self.test_versioneds.items():
- self.check_saving_of_versioned(k, v)
+ def _load_from_db(self, id_: int | str) -> list[object]:
+ db_found: list[object] = []
+ for row in self.db_conn.row_where(self.checked_class.table_name,
+ 'id', id_):
+ db_found += [self.checked_class.from_table_row(self.db_conn,
+ row)]
+ return db_found
- def check_storage(self, content: list[Any]) -> None:
- """Test cache and DB equal content."""
+ def _change_obj(self, obj: object) -> str:
+ attr_name: str = self.checked_class.to_save[-1]
+ attr = getattr(obj, attr_name)
+ new_attr: str | int | float | bool
+ if isinstance(attr, (int, float)):
+ new_attr = attr + 1
+ elif isinstance(attr, str):
+ new_attr = attr + '_'
+ elif isinstance(attr, bool):
+ new_attr = not attr
+ setattr(obj, attr_name, new_attr)
+ return attr_name
+
+ def check_identity_with_cache_and_db(self, content: list[Any]) -> None:
+ """Test both cache and DB equal content."""
expected_cache = {}
for item in content:
expected_cache[item.id_] = item
db_found: list[Any] = []
for item in content:
assert isinstance(item.id_, type(self.default_ids[0]))
- for row in self.db_conn.row_where(self.checked_class.table_name,
- 'id', item.id_):
- db_found += [self.checked_class.from_table_row(self.db_conn,
- row)]
+ db_found += self._load_from_db(item.id_)
hashes_db_found = [hash(x) for x in db_found]
self.assertEqual(sorted(hashes_content), sorted(hashes_db_found))
- def check_saving_and_caching(self, **kwargs: Any) -> None:
- """Test instance.save in its core without relations."""
- obj = self.checked_class(**kwargs) # pylint: disable=not-callable
- # check object init itself doesn't store anything yet
- self.check_storage([])
- # check saving sets core attributes properly
- obj.save(self.db_conn)
- for key, value in kwargs.items():
- self.assertEqual(getattr(obj, key), value)
- # check saving stored properly in cache and DB
- self.check_storage([obj])
-
- def check_saving_of_versioned(self, attr_name: str, type_: type) -> None:
- """Test owner's versioned attributes."""
- owner = self.checked_class(None)
- vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1]
- attr = getattr(owner, attr_name)
- attr.set(vals[0])
- attr.set(vals[1])
- owner.save(self.db_conn)
- retrieved = owner.__class__.by_id(self.db_conn, owner.id_)
- attr = getattr(retrieved, attr_name)
- self.assertEqual(sorted(attr.history.values()), vals)
-
- def check_by_id(self) -> None:
- """Test .by_id(), including creation."""
+ @_within_checked_class
+ def test_saving_versioned(self) -> None:
+ """Test storage and initialization of versioned attributes."""
+ def retrieve_attr_vals() -> list[object]:
+ attr_vals_saved: list[object] = []
+ assert hasattr(retrieved, 'id_')
+ for row in self.db_conn.row_where(attr.table_name, 'parent',
+ retrieved.id_):
+ attr_vals_saved += [row[2]]
+ return attr_vals_saved
+ for attr_name, type_ in self.test_versioneds.items():
+ # fail saving attributes on non-saved owner
+ owner = self.checked_class(None, **self.default_init_kwargs)
+ vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1]
+ attr = getattr(owner, attr_name)
+ attr.set(vals[0])
+ attr.set(vals[1])
+ with self.assertRaises(NotFoundException):
+ attr.save(self.db_conn)
+ owner.save(self.db_conn)
+ # check stored attribute is as expected
+ retrieved = self._load_from_db(owner.id_)[0]
+ attr = getattr(retrieved, attr_name)
+ self.assertEqual(sorted(attr.history.values()), vals)
+ # check owner.save() created entries in attr table
+ attr_vals_saved = retrieve_attr_vals()
+ self.assertEqual(vals, attr_vals_saved)
+ # check setting new val to attr inconsequential to DB without save
+ attr.set(vals[0])
+ attr_vals_saved = retrieve_attr_vals()
+ self.assertEqual(vals, attr_vals_saved)
+ # check save finally adds new val
+ attr.save(self.db_conn)
+ attr_vals_saved = retrieve_attr_vals()
+ self.assertEqual(vals + [vals[0]], attr_vals_saved)
+
+ @_within_checked_class
+ def test_saving_and_caching(self) -> None:
+ """Test effects of .cache() and .save()."""
+ id1 = self.default_ids[0]
+ # check failure to cache without ID (if None-ID input possible)
+ if isinstance(id1, int):
+ obj0 = self.checked_class(None, **self.default_init_kwargs)
+ with self.assertRaises(HandledException):
+ obj0.cache()
+ # check mere object init itself doesn't even store in cache
+ obj1 = self.checked_class(id1, **self.default_init_kwargs)
+ self.assertEqual(self.checked_class.get_cache(), {})
+ # check .cache() fills cache, but not DB
+ obj1.cache()
+ self.assertEqual(self.checked_class.get_cache(), {id1: obj1})
+ db_found = self._load_from_db(id1)
+ self.assertEqual(db_found, [])
+ # check .save() sets ID (for int IDs), updates cache, and fills DB
+ # (expect ID to be set to id1, despite obj1 already having that as ID:
+ # it's generated by cursor.lastrowid on the DB table, and with obj1
+ # not written there, obj2 should get it first!)
+ id_input = None if isinstance(id1, int) else id1
+ obj2 = self.checked_class(id_input, **self.default_init_kwargs)
+ obj2.save(self.db_conn)
+ obj2_hash = hash(obj2)
+ self.assertEqual(self.checked_class.get_cache(), {id1: obj2})
+ db_found += self._load_from_db(id1)
+ self.assertEqual([hash(o) for o in db_found], [obj2_hash])
+ # check we cannot overwrite obj2 with obj1 despite their shared ID,
+ # since obj1 has meanwhile been displaced from the cache
+ with self.assertRaises(HandledException):
+ obj1.save(self.db_conn)
+
+ @_within_checked_class
+ def test_by_id(self) -> None:
+ """Test .by_id()."""
+ id1, id2, _ = self.default_ids
# check failure if not yet saved
- id1, id2 = self.default_ids[0], self.default_ids[1]
- obj = self.checked_class(id1) # pylint: disable=not-callable
+ obj1 = self.checked_class(id1, **self.default_init_kwargs)
with self.assertRaises(NotFoundException):
self.checked_class.by_id(self.db_conn, id1)
+ # check identity of cached and retrieved
+ obj1.cache()
+ self.assertEqual(obj1, self.checked_class.by_id(self.db_conn, id1))
# check identity of saved and retrieved
- obj.save(self.db_conn)
- self.assertEqual(obj, self.checked_class.by_id(self.db_conn, id1))
- # check create=True acts like normal instantiation (sans saving)
- by_id_created = self.checked_class.by_id(self.db_conn, id2,
- create=True)
- # pylint: disable=not-callable
- self.assertEqual(self.checked_class(id2), by_id_created)
- self.check_storage([obj])
-
- def check_from_table_row(self, *args: Any) -> None:
- """Test .from_table_row() properly reads in class from DB"""
+ obj2 = self.checked_class(id2, **self.default_init_kwargs)
+ obj2.save(self.db_conn)
+ self.assertEqual(obj2, self.checked_class.by_id(self.db_conn, id2))
+
+ @_within_checked_class
+ def test_by_id_or_create(self) -> None:
+ """Test .by_id_or_create."""
+ # check .by_id_or_create fails if wrong class
+ if not self.checked_class.can_create_by_id:
+ with self.assertRaises(HandledException):
+ self.checked_class.by_id_or_create(self.db_conn, None)
+ return
+ # check ID input of None creates, on saving, ID=1,2,… for int IDs
+ if isinstance(self.default_ids[0], int):
+ for n in range(2):
+ item = self.checked_class.by_id_or_create(self.db_conn, None)
+ self.assertEqual(item.id_, None)
+ item.save(self.db_conn)
+ self.assertEqual(item.id_, n+1)
+ # check .by_id_or_create acts like normal instantiation (sans saving)
+ id_ = self.default_ids[2]
+ item = self.checked_class.by_id_or_create(self.db_conn, id_)
+ self.assertEqual(item.id_, id_)
+ with self.assertRaises(NotFoundException):
+ self.checked_class.by_id(self.db_conn, item.id_)
+ self.assertEqual(self.checked_class(item.id_), item)
+
+ @_within_checked_class
+ def test_from_table_row(self) -> None:
+ """Test .from_table_row() properly reads in class directly from DB."""
id_ = self.default_ids[0]
- obj = self.checked_class(id_, *args) # pylint: disable=not-callable
+ obj = self.checked_class(id_, **self.default_init_kwargs)
obj.save(self.db_conn)
- assert isinstance(obj.id_, type(self.default_ids[0]))
+ assert isinstance(obj.id_, type(id_))
for row in self.db_conn.row_where(self.checked_class.table_name,
'id', obj.id_):
+ # check .from_table_row reproduces state saved, no matter if obj
+ # later changed (with caching even)
hash_original = hash(obj)
+ attr_name = self._change_obj(obj)
+ obj.cache()
+ to_cmp = getattr(obj, attr_name)
retrieved = self.checked_class.from_table_row(self.db_conn, row)
+ self.assertNotEqual(to_cmp, getattr(retrieved, attr_name))
self.assertEqual(hash_original, hash(retrieved))
+ # check cache contains what .from_table_row just produced
self.assertEqual({retrieved.id_: retrieved},
self.checked_class.get_cache())
+ # check .from_table_row also reads versioned attributes from DB
+ for attr_name, type_ in self.test_versioneds.items():
+ owner = self.checked_class(None)
+ vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1]
+ attr = getattr(owner, attr_name)
+ attr.set(vals[0])
+ attr.set(vals[1])
+ owner.save(self.db_conn)
+ for row in self.db_conn.row_where(owner.table_name, 'id',
+ owner.id_):
+ retrieved = owner.__class__.from_table_row(self.db_conn, row)
+ attr = getattr(retrieved, attr_name)
+ self.assertEqual(sorted(attr.history.values()), vals)
- def check_versioned_from_table_row(self, attr_name: str,
- type_: type) -> None:
- """Test .from_table_row() reads versioned attributes from DB."""
- owner = self.checked_class(None)
- vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1]
- attr = getattr(owner, attr_name)
- attr.set(vals[0])
- attr.set(vals[1])
- owner.save(self.db_conn)
- for row in self.db_conn.row_where(owner.table_name, 'id', owner.id_):
- retrieved = owner.__class__.from_table_row(self.db_conn, row)
- attr = getattr(retrieved, attr_name)
- self.assertEqual(sorted(attr.history.values()), vals)
-
- def check_all(self) -> tuple[Any, Any, Any]:
- """Test .all()."""
- # pylint: disable=not-callable
- item1 = self.checked_class(self.default_ids[0])
- item2 = self.checked_class(self.default_ids[1])
- item3 = self.checked_class(self.default_ids[2])
- # check pre-save .all() returns empty list
+ @_within_checked_class
+ def test_all(self) -> None:
+ """Test .all() and its relation to cache and savings."""
+ id_1, id_2, id_3 = self.default_ids
+ item1 = self.checked_class(id_1, **self.default_init_kwargs)
+ item2 = self.checked_class(id_2, **self.default_init_kwargs)
+ item3 = self.checked_class(id_3, **self.default_init_kwargs)
+ # check .all() returns empty list on un-cached items
self.assertEqual(self.checked_class.all(self.db_conn), [])
- # check that all() shows all saved, but no unsaved items
- item1.save(self.db_conn)
+ # check that all() shows only cached/saved items
+ item1.cache()
item3.save(self.db_conn)
self.assertEqual(sorted(self.checked_class.all(self.db_conn)),
sorted([item1, item3]))
item2.save(self.db_conn)
self.assertEqual(sorted(self.checked_class.all(self.db_conn)),
sorted([item1, item2, item3]))
- return item1, item2, item3
- def check_singularity(self, defaulting_field: str,
- non_default_value: Any, *args: Any) -> None:
+ @_within_checked_class
+ def test_singularity(self) -> None:
"""Test pointers made for single object keep pointing to it."""
id1 = self.default_ids[0]
- obj = self.checked_class(id1, *args) # pylint: disable=not-callable
+ obj = self.checked_class(id1, **self.default_init_kwargs)
obj.save(self.db_conn)
- setattr(obj, defaulting_field, non_default_value)
+ # change object, expect retrieved through .by_id to carry change
+ attr_name = self._change_obj(obj)
+ new_attr = getattr(obj, attr_name)
retrieved = self.checked_class.by_id(self.db_conn, id1)
- self.assertEqual(non_default_value,
- getattr(retrieved, defaulting_field))
+ self.assertEqual(new_attr, getattr(retrieved, attr_name))
- def check_versioned_singularity(self) -> None:
+ @_within_checked_class
+ def test_versioned_singularity_title(self) -> None:
"""Test singularity of VersionedAttributes on saving (with .title)."""
- obj = self.checked_class(None) # pylint: disable=not-callable
- obj.save(self.db_conn)
- assert isinstance(obj.id_, int)
- obj.title.set('named')
- retrieved = self.checked_class.by_id(self.db_conn, obj.id_)
- self.assertEqual(obj.title.history, retrieved.title.history)
+ if 'title' in self.test_versioneds:
+ obj = self.checked_class(None)
+ obj.save(self.db_conn)
+ assert isinstance(obj.id_, int)
+ # change obj, expect retrieved through .by_id to carry change
+ obj.title.set('named')
+ retrieved = self.checked_class.by_id(self.db_conn, obj.id_)
+ self.assertEqual(obj.title.history, retrieved.title.history)
- def check_remove(self, *args: Any) -> None:
+ @_within_checked_class
+ def test_remove(self) -> None:
"""Test .remove() effects on DB and cache."""
id_ = self.default_ids[0]
- obj = self.checked_class(id_, *args) # pylint: disable=not-callable
+ obj = self.checked_class(id_, **self.default_init_kwargs)
+ # check removal only works after saving
with self.assertRaises(HandledException):
obj.remove(self.db_conn)
obj.save(self.db_conn)
obj.remove(self.db_conn)
- self.check_storage([])
+ # check access to obj fails after removal
+ with self.assertRaises(HandledException):
+ print(obj.id_)
+ # check DB and cache now empty
+ self.check_identity_with_cache_and_db([])
class TestCaseWithServer(TestCaseWithDB):
self.server_thread.start()
self.conn = HTTPConnection(str(self.httpd.server_address[0]),
self.httpd.server_address[1])
+ self.httpd.set_json_mode()
def tearDown(self) -> None:
self.httpd.shutdown()
self.server_thread.join()
super().tearDown()
+ @staticmethod
+ def as_id_list(items: list[dict[str, object]]) -> list[int | str]:
+        """Return list containing only the 'id' fields of items."""
+ id_list = []
+ for item in items:
+ assert isinstance(item['id'], (int, str))
+ id_list += [item['id']]
+ return id_list
+
+ @staticmethod
+ def as_refs(items: list[dict[str, object]]
+ ) -> dict[str, dict[str, object]]:
+ """Return dictionary of items by their 'id' fields."""
+ refs = {}
+ for item in items:
+ refs[str(item['id'])] = item
+ return refs
+
+ @staticmethod
+ def cond_as_dict(id_: int = 1,
+ is_active: bool = False,
+ titles: None | list[str] = None,
+ descriptions: None | list[str] = None
+ ) -> dict[str, object]:
+ """Return JSON of Condition to expect."""
+ d = {'id': id_,
+ 'is_active': is_active,
+ '_versioned': {
+ 'title': {},
+ 'description': {}}}
+ titles = titles if titles else []
+ descriptions = descriptions if descriptions else []
+ assert isinstance(d['_versioned'], dict)
+ for i, title in enumerate(titles):
+ d['_versioned']['title'][i] = title
+ for i, description in enumerate(descriptions):
+ d['_versioned']['description'][i] = description
+ return d
+
+ @staticmethod
+ def proc_as_dict(id_: int = 1,
+ title: str = 'A',
+ description: str = '',
+ effort: float = 1.0,
+ conditions: None | list[int] = None,
+ disables: None | list[int] = None,
+ blockers: None | list[int] = None,
+ enables: None | list[int] = None
+ ) -> dict[str, object]:
+ """Return JSON of Process to expect."""
+ # pylint: disable=too-many-arguments
+ d = {'id': id_,
+ 'calendarize': False,
+ 'suppressed_steps': [],
+ 'explicit_steps': [],
+ '_versioned': {
+ 'title': {0: title},
+ 'description': {0: description},
+ 'effort': {0: effort}},
+ 'conditions': conditions if conditions else [],
+ 'disables': disables if disables else [],
+ 'enables': enables if enables else [],
+ 'blockers': blockers if blockers else []}
+ return d
+
def check_redirect(self, target: str) -> None:
"""Check that self.conn answers with a 302 redirect to target."""
response = self.conn.getresponse()
self.check_post(form_data, f'/process?id={id_}', 302,
f'/process?id={id_}')
return form_data
+
+ def check_json_get(self, path: str, expected: dict[str, object]) -> None:
+ """Compare JSON on GET path with expected.
+
+        To simplify comparison of VersionedAttribute histories, transforms
+        their timestamp keys into integers counting chronologically
+        forward from 0.
+ """
+ def rewrite_history_keys_in(item: Any) -> Any:
+ if isinstance(item, dict):
+ if '_versioned' in item.keys():
+ for k in item['_versioned']:
+ vals = item['_versioned'][k].values()
+ history = {}
+ for i, val in enumerate(vals):
+ history[i] = val
+ item['_versioned'][k] = history
+ for k in list(item.keys()):
+ rewrite_history_keys_in(item[k])
+ elif isinstance(item, list):
+ item[:] = [rewrite_history_keys_in(i) for i in item]
+ return item
+ self.conn.request('GET', path)
+ response = self.conn.getresponse()
+ self.assertEqual(response.status, 200)
+ retrieved = json_loads(response.read().decode())
+ rewrite_history_keys_in(retrieved)
+ self.assertEqual(expected, retrieved)