home · contact · privacy
Overhaul/simplify ctx to JSON dictification.
authorChristian Heller <c.heller@plomlompom.de>
Tue, 16 Jul 2024 21:56:58 +0000 (23:56 +0200)
committerChristian Heller <c.heller@plomlompom.de>
Tue, 16 Jul 2024 21:56:58 +0000 (23:56 +0200)
plomtask/db.py
plomtask/http.py
plomtask/todos.py
templates/todo.html
tests/todos.py

index 7bb928ecde9428de82b15eeab40a4ba296affe44..27dc117273f23c44cd4f9af7934f769810b0d3de 100644 (file)
@@ -18,30 +18,6 @@ class UnmigratedDbException(HandledException):
     """To identify case of unmigrated DB file."""
 
 
-class CtxReferences:
-    """Collects references for future library building."""
-    # pylint: disable=too-few-public-methods
-
-    def __init__(self, d: dict[str, list[int | str]]) -> None:
-        # NB: For tighter mypy testing, we might prefer the library argument
-        # to be of type dict[str, list[int] | list[str] instead. But my
-        # current coding knowledge only manages to make that work by turning
-        # the code much more complex, so let's leave it at
-        # that for now …
-        self.d = d
-
-    def update(self, other: CtxReferences) -> bool:
-        """Updates other with entries in self."""
-        changed = False
-        for cls_name, id_list in self.d.items():
-            if cls_name not in other.d:
-                other.d[cls_name] = []
-            for id_ in id_list:
-                if id_ not in other.d[cls_name]:
-                    other.d[cls_name] += [id_]
-        return changed
-
-
 class DatabaseFile:
     """Represents the sqlite3 database's file."""
     # pylint: disable=too-few-public-methods
@@ -305,10 +281,10 @@ class BaseModel(Generic[BaseModelId]):
         return list(cls.versioned_defaults.keys())
 
     @property
-    def as_dict(self) -> dict[str, object]:
-        """Return self as (json.dumps-compatible) dict."""
-        references = CtxReferences({})
-        d: dict[str, object] = {'id': self.id_, '_references': references}
+    def as_dict_and_refs(self) -> tuple[dict[str, object], list[Any]]:
+        """Return self as json.dumps-ready dict, list of referenced objects."""
+        d: dict[str, object] = {'id': self.id_}
+        refs: list[Any] = []
         for to_save in self.to_save_simples:
             d[to_save] = getattr(self, to_save)
         if len(self.to_save_versioned()) > 0:
@@ -317,32 +293,16 @@ class BaseModel(Generic[BaseModelId]):
             attr = getattr(self, k)
             assert isinstance(d['_versioned'], dict)
             d['_versioned'][k] = attr.history
-        for r in self.to_save_relations:
-            attr_name = r[2]
-            l: list[int | str] = []
-            for rel in getattr(self, attr_name):
-                cls_name = rel.__class__.__name__
-                if cls_name not in references.d:
-                    references.d[cls_name] = []
-                l += [rel.id_]
-                references.d[cls_name] += [rel.id_]
-            d[attr_name] = l
-        for k in self.add_to_dict:
-            d[k] = [x.into_reference(references)
-                    for x in getattr(self, k)]
-        return d
-
-    def into_reference(self, references: CtxReferences) -> int | str:
-        """Return self.id_ and write into references for class.."""
-        cls_name = self.__class__.__name__
-        if cls_name not in references.d:
-            references.d[cls_name] = []
-        assert self.id_ is not None
-        references.d[cls_name] += [self.id_]
-        own_refs = self.as_dict['_references']
-        assert isinstance(own_refs, CtxReferences)
-        own_refs.update(references)
-        return self.id_
+        rels_to_collect = [rel[2] for rel in self.to_save_relations]
+        rels_to_collect += self.add_to_dict
+        for attr_name in rels_to_collect:
+            rel_list = []
+            for item in getattr(self, attr_name):
+                rel_list += [item.id_]
+                if item not in refs:
+                    refs += [item]
+            d[attr_name] = rel_list
+        return d, refs
 
     @classmethod
     def name_lowercase(cls) -> str:
index 8fa6f937a915fce0e204ed401f1371432da3a742..31646626b9016178cc300d2eea59e217ce5a9dbb 100644 (file)
@@ -13,10 +13,10 @@ from plomtask.dating import date_in_n_days
 from plomtask.days import Day
 from plomtask.exceptions import (HandledException, BadFormatException,
                                  NotFoundException)
-from plomtask.db import DatabaseConnection, DatabaseFile, CtxReferences
+from plomtask.db import DatabaseConnection, DatabaseFile, BaseModel
 from plomtask.processes import Process, ProcessStep, ProcessStepsNode
 from plomtask.conditions import Condition
-from plomtask.todos import Todo, TodoStepsNode
+from plomtask.todos import Todo, TodoStepsNode, DictableNode
 
 TEMPLATES_DIR = 'templates'
 
@@ -156,55 +156,43 @@ class TaskHandler(BaseHTTPRequestHandler):
     def _ctx_to_json(self, ctx: dict[str, object]) -> str:
         """Render ctx into JSON string."""
 
-        def walk_ctx(node: object, references: CtxReferences) -> Any:
-            if hasattr(node, 'into_reference'):
-                if hasattr(node, 'id_') and node.id_ is not None:
-                    library_growing[0] = True
-                    return node.into_reference(references)
-            if hasattr(node, 'as_dict'):
-                d = node.as_dict
-                if '_references' in d:
-                    own_refs = d['_references']
-                    if own_refs.update(references):
-                        library_growing[0] = True
-                    del d['_references']
+        def flatten(node: object, library: dict[str, dict[str | int, object]]
+                    ) -> Any:
+
+            def update_library_with(item: Any,
+                                    library: dict[str, dict[str | int, object]]
+                                    ) -> None:
+                cls_name = item.__class__.__name__
+                if cls_name not in library:
+                    library[cls_name] = {}
+                if item.id_ not in library[cls_name]:
+                    d, refs = item.as_dict_and_refs
+                    library[cls_name][item.id_] = d
+                    for ref in refs:
+                        update_library_with(ref, library)
+
+            if isinstance(node, BaseModel):
+                update_library_with(node, library)
+                return node.id_
+            if isinstance(node, DictableNode):
+                d, refs = node.as_dict_and_refs
+                for ref in refs:
+                    update_library_with(ref, library)
                 return d
             if isinstance(node, (list, tuple)):
-                return [walk_ctx(x, references) for x in node]
+                return [flatten(item, library) for item in node]
             if isinstance(node, dict):
                 d = {}
                 for k, v in node.items():
-                    d[k] = walk_ctx(v, references)
+                    d[k] = flatten(v, library)
                 return d
             if isinstance(node, HandledException):
                 return str(node)
             return node
 
-        models = {}
-        for cls in [Day, Process, ProcessStep, Condition, Todo]:
-            models[cls.__name__] = cls
         library: dict[str, dict[str | int, object]] = {}
-        references = CtxReferences({})
-        library_growing = [True]
-        while library_growing[0]:
-            library_growing[0] = False
-            for k, v in ctx.items():
-                ctx[k] = walk_ctx(v, references)
-            for cls_name, ids in references.d.items():
-                if cls_name not in library:
-                    library[cls_name] = {}
-                for id_ in ids:
-                    cls = models[cls_name]
-                    assert hasattr(cls, 'can_create_by_id')
-                    if cls.can_create_by_id:
-                        assert hasattr(cls, 'by_id_or_create')
-                        d = cls.by_id_or_create(self.conn, id_).as_dict
-                    else:
-                        assert hasattr(cls, 'by_id')
-                        d = cls.by_id(self.conn, id_).as_dict
-                    del d['_references']
-                    library[cls_name][id_] = d
-                references.d[cls_name] = []
+        for k, v in ctx.items():
+            ctx[k] = flatten(v, library)
         ctx['_library'] = library
         return json_dumps(ctx)
 
@@ -381,17 +369,19 @@ class TaskHandler(BaseHTTPRequestHandler):
     def do_GET_todo(self, todo: Todo) -> dict[str, object]:
         """Show single Todo of ?id=."""
 
-        def walk_process_steps(id_: int,
+        def walk_process_steps(node_id: int,
                                process_step_nodes: list[ProcessStepsNode],
                                steps_nodes: list[TodoStepsNode]) -> None:
             for process_step_node in process_step_nodes:
-                id_ += 1
-                node = TodoStepsNode(id_, None, process_step_node.process, [])
+                node_id += 1
+                node = TodoStepsNode(node_id, None, process_step_node.process,
+                                     [])
                 steps_nodes += [node]
-                walk_process_steps(id_, list(process_step_node.steps.values()),
+                walk_process_steps(node_id,
+                                   list(process_step_node.steps.values()),
                                    node.children)
 
-        def walk_todo_steps(id_: int, todos: list[Todo],
+        def walk_todo_steps(node_id: int, todos: list[Todo],
                             steps_nodes: list[TodoStepsNode]) -> None:
             for todo in todos:
                 matched = False
@@ -402,12 +392,12 @@ class TaskHandler(BaseHTTPRequestHandler):
                     matched = True
                     for child in match.children:
                         child.fillable = True
-                    walk_todo_steps(id_, todo.children, match.children)
+                    walk_todo_steps(node_id, todo.children, match.children)
                 if not matched:
-                    id_ += 1
-                    node = TodoStepsNode(id_, todo, None, [])
+                    node_id += 1
+                    node = TodoStepsNode(node_id, todo, None, [])
                     steps_nodes += [node]
-                    walk_todo_steps(id_, todo.children, node.children)
+                    walk_todo_steps(node_id, todo.children, node.children)
 
         def collect_adoptables_keys(steps_nodes: list[TodoStepsNode]
                                     ) -> set[int]:
index 06d57ab355aa5562391e6e4541728cc01e99418a..4138abcd3051e3a7faf6a8165d04137b3c9cb01c 100644 (file)
@@ -11,59 +11,55 @@ from plomtask.exceptions import (NotFoundException, BadFormatException,
 from plomtask.dating import valid_date
 
 
-class TodoNode:
+class DictableNode:
+    """Template for TodoNode, TodoStepsNode providing .as_dict_and_refs."""
+    # pylint: disable=too-few-public-methods
+    _to_dict: list[str] = []
+
+    def __init__(self, *args: Any) -> None:
+        for i, arg in enumerate(args):
+            setattr(self, self._to_dict[i], arg)
+
+    @property
+    def as_dict_and_refs(self) -> tuple[dict[str, object], list[Any]]:
+        """Return self as json.dumps-ready dict, list of referenced objects."""
+        d = {}
+        refs = []
+        for name in self._to_dict:
+            attr = getattr(self, name)
+            if hasattr(attr, 'id_'):
+                d[name] = attr.id_
+                continue
+            if isinstance(attr, list):
+                d[name] = []
+                for item in attr:
+                    item_d, item_refs = item.as_dict_and_refs
+                    d[name] += [item_d]
+                    for item_ref in [r for r in item_refs if r not in refs]:
+                        refs += [item_ref]
+                continue
+            d[name] = attr
+        return d, refs
+
+
+class TodoNode(DictableNode):
     """Collects what's useful to know for Todo/Condition tree display."""
     # pylint: disable=too-few-public-methods
     todo: Todo
     seen: bool
     children: list[TodoNode]
-
-    def __init__(self,
-                 todo: Todo,
-                 seen: bool,
-                 children: list[TodoNode]) -> None:
-        self.todo = todo
-        self.seen = seen
-        self.children = children
-
-    @property
-    def as_dict(self) -> dict[str, object]:
-        """Return self as (json.dumps-coompatible) dict."""
-        return {'todo': self.todo.id_,
-                'seen': self.seen,
-                'children': [c.as_dict for c in self.children]}
+    _to_dict = ['todo', 'seen', 'children']
 
 
-class TodoStepsNode:
+class TodoStepsNode(DictableNode):
     """Collect what's useful for Todo steps tree display."""
     # pylint: disable=too-few-public-methods
-    id_: int
+    node_id: int
     todo: Todo | None
     process: Process | None
     children: list[TodoStepsNode]  # pylint: disable=undefined-variable
-    fillable: bool
-
-    def __init__(self,
-                 id_: int,
-                 todo: Todo | None,
-                 process: Process | None,
-                 children: list[TodoStepsNode],
-                 fillable: bool = False):
-        # pylint: disable=too-many-arguments
-        self.id_ = id_
-        self.todo = todo
-        self.process = process
-        self.children = children
-        self.fillable = fillable
-
-    @property
-    def as_dict(self) -> dict[str, object]:
-        """Return self as (json.dumps-compatible) dict."""
-        return {'id': self.id_,
-                'todo': self.todo.id_ if self.todo else None,
-                'process': self.process.id_ if self.process else None,
-                'children': [c.as_dict for c in self.children],
-                'fillable': self.fillable}
+    fillable: bool = False
+    _to_dict = ['node_id', 'todo', 'process', 'children', 'fillable']
 
 
 class Todo(BaseModel[int], ConditionsRelations):
index fea931ab83ddf57536ab375ce3773a3f656204ce..45e247f304e20fc3656dd0fb5a63feb4c4a7c027 100644 (file)
@@ -23,7 +23,7 @@ select{ font-size: 0.5em; margin: 0; padding: 0; }
 {% else %}
 {{item.process.title.newest|e}}
 {% if indent == 0 %}
-· fill: <select name="fill_for_{{item.id_}}">
+· fill: <select name="fill_for_{{item.node_id}}">
 <option value="ignore">--</option>
 <option value="make_empty_{{item.process.id_}}">make empty</option>
 <option value="make_full_{{item.process.id_}}">make full</option>
index 27bf1a464610bbe1da8ec7d076c26282550aad0f..01297d479700b483c56bc1c1cfd60f41f3f74928 100644 (file)
@@ -121,16 +121,24 @@ class TestsWithDB(TestCaseWithDB, TestCaseSansDB):
 
     def test_Todo_step_tree(self) -> None:
         """Test self-configuration of TodoStepsNode tree for Day view."""
+
+        def todo_node_as_dict(node: TodoNode) -> dict[str, object]:
+            return {'todo': node.todo.id_, 'seen': node.seen,
+                    'children': [todo_node_as_dict(c) for c in node.children]}
+
         todo_1 = Todo(None, self.proc, False, self.date1)
         todo_1.save(self.db_conn)
         assert isinstance(todo_1.id_, int)
         # test minimum
         node_0 = TodoNode(todo_1, False, [])
-        self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict)
+        cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree(set()))
+        cmp_1_dict = todo_node_as_dict(node_0)
+        self.assertEqual(cmp_0_dict, cmp_1_dict)
         # test non_emtpy seen_todo does something
         node_0.seen = True
-        self.assertEqual(todo_1.get_step_tree({todo_1.id_}).as_dict,
-                         node_0.as_dict)
+        cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree({todo_1.id_}))
+        cmp_1_dict = todo_node_as_dict(node_0)
+        self.assertEqual(cmp_0_dict, cmp_1_dict)
         # test child shows up
         todo_2 = Todo(None, self.proc, False, self.date1)
         todo_2.save(self.db_conn)
@@ -139,7 +147,9 @@ class TestsWithDB(TestCaseWithDB, TestCaseSansDB):
         node_2 = TodoNode(todo_2, False, [])
         node_0.children = [node_2]
         node_0.seen = False
-        self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict)
+        cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree(set()))
+        cmp_1_dict = todo_node_as_dict(node_0)
+        self.assertEqual(cmp_0_dict, cmp_1_dict)
         # test child shows up with child
         todo_3 = Todo(None, self.proc, False, self.date1)
         todo_3.save(self.db_conn)
@@ -147,12 +157,16 @@ class TestsWithDB(TestCaseWithDB, TestCaseSansDB):
         todo_2.add_child(todo_3)
         node_3 = TodoNode(todo_3, False, [])
         node_2.children = [node_3]
-        self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict)
+        cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree(set()))
+        cmp_1_dict = todo_node_as_dict(node_0)
+        self.assertEqual(cmp_0_dict, cmp_1_dict)
         # test same todo can be child-ed multiple times at different locations
         todo_1.add_child(todo_3)
         node_4 = TodoNode(todo_3, True, [])
         node_0.children += [node_4]
-        self.assertEqual(todo_1.get_step_tree(set()).as_dict, node_0.as_dict)
+        cmp_0_dict = todo_node_as_dict(todo_1.get_step_tree(set()))
+        cmp_1_dict = todo_node_as_dict(node_0)
+        self.assertEqual(cmp_0_dict, cmp_1_dict)
 
     def test_Todo_create_with_children(self) -> None:
         """Test parenthood guaranteeds of Todo.create_with_children."""
@@ -310,7 +324,7 @@ class TestsWithServer(TestCaseWithServer):
         expected['steps_todo_to_process'] = [{
             'children': [],
             'fillable': False,
-            'id': 1,
+            'node_id': 1,
             'process': None,
             'todo': 2}]
         self.check_post({'adopt': 2}, '/todo?id=1')