From: Christian Heller Date: Sat, 22 Jun 2024 02:53:16 +0000 (+0200) Subject: Slightly improve and re-organize Condition tests. X-Git-Url: https://plomlompom.com/repos/berlin_corona.txt?a=commitdiff_plain;h=HEAD;hp=a658d31f985ec9a08181005614c3d9876e046274;p=plomtask Slightly improve and re-organize Condition tests. --- diff --git a/.pylintrc b/.pylintrc index b4814d1..50133a0 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,3 +1,3 @@ [BASIC] init-hook='import sys; sys.path.append(".")' -good-names-rgxs=.*_?do_(GET|POST)(_[a-z]+)?,test_[A-Z]+ +good-names-rgxs=(.*_)?(GET|POST)(_.+)?,,test_[A-Z]+ diff --git a/plomtask/conditions.py b/plomtask/conditions.py index d255927..15dcb9d 100644 --- a/plomtask/conditions.py +++ b/plomtask/conditions.py @@ -1,7 +1,5 @@ """Non-doable elements of ProcessStep/Todo chains.""" from __future__ import annotations -from typing import Any -from sqlite3 import Row from plomtask.db import DatabaseConnection, BaseModel from plomtask.versioned_attributes import VersionedAttribute from plomtask.exceptions import HandledException @@ -13,6 +11,9 @@ class Condition(BaseModel[int]): to_save = ['is_active'] to_save_versioned = ['title', 'description'] to_search = ['title.newest', 'description.newest'] + can_create_by_id = True + sorters = {'is_active': lambda c: c.is_active, + 'title': lambda c: c.title.newest} def __init__(self, id_: int | None, is_active: bool = False) -> None: super().__init__(id_) @@ -21,30 +22,20 @@ class Condition(BaseModel[int]): self.description = VersionedAttribute(self, 'condition_descriptions', '') - @classmethod - def from_table_row(cls, db_conn: DatabaseConnection, - row: Row | list[Any]) -> Condition: - """Build condition from row, including VersionedAttributes.""" - condition = super().from_table_row(db_conn, row) - for name in ('title', 'description'): - table_name = f'condition_{name}s' - for row_ in db_conn.row_where(table_name, 'parent', row[0]): - getattr(condition, name).history_from_row(row_) - return condition - def remove(self, db_conn: DatabaseConnection) -> None: """Remove from DB, with VersionedAttributes. Checks for Todos and Processes that depend on Condition, prohibits deletion if found. """ - if self.id_ is None: - raise HandledException('cannot remove unsaved item') - for item in ('process', 'todo'): - for attr in ('conditions', 'blockers', 'enables', 'disables'): - table_name = f'{item}_{attr}' - for _ in db_conn.row_where(table_name, 'condition', self.id_): - raise HandledException('cannot remove Condition in use') + if self.id_ is not None: + for item in ('process', 'todo'): + for attr in ('conditions', 'blockers', 'enables', 'disables'): + table_name = f'{item}_{attr}' + for _ in db_conn.row_where(table_name, 'condition', + self.id_): + msg = 'cannot remove Condition in use' + raise HandledException(msg) super().remove(db_conn) diff --git a/plomtask/days.py b/plomtask/days.py index afe4a01..2320130 100644 --- a/plomtask/days.py +++ b/plomtask/days.py @@ -12,6 +12,8 @@ class Day(BaseModel[str]): """Individual days defined by their dates.""" table_name = 'days' to_save = ['comment'] + add_to_dict = ['todos'] + can_create_by_id = True def __init__(self, date: str, comment: str = '') -> None: id_ = valid_date(date) @@ -33,13 +35,16 @@ class Day(BaseModel[str]): return day @classmethod - def by_id(cls, - db_conn: DatabaseConnection, id_: str | None, - create: bool = False, - ) -> Day: - """Extend BaseModel.by_id checking for new/lost .todos.""" - day = super().by_id(db_conn, id_, create) - assert day.id_ is not None + def by_id(cls, db_conn: DatabaseConnection, id_: str) -> Day: + """Extend BaseModel.by_id + + Checks Todo.days_to_update if we need to a retrieved Day's .todos, + and also ensures we're looking for proper dates and not strings like + "yesterday" by enforcing the valid_date translation. + """ + assert isinstance(id_, str) + possibly_translated_date = valid_date(id_) + day = super().by_id(db_conn, possibly_translated_date) if day.id_ in Todo.days_to_update: Todo.days_to_update.remove(day.id_) day.todos = Todo.by_date(db_conn, day.id_) diff --git a/plomtask/db.py b/plomtask/db.py index 99998a6..13cdaef 100644 --- a/plomtask/db.py +++ b/plomtask/db.py @@ -4,7 +4,7 @@ from os import listdir from os.path import isfile from difflib import Differ from sqlite3 import connect as sql_connect, Cursor, Row -from typing import Any, Self, TypeVar, Generic +from typing import Any, Self, TypeVar, Generic, Callable from plomtask.exceptions import HandledException, NotFoundException from plomtask.dating import valid_date @@ -235,10 +235,13 @@ class BaseModel(Generic[BaseModelId]): to_save: list[str] = [] to_save_versioned: list[str] = [] to_save_relations: list[tuple[str, str, str, int]] = [] + add_to_dict: list[str] = [] id_: None | BaseModelId cache_: dict[BaseModelId, Self] to_search: list[str] = [] + can_create_by_id = False _exists = True + sorters: dict[str, Callable[..., Any]] = {} def __init__(self, id_: BaseModelId | None) -> None: if isinstance(id_, int) and id_ < 1: @@ -271,6 +274,84 @@ class BaseModel(Generic[BaseModelId]): assert isinstance(other.id_, int) return self.id_ < other.id_ + @property + def as_dict(self) -> dict[str, object]: + """Return self as (json.dumps-compatible) dict.""" + library: dict[str, dict[str | int, object]] = {} + d: dict[str, object] = {'id': self.id_, '_library': library} + for to_save in self.to_save: + attr = getattr(self, to_save) + if hasattr(attr, 'as_dict_into_reference'): + d[to_save] = attr.as_dict_into_reference(library) + else: + d[to_save] = attr + if len(self.to_save_versioned) > 0: + d['_versioned'] = {} + for k in self.to_save_versioned: + attr = getattr(self, k) + assert isinstance(d['_versioned'], dict) + d['_versioned'][k] = attr.history + for r in self.to_save_relations: + attr_name = r[2] + l: list[int | str] = [] + for rel in getattr(self, attr_name): + l += [rel.as_dict_into_reference(library)] + d[attr_name] = l + for k in self.add_to_dict: + d[k] = [x.as_dict_into_reference(library) + for x in getattr(self, k)] + return d + + def as_dict_into_reference(self, + library: dict[str, dict[str | int, object]] + ) -> int | str: + """Return self.id_ while writing .as_dict into library.""" + def into_library(library: dict[str, dict[str | int, object]], + cls_name: str, + id_: str | int, + d: dict[str, object] + ) -> None: + if cls_name not in library: + library[cls_name] = {} + if id_ in library[cls_name]: + if library[cls_name][id_] != d: + msg = 'Unexpected inequality of entries for ' +\ + f'_library at: {cls_name}/{id_}' + raise HandledException(msg) + else: + library[cls_name][id_] = d + as_dict = self.as_dict + assert isinstance(as_dict['_library'], dict) + for cls_name, dict_of_objs in as_dict['_library'].items(): + for id_, obj in dict_of_objs.items(): + into_library(library, cls_name, id_, obj) + del as_dict['_library'] + assert self.id_ is not None + into_library(library, self.__class__.__name__, self.id_, as_dict) + assert isinstance(as_dict['id'], (int, str)) + return as_dict['id'] + + @classmethod + def name_lowercase(cls) -> str: + """Convenience method to return cls' name in lowercase.""" + return cls.__name__.lower() + + @classmethod + def sort_by(cls, seq: list[Any], sort_key: str, default: str = 'title' + ) -> str: + """Sort cls list by cls.sorters[sort_key] (reverse if '-'-prefixed).""" + reverse = False + if len(sort_key) > 1 and '-' == sort_key[0]: + sort_key = sort_key[1:] + reverse = True + if sort_key not in cls.sorters: + sort_key = default + sorter: Callable[..., Any] = cls.sorters[sort_key] + seq.sort(key=sorter, reverse=reverse) + if reverse: + sort_key = f'-{sort_key}' + return sort_key + # cache management # (we primarily use the cache to ensure we work on the same object in # memory no matter where and how we retrieve it, e.g. we don't want @@ -295,7 +376,13 @@ class BaseModel(Generic[BaseModelId]): @classmethod def empty_cache(cls) -> None: - """Empty class's cache.""" + """Empty class's cache, and disappear all former inhabitants.""" + # pylint: disable=protected-access + # (cause we remain within the class) + if hasattr(cls, 'cache_'): + to_disappear = list(cls.cache_.values()) + for item in to_disappear: + item._disappear() cls.cache_ = {} @classmethod @@ -310,15 +397,14 @@ class BaseModel(Generic[BaseModelId]): def _get_cached(cls: type[BaseModelInstance], id_: BaseModelId) -> BaseModelInstance | None: """Get object of id_ from class's cache, or None if not found.""" - # pylint: disable=consider-iterating-dictionary cache = cls.get_cache() - if id_ in cache.keys(): + if id_ in cache: obj = cache[id_] assert isinstance(obj, cls) return obj return None - def _cache(self) -> None: + def cache(self) -> None: """Update object in class's cache. Also calls ._disappear if cache holds older reference to object of same @@ -349,22 +435,23 @@ class BaseModel(Generic[BaseModelId]): # pylint: disable=unused-argument db_conn: DatabaseConnection, row: Row | list[Any]) -> BaseModelInstance: - """Make from DB row, update DB cache with it.""" + """Make from DB row (sans relations), update DB cache with it.""" obj = cls(*row) - obj._cache() + assert obj.id_ is not None + for attr_name in cls.to_save_versioned: + attr = getattr(obj, attr_name) + table_name = attr.table_name + for row_ in db_conn.row_where(table_name, 'parent', obj.id_): + attr.history_from_row(row_) + obj.cache() return obj @classmethod - def by_id(cls, db_conn: DatabaseConnection, - id_: BaseModelId | None, - # pylint: disable=unused-argument - create: bool = False) -> Self: + def by_id(cls, db_conn: DatabaseConnection, id_: BaseModelId) -> Self: """Retrieve by id_, on failure throw NotFoundException. First try to get from cls.cache_, only then check DB; if found, put into cache. - - If create=True, make anew (but do not cache yet). """ obj = None if id_ is not None: @@ -375,11 +462,22 @@ class BaseModel(Generic[BaseModelId]): break if obj: return obj - if create: - obj = cls(id_) - return obj raise NotFoundException(f'found no object of ID {id_}') + @classmethod + def by_id_or_create(cls, db_conn: DatabaseConnection, + id_: BaseModelId | None + ) -> Self: + """Wrapper around .by_id, creating (not caching/saving) if not find.""" + if not cls.can_create_by_id: + raise HandledException('Class cannot .by_id_or_create.') + if id_ is None: + return cls(None) + try: + return cls.by_id(db_conn, id_) + except NotFoundException: + return cls(id_) + @classmethod def all(cls: type[BaseModelInstance], db_conn: DatabaseConnection) -> list[BaseModelInstance]: @@ -465,7 +563,7 @@ class BaseModel(Generic[BaseModelId]): values) if not isinstance(self.id_, str): self.id_ = cursor.lastrowid # type: ignore[assignment] - self._cache() + self.cache() for attr_name in self.to_save_versioned: getattr(self, attr_name).save(db_conn) for table, column, attr_name, key_index in self.to_save_relations: diff --git a/plomtask/http.py b/plomtask/http.py index 26c8b71..b7040f7 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -1,17 +1,19 @@ """Web server stuff.""" from __future__ import annotations from dataclasses import dataclass -from typing import Any, Callable, Mapping +from typing import Any, Callable from base64 import b64encode, b64decode +from binascii import Error as binascii_Exception from http.server import BaseHTTPRequestHandler from http.server import HTTPServer from urllib.parse import urlparse, parse_qs +from json import dumps as json_dumps from os.path import split as path_split from jinja2 import Environment as JinjaEnv, FileSystemLoader as JinjaFSLoader from plomtask.dating import date_in_n_days from plomtask.days import Day -from plomtask.exceptions import HandledException, BadFormatException, \ - NotFoundException +from plomtask.exceptions import (HandledException, BadFormatException, + NotFoundException) from plomtask.db import DatabaseConnection, DatabaseFile from plomtask.processes import Process, ProcessStep, ProcessStepsNode from plomtask.conditions import Condition @@ -27,7 +29,47 @@ class TaskServer(HTTPServer): *args: Any, **kwargs: Any) -> None: super().__init__(*args, **kwargs) self.db = db_file - self.jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR)) + self.headers: list[tuple[str, str]] = [] + self._render_mode = 'html' + self._jinja = JinjaEnv(loader=JinjaFSLoader(TEMPLATES_DIR)) + + def set_json_mode(self) -> None: + """Make server send JSON instead of HTML responses.""" + self._render_mode = 'json' + self.headers += [('Content-Type', 'application/json')] + + @staticmethod + def ctx_to_json(ctx: dict[str, object]) -> str: + """Render ctx into JSON string.""" + def walk_ctx(node: object) -> Any: + if hasattr(node, 'as_dict_into_reference'): + if hasattr(node, 'id_') and node.id_ is not None: + return node.as_dict_into_reference(library) + if hasattr(node, 'as_dict'): + return node.as_dict + if isinstance(node, (list, tuple)): + return [walk_ctx(x) for x in node] + if isinstance(node, dict): + d = {} + for k, v in node.items(): + d[k] = walk_ctx(v) + return d + if isinstance(node, HandledException): + return str(node) + return node + library: dict[str, dict[str | int, object]] = {} + for k, v in ctx.items(): + ctx[k] = walk_ctx(v) + ctx['_library'] = library + return json_dumps(ctx) + + def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str: + """Render ctx according to self._render_mode..""" + tmpl_name = f'{tmpl_name}.{self._render_mode}' + if 'html' == self._render_mode: + template = self._jinja.get_template(tmpl_name) + return template.render(ctx) + return self.__class__.ctx_to_json(ctx) class InputsParser: @@ -96,6 +138,20 @@ class InputsParser: msg = f'cannot int a form field value for key {key} in: {all_str}' raise BadFormatException(msg) from e + def get_all_floats_or_nones(self, key: str) -> list[float | None]: + """Retrieve list of float value at key, None if empty strings.""" + ret: list[float | None] = [] + for val in self.get_all_str(key): + if '' == val: + ret += [None] + else: + try: + ret += [float(val)] + except ValueError as e: + msg = f'cannot float form field value for key {key}: {val}' + raise BadFormatException(msg) from e + return ret + class TaskHandler(BaseHTTPRequestHandler): """Handles single HTTP request.""" @@ -106,20 +162,54 @@ class TaskHandler(BaseHTTPRequestHandler): _form_data: InputsParser _params: InputsParser - def _send_html(self, + def _send_page(self, + ctx: dict[str, Any], tmpl_name: str, - ctx: Mapping[str, object], - code: int = 200) -> None: - """Send HTML as proper HTTP response.""" - tmpl = self.server.jinja.get_template(tmpl_name) - html = tmpl.render(ctx) + code: int = 200 + ) -> None: + """Send ctx as proper HTTP response.""" + body = self.server.render(ctx, tmpl_name) self.send_response(code) + for header_tuple in self.server.headers: + self.send_header(*header_tuple) self.end_headers() - self.wfile.write(bytes(html, 'utf-8')) + self.wfile.write(bytes(body, 'utf-8')) @staticmethod def _request_wrapper(http_method: str, not_found_msg: str ) -> Callable[..., Callable[[TaskHandler], None]]: + """Wrapper for do_GET… and do_POST… handlers, to init and clean up. + + Among other things, conditionally cleans all caches, but only on POST + requests, as only those are expected to change the states of objects + that may be cached, and certainly only those are expected to write any + changes to the database. We want to call them as early though as + possible here, either exactly after the specific request handler + returns successfully, or right after any exception is triggered – + otherwise, race conditions become plausible. + + Note that otherwise any POST attempt, even a failed one, may end in + problematic inconsistencies: + + - if the POST handler experiences an Exception, changes to objects + won't get written to the DB, but the changed objects may remain in + the cache and affect other objects despite their possibly illegal + state + + - even if an object was just saved to the DB, we cannot be sure its + current state is completely identical to what we'd get if loading it + fresh from the DB (e.g. currently Process.n_owners is only updated + when loaded anew via .from_table_row, nor is its state written to + the DB by .save; a questionable design choice, but proof that we + have no guarantee that objects' .save stores all their states we'd + prefer at their most up-to-date. + """ + + def clear_caches() -> None: + for cls in (Day, Todo, Condition, Process, ProcessStep): + assert hasattr(cls, 'empty_cache') + cls.empty_cache() + def decorator(f: Callable[..., str | None] ) -> Callable[[TaskHandler], None]: def wrapper(self: TaskHandler) -> None: @@ -136,6 +226,8 @@ class TaskHandler(BaseHTTPRequestHandler): if hasattr(self, handler_name): handler = getattr(self, handler_name) redir_target = f(self, handler) + if 'POST' == http_method: + clear_caches() if redir_target: self.send_response(302) self.send_header('Location', redir_target) @@ -144,11 +236,10 @@ class TaskHandler(BaseHTTPRequestHandler): msg = f'{not_found_msg}: {self._site}' raise NotFoundException(msg) except HandledException as error: - for cls in (Day, Todo, Condition, Process, ProcessStep): - assert hasattr(cls, 'empty_cache') - cls.empty_cache() + if 'POST' == http_method: + clear_caches() ctx = {'msg': error} - self._send_html('msg.html', ctx, error.http_code) + self._send_page(ctx, 'msg', error.http_code) finally: self.conn.close() return wrapper @@ -158,11 +249,11 @@ class TaskHandler(BaseHTTPRequestHandler): def do_GET(self, handler: Callable[[], str | dict[str, object]] ) -> str | None: """Render page with result of handler, or redirect if result is str.""" - tmpl_name = f'{self._site}.html' - ctx_or_redir = handler() - if isinstance(ctx_or_redir, str): - return ctx_or_redir - self._send_html(tmpl_name, ctx_or_redir) + tmpl_name = f'{self._site}' + ctx_or_redir_target = handler() + if isinstance(ctx_or_redir_target, str): + return ctx_or_redir_target + self._send_page(ctx_or_redir_target, tmpl_name) return None @_request_wrapper('POST', 'Unknown POST target') @@ -178,6 +269,25 @@ class TaskHandler(BaseHTTPRequestHandler): # GET handlers + @staticmethod + def _get_item(target_class: Any + ) -> Callable[..., Callable[[TaskHandler], + dict[str, object]]]: + def decorator(f: Callable[..., dict[str, object]] + ) -> Callable[[TaskHandler], dict[str, object]]: + def wrapper(self: TaskHandler) -> dict[str, object]: + # pylint: disable=protected-access + # (because pylint here fails to detect the use of wrapper as a + # method to self with respective access privileges) + id_ = self._params.get_int_or_none('id') + if target_class.can_create_by_id: + item = target_class.by_id_or_create(self.conn, id_) + else: + item = target_class.by_id(self.conn, id_) + return f(self, item) + return wrapper + return decorator + def do_GET_(self) -> str: """Return redirect target on GET /.""" return '/day' @@ -210,7 +320,7 @@ class TaskHandler(BaseHTTPRequestHandler): def do_GET_day(self) -> dict[str, object]: """Show single Day of ?date=.""" date = self._params.get_str('date', date_in_n_days(0)) - day = Day.by_id(self.conn, date, create=True) + day = Day.by_id_or_create(self.conn, date) make_type = self._params.get_str('make_type') conditions_present = [] enablers_for = {} @@ -236,7 +346,8 @@ class TaskHandler(BaseHTTPRequestHandler): 'conditions_present': conditions_present, 'processes': Process.all(self.conn)} - def do_GET_todo(self) -> dict[str, object]: + @_get_item(Todo) + def do_GET_todo(self, todo: Todo) -> dict[str, object]: """Show single Todo of ?id=.""" @dataclass @@ -287,8 +398,6 @@ class TaskHandler(BaseHTTPRequestHandler): ids = ids | collect_adoptables_keys(node.children) return ids - id_ = self._params.get_int('id') - todo = Todo.by_id(self.conn, id_) todo_steps = [step.todo for step in todo.get_step_tree(set()).children] process_tree = todo.process.get_steps(self.conn, None) steps_todo_to_process: list[TodoStepsNode] = [] @@ -301,7 +410,8 @@ class TaskHandler(BaseHTTPRequestHandler): adoptables: dict[int, list[Todo]] = {} any_adoptables = [Todo.by_id(self.conn, t.id_) for t in Todo.by_date(self.conn, todo.date) - if t != todo] + if t.id_ is not None + and t != todo] for id_ in collect_adoptables_keys(steps_todo_to_process): adoptables[id_] = [t for t in any_adoptables if t.process.id_ == id_] @@ -324,22 +434,7 @@ class TaskHandler(BaseHTTPRequestHandler): todos = [t for t in todos_by_date_range if comment_pattern in t.comment and ((not process_id) or t.process.id_ == process_id)] - if sort_by == 'doneness': - todos.sort(key=lambda t: t.is_done) - elif sort_by == '-doneness': - todos.sort(key=lambda t: t.is_done, reverse=True) - elif sort_by == 'title': - todos.sort(key=lambda t: t.title_then) - elif sort_by == '-title': - todos.sort(key=lambda t: t.title_then, reverse=True) - elif sort_by == 'comment': - todos.sort(key=lambda t: t.comment) - elif sort_by == '-comment': - todos.sort(key=lambda t: t.comment, reverse=True) - elif sort_by == '-date': - todos.sort(key=lambda t: t.date, reverse=True) - else: - todos.sort(key=lambda t: t.date) + sort_by = Todo.sort_by(todos, sort_by) return {'start': start, 'end': end, 'process_id': process_id, 'comment_pattern': comment_pattern, 'todos': todos, 'all_processes': Process.all(self.conn), 'sort_by': sort_by} @@ -347,24 +442,16 @@ class TaskHandler(BaseHTTPRequestHandler): def do_GET_conditions(self) -> dict[str, object]: """Show all Conditions.""" pattern = self._params.get_str('pattern') - conditions = Condition.matching(self.conn, pattern) sort_by = self._params.get_str('sort_by') - if sort_by == 'is_active': - conditions.sort(key=lambda c: c.is_active) - elif sort_by == '-is_active': - conditions.sort(key=lambda c: c.is_active, reverse=True) - elif sort_by == '-title': - conditions.sort(key=lambda c: c.title.newest, reverse=True) - else: - conditions.sort(key=lambda c: c.title.newest) + conditions = Condition.matching(self.conn, pattern) + sort_by = Condition.sort_by(conditions, sort_by) return {'conditions': conditions, 'sort_by': sort_by, 'pattern': pattern} - def do_GET_condition(self) -> dict[str, object]: + @_get_item(Condition) + def do_GET_condition(self, c: Condition) -> dict[str, object]: """Show Condition of ?id=.""" - id_ = self._params.get_int_or_none('id') - c = Condition.by_id(self.conn, id_, create=True) ps = Process.all(self.conn) return {'condition': c, 'is_new': c.id_ is None, 'enabled_processes': [p for p in ps if c in p.conditions], @@ -372,31 +459,35 @@ class TaskHandler(BaseHTTPRequestHandler): 'enabling_processes': [p for p in ps if c in p.enables], 'disabling_processes': [p for p in ps if c in p.disables]} - def do_GET_condition_titles(self) -> dict[str, object]: + @_get_item(Condition) + def do_GET_condition_titles(self, c: Condition) -> dict[str, object]: """Show title history of Condition of ?id=.""" - id_ = self._params.get_int_or_none('id') - condition = Condition.by_id(self.conn, id_) - return {'condition': condition} + return {'condition': c} - def do_GET_condition_descriptions(self) -> dict[str, object]: + @_get_item(Condition) + def do_GET_condition_descriptions(self, c: Condition) -> dict[str, object]: """Show description historys of Condition of ?id=.""" - id_ = self._params.get_int_or_none('id') - condition = Condition.by_id(self.conn, id_) - return {'condition': condition} + return {'condition': c} - def do_GET_process(self) -> dict[str, object]: + @_get_item(Process) + def do_GET_process(self, process: Process) -> dict[str, object]: """Show Process of ?id=.""" - id_ = self._params.get_int_or_none('id') - process = Process.by_id(self.conn, id_, create=True) + owner_ids = self._params.get_all_int('step_to') + owned_ids = self._params.get_all_int('has_step') title_64 = self._params.get_str('title_b64') if title_64: - title = b64decode(title_64.encode()).decode() + try: + title = b64decode(title_64.encode()).decode() + except binascii_Exception as exc: + msg = 'invalid base64 for ?title_b64=' + raise BadFormatException(msg) from exc process.title.set(title) + preset_top_step = None owners = process.used_as_step_by(self.conn) - for step_id in self._params.get_all_int('step_to'): + for step_id in owner_ids: owners += [Process.by_id(self.conn, step_id)] - preset_top_step = None - for process_id in self._params.get_all_int('has_step'): + for process_id in owned_ids: + Process.by_id(self.conn, process_id) # to ensure ID exists preset_top_step = process_id return {'process': process, 'is_new': process.id_ is None, 'preset_top_step': preset_top_step, @@ -405,49 +496,57 @@ class TaskHandler(BaseHTTPRequestHandler): 'process_candidates': Process.all(self.conn), 'condition_candidates': Condition.all(self.conn)} - def do_GET_process_titles(self) -> dict[str, object]: + @_get_item(Process) + def do_GET_process_titles(self, p: Process) -> dict[str, object]: """Show title history of Process of ?id=.""" - id_ = self._params.get_int_or_none('id') - process = Process.by_id(self.conn, id_) - return {'process': process} + return {'process': p} - def do_GET_process_descriptions(self) -> dict[str, object]: + @_get_item(Process) + def do_GET_process_descriptions(self, p: Process) -> dict[str, object]: """Show description historys of Process of ?id=.""" - id_ = self._params.get_int_or_none('id') - process = Process.by_id(self.conn, id_) - return {'process': process} + return {'process': p} - def do_GET_process_efforts(self) -> dict[str, object]: + @_get_item(Process) + def do_GET_process_efforts(self, p: Process) -> dict[str, object]: """Show default effort history of Process of ?id=.""" - id_ = self._params.get_int_or_none('id') - process = Process.by_id(self.conn, id_) - return {'process': process} + return {'process': p} def do_GET_processes(self) -> dict[str, object]: """Show all Processes.""" pattern = self._params.get_str('pattern') - processes = Process.matching(self.conn, pattern) sort_by = self._params.get_str('sort_by') - if sort_by == 'steps': - processes.sort(key=lambda p: len(p.explicit_steps)) - elif sort_by == '-steps': - processes.sort(key=lambda p: len(p.explicit_steps), reverse=True) - elif sort_by == 'owners': - processes.sort(key=lambda p: p.n_owners or 0) - elif sort_by == '-owners': - processes.sort(key=lambda p: p.n_owners or 0, reverse=True) - elif sort_by == 'effort': - processes.sort(key=lambda p: p.effort.newest) - elif sort_by == '-effort': - processes.sort(key=lambda p: p.effort.newest, reverse=True) - elif sort_by == '-title': - processes.sort(key=lambda p: p.title.newest, reverse=True) - else: - processes.sort(key=lambda p: p.title.newest) + processes = Process.matching(self.conn, pattern) + sort_by = Process.sort_by(processes, sort_by) return {'processes': processes, 'sort_by': sort_by, 'pattern': pattern} # POST handlers + @staticmethod + def _delete_or_post(target_class: Any, redir_target: str = '/' + ) -> Callable[..., Callable[[TaskHandler], str]]: + def decorator(f: Callable[..., str] + ) -> Callable[[TaskHandler], str]: + def wrapper(self: TaskHandler) -> str: + # pylint: disable=protected-access + # (because pylint here fails to detect the use of wrapper as a + # method to self with respective access privileges) + id_ = self._params.get_int_or_none('id') + for _ in self._form_data.get_all_str('delete'): + if id_ is None: + msg = 'trying to delete non-saved ' +\ + f'{target_class.__name__}' + raise NotFoundException(msg) + item = target_class.by_id(self.conn, id_) + item.remove(self.conn) + return redir_target + if target_class.can_create_by_id: + item = target_class.by_id_or_create(self.conn, id_) + else: + item = target_class.by_id(self.conn, id_) + return f(self, item) + return wrapper + return decorator + def _change_versioned_timestamps(self, cls: Any, attr_name: str) -> str: """Update history timestamps for VersionedAttribute.""" id_ = self._params.get_int_or_none('id') @@ -458,50 +557,61 @@ class TaskHandler(BaseHTTPRequestHandler): if old[19:] != v: attr.reset_timestamp(old, f'{v}.0') attr.save(self.conn) - cls_name = cls.__name__.lower() - return f'/{cls_name}_{attr_name}s?id={item.id_}' + return f'/{cls.name_lowercase()}_{attr_name}s?id={item.id_}' def do_POST_day(self) -> str: """Update or insert Day of date and Todos mapped to it.""" + # pylint: disable=too-many-locals date = self._params.get_str('date') - day = Day.by_id(self.conn, date, create=True) - day.comment = self._form_data.get_str('day_comment') - day.save(self.conn) + day_comment = self._form_data.get_str('day_comment') make_type = self._form_data.get_str('make_type') - for process_id in sorted(self._form_data.get_all_int('new_todo')): + old_todos = self._form_data.get_all_int('todo_id') + new_todos = self._form_data.get_all_int('new_todo') + comments = self._form_data.get_all_str('comment') + efforts = self._form_data.get_all_floats_or_nones('effort') + done_todos = self._form_data.get_all_int('done') + for _ in [id_ for id_ in done_todos if id_ not in old_todos]: + raise BadFormatException('"done" field refers to unknown Todo') + is_done = [t_id in done_todos for t_id in old_todos] + if not (len(old_todos) == len(is_done) == len(comments) + == len(efforts)): + msg = 'not equal number each of number of todo_id, comments, ' +\ + 'and efforts inputs' + raise BadFormatException(msg) + day = Day.by_id_or_create(self.conn, date) + day.comment = day_comment + day.save(self.conn) + for process_id in sorted(new_todos): if 'empty' == make_type: process = Process.by_id(self.conn, process_id) todo = Todo(None, process, False, date) todo.save(self.conn) else: Todo.create_with_children(self.conn, process_id, date) - done_ids = self._form_data.get_all_int('done') - comments = self._form_data.get_all_str('comment') - efforts = self._form_data.get_all_str('effort') - for i, todo_id in enumerate(self._form_data.get_all_int('todo_id')): + for i, todo_id in enumerate(old_todos): todo = Todo.by_id(self.conn, todo_id) - todo.is_done = todo_id in done_ids - if len(comments) > 0: - todo.comment = comments[i] - if len(efforts) > 0: - todo.effort = float(efforts[i]) if efforts[i] else None + todo.is_done = is_done[i] + todo.comment = comments[i] + todo.effort = efforts[i] todo.save(self.conn) return f'/day?date={date}&make_type={make_type}' - def do_POST_todo(self) -> str: + @_delete_or_post(Todo, '/') + def do_POST_todo(self, todo: Todo) -> str: """Update Todo and its children.""" # pylint: disable=too-many-locals - # pylint: disable=too-many-branches - id_ = self._params.get_int('id') - for _ in self._form_data.get_all_str('delete'): - todo = Todo .by_id(self.conn, id_) - todo.remove(self.conn) - return '/' - todo = Todo.by_id(self.conn, id_) adopted_child_ids = self._form_data.get_all_int('adopt') processes_to_make_full = self._form_data.get_all_int('make_full') processes_to_make_empty = self._form_data.get_all_int('make_empty') fill_fors = self._form_data.get_first_strings_starting('fill_for_') + effort = self._form_data.get_str('effort', ignore_strict=True) + conditions = self._form_data.get_all_int('conditions') + disables = self._form_data.get_all_int('disables') + blockers = self._form_data.get_all_int('blockers') + enables = self._form_data.get_all_int('enables') + is_done = len(self._form_data.get_all_str('done')) > 0 + calendarize = len(self._form_data.get_all_str('calendarize')) > 0 + comment = self._form_data.get_str('comment', ignore_strict=True) for v in fill_fors.values(): if v.startswith('make_empty_'): processes_to_make_empty += [int(v[11:])] @@ -530,16 +640,14 @@ class TaskHandler(BaseHTTPRequestHandler): for process_id in processes_to_make_full: made = Todo.create_with_children(self.conn, process_id, todo.date) todo.add_child(made) - effort = self._form_data.get_str('effort', ignore_strict=True) todo.effort = float(effort) if effort else None - todo.set_conditions(self.conn, - self._form_data.get_all_int('condition')) - todo.set_blockers(self.conn, self._form_data.get_all_int('blocker')) - todo.set_enables(self.conn, self._form_data.get_all_int('enables')) - todo.set_disables(self.conn, self._form_data.get_all_int('disables')) - todo.is_done = len(self._form_data.get_all_str('done')) > 0 - todo.calendarize = len(self._form_data.get_all_str('calendarize')) > 0 - todo.comment = self._form_data.get_str('comment', ignore_strict=True) + todo.set_conditions(self.conn, conditions) + todo.set_blockers(self.conn, blockers) + todo.set_enables(self.conn, enables) + todo.set_disables(self.conn, disables) + todo.is_done = is_done + todo.calendarize = calendarize + todo.comment = comment todo.save(self.conn) return f'/todo?id={todo.id_}' @@ -555,60 +663,70 @@ class TaskHandler(BaseHTTPRequestHandler): """Update history timestamps for Process.title.""" return self._change_versioned_timestamps(Process, 'title') - def do_POST_process(self) -> str: + @_delete_or_post(Process, '/processes') + def do_POST_process(self, process: Process) -> str: """Update or insert Process of ?id= and fields defined in postvars.""" - # pylint: disable=too-many-branches - id_ = self._params.get_int_or_none('id') - for _ in self._form_data.get_all_str('delete'): - process = Process.by_id(self.conn, id_) - process.remove(self.conn) - return '/processes' - process = Process.by_id(self.conn, id_, create=True) - process.title.set(self._form_data.get_str('title')) - process.description.set(self._form_data.get_str('description')) - process.effort.set(self._form_data.get_float('effort')) - process.set_conditions(self.conn, - self._form_data.get_all_int('condition')) - process.set_blockers(self.conn, self._form_data.get_all_int('blocker')) - process.set_enables(self.conn, self._form_data.get_all_int('enables')) - process.set_disables(self.conn, - self._form_data.get_all_int('disables')) - process.calendarize = self._form_data.get_all_str('calendarize') != [] + # pylint: disable=too-many-locals + # pylint: disable=too-many-statements + title = self._form_data.get_str('title') + description = self._form_data.get_str('description') + effort = self._form_data.get_float('effort') + conditions = self._form_data.get_all_int('conditions') + blockers = self._form_data.get_all_int('blockers') + enables = self._form_data.get_all_int('enables') + disables = self._form_data.get_all_int('disables') + calendarize = self._form_data.get_all_str('calendarize') != [] + suppresses = self._form_data.get_all_int('suppresses') + step_of = self._form_data.get_all_str('step_of') + keep_steps = self._form_data.get_all_int('keep_step') + step_ids = self._form_data.get_all_int('steps') + new_top_steps = self._form_data.get_all_str('new_top_step') + step_process_id_to = {} + step_parent_id_to = {} + new_steps_to = {} + for step_id in step_ids: + name = f'new_step_to_{step_id}' + new_steps_to[step_id] = self._form_data.get_all_int(name) + for step_id in keep_steps: + name = f'step_{step_id}_process_id' + step_process_id_to[step_id] = self._form_data.get_int(name) + name = f'step_{step_id}_parent_id' + step_parent_id_to[step_id] = self._form_data.get_int_or_none(name) + process.title.set(title) + process.description.set(description) + process.effort.set(effort) + process.set_conditions(self.conn, conditions) + process.set_blockers(self.conn, blockers) + process.set_enables(self.conn, enables) + process.set_disables(self.conn, disables) + process.calendarize = calendarize process.save(self.conn) assert isinstance(process.id_, int) + new_step_title = None steps: list[ProcessStep] = [] - for step_id in self._form_data.get_all_int('keep_step'): - if step_id not in self._form_data.get_all_int('steps'): + for step_id in keep_steps: + if step_id not in step_ids: raise BadFormatException('trying to keep unknown step') - for step_id in self._form_data.get_all_int('steps'): - if step_id not in self._form_data.get_all_int('keep_step'): - continue - step_process_id = self._form_data.get_int( - f'step_{step_id}_process_id') - parent_id = self._form_data.get_int_or_none( - f'step_{step_id}_parent_id') - steps += [ProcessStep(step_id, process.id_, step_process_id, - parent_id)] - for step_id in self._form_data.get_all_int('steps'): - for step_process_id in self._form_data.get_all_int( - f'new_step_to_{step_id}'): - steps += [ProcessStep(None, process.id_, step_process_id, - step_id)] - new_step_title = None - for step_identifier in self._form_data.get_all_str('new_top_step'): + step = ProcessStep(step_id, process.id_, + step_process_id_to[step_id], + step_parent_id_to[step_id]) + steps += [step] + for step_id in step_ids: + new = [ProcessStep(None, process.id_, step_process_id, step_id) + for step_process_id in new_steps_to[step_id]] + steps += new + for step_identifier in new_top_steps: try: step_process_id = int(step_identifier) - steps += [ProcessStep(None, process.id_, step_process_id, - None)] + step = ProcessStep(None, process.id_, step_process_id, None) + steps += [step] except ValueError: new_step_title = step_identifier process.set_steps(self.conn, steps) - process.set_step_suppressions(self.conn, - self._form_data. - get_all_int('suppresses')) + process.set_step_suppressions(self.conn, suppresses) owners_to_set = [] new_owner_title = None - for owner_identifier in self._form_data.get_all_str('step_of'): + for owner_identifier in step_of: try: owners_to_set += [int(owner_identifier)] except ValueError: @@ -632,16 +750,14 @@ class TaskHandler(BaseHTTPRequestHandler): """Update history timestamps for Condition.title.""" return self._change_versioned_timestamps(Condition, 'title') - def do_POST_condition(self) -> str: + @_delete_or_post(Condition, '/conditions') + def do_POST_condition(self, condition: Condition) -> str: """Update/insert Condition of ?id= and fields defined in postvars.""" - id_ = self._params.get_int_or_none('id') - for _ in self._form_data.get_all_str('delete'): - condition = Condition.by_id(self.conn, id_) - condition.remove(self.conn) - return '/conditions' - condition = Condition.by_id(self.conn, id_, create=True) - condition.is_active = self._form_data.get_all_str('is_active') != [] - condition.title.set(self._form_data.get_str('title')) - condition.description.set(self._form_data.get_str('description')) + is_active = self._form_data.get_str('is_active') == 'True' + title = self._form_data.get_str('title') + description = self._form_data.get_str('description') + condition.is_active = is_active + condition.title.set(title) + condition.description.set(description) condition.save(self.conn) return f'/condition?id={condition.id_}' diff --git a/plomtask/processes.py b/plomtask/processes.py index 06ee4ba..bb1de3a 100644 --- a/plomtask/processes.py +++ b/plomtask/processes.py @@ -33,7 +33,13 @@ class Process(BaseModel[int], ConditionsRelations): ('process_disables', 'process', 'disables', 0), ('process_step_suppressions', 'process', 'suppressed_steps', 0)] + add_to_dict = ['explicit_steps'] to_search = ['title.newest', 'description.newest'] + can_create_by_id = True + sorters = {'steps': lambda p: len(p.explicit_steps), + 'owners': lambda p: p.n_owners, + 'effort': lambda p: p.effort.newest, + 'title': lambda p: p.title.newest} def __init__(self, id_: int | None, calendarize: bool = False) -> None: BaseModel.__init__(self, id_) @@ -51,11 +57,7 @@ class Process(BaseModel[int], ConditionsRelations): row: Row | list[Any]) -> Process: """Make from DB row, with dependencies.""" process = super().from_table_row(db_conn, row) - assert isinstance(process.id_, int) - for name in ('title', 'description', 'effort'): - table = f'process_{name}s' - for row_ in db_conn.row_where(table, 'parent', process.id_): - getattr(process, name).history_from_row(row_) + assert process.id_ is not None for name in ('conditions', 'blockers', 'enables', 'disables'): table = f'process_{name}' assert isinstance(process.id_, int) @@ -218,7 +220,7 @@ class ProcessStep(BaseModel[int]): self.parent_step_id = parent_step_id def save(self, db_conn: DatabaseConnection) -> None: - """Remove from DB, and owner's .explicit_steps.""" + """Update into DB/cache, and owner's .explicit_steps.""" super().save(db_conn) owner = Process.by_id(db_conn, self.owner_id) if self not in owner.explicit_steps: diff --git a/plomtask/todos.py b/plomtask/todos.py index 705bd72..f5388b5 100644 --- a/plomtask/todos.py +++ b/plomtask/todos.py @@ -1,6 +1,5 @@ """Actionables.""" from __future__ import annotations -from dataclasses import dataclass from typing import Any, Set from sqlite3 import Row from plomtask.db import DatabaseConnection, BaseModel @@ -12,13 +11,28 @@ from plomtask.exceptions import (NotFoundException, BadFormatException, from plomtask.dating import valid_date -@dataclass class TodoNode: """Collects what's useful to know for Todo/Condition tree display.""" + # pylint: disable=too-few-public-methods todo: Todo seen: bool children: list[TodoNode] + def __init__(self, + todo: Todo, + seen: bool, + children: list[TodoNode]) -> None: + self.todo = todo + self.seen = seen + self.children = children + + @property + def as_dict(self) -> dict[str, object]: + """Return self as (json.dumps-coompatible) dict.""" + return {'todo': self.todo.id_, + 'seen': self.seen, + 'children': [c.as_dict for c in self.children]} + class Todo(BaseModel[int], ConditionsRelations): """Individual actionable.""" @@ -37,6 +51,10 @@ class Todo(BaseModel[int], ConditionsRelations): days_to_update: Set[str] = set() children: list[Todo] parents: list[Todo] + sorters = {'doneness': lambda t: t.is_done, + 'title': lambda t: t.title_then, + 'comment': lambda t: t.comment, + 'date': lambda t: t.date} # pylint: disable=too-many-arguments def __init__(self, id_: int | None, diff --git a/plomtask/versioned_attributes.py b/plomtask/versioned_attributes.py index cbd1c8e..8861c98 100644 --- a/plomtask/versioned_attributes.py +++ b/plomtask/versioned_attributes.py @@ -4,7 +4,8 @@ from typing import Any from sqlite3 import Row from time import sleep from plomtask.db import DatabaseConnection -from plomtask.exceptions import HandledException, BadFormatException +from plomtask.exceptions import (HandledException, BadFormatException, + NotFoundException) TIMESTAMP_FMT = '%Y-%m-%d %H:%M:%S.%f' @@ -98,6 +99,8 @@ class VersionedAttribute: def save(self, db_conn: DatabaseConnection) -> None: """Save as self.history entries, but first wipe old ones.""" + if self.parent.id_ is None: + raise NotFoundException('cannot save attribute to parent if no ID') db_conn.rewrite_relations(self.table_name, 'parent', self.parent.id_, [[item[0], item[1]] for item in self.history.items()]) diff --git a/scripts/pre-commit b/scripts/pre-commit index c92a5eb..e448035 100755 --- a/scripts/pre-commit +++ b/scripts/pre-commit @@ -1,5 +1,6 @@ #!/bin/sh set -e +# for dir in $(echo 'tests'); do for dir in $(echo '.' 'plomtask' 'tests'); do echo "Running mypy on ${dir}/ …." python3 -m mypy --strict ${dir}/*.py @@ -11,6 +12,6 @@ done echo "Running unittest-parallel on tests/." unittest-parallel -t . -s tests/ -p '*.py' set +e -rm test_db:*.* +rm test_db:* set -e exit 0 diff --git a/templates/day.html b/templates/day.html index f980cd1..59cf55b 100644 --- a/templates/day.html +++ b/templates/day.html @@ -7,20 +7,20 @@ th { border: 1px solid black; } -td.cond_line_0, td.cond_line_1, td.cond_line_2 { +td.cond_line { padding: 0; border-top: 1px solid white; } -td.cond_line_0 { +td.cond_0 { background-color: #bfbfbf; } -td.cond_line_1 { +td.cond_1 { background-color: #dfdfdf; } -td.cond_line_2 { - background-color: #fffff; +td.cond_2 { + background-color: fffff; } -td.cond_line_corner { +td.cond_shrink { max-width: 0px; white-space: nowrap; overflow: hidden; @@ -58,11 +58,15 @@ input.ablers { {% endif %} {% for condition in conditions_present %} - {% if condition in node.todo.conditions and not condition.is_active %} -O  + ++> {% elif condition in node.todo.blockers and condition.is_active %} -!  + +-> +{% else %} + +| {% endif %} {% endfor %} @@ -80,7 +84,17 @@ O  {% for condition in conditions_present|reverse %} -{% if condition in node.todo.enables %} +{% elif condition in node.todo.disables %} !{% endif %} +{% if condition in node.todo.enables %} + ++> +{% elif condition in node.todo.disables %} + +-> +{% else %} + + | +{% endif %} + {% endfor %} @@ -142,10 +156,12 @@ comment: add:

+make new todos +descendants (i.e. adopt where possible, otherwise create anew)

@@ -162,25 +178,25 @@ add: {% for _ in conditions_present %} {% if outer_loop.index > loop.index %} - {% endfor %} - - + + {% for _ in conditions_present %} {% if outer_loop.index0 + loop.index < conditions_present|length %} - {% for condition in conditions_present %} - + {% endfor %} {% for condition in conditions_present %} - + {% endfor %} diff --git a/templates/process.html b/templates/process.html index 60687d8..7bb503e 100644 --- a/templates/process.html +++ b/templates/process.html @@ -97,11 +97,11 @@ edit process of ID {{process.id_}} - + - + diff --git a/templates/todo.html b/templates/todo.html index c2fb01d..fea931a 100644 --- a/templates/todo.html +++ b/templates/todo.html @@ -75,11 +75,11 @@ select{ font-size: 0.5em; margin: 0; padding: 0; } - + - + diff --git a/tests/conditions.py b/tests/conditions.py index 3b05bd0..9b3a403 100644 --- a/tests/conditions.py +++ b/tests/conditions.py @@ -9,7 +9,6 @@ from plomtask.exceptions import HandledException class TestsSansDB(TestCaseSansDB): """Tests requiring no DB setup.""" checked_class = Condition - do_id_test = True versioned_defaults_to_test = {'title': 'UNNAMED', 'description': ''} @@ -19,31 +18,9 @@ class TestsWithDB(TestCaseWithDB): default_init_kwargs = {'is_active': False} test_versioneds = {'title': str, 'description': str} - def test_Condition_from_table_row(self) -> None: - """Test .from_table_row() properly reads in class from DB""" - self.check_from_table_row() - self.check_versioned_from_table_row('title', str) - self.check_versioned_from_table_row('description', str) - - def test_Condition_by_id(self) -> None: - """Test .by_id(), including creation.""" - self.check_by_id() - - def test_Condition_all(self) -> None: - """Test .all().""" - self.check_all() - - def test_Condition_singularity(self) -> None: - """Test pointers made for single object keep pointing to it.""" - self.check_singularity('is_active', True) - - def test_Condition_versioned_attributes_singularity(self) -> None: - """Test behavior of VersionedAttributes on saving (with .title).""" - self.check_versioned_singularity() - - def test_Condition_remove(self) -> None: + def test_remove(self) -> None: """Test .remove() effects on DB and cache.""" - self.check_remove() + super().test_remove() proc = Process(None) proc.save(self.db_conn) todo = Todo(None, proc, False, '2024-01-01') @@ -65,20 +42,147 @@ class TestsWithDB(TestCaseWithDB): class TestsWithServer(TestCaseWithServer): """Module tests against our HTTP server/handler (and database).""" - def test_do_POST_condition(self) -> None: - """Test POST /condition and its effect on the database.""" - form_data = {'title': 'foo', 'description': 'foo'} - self.check_post(form_data, '/condition', 302, '/condition?id=1') - self.assertEqual(1, len(Condition.all(self.db_conn))) - form_data['delete'] = '' - self.check_post(form_data, '/condition?id=', 404) - self.check_post(form_data, '/condition?id=2', 404) - self.check_post(form_data, '/condition?id=1', 302, '/conditions') - self.assertEqual(0, len(Condition.all(self.db_conn))) + @classmethod + def GET_condition_dict(cls, cond: dict[str, object]) -> dict[str, object]: + """Return JSON of GET /condition to expect.""" + return {'is_new': False, + 'enabled_processes': [], + 'disabled_processes': [], + 'enabling_processes': [], + 'disabling_processes': [], + 'condition': cond['id'], + '_library': {'Condition': cls.as_refs([cond])}} - def test_do_GET(self) -> None: - """Test /condition and /conditions response codes.""" - form_data = {'title': 'foo', 'description': 'foo'} - self.check_post(form_data, '/condition', 302, '/condition?id=1') + @classmethod + def GET_conditions_dict(cls, conds: list[dict[str, object]] + ) -> dict[str, object]: + """Return JSON of GET /conditions to expect.""" + library = {'Condition': cls.as_refs(conds)} if conds else {} + d: dict[str, object] = {'conditions': cls.as_id_list(conds), + 'sort_by': 'title', + 'pattern': '', + '_library': library} + return d + + def test_fail_POST_condition(self) -> None: + """Test malformed/illegal POST /condition requests.""" + # check invalid POST payloads + url = '/condition' + self.check_post({}, url, 400) + self.check_post({'title': ''}, url, 400) + self.check_post({'title': '', 'description': ''}, url, 400) + self.check_post({'title': '', 'is_active': False}, url, 400) + self.check_post({'description': '', 'is_active': False}, url, 400) + # check valid POST payload on bad paths + valid_payload = {'title': '', 'description': '', 'is_active': False} + self.check_post(valid_payload, '/condition?id=foo', 400) + + def test_POST_condition(self) -> None: + """Test (valid) POST /condition and its effect on GET /condition[s].""" + # test valid POST's effect on … + post = {'title': 'foo', 'description': 'oof', 'is_active': False} + self.check_post(post, '/condition', 302, '/condition?id=1') + # … single /condition + cond = self.cond_as_dict(titles=['foo'], descriptions=['oof']) + assert isinstance(cond['_versioned'], dict) + expected_single = self.GET_condition_dict(cond) + self.check_json_get('/condition?id=1', expected_single) + # … full /conditions + expected_all = self.GET_conditions_dict([cond]) + self.check_json_get('/conditions', expected_all) + # test (no) effect of invalid POST to existing Condition on /condition + self.check_post({}, '/condition?id=1', 400) + self.check_json_get('/condition?id=1', expected_single) + # test effect of POST changing title and activeness + post = {'title': 'bar', 'description': 'oof', 'is_active': True} + self.check_post(post, '/condition?id=1', 302) + cond['_versioned']['title'][1] = 'bar' + cond['is_active'] = True + self.check_json_get('/condition?id=1', expected_single) + # test deletion POST's effect on … + self.check_post({'delete': ''}, '/condition?id=1', 302, '/conditions') + cond = self.cond_as_dict() + assert isinstance(expected_single['_library'], dict) + expected_single['_library']['Condition'] = self.as_refs([cond]) + self.check_json_get('/condition?id=1', expected_single) + # … full /conditions + expected_all['conditions'] = [] + expected_all['_library'] = {} + self.check_json_get('/conditions', expected_all) + + def test_GET_condition(self) -> None: + """More GET /condition testing, especially for Process relations.""" + # check expected default status codes self.check_get_defaults('/condition') - self.check_get('/conditions', 200) + # make Condition and two Processes that among them establish all + # possible ConditionsRelations to it, … + cond_post = {'title': 'foo', 'description': 'oof', 'is_active': False} + self.check_post(cond_post, '/condition', 302, '/condition?id=1') + proc1_post = {'title': 'A', 'description': '', 'effort': 1.0, + 'conditions': [1], 'disables': [1]} + proc2_post = {'title': 'B', 'description': '', 'effort': 1.0, + 'enables': [1], 'blockers': [1]} + self.post_process(1, proc1_post) + self.post_process(2, proc2_post) + # … then check /condition displays all these properly. + cond = self.cond_as_dict(titles=['foo'], descriptions=['oof']) + assert isinstance(cond['id'], int) + proc1 = self.proc_as_dict(conditions=[cond['id']], + disables=[cond['id']]) + proc2 = self.proc_as_dict(2, 'B', + blockers=[cond['id']], + enables=[cond['id']]) + expected = self.GET_condition_dict(cond) + assert isinstance(expected['_library'], dict) + expected['enabled_processes'] = self.as_id_list([proc1]) + expected['disabled_processes'] = self.as_id_list([proc2]) + expected['enabling_processes'] = self.as_id_list([proc2]) + expected['disabling_processes'] = self.as_id_list([proc1]) + expected['_library']['Process'] = self.as_refs([proc1, proc2]) + self.check_json_get('/condition?id=1', expected) + + def test_GET_conditions(self) -> None: + """Test GET /conditions.""" + # test empty result on empty DB, default-settings on empty params + expected = self.GET_conditions_dict([]) + self.check_json_get('/conditions', expected) + # test on meaningless non-empty params (incl. entirely un-used key), + # that 'sort_by' default to 'title' (even if set to something else, as + # long as without handler) and 'pattern' get preserved + expected['pattern'] = 'bar' # preserved despite zero effect! + url = '/conditions?sort_by=foo&pattern=bar&foo=x' + self.check_json_get(url, expected) + # test non-empty result, automatic (positive) sorting by title + post1 = {'is_active': False, 'title': 'foo', 'description': 'oof'} + post2 = {'is_active': False, 'title': 'bar', 'description': 'rab'} + post3 = {'is_active': True, 'title': 'baz', 'description': 'zab'} + self.check_post(post1, '/condition', 302, '/condition?id=1') + self.check_post(post2, '/condition', 302, '/condition?id=2') + self.check_post(post3, '/condition', 302, '/condition?id=3') + cond1 = self.cond_as_dict(1, False, ['foo'], ['oof']) + cond2 = self.cond_as_dict(2, False, ['bar'], ['rab']) + cond3 = self.cond_as_dict(3, True, ['baz'], ['zab']) + expected = self.GET_conditions_dict([cond2, cond3, cond1]) + self.check_json_get('/conditions', expected) + # test other sortings + # (NB: by .is_active has two items of =False, their order currently + # is not explicitly made predictable, so mail fail until we do) + expected['sort_by'] = '-title' + expected['conditions'] = self.as_id_list([cond1, cond3, cond2]) + self.check_json_get('/conditions?sort_by=-title', expected) + expected['sort_by'] = 'is_active' + expected['conditions'] = self.as_id_list([cond1, cond2, cond3]) + self.check_json_get('/conditions?sort_by=is_active', expected) + expected['sort_by'] = '-is_active' + expected['conditions'] = self.as_id_list([cond3, cond1, cond2]) + self.check_json_get('/conditions?sort_by=-is_active', expected) + # test pattern matching on title + expected = self.GET_conditions_dict([cond2, cond3]) + expected['pattern'] = 'ba' + self.check_json_get('/conditions?pattern=ba', expected) + # test pattern matching on description + assert isinstance(expected['_library'], dict) + expected['conditions'] = self.as_id_list([cond1]) + expected['_library']['Condition'] = self.as_refs([cond1]) + expected['pattern'] = 'of' + self.check_json_get('/conditions?pattern=of', expected) diff --git a/tests/days.py b/tests/days.py index 286f758..8e3768c 100644 --- a/tests/days.py +++ b/tests/days.py @@ -1,24 +1,24 @@ """Test Days module.""" from unittest import TestCase from datetime import datetime +from typing import Callable from tests.utils import TestCaseWithDB, TestCaseWithServer from plomtask.dating import date_in_n_days from plomtask.days import Day -from plomtask.exceptions import BadFormatException class TestsSansDB(TestCase): """Days module tests not requiring DB setup.""" + legal_ids = ['2024-01-01'] + illegal_ids = ['foo', '2024-02-30', '2024-02-01 23:00:00'] - def test_Day_valid_date(self) -> None: - """Test Day's date format validation and parsing.""" - with self.assertRaises(BadFormatException): - Day('foo') - with self.assertRaises(BadFormatException): - Day('2024-02-30') - with self.assertRaises(BadFormatException): - Day('2024-02-01 23:00:00') - self.assertEqual(datetime(2024, 1, 1), Day('2024-01-01').datetime) + def test_Day_datetime_weekday_neighbor_dates(self) -> None: + """Test Day's date parsing.""" + self.assertEqual(datetime(2024, 5, 1), Day('2024-05-01').datetime) + self.assertEqual('Sunday', Day('2024-03-17').weekday) + self.assertEqual('March', Day('2024-03-17').month_name) + self.assertEqual('2023-12-31', Day('2024-01-01').prev_date) + self.assertEqual('2023-03-01', Day('2023-02-28').next_date) def test_Day_sorting(self) -> None: """Test sorting by .__lt__ and Day.__eq__.""" @@ -28,43 +28,21 @@ class TestsSansDB(TestCase): days = [day3, day1, day2] self.assertEqual(sorted(days), [day1, day2, day3]) - def test_Day_weekday(self) -> None: - """Test Day.weekday.""" - self.assertEqual(Day('2024-03-17').weekday, 'Sunday') - - def test_Day_neighbor_dates(self) -> None: - """Test Day.prev_date and Day.next_date.""" - self.assertEqual(Day('2024-01-01').prev_date, '2023-12-31') - self.assertEqual(Day('2023-02-28').next_date, '2023-03-01') - class TestsWithDB(TestCaseWithDB): """Tests requiring DB, but not server setup.""" checked_class = Day default_ids = ('2024-01-01', '2024-01-02', '2024-01-03') - def test_saving_and_caching(self) -> None: - """Test storage of instances. - - We don't use the parent class's method here because the checked class - has too different a handling of IDs. - """ - kwargs = {'date': self.default_ids[0], 'comment': 'foo'} - self.check_saving_and_caching(**kwargs) - - def test_Day_from_table_row(self) -> None: - """Test .from_table_row() properly reads in class from DB""" - self.check_from_table_row() - - def test_Day_by_id(self) -> None: - """Test .by_id().""" - self.check_by_id() - def test_Day_by_date_range_filled(self) -> None: """Test Day.by_date_range_filled.""" date1, date2, date3 = self.default_ids - day1, day2, day3 = self.check_all() - # check date range is a closed interval + day1 = Day(date1) + day2 = Day(date2) + day3 = Day(date3) + for day in [day1, day2, day3]: + day.save(self.db_conn) + # check date range includes limiter days self.assertEqual(Day.by_date_range_filled(self.db_conn, date1, date3), [day1, day2, day3]) # check first date range value excludes what's earlier @@ -85,41 +63,313 @@ class TestsWithDB(TestCaseWithDB): self.assertEqual(Day.by_date_range_filled(self.db_conn, day5.date, day7.date), [day5, day6, day7]) - self.check_storage([day1, day2, day3, day6]) + self.check_identity_with_cache_and_db([day1, day2, day3, day6]) # check 'today' is interpreted as today's date today = Day(date_in_n_days(0)) - today.save(self.db_conn) self.assertEqual(Day.by_date_range_filled(self.db_conn, 'today', 'today'), [today]) - - def test_Day_remove(self) -> None: - """Test .remove() effects on DB and cache.""" - self.check_remove() - - def test_Day_singularity(self) -> None: - """Test pointers made for single object keep pointing to it.""" - self.check_singularity('day_comment', 'boo') + prev_day = Day(date_in_n_days(-1)) + next_day = Day(date_in_n_days(1)) + self.assertEqual(Day.by_date_range_filled(self.db_conn, + 'yesterday', 'tomorrow'), + [prev_day, today, next_day]) class TestsWithServer(TestCaseWithServer): """Tests against our HTTP server/handler (and database).""" - def test_do_GET(self) -> None: - """Test /day and /calendar response codes, and / redirect.""" - self.check_get('/day', 200) - self.check_get('/day?date=3000-01-01', 200) - self.check_get('/day?date=FOO', 400) - self.check_get('/calendar', 200) - self.check_get('/calendar?start=&end=', 200) - self.check_get('/calendar?start=today&end=today', 200) - self.check_get('/calendar?start=2024-01-01&end=2025-01-01', 200) - self.check_get('/calendar?start=foo', 400) + @classmethod + def GET_day_dict(cls, date: str) -> dict[str, object]: + """Return JSON of GET /day to expect.""" + # day: dict[str, object] = {'id': date, 'comment': '', 'todos': []} + day = cls._day_as_dict(date) + d: dict[str, object] = {'day': date, + 'top_nodes': [], + 'make_type': '', + 'enablers_for': {}, + 'disablers_for': {}, + 'conditions_present': [], + 'processes': [], + '_library': {'Day': cls.as_refs([day])}} + return d + + @classmethod + def GET_calendar_dict(cls, start: int, end: int) -> dict[str, object]: + """Return JSON of GET /calendar to expect.""" + today_date = date_in_n_days(0) + start_date = date_in_n_days(start) + end_date = date_in_n_days(end) + dates = [date_in_n_days(i) for i in range(start, end+1)] + days = [cls._day_as_dict(d) for d in dates] + library = {'Day': cls.as_refs(days)} if len(days) > 0 else {} + return {'today': today_date, 'start': start_date, 'end': end_date, + 'days': dates, '_library': library} + + @staticmethod + def _todo_as_dict(id_: int = 1, + process_id: int = 1, + date: str = '2024-01-01', + conditions: None | list[int] = None, + disables: None | list[int] = None, + blockers: None | list[int] = None, + enables: None | list[int] = None + ) -> dict[str, object]: + """Return JSON of Todo to expect.""" + # pylint: disable=too-many-arguments + d = {'id': id_, + 'date': date, + 'process_id': process_id, + 'is_done': False, + 'calendarize': False, + 'comment': '', + 'children': [], + 'parents': [], + 'effort': None, + 'conditions': conditions if conditions else [], + 'disables': disables if disables else [], + 'blockers': blockers if blockers else [], + 'enables': enables if enables else []} + return d - def test_do_POST_day(self) -> None: - """Test POST /day.""" - form_data = {'day_comment': '', 'make_type': 'full'} - self.check_post(form_data, '/day', 400) - self.check_post(form_data, '/day?date=foo', 400) - self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) - self.check_post({'foo': ''}, '/day?date=2024-01-01', 400) + @staticmethod + def _todo_node_as_dict(todo_id: int) -> dict[str, object]: + """Return JSON of TodoNode to expect.""" + return {'children': [], 'seen': False, 'todo': todo_id} + + @staticmethod + def _day_as_dict(date: str) -> dict[str, object]: + return {'id': date, 'comment': '', 'todos': []} + + @staticmethod + def _post_batch(list_of_args: list[list[object]], + names_of_simples: list[str], + names_of_versioneds: list[str], + f_as_dict: Callable[..., dict[str, object]], + f_to_post: Callable[..., None | dict[str, object]] + ) -> list[dict[str, object]]: + """Post expected=f_as_dict(*args) as input to f_to_post, for many.""" + expecteds = [] + for args in list_of_args: + expecteds += [f_as_dict(*args)] + for expected in expecteds: + assert isinstance(expected['_versioned'], dict) + post = {} + for name in names_of_simples: + post[name] = expected[name] + for name in names_of_versioneds: + post[name] = expected['_versioned'][name][0] + f_to_post(expected['id'], post) + return expecteds + + def _post_day(self, params: str = '', + form_data: None | dict[str, object] = None, + redir_to: str = '', + status: int = 302, + ) -> None: + """POST /day?{params} with form_data.""" + if not form_data: + form_data = {'day_comment': '', 'make_type': ''} + target = f'/day?{params}' + if not redir_to: + redir_to = f'{target}&make_type={form_data["make_type"]}' + self.check_post(form_data, target, status, redir_to) + + def test_basic_GET_day(self) -> None: + """Test basic (no Processes/Conditions/Todos) GET /day basics.""" + # check illegal date parameters + self.check_get('/day?date=foo', 400) + self.check_get('/day?date=2024-02-30', 400) + # check undefined day + date = date_in_n_days(0) + expected = self.GET_day_dict(date) + self.check_json_get('/day', expected) + # NB: GET ?date="today"/"yesterday"/"tomorrow" in test_basic_POST_day + # check 'make_type' GET parameter affects immediate reply, but … + date = '2024-01-01' + expected = self.GET_day_dict(date) + expected['make_type'] = 'bar' + self.check_json_get(f'/day?date={date}&make_type=bar', expected) + # … not any following, … + expected['make_type'] = '' + self.check_json_get(f'/day?date={date}', expected) + # … not even when part of a POST request + post: dict[str, object] = {'day_comment': '', 'make_type': 'foo'} + self._post_day(f'date={date}', post) + self.check_json_get(f'/day?date={date}', expected) + + def test_fail_POST_day(self) -> None: + """Test malformed/illegal POST /day requests.""" + # check payloads lacking minimum expecteds + url = '/day?date=2024-01-01' + self.check_post({}, url, 400) + self.check_post({'day_comment': ''}, url, 400) + self.check_post({'make_type': ''}, url, 400) + # to next check illegal new_todo values, we need an actual Process + self.post_process(1) + # check illegal new_todo values + post: dict[str, object] + post = {'make_type': '', 'day_comment': '', 'new_todo': ['foo']} + self.check_post(post, url, 400) + post['new_todo'] = [1, 2] # no Process of .id_=2 exists + # to next check illegal old_todo inputs, we need to first post Todo + post['new_todo'] = [1] + self.check_post(post, url, 302, '/day?date=2024-01-01&make_type=') + # check illegal old_todo inputs (equal list lengths though) + post = {'make_type': '', 'day_comment': '', 'comment': ['foo'], + 'effort': [3.3], 'done': [], 'todo_id': [1]} + self.check_post(post, url, 302, '/day?date=2024-01-01&make_type=') + post['todo_id'] = [2] # reference to non-existant Process + self.check_post(post, url, 404) + post['todo_id'] = ['a'] + self.check_post(post, url, 400) + post['todo_id'] = [1] + post['done'] = ['foo'] + self.check_post(post, url, 400) + post['done'] = [2] # reference to non-posted todo_id + self.check_post(post, url, 400) + post['done'] = [] + post['effort'] = ['foo'] + self.check_post(post, url, 400) + post['effort'] = [None] + self.check_post(post, url, 400) + post['effort'] = [3.3] + # check illegal old_todo inputs: unequal list lengths + post['comment'] = [] + self.check_post(post, url, 400) + post['comment'] = ['foo', 'foo'] + self.check_post(post, url, 400) + post['comment'] = ['foo'] + post['effort'] = [] + self.check_post(post, url, 400) + post['effort'] = [3.3, 3.3] + self.check_post(post, url, 400) + post['effort'] = [3.3] + post['todo_id'] = [1, 1] + self.check_post(post, url, 400) + post['todo_id'] = [1] + # # check valid POST payload on bad paths + self.check_post(post, '/day', 400) + self.check_post(post, '/day?date=', 400) + self.check_post(post, '/day?date=foo', 400) + + def test_basic_POST_day(self) -> None: + """Test basic (no Todos) POST /day. + + Check POST (& GET!) requests properly parse 'today', 'tomorrow', + 'yesterday', and actual date strings; + preserve 'make_type' setting in redirect even if nonsensical; + and store 'day_comment' + """ + for name, dist, test_str in [('2024-01-01', None, 'a'), + ('today', 0, 'b'), + ('yesterday', -1, 'c'), + ('tomorrow', +1, 'd')]: + date = name if dist is None else date_in_n_days(dist) + post = {'day_comment': test_str, 'make_type': f'x:{test_str}'} + post_url = f'/day?date={name}' + redir_url = f'{post_url}&make_type={post["make_type"]}' + self.check_post(post, post_url, 302, redir_url) + expected = self.GET_day_dict(date) + assert isinstance(expected['_library'], dict) + expected['_library']['Day'][date]['comment'] = test_str + self.check_json_get(post_url, expected) + + def test_GET_day_with_processes_and_todos(self) -> None: + """Test GET /day displaying Processes and Todos (no trees).""" + date = '2024-01-01' + # check Processes get displayed in ['processes'] and ['_library'] + procs_data = [[1, 'foo', 'oof', 1.1], [2, 'bar', 'rab', 0.9]] + procs_expected = self._post_batch(procs_data, [], + ['title', 'description', 'effort'], + self.proc_as_dict, self.post_process) + expected = self.GET_day_dict(date) + assert isinstance(expected['_library'], dict) + expected['processes'] = self.as_id_list(procs_expected) + expected['_library']['Process'] = self.as_refs(procs_expected) + self._post_day(f'date={date}') + self.check_json_get(f'/day?date={date}', expected) + # post Todos of either process and check their display + post_day: dict[str, object] + post_day = {'day_comment': '', 'make_type': '', 'new_todo': [1, 2]} + todos = [self._todo_as_dict(1, 1, date), + self._todo_as_dict(2, 2, date)] + expected['_library']['Todo'] = self.as_refs(todos) + expected['_library']['Day'][date]['todos'] = self.as_id_list(todos) + nodes = [self._todo_node_as_dict(1), self._todo_node_as_dict(2)] + expected['top_nodes'] = nodes + self._post_day(f'date={date}', post_day) + self.check_json_get(f'/day?date={date}', expected) + # add a comment to one Todo and set the other's doneness and effort + post_day = {'day_comment': '', 'make_type': '', 'new_todo': [], + 'todo_id': [1, 2], 'done': [2], 'comment': ['FOO', ''], + 'effort': [2.3, '']} + expected['_library']['Todo']['1']['comment'] = 'FOO' + expected['_library']['Todo']['1']['effort'] = 2.3 + expected['_library']['Todo']['2']['is_done'] = True + self._post_day(f'date={date}', post_day) + self.check_json_get(f'/day?date={date}', expected) + + def test_GET_day_with_conditions(self) -> None: + """Test GET /day displaying Conditions and their relations.""" + date = '2024-01-01' + # add Process with Conditions and their Todos, check display + conds_data = [[1, False, ['A'], ['a']], [2, True, ['B'], ['b']]] + conds_expected = self._post_batch( + conds_data, ['is_active'], ['title', 'description'], + self.cond_as_dict, + lambda x, y: self.check_post(y, f'/condition?id={x}', 302)) + cond_names = ['conditions', 'disables', 'blockers', 'enables'] + procs_data = [[1, 'foo', 'oof', 1.1, [1], [1], [2], [2]], + [2, 'bar', 'rab', 0.9, [2], [2], [1], [1]]] + procs_expected = self._post_batch(procs_data, cond_names, + ['title', 'description', 'effort'], + self.proc_as_dict, self.post_process) + expected = self.GET_day_dict(date) + assert isinstance(expected['_library'], dict) + expected['processes'] = self.as_id_list(procs_expected) + expected['_library']['Process'] = self.as_refs(procs_expected) + expected['_library']['Condition'] = self.as_refs(conds_expected) + self._post_day(f'date={date}') + self.check_json_get(f'/day?date={date}', expected) + # add Todos in relation to Conditions, check consequences + post_day: dict[str, object] + post_day = {'day_comment': '', 'make_type': '', 'new_todo': [1, 2]} + todos = [self._todo_as_dict(1, 1, date, [1], [1], [2], [2]), + self._todo_as_dict(2, 2, date, [2], [2], [1], [1])] + expected['_library']['Todo'] = self.as_refs(todos) + expected['_library']['Day'][date]['todos'] = self.as_id_list(todos) + nodes = [self._todo_node_as_dict(1), self._todo_node_as_dict(2)] + expected['top_nodes'] = nodes + expected['disablers_for'] = {'1': [1], '2': [2]} + expected['enablers_for'] = {'1': [2], '2': [1]} + expected['conditions_present'] = self.as_id_list(conds_expected) + self._post_day(f'date={date}', post_day) + self.check_json_get(f'/day?date={date}', expected) + + def test_GET_calendar(self) -> None: + """Test GET /calendar responses based on various inputs, DB states.""" + # check illegal date range delimiters + self.check_get('/calendar?start=foo', 400) + self.check_get('/calendar?end=foo', 400) + # check default range without saved days + expected = self.GET_calendar_dict(-1, 366) + self.check_json_get('/calendar', expected) + self.check_json_get('/calendar?start=&end=', expected) + # check named days as delimiters + expected = self.GET_calendar_dict(-1, +1) + self.check_json_get('/calendar?start=yesterday&end=tomorrow', expected) + # check zero-element range + expected = self.GET_calendar_dict(+1, 0) + self.check_json_get('/calendar?start=tomorrow&end=today', expected) + # check saved day shows up in results with proven by its comment + post_day: dict[str, object] = {'day_comment': 'foo', 'make_type': ''} + date1 = date_in_n_days(-2) + self._post_day(f'date={date1}', post_day) + start_date = date_in_n_days(-5) + end_date = date_in_n_days(+5) + url = f'/calendar?start={start_date}&end={end_date}' + expected = self.GET_calendar_dict(-5, +5) + assert isinstance(expected['_library'], dict) + expected['_library']['Day'][date1]['comment'] = post_day['day_comment'] + self.check_json_get(url, expected) diff --git a/tests/misc.py b/tests/misc.py index b0fb872..a27f0d0 100644 --- a/tests/misc.py +++ b/tests/misc.py @@ -151,7 +151,7 @@ class TestsWithServer(TestCaseWithServer): """Tests against our HTTP server/handler (and database).""" def test_do_GET(self) -> None: - """Test / redirect, and unknown targets failing.""" + """Test GET / redirect, and unknown targets failing.""" self.conn.request('GET', '/') self.check_redirect('/day') self.check_get('/foo', 404) diff --git a/tests/processes.py b/tests/processes.py index 34f6427..1b20e21 100644 --- a/tests/processes.py +++ b/tests/processes.py @@ -1,4 +1,5 @@ """Test Processes module.""" +from typing import Any from tests.utils import TestCaseWithDB, TestCaseWithServer, TestCaseSansDB from plomtask.processes import Process, ProcessStep, ProcessStepsNode from plomtask.conditions import Condition @@ -9,7 +10,6 @@ from plomtask.todos import Todo class TestsSansDB(TestCaseSansDB): """Module tests not requiring DB setup.""" checked_class = Process - do_id_test = True versioned_defaults_to_test = {'title': 'UNNAMED', 'description': '', 'effort': 1.0} @@ -17,7 +17,6 @@ class TestsSansDB(TestCaseSansDB): class TestsSansDBProcessStep(TestCaseSansDB): """Module tests not requiring DB setup.""" checked_class = ProcessStep - do_id_test = True default_init_args = [2, 3, 4] @@ -58,17 +57,15 @@ class TestsWithDB(TestCaseWithDB): def test_Process_conditions_saving(self) -> None: """Test .save/.save_core.""" p, set1, set2, set3 = self.p_of_conditions() + assert p.id_ is not None r = Process.by_id(self.db_conn, p.id_) self.assertEqual(sorted(r.conditions), sorted(set1)) self.assertEqual(sorted(r.enables), sorted(set2)) self.assertEqual(sorted(r.disables), sorted(set3)) - def test_Process_from_table_row(self) -> None: - """Test .from_table_row() properly reads in class from DB""" - self.check_from_table_row() - self.check_versioned_from_table_row('title', str) - self.check_versioned_from_table_row('description', str) - self.check_versioned_from_table_row('effort', float) + def test_from_table_row(self) -> None: + """Test .from_table_row() properly reads in class from DB.""" + super().test_from_table_row() p, set1, set2, set3 = self.p_of_conditions() p.save(self.db_conn) assert isinstance(p.id_, int) @@ -182,25 +179,9 @@ class TestsWithDB(TestCaseWithDB): method(self.db_conn, [c1.id_, c2.id_]) self.assertEqual(getattr(p, target), [c1, c2]) - def test_Process_by_id(self) -> None: - """Test .by_id(), including creation""" - self.check_by_id() - - def test_Process_all(self) -> None: - """Test .all().""" - self.check_all() - - def test_Process_singularity(self) -> None: - """Test pointers made for single object keep pointing to it.""" - self.check_singularity('conditions', [Condition(None)]) - - def test_Process_versioned_attributes_singularity(self) -> None: - """Test behavior of VersionedAttributes on saving (with .title).""" - self.check_versioned_singularity() - - def test_Process_removal(self) -> None: + def test_remove(self) -> None: """Test removal of Processes and ProcessSteps.""" - self.check_remove() + super().test_remove() p1, p2, p3 = self.three_processes() assert isinstance(p1.id_, int) assert isinstance(p2.id_, int) @@ -212,13 +193,15 @@ class TestsWithDB(TestCaseWithDB): p1.remove(self.db_conn) p2.set_steps(self.db_conn, []) with self.assertRaises(NotFoundException): + assert step_id is not None ProcessStep.by_id(self.db_conn, step_id) p1.remove(self.db_conn) step = ProcessStep(None, p2.id_, p3.id_, None) - step_id = step.id_ p2.set_steps(self.db_conn, [step]) + step_id = step.id_ p2.remove(self.db_conn) with self.assertRaises(NotFoundException): + assert step_id is not None ProcessStep.by_id(self.db_conn, step_id) todo = Todo(None, p3, False, '2024-01-01') todo.save(self.db_conn) @@ -231,33 +214,25 @@ class TestsWithDB(TestCaseWithDB): class TestsWithDBForProcessStep(TestCaseWithDB): """Module tests requiring DB setup.""" checked_class = ProcessStep - default_init_kwargs = {'owner_id': 2, 'step_process_id': 3, - 'parent_step_id': 4} + default_init_kwargs = {'owner_id': 1, 'step_process_id': 2, + 'parent_step_id': 3} def setUp(self) -> None: super().setUp() - p = Process(1) - p.save(self.db_conn) - p = Process(2) - p.save(self.db_conn) + self.p1 = Process(1) + self.p1.save(self.db_conn) - def test_saving_and_caching(self) -> None: - """Test storage and initialization of instances and attributes.""" - self.check_saving_and_caching(id_=1, **self.default_init_kwargs) - - def test_ProcessStep_remove(self) -> None: + def test_remove(self) -> None: """Test .remove and unsetting of owner's .explicit_steps entry.""" - p1 = Process(None) - p2 = Process(None) - p1.save(self.db_conn) + p2 = Process(2) p2.save(self.db_conn) - assert isinstance(p1.id_, int) + assert isinstance(self.p1.id_, int) assert isinstance(p2.id_, int) - step = ProcessStep(None, p1.id_, p2.id_, None) - p1.set_steps(self.db_conn, [step]) + step = ProcessStep(None, self.p1.id_, p2.id_, None) + self.p1.set_steps(self.db_conn, [step]) step.remove(self.db_conn) - self.assertEqual(p1.explicit_steps, []) - self.check_storage([]) + self.assertEqual(self.p1.explicit_steps, []) + self.check_identity_with_cache_and_db([]) class TestsWithServer(TestCaseWithServer): @@ -277,11 +252,12 @@ class TestsWithServer(TestCaseWithServer): '/process?id=', 400) self.assertEqual(1, len(Process.all(self.db_conn))) form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.0} - self.post_process(2, form_data | {'condition': []}) - self.check_post(form_data | {'condition': [1]}, '/process?id=', 404) - self.check_post({'title': 'foo', 'description': 'foo'}, + self.post_process(2, form_data | {'conditions': []}) + self.check_post(form_data | {'conditions': [1]}, '/process?id=', 404) + self.check_post({'title': 'foo', 'description': 'foo', + 'is_active': False}, '/condition', 302, '/condition?id=1') - self.post_process(3, form_data | {'condition': [1]}) + self.post_process(3, form_data | {'conditions': [1]}) self.post_process(4, form_data | {'disables': [1]}) self.post_process(5, form_data | {'enables': [1]}) form_data['delete'] = '' @@ -311,6 +287,7 @@ class TestsWithServer(TestCaseWithServer): self.post_process(1, form_data_1) retrieved_process = Process.by_id(self.db_conn, 1) self.assertEqual(retrieved_process.explicit_steps, []) + assert retrieved_step_id is not None with self.assertRaises(NotFoundException): ProcessStep.by_id(self.db_conn, retrieved_step_id) # post new first (top_level) step of process 3 to process 1 @@ -361,11 +338,11 @@ class TestsWithServer(TestCaseWithServer): self.post_process(1, form_data_1) retrieved_process = Process.by_id(self.db_conn, 1) self.assertEqual(len(retrieved_process.explicit_steps), 2) - retrieved_step_0 = retrieved_process.explicit_steps[0] + retrieved_step_0 = retrieved_process.explicit_steps[1] self.assertEqual(retrieved_step_0.step_process_id, 3) self.assertEqual(retrieved_step_0.owner_id, 1) self.assertEqual(retrieved_step_0.parent_step_id, None) - retrieved_step_1 = retrieved_process.explicit_steps[1] + retrieved_step_1 = retrieved_process.explicit_steps[0] self.assertEqual(retrieved_step_1.step_process_id, 2) self.assertEqual(retrieved_step_1.owner_id, 1) self.assertEqual(retrieved_step_1.parent_step_id, None) @@ -393,11 +370,11 @@ class TestsWithServer(TestCaseWithServer): self.post_process(1, form_data_1) retrieved_process = Process.by_id(self.db_conn, 1) self.assertEqual(len(retrieved_process.explicit_steps), 3) - retrieved_step_0 = retrieved_process.explicit_steps[0] + retrieved_step_0 = retrieved_process.explicit_steps[1] self.assertEqual(retrieved_step_0.step_process_id, 2) self.assertEqual(retrieved_step_0.owner_id, 1) self.assertEqual(retrieved_step_0.parent_step_id, None) - retrieved_step_1 = retrieved_process.explicit_steps[1] + retrieved_step_1 = retrieved_process.explicit_steps[0] self.assertEqual(retrieved_step_1.step_process_id, 3) self.assertEqual(retrieved_step_1.owner_id, 1) self.assertEqual(retrieved_step_1.parent_step_id, None) @@ -408,5 +385,126 @@ class TestsWithServer(TestCaseWithServer): def test_do_GET(self) -> None: """Test /process and /processes response codes.""" + self.check_get('/process', 200) + self.check_get('/process?id=', 200) + self.check_get('/process?id=1', 200) self.check_get_defaults('/process') self.check_get('/processes', 200) + + def test_fail_GET_process(self) -> None: + """Test invalid GET /process params.""" + # check for invalid IDs + self.check_get('/process?id=foo', 400) + self.check_get('/process?id=0', 500) + # check we catch invalid base64 + self.check_get('/process?title_b64=foo', 400) + # check failure on references to unknown processes; we create Process + # of ID=1 here so we know the 404 comes from step_to=2 etc. (that tie + # the Process displayed by /process to others), not from not finding + # the main Process itself + self.post_process(1) + self.check_get('/process?id=1&step_to=2', 404) + self.check_get('/process?id=1&has_step=2', 404) + + @classmethod + def GET_processes_dict(cls, procs: list[dict[str, object]] + ) -> dict[str, object]: + """Return JSON of GET /processes to expect.""" + library = {'Process': cls.as_refs(procs)} if procs else {} + d: dict[str, object] = {'processes': cls.as_id_list(procs), + 'sort_by': 'title', + 'pattern': '', + '_library': library} + return d + + @staticmethod + def procstep_as_dict(id_: int, + owner_id: int, + step_process_id: int, + parent_step_id: int | None = None + ) -> dict[str, object]: + """Return JSON of Process to expect.""" + return {'id': id_, + 'owner_id': owner_id, + 'step_process_id': step_process_id, + 'parent_step_id': parent_step_id} + + def test_GET_processes(self) -> None: + """Test GET /processes.""" + # pylint: disable=too-many-statements + # test empty result on empty DB, default-settings on empty params + expected = self.GET_processes_dict([]) + self.check_json_get('/processes', expected) + # test on meaningless non-empty params (incl. entirely un-used key), + # that 'sort_by' default to 'title' (even if set to something else, as + # long as without handler) and 'pattern' get preserved + expected['pattern'] = 'bar' # preserved despite zero effect! + url = '/processes?sort_by=foo&pattern=bar&foo=x' + self.check_json_get(url, expected) + # test non-empty result, automatic (positive) sorting by title + post1: dict[str, Any] + post2: dict[str, Any] + post3: dict[str, Any] + post1 = {'title': 'foo', 'description': 'oof', 'effort': 1.0} + post2 = {'title': 'bar', 'description': 'rab', 'effort': 1.1} + post2['new_top_step'] = 1 + post3 = {'title': 'baz', 'description': 'zab', 'effort': 0.9} + post3['new_top_step'] = 1 + self.post_process(1, post1) + self.post_process(2, post2) + self.post_process(3, post3) + post3['new_top_step'] = 2 + post3['keep_step'] = 2 + post3['steps'] = [2] + post3['step_2_process_id'] = 1 + self.post_process(3, post3) + proc1 = self.proc_as_dict(1, post1['title'], + post1['description'], post1['effort']) + proc2 = self.proc_as_dict(2, post2['title'], + post2['description'], post2['effort']) + proc3 = self.proc_as_dict(3, post3['title'], + post3['description'], post3['effort']) + proc2['explicit_steps'] = [1] + proc3['explicit_steps'] = [2, 3] + step1 = self.procstep_as_dict(1, 2, 1) + step2 = self.procstep_as_dict(2, 3, 1) + step3 = self.procstep_as_dict(3, 3, 2) + expected = self.GET_processes_dict([proc2, proc3, proc1]) + assert isinstance(expected['_library'], dict) + expected['_library']['ProcessStep'] = self.as_refs([step1, step2, + step3]) + self.check_json_get('/processes', expected) + # test other sortings + expected['sort_by'] = '-title' + expected['processes'] = self.as_id_list([proc1, proc3, proc2]) + self.check_json_get('/processes?sort_by=-title', expected) + expected['sort_by'] = 'effort' + expected['processes'] = self.as_id_list([proc3, proc1, proc2]) + self.check_json_get('/processes?sort_by=effort', expected) + expected['sort_by'] = '-effort' + expected['processes'] = self.as_id_list([proc2, proc1, proc3]) + self.check_json_get('/processes?sort_by=-effort', expected) + expected['sort_by'] = 'steps' + expected['processes'] = self.as_id_list([proc1, proc2, proc3]) + self.check_json_get('/processes?sort_by=steps', expected) + expected['sort_by'] = '-steps' + expected['processes'] = self.as_id_list([proc3, proc2, proc1]) + self.check_json_get('/processes?sort_by=-steps', expected) + expected['sort_by'] = 'owners' + expected['processes'] = self.as_id_list([proc3, proc2, proc1]) + self.check_json_get('/processes?sort_by=owners', expected) + expected['sort_by'] = '-owners' + expected['processes'] = self.as_id_list([proc1, proc2, proc3]) + self.check_json_get('/processes?sort_by=-owners', expected) + # test pattern matching on title + expected = self.GET_processes_dict([proc2, proc3]) + assert isinstance(expected['_library'], dict) + expected['pattern'] = 'ba' + expected['_library']['ProcessStep'] = self.as_refs([step1, step2, + step3]) + self.check_json_get('/processes?pattern=ba', expected) + # test pattern matching on description + expected['processes'] = self.as_id_list([proc1]) + expected['_library'] = {'Process': self.as_refs([proc1])} + expected['pattern'] = 'of' + self.check_json_get('/processes?pattern=of', expected) diff --git a/tests/todos.py b/tests/todos.py index 9317c39..dd57ee4 100644 --- a/tests/todos.py +++ b/tests/todos.py @@ -1,5 +1,5 @@ """Test Todos module.""" -from tests.utils import TestCaseWithDB, TestCaseWithServer +from tests.utils import TestCaseSansDB, TestCaseWithDB, TestCaseWithServer from plomtask.todos import Todo, TodoNode from plomtask.processes import Process, ProcessStep from plomtask.conditions import Condition @@ -7,11 +7,18 @@ from plomtask.exceptions import (NotFoundException, BadFormatException, HandledException) -class TestsWithDB(TestCaseWithDB): - """Tests requiring DB, but not server setup.""" +class TestsWithDB(TestCaseWithDB, TestCaseSansDB): + """Tests requiring DB, but not server setup. + + NB: We subclass TestCaseSansDB too, to pull in its .test_id_validation, + which for Todo wouldn't run without a DB being set up due to the need for + Processes with set IDs. + """ checked_class = Todo default_init_kwargs = {'process': None, 'is_done': False, 'date': '2024-01-01'} + # solely used for TestCaseSansDB.test_id_setting + default_init_args = [None, False, '2024-01-01'] def setUp(self) -> None: super().setUp() @@ -24,6 +31,7 @@ class TestsWithDB(TestCaseWithDB): self.cond2 = Condition(None) self.cond2.save(self.db_conn) self.default_init_kwargs['process'] = self.proc + self.default_init_args[0] = self.proc def test_Todo_init(self) -> None: """Test creation of Todo and what they default to.""" @@ -45,16 +53,6 @@ class TestsWithDB(TestCaseWithDB): self.assertEqual(todo_yes_id.enables, []) self.assertEqual(todo_yes_id.disables, []) - def test_Todo_by_id(self) -> None: - """Test findability of Todos.""" - todo = Todo(1, self.proc, False, self.date1) - todo.save(self.db_conn) - self.assertEqual(Todo.by_id(self.db_conn, 1), todo) - with self.assertRaises(NotFoundException): - Todo.by_id(self.db_conn, 0) - with self.assertRaises(NotFoundException): - Todo.by_id(self.db_conn, 2) - def test_Todo_by_date(self) -> None: """Test findability of Todos by date.""" t1 = Todo(None, self.proc, False, self.date1) @@ -127,10 +125,11 @@ class TestsWithDB(TestCaseWithDB): assert isinstance(todo_1.id_, int) # test minimum node_0 = TodoNode(todo_1, False, []) - self.assertEqual(todo_1.get_step_tree(set()), node_0) + self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict) # test non_emtpy seen_todo does something node_0.seen = True - self.assertEqual(todo_1.get_step_tree({todo_1.id_}), node_0) + self.assertEqual(todo_1.get_step_tree({todo_1.id_}).as_dict, + node_0.as_dict) # test child shows up todo_2 = Todo(None, self.proc, False, self.date1) todo_2.save(self.db_conn) @@ -139,7 +138,7 @@ class TestsWithDB(TestCaseWithDB): node_2 = TodoNode(todo_2, False, []) node_0.children = [node_2] node_0.seen = False - self.assertEqual(todo_1.get_step_tree(set()), node_0) + self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict) # test child shows up with child todo_3 = Todo(None, self.proc, False, self.date1) todo_3.save(self.db_conn) @@ -147,12 +146,12 @@ class TestsWithDB(TestCaseWithDB): todo_2.add_child(todo_3) node_3 = TodoNode(todo_3, False, []) node_2.children = [node_3] - self.assertEqual(todo_1.get_step_tree(set()), node_0) + self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict) # test same todo can be child-ed multiple times at different locations todo_1.add_child(todo_3) node_4 = TodoNode(todo_3, True, []) node_0.children += [node_4] - self.assertEqual(todo_1.get_step_tree(set()), node_0) + self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict) def test_Todo_create_with_children(self) -> None: """Test parenthood guaranteeds of Todo.create_with_children.""" @@ -193,14 +192,11 @@ class TestsWithDB(TestCaseWithDB): self.assertEqual(len(todo_3.children), 1) self.assertEqual(todo_3.children[0].process, proc4) - def test_Todo_singularity(self) -> None: - """Test pointers made for single object keep pointing to it.""" - self.check_singularity('is_done', True, self.proc, False, self.date1) - def test_Todo_remove(self) -> None: """Test removal.""" todo_1 = Todo(None, self.proc, False, self.date1) todo_1.save(self.db_conn) + assert todo_1.id_ is not None todo_0 = Todo(None, self.proc, False, self.date1) todo_0.save(self.db_conn) todo_0.add_child(todo_1) @@ -228,6 +224,7 @@ class TestsWithDB(TestCaseWithDB): todo_1.comment = 'foo' todo_1.effort = -0.1 todo_1.save(self.db_conn) + assert todo_1.id_ is not None Todo.by_id(self.db_conn, todo_1.id_) todo_1.comment = '' todo_1_id = todo_1.id_ @@ -248,19 +245,25 @@ class TestsWithServer(TestCaseWithServer): form_data = {'day_comment': '', 'make_type': 'full'} self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) self.assertEqual(Todo.by_date(self.db_conn, '2024-01-01'), []) + proc = Process.by_id(self.db_conn, 1) form_data['new_todo'] = str(proc.id_) self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) todos = Todo.by_date(self.db_conn, '2024-01-01') self.assertEqual(1, len(todos)) todo1 = todos[0] self.assertEqual(todo1.id_, 1) + proc = Process.by_id(self.db_conn, 1) self.assertEqual(todo1.process.id_, proc.id_) self.assertEqual(todo1.is_done, False) + proc2 = Process.by_id(self.db_conn, 2) form_data['new_todo'] = str(proc2.id_) self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) todos = Todo.by_date(self.db_conn, '2024-01-01') todo1 = todos[1] self.assertEqual(todo1.id_, 2) + proc2 = Process.by_id(self.db_conn, 1) + todo1 = Todo.by_date(self.db_conn, '2024-01-01')[0] + self.assertEqual(todo1.id_, 1) self.assertEqual(todo1.process.id_, proc2.id_) self.assertEqual(todo1.is_done, False) @@ -277,7 +280,7 @@ class TestsWithServer(TestCaseWithServer): '/day?date=2024-01-01&make_type=full', 302) # test posting to bad URLs self.check_post({}, '/todo=', 404) - self.check_post({}, '/todo?id=', 400) + self.check_post({}, '/todo?id=', 404) self.check_post({}, '/todo?id=FOO', 400) self.check_post({}, '/todo?id=0', 404) # test posting naked entity @@ -403,12 +406,15 @@ class TestsWithServer(TestCaseWithServer): form_data = {'day_comment': '', 'new_todo': [1], 'make_type': 'full'} self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) todo = Todo.by_date(self.db_conn, '2024-01-01')[0] - form_data = {'day_comment': '', 'todo_id': [1], 'make_type': 'full'} + form_data = {'day_comment': '', 'todo_id': [1], 'make_type': 'full', + 'comment': [''], 'done': [], 'effort': ['']} self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) + todo = Todo.by_date(self.db_conn, '2024-01-01')[0] self.assertEqual(todo.is_done, False) form_data = {'day_comment': '', 'todo_id': [1], 'done': [1], - 'make_type': 'full'} + 'make_type': 'full', 'comment': [''], 'effort': ['']} self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) + todo = Todo.by_date(self.db_conn, '2024-01-01')[0] self.assertEqual(todo.is_done, True) def test_do_GET_todo(self) -> None: @@ -416,8 +422,8 @@ class TestsWithServer(TestCaseWithServer): self.post_process() form_data = {'day_comment': '', 'new_todo': 1, 'make_type': 'full'} self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) - self.check_get('/todo', 400) - self.check_get('/todo?id=', 400) + self.check_get('/todo', 404) + self.check_get('/todo?id=', 404) self.check_get('/todo?id=foo', 400) self.check_get('/todo?id=0', 404) self.check_get('/todo?id=1', 200) diff --git a/tests/utils.py b/tests/utils.py index a9a4e80..6654368 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,11 +1,13 @@ """Shared test utilities.""" +from __future__ import annotations from unittest import TestCase +from typing import Mapping, Any, Callable from threading import Thread from http.client import HTTPConnection +from json import loads as json_loads from urllib.parse import urlencode from uuid import uuid4 from os import remove as remove_file -from typing import Mapping, Any from plomtask.db import DatabaseFile, DatabaseConnection from plomtask.http import TaskHandler, TaskServer from plomtask.processes import Process, ProcessStep @@ -15,27 +17,36 @@ from plomtask.todos import Todo from plomtask.exceptions import NotFoundException, HandledException +def _within_checked_class(f: Callable[..., None]) -> Callable[..., None]: + def wrapper(self: TestCase) -> None: + if hasattr(self, 'checked_class'): + f(self) + return wrapper + + class TestCaseSansDB(TestCase): """Tests requiring no DB setup.""" checked_class: Any - do_id_test: bool = False default_init_args: list[Any] = [] versioned_defaults_to_test: dict[str, str | float] = {} + legal_ids = [1, 5] + illegal_ids = [0] - def test_id_setting(self) -> None: - """Test .id_ being set and its legal range being enforced.""" - if not self.do_id_test: - return - with self.assertRaises(HandledException): - self.checked_class(0, *self.default_init_args) - obj = self.checked_class(5, *self.default_init_args) - self.assertEqual(obj.id_, 5) + @_within_checked_class + def test_id_validation(self) -> None: + """Test .id_ validation/setting.""" + for id_ in self.illegal_ids: + with self.assertRaises(HandledException): + self.checked_class(id_, *self.default_init_args) + for id_ in self.legal_ids: + obj = self.checked_class(id_, *self.default_init_args) + self.assertEqual(obj.id_, id_) + @_within_checked_class def test_versioned_defaults(self) -> None: """Test defaults of VersionedAttributes.""" - if len(self.versioned_defaults_to_test) == 0: - return - obj = self.checked_class(1, *self.default_init_args) + id_ = self.legal_ids[0] + obj = self.checked_class(id_, *self.default_init_args) for k, v in self.versioned_defaults_to_test.items(): self.assertEqual(getattr(obj, k).newest, v) @@ -60,19 +71,29 @@ class TestCaseWithDB(TestCase): self.db_conn.close() remove_file(self.db_file.path) - def test_saving_and_caching(self) -> None: - """Test storage and initialization of instances and attributes.""" - if not hasattr(self, 'checked_class'): - return - self.check_saving_and_caching(id_=1, **self.default_init_kwargs) - obj = self.checked_class(None, **self.default_init_kwargs) - obj.save(self.db_conn) - self.assertEqual(obj.id_, 2) - for k, v in self.test_versioneds.items(): - self.check_saving_of_versioned(k, v) + def _load_from_db(self, id_: int | str) -> list[object]: + db_found: list[object] = [] + for row in self.db_conn.row_where(self.checked_class.table_name, + 'id', id_): + db_found += [self.checked_class.from_table_row(self.db_conn, + row)] + return db_found - def check_storage(self, content: list[Any]) -> None: - """Test cache and DB equal content.""" + def _change_obj(self, obj: object) -> str: + attr_name: str = self.checked_class.to_save[-1] + attr = getattr(obj, attr_name) + new_attr: str | int | float | bool + if isinstance(attr, (int, float)): + new_attr = attr + 1 + elif isinstance(attr, str): + new_attr = attr + '_' + elif isinstance(attr, bool): + new_attr = not attr + setattr(obj, attr_name, new_attr) + return attr_name + + def check_identity_with_cache_and_db(self, content: list[Any]) -> None: + """Test both cache and DB equal content.""" expected_cache = {} for item in content: expected_cache[item.id_] = item @@ -81,129 +102,210 @@ class TestCaseWithDB(TestCase): db_found: list[Any] = [] for item in content: assert isinstance(item.id_, type(self.default_ids[0])) - for row in self.db_conn.row_where(self.checked_class.table_name, - 'id', item.id_): - db_found += [self.checked_class.from_table_row(self.db_conn, - row)] + db_found += self._load_from_db(item.id_) hashes_db_found = [hash(x) for x in db_found] self.assertEqual(sorted(hashes_content), sorted(hashes_db_found)) - def check_saving_and_caching(self, **kwargs: Any) -> None: - """Test instance.save in its core without relations.""" - obj = self.checked_class(**kwargs) # pylint: disable=not-callable - # check object init itself doesn't store anything yet - self.check_storage([]) - # check saving sets core attributes properly - obj.save(self.db_conn) - for key, value in kwargs.items(): - self.assertEqual(getattr(obj, key), value) - # check saving stored properly in cache and DB - self.check_storage([obj]) - - def check_saving_of_versioned(self, attr_name: str, type_: type) -> None: - """Test owner's versioned attributes.""" - owner = self.checked_class(None) - vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1] - attr = getattr(owner, attr_name) - attr.set(vals[0]) - attr.set(vals[1]) - owner.save(self.db_conn) - retrieved = owner.__class__.by_id(self.db_conn, owner.id_) - attr = getattr(retrieved, attr_name) - self.assertEqual(sorted(attr.history.values()), vals) - - def check_by_id(self) -> None: - """Test .by_id(), including creation.""" + @_within_checked_class + def test_saving_versioned(self) -> None: + """Test storage and initialization of versioned attributes.""" + def retrieve_attr_vals() -> list[object]: + attr_vals_saved: list[object] = [] + assert hasattr(retrieved, 'id_') + for row in self.db_conn.row_where(attr.table_name, 'parent', + retrieved.id_): + attr_vals_saved += [row[2]] + return attr_vals_saved + for attr_name, type_ in self.test_versioneds.items(): + # fail saving attributes on non-saved owner + owner = self.checked_class(None, **self.default_init_kwargs) + vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1] + attr = getattr(owner, attr_name) + attr.set(vals[0]) + attr.set(vals[1]) + with self.assertRaises(NotFoundException): + attr.save(self.db_conn) + owner.save(self.db_conn) + # check stored attribute is as expected + retrieved = self._load_from_db(owner.id_)[0] + attr = getattr(retrieved, attr_name) + self.assertEqual(sorted(attr.history.values()), vals) + # check owner.save() created entries in attr table + attr_vals_saved = retrieve_attr_vals() + self.assertEqual(vals, attr_vals_saved) + # check setting new val to attr inconsequential to DB without save + attr.set(vals[0]) + attr_vals_saved = retrieve_attr_vals() + self.assertEqual(vals, attr_vals_saved) + # check save finally adds new val + attr.save(self.db_conn) + attr_vals_saved = retrieve_attr_vals() + self.assertEqual(vals + [vals[0]], attr_vals_saved) + + @_within_checked_class + def test_saving_and_caching(self) -> None: + """Test effects of .cache() and .save().""" + id1 = self.default_ids[0] + # check failure to cache without ID (if None-ID input possible) + if isinstance(id1, int): + obj0 = self.checked_class(None, **self.default_init_kwargs) + with self.assertRaises(HandledException): + obj0.cache() + # check mere object init itself doesn't even store in cache + obj1 = self.checked_class(id1, **self.default_init_kwargs) + self.assertEqual(self.checked_class.get_cache(), {}) + # check .cache() fills cache, but not DB + obj1.cache() + self.assertEqual(self.checked_class.get_cache(), {id1: obj1}) + db_found = self._load_from_db(id1) + self.assertEqual(db_found, []) + # check .save() sets ID (for int IDs), updates cache, and fills DB + # (expect ID to be set to id1, despite obj1 already having that as ID: + # it's generated by cursor.lastrowid on the DB table, and with obj1 + # not written there, obj2 should get it first!) + id_input = None if isinstance(id1, int) else id1 + obj2 = self.checked_class(id_input, **self.default_init_kwargs) + obj2.save(self.db_conn) + obj2_hash = hash(obj2) + self.assertEqual(self.checked_class.get_cache(), {id1: obj2}) + db_found += self._load_from_db(id1) + self.assertEqual([hash(o) for o in db_found], [obj2_hash]) + # check we cannot overwrite obj2 with obj1 despite its same ID, + # since it has disappeared now + with self.assertRaises(HandledException): + obj1.save(self.db_conn) + + @_within_checked_class + def test_by_id(self) -> None: + """Test .by_id().""" + id1, id2, _ = self.default_ids # check failure if not yet saved - id1, id2 = self.default_ids[0], self.default_ids[1] - obj = self.checked_class(id1) # pylint: disable=not-callable + obj1 = self.checked_class(id1, **self.default_init_kwargs) with self.assertRaises(NotFoundException): self.checked_class.by_id(self.db_conn, id1) + # check identity of cached and retrieved + obj1.cache() + self.assertEqual(obj1, self.checked_class.by_id(self.db_conn, id1)) # check identity of saved and retrieved - obj.save(self.db_conn) - self.assertEqual(obj, self.checked_class.by_id(self.db_conn, id1)) - # check create=True acts like normal instantiation (sans saving) - by_id_created = self.checked_class.by_id(self.db_conn, id2, - create=True) - # pylint: disable=not-callable - self.assertEqual(self.checked_class(id2), by_id_created) - self.check_storage([obj]) - - def check_from_table_row(self, *args: Any) -> None: - """Test .from_table_row() properly reads in class from DB""" + obj2 = self.checked_class(id2, **self.default_init_kwargs) + obj2.save(self.db_conn) + self.assertEqual(obj2, self.checked_class.by_id(self.db_conn, id2)) + + @_within_checked_class + def test_by_id_or_create(self) -> None: + """Test .by_id_or_create.""" + # check .by_id_or_create fails if wrong class + if not self.checked_class.can_create_by_id: + with self.assertRaises(HandledException): + self.checked_class.by_id_or_create(self.db_conn, None) + return + # check ID input of None creates, on saving, ID=1,2,… for int IDs + if isinstance(self.default_ids[0], int): + for n in range(2): + item = self.checked_class.by_id_or_create(self.db_conn, None) + self.assertEqual(item.id_, None) + item.save(self.db_conn) + self.assertEqual(item.id_, n+1) + # check .by_id_or_create acts like normal instantiation (sans saving) + id_ = self.default_ids[2] + item = self.checked_class.by_id_or_create(self.db_conn, id_) + self.assertEqual(item.id_, id_) + with self.assertRaises(NotFoundException): + self.checked_class.by_id(self.db_conn, item.id_) + self.assertEqual(self.checked_class(item.id_), item) + + @_within_checked_class + def test_from_table_row(self) -> None: + """Test .from_table_row() properly reads in class directly from DB.""" id_ = self.default_ids[0] - obj = self.checked_class(id_, *args) # pylint: disable=not-callable + obj = self.checked_class(id_, **self.default_init_kwargs) obj.save(self.db_conn) - assert isinstance(obj.id_, type(self.default_ids[0])) + assert isinstance(obj.id_, type(id_)) for row in self.db_conn.row_where(self.checked_class.table_name, 'id', obj.id_): + # check .from_table_row reproduces state saved, no matter if obj + # later changed (with caching even) hash_original = hash(obj) + attr_name = self._change_obj(obj) + obj.cache() + to_cmp = getattr(obj, attr_name) retrieved = self.checked_class.from_table_row(self.db_conn, row) + self.assertNotEqual(to_cmp, getattr(retrieved, attr_name)) self.assertEqual(hash_original, hash(retrieved)) + # check cache contains what .from_table_row just produced self.assertEqual({retrieved.id_: retrieved}, self.checked_class.get_cache()) + # check .from_table_row also reads versioned attributes from DB + for attr_name, type_ in self.test_versioneds.items(): + owner = self.checked_class(None) + vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1] + attr = getattr(owner, attr_name) + attr.set(vals[0]) + attr.set(vals[1]) + owner.save(self.db_conn) + for row in self.db_conn.row_where(owner.table_name, 'id', + owner.id_): + retrieved = owner.__class__.from_table_row(self.db_conn, row) + attr = getattr(retrieved, attr_name) + self.assertEqual(sorted(attr.history.values()), vals) - def check_versioned_from_table_row(self, attr_name: str, - type_: type) -> None: - """Test .from_table_row() reads versioned attributes from DB.""" - owner = self.checked_class(None) - vals: list[Any] = ['t1', 't2'] if type_ == str else [0.9, 1.1] - attr = getattr(owner, attr_name) - attr.set(vals[0]) - attr.set(vals[1]) - owner.save(self.db_conn) - for row in self.db_conn.row_where(owner.table_name, 'id', owner.id_): - retrieved = owner.__class__.from_table_row(self.db_conn, row) - attr = getattr(retrieved, attr_name) - self.assertEqual(sorted(attr.history.values()), vals) - - def check_all(self) -> tuple[Any, Any, Any]: - """Test .all().""" - # pylint: disable=not-callable - item1 = self.checked_class(self.default_ids[0]) - item2 = self.checked_class(self.default_ids[1]) - item3 = self.checked_class(self.default_ids[2]) - # check pre-save .all() returns empty list + @_within_checked_class + def test_all(self) -> None: + """Test .all() and its relation to cache and savings.""" + id_1, id_2, id_3 = self.default_ids + item1 = self.checked_class(id_1, **self.default_init_kwargs) + item2 = self.checked_class(id_2, **self.default_init_kwargs) + item3 = self.checked_class(id_3, **self.default_init_kwargs) + # check .all() returns empty list on un-cached items self.assertEqual(self.checked_class.all(self.db_conn), []) - # check that all() shows all saved, but no unsaved items - item1.save(self.db_conn) + # check that all() shows only cached/saved items + item1.cache() item3.save(self.db_conn) self.assertEqual(sorted(self.checked_class.all(self.db_conn)), sorted([item1, item3])) item2.save(self.db_conn) self.assertEqual(sorted(self.checked_class.all(self.db_conn)), sorted([item1, item2, item3])) - return item1, item2, item3 - def check_singularity(self, defaulting_field: str, - non_default_value: Any, *args: Any) -> None: + @_within_checked_class + def test_singularity(self) -> None: """Test pointers made for single object keep pointing to it.""" id1 = self.default_ids[0] - obj = self.checked_class(id1, *args) # pylint: disable=not-callable + obj = self.checked_class(id1, **self.default_init_kwargs) obj.save(self.db_conn) - setattr(obj, defaulting_field, non_default_value) + # change object, expect retrieved through .by_id to carry change + attr_name = self._change_obj(obj) + new_attr = getattr(obj, attr_name) retrieved = self.checked_class.by_id(self.db_conn, id1) - self.assertEqual(non_default_value, - getattr(retrieved, defaulting_field)) + self.assertEqual(new_attr, getattr(retrieved, attr_name)) - def check_versioned_singularity(self) -> None: + @_within_checked_class + def test_versioned_singularity_title(self) -> None: """Test singularity of VersionedAttributes on saving (with .title).""" - obj = self.checked_class(None) # pylint: disable=not-callable - obj.save(self.db_conn) - assert isinstance(obj.id_, int) - obj.title.set('named') - retrieved = self.checked_class.by_id(self.db_conn, obj.id_) - self.assertEqual(obj.title.history, retrieved.title.history) + if 'title' in self.test_versioneds: + obj = self.checked_class(None) + obj.save(self.db_conn) + assert isinstance(obj.id_, int) + # change obj, expect retrieved through .by_id to carry change + obj.title.set('named') + retrieved = self.checked_class.by_id(self.db_conn, obj.id_) + self.assertEqual(obj.title.history, retrieved.title.history) - def check_remove(self, *args: Any) -> None: + @_within_checked_class + def test_remove(self) -> None: """Test .remove() effects on DB and cache.""" id_ = self.default_ids[0] - obj = self.checked_class(id_, *args) # pylint: disable=not-callable + obj = self.checked_class(id_, **self.default_init_kwargs) + # check removal only works after saving with self.assertRaises(HandledException): obj.remove(self.db_conn) obj.save(self.db_conn) obj.remove(self.db_conn) - self.check_storage([]) + # check access to obj fails after removal + with self.assertRaises(HandledException): + print(obj.id_) + # check DB and cache now empty + self.check_identity_with_cache_and_db([]) class TestCaseWithServer(TestCaseWithDB): @@ -217,6 +319,7 @@ class TestCaseWithServer(TestCaseWithDB): self.server_thread.start() self.conn = HTTPConnection(str(self.httpd.server_address[0]), self.httpd.server_address[1]) + self.httpd.set_json_mode() def tearDown(self) -> None: self.httpd.shutdown() @@ -224,6 +327,71 @@ class TestCaseWithServer(TestCaseWithDB): self.server_thread.join() super().tearDown() + @staticmethod + def as_id_list(items: list[dict[str, object]]) -> list[int | str]: + """Return list of only 'id' fields of items.""" + id_list = [] + for item in items: + assert isinstance(item['id'], (int, str)) + id_list += [item['id']] + return id_list + + @staticmethod + def as_refs(items: list[dict[str, object]] + ) -> dict[str, dict[str, object]]: + """Return dictionary of items by their 'id' fields.""" + refs = {} + for item in items: + refs[str(item['id'])] = item + return refs + + @staticmethod + def cond_as_dict(id_: int = 1, + is_active: bool = False, + titles: None | list[str] = None, + descriptions: None | list[str] = None + ) -> dict[str, object]: + """Return JSON of Condition to expect.""" + d = {'id': id_, + 'is_active': is_active, + '_versioned': { + 'title': {}, + 'description': {}}} + titles = titles if titles else [] + descriptions = descriptions if descriptions else [] + assert isinstance(d['_versioned'], dict) + for i, title in enumerate(titles): + d['_versioned']['title'][i] = title + for i, description in enumerate(descriptions): + d['_versioned']['description'][i] = description + return d + + @staticmethod + def proc_as_dict(id_: int = 1, + title: str = 'A', + description: str = '', + effort: float = 1.0, + conditions: None | list[int] = None, + disables: None | list[int] = None, + blockers: None | list[int] = None, + enables: None | list[int] = None + ) -> dict[str, object]: + """Return JSON of Process to expect.""" + # pylint: disable=too-many-arguments + d = {'id': id_, + 'calendarize': False, + 'suppressed_steps': [], + 'explicit_steps': [], + '_versioned': { + 'title': {0: title}, + 'description': {0: description}, + 'effort': {0: effort}}, + 'conditions': conditions if conditions else [], + 'disables': disables if disables else [], + 'enables': enables if enables else [], + 'blockers': blockers if blockers else []} + return d + def check_redirect(self, target: str) -> None: """Check that self.conn answers with a 302 redirect to target.""" response = self.conn.getresponse() @@ -267,3 +435,31 @@ class TestCaseWithServer(TestCaseWithDB): self.check_post(form_data, f'/process?id={id_}', 302, f'/process?id={id_}') return form_data + + def check_json_get(self, path: str, expected: dict[str, object]) -> None: + """Compare JSON on GET path with expected. + + To simplify comparison of VersionedAttribute histories, transforms + timestamp keys of VersionedAttribute history keys into integers + counting chronologically forward from 0. + """ + def rewrite_history_keys_in(item: Any) -> Any: + if isinstance(item, dict): + if '_versioned' in item.keys(): + for k in item['_versioned']: + vals = item['_versioned'][k].values() + history = {} + for i, val in enumerate(vals): + history[i] = val + item['_versioned'][k] = history + for k in list(item.keys()): + rewrite_history_keys_in(item[k]) + elif isinstance(item, list): + item[:] = [rewrite_history_keys_in(i) for i in item] + return item + self.conn.request('GET', path) + response = self.conn.getresponse() + self.assertEqual(response.status, 200) + retrieved = json_loads(response.read().decode()) + rewrite_history_keys_in(retrieved) + self.assertEqual(expected, retrieved)
+| {% elif outer_loop.index < loop.index %} - + {% else %} -× +/ {% endif %} {{condition.title.at(day.date)|e}}{{condition.title.at(day.date)|e}} + {% elif outer_loop.index0 + loop.index > conditions_present|length %} - + | {% else %} - Ã— + \ {% endif %} {% endfor %} @@ -197,11 +213,11 @@ add:
|doables |comments
conditions{{ macros.simple_checkbox_table("condition", process.conditions, "condition", "condition_candidates") }}{{ macros.simple_checkbox_table("conditions", process.conditions, "condition", "condition_candidates") }}
blockers{{ macros.simple_checkbox_table("blocker", process.blockers, "condition", "condition_candidates") }}{{ macros.simple_checkbox_table("blockers", process.blockers, "condition", "condition_candidates") }}
enables
conditions{{ macros.simple_checkbox_table("condition", todo.conditions, "condition", "condition_candidates") }}{{ macros.simple_checkbox_table("conditions", todo.conditions, "condition", "condition_candidates") }}
blockers{{ macros.simple_checkbox_table("blocker", todo.blockers, "condition", "condition_candidates") }}{{ macros.simple_checkbox_table("blockers", todo.blockers, "condition", "condition_candidates") }}
enables