"""To identify case of unmigrated DB file."""
+class CtxReferences:
+ """Collects references for future library building."""
+ # pylint: disable=too-few-public-methods
+
+ def __init__(self, d: dict[str, list[int | str]]) -> None:
+ # NB: For tighter mypy testing, we might prefer the library argument
+ # to be of type dict[str, list[int] | list[str] instead. But my
+ # current coding knowledge only manages to make that work by turning
+ # the code much more complex, so let's leave it at
+ # that for now …
+ self.d = d
+
+ def update(self, other: CtxReferences) -> bool:
+ """Updates other with entries in self."""
+ changed = False
+ for cls_name, id_list in self.d.items():
+ if cls_name not in other.d:
+ other.d[cls_name] = []
+ for id_ in id_list:
+ if id_ not in other.d[cls_name]:
+ other.d[cls_name] += [id_]
+ return changed
+
+
class DatabaseFile:
"""Represents the sqlite3 database's file."""
# pylint: disable=too-few-public-methods
@property
def as_dict(self) -> dict[str, object]:
"""Return self as (json.dumps-compatible) dict."""
- library: dict[str, dict[str, object] | dict[int, object]] = {}
- d: dict[str, object] = {'id': self.id_, '_library': library}
+ references = CtxReferences({})
+ d: dict[str, object] = {'id': self.id_, '_references': references}
for to_save in self.to_save_simples:
- attr = getattr(self, to_save)
- if hasattr(attr, 'as_dict_into_reference'):
- d[to_save] = attr.as_dict_into_reference(library)
- else:
- d[to_save] = attr
+ d[to_save] = getattr(self, to_save)
if len(self.to_save_versioned()) > 0:
d['_versioned'] = {}
for k in self.to_save_versioned():
attr_name = r[2]
l: list[int | str] = []
for rel in getattr(self, attr_name):
- l += [rel.as_dict_into_reference(library)]
+ cls_name = rel.__class__.__name__
+ if cls_name not in references.d:
+ references.d[cls_name] = []
+ l += [rel.id_]
+ references.d[cls_name] += [rel.id_]
d[attr_name] = l
for k in self.add_to_dict:
- d[k] = [x.as_dict_into_reference(library)
+ d[k] = [x.into_reference(references)
for x in getattr(self, k)]
return d
- def as_dict_into_reference(self,
- library: dict[str, dict[str | int, object]]
- ) -> int | str:
- """Return self.id_ while writing .as_dict into library."""
- # NB: For tighter mypy testing, we might prefer the library argument
- # to be of type dict[str, dict[str, object] | dict[int, object]
- # instead. But my current coding knowledge only manage to make that
- # work by turning the code much more complex, so let's leave it at
- # that for now …
-
- def into_library(library: dict[str, dict[str | int, object]],
- cls_name: str,
- id_: str | int,
- d: dict[str, object]
- ) -> None:
- if cls_name not in library:
- library[cls_name] = {}
- if id_ in library[cls_name]:
- if library[cls_name][id_] != d:
- msg = 'Unexpected inequality of entries for ' +\
- f'_library at: {cls_name}/{id_}'
- raise HandledException(msg)
- else:
- library[cls_name][id_] = d
-
- as_dict = self.as_dict
- assert isinstance(as_dict['_library'], dict)
- for cls_name, dict_of_objs in as_dict['_library'].items():
- for id_, obj in dict_of_objs.items():
- into_library(library, cls_name, id_, obj)
- del as_dict['_library']
+ def into_reference(self, references: CtxReferences) -> int | str:
+ """Return self.id_ and write into references for class.."""
+ cls_name = self.__class__.__name__
+ if cls_name not in references.d:
+ references.d[cls_name] = []
assert self.id_ is not None
- into_library(library, self.__class__.__name__, self.id_, as_dict)
- assert isinstance(as_dict['id'], (int, str))
- return as_dict['id']
+ references.d[cls_name] += [self.id_]
+ own_refs = self.as_dict['_references']
+ assert isinstance(own_refs, CtxReferences)
+ own_refs.update(references)
+ return self.id_
@classmethod
def name_lowercase(cls) -> str:
"""Web server stuff."""
from __future__ import annotations
-from dataclasses import dataclass
from typing import Any, Callable
from base64 import b64encode, b64decode
from binascii import Error as binascii_Exception
from plomtask.days import Day
from plomtask.exceptions import (HandledException, BadFormatException,
NotFoundException)
-from plomtask.db import DatabaseConnection, DatabaseFile
+from plomtask.db import DatabaseConnection, DatabaseFile, CtxReferences
from plomtask.processes import Process, ProcessStep, ProcessStepsNode
from plomtask.conditions import Condition
-from plomtask.todos import Todo
+from plomtask.todos import Todo, TodoStepsNode
TEMPLATES_DIR = 'templates'
self.headers += [('Content-Type', 'application/json')]
@staticmethod
- def ctx_to_json(ctx: dict[str, object]) -> str:
+ def ctx_to_json(ctx: dict[str, object], conn: DatabaseConnection) -> str:
"""Render ctx into JSON string."""
- def walk_ctx(node: object) -> Any:
- if hasattr(node, 'as_dict_into_reference'):
+
+ def walk_ctx(node: object, references: CtxReferences) -> Any:
+ if hasattr(node, 'into_reference'):
if hasattr(node, 'id_') and node.id_ is not None:
- return node.as_dict_into_reference(library)
+ library_growing[0] = True
+ return node.into_reference(references)
if hasattr(node, 'as_dict'):
- return node.as_dict
+ d = node.as_dict
+ if '_references' in d:
+ own_refs = d['_references']
+ if own_refs.update(references):
+ library_growing[0] = True
+ del d['_references']
+ return d
if isinstance(node, (list, tuple)):
- return [walk_ctx(x) for x in node]
+ return [walk_ctx(x, references) for x in node]
if isinstance(node, dict):
d = {}
for k, v in node.items():
- d[k] = walk_ctx(v)
+ d[k] = walk_ctx(v, references)
return d
if isinstance(node, HandledException):
return str(node)
return node
- library: dict[str, dict[str, object] | dict[int, object]] = {}
- for k, v in ctx.items():
- ctx[k] = walk_ctx(v)
+
+ models = {}
+ for cls in [Day, Process, ProcessStep, Condition, Todo]:
+ models[cls.__name__] = cls
+ library: dict[str, dict[str | int, object]] = {}
+ references = CtxReferences({})
+ library_growing = [True]
+ while library_growing[0]:
+ library_growing[0] = False
+ for k, v in ctx.items():
+ ctx[k] = walk_ctx(v, references)
+ for cls_name, ids in references.d.items():
+ if cls_name not in library:
+ library[cls_name] = {}
+ for id_ in ids:
+ cls = models[cls_name]
+ assert hasattr(cls, 'can_create_by_id')
+ if cls.can_create_by_id:
+ assert hasattr(cls, 'by_id_or_create')
+ d = cls.by_id_or_create(conn, id_).as_dict
+ else:
+ assert hasattr(cls, 'by_id')
+ d = cls.by_id(conn, id_).as_dict
+ del d['_references']
+ library[cls_name][id_] = d
+ references.d[cls_name] = []
ctx['_library'] = library
return json_dumps(ctx)
- def render(self, ctx: dict[str, object], tmpl_name: str = '') -> str:
+ def render(self,
+ ctx: dict[str, object],
+ tmpl_name: str,
+ conn: DatabaseConnection
+ ) -> str:
"""Render ctx according to self._render_mode.."""
tmpl_name = f'{tmpl_name}.{self._render_mode}'
if 'html' == self._render_mode:
template = self._jinja.get_template(tmpl_name)
return template.render(ctx)
- return self.__class__.ctx_to_json(ctx)
+ return self.__class__.ctx_to_json(ctx, conn)
class InputsParser:
code: int = 200
) -> None:
"""Send ctx as proper HTTP response."""
- body = self.server.render(ctx, tmpl_name)
+ body = self.server.render(ctx, tmpl_name, self.conn)
self.send_response(code)
for header_tuple in self.server.headers:
self.send_header(*header_tuple)
def do_GET_todo(self, todo: Todo) -> dict[str, object]:
"""Show single Todo of ?id=."""
- @dataclass
- class TodoStepsNode:
- """Collect what's useful for Todo steps tree display."""
- id_: int
- todo: Todo | None
- process: Process | None
- children: list[TodoStepsNode] # pylint: disable=undefined-variable
- fillable: bool = False
-
def walk_process_steps(id_: int,
process_step_nodes: list[ProcessStepsNode],
steps_nodes: list[TodoStepsNode]) -> None:
'children': [c.as_dict for c in self.children]}
+class TodoStepsNode:
+ """Collect what's useful for Todo steps tree display."""
+ # pylint: disable=too-few-public-methods
+ id_: int
+ todo: Todo | None
+ process: Process | None
+ children: list[TodoStepsNode] # pylint: disable=undefined-variable
+ fillable: bool
+
+ def __init__(self,
+ id_: int,
+ todo: Todo | None,
+ process: Process | None,
+ children: list[TodoStepsNode],
+ fillable: bool = False):
+ # pylint: disable=too-many-arguments
+ self.id_ = id_
+ self.todo = todo
+ self.process = process
+ self.children = children
+ self.fillable = fillable
+
+ @property
+ def as_dict(self) -> dict[str, object]:
+ """Return self as (json.dumps-compatible) dict."""
+ return {'id': self.id_,
+ 'todo': self.todo.id_ if self.todo else None,
+ 'process': self.process.id_ if self.process else None,
+ 'children': [c.as_dict for c in self.children],
+ 'fillable': self.fillable}
+
+
class Todo(BaseModel[int], ConditionsRelations):
"""Individual actionable."""
# pylint: disable=too-many-instance-attributes
def _day_as_dict(date: str) -> dict[str, object]:
return {'id': date, 'comment': '', 'todos': []}
- @staticmethod
- def _todo_as_dict(id_: int = 1,
- process_id: int = 1,
- date: str = '2024-01-01',
- conditions: None | list[int] = None,
- disables: None | list[int] = None,
- blockers: None | list[int] = None,
- enables: None | list[int] = None
- ) -> dict[str, object]:
- """Return JSON of Todo to expect."""
- # pylint: disable=too-many-arguments
- d = {'id': id_,
- 'date': date,
- 'process_id': process_id,
- 'is_done': False,
- 'calendarize': False,
- 'comment': '',
- 'children': [],
- 'parents': [],
- 'effort': None,
- 'conditions': conditions if conditions else [],
- 'disables': disables if disables else [],
- 'blockers': blockers if blockers else [],
- 'enables': enables if enables else []}
- return d
-
@staticmethod
def _todo_node_as_dict(todo_id: int) -> dict[str, object]:
"""Return JSON of TodoNode to expect."""
@classmethod
def GET_day_dict(cls, date: str) -> dict[str, object]:
"""Return JSON of GET /day to expect."""
- # day: dict[str, object] = {'id': date, 'comment': '', 'todos': []}
day = cls._day_as_dict(date)
d: dict[str, object] = {'day': date,
'top_nodes': [],
# post Todos of either process and check their display
post_day: dict[str, object]
post_day = {'day_comment': '', 'make_type': '', 'new_todo': [1, 2]}
- todos = [self._todo_as_dict(1, 1, date),
- self._todo_as_dict(2, 2, date)]
+ todos = [self.todo_as_dict(1, 1, date), self.todo_as_dict(2, 2, date)]
expected['_library']['Todo'] = self.as_refs(todos)
expected['_library']['Day'][date]['todos'] = self.as_id_list(todos)
nodes = [self._todo_node_as_dict(1), self._todo_node_as_dict(2)]
post_day: dict[str, object]
post_day = {'day_comment': '', 'make_type': '', 'new_todo': [1, 2]}
todos = [ # id, process_id, date, conds, disables, blockers, enables
- self._todo_as_dict(1, 1, date, [1], [1], [2], [2]),
- self._todo_as_dict(2, 2, date, [2], [2], [1], [1])]
+ self.todo_as_dict(1, 1, date, [1], [1], [2], [2]),
+ self.todo_as_dict(2, 2, date, [2], [2], [1], [1])]
expected['_library']['Todo'] = self.as_refs(todos)
expected['_library']['Day'][date]['todos'] = self.as_id_list(todos)
nodes = [self._todo_node_as_dict(1), self._todo_node_as_dict(2)]
"""Test Todos module."""
+from typing import Any
from tests.utils import TestCaseSansDB, TestCaseWithDB, TestCaseWithServer
from plomtask.todos import Todo, TodoNode
from plomtask.processes import Process, ProcessStep
def setUp(self) -> None:
super().setUp()
- self._proc1_form_data = self.post_process(1)
+ self._proc1_form_data: Any = self.post_process(1)
+
+ @classmethod
+ def GET_todo_dict(cls,
+ target_id: int,
+ todos: list[dict[str, object]],
+ processes: list[dict[str, object]]
+ ) -> dict[str, object]:
+ """Return JSON of GET /todo to expect."""
+ library = {'Todo': cls.as_refs(todos),
+ 'Process': cls.as_refs(processes)}
+ return {'todo': target_id,
+ 'steps_todo_to_process': [],
+ 'adoption_candidates_for': {},
+ 'process_candidates': [p['id'] for p in processes],
+ 'todo_candidates': [],
+ 'condition_candidates': [],
+ '_library': library}
def test_basic_fail_POST_todo(self) -> None:
"""Test basic malformed/illegal POST /todo requests."""
self.check_post({'adopt': 1}, '/todo?id=1', 400)
self.check_post({'adopt': 2}, '/todo?id=1', 404)
+ def test_do_POST_todo(self) -> None:
+ """Test POST /todo."""
+ date = '2024-01-01'
+ day_post = {'day_comment': '', 'new_todo': 1, 'make_type': 'full'}
+ self.check_post(day_post, f'/day?date={date}&make_type=full')
+ # test posting naked entity at first changes nothing
+ todo_dict = self.todo_as_dict(1, process_id=1, date=date)
+ proc_dict = self.proc_as_dict(**self._proc1_form_data)
+ expected = self.GET_todo_dict(1, [todo_dict], [proc_dict])
+ self.check_json_get('/todo?id=1', expected)
+ self.check_post({}, '/todo?id=1')
+ self.check_json_get('/todo?id=1', expected)
+ # test posting doneness
+ todo_dict['is_done'] = True
+ self.check_post({'done': ''}, '/todo?id=1')
+ self.check_json_get('/todo?id=1', expected)
+ # test implicitly posting non-doneness
+ self.check_post({}, '/todo?id=1')
+ todo_dict['is_done'] = False
+ self.check_json_get('/todo?id=1', expected)
+ # post new Todo to Day and adopt it
+ self.check_post(day_post, f'/day?date={date}&make_type=full')
+ todo2_dict = self.todo_as_dict(2, process_id=1, date=date)
+ expected['todo_candidates'] = [2]
+ assert isinstance(expected['_library'], dict)
+ expected['_library']['Todo']['2'] = todo2_dict
+ expected['_library']['Todo']['2']['parents'] = [1]
+ expected['_library']['Todo']['1']['children'] = [2]
+ expected['steps_todo_to_process'] = [{
+ 'children': [],
+ 'fillable': False,
+ 'id': 1,
+ 'process': None,
+ 'todo': 2}]
+ self.check_post({'adopt': 2}, '/todo?id=1')
+ self.check_json_get('/todo?id=1', expected)
+ # # test todo1 cannot be set done with todo2 not done yet
+ self.check_post({'adopt': 2, 'done': ''}, '/todo?id=1', 400)
+ self.check_json_get('/todo?id=1', expected)
+ # # test todo1 un-adopting todo 2 by just not sending an adopt
+ self.check_post({}, '/todo?id=1')
+ expected['_library']['Todo']['2']['parents'] = []
+ expected['_library']['Todo']['1']['children'] = []
+ expected['steps_todo_to_process'] = []
+ self.check_json_get('/todo?id=1', expected)
+ # test todo2 deletion
+ self.check_post({'delete': ''}, '/todo?id=2', 302, '/')
+ del expected['_library']['Todo']['2']
+ expected['todo_candidates'] = []
+ self.check_json_get('/todo?id=1', expected)
+
+ def test_do_GET_todo(self) -> None:
+ """Test GET /todo response codes."""
+ date = '2024-01-01'
+ day_post = {'day_comment': '', 'new_todo': 1, 'make_type': 'full'}
+ self.check_post(day_post, f'/day?date={date}&make_type=full')
+ # test malformed or illegal parameter values
+ self.check_get('/todo', 404)
+ self.check_get('/todo?id=', 404)
+ self.check_get('/todo?id=foo', 400)
+ self.check_get('/todo?id=0', 404)
+ self.check_get('/todo?id=2', 404)
+ # test all existing Processes are shown as available
+ p2_post: Any = {'title': 'bar', 'description': 'baz', 'effort': 0.9}
+ self.post_process(2, p2_post)
+ todo1_dict = self.todo_as_dict(1, process_id=1, date=date)
+ proc1_dict = self.proc_as_dict(1, **self._proc1_form_data)
+ proc2_dict = self.proc_as_dict(2, **p2_post)
+ expected = self.GET_todo_dict(1, [todo1_dict], [proc1_dict,
+ proc2_dict])
+ self.check_json_get('/todo?id=1', expected)
+ # post new Todo to Day and expect visibility as candidate
+ self.check_post(day_post, f'/day?date={date}&make_type=full')
+ todo2_dict = self.todo_as_dict(2, process_id=1, date=date)
+ assert isinstance(expected['_library'], dict)
+ expected['_library']['Todo']['2'] = todo2_dict
+ expected['todo_candidates'] = [2]
+ self.check_json_get('/todo?id=1', expected)
+
def test_do_POST_day(self) -> None:
"""Test Todo posting of POST /day."""
self.post_process(2)
proc = Process.by_id(self.db_conn, 1)
proc2 = Process.by_id(self.db_conn, 2)
+ # check posting no Todos to Day makes Todo.by_date return empty list
form_data = {'day_comment': '', 'make_type': 'full'}
self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
self.assertEqual(Todo.by_date(self.db_conn, '2024-01-01'), [])
proc = Process.by_id(self.db_conn, 1)
+ # post Todo to Day and check its display
form_data['new_todo'] = str(proc.id_)
self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
todos = Todo.by_date(self.db_conn, '2024-01-01')
proc = Process.by_id(self.db_conn, 1)
self.assertEqual(todo1.process.id_, proc.id_)
self.assertEqual(todo1.is_done, False)
+ # post second Todo, check its appearance
proc2 = Process.by_id(self.db_conn, 2)
form_data['new_todo'] = str(proc2.id_)
self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
self.assertEqual(todo1.process.id_, proc2.id_)
self.assertEqual(todo1.is_done, False)
- def test_do_POST_todo(self) -> None:
- """Test POST /todo."""
- def post_and_reload(form_data: dict[str, object], status: int = 302,
- redir_url: str = '/todo?id=1') -> Todo:
- self.check_post(form_data, '/todo?id=1', status, redir_url)
- return Todo.by_date(self.db_conn, '2024-01-01')[0]
- self.check_post({'day_comment': '', 'new_todo': 1,
- 'make_type': 'full'},
- '/day?date=2024-01-01&make_type=full', 302)
- # test posting naked entity
- todo1 = post_and_reload({})
- self.assertEqual(todo1.children, [])
- self.assertEqual(todo1.parents, [])
- self.assertEqual(todo1.is_done, False)
- # test posting doneness
- todo1 = post_and_reload({'done': ''})
- self.assertEqual(todo1.is_done, True)
- # test implicitly posting non-doneness
- todo1 = post_and_reload({})
- self.assertEqual(todo1.is_done, False)
- # test posting second todo of same process
- self.check_post({'day_comment': '', 'new_todo': 1,
- 'make_type': 'full'},
- '/day?date=2024-01-01&make_type=full', 302)
- # test todo 1 adopting todo 2
- todo1 = post_and_reload({'adopt': 2})
- todo2 = Todo.by_date(self.db_conn, '2024-01-01')[1]
- self.assertEqual(todo1.children, [todo2])
- self.assertEqual(todo1.parents, [])
- self.assertEqual(todo2.children, [])
- self.assertEqual(todo2.parents, [todo1])
- # test todo1 cannot be set done with todo2 not done yet
- todo1 = post_and_reload({'done': '', 'adopt': 2}, 400)
- self.assertEqual(todo1.is_done, False)
- # test todo1 un-adopting todo 2 by just not sending an adopt
- todo1 = post_and_reload({}, 302)
- todo2 = Todo.by_date(self.db_conn, '2024-01-01')[1]
- self.assertEqual(todo1.children, [])
- self.assertEqual(todo1.parents, [])
- self.assertEqual(todo2.children, [])
- self.assertEqual(todo2.parents, [])
- # test todo1 deletion
- todo1 = post_and_reload({'delete': ''}, 302, '/')
-
def test_do_POST_day_todo_adoption(self) -> None:
"""Test Todos posted to Day view may adopt existing Todos."""
form_data = self.post_process(
self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
todo = Todo.by_date(self.db_conn, '2024-01-01')[0]
self.assertEqual(todo.is_done, True)
-
- def test_do_GET_todo(self) -> None:
- """Test GET /todo response codes."""
- form_data = {'day_comment': '', 'new_todo': 1, 'make_type': 'full'}
- self.check_post(form_data, '/day?date=2024-01-01&make_type=full', 302)
- self.check_get('/todo', 404)
- self.check_get('/todo?id=', 404)
- self.check_get('/todo?id=foo', 400)
- self.check_get('/todo?id=0', 404)
- self.check_get('/todo?id=1', 200)
- self.check_get('/todo?id=2', 404)
d['_versioned']['description'][i] = description
return d
+ @staticmethod
+ def todo_as_dict(id_: int = 1,
+ process_id: int = 1,
+ date: str = '2024-01-01',
+ conditions: None | list[int] = None,
+ disables: None | list[int] = None,
+ blockers: None | list[int] = None,
+ enables: None | list[int] = None
+ ) -> dict[str, object]:
+ """Return JSON of Todo to expect."""
+ # pylint: disable=too-many-arguments
+ d = {'id': id_,
+ 'date': date,
+ 'process_id': process_id,
+ 'is_done': False,
+ 'calendarize': False,
+ 'comment': '',
+ 'children': [],
+ 'parents': [],
+ 'effort': None,
+ 'conditions': conditions if conditions else [],
+ 'disables': disables if disables else [],
+ 'blockers': blockers if blockers else [],
+ 'enables': enables if enables else []}
+ return d
+
@staticmethod
def proc_as_dict(id_: int = 1,
title: str = 'A',
self.assertEqual(response.status, 200)
retrieved = json_loads(response.read().decode())
rewrite_history_keys_in(retrieved)
+ # import pprint
+ # pprint.pprint(expected)
+ # pprint.pprint(retrieved)
self.assertEqual(expected, retrieved)