from plomtask.db import DatabaseConnection, DatabaseFile, BaseModel
from plomtask.processes import Process, ProcessStep, ProcessStepsNode
from plomtask.conditions import Condition
-from plomtask.todos import Todo, TodoOrProcStepNode, DictableNode
+from plomtask.todos import Todo, TodoOrProcStepNode
+from plomtask.misc import DictableNode
TEMPLATES_DIR = 'templates'
steps_nodes: list[TodoOrProcStepNode]) -> int:
for process_step_node in process_step_nodes:
node_id += 1
- node = TodoOrProcStepNode(node_id, None,
- process_step_node.process, [])
+ proc = Process.by_id(self.conn,
+ process_step_node.step.step_process_id)
+ node = TodoOrProcStepNode(node_id, None, proc, [])
steps_nodes += [node]
node_id = walk_process_steps(
- node_id, list(process_step_node.steps.values()),
- node.children)
+ node_id, process_step_node.steps, node.children)
return node_id
def walk_todo_steps(node_id: int, todos: list[Todo],
todo_steps = [step.todo for step in todo.get_step_tree(set()).children]
process_tree = todo.process.get_steps(self.conn, None)
steps_todo_to_process: list[TodoOrProcStepNode] = []
- last_node_id = walk_process_steps(
- 0, list(process_tree.values()), steps_todo_to_process)
+ last_node_id = walk_process_steps(0, process_tree,
+ steps_todo_to_process)
for steps_node in steps_todo_to_process:
steps_node.fillable = True
walk_todo_steps(last_node_id, todo_steps, steps_todo_to_process)
preset_top_step = process_id
return {'process': process, 'is_new': process.id_ is None,
'preset_top_step': preset_top_step,
- 'steps': process.get_steps(self.conn), 'owners': owners,
+ 'steps': process.get_steps(self.conn),
+ 'owners': owners,
'n_todos': len(Todo.by_process_id(self.conn, process.id_)),
'process_candidates': Process.all(self.conn),
'condition_candidates': Condition.all(self.conn)}
--- /dev/null
+"""What doesn't fit elsewhere so far."""
+from typing import Any
+
+
+class DictableNode:
+ """Template for display chain nodes providing .as_dict_and_refs."""
+ # pylint: disable=too-few-public-methods
+ _to_dict: list[str] = []
+
+ def __init__(self, *args: Any) -> None:
+ for i, arg in enumerate(args):
+ setattr(self, self._to_dict[i], arg)
+
+ @property
+ def as_dict_and_refs(self) -> tuple[dict[str, object], list[Any]]:
+ """Return self as json.dumps-ready dict, list of referenced objects."""
+ d = {}
+ refs = []
+ for name in self._to_dict:
+ attr = getattr(self, name)
+ if hasattr(attr, 'id_'):
+ d[name] = attr.id_
+ continue
+ if isinstance(attr, list):
+ d[name] = []
+ for item in attr:
+ item_d, item_refs = item.as_dict_and_refs
+ d[name] += [item_d]
+ for item_ref in [r for r in item_refs if r not in refs]:
+ refs += [item_ref]
+ continue
+ d[name] = attr
+ return d, refs
"""Collecting Processes and Process-related items."""
from __future__ import annotations
-from dataclasses import dataclass
from typing import Set, Any
from sqlite3 import Row
+from plomtask.misc import DictableNode
from plomtask.db import DatabaseConnection, BaseModel
from plomtask.versioned_attributes import VersionedAttribute
from plomtask.conditions import Condition, ConditionsRelations
HandledException)
-@dataclass
-class ProcessStepsNode:
+class ProcessStepsNode(DictableNode):
"""Collects what's useful to know for ProcessSteps tree display."""
- process: Process
- parent_id: int | None
+ # pylint: disable=too-few-public-methods
+ step: ProcessStep
is_explicit: bool
- steps: dict[int, ProcessStepsNode]
+ steps: list[ProcessStepsNode]
seen: bool = False
is_suppressed: bool = False
+ _to_dict = ['step', 'is_explicit', 'steps', 'seen', 'is_suppressed']
class Process(BaseModel[int], ConditionsRelations):
return [self.__class__.by_id(db_conn, id_) for id_ in owner_ids]
def get_steps(self, db_conn: DatabaseConnection, external_owner:
- Process | None = None) -> dict[int, ProcessStepsNode]:
+ Process | None = None) -> list[ProcessStepsNode]:
"""Return tree of depended-on explicit and implicit ProcessSteps."""
def make_node(step: ProcessStep, suppressed: bool) -> ProcessStepsNode:
if external_owner is not None:
is_explicit = step.owner_id == external_owner.id_
process = self.__class__.by_id(db_conn, step.step_process_id)
- step_steps = {}
+ step_steps = []
if not suppressed:
step_steps = process.get_steps(db_conn, external_owner)
- return ProcessStepsNode(process, step.parent_step_id,
- is_explicit, step_steps, False, suppressed)
+ return ProcessStepsNode(step, is_explicit, step_steps, False,
+ suppressed)
- def walk_steps(node_id: int, node: ProcessStepsNode) -> None:
- node.seen = node_id in seen_step_ids
- seen_step_ids.add(node_id)
+ def walk_steps(node: ProcessStepsNode) -> None:
+ node.seen = node.step.id_ in seen_step_ids
+ assert isinstance(node.step.id_, int)
+ seen_step_ids.add(node.step.id_)
if node.is_suppressed:
return
explicit_children = [s for s in self.explicit_steps
- if s.parent_step_id == node_id]
+ if s.parent_step_id == node.step.id_]
for child in explicit_children:
- assert isinstance(child.id_, int)
- node.steps[child.id_] = make_node(child, False)
- for id_, step in node.steps.items():
- walk_steps(id_, step)
+ node.steps += [make_node(child, False)]
+ for step in node.steps:
+ walk_steps(step)
- steps: dict[int, ProcessStepsNode] = {}
+ step_nodes: list[ProcessStepsNode] = []
seen_step_ids: Set[int] = set()
if external_owner is None:
external_owner = self
if s.parent_step_id is None]:
assert isinstance(step.id_, int)
new_node = make_node(step, step in external_owner.suppressed_steps)
- steps[step.id_] = new_node
- for step_id, step_node in steps.items():
- walk_steps(step_id, step_node)
- return steps
+ step_nodes += [new_node]
+ for step_node in step_nodes:
+ walk_steps(step_node)
+ return step_nodes
def set_step_suppressions(self, db_conn: DatabaseConnection,
step_ids: list[int]) -> None:
from __future__ import annotations
from typing import Any, Set
from sqlite3 import Row
+from plomtask.misc import DictableNode
from plomtask.db import DatabaseConnection, BaseModel
from plomtask.processes import Process, ProcessStepsNode
from plomtask.versioned_attributes import VersionedAttribute
from plomtask.dating import valid_date
-class DictableNode:
- """Template for TodoNode, TodoOrStepsNode providing .as_dict_and_refs."""
- # pylint: disable=too-few-public-methods
- _to_dict: list[str] = []
-
- def __init__(self, *args: Any) -> None:
- for i, arg in enumerate(args):
- setattr(self, self._to_dict[i], arg)
-
- @property
- def as_dict_and_refs(self) -> tuple[dict[str, object], list[Any]]:
- """Return self as json.dumps-ready dict, list of referenced objects."""
- d = {}
- refs = []
- for name in self._to_dict:
- attr = getattr(self, name)
- if hasattr(attr, 'id_'):
- d[name] = attr.id_
- continue
- if isinstance(attr, list):
- d[name] = []
- for item in attr:
- item_d, item_refs = item.as_dict_and_refs
- d[name] += [item_d]
- for item_ref in [r for r in item_refs if r not in refs]:
- refs += [item_ref]
- continue
- d[name] = attr
- return d, refs
-
-
class TodoNode(DictableNode):
"""Collects what's useful to know for Todo/Condition tree display."""
# pylint: disable=too-few-public-methods
def ensure_children(self, db_conn: DatabaseConnection) -> None:
"""Ensure Todo children (create or adopt) demanded by Process chain."""
- def key_order_func(n: ProcessStepsNode) -> int:
- assert isinstance(n.process.id_, int)
- return n.process.id_
-
def walk_steps(parent: Todo, step_node: ProcessStepsNode) -> Todo:
adoptables = [t for t in Todo.by_date(db_conn, parent.date)
if (t not in parent.children)
and (t != parent)
- and step_node.process == t.process]
+ and step_node.step.step_process_id == t.process.id_]
satisfier = None
for adoptable in adoptables:
satisfier = adoptable
break
if not satisfier:
- satisfier = Todo(None, step_node.process, False, parent.date)
+ proc = Process.by_id(db_conn, step_node.step.step_process_id)
+ satisfier = Todo(None, proc, False, parent.date)
satisfier.save(db_conn)
- sub_step_nodes = list(step_node.steps.values())
- sub_step_nodes.sort(key=key_order_func)
+ sub_step_nodes = sorted(step_node.steps,
+ key=lambda s: s.step.step_process_id)
for sub_node in sub_step_nodes:
if sub_node.is_suppressed:
continue
n_slots = len([n for n in sub_step_nodes
- if n.process == sub_node.process])
+ if n.step.step_process_id
+ == sub_node.step.step_process_id])
filled_slots = len([t for t in satisfier.children
- if t.process == sub_node.process])
+ if t.process.id_
+ == sub_node.step.step_process_id_])
# if we did not newly create satisfier, it may already fill
# some step dependencies, so only fill what remains open
if n_slots - filled_slots > 0:
process = Process.by_id(db_conn, self.process_id)
steps_tree = process.get_steps(db_conn)
- for step_node in steps_tree.values():
+ for step_node in steps_tree:
if step_node.is_suppressed:
continue
self.add_child(walk_steps(self, step_node))