from typing import Any
from sqlite3 import Row
from plomtask.db import DatabaseConnection, BaseModel
-from plomtask.processes import Process
+from plomtask.processes import Process, ProcessStepsNode
from plomtask.versioned_attributes import VersionedAttribute
from plomtask.conditions import Condition, ConditionsRelations
from plomtask.exceptions import (NotFoundException, BadFormatException,
return todos
@classmethod
- def create_with_children(cls, db_conn: DatabaseConnection, date: str,
- process_ids: list[int]) -> list[Todo]:
- """Create Todos of process_ids for date, ensure children."""
- new_todos = []
- for process_id in process_ids:
- process = Process.by_id(db_conn, process_id)
- todo = Todo(None, process, False, date)
- todo.save(db_conn)
- new_todos += [todo]
- nothing_to_adopt = False
- while not nothing_to_adopt:
- nothing_to_adopt = True
- existing_todos = Todo.by_date(db_conn, date)
- for todo in new_todos:
- if todo.adopt_from(existing_todos):
- nothing_to_adopt = False
- todo.make_missing_children(db_conn)
- todo.save(db_conn)
- return new_todos
+ def create_with_children(cls, db_conn: DatabaseConnection,
+ process_id: int, date: str) -> Todo:
+ """Create Todo of process for date, ensure children."""
+
+ def key_order_func(n: ProcessStepsNode) -> int:
+ assert isinstance(n.process.id_, int)
+ return n.process.id_
+
+ def walk_steps(parent: Todo, step_node: ProcessStepsNode) -> Todo:
+ adoptables = [t for t in cls.by_date(db_conn, date)
+ if (t not in parent.children)
+ and (t != parent)
+ and step_node.process == t.process]
+ satisfier = None
+ for adoptable in adoptables:
+ satisfier = adoptable
+ break
+ if not satisfier:
+ satisfier = cls(None, step_node.process, False, date)
+ satisfier.save(db_conn)
+ sub_step_nodes = list(step_node.steps.values())
+ sub_step_nodes.sort(key=key_order_func)
+ for sub_node in sub_step_nodes:
+ n_slots = len([n for n in sub_step_nodes
+ if n.process == sub_node.process])
+ filled_slots = len([t for t in satisfier.children
+ if t.process == sub_node.process])
+ # if we did not newly create satisfier, it may already fill
+ # some step dependencies, so only fill what remains open
+ if n_slots - filled_slots > 0:
+ satisfier.add_child(walk_steps(satisfier, sub_node))
+ satisfier.save(db_conn)
+ return satisfier
+
+ process = Process.by_id(db_conn, process_id)
+ todo = cls(None, process, False, date)
+ todo.save(db_conn)
+ steps_tree = process.get_steps(db_conn)
+ for step_node in steps_tree.values():
+ todo.add_child(walk_steps(todo, step_node))
+ todo.save(db_conn)
+ return todo
@classmethod
def from_table_row(cls, db_conn: DatabaseConnection,
"""Needed for super().save to save Processes as attributes."""
return self.process.id_
- @property
- def unsatisfied_dependencies(self) -> list[int]:
- """Return Process IDs of .process.explicit_steps not in .children."""
- unsatisfied = [s.step_process_id for s in self.process.explicit_steps
- if s.parent_step_id is None]
- for child_process_id in [c.process.id_ for c in self.children]:
- if child_process_id in unsatisfied:
- unsatisfied.remove(child_process_id)
- return unsatisfied
-
@property
def is_done(self) -> bool:
"""Wrapper around self._is_done so we can control its setter."""
assert isinstance(effort_then, float)
return effort_then
- def adopt_from(self, todos: list[Todo]) -> bool:
- """As far as possible, fill unsatisfied dependencies from todos."""
- adopted = False
- for process_id in self.unsatisfied_dependencies:
- for todo in [t for t in todos if t.process.id_ == process_id
- and t not in self.children]:
- self.add_child(todo)
- adopted = True
- break
- return adopted
-
- def make_missing_children(self, db_conn: DatabaseConnection) -> None:
- """Fill unsatisfied dependencies with new Todos."""
- new_todos = self.__class__.create_with_children(
- db_conn, self.date, self.unsatisfied_dependencies)
- for todo in new_todos:
- self.add_child(todo)
-
def get_step_tree(self, seen_todos: set[int]) -> TodoNode:
"""Return tree of depended-on Todos."""
node_0.children += [node_4]
self.assertEqual(todo_1.get_step_tree(set()), node_0)
- def test_Todo_unsatisfied_steps(self) -> None:
- """Test options of satisfying unfulfilled Process.explicit_steps."""
+ def test_Todo_create_with_children(self) -> None:
+ """Test parenthood guaranteeds of Todo.create_with_children."""
assert isinstance(self.proc.id_, int)
- todo_1 = Todo(None, self.proc, False, self.date1)
- todo_1.save(self.db_conn)
proc2 = Process(None)
proc2.save(self.db_conn)
assert isinstance(proc2.id_, int)
proc4 = Process(None)
proc4.save(self.db_conn)
assert isinstance(proc4.id_, int)
+ # make proc4 step of proc3
proc3.set_steps(self.db_conn, [(None, proc4.id_, None)])
+ # give proc2 three steps; 2× proc1, 1× proc3
proc2.set_steps(self.db_conn, [(None, self.proc.id_, None),
(None, self.proc.id_, None),
(None, proc3.id_, None)])
- todo_2 = Todo(None, proc2, False, self.date1)
- todo_2.save(self.db_conn)
- # test empty adoption does nothing
- todo_2.adopt_from([])
- self.assertEqual(todo_2.children, [])
- # test basic adoption
- todo_2.adopt_from([todo_1])
- self.assertEqual(todo_2.children, [todo_1])
- self.assertEqual(todo_1.parents, [todo_2])
- # test making missing children
- todo_2.make_missing_children(self.db_conn)
- todo_3 = Todo.by_id(self.db_conn, 3)
- todo_4 = Todo.by_id(self.db_conn, 4)
- self.assertEqual(todo_2.children, [todo_1, todo_3, todo_4])
- self.assertEqual(todo_3.process, self.proc)
- self.assertEqual(todo_3.parents, [todo_2])
- self.assertEqual(todo_3.children, [])
- self.assertEqual(todo_4.process, proc3)
- self.assertEqual(todo_4.parents, [todo_2])
- # test .make_missing_children lower down the tree
- todo_4.make_missing_children(self.db_conn)
- todo_5 = Todo.by_id(self.db_conn, 5)
- self.assertEqual(todo_5.process, proc4)
- self.assertEqual(todo_4.children, [todo_5])
- self.assertEqual(todo_5.parents, [todo_4])
+ # test mere creation does nothing
+ todo_ignore = Todo(None, proc2, False, self.date1)
+ todo_ignore.save(self.db_conn)
+ self.assertEqual(todo_ignore.children, [])
+ # test create_with_children on step-less does nothing
+ todo_1 = Todo.create_with_children(self.db_conn, self.proc.id_,
+ self.date1)
+ self.assertEqual(todo_1.children, [])
+ self.assertEqual(len(Todo.all(self.db_conn)), 2)
+ # test create_with_children adopts and creates, and down tree too
+ todo_2 = Todo.create_with_children(self.db_conn, proc2.id_, self.date1)
+ self.assertEqual(3, len(todo_2.children))
+ self.assertEqual(todo_1, todo_2.children[0])
+ self.assertEqual(self.proc, todo_2.children[1].process)
+ self.assertEqual(proc3, todo_2.children[2].process)
+ todo_3 = todo_2.children[2]
+ self.assertEqual(len(todo_3.children), 1)
+ self.assertEqual(todo_3.children[0].process, proc4)
def test_Todo_singularity(self) -> None:
"""Test pointers made for single object keep pointing to it."""
assert isinstance(t.process.id_, int)
return t.process.id_
- def check_adoption(date: str, new_todo: list[int]) -> None:
- form_data = {'day_comment': '', 'new_todo': new_todo}
+ def check_adoption(date: str, new_todos: list[int]) -> None:
+ form_data = {'day_comment': '', 'new_todo': new_todos}
self.check_post(form_data, f'/day?date={date}', 302)
day_todos = Todo.by_date(self.db_conn, date)
day_todos.sort(key=key_order_func)
def check_nesting_adoption(process_id: int, date: str,
new_top_steps: list[int]) -> None:
- result_reversed = new_top_steps[1] < new_top_steps[0]
form_data = self.post_process()
form_data = self.post_process(process_id,
form_data |
todo1 = day_todos[0] # process of process_id
todo2 = day_todos[1] # process 2
todo3 = day_todos[2] # process 1
- if result_reversed:
- self.assertEqual(todo1.children, [todo2, todo3])
- else:
- self.assertEqual(todo1.children, [todo3, todo2])
+ self.assertEqual(sorted(todo1.children), sorted([todo2, todo3]))
self.assertEqual(todo1.parents, [])
self.assertEqual(todo2.children, [todo3])
self.assertEqual(todo2.parents, [todo1])
self.assertEqual(todo3.children, [])
- self.assertEqual(todo3.parents, [todo2, todo1])
+ self.assertEqual(sorted(todo3.parents), sorted([todo2, todo1]))
form_data = self.post_process()
form_data = self.post_process(2, form_data | {'new_top_step': 1})