From: Christian Heller Date: Tue, 16 Jul 2024 21:56:58 +0000 (+0200) Subject: Overhaul/simplify ctx to JSON dictification. X-Git-Url: https://plomlompom.com/repos/%7B%7Bprefix%7D%7D/process?a=commitdiff_plain;h=aecabbb18e92cf89d6da06249a42907116b1f48f;p=plomtask Overhaul/simplify ctx to JSON dictification. --- diff --git a/plomtask/db.py b/plomtask/db.py index 7bb928e..27dc117 100644 --- a/plomtask/db.py +++ b/plomtask/db.py @@ -18,30 +18,6 @@ class UnmigratedDbException(HandledException): """To identify case of unmigrated DB file.""" -class CtxReferences: - """Collects references for future library building.""" - # pylint: disable=too-few-public-methods - - def __init__(self, d: dict[str, list[int | str]]) -> None: - # NB: For tighter mypy testing, we might prefer the library argument - # to be of type dict[str, list[int] | list[str] instead. But my - # current coding knowledge only manages to make that work by turning - # the code much more complex, so let's leave it at - # that for now … - self.d = d - - def update(self, other: CtxReferences) -> bool: - """Updates other with entries in self.""" - changed = False - for cls_name, id_list in self.d.items(): - if cls_name not in other.d: - other.d[cls_name] = [] - for id_ in id_list: - if id_ not in other.d[cls_name]: - other.d[cls_name] += [id_] - return changed - - class DatabaseFile: """Represents the sqlite3 database's file.""" # pylint: disable=too-few-public-methods @@ -305,10 +281,10 @@ class BaseModel(Generic[BaseModelId]): return list(cls.versioned_defaults.keys()) @property - def as_dict(self) -> dict[str, object]: - """Return self as (json.dumps-compatible) dict.""" - references = CtxReferences({}) - d: dict[str, object] = {'id': self.id_, '_references': references} + def as_dict_and_refs(self) -> tuple[dict[str, object], list[Any]]: + """Return self as json.dumps-ready dict, list of referenced objects.""" + d: dict[str, object] = {'id': self.id_} + refs: list[Any] = [] for to_save in self.to_save_simples: d[to_save] = getattr(self, to_save) if len(self.to_save_versioned()) > 0: @@ -317,32 +293,16 @@ class BaseModel(Generic[BaseModelId]): attr = getattr(self, k) assert isinstance(d['_versioned'], dict) d['_versioned'][k] = attr.history - for r in self.to_save_relations: - attr_name = r[2] - l: list[int | str] = [] - for rel in getattr(self, attr_name): - cls_name = rel.__class__.__name__ - if cls_name not in references.d: - references.d[cls_name] = [] - l += [rel.id_] - references.d[cls_name] += [rel.id_] - d[attr_name] = l - for k in self.add_to_dict: - d[k] = [x.into_reference(references) - for x in getattr(self, k)] - return d - - def into_reference(self, references: CtxReferences) -> int | str: - """Return self.id_ and write into references for class..""" - cls_name = self.__class__.__name__ - if cls_name not in references.d: - references.d[cls_name] = [] - assert self.id_ is not None - references.d[cls_name] += [self.id_] - own_refs = self.as_dict['_references'] - assert isinstance(own_refs, CtxReferences) - own_refs.update(references) - return self.id_ + rels_to_collect = [rel[2] for rel in self.to_save_relations] + rels_to_collect += self.add_to_dict + for attr_name in rels_to_collect: + rel_list = [] + for item in getattr(self, attr_name): + rel_list += [item.id_] + if item not in refs: + refs += [item] + d[attr_name] = rel_list + return d, refs @classmethod def name_lowercase(cls) -> str: diff --git a/plomtask/http.py b/plomtask/http.py index 8fa6f93..3164662 100644 --- a/plomtask/http.py +++ b/plomtask/http.py @@ -13,10 +13,10 @@ from plomtask.dating import date_in_n_days from plomtask.days import Day from plomtask.exceptions import (HandledException, BadFormatException, NotFoundException) -from plomtask.db import DatabaseConnection, DatabaseFile, CtxReferences +from plomtask.db import DatabaseConnection, DatabaseFile, BaseModel from plomtask.processes import Process, ProcessStep, ProcessStepsNode from plomtask.conditions import Condition -from plomtask.todos import Todo, TodoStepsNode +from plomtask.todos import Todo, TodoStepsNode, DictableNode TEMPLATES_DIR = 'templates' @@ -156,55 +156,43 @@ class TaskHandler(BaseHTTPRequestHandler): def _ctx_to_json(self, ctx: dict[str, object]) -> str: """Render ctx into JSON string.""" - def walk_ctx(node: object, references: CtxReferences) -> Any: - if hasattr(node, 'into_reference'): - if hasattr(node, 'id_') and node.id_ is not None: - library_growing[0] = True - return node.into_reference(references) - if hasattr(node, 'as_dict'): - d = node.as_dict - if '_references' in d: - own_refs = d['_references'] - if own_refs.update(references): - library_growing[0] = True - del d['_references'] + def flatten(node: object, library: dict[str, dict[str | int, object]] + ) -> Any: + + def update_library_with(item: Any, + library: dict[str, dict[str | int, object]] + ) -> None: + cls_name = item.__class__.__name__ + if cls_name not in library: + library[cls_name] = {} + if item.id_ not in library[cls_name]: + d, refs = item.as_dict_and_refs + library[cls_name][item.id_] = d + for ref in refs: + update_library_with(ref, library) + + if isinstance(node, BaseModel): + update_library_with(node, library) + return node.id_ + if isinstance(node, DictableNode): + d, refs = node.as_dict_and_refs + for ref in refs: + update_library_with(ref, library) return d if isinstance(node, (list, tuple)): - return [walk_ctx(x, references) for x in node] + return [flatten(item, library) for item in node] if isinstance(node, dict): d = {} for k, v in node.items(): - d[k] = walk_ctx(v, references) + d[k] = flatten(v, library) return d if isinstance(node, HandledException): return str(node) return node - models = {} - for cls in [Day, Process, ProcessStep, Condition, Todo]: - models[cls.__name__] = cls library: dict[str, dict[str | int, object]] = {} - references = CtxReferences({}) - library_growing = [True] - while library_growing[0]: - library_growing[0] = False - for k, v in ctx.items(): - ctx[k] = walk_ctx(v, references) - for cls_name, ids in references.d.items(): - if cls_name not in library: - library[cls_name] = {} - for id_ in ids: - cls = models[cls_name] - assert hasattr(cls, 'can_create_by_id') - if cls.can_create_by_id: - assert hasattr(cls, 'by_id_or_create') - d = cls.by_id_or_create(self.conn, id_).as_dict - else: - assert hasattr(cls, 'by_id') - d = cls.by_id(self.conn, id_).as_dict - del d['_references'] - library[cls_name][id_] = d - references.d[cls_name] = [] + for k, v in ctx.items(): + ctx[k] = flatten(v, library) ctx['_library'] = library return json_dumps(ctx) @@ -381,17 +369,19 @@ class TaskHandler(BaseHTTPRequestHandler): def do_GET_todo(self, todo: Todo) -> dict[str, object]: """Show single Todo of ?id=.""" - def walk_process_steps(id_: int, + def walk_process_steps(node_id: int, process_step_nodes: list[ProcessStepsNode], steps_nodes: list[TodoStepsNode]) -> None: for process_step_node in process_step_nodes: - id_ += 1 - node = TodoStepsNode(id_, None, process_step_node.process, []) + node_id += 1 + node = TodoStepsNode(node_id, None, process_step_node.process, + []) steps_nodes += [node] - walk_process_steps(id_, list(process_step_node.steps.values()), + walk_process_steps(node_id, + list(process_step_node.steps.values()), node.children) - def walk_todo_steps(id_: int, todos: list[Todo], + def walk_todo_steps(node_id: int, todos: list[Todo], steps_nodes: list[TodoStepsNode]) -> None: for todo in todos: matched = False @@ -402,12 +392,12 @@ class TaskHandler(BaseHTTPRequestHandler): matched = True for child in match.children: child.fillable = True - walk_todo_steps(id_, todo.children, match.children) + walk_todo_steps(node_id, todo.children, match.children) if not matched: - id_ += 1 - node = TodoStepsNode(id_, todo, None, []) + node_id += 1 + node = TodoStepsNode(node_id, todo, None, []) steps_nodes += [node] - walk_todo_steps(id_, todo.children, node.children) + walk_todo_steps(node_id, todo.children, node.children) def collect_adoptables_keys(steps_nodes: list[TodoStepsNode] ) -> set[int]: diff --git a/plomtask/todos.py b/plomtask/todos.py index 06d57ab..4138abc 100644 --- a/plomtask/todos.py +++ b/plomtask/todos.py @@ -11,59 +11,55 @@ from plomtask.exceptions import (NotFoundException, BadFormatException, from plomtask.dating import valid_date -class TodoNode: +class DictableNode: + """Template for TodoNode, TodoStepsNode providing .as_dict_and_refs.""" + # pylint: disable=too-few-public-methods + _to_dict: list[str] = [] + + def __init__(self, *args: Any) -> None: + for i, arg in enumerate(args): + setattr(self, self._to_dict[i], arg) + + @property + def as_dict_and_refs(self) -> tuple[dict[str, object], list[Any]]: + """Return self as json.dumps-ready dict, list of referenced objects.""" + d = {} + refs = [] + for name in self._to_dict: + attr = getattr(self, name) + if hasattr(attr, 'id_'): + d[name] = attr.id_ + continue + if isinstance(attr, list): + d[name] = [] + for item in attr: + item_d, item_refs = item.as_dict_and_refs + d[name] += [item_d] + for item_ref in [r for r in item_refs if r not in refs]: + refs += [item_ref] + continue + d[name] = attr + return d, refs + + +class TodoNode(DictableNode): """Collects what's useful to know for Todo/Condition tree display.""" # pylint: disable=too-few-public-methods todo: Todo seen: bool children: list[TodoNode] - - def __init__(self, - todo: Todo, - seen: bool, - children: list[TodoNode]) -> None: - self.todo = todo - self.seen = seen - self.children = children - - @property - def as_dict(self) -> dict[str, object]: - """Return self as (json.dumps-coompatible) dict.""" - return {'todo': self.todo.id_, - 'seen': self.seen, - 'children': [c.as_dict for c in self.children]} + _to_dict = ['todo', 'seen', 'children'] -class TodoStepsNode: +class TodoStepsNode(DictableNode): """Collect what's useful for Todo steps tree display.""" # pylint: disable=too-few-public-methods - id_: int + node_id: int todo: Todo | None process: Process | None children: list[TodoStepsNode] # pylint: disable=undefined-variable - fillable: bool - - def __init__(self, - id_: int, - todo: Todo | None, - process: Process | None, - children: list[TodoStepsNode], - fillable: bool = False): - # pylint: disable=too-many-arguments - self.id_ = id_ - self.todo = todo - self.process = process - self.children = children - self.fillable = fillable - - @property - def as_dict(self) -> dict[str, object]: - """Return self as (json.dumps-compatible) dict.""" - return {'id': self.id_, - 'todo': self.todo.id_ if self.todo else None, - 'process': self.process.id_ if self.process else None, - 'children': [c.as_dict for c in self.children], - 'fillable': self.fillable} + fillable: bool = False + _to_dict = ['node_id', 'todo', 'process', 'children', 'fillable'] class Todo(BaseModel[int], ConditionsRelations): diff --git a/templates/todo.html b/templates/todo.html index fea931a..45e247f 100644 --- a/templates/todo.html +++ b/templates/todo.html @@ -23,7 +23,7 @@ select{ font-size: 0.5em; margin: 0; padding: 0; } {% else %} {{item.process.title.newest|e}} {% if indent == 0 %} -· fill: diff --git a/tests/todos.py b/tests/todos.py index 27bf1a4..01297d4 100644 --- a/tests/todos.py +++ b/tests/todos.py @@ -121,16 +121,24 @@ class TestsWithDB(TestCaseWithDB, TestCaseSansDB): def test_Todo_step_tree(self) -> None: """Test self-configuration of TodoStepsNode tree for Day view.""" + + def todo_node_as_dict(node: TodoNode) -> dict[str, object]: + return {'todo': node.todo.id_, 'seen': node.seen, + 'children': [todo_node_as_dict(c) for c in node.children]} + todo_1 = Todo(None, self.proc, False, self.date1) todo_1.save(self.db_conn) assert isinstance(todo_1.id_, int) # test minimum node_0 = TodoNode(todo_1, False, []) - self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict) + cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree(set())) + cmp_1_dict = todo_node_as_dict(node_0) + self.assertEqual(cmp_0_dict, cmp_1_dict) # test non_emtpy seen_todo does something node_0.seen = True - self.assertEqual(todo_1.get_step_tree({todo_1.id_}).as_dict, - node_0.as_dict) + cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree({todo_1.id_})) + cmp_1_dict = todo_node_as_dict(node_0) + self.assertEqual(cmp_0_dict, cmp_1_dict) # test child shows up todo_2 = Todo(None, self.proc, False, self.date1) todo_2.save(self.db_conn) @@ -139,7 +147,9 @@ class TestsWithDB(TestCaseWithDB, TestCaseSansDB): node_2 = TodoNode(todo_2, False, []) node_0.children = [node_2] node_0.seen = False - self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict) + cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree(set())) + cmp_1_dict = todo_node_as_dict(node_0) + self.assertEqual(cmp_0_dict, cmp_1_dict) # test child shows up with child todo_3 = Todo(None, self.proc, False, self.date1) todo_3.save(self.db_conn) @@ -147,12 +157,16 @@ class TestsWithDB(TestCaseWithDB, TestCaseSansDB): todo_2.add_child(todo_3) node_3 = TodoNode(todo_3, False, []) node_2.children = [node_3] - self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict) + cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree(set())) + cmp_1_dict = todo_node_as_dict(node_0) + self.assertEqual(cmp_0_dict, cmp_1_dict) # test same todo can be child-ed multiple times at different locations todo_1.add_child(todo_3) node_4 = TodoNode(todo_3, True, []) node_0.children += [node_4] - self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict) + cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree(set())) + cmp_1_dict = todo_node_as_dict(node_0) + self.assertEqual(cmp_0_dict, cmp_1_dict) def test_Todo_create_with_children(self) -> None: """Test parenthood guaranteeds of Todo.create_with_children.""" @@ -310,7 +324,7 @@ class TestsWithServer(TestCaseWithServer): expected['steps_todo_to_process'] = [{ 'children': [], 'fillable': False, - 'id': 1, + 'node_id': 1, 'process': None, 'todo': 2}] self.check_post({'adopt': 2}, '/todo?id=1')