home · contact · privacy
Re-write caching.
[plomtask] / plomtask / conditions.py
1 """Non-doable elements of ProcessStep/Todo chains."""
2 from __future__ import annotations
3 from typing import Any
4 from sqlite3 import Row
5 from plomtask.db import DatabaseConnection, BaseModel
6 from plomtask.misc import VersionedAttribute
7 from plomtask.exceptions import NotFoundException
8
9
10 class Condition(BaseModel[int]):
11     """Non Process-dependency for ProcessSteps and Todos."""
12     table_name = 'conditions'
13     to_save = ['is_active']
14
15     def __init__(self, id_: int | None, is_active: bool = False) -> None:
16         self.set_int_id(id_)
17         self.is_active = is_active
18         self.title = VersionedAttribute(self, 'condition_titles', 'UNNAMED')
19         self.description = VersionedAttribute(self, 'condition_descriptions',
20                                               '')
21
22     @classmethod
23     def from_table_row(cls, db_conn: DatabaseConnection,
24                        row: Row | list[Any]) -> Condition:
25         """Build condition from row, including VersionedAttributes."""
26         condition = super().from_table_row(db_conn, row)
27         assert isinstance(condition, Condition)
28         for name in ('title', 'description'):
29             table_name = f'condition_{name}s'
30             for row_ in db_conn.row_where(table_name, 'parent', row[0]):
31                 getattr(condition, name).history_from_row(row_)
32         return condition
33
34     @classmethod
35     def all(cls, db_conn: DatabaseConnection) -> list[Condition]:
36         """Collect all Conditions and their VersionedAttributes."""
37         conditions = {}
38         for id_, condition in cls.cache_.items():
39             conditions[id_] = condition
40         already_recorded = conditions.keys()
41         for id_ in db_conn.column_all('conditions', 'id'):
42             if id_ not in already_recorded:
43                 condition = cls.by_id(db_conn, id_)
44                 assert isinstance(condition.id_, int)
45                 conditions[condition.id_] = condition
46         return list(conditions.values())
47
48     @classmethod
49     def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
50               create: bool = False) -> Condition:
51         """Collect (or create) Condition and its VersionedAttributes."""
52         condition = None
53         if id_:
54             condition, _ = super()._by_id(db_conn, id_)
55         if not condition:
56             if not create:
57                 raise NotFoundException(f'Condition not found of id: {id_}')
58             condition = cls(id_, False)
59             condition.save(db_conn)
60         assert isinstance(condition, Condition)
61         return condition
62
63     def save(self, db_conn: DatabaseConnection) -> None:
64         """Save self and its VersionedAttributes to DB and cache."""
65         self.save_core(db_conn)
66         self.title.save(db_conn)
67         self.description.save(db_conn)
68         assert isinstance(self.id_, int)
69
70
71 class ConditionsRelations:
72     """Methods for handling relations to Conditions, for Todo and Process."""
73
74     def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
75                        target: str = 'conditions') -> None:
76         """Set self.[target] to Conditions identified by ids."""
77         target_list = getattr(self, target)
78         while len(target_list) > 0:
79             target_list.pop()
80         for id_ in ids:
81             target_list += [Condition.by_id(db_conn, id_)]
82
83     def set_enables(self, db_conn: DatabaseConnection,
84                     ids: list[int]) -> None:
85         """Set self.enables to Conditions identified by ids."""
86         self.set_conditions(db_conn, ids, 'enables')
87
88     def set_disables(self, db_conn: DatabaseConnection,
89                      ids: list[int]) -> None:
90         """Set self.disables to Conditions identified by ids."""
91         self.set_conditions(db_conn, ids, 'disables')