From: Christian Heller Date: Mon, 15 Jul 2024 06:40:48 +0000 (+0200) Subject: Extend Todo tests, overhaul Ctx library building. X-Git-Url: https://plomlompom.com/repos/%7B%7B%20web_path%20%7D%7D/static/%7B%7Bprefix%7D%7D/blog?a=commitdiff_plain;h=17e619a4517238a4ddd792b4ed65b95ddd8ae634;p=taskplom Extend Todo tests, overhaul Ctx library building. --- diff --git a/plomtask/db.py b/plomtask/db.py index 6f0d13a..7bb928e 100644 --- a/plomtask/db.py +++ b/plomtask/db.py @@ -18,6 +18,30 @@ class UnmigratedDbException(HandledException): """To identify case of unmigrated DB file.""" +class CtxReferences: + """Collects references for future library building.""" + # pylint: disable=too-few-public-methods + + def __init__(self, d: dict[str, list[int | str]]) -> None: + # NB: For tighter mypy testing, we might prefer the library argument + # to be of type dict[str, list[int] | list[str] instead. But my + # current coding knowledge only manages to make that work by turning + # the code much more complex, so let's leave it at + # that for now … + self.d = d + + def update(self, other: CtxReferences) -> bool: + """Updates other with entries in self.""" + changed = False + for cls_name, id_list in self.d.items(): + if cls_name not in other.d: + other.d[cls_name] = [] + for id_ in id_list: + if id_ not in other.d[cls_name]: + other.d[cls_name] += [id_] + return changed + + class DatabaseFile: """Represents the sqlite3 database's file.""" # pylint: disable=too-few-public-methods @@ -283,14 +307,10 @@ class BaseModel(Generic[BaseModelId]): @property def as_dict(self) -> dict[str, object]: """Return self as (json.dumps-compatible) dict.""" - library: dict[str, dict[str, object] | dict[int, object]] = {} - d: dict[str, object] = {'id': self.id_, '_library': library} + references = CtxReferences({}) + d: dict[str, object] = {'id': self.id_, '_references': references} for to_save in self.to_save_simples: - attr = getattr(self, to_save) - if hasattr(attr, 'as_dict_into_reference'): - d[to_save] = attr.as_dict_into_reference(library) - else: - d[to_save] = attr + d[to_save] = getattr(self, to_save) if len(self.to_save_versioned()) > 0: d['_versioned'] = {} for k in self.to_save_versioned(): @@ -301,48 +321,28 @@ class BaseModel(Generic[BaseModelId]): attr_name = r[2] l: list[int | str] = [] for rel in getattr(self, attr_name): - l += [rel.as_dict_into_reference(library)] + cls_name = rel.__class__.__name__ + if cls_name not in references.d: + references.d[cls_name] = [] + l += [rel.id_] + references.d[cls_name] += [rel.id_] d[attr_name] = l for k in self.add_to_dict: - d[k] = [x.as_dict_into_reference(library) + d[k] = [x.into_reference(references) for x in getattr(self, k)] return d - def as_dict_into_reference(self, - library: dict[str, dict[str | int, object]] - ) -> int | str: - """Return self.id_ while writing .as_dict into library.""" - # NB: For tighter mypy testing, we might prefer the library argument - # to be of type dict[str, dict[str, object] | dict[int, object] - # instead. But my current coding knowledge only manage to make that - # work by turning the code much more complex, so let's leave it at - # that for now … - - def into_library(library: dict[str, dict[str | int, object]], - cls_name: str, - id_: str | int, - d: dict[str, object] - ) -> None: - if cls_name not in library: - library[cls_name] = {} - if id_ in library[cls_name]: - if library[cls_name][id_] != d: - msg = 'Unexpected inequality of entries for ' +\ - f'_library at: {cls_name}/{id_}' - raise HandledException(msg) - else: - library[cls_name][id_] = d - - as_dict = self.as_dict - assert isinstance(as_dict['_library'], dict) - for cls_name, dict_of_objs in as_dict['_library'].items(): - for id_, obj in dict_of_objs.items(): - into_library(library, cls_name, id_, obj) - del as_dict['_library'] + def into_reference(self, references: CtxReferences) -> int | str: + """Return self.id_ and write into references for class..""" + cls_name = self.__class__.__name__ + if cls_name not in references.d: + references.d[cls_name] = [] assert self.id_ is not None - into_library(library, self.__class__.__name__, self.id_, as_dict) - assert isinstance(as_dict['id'], (int, str)) - return as_dict['id'] + references.d[cls_name] += [self.id_] + own_refs = self.as_dict['_references'] + assert isinstance(own_refs, CtxReferences) + own_refs.update(references) + return self.id_ @classmethod def name_lowercase(cls) -> str: diff --git a/plomtask/http.py b/plomtask/http.py index fbe4856..61bea84 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -1,6 +1,5 @@ """Web server stuff.""" from __future__ import annotations -from dataclasses import dataclass from typing import Any, Callable from base64 import b64encode, b64decode from binascii import Error as binascii_Exception @@ -14,10 +13,10 @@ from plomtask.dating import date_in_n_days from plomtask.days import Day from plomtask.exceptions import (HandledException, BadFormatException, NotFoundException) -from plomtask.db import DatabaseConnection, DatabaseFile +from plomtask.db import DatabaseConnection, DatabaseFile, CtxReferences from plomtask.processes import Process, ProcessStep, ProcessStepsNode from plomtask.conditions import Condition -from plomtask.todos import Todo +from plomtask.todos import Todo, TodoStepsNode TEMPLATES_DIR = 'templates' @@ -39,37 +38,72 @@ class TaskServer(HTTPServer): self.headers += [('Content-Type', 'application/json')] @staticmethod - def ctx_to_json(ctx: dict[str, object]) -> str: + def ctx_to_json(ctx: dict[str, object], conn: DatabaseConnection) -> str: """Render ctx into JSON string.""" - def walk_ctx(node: object) -> Any: - if hasattr(node, 'as_dict_into_reference'): + + def walk_ctx(node: object, references: CtxReferences) -> Any: + if hasattr(node, 'into_reference'): if hasattr(node, 'id_') and node.id_ is not None: - return node.as_dict_into_reference(library) + library_growing[0] = True + return node.into_reference(references) if hasattr(node, 'as_dict'): - return node.as_dict + d = node.as_dict + if '_references' in d: + own_refs = d['_references'] + if own_refs.update(references): + library_growing[0] = True + del d['_references'] + return d if isinstance(node, (list, tuple)): - return [walk_ctx(x) for x in node] + return [walk_ctx(x, references) for x in node] if isinstance(node, dict): d = {} for k, v in node.items(): - d[k] = walk_ctx(v) + d[k] = walk_ctx(v, references) return d if isinstance(node, HandledException): return str(node) return node - library: dict[str, dict[str, object] | dict[int, object]] = {} - for k, v in ctx.items(): - ctx[k] = walk_ctx(v) + + models = {} + for cls in [Day, Process, ProcessStep, Condition, Todo]: + models[cls.__name__] = cls + library: dict[str, dict[str | int, object]] = {} + references = CtxReferences({}) + library_growing = [True] + while library_growing[0]: + library_growing[0] = False + for k, v in ctx.items(): + ctx[k] = walk_ctx(v, references) + for cls_name, ids in references.d.items(): + if cls_name not in library: + library[cls_name] = {} + for id_ in ids: + cls = models[cls_name] + assert hasattr(cls, 'can_create_by_id') + if cls.can_create_by_id: + assert hasattr(cls, 'by_id_or_create') + d = cls.by_id_or_create(conn, id_).as_dict + else: + assert hasattr(cls, 'by_id') + d = cls.by_id(conn, id_).as_dict + del d['_references'] + library[cls_name][id_] = d + references.d[cls_name] = [] ctx['_library'] = library return json_dumps(ctx) - def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str: + def render(self, + ctx: dict[str, object], + tmpl_name: str, + conn: DatabaseConnection + ) -> str: """Render ctx according to self._render_mode..""" tmpl_name = f'{tmpl_name}.{self._render_mode}' if 'html' == self._render_mode: template = self._jinja.get_template(tmpl_name) return template.render(ctx) - return self.__class__.ctx_to_json(ctx) + return self.__class__.ctx_to_json(ctx, conn) class InputsParser: @@ -179,7 +213,7 @@ class TaskHandler(BaseHTTPRequestHandler): code: int = 200 ) -> None: """Send ctx as proper HTTP response.""" - body = self.server.render(ctx, tmpl_name) + body = self.server.render(ctx, tmpl_name, self.conn) self.send_response(code) for header_tuple in self.server.headers: self.send_header(*header_tuple) @@ -359,15 +393,6 @@ class TaskHandler(BaseHTTPRequestHandler): def do_GET_todo(self, todo: Todo) -> dict[str, object]: """Show single Todo of ?id=.""" - @dataclass - class TodoStepsNode: - """Collect what's useful for Todo steps tree display.""" - id_: int - todo: Todo | None - process: Process | None - children: list[TodoStepsNode] # pylint: disable=undefined-variable - fillable: bool = False - def walk_process_steps(id_: int, process_step_nodes: list[ProcessStepsNode], steps_nodes: list[TodoStepsNode]) -> None: diff --git a/plomtask/todos.py b/plomtask/todos.py index 1f55ae7..06d57ab 100644 --- a/plomtask/todos.py +++ b/plomtask/todos.py @@ -34,6 +34,38 @@ class TodoNode: 'children': [c.as_dict for c in self.children]} +class TodoStepsNode: + """Collect what's useful for Todo steps tree display.""" + # pylint: disable=too-few-public-methods + id_: int + todo: Todo | None + process: Process | None + children: list[TodoStepsNode] # pylint: disable=undefined-variable + fillable: bool + + def __init__(self, + id_: int, + todo: Todo | None, + process: Process | None, + children: list[TodoStepsNode], + fillable: bool = False): + # pylint: disable=too-many-arguments + self.id_ = id_ + self.todo = todo + self.process = process + self.children = children + self.fillable = fillable + + @property + def as_dict(self) -> dict[str, object]: + """Return self as (json.dumps-compatible) dict.""" + return {'id': self.id_, + 'todo': self.todo.id_ if self.todo else None, + 'process': self.process.id_ if self.process else None, + 'children': [c.as_dict for c in self.children], + 'fillable': self.fillable} + + class Todo(BaseModel[int], ConditionsRelations): """Individual actionable.""" # pylint: disable=too-many-instance-attributes diff --git a/tests/days.py b/tests/days.py index 3297032..c36a9ef 100644 --- a/tests/days.py +++ b/tests/days.py @@ -112,32 +112,6 @@ class TestsWithServer(TestCaseWithServer): def _day_as_dict(date: str) -> dict[str, object]: return {'id': date, 'comment': '', 'todos': []} - @staticmethod - def _todo_as_dict(id_: int = 1, - process_id: int = 1, - date: str = '2024-01-01', - conditions: None | list[int] = None, - disables: None | list[int] = None, - blockers: None | list[int] = None, - enables: None | list[int] = None - ) -> dict[str, object]: - """Return JSON of Todo to expect.""" - # pylint: disable=too-many-arguments - d = {'id': id_, - 'date': date, - 'process_id': process_id, - 'is_done': False, - 'calendarize': False, - 'comment': '', - 'children': [], - 'parents': [], - 'effort': None, - 'conditions': conditions if conditions else [], - 'disables': disables if disables else [], - 'blockers': blockers if blockers else [], - 'enables': enables if enables else []} - return d - @staticmethod def _todo_node_as_dict(todo_id: int) -> dict[str, object]: """Return JSON of TodoNode to expect.""" @@ -178,7 +152,6 @@ class TestsWithServer(TestCaseWithServer): @classmethod def GET_day_dict(cls, date: str) -> dict[str, object]: """Return JSON of GET /day to expect.""" - # day: dict[str, object] = {'id': date, 'comment': '', 'todos': []} day = cls._day_as_dict(date) d: dict[str, object] = {'day': date, 'top_nodes': [], @@ -324,8 +297,7 @@ class TestsWithServer(TestCaseWithServer): # post Todos of either process and check their display post_day: dict[str, object] post_day = {'day_comment': '', 'make_type': '', 'new_todo': [1, 2]} - todos = [self._todo_as_dict(1, 1, date), - self._todo_as_dict(2, 2, date)] + todos = [self.todo_as_dict(1, 1, date), self.todo_as_dict(2, 2, date)] expected['_library']['Todo'] = self.as_refs(todos) expected['_library']['Day'][date]['todos'] = self.as_id_list(todos) nodes = [self._todo_node_as_dict(1), self._todo_node_as_dict(2)] @@ -376,8 +348,8 @@ class TestsWithServer(TestCaseWithServer): post_day: dict[str, object] post_day = {'day_comment': '', 'make_type': '', 'new_todo': [1, 2]} todos = [ # id, process_id, date, conds, disables, blockers, enables - self._todo_as_dict(1, 1, date, [1], [1], [2], [2]), - self._todo_as_dict(2, 2, date, [2], [2], [1], [1])] + self.todo_as_dict(1, 1, date, [1], [1], [2], [2]), + self.todo_as_dict(2, 2, date, [2], [2], [1], [1])] expected['_library']['Todo'] = self.as_refs(todos) expected['_library']['Day'][date]['todos'] = self.as_id_list(todos) nodes = [self._todo_node_as_dict(1), self._todo_node_as_dict(2)] diff --git a/tests/todos.py b/tests/todos.py index 66c4ff3..27bf1a4 100644 --- a/tests/todos.py +++ b/tests/todos.py @@ -1,4 +1,5 @@ """Test Todos module.""" +from typing import Any from tests.utils import TestCaseSansDB, TestCaseWithDB, TestCaseWithServer from plomtask.todos import Todo, TodoNode from plomtask.processes import Process, ProcessStep @@ -238,7 +239,24 @@ class TestsWithServer(TestCaseWithServer): def setUp(self) -> None: super().setUp() - self._proc1_form_data = self.post_process(1) + self._proc1_form_data: Any = self.post_process(1) + + @classmethod + def GET_todo_dict(cls, + target_id: int, + todos: list[dict[str, object]], + processes: list[dict[str, object]] + ) -> dict[str, object]: + """Return JSON of GET /todo to expect.""" + library = {'Todo': cls.as_refs(todos), + 'Process': cls.as_refs(processes)} + return {'todo': target_id, + 'steps_todo_to_process': [], + 'adoption_candidates_for': {}, + 'process_candidates': [p['id'] for p in processes], + 'todo_candidates': [], + 'condition_candidates': [], + '_library': library} def test_basic_fail_POST_todo(self) -> None: """Test basic malformed/illegal POST /todo requests.""" @@ -261,15 +279,96 @@ class TestsWithServer(TestCaseWithServer): self.check_post({'adopt': 1}, '/todo?id=1', 400) self.check_post({'adopt': 2}, '/todo?id=1', 404) + def test_do_POST_todo(self) -> None: + """Test POST /todo.""" + date = '2024-01-01' + day_post = {'day_comment': '', 'new_todo': 1, 'make_type': 'full'} + self.check_post(day_post, f'/day?date={date}&make_type=full') + # test posting naked entity at first changes nothing + todo_dict = self.todo_as_dict(1, process_id=1, date=date) + proc_dict = self.proc_as_dict(**self._proc1_form_data) + expected = self.GET_todo_dict(1, [todo_dict], [proc_dict]) + self.check_json_get('/todo?id=1', expected) + self.check_post({}, '/todo?id=1') + self.check_json_get('/todo?id=1', expected) + # test posting doneness + todo_dict['is_done'] = True + self.check_post({'done': ''}, '/todo?id=1') + self.check_json_get('/todo?id=1', expected) + # test implicitly posting non-doneness + self.check_post({}, '/todo?id=1') + todo_dict['is_done'] = False + self.check_json_get('/todo?id=1', expected) + # post new Todo to Day and adopt it + self.check_post(day_post, f'/day?date={date}&make_type=full') + todo2_dict = self.todo_as_dict(2, process_id=1, date=date) + expected['todo_candidates'] = [2] + assert isinstance(expected['_library'], dict) + expected['_library']['Todo']['2'] = todo2_dict + expected['_library']['Todo']['2']['parents'] = [1] + expected['_library']['Todo']['1']['children'] = [2] + expected['steps_todo_to_process'] = [{ + 'children': [], + 'fillable': False, + 'id': 1, + 'process': None, + 'todo': 2}] + self.check_post({'adopt': 2}, '/todo?id=1') + self.check_json_get('/todo?id=1', expected) + # # test todo1 cannot be set done with todo2 not done yet + self.check_post({'adopt': 2, 'done': ''}, '/todo?id=1', 400) + self.check_json_get('/todo?id=1', expected) + # # test todo1 un-adopting todo 2 by just not sending an adopt + self.check_post({}, '/todo?id=1') + expected['_library']['Todo']['2']['parents'] = [] + expected['_library']['Todo']['1']['children'] = [] + expected['steps_todo_to_process'] = [] + self.check_json_get('/todo?id=1', expected) + # test todo2 deletion + self.check_post({'delete': ''}, '/todo?id=2', 302, '/') + del expected['_library']['Todo']['2'] + expected['todo_candidates'] = [] + self.check_json_get('/todo?id=1', expected) + + def test_do_GET_todo(self) -> None: + """Test GET /todo response codes.""" + date = '2024-01-01' + day_post = {'day_comment': '', 'new_todo': 1, 'make_type': 'full'} + self.check_post(day_post, f'/day?date={date}&make_type=full') + # test malformed or illegal parameter values + self.check_get('/todo', 404) + self.check_get('/todo?id=', 404) + self.check_get('/todo?id=foo', 400) + self.check_get('/todo?id=0', 404) + self.check_get('/todo?id=2', 404) + # test all existing Processes are shown as available + p2_post: Any = {'title': 'bar', 'description': 'baz', 'effort': 0.9} + self.post_process(2, p2_post) + todo1_dict = self.todo_as_dict(1, process_id=1, date=date) + proc1_dict = self.proc_as_dict(1, **self._proc1_form_data) + proc2_dict = self.proc_as_dict(2, **p2_post) + expected = self.GET_todo_dict(1, [todo1_dict], [proc1_dict, + proc2_dict]) + self.check_json_get('/todo?id=1', expected) + # post new Todo to Day and expect visibility as candidate + self.check_post(day_post, f'/day?date={date}&make_type=full') + todo2_dict = self.todo_as_dict(2, process_id=1, date=date) + assert isinstance(expected['_library'], dict) + expected['_library']['Todo']['2'] = todo2_dict + expected['todo_candidates'] = [2] + self.check_json_get('/todo?id=1', expected) + def test_do_POST_day(self) -> None: """Test Todo posting of POST /day.""" self.post_process(2) proc = Process.by_id(self.db_conn, 1) proc2 = Process.by_id(self.db_conn, 2) + # check posting no Todos to Day makes Todo.by_date return empty list form_data = {'day_comment': '', 'make_type': 'full'} self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) self.assertEqual(Todo.by_date(self.db_conn, '2024-01-01'), []) proc = Process.by_id(self.db_conn, 1) + # post Todo to Day and check its display form_data['new_todo'] = str(proc.id_) self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) todos = Todo.by_date(self.db_conn, '2024-01-01') @@ -279,6 +378,7 @@ class TestsWithServer(TestCaseWithServer): proc = Process.by_id(self.db_conn, 1) self.assertEqual(todo1.process.id_, proc.id_) self.assertEqual(todo1.is_done, False) + # post second Todo, check its appearance proc2 = Process.by_id(self.db_conn, 2) form_data['new_todo'] = str(proc2.id_) self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) @@ -291,50 +391,6 @@ class TestsWithServer(TestCaseWithServer): self.assertEqual(todo1.process.id_, proc2.id_) self.assertEqual(todo1.is_done, False) - def test_do_POST_todo(self) -> None: - """Test POST /todo.""" - def post_and_reload(form_data: dict[str, object], status: int = 302, - redir_url: str = '/todo?id=1') -> Todo: - self.check_post(form_data, '/todo?id=1', status, redir_url) - return Todo.by_date(self.db_conn, '2024-01-01')[0] - self.check_post({'day_comment': '', 'new_todo': 1, - 'make_type': 'full'}, - '/day?date=2024-01-01&make_type=full', 302) - # test posting naked entity - todo1 = post_and_reload({}) - self.assertEqual(todo1.children, []) - self.assertEqual(todo1.parents, []) - self.assertEqual(todo1.is_done, False) - # test posting doneness - todo1 = post_and_reload({'done': ''}) - self.assertEqual(todo1.is_done, True) - # test implicitly posting non-doneness - todo1 = post_and_reload({}) - self.assertEqual(todo1.is_done, False) - # test posting second todo of same process - self.check_post({'day_comment': '', 'new_todo': 1, - 'make_type': 'full'}, - '/day?date=2024-01-01&make_type=full', 302) - # test todo 1 adopting todo 2 - todo1 = post_and_reload({'adopt': 2}) - todo2 = Todo.by_date(self.db_conn, '2024-01-01')[1] - self.assertEqual(todo1.children, [todo2]) - self.assertEqual(todo1.parents, []) - self.assertEqual(todo2.children, []) - self.assertEqual(todo2.parents, [todo1]) - # test todo1 cannot be set done with todo2 not done yet - todo1 = post_and_reload({'done': '', 'adopt': 2}, 400) - self.assertEqual(todo1.is_done, False) - # test todo1 un-adopting todo 2 by just not sending an adopt - todo1 = post_and_reload({}, 302) - todo2 = Todo.by_date(self.db_conn, '2024-01-01')[1] - self.assertEqual(todo1.children, []) - self.assertEqual(todo1.parents, []) - self.assertEqual(todo2.children, []) - self.assertEqual(todo2.parents, []) - # test todo1 deletion - todo1 = post_and_reload({'delete': ''}, 302, '/') - def test_do_POST_day_todo_adoption(self) -> None: """Test Todos posted to Day view may adopt existing Todos.""" form_data = self.post_process( @@ -427,14 +483,3 @@ class TestsWithServer(TestCaseWithServer): self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) todo = Todo.by_date(self.db_conn, '2024-01-01')[0] self.assertEqual(todo.is_done, True) - - def test_do_GET_todo(self) -> None: - """Test GET /todo response codes.""" - form_data = {'day_comment': '', 'new_todo': 1, 'make_type': 'full'} - self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302) - self.check_get('/todo', 404) - self.check_get('/todo?id=', 404) - self.check_get('/todo?id=foo', 400) - self.check_get('/todo?id=0', 404) - self.check_get('/todo?id=1', 200) - self.check_get('/todo?id=2', 404) diff --git a/tests/utils.py b/tests/utils.py index b969424..10d6591 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -575,6 +575,32 @@ class TestCaseWithServer(TestCaseWithDB): d['_versioned']['description'][i] = description return d + @staticmethod + def todo_as_dict(id_: int = 1, + process_id: int = 1, + date: str = '2024-01-01', + conditions: None | list[int] = None, + disables: None | list[int] = None, + blockers: None | list[int] = None, + enables: None | list[int] = None + ) -> dict[str, object]: + """Return JSON of Todo to expect.""" + # pylint: disable=too-many-arguments + d = {'id': id_, + 'date': date, + 'process_id': process_id, + 'is_done': False, + 'calendarize': False, + 'comment': '', + 'children': [], + 'parents': [], + 'effort': None, + 'conditions': conditions if conditions else [], + 'disables': disables if disables else [], + 'blockers': blockers if blockers else [], + 'enables': enables if enables else []} + return d + @staticmethod def proc_as_dict(id_: int = 1, title: str = 'A', @@ -672,4 +698,7 @@ class TestCaseWithServer(TestCaseWithDB): self.assertEqual(response.status, 200) retrieved = json_loads(response.read().decode()) rewrite_history_keys_in(retrieved) + # import pprint + # pprint.pprint(expected) + # pprint.pprint(retrieved) self.assertEqual(expected, retrieved)