--- /dev/null
+"""Non-doable elements of ProcessStep/Todo chains."""
+from __future__ import annotations
+from sqlite3 import Row
+from plomtask.db import DatabaseConnection
+from plomtask.misc import VersionedAttribute
+from plomtask.exceptions import BadFormatException, NotFoundException
+
+
+class Condition:
+ """Non Process-dependency for ProcessSteps and Todos."""
+
+ def __init__(self, id_: int | None, is_active: bool = False) -> None:
+ if (id_ is not None) and id_ < 1:
+ msg = f'illegal Condition ID, must be >=1: {id_}'
+ raise BadFormatException(msg)
+ self.id_ = id_
+ self.is_active = is_active
+ self.title = VersionedAttribute(self, 'condition_titles', 'UNNAMED')
+ self.description = VersionedAttribute(self, 'condition_descriptions',
+ '')
+
+ @classmethod
+ def from_table_row(cls, db_conn: DatabaseConnection,
+ row: Row) -> Condition:
+ """Build condition from row, including VersionedAttributes."""
+ condition = cls(row[0], row[1])
+ for title_row in db_conn.exec('SELECT * FROM condition_titles '
+ 'WHERE parent_id = ?', (row[0],)):
+ condition.title.history[title_row[1]] = title_row[2]
+ for desc_row in db_conn.exec('SELECT * FROM condition_descriptions '
+ 'WHERE parent_id = ?', (row[0],)):
+ condition.description.history[desc_row[1]] = desc_row[2]
+ return condition
+
+ @classmethod
+ def all(cls, db_conn: DatabaseConnection) -> list[Condition]:
+ """Collect all Conditions and their VersionedAttributes."""
+ conditions = {}
+ for id_, condition in db_conn.cached_conditions.items():
+ conditions[id_] = condition
+ already_recorded = conditions.keys()
+ for row in db_conn.exec('SELECT id FROM conditions'):
+ if row[0] not in already_recorded:
+ condition = cls.by_id(db_conn, row[0])
+ conditions[condition.id_] = condition
+ return list(conditions.values())
+
+ @classmethod
+ def by_id(cls, db_conn: DatabaseConnection, id_: int | None,
+ create: bool = False) -> Condition:
+ """Collect (or create) Condition and its VersionedAttributes."""
+ condition = None
+ if id_ in db_conn.cached_conditions.keys():
+ condition = db_conn.cached_conditions[id_]
+ else:
+ for row in db_conn.exec('SELECT * FROM conditions WHERE id = ?',
+ (id_,)):
+ condition = cls.from_table_row(db_conn, row)
+ break
+ if not condition:
+ if not create:
+ raise NotFoundException(f'Condition not found of id: {id_}')
+ condition = cls(id_, False)
+ return condition
+
+ def save(self, db_conn: DatabaseConnection) -> None:
+ """Save self and its VersionedAttributes to DB and cache."""
+ cursor = db_conn.exec('REPLACE INTO conditions VALUES (?, ?)',
+ (self.id_, self.is_active))
+ self.id_ = cursor.lastrowid
+ self.title.save(db_conn)
+ self.description.save(db_conn)
+ assert self.id_ is not None
+ db_conn.cached_conditions[self.id_] = self
self.cached_days: Dict[str, Any] = {}
self.cached_process_steps: Dict[int, Any] = {}
self.cached_processes: Dict[int, Any] = {}
+ self.cached_conditions: Dict[int, Any] = {}
def commit(self) -> None:
"""Commit SQL transaction."""
NotFoundException
from plomtask.db import DatabaseConnection, DatabaseFile
from plomtask.processes import Process
+from plomtask.conditions import Condition
from plomtask.todos import Todo
TEMPLATES_DIR = 'templates'
"""Handle any GET request."""
try:
conn, site, params = self._init_handling()
- if site in {'calendar', 'day', 'process', 'processes', 'todo'}:
+ if site in {'calendar', 'day', 'process', 'processes', 'todo',
+ 'condition', 'conditions'}:
html = getattr(self, f'do_GET_{site}')(conn, params)
elif '' == site:
self._redirect('/day')
date = params.get_str('date', todays_date())
day = Day.by_date(conn, date, create=True)
todos = Todo.by_date(conn, date)
+ conditions_listing = []
+ for condition in Condition.all(conn):
+ enablers = Todo.enablers_for_at(conn, condition, date)
+ disablers = Todo.disablers_for_at(conn, condition, date)
+ conditions_listing += [{
+ 'condition': condition,
+ 'enablers': enablers,
+ 'disablers': disablers}]
return self.server.jinja.get_template('day.html').render(
- day=day, processes=Process.all(conn), todos=todos)
+ day=day, processes=Process.all(conn), todos=todos,
+ conditions_listing=conditions_listing)
def do_GET_todo(self, conn: DatabaseConnection, params:
ParamsParser) -> str:
"""Show single Todo of ?id=."""
id_ = params.get_int_or_none('id')
todo = Todo.by_id(conn, id_)
- candidates = Todo.by_date(conn, todo.day.date)
+ todo_candidates = Todo.by_date(conn, todo.day.date)
return self.server.jinja.get_template('todo.html').render(
- todo=todo, candidates=candidates)
+ todo=todo, todo_candidates=todo_candidates,
+ condition_candidates=Condition.all(conn))
+
+ def do_GET_conditions(self, conn: DatabaseConnection,
+ _: ParamsParser) -> str:
+ """Show all Conditions."""
+ return self.server.jinja.get_template('conditions.html').render(
+ conditions=Condition.all(conn))
+
+ def do_GET_condition(self, conn: DatabaseConnection,
+ params: ParamsParser) -> str:
+ """Show Condition of ?id=."""
+ id_ = params.get_int_or_none('id')
+ condition = Condition.by_id(conn, id_, create=True)
+ return self.server.jinja.get_template('condition.html').render(
+ condition=condition)
def do_GET_process(self, conn: DatabaseConnection,
params: ParamsParser) -> str:
owners = process.used_as_step_by(conn)
return self.server.jinja.get_template('process.html').render(
process=process, steps=process.get_steps(conn),
- owners=owners, candidates=Process.all(conn))
+ owners=owners, process_candidates=Process.all(conn),
+ condition_candidates=Condition.all(conn))
def do_GET_processes(self, conn: DatabaseConnection,
_: ParamsParser) -> str:
postvars = parse_qs(self.rfile.read(length).decode(),
keep_blank_values=True, strict_parsing=True)
form_data = PostvarsParser(postvars)
- if site in ('day', 'process', 'todo'):
+ if site in ('day', 'process', 'todo', 'condition'):
getattr(self, f'do_POST_{site}')(conn, params, form_data)
conn.commit()
else:
if child_id is not None:
child = Todo.by_id(conn, child_id)
todo.add_child(child)
+ todo.set_conditions(conn, form_data.get_all_int('condition'))
+ todo.set_fulfills(conn, form_data.get_all_int('fulfills'))
+ todo.set_undoes(conn, form_data.get_all_int('undoes'))
todo.is_done = len(form_data.get_all_str('done')) > 0
todo.save(conn)
+ for condition in todo.fulfills:
+ condition.save(conn)
+ for condition in todo.undoes:
+ condition.save(conn)
def do_POST_process(self, conn: DatabaseConnection, params: ParamsParser,
form_data: PostvarsParser) -> None:
process.title.set(form_data.get_str('title'))
process.description.set(form_data.get_str('description'))
process.effort.set(form_data.get_float('effort'))
+ process.set_conditions(conn, form_data.get_all_int('condition'))
+ process.set_fulfills(conn, form_data.get_all_int('fulfills'))
+ process.set_undoes(conn, form_data.get_all_int('undoes'))
process.save_without_steps(conn)
assert process.id_ is not None # for mypy
process.explicit_steps = []
process.add_step(conn, None, step_process_id, None)
process.fix_steps(conn)
+ def do_POST_condition(self, conn: DatabaseConnection, params: ParamsParser,
+ form_data: PostvarsParser) -> None:
+ """Update/insert Condition of ?id= and fields defined in postvars."""
+ id_ = params.get_int_or_none('id')
+ condition = Condition.by_id(conn, id_, create=True)
+ condition.title.set(form_data.get_str('title'))
+ condition.description.set(form_data.get_str('description'))
+ condition.save(conn)
+
def _init_handling(self) -> tuple[DatabaseConnection, str, ParamsParser]:
conn = DatabaseConnection(self.server.db)
parsed_url = urlparse(self.path)
--- /dev/null
+"""Attributes whose values are recorded as a timestamped history."""
+from datetime import datetime
+from typing import Any
+from plomtask.db import DatabaseConnection
+
+
+class VersionedAttribute:
+ """Attributes whose values are recorded as a timestamped history."""
+
+ def __init__(self,
+ parent: Any, table_name: str, default: str | float) -> None:
+ self.parent = parent
+ self.table_name = table_name
+ self.default = default
+ self.history: dict[str, str | float] = {}
+
+ @property
+ def _newest_timestamp(self) -> str:
+ """Return most recent timestamp."""
+ return sorted(self.history.keys())[-1]
+
+ @property
+ def newest(self) -> str | float:
+ """Return most recent value, or self.default if self.history empty."""
+ if 0 == len(self.history):
+ return self.default
+ return self.history[self._newest_timestamp]
+
+ def set(self, value: str | float) -> None:
+ """Add to self.history if and only if not same value as newest one."""
+ if 0 == len(self.history) \
+ or value != self.history[self._newest_timestamp]:
+ self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value
+
+ def at(self, queried_time: str) -> str | float:
+ """Retrieve value of timestamp nearest queried_time from the past."""
+ sorted_timestamps = sorted(self.history.keys())
+ if 0 == len(sorted_timestamps):
+ return self.default
+ selected_timestamp = sorted_timestamps[0]
+ for timestamp in sorted_timestamps[1:]:
+ if timestamp > queried_time:
+ break
+ selected_timestamp = timestamp
+ return self.history[selected_timestamp]
+
+ def save(self, db_conn: DatabaseConnection) -> None:
+ """Save as self.history entries, but first wipe old ones."""
+ db_conn.exec(f'DELETE FROM {self.table_name} WHERE parent_id = ?',
+ (self.parent.id_,))
+ for timestamp, value in self.history.items():
+ db_conn.exec(f'INSERT INTO {self.table_name} VALUES (?, ?, ?)',
+ (self.parent.id_, timestamp, value))
"""Collecting Processes and Process-related items."""
from __future__ import annotations
from sqlite3 import Row
-from datetime import datetime
from typing import Any, Set
from plomtask.db import DatabaseConnection
+from plomtask.misc import VersionedAttribute
+from plomtask.conditions import Condition
from plomtask.exceptions import NotFoundException, BadFormatException
class Process:
"""Template for, and metadata for, Todos, and their arrangements."""
+ # pylint: disable=too-many-instance-attributes
+
def __init__(self, id_: int | None) -> None:
if (id_ is not None) and id_ < 1:
raise BadFormatException(f'illegal Process ID, must be >=1: {id_}')
self.id_ = id_
- self.title = VersionedAttribute(self, 'title', 'UNNAMED')
- self.description = VersionedAttribute(self, 'description', '')
- self.effort = VersionedAttribute(self, 'effort', 1.0)
+ self.title = VersionedAttribute(self, 'process_titles', 'UNNAMED')
+ self.description = VersionedAttribute(self, 'process_descriptions', '')
+ self.effort = VersionedAttribute(self, 'process_efforts', 1.0)
self.explicit_steps: list[ProcessStep] = []
+ self.conditions: list[Condition] = []
+ self.fulfills: list[Condition] = []
+ self.undoes: list[Condition] = []
@classmethod
def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Process:
if not create:
raise NotFoundException(f'Process not found of id: {id_}')
process = Process(id_)
- if process:
- for row in db_conn.exec('SELECT * FROM process_titles '
- 'WHERE process_id = ?', (process.id_,)):
- process.title.history[row[1]] = row[2]
- for row in db_conn.exec('SELECT * FROM process_descriptions '
- 'WHERE process_id = ?', (process.id_,)):
- process.description.history[row[1]] = row[2]
- for row in db_conn.exec('SELECT * FROM process_efforts '
- 'WHERE process_id = ?', (process.id_,)):
- process.effort.history[row[1]] = row[2]
- for row in db_conn.exec('SELECT * FROM process_steps '
- 'WHERE owner_id = ?', (process.id_,)):
- process.explicit_steps += [ProcessStep.from_table_row(db_conn,
- row)]
+ for row in db_conn.exec('SELECT * FROM process_titles '
+ 'WHERE parent_id = ?', (process.id_,)):
+ process.title.history[row[1]] = row[2]
+ for row in db_conn.exec('SELECT * FROM process_descriptions '
+ 'WHERE parent_id = ?', (process.id_,)):
+ process.description.history[row[1]] = row[2]
+ for row in db_conn.exec('SELECT * FROM process_efforts '
+ 'WHERE parent_id = ?', (process.id_,)):
+ process.effort.history[row[1]] = row[2]
+ for row in db_conn.exec('SELECT * FROM process_steps '
+ 'WHERE owner_id = ?', (process.id_,)):
+ process.explicit_steps += [ProcessStep.from_table_row(db_conn,
+ row)]
+ for row in db_conn.exec('SELECT condition FROM process_conditions '
+ 'WHERE process = ?', (process.id_,)):
+ process.conditions += [Condition.by_id(db_conn, row[0])]
+ for row in db_conn.exec('SELECT condition FROM process_fulfills '
+ 'WHERE process = ?', (process.id_,)):
+ process.fulfills += [Condition.by_id(db_conn, row[0])]
+ for row in db_conn.exec('SELECT condition FROM process_undoes '
+ 'WHERE process = ?', (process.id_,)):
+ process.undoes += [Condition.by_id(db_conn, row[0])]
assert isinstance(process, Process)
return process
"""Return tree of depended-on explicit and implicit ProcessSteps."""
def make_node(step: ProcessStep) -> dict[str, object]:
- step_process = self.__class__.by_id(db_conn, step.step_process_id)
is_explicit = False
if external_owner is not None:
is_explicit = step.owner_id == external_owner.id_
- step_steps = step_process.get_steps(db_conn, external_owner)
- return {'process': step_process, 'parent_id': step.parent_step_id,
+ process = self.__class__.by_id(db_conn, step.step_process_id)
+ step_steps = process.get_steps(db_conn, external_owner)
+ return {'process': process, 'parent_id': step.parent_step_id,
'is_explicit': is_explicit, 'steps': step_steps}
def walk_steps(node_id: int, node: dict[str, Any]) -> None:
walk_steps(step_id, step_node)
return steps
+ def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
+ trgt: str = 'conditions') -> None:
+ """Set self.[target] to Conditions identified by ids."""
+ trgt_list = getattr(self, trgt)
+ while len(trgt_list) > 0:
+ trgt_list.pop()
+ for id_ in ids:
+ trgt_list += [Condition.by_id(db_conn, id_)]
+
+ def set_fulfills(self, db_conn: DatabaseConnection,
+ ids: list[int]) -> None:
+ """Set self.fulfills to Conditions identified by ids."""
+ self.set_conditions(db_conn, ids, 'fulfills')
+
+ def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
+ """Set self.undoes to Conditions identified by ids."""
+ self.set_conditions(db_conn, ids, 'undoes')
+
def add_step(self, db_conn: DatabaseConnection, id_: int | None,
step_process_id: int,
parent_step_id: int | None) -> ProcessStep:
self.title.save(db_conn)
self.description.save(db_conn)
self.effort.save(db_conn)
+ db_conn.exec('DELETE FROM process_conditions WHERE process = ?',
+ (self.id_,))
+ for condition in self.conditions:
+ db_conn.exec('INSERT INTO process_conditions VALUES (?,?)',
+ (self.id_, condition.id_))
+ db_conn.exec('DELETE FROM process_fulfills WHERE process = ?',
+ (self.id_,))
+ for condition in self.fulfills:
+ db_conn.exec('INSERT INTO process_fulfills VALUES (?,?)',
+ (self.id_, condition.id_))
+ db_conn.exec('DELETE FROM process_undoes WHERE process = ?',
+ (self.id_,))
+ for condition in self.undoes:
+ db_conn.exec('INSERT INTO process_undoes VALUES (?,?)',
+ (self.id_, condition.id_))
assert self.id_ is not None
db_conn.cached_processes[self.id_] = self
self.id_ = cursor.lastrowid
assert self.id_ is not None
db_conn.cached_process_steps[self.id_] = self
-
-
-class VersionedAttribute:
- """Attributes whose values are recorded as a timestamped history."""
-
- def __init__(self,
- parent: Process, name: str, default: str | float) -> None:
- self.parent = parent
- self.name = name
- self.default = default
- self.history: dict[str, str | float] = {}
-
- @property
- def _newest_timestamp(self) -> str:
- """Return most recent timestamp."""
- return sorted(self.history.keys())[-1]
-
- @property
- def newest(self) -> str | float:
- """Return most recent value, or self.default if self.history empty."""
- if 0 == len(self.history):
- return self.default
- return self.history[self._newest_timestamp]
-
- def set(self, value: str | float) -> None:
- """Add to self.history if and only if not same value as newest one."""
- if 0 == len(self.history) \
- or value != self.history[self._newest_timestamp]:
- self.history[datetime.now().strftime('%Y-%m-%d %H:%M:%S')] = value
-
- def at(self, queried_time: str) -> str | float:
- """Retrieve value of timestamp nearest queried_time from the past."""
- sorted_timestamps = sorted(self.history.keys())
- if 0 == len(sorted_timestamps):
- return self.default
- selected_timestamp = sorted_timestamps[0]
- for timestamp in sorted_timestamps[1:]:
- if timestamp > queried_time:
- break
- selected_timestamp = timestamp
- return self.history[selected_timestamp]
-
- def save(self, db_conn: DatabaseConnection) -> None:
- """Save as self.history entries, but first wipe old ones."""
- db_conn.exec(f'DELETE FROM process_{self.name}s WHERE process_id = ?',
- (self.parent.id_,))
- for timestamp, value in self.history.items():
- db_conn.exec(f'INSERT INTO process_{self.name}s VALUES (?, ?, ?)',
- (self.parent.id_, timestamp, value))
from plomtask.db import DatabaseConnection
from plomtask.days import Day
from plomtask.processes import Process
+from plomtask.conditions import Condition
from plomtask.exceptions import (NotFoundException, BadFormatException,
HandledException)
class Todo:
"""Individual actionable."""
+ # pylint: disable=too-many-instance-attributes
+
def __init__(self, id_: int | None, process: Process,
is_done: bool, day: Day) -> None:
self.id_ = id_
self.day = day
self.children: list[Todo] = []
self.parents: list[Todo] = []
+ self.conditions: list[Condition] = []
+ self.fulfills: list[Condition] = []
+ self.undoes: list[Condition] = []
+ if not self.id_:
+ self.conditions = process.conditions[:]
+ self.fulfills = process.fulfills[:]
+ self.undoes = process.undoes[:]
@classmethod
def from_table_row(cls, db_conn: DatabaseConnection, row: Row) -> Todo:
for row in db_conn.exec('SELECT parent FROM todo_children '
'WHERE child = ?', (id_,)):
todo.parents += [cls.by_id(db_conn, row[0])]
+ for row in db_conn.exec('SELECT condition FROM todo_conditions '
+ 'WHERE todo = ?', (id_,)):
+ todo.conditions += [Condition.by_id(db_conn, row[0])]
+ for row in db_conn.exec('SELECT condition FROM todo_fulfills '
+ 'WHERE todo = ?', (id_,)):
+ todo.fulfills += [Condition.by_id(db_conn, row[0])]
+ for row in db_conn.exec('SELECT condition FROM todo_undoes '
+ 'WHERE todo = ?', (id_,)):
+ todo.undoes += [Condition.by_id(db_conn, row[0])]
assert isinstance(todo, Todo)
return todo
todos += [cls.by_id(db_conn, row[0])]
return todos
+ @classmethod
+ def enablers_for_at(cls, db_conn: DatabaseConnection, condition: Condition,
+ date: str) -> list[Todo]:
+ """Collect all Todos of day that enable condition."""
+ enablers = []
+ for row in db_conn.exec('SELECT todo FROM todo_fulfills '
+ 'WHERE condition = ?', (condition.id_,)):
+ todo = cls.by_id(db_conn, row[0])
+ if todo.day.date == date:
+ enablers += [todo]
+ return enablers
+
+ @classmethod
+ def disablers_for_at(cls, db_conn: DatabaseConnection,
+ condition: Condition, date: str) -> list[Todo]:
+ """Collect all Todos of day that disable condition."""
+ disablers = []
+ for row in db_conn.exec('SELECT todo FROM todo_undoes '
+ 'WHERE condition = ?', (condition.id_,)):
+ todo = cls.by_id(db_conn, row[0])
+ if todo.day.date == date:
+ disablers += [todo]
+ return disablers
+
@property
def is_doable(self) -> bool:
- """Decide whether .is_done can be set to True based on children's."""
+ """Decide whether .is_done settable based on children, Conditions."""
for child in self.children:
if not child.is_done:
return False
+ for condition in self.conditions:
+ if not condition.is_active:
+ return False
return True
@property
def is_done(self, value: bool) -> None:
if value != self.is_done and not self.is_doable:
raise BadFormatException('cannot change doneness of undoable Todo')
- self._is_done = value
+ if self._is_done != value:
+ self._is_done = value
+ if value is True:
+ for condition in self.fulfills:
+ condition.is_active = True
+ for condition in self.undoes:
+ condition.is_active = False
+
+ def set_undoes(self, db_conn: DatabaseConnection, ids: list[int]) -> None:
+ """Set self.undoes to Conditions identified by ids."""
+ self.set_conditions(db_conn, ids, 'undoes')
+
+ def set_fulfills(self, db_conn: DatabaseConnection,
+ ids: list[int]) -> None:
+ """Set self.fulfills to Conditions identified by ids."""
+ self.set_conditions(db_conn, ids, 'fulfills')
+
+ def set_conditions(self, db_conn: DatabaseConnection, ids: list[int],
+ target: str = 'conditions') -> None:
+ """Set self.[target] to Conditions identified by ids."""
+ target_list = getattr(self, target)
+ while len(target_list) > 0:
+ target_list.pop()
+ for id_ in ids:
+ target_list += [Condition.by_id(db_conn, id_)]
def add_child(self, child: Todo) -> None:
"""Add child to self.children, guard against recursion"""
for child in self.children:
db_conn.exec('INSERT INTO todo_children VALUES (?, ?)',
(self.id_, child.id_))
+ db_conn.exec('DELETE FROM todo_fulfills WHERE todo = ?', (self.id_,))
+ for condition in self.fulfills:
+ if condition.id_ is None:
+ raise NotFoundException('Fulfilled Condition of Todo '
+ 'without ID (not saved?)')
+ db_conn.exec('INSERT INTO todo_fulfills VALUES (?, ?)',
+ (self.id_, condition.id_))
+ db_conn.exec('DELETE FROM todo_undoes WHERE todo = ?', (self.id_,))
+ for condition in self.undoes:
+ if condition.id_ is None:
+ raise NotFoundException('Undone Condition of Todo '
+ 'without ID (not saved?)')
+ db_conn.exec('INSERT INTO todo_undoes VALUES (?, ?)',
+ (self.id_, condition.id_))
+ db_conn.exec('DELETE FROM todo_conditions WHERE todo = ?', (self.id_,))
+ for condition in self.conditions:
+ if condition.id_ is None:
+ raise NotFoundException('Condition of Todo '
+ 'without ID (not saved?)')
+ db_conn.exec('INSERT INTO todo_conditions VALUES (?, ?)',
+ (self.id_, condition.id_))
+CREATE TABLE condition_descriptions (
+ parent_id INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ description TEXT NOT NULL,
+ PRIMARY KEY (parent_id, timestamp),
+ FOREIGN KEY (parent_id) REFERENCES conditions(id)
+);
+CREATE TABLE condition_titles (
+ parent_id INTEGER NOT NULL,
+ timestamp TEXT NOT NULL,
+ title TEXT NOT NULL,
+ PRIMARY KEY (parent_id, timestamp),
+ FOREIGN KEY (parent_id) REFERENCES conditions(id)
+);
+CREATE TABLE conditions (
+ id INTEGER PRIMARY KEY,
+ is_active BOOLEAN NOT NULL
+);
CREATE TABLE days (
date TEXT PRIMARY KEY,
comment TEXT NOT NULL
);
+CREATE TABLE process_conditions (
+ process INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY (process, condition),
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
CREATE TABLE process_descriptions (
- process_id INTEGER NOT NULL,
+ parent_id INTEGER NOT NULL,
timestamp TEXT NOT NULL,
description TEXT NOT NULL,
- PRIMARY KEY (process_id, timestamp),
- FOREIGN KEY (process_id) REFERENCES processes(id)
+ PRIMARY KEY (parent_id, timestamp),
+ FOREIGN KEY (parent_id) REFERENCES processes(id)
);
CREATE TABLE process_efforts (
- process_id INTEGER NOT NULL,
+ parent_id INTEGER NOT NULL,
timestamp TEXT NOT NULL,
effort REAL NOT NULL,
- PRIMARY KEY (process_id, timestamp),
- FOREIGN KEY (process_id) REFERENCES processes(id)
+ PRIMARY KEY (parent_id, timestamp),
+ FOREIGN KEY (parent_id) REFERENCES processes(id)
+);
+CREATE TABLE process_fulfills (
+ process INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(process, condition),
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
);
CREATE TABLE process_steps (
step_id INTEGER PRIMARY KEY,
FOREIGN KEY (parent_step_id) REFERENCES process_steps(step_id)
);
CREATE TABLE process_titles (
- process_id INTEGER NOT NULL,
+ parent_id INTEGER NOT NULL,
timestamp TEXT NOT NULL,
title TEXT NOT NULL,
- PRIMARY KEY (process_id, timestamp),
- FOREIGN KEY (process_id) REFERENCES processes(id)
+ PRIMARY KEY (parent_id, timestamp),
+ FOREIGN KEY (parent_id) REFERENCES processes(id)
+);
+CREATE TABLE process_undoes (
+ process INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(process, condition),
+ FOREIGN KEY (process) REFERENCES processes(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
);
CREATE TABLE processes (
id INTEGER PRIMARY KEY
FOREIGN KEY (parent) REFERENCES todos(id),
FOREIGN KEY (child) REFERENCES todos(id)
);
+CREATE TABLE todo_conditions (
+ todo INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(todo, condition),
+ FOREIGN KEY (todo) REFERENCES todos(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE todo_fulfills (
+ todo INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(todo, condition),
+ FOREIGN KEY (todo) REFERENCES todos(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
+CREATE TABLE todo_undoes (
+ todo INTEGER NOT NULL,
+ condition INTEGER NOT NULL,
+ PRIMARY KEY(todo, condition),
+ FOREIGN KEY (todo) REFERENCES todos(id),
+ FOREIGN KEY (condition) REFERENCES conditions(id)
+);
CREATE TABLE todos (
id INTEGER PRIMARY KEY,
process_id INTEGER NOT NULL,
<meta charset="UTF-8">
<body>
<a href="processes">processes</a>
+<a href="conditions">conditions</a>
<a href="day">today</a>
<a href="calendar">calendar</a>
<hr>
--- /dev/null
+{% extends 'base.html' %}
+
+{% block content %}
+<h3>condition</h3>
+<form action="condition?id={{condition.id_ or ''}}" method="POST">
+title: <input name="title" value="{{condition.title.newest|e}}" />
+description: <input name="description" value="{{condition.description.newest|e}}" />
+<input type="submit" value="OK" />
+{% endblock %}
+
--- /dev/null
+{% extends 'base.html' %}
+
+{% block content %}
+<a href="condition">add</a>
+<ul>
+{% for condition in conditions %}
+<li><a href="condition?id={{condition.id_}}">{{condition.title.newest}}</a>
+{% endfor %}
+</ul>
+{% endblock %}
+
+
{% for child in todo.children %}
{{ todo_with_children(child, indent+1) }}
{% endfor %}
+{% for condition in todo.conditions %}
+<li>{% for i in range(indent+1) %}+{% endfor %} [{% if condition.is_active %}x{% else %} {% endif %}] <a href="condition?id={{condition.id_}}">{{condition.title.newest|e}}</a>
+{% endfor %}
{% endmacro %}
{% block content %}
{% endfor %}
</datalist>
</form>
+<h4>conditions</h4>
+{% for node in conditions_listing %}
+<li>[{% if node['condition'].is_active %}x{% else %} {% endif %}] <a href="condition?id={{node['condition'].id_}}">{{node['condition'].title.newest|e}}</a>
+<ul>
+{% for enabler in node['enablers'] %}
+<li>[+] {{enabler.process.title.newest|e}}</li>
+{% endfor %}
+{% for disabler in node['disablers'] %}
+<li>[-] {{disabler.process.title.newest|e}}</li>
+{% endfor %}
+</ul>
+</li>
+{% endfor %}
+<h4>todos</h4>
<ul>
{% for todo in todos %}
{{ todo_with_children(todo, 0) }}
{% extends 'base.html' %}
-{% macro process_with_steps(step_id, step_node, indent) %}
+{% macro step_with_steps(step_id, step_node, indent) %}
<tr>
<td>
<input type="hidden" name="steps" value="{{step_id}}" />
{% if step_node.is_explicit %}
<input type="checkbox" name="keep_step" value="{{step_id}}" checked />
<input type="hidden" name="step_{{step_id}}_process_id" value="{{step_node.process.id_}}" />
+<input type="hidden" name="step_{{step_id}}_condition_id" value="{{step_node.condition.id_}}" />
<input type="hidden" name="step_{{step_id}}_parent_id" value="{{step_node.parent_id or ''}}" />
{% endif %}
</td>
</tr>
{% if step_node.is_explicit or not step_node.seen %}
{% for substep_id, substep in step_node.steps.items() %}
-{{ process_with_steps(substep_id, substep, indent+1) }}
+{{ step_with_steps(substep_id, substep, indent+1) }}
{% endfor %}
{% endif %}
{% endmacro %}
title: <input name="title" value="{{process.title.newest|e}}" />
description: <input name="description" value="{{process.description.newest|e}}" />
default effort: <input name="effort" type="number" step=0.1 value={{process.effort.newest}} />
+<h4>conditions</h4>
+<table>
+{% for condition in process.conditions %}
+<tr>
+<td>
+<input type="checkbox" name="condition" value="{{condition.id_}}" checked />
+</td>
+<td>
+<a href="condition?id={{condition.id_}}">{{condition.title.newest|e}}</a>
+</td>
+</tr>
+{% endfor %}
+</table>
+add condition: <input name="condition" list="condition_candidates" autocomplete="off" />
+<datalist id="condition_candidates">
+{% for condition_candidate in condition_candidates %}
+<option value="{{condition_candidate.id_}}">{{condition_candidate.title.newest|e}}</option>
+{% endfor %}
+</datalist>
+<h4>fulfills</h4>
+<table>
+{% for condition in process.fulfills %}
+<tr>
+<td>
+<input type="checkbox" name="fulfills" value="{{condition.id_}}" checked />
+</td>
+<td>
+<a href="condition?id={{condition.id_}}">{{condition.title.newest|e}}</a>
+</td>
+</tr>
+{% endfor %}
+</table>
+add fulfills: <input name="fulfills" list="condition_candidates" autocomplete="off" />
+<h4>conditions</h4>
+<table>
+{% for condition in process.undoes %}
+<tr>
+<td>
+<input type="checkbox" name="undoes" value="{{condition.id_}}" checked />
+</td>
+<td>
+<a href="condition?id={{condition.id_}}">{{condition.title.newest|e}}</a>
+</td>
+</tr>
+{% endfor %}
+</table>
+add undoes: <input name="undoes" list="condition_candidates" autocomplete="off" />
<h4>steps</h4>
<table>
{% for step_id, step_node in steps.items() %}
-{{ process_with_steps(step_id, step_node, 0) }}
+{{ step_with_steps(step_id, step_node, 0) }}
{% endfor %}
</table>
-add step: <input name="new_top_step" list="candidates" autocomplete="off" />
-<datalist id="candidates">
-{% for candidate in candidates %}
+add step: <input name="new_top_step" list="step_candidates" autocomplete="off" />
+<datalist id="step_candidates">
+{% for candidate in step_candidates %}
<option value="{{candidate.id_}}">{{candidate.title.newest|e}}</option>
{% endfor %}
</datalist>
{% endfor %}
</ul>
{% endblock %}
-
-
--- /dev/null
+{% extends 'base.html' %}
+
+{% block content %}
+<h3>Todo: {{todo.process.title.newest|e}}</h3>
+<form action="todo?id={{todo.id_}}" method="POST">
+<p>
+id: {{todo.id_}}<br />
+day: <a href="day?date={{todo.day.date}}">{{todo.day.date}}</a><br />
+process: <a href="process?id={{todo.process.id_}}">{{todo.process.title.newest|e}}</a><br />
+done: <input type="checkbox" name="done" {% if todo.is_done %}checked {% endif %} {% if not todo.is_doable %}disabled {% endif %}/><br />
+</p>
+<h4>conditions</h4>
+<table>
+{% for condition in todo.conditions %}
+<tr>
+<td>
+<input type="checkbox" name="condition" value="{{condition.id_}}" checked />
+</td>
+<td>
+<a href="condition?id={{condition.id_}}">{{condition.title.newest|e}}</a>
+</td>
+</tr>
+{% endfor %}
+</table>
+add condition: <input name="condition" list="condition_candidates" autocomplete="off" />
+<datalist id="condition_candidates">
+{% for condition_candidate in condition_candidates %}
+<option value="{{condition_candidate.id_}}">{{condition_candidate.title.newest|e}}</option>
+{% endfor %}
+</datalist>
+<h4>fulfills</h4>
+<table>
+{% for condition in todo.fulfills %}
+<tr>
+<td>
+<input type="checkbox" name="fulfills" value="{{condition.id_}}" checked />
+</td>
+<td>
+<a href="condition?id={{condition.id_}}">{{condition.title.newest|e}}</a>
+</td>
+</tr>
+{% endfor %}
+</table>
+add fulfills: <input name="fulfills" list="condition_candidates" autocomplete="off" />
+<h4>undoes</h4>
+<table>
+{% for condition in todo.undoes%}
+<tr>
+<td>
+<input type="checkbox" name="undoes" value="{{condition.id_}}" checked />
+</td>
+<td>
+<a href="condition?id={{condition.id_}}">{{condition.title.newest|e}}</a>
+</td>
+</tr>
+{% endfor %}
+</table>
+add undoes: <input name="undoes" list="condition_candidates" autocomplete="off" />
+<h4>parents</h4>
+<ul>
+{% for parent in todo.parents %}
+<li><a href="todo?id={{parent.id_}}">{{parent.process.title.newest|e}}</a>
+{% endfor %}
+</ul>
+<h4>children</h4>
+<ul>
+{% for child in todo.children %}
+<li><a href="todo?id={{child.id_}}">{{child.process.title.newest|e}}</a>
+{% endfor %}
+</ul>
+adopt: <input name="adopt" list="todo_candidates" autocomplete="off" />
+<datalist id="todo_candidates">
+{% for candidate in todo_candidates %}
+<option value="{{candidate.id_}}">{{candidate.process.title.newest|e}} ({{candidate.id_}})</option>
+{% endfor %}
+</datalist>
+<input type="submit" value="OK" />
+</form
+{% endblock %}
--- /dev/null
+"""Test Conditions module."""
+from tests.utils import TestCaseWithDB, TestCaseWithServer
+from plomtask.conditions import Condition
+from plomtask.exceptions import NotFoundException
+
+
+class TestsWithDB(TestCaseWithDB):
+ """Tests requiring DB, but not server setup."""
+
+ def test_Condition_by_id(self) -> None:
+ """Test creation and findability."""
+ condition = Condition(None, False)
+ condition.save(self.db_conn)
+ self.assertEqual(Condition.by_id(self.db_conn, 1), condition)
+ with self.assertRaises(NotFoundException):
+ self.assertEqual(Condition.by_id(self.db_conn, 0), condition)
+ with self.assertRaises(NotFoundException):
+ self.assertEqual(Condition.by_id(self.db_conn, 2), condition)
+
+ def test_Condition_all(self) -> None:
+ """Test .all()."""
+ self.assertEqual(Condition.all(self.db_conn), [])
+ condition_1 = Condition(None, False)
+ condition_1.save(self.db_conn)
+ self.assertEqual(Condition.all(self.db_conn), [condition_1])
+ condition_2 = Condition(None, False)
+ condition_2.save(self.db_conn)
+ self.assertEqual(Condition.all(self.db_conn), [condition_1,
+ condition_2])
+
+ def test_Condition_singularity(self) -> None:
+ """Test pointers made for single object keep pointing to it."""
+ condition_1 = Condition(None, False)
+ condition_1.save(self.db_conn)
+ condition_1.is_active = True
+ condition_retrieved = Condition.by_id(self.db_conn, 1)
+ self.assertEqual(True, condition_retrieved.is_active)
+
+
+class TestsWithServer(TestCaseWithServer):
+ """Module tests against our HTTP server/handler (and database)."""
+
+ def test_do_POST_condition(self) -> None:
+ """Test POST /condition and its effect on the database."""
+ form_data = {'title': 'foo', 'description': 'foo'}
+ self.check_post(form_data, '/condition', 302, '/')
+ self.assertEqual(1, len(Condition.all(self.db_conn)))
+
+ def test_do_GET(self) -> None:
+ """Test /condition and /conditions response codes."""
+ self.check_get('/condition', 200)
+ self.check_get('/condition?id=', 200)
+ self.check_get('/condition?id=0', 400)
+ self.check_get('/condition?id=FOO', 400)
+ self.check_get('/conditions', 200)
from typing import Any
from tests.utils import TestCaseWithDB, TestCaseWithServer
from plomtask.processes import Process, ProcessStep
+from plomtask.conditions import Condition
from plomtask.exceptions import NotFoundException, BadFormatException
self.assertEqual(p_2.used_as_step_by(self.db_conn), [p_1])
self.assertEqual(p_3.used_as_step_by(self.db_conn), [p_1, p_2])
+ def test_Process_undoes(self) -> None:
+ """Test setting Process.undoes"""
+ p = Process(None)
+ p.set_undoes(self.db_conn, [])
+ p.set_undoes(self.db_conn, [])
+ self.assertEqual(p.undoes, [])
+ c1 = Condition(None, False)
+ c1.save(self.db_conn)
+ assert c1.id_ is not None
+ p.set_undoes(self.db_conn, [c1.id_])
+ self.assertEqual(p.undoes, [c1])
+ c2 = Condition(None, False)
+ c2.save(self.db_conn)
+ assert c2.id_ is not None
+ p.set_undoes(self.db_conn, [c2.id_])
+ self.assertEqual(p.undoes, [c2])
+ p.set_undoes(self.db_conn, [c1.id_, c2.id_])
+ self.assertEqual(p.undoes, [c1, c2])
+
+ def test_Process_fulfills(self) -> None:
+ """Test setting Process.fulfills"""
+ p = Process(None)
+ p.set_fulfills(self.db_conn, [])
+ p.set_fulfills(self.db_conn, [])
+ self.assertEqual(p.fulfills, [])
+ c1 = Condition(None, False)
+ c1.save(self.db_conn)
+ assert c1.id_ is not None
+ p.set_fulfills(self.db_conn, [c1.id_])
+ self.assertEqual(p.fulfills, [c1])
+ c2 = Condition(None, False)
+ c2.save(self.db_conn)
+ assert c2.id_ is not None
+ p.set_fulfills(self.db_conn, [c2.id_])
+ self.assertEqual(p.fulfills, [c2])
+ p.set_fulfills(self.db_conn, [c1.id_, c2.id_])
+ self.assertEqual(p.fulfills, [c1, c2])
+
+ def test_Process_conditions(self) -> None:
+ """Test setting Process.conditions"""
+ p = Process(None)
+ p.set_conditions(self.db_conn, [])
+ p.set_conditions(self.db_conn, [])
+ self.assertEqual(p.conditions, [])
+ c1 = Condition(None, False)
+ c1.save(self.db_conn)
+ assert c1.id_ is not None
+ p.set_conditions(self.db_conn, [c1.id_])
+ self.assertEqual(p.conditions, [c1])
+ c2 = Condition(None, False)
+ c2.save(self.db_conn)
+ assert c2.id_ is not None
+ p.set_conditions(self.db_conn, [c2.id_])
+ self.assertEqual(p.conditions, [c2])
+ p.set_conditions(self.db_conn, [c1.id_, c2.id_])
+ self.assertEqual(p.conditions, [c1, c2])
+
def test_Process_by_id(self) -> None:
"""Test Process.by_id()."""
with self.assertRaises(NotFoundException):
form_data = {'description': '', 'effort': 1.0}
self.check_post(form_data, '/process?id=', 400)
self.assertEqual(1, len(Process.all(self.db_conn)))
+ form_data = {'title': 'foo', 'description': 'foo', 'effort': 1.0,
+ 'condition': []}
+ self.check_post(form_data, '/process?id=', 302, '/')
+ form_data['condition'] = [1]
+ self.check_post(form_data, '/process?id=', 404)
+ form_data_cond = {'title': 'foo', 'description': 'foo'}
+ self.check_post(form_data_cond, '/condition', 302, '/')
+ self.check_post(form_data, '/process?id=', 302, '/')
+ form_data['undoes'] = [1]
+ self.check_post(form_data, '/process?id=', 302, '/')
+ form_data['fulfills'] = [1]
+ self.check_post(form_data, '/process?id=', 302, '/')
def test_do_GET(self) -> None:
"""Test /process and /processes response codes."""
from plomtask.todos import Todo
from plomtask.days import Day
from plomtask.processes import Process
+from plomtask.conditions import Condition
from plomtask.exceptions import (NotFoundException, BadFormatException,
HandledException)
self.assertEqual(Todo.by_date(self.db_conn, day2.date), [])
self.assertEqual(Todo.by_date(self.db_conn, 'foo'), [])
+ def test_Todo_from_process(self) -> None:
+ """Test spawning of Todo attributes from Process."""
+ day = Day('2024-01-01')
+ process = Process(None)
+ c1 = Condition(None, False)
+ c1.save(self.db_conn)
+ assert c1.id_ is not None
+ c2 = Condition(None, True)
+ c2.save(self.db_conn)
+ assert c2.id_ is not None
+ process.set_conditions(self.db_conn, [c1.id_])
+ todo = Todo(None, process, False, day)
+ self.assertEqual(todo.conditions, [c1])
+ todo.set_conditions(self.db_conn, [c2.id_])
+ self.assertEqual(todo.conditions, [c2])
+ self.assertEqual(process.conditions, [c1])
+ process.set_fulfills(self.db_conn, [c1.id_])
+ todo = Todo(None, process, False, day)
+ self.assertEqual(todo.fulfills, [c1])
+ todo.set_fulfills(self.db_conn, [c2.id_])
+ self.assertEqual(todo.fulfills, [c2])
+ self.assertEqual(process.fulfills, [c1])
+ process.set_undoes(self.db_conn, [c1.id_])
+ todo = Todo(None, process, False, day)
+ self.assertEqual(todo.undoes, [c1])
+ todo.set_undoes(self.db_conn, [c2.id_])
+ self.assertEqual(todo.undoes, [c2])
+ self.assertEqual(process.undoes, [c1])
+
+ def test_Todo_on_conditions(self) -> None:
+ """Test effect of Todos on Conditions."""
+ day = Day('2024-01-01')
+ process = Process(None)
+ process.save_without_steps(self.db_conn)
+ c1 = Condition(None, False)
+ c2 = Condition(None, True)
+ c1.save(self.db_conn)
+ c2.save(self.db_conn)
+ assert c1.id_ is not None
+ assert c2.id_ is not None
+ todo = Todo(None, process, False, day)
+ todo.save(self.db_conn)
+ todo.set_fulfills(self.db_conn, [c1.id_])
+ todo.set_undoes(self.db_conn, [c2.id_])
+ todo.is_done = True
+ self.assertEqual(c1.is_active, True)
+ self.assertEqual(c2.is_active, False)
+ todo.is_done = False
+ self.assertEqual(c1.is_active, True)
+ self.assertEqual(c2.is_active, False)
+
+ def test_Todo_enablers_disablers(self) -> None:
+ """Test Todo.enablers_for_at/disablers_for_at."""
+ day1 = Day('2024-01-01')
+ day2 = Day('2024-01-02')
+ process = Process(None)
+ process.save_without_steps(self.db_conn)
+ c1 = Condition(None, False)
+ c2 = Condition(None, True)
+ c1.save(self.db_conn)
+ c2.save(self.db_conn)
+ todo1 = Todo(None, process, False, day1)
+ todo1.save(self.db_conn)
+ assert c1.id_ is not None
+ assert c2.id_ is not None
+ todo1.set_fulfills(self.db_conn, [c1.id_])
+ todo1.set_undoes(self.db_conn, [c2.id_])
+ todo1.save(self.db_conn)
+ assert todo1.id_ is not None
+ todo2 = Todo(None, process, False, day1)
+ todo2.save(self.db_conn)
+ assert todo2.id_ is not None
+ todo2.set_fulfills(self.db_conn, [c2.id_])
+ todo2.save(self.db_conn)
+ todo3 = Todo(None, process, False, day2)
+ todo3.save(self.db_conn)
+ assert todo3.id_ is not None
+ todo3.set_fulfills(self.db_conn, [c2.id_])
+ todo3.save(self.db_conn)
+ self.assertEqual(Todo.enablers_for_at(self.db_conn, c1, day1.date),
+ [todo1])
+ self.assertEqual(Todo.enablers_for_at(self.db_conn, c1, day2.date),
+ [])
+ self.assertEqual(Todo.disablers_for_at(self.db_conn, c1, day1.date),
+ [])
+ self.assertEqual(Todo.disablers_for_at(self.db_conn, c1, day2.date),
+ [])
+ self.assertEqual(Todo.enablers_for_at(self.db_conn, c2, day1.date),
+ [todo2])
+ self.assertEqual(Todo.enablers_for_at(self.db_conn, c2, day2.date),
+ [todo3])
+ self.assertEqual(Todo.disablers_for_at(self.db_conn, c2, day1.date),
+ [todo1])
+ self.assertEqual(Todo.disablers_for_at(self.db_conn, c2, day2.date),
+ [])
+
def test_Todo_children(self) -> None:
"""Test Todo.children relations."""
day = Day('2024-01-01')
with self.assertRaises(BadFormatException):
todo_2.add_child(todo_1)
+ def test_Todo_conditioning(self) -> None:
+ """Test Todo.doability conditions."""
+ day = Day('2024-01-01')
+ process = Process(None)
+ process.save_without_steps(self.db_conn)
+ todo_1 = Todo(None, process, False, day)
+ todo_1.save(self.db_conn)
+ todo_2 = Todo(None, process, False, day)
+ todo_2.save(self.db_conn)
+ todo_2.add_child(todo_1)
+ with self.assertRaises(BadFormatException):
+ todo_2.is_done = True
+ todo_1.is_done = True
+ todo_2.is_done = True
+ todo_2.is_done = False
+ condition = Condition(None)
+ condition.save(self.db_conn)
+ assert condition.id_ is not None
+ todo_2.set_conditions(self.db_conn, [condition.id_])
+ with self.assertRaises(BadFormatException):
+ todo_2.is_done = True
+ condition.is_active = True
+ todo_2.is_done = True
+
def test_Todo_singularity(self) -> None:
"""Test pointers made for single object keep pointing to it."""
day = Day('2024-01-01')